Skip to content
Snippets Groups Projects
Commit 85cebb73 authored by AlexStocks's avatar AlexStocks
Browse files

Merge branch 'develop'

parents 44a71ec3 c0663b8b
No related branches found
No related tags found
No related merge requests found
Showing
with 100 additions and 81 deletions
......@@ -22,7 +22,8 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// Directory: Extension - Directory
// Directory
//Extension - Directory
type Directory interface {
common.Node
List(invocation protocol.Invocation) []protocol.Invoker
......
......@@ -21,7 +21,8 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// LoadBalance: Extension - LoadBalance
// LoadBalance
//Extension - LoadBalance
type LoadBalance interface {
Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker
}
......@@ -37,7 +37,7 @@ import (
)
const (
//ROUTE_PATTERN: route pattern regex
//ROUTE_PATTERN route pattern regex
ROUTE_PATTERN = `([&!=,]*)\\s*([^&!=,\\s]+)`
// FORCE ...
FORCE = "force"
......@@ -107,7 +107,8 @@ func newConditionRouter(url *common.URL) (*ConditionRouter, error) {
}, nil
}
//Route: Router determine the target server list.
// Route
// Router determine the target server list.
func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 {
return invokers
......
......@@ -93,12 +93,12 @@ type rest struct {
var count int
func (bi *MockInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result {
count++
var (
success bool
err error = nil
err error
)
if count >= bi.successCount {
success = true
......
......@@ -27,7 +27,7 @@ import (
"github.com/apache/dubbo-go/config_center"
)
// Environment:
// Environment
// There is dubbo.properties file and application level config center configuration which higner than normal config center in java. So in java the
// configuration sequence will be config center > application level config center > dubbo.properties > spring bean configuration.
// But in go, neither the dubbo.properties file or application level config center configuration will not support for the time being.
......@@ -83,11 +83,11 @@ func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string
// Configuration ...
func (env *Environment) Configuration() *list.List {
list := list.New()
cfgList := list.New()
// The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
list.PushFront(newInmemoryConfiguration(&(env.externalConfigMap)))
list.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap)))
return list
cfgList.PushFront(newInmemoryConfiguration(&(env.externalConfigMap)))
cfgList.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap)))
return cfgList
}
// SetDynamicConfiguration ...
......
......@@ -18,10 +18,10 @@
package constant
const (
// Version: apache/dubbo-go version
// Version apache/dubbo-go version
Version = "1.3.0"
// Name: module name
// Name module name
Name = "dubbogo"
// Date: release date
// Date release date
DATE = "2020/01/12"
)
......@@ -26,7 +26,8 @@ var (
)
/**
* AddCustomShutdownCallback: you should not make any assumption about the order.
* AddCustomShutdownCallback
* you should not make any assumption about the order.
* For example, if you have more than one callbacks, and you wish the order is:
* callback1()
* callback2()
......
......@@ -40,7 +40,9 @@ type Proxy struct {
once sync.Once
}
var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
var (
typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
)
// NewProxy ...
func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy {
......@@ -51,7 +53,8 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str
}
}
// Implement: proxy implement
// Implement
// proxy implement
// In consumer, RPCService like:
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
......
......@@ -34,7 +34,8 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
// RPCService: rpc service interface
// RPCService
//rpc service interface
type RPCService interface {
// Reference:
// rpc service id or reference id
......@@ -67,8 +68,8 @@ var (
// because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// todo: lowerecas?
// ServiceMap ...
// todo: lowerecas?
ServiceMap = &serviceMap{
serviceMap: make(map[string]map[string]*Service),
}
......@@ -232,8 +233,8 @@ func (sm *serviceMap) UnRegister(protocol, serviceId string) error {
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
s, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(s)
}
// Is this type exported or a builtin?
......
......@@ -69,7 +69,7 @@ func checkApplicationName(config *ApplicationConfig) {
}
}
// Load: Dubbo Init
// Load Dubbo Init
func Load() {
// reference config
if consumerConfig == nil {
......@@ -153,12 +153,12 @@ func Load() {
GracefulShutdownInit()
}
// GetRPCService: get rpc service for consumer
// GetRPCService get rpc service for consumer
func GetRPCService(name string) common.RPCService {
return consumerConfig.References[name].GetRPCService()
}
// RPCService: create rpc service for consumer
// RPCService create rpc service for consumer
func RPCService(service common.RPCService) {
consumerConfig.References[service.Reference()].Implement(service)
}
......@@ -70,7 +70,7 @@ func (c *ReferenceConfig) Prefix() string {
return constant.ReferenceConfigPrefix + c.InterfaceName + "."
}
// NewReferenceConfig: The only way to get a new ReferenceConfig
// NewReferenceConfig The only way to get a new ReferenceConfig
func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{id: id, context: ctx}
}
......@@ -93,8 +93,9 @@ func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
}
// Refer ...
func (c *ReferenceConfig) Refer(impl interface{}) {
url := common.NewURLWithOptions(common.WithPath(c.id),
func (c *ReferenceConfig) Refer(_ interface{}) {
cfgURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
......@@ -109,14 +110,14 @@ func (c *ReferenceConfig) Refer(impl interface{}) {
panic(fmt.Sprintf("user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
}
if serviceUrl.Protocol == constant.REGISTRY_PROTOCOL {
serviceUrl.SubURL = url
serviceUrl.SubURL = cfgURL
c.urls = append(c.urls, &serviceUrl)
} else {
if serviceUrl.Path == "" {
serviceUrl.Path = "/" + c.id
}
// merge url need to do
newUrl := common.MergeUrl(&serviceUrl, url)
newUrl := common.MergeUrl(&serviceUrl, cfgURL)
c.urls = append(c.urls, newUrl)
}
......@@ -127,7 +128,7 @@ func (c *ReferenceConfig) Refer(impl interface{}) {
//set url to regUrls
for _, regUrl := range c.urls {
regUrl.SubURL = url
regUrl.SubURL = cfgURL
}
}
if len(c.urls) == 1 {
......@@ -153,12 +154,13 @@ func (c *ReferenceConfig) Refer(impl interface{}) {
//create proxy
if c.Async {
callback := GetCallback(c.id)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, url)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
} else {
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, url)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
}
}
// Implement
// @v is service provider implemented RPCService
func (c *ReferenceConfig) Implement(v common.RPCService) {
c.pxy.Implement(v)
......
......@@ -92,7 +92,7 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}
// NewServiceConfig: The only way to get a new ServiceConfig
// NewServiceConfig The only way to get a new ServiceConfig
func NewServiceConfig(id string, context context.Context) *ServiceConfig {
return &ServiceConfig{
......@@ -134,7 +134,8 @@ func (c *ServiceConfig) Export() error {
logger.Errorf(err.Error())
return err
}
url := common.NewURLWithOptions(common.WithPath(c.id),
ivkURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(proto.Port),
......@@ -146,11 +147,11 @@ func (c *ServiceConfig) Export() error {
if len(regUrls) > 0 {
for _, regUrl := range regUrls {
regUrl.SubURL = url
regUrl.SubURL = ivkURL
c.cacheMutex.Lock()
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", url))
logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()
......@@ -158,14 +159,14 @@ func (c *ServiceConfig) Export() error {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter := c.cacheProtocol.Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, url)))
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL)))
}
}
} else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*url)
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", url)))
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL)))
}
}
......
......@@ -29,6 +29,14 @@ type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{}
}
// NewApolloListener ...
func NewApolloListener() *apolloListener {
return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
}
}
// OnChange ...
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners {
......@@ -41,22 +49,15 @@ func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
}
}
// NewApolloListener ...
func NewApolloListener() *apolloListener {
return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
}
}
// AddListener ...
func (al *apolloListener) AddListener(l config_center.ConfigurationListener) {
if _, ok := al.listeners[l]; !ok {
al.listeners[l] = struct{}{}
agollo.AddChangeListener(al)
func (a *apolloListener) AddListener(l config_center.ConfigurationListener) {
if _, ok := a.listeners[l]; !ok {
a.listeners[l] = struct{}{}
agollo.AddChangeListener(a)
}
}
// RemoveListener ...
func (al *apolloListener) RemoveListener(l config_center.ConfigurationListener) {
delete(al.listeners, l)
func (a *apolloListener) RemoveListener(l config_center.ConfigurationListener) {
delete(a.listeners, l)
}
......@@ -43,7 +43,7 @@ var (
)
// GetDynamicConfiguration ...
func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) {
func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(_ *common.URL) (DynamicConfiguration, error) {
var err error
once.Do(func() {
dynamicConfiguration = &MockDynamicConfiguration{listener: map[string]ConfigurationListener{}}
......@@ -89,21 +89,21 @@ type MockDynamicConfiguration struct {
}
// AddListener ...
func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, opions ...Option) {
func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, _ ...Option) {
c.listener[key] = listener
}
// RemoveListener ...
func (c *MockDynamicConfiguration) RemoveListener(key string, listener ConfigurationListener, opions ...Option) {
func (c *MockDynamicConfiguration) RemoveListener(_ string, _ ConfigurationListener, _ ...Option) {
}
// GetConfig ...
func (c *MockDynamicConfiguration) GetConfig(key string, opts ...Option) (string, error) {
func (c *MockDynamicConfiguration) GetConfig(_ string, _ ...Option) (string, error) {
return c.content, nil
}
// GetConfigs: For zookeeper, getConfig and getConfigs have the same meaning.
// GetConfigs For zookeeper, getConfig and getConfigs have the same meaning.
func (c *MockDynamicConfiguration) GetConfigs(key string, opts ...Option) (string, error) {
return c.GetConfig(key, opts...)
}
......@@ -119,11 +119,11 @@ func (c *MockDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
}
// GetProperties ...
func (c *MockDynamicConfiguration) GetProperties(key string, opts ...Option) (string, error) {
func (c *MockDynamicConfiguration) GetProperties(_ string, _ ...Option) (string, error) {
return c.content, nil
}
// GetInternalProperty: For zookeeper, getConfig and getConfigs have the same meaning.
// GetInternalProperty For zookeeper, getConfig and getConfigs have the same meaning.
func (c *MockDynamicConfiguration) GetInternalProperty(key string, opts ...Option) (string, error) {
return c.GetProperties(key, opts...)
}
......
......@@ -48,7 +48,7 @@ type ConfigurationParser interface {
ParseToUrls(content string) ([]*common.URL, error)
}
//for support properties file in config center
// DefaultConfigurationParser for support properties file in config center
type DefaultConfigurationParser struct{}
// ConfiguratorConfig ...
......@@ -74,12 +74,12 @@ type ConfigItem struct {
// Parse ...
func (parser *DefaultConfigurationParser) Parse(content string) (map[string]string, error) {
properties, err := properties.LoadString(content)
pps, err := properties.LoadString(content)
if err != nil {
logger.Errorf("Parse the content {%v} in DefaultConfigurationParser error ,error message is {%v}", content, err)
return nil, err
}
return properties.Map(), nil
return pps.Map(), nil
}
// ParseToUrls ...
......
......@@ -38,7 +38,8 @@ import (
)
const (
// ZkClient: zookeeper client name
// ZkClient
//zookeeper client name
ZkClient = "zk config_center"
)
......@@ -137,10 +138,9 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
content, _, err := c.client.GetContent(c.rootPath + "/" + key)
if err != nil {
return "", perrors.WithStack(err)
} else {
return string(content), nil
}
return string(content), nil
}
//For zookeeper, getConfig and getConfigs have the same meaning.
......
......@@ -24,7 +24,8 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// Filter: Extension - Filter
// Filter
// Extension - Filter
type Filter interface {
Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result
OnResponse(context.Context, protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result
......
......@@ -35,6 +35,7 @@ import (
const (
//used in URL.
// FileDateFormat ...
FileDateFormat = "2006-01-02"
// MessageDateLayout ...
......@@ -45,6 +46,7 @@ const (
LogFileMode = 0600
// those fields are the data collected by this filter
// Types ...
Types = "types"
// Arguments ...
......@@ -56,6 +58,7 @@ func init() {
}
/*
* AccessLogFilter
* Although the access log filter is a default filter,
* you should config "accesslog" in service's config to tell the filter where store the access log.
* for example:
......@@ -94,7 +97,7 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
}
}
func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocation protocol.Invocation) map[string]string {
func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
dataMap[constant.INTERFACE_KEY] = attachments[constant.INTERFACE_KEY]
......@@ -128,7 +131,7 @@ func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocati
}
// OnResponse ...
func (ef *AccessLogFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (ef *AccessLogFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}
......
......@@ -30,7 +30,7 @@ import (
)
const (
// ECHO: echo module name
// ECHO echo module name
ECHO = "echo"
)
......@@ -38,7 +38,7 @@ func init() {
extension.SetFilter(ECHO, GetFilter)
}
// EchoFilter:
// EchoFilter
// RPCService need a Echo method in consumer, if you want to use EchoFilter
// eg:
// Echo func(ctx context.Context, arg interface{}, rsp *Xxx) error
......@@ -59,7 +59,9 @@ func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invo
}
// OnResponse ...
func (ef *EchoFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (ef *EchoFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {
return result
}
......
......@@ -46,7 +46,8 @@ func init() {
}
/**
* ExecuteLimitFilter: The filter will limit the number of in-progress request and it's thread-safe.
* ExecuteLimitFilter
* The filter will limit the number of in-progress request and it's thread-safe.
* example:
* "UserProvider":
* registry: "hangzhouzk"
......@@ -82,17 +83,17 @@ type ExecuteState struct {
// Invoke ...
func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
url := invoker.GetUrl()
limitTarget := url.ServiceKey()
ivkURL := invoker.GetUrl()
limitTarget := ivkURL.ServiceKey()
limitRateConfig := constant.DEFAULT_EXECUTE_LIMIT
methodLevelConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "")
methodLevelConfig := ivkURL.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "")
if len(methodLevelConfig) > 0 {
// we have the method-level configuration
limitTarget = limitTarget + "#" + invocation.MethodName()
limitRateConfig = methodLevelConfig
} else {
limitRateConfig = url.GetParam(constant.EXECUTE_LIMIT_KEY, constant.DEFAULT_EXECUTE_LIMIT)
limitRateConfig = ivkURL.GetParam(constant.EXECUTE_LIMIT_KEY, constant.DEFAULT_EXECUTE_LIMIT)
}
limitRate, err := strconv.ParseInt(limitRateConfig, 0, 0)
......@@ -112,17 +113,17 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok
concurrentCount := state.(*ExecuteState).increase()
defer state.(*ExecuteState).decrease()
if concurrentCount > limitRate {
logger.Errorf("The invocation was rejected due to over the execute limitation, url: %s ", url.String())
rejectedHandlerConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY,
url.GetParam(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY))
return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(url, invocation)
logger.Errorf("The invocation was rejected due to over the execute limitation, url: %s ", ivkURL.String())
rejectedHandlerConfig := ivkURL.GetParam(methodConfigPrefix+constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY,
ivkURL.GetParam(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY))
return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(ivkURL, invocation)
}
return invoker.Invoke(ctx, invocation)
}
// OnResponse ...
func (ef *ExecuteLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (ef *ExecuteLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment