diff --git a/.travis.yml b/.travis.yml index 7fb7420d8d33426d2e5873af9bc2fb4bacf80946..566c88ece05bd80175eea2d1de8fd061a279e273 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,22 +26,7 @@ script: - chmod u+x before_ut.sh && ./before_ut.sh - go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic # integrate-test - - echo 'start integrate-test' - # start zookeeper registry insecure listen in [:]:2181 - - docker run -d --network host zookeeper - - ROOTDIR=$(pwd) - - cd ./test/integrate/dubbo/go-client && docker build . -t ci-consumer --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_SLUG} --build-arg PR_ORIGIN_COMMITID=${TRAVIS_PULL_REQUEST_SHA} && cd $ROOTDIR - - cd ./test/integrate/dubbo/go-server && docker build . -t ci-provider --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_SLUG} --build-arg PR_ORIGIN_COMMITID=${TRAVIS_PULL_REQUEST_SHA} && cd $ROOTDIR - - docker run -d --network host ci-provider - - docker run -it --network host ci-consumer - - # another registry instance, start it by dep - # start etcd registry insecure listen in [:]:2379 - #- docker run -d --network host k8s.gcr.io/etcd:3.3.10 etcd - # start consul registry insecure listen in [:]:8500 - #- docker run -d --network host consul - # start nacos registry insecure listen in [:]:8848 - #- docker run -d --network host nacos/nacos-server:latest + - chmod +x integrate_test.sh && ./integrate_test.sh after_success: - bash <(curl -s https://codecov.io/bash) diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go index c2cebd3843d453a2d46d031e711e0efebd240fda..61d1c934522008e4d9bc46bbd57eb6fed6bf00f9 100644 --- a/cluster/cluster_impl/available_cluster_invoker_test.go +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -42,7 +42,7 @@ var ( availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") ) -func registerAvailable(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) availableCluster := NewAvailableCluster() @@ -60,7 +60,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerAvailable(t, invoker) + clusterInvoker := registerAvailable(invoker) mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} invoker.EXPECT().IsAvailable().Return(true) @@ -76,7 +76,7 @@ func TestAvailableClusterInvokerNoAvail(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerAvailable(t, invoker) + clusterInvoker := registerAvailable(invoker) invoker.EXPECT().IsAvailable().Return(false) diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index 12799994125c4bf5d968dfc811cda374effbf85c..cabd6c5f17cd3a3310054c0ff7b9a9877d581345 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -87,7 +87,6 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { } func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { - var selectedInvoker protocol.Invoker url := invokers[0].GetUrl() sticky := url.GetParamBool(constant.STICKY_KEY, false) diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go index 1be21067a6a9045cb6ae6f84655d516fea1f844b..ee7d48f3497772db3143b1ae62a30f66f99faa58 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -107,8 +107,8 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i), common.WithParams(urlParam)) - invokers = append(invokers, NewMockInvoker(url, successCount)) + newUrl, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i), common.WithParams(urlParam)) + invokers = append(invokers, NewMockInvoker(newUrl, successCount)) } staticDir := directory.NewStaticDirectory(invokers) diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 75d9ef26567df0fbd83f5d9f94c8548d1e8e633d..0f941fdccf4c1ed0c2423e28ad0be0f01974beb4 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -92,7 +92,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { factory := extension.GetRouterFactory(url.Protocol) r, err := factory.NewRouter(url) if err != nil { - logger.Errorf("Create router fail. router key: %s, error: %v", routerKey, url.Service(), err) + logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) return } routers = append(routers, r) diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go index d5993959f1d37f343a612e2bee305461d49535d0..6dc55b39407c9e88d18a65b5ec02fa866571624b 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base_directory_test.go @@ -19,7 +19,6 @@ package directory import ( "encoding/base64" - "fmt" "testing" ) @@ -35,7 +34,7 @@ import ( ) func TestNewBaseDirectory(t *testing.T) { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")) + url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") directory := NewBaseDirectory(&url) assert.NotNil(t, directory) @@ -46,7 +45,7 @@ func TestNewBaseDirectory(t *testing.T) { } func TestBuildRouterChain(t *testing.T) { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")) + url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") directory := NewBaseDirectory(&url) assert.NotNil(t, directory) diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index 9f600fedc40cf29a40abca6c11652935f20473b4..87f51356495dbd0a956c42bf4f34022b4d21ad4d 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -61,7 +61,7 @@ func (dir *staticDirectory) IsAvailable() bool { // List List invokers func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker { l := len(dir.invokers) - invokers := make([]protocol.Invoker, l, l) + invokers := make([]protocol.Invoker, l) copy(invokers, dir.invokers) routerChain := dir.RouterChain() diff --git a/common/config/environment.go b/common/config/environment.go index 071af31152ba4ce3c579f70aa23df59d718ce506..446c46aa1ef71a68aa024bf83dd9088cf03677f2 100644 --- a/common/config/environment.go +++ b/common/config/environment.go @@ -46,7 +46,7 @@ var ( once sync.Once ) -// GetEnvInstance ... +// GetEnvInstance gets env instance by singleton func GetEnvInstance() *Environment { once.Do(func() { instance = &Environment{configCenterFirst: true} @@ -54,7 +54,7 @@ func GetEnvInstance() *Environment { return instance } -// NewEnvInstance ... +// NewEnvInstance creates Environment instance func NewEnvInstance() { instance = &Environment{configCenterFirst: true} } @@ -67,21 +67,22 @@ func NewEnvInstance() { // return env.configCenterFirst //} -// UpdateExternalConfigMap ... +// UpdateExternalConfigMap updates env externalConfigMap field func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) { for k, v := range externalMap { env.externalConfigMap.Store(k, v) } } -// UpdateAppExternalConfigMap ... +// UpdateAppExternalConfigMap updates env appExternalConfigMap field func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string) { for k, v := range externalMap { env.appExternalConfigMap.Store(k, v) } } -// Configuration ... +// Configuration puts externalConfigMap and appExternalConfigMap into list +// List represents a doubly linked list. func (env *Environment) Configuration() *list.List { cfgList := list.New() // The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration @@ -90,17 +91,17 @@ func (env *Environment) Configuration() *list.List { return cfgList } -// SetDynamicConfiguration ... +// SetDynamicConfiguration sets value for dynamicConfiguration func (env *Environment) SetDynamicConfiguration(dc config_center.DynamicConfiguration) { env.dynamicConfiguration = dc } -// GetDynamicConfiguration ... +// GetDynamicConfiguration gets dynamicConfiguration func (env *Environment) GetDynamicConfiguration() config_center.DynamicConfiguration { return env.dynamicConfiguration } -// InmemoryConfiguration ... +// InmemoryConfiguration stores config in memory type InmemoryConfiguration struct { store *sync.Map } @@ -109,7 +110,7 @@ func newInmemoryConfiguration(p *sync.Map) *InmemoryConfiguration { return &InmemoryConfiguration{store: p} } -// GetProperty ... +// GetProperty gets value from InmemoryConfiguration instance by @key func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) { if conf.store == nil { return false, "" @@ -123,7 +124,7 @@ func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) { return false, "" } -// GetSubProperty ... +// GetSubProperty gets sub property from InmemoryConfiguration instance by @subkey func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} { if conf.store == nil { return nil diff --git a/common/extension/auth.go b/common/extension/auth.go index d7900045d3f7db9e2587e4e92e377325c74971b3..7caae00e84fd80666ff79b599e12f8516e23209c 100644 --- a/common/extension/auth.go +++ b/common/extension/auth.go @@ -26,12 +26,12 @@ var ( accesskeyStorages = make(map[string]func() filter.AccessKeyStorage) ) -// SetAuthenticator put the fcn into map with name +// SetAuthenticator puts the @fcn into map with name func SetAuthenticator(name string, fcn func() filter.Authenticator) { authenticators[name] = fcn } -// GetAuthenticator find the Authenticator with name +// GetAuthenticator finds the Authenticator with @name // if not found, it will panic func GetAuthenticator(name string) filter.Authenticator { if authenticators[name] == nil { @@ -40,12 +40,12 @@ func GetAuthenticator(name string) filter.Authenticator { return authenticators[name]() } -// SetAccesskeyStorages will set the fcn into map with this name +// SetAccesskeyStorages will set the @fcn into map with this name func SetAccesskeyStorages(name string, fcn func() filter.AccessKeyStorage) { accesskeyStorages[name] = fcn } -// GetAccesskeyStorages find the storage with the name. +// GetAccesskeyStorages finds the storage with the @name. // If not found, it will panic. func GetAccesskeyStorages(name string) filter.AccessKeyStorage { if accesskeyStorages[name] == nil { diff --git a/common/extension/cluster.go b/common/extension/cluster.go index b2d81f6b1e56bb487b1d408b878308f6dfe042e4..8be27a1ca3aaf93dd54201c4ff7081478c746f0f 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -25,12 +25,13 @@ var ( clusters = make(map[string]func() cluster.Cluster) ) -// SetCluster ... +// SetCluster sets the cluster fault-tolerant mode with @name +// For example: available/failfast/broadcast/failfast/failsafe/... func SetCluster(name string, fcn func() cluster.Cluster) { clusters[name] = fcn } -// GetCluster ... +// GetCluster finds the cluster fault-tolerant mode with @name func GetCluster(name string) cluster.Cluster { if clusters[name] == nil { panic("cluster for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_center.go b/common/extension/config_center.go index 03d27db46c94b0ea0e212646077d97f948a8e328..3cbced8d3bbcdb3dc7f9af800fa36681d6dc063d 100644 --- a/common/extension/config_center.go +++ b/common/extension/config_center.go @@ -26,12 +26,12 @@ var ( configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error)) ) -// SetConfigCenter ... +// SetConfigCenter sets the DynamicConfiguration with @name func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) { configCenters[name] = v } -// GetConfigCenter ... +// GetConfigCenter finds the DynamicConfiguration with @name func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) { if configCenters[name] == nil { panic("config center for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_center_factory.go b/common/extension/config_center_factory.go index 85913fdce1ed3472c2bd9eb4aadbb0f631481dbd..dff89752296c6d2441d043ec628aa13ad219e698 100644 --- a/common/extension/config_center_factory.go +++ b/common/extension/config_center_factory.go @@ -25,12 +25,12 @@ var ( configCenterFactories = make(map[string]func() config_center.DynamicConfigurationFactory) ) -// SetConfigCenterFactory ... +// SetConfigCenterFactory sets the DynamicConfigurationFactory with @name func SetConfigCenterFactory(name string, v func() config_center.DynamicConfigurationFactory) { configCenterFactories[name] = v } -// GetConfigCenterFactory ... +// GetConfigCenterFactory finds the DynamicConfigurationFactory with @name func GetConfigCenterFactory(name string) config_center.DynamicConfigurationFactory { if configCenterFactories[name] == nil { panic("config center for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_reader.go b/common/extension/config_reader.go index aced5b0281ff9313461425e5ec6d70d562c6c947..5e13d8629fd145dac680619a427c68b29226b051 100644 --- a/common/extension/config_reader.go +++ b/common/extension/config_reader.go @@ -26,12 +26,12 @@ var ( defaults = make(map[string]string) ) -// SetConfigReaders set a creator of config reader. +// SetConfigReaders sets a creator of config reader with @name func SetConfigReaders(name string, v func() interfaces.ConfigReader) { configReaders[name] = v } -// GetConfigReaders get a config reader by name. +// GetConfigReaders gets a config reader with @name func GetConfigReaders(name string) interfaces.ConfigReader { if configReaders[name] == nil { panic("config reader for " + name + " is not existing, make sure you have imported the package.") @@ -39,12 +39,12 @@ func GetConfigReaders(name string) interfaces.ConfigReader { return configReaders[name]() } -// SetDefaultConfigReader set {name} to default config reader for {module} +// SetDefaultConfigReader sets @name for @module in default config reader func SetDefaultConfigReader(module, name string) { defaults[module] = name } -// GetDefaultConfigReader +// GetDefaultConfigReader gets default config reader func GetDefaultConfigReader() map[string]string { return defaults } diff --git a/common/extension/configurator.go b/common/extension/configurator.go index de98f8a260ea1f3a2e2a1f32c82dc869585e2789..dc2bea73afb79aaab36e2ce7cc9675169a446eb7 100644 --- a/common/extension/configurator.go +++ b/common/extension/configurator.go @@ -23,7 +23,7 @@ import ( ) const ( - // DefaultKey ... + // DefaultKey for default Configurator DefaultKey = "default" ) @@ -33,12 +33,12 @@ var ( configurator = make(map[string]getConfiguratorFunc) ) -// SetConfigurator ... +// SetConfigurator sets the getConfiguratorFunc with @name func SetConfigurator(name string, v getConfiguratorFunc) { configurator[name] = v } -// GetConfigurator ... +// GetConfigurator finds the Configurator with @name func GetConfigurator(name string, url *common.URL) config_center.Configurator { if configurator[name] == nil { panic("configurator for " + name + " is not existing, make sure you have import the package.") @@ -47,12 +47,12 @@ func GetConfigurator(name string, url *common.URL) config_center.Configurator { } -// SetDefaultConfigurator ... +// SetDefaultConfigurator sets the default Configurator func SetDefaultConfigurator(v getConfiguratorFunc) { configurator[DefaultKey] = v } -// GetDefaultConfigurator ... +// GetDefaultConfigurator gets default configurator func GetDefaultConfigurator(url *common.URL) config_center.Configurator { if configurator[DefaultKey] == nil { panic("configurator for default is not existing, make sure you have import the package.") @@ -61,7 +61,7 @@ func GetDefaultConfigurator(url *common.URL) config_center.Configurator { } -// GetDefaultConfiguratorFunc ... +// GetDefaultConfiguratorFunc gets default configurator function func GetDefaultConfiguratorFunc() getConfiguratorFunc { if configurator[DefaultKey] == nil { panic("configurator for default is not existing, make sure you have import the package.") diff --git a/common/extension/filter.go b/common/extension/filter.go index deea2d908bc2741e0f15ecc36e9d4fc5975e531e..96059c4363060c41f14ececb466ca62bdaefb1a9 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -26,12 +26,13 @@ var ( rejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler) ) -// SetFilter ... +// SetFilter sets the filter extension with @name +// For example: hystrix/metrics/token/tracing/limit/... func SetFilter(name string, v func() filter.Filter) { filters[name] = v } -// GetFilter ... +// GetFilter finds the filter extension with @name func GetFilter(name string) filter.Filter { if filters[name] == nil { panic("filter for " + name + " is not existing, make sure you have imported the package.") @@ -39,12 +40,12 @@ func GetFilter(name string) filter.Filter { return filters[name]() } -// SetRejectedExecutionHandler ... +// SetRejectedExecutionHandler sets the RejectedExecutionHandler with @name func SetRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { rejectedExecutionHandler[name] = creator } -// GetRejectedExecutionHandler ... +// GetRejectedExecutionHandler finds the RejectedExecutionHandler with @name func GetRejectedExecutionHandler(name string) filter.RejectedExecutionHandler { creator, ok := rejectedExecutionHandler[name] if !ok { diff --git a/common/extension/graceful_shutdown.go b/common/extension/graceful_shutdown.go index 3abd75c0aa328f3553c3d83340ae440b8dfe3356..cb55419aabbce26b41e5b10f49268f6b3ace516d 100644 --- a/common/extension/graceful_shutdown.go +++ b/common/extension/graceful_shutdown.go @@ -49,7 +49,7 @@ func AddCustomShutdownCallback(callback func()) { customShutdownCallbacks.PushBack(callback) } -// GetAllCustomShutdownCallbacks ... +// GetAllCustomShutdownCallbacks gets all custom shutdown callbacks func GetAllCustomShutdownCallbacks() *list.List { return customShutdownCallbacks } diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go index 365c5d0910812efb00eb94408bb226115b037c02..548d4dc761b31773a2a39ccb0ae3de1d7ab39eb4 100644 --- a/common/extension/health_checker.go +++ b/common/extension/health_checker.go @@ -26,12 +26,12 @@ var ( healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker) ) -// SethealthChecker set the HealthChecker with name +// SethealthChecker sets the HealthChecker with @name func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) { healthCheckers[name] = fcn } -// GetHealthChecker get the HealthChecker with name +// GetHealthChecker gets the HealthChecker with @name func GetHealthChecker(name string, url *common.URL) router.HealthChecker { if healthCheckers[name] == nil { panic("healthCheckers for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/loadbalance.go b/common/extension/loadbalance.go index 0d557a4640ed892a18ad59a3247763ab5807a593..aa19141014a6c42df0c17dad05301997f67fbd79 100644 --- a/common/extension/loadbalance.go +++ b/common/extension/loadbalance.go @@ -25,12 +25,13 @@ var ( loadbalances = make(map[string]func() cluster.LoadBalance) ) -// SetLoadbalance ... +// SetLoadbalance sets the loadbalance extension with @name +// For example: random/round_robin/consistent_hash/least_active/... func SetLoadbalance(name string, fcn func() cluster.LoadBalance) { loadbalances[name] = fcn } -// GetLoadbalance ... +// GetLoadbalance finds the loadbalance extension with @name func GetLoadbalance(name string) cluster.LoadBalance { if loadbalances[name] == nil { panic("loadbalance for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/metadata_report_factory.go b/common/extension/metadata_report_factory.go index 0ae0793bb4459767cb42fb1860fc484388aae1a3..c55f8617fadd9d09d68547b05341d127716ce73c 100644 --- a/common/extension/metadata_report_factory.go +++ b/common/extension/metadata_report_factory.go @@ -25,12 +25,12 @@ var ( metaDataReportFactories = make(map[string]func() metadata.MetadataReportFactory, 8) ) -// SetMetadataReportFactory ... +// SetMetadataReportFactory sets the MetadataReportFactory with @name func SetMetadataReportFactory(name string, v func() metadata.MetadataReportFactory) { metaDataReportFactories[name] = v } -// GetMetadataReportFactory ... +// GetMetadataReportFactory finds the MetadataReportFactory with @name func GetMetadataReportFactory(name string) metadata.MetadataReportFactory { if metaDataReportFactories[name] == nil { panic("metadata report for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/metrics.go b/common/extension/metrics.go index 42fca7a2db36614fcef31dd5ba7324a156164d4f..60cf6bac2384c7367094adad83e01f7dcf64a33d 100644 --- a/common/extension/metrics.go +++ b/common/extension/metrics.go @@ -27,12 +27,12 @@ var ( metricReporterMap = make(map[string]func() metrics.Reporter, 4) ) -// SetMetricReporter set a reporter with the name +// SetMetricReporter sets a reporter with the @name func SetMetricReporter(name string, reporterFunc func() metrics.Reporter) { metricReporterMap[name] = reporterFunc } -// GetMetricReporter find the reporter with name. +// GetMetricReporter finds the reporter with @name. // if not found, it will panic. // we should know that this method usually is called when system starts, so we should panic func GetMetricReporter(name string) metrics.Reporter { diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 009687a17ace8cea567248af655e04604d09d9b8..c89dd08fae5d12b384d6ca4e797343fe79897bbd 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -25,12 +25,12 @@ var ( protocols = make(map[string]func() protocol.Protocol) ) -// SetProtocol ... +// SetProtocol sets the protocol extension with @name func SetProtocol(name string, v func() protocol.Protocol) { protocols[name] = v } -// GetProtocol ... +// GetProtocol finds the protocol extension with @name func GetProtocol(name string) protocol.Protocol { if protocols[name] == nil { panic("protocol for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/proxy_factory.go b/common/extension/proxy_factory.go index 19826bb0560ea0d3fa471c04873b20a6878f57d8..1e326d884b5dd37925c38ffdf0a87e69bf6a865c 100644 --- a/common/extension/proxy_factory.go +++ b/common/extension/proxy_factory.go @@ -25,12 +25,12 @@ var ( proxyFactories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) ) -// SetProxyFactory ... +// SetProxyFactory sets the ProxyFactory extension with @name func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { proxyFactories[name] = f } -// GetProxyFactory ... +// GetProxyFactory finds the ProxyFactory extension with @name func GetProxyFactory(name string) proxy.ProxyFactory { if name == "" { name = "default" diff --git a/common/extension/registry.go b/common/extension/registry.go index 6ba746dc47382927d12ce39b7936212c5d75153d..291a7a7fc2cae07c9228043acae7cc0ed5459a1f 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -26,12 +26,12 @@ var ( registrys = make(map[string]func(config *common.URL) (registry.Registry, error)) ) -// SetRegistry ... +// SetRegistry sets the registry extension with @name func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) { registrys[name] = v } -// GetRegistry ... +// GetRegistry finds the registry extension with @name func GetRegistry(name string, config *common.URL) (registry.Registry, error) { if registrys[name] == nil { panic("registry for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go index 6b92189c4e98b391a90e6e71a68d51a252eede2a..330fc46400daf81047e5c24c1634249e355d74b7 100644 --- a/common/extension/registry_directory.go +++ b/common/extension/registry_directory.go @@ -27,12 +27,12 @@ type registryDirectory func(url *common.URL, registry registry.Registry) (cluste var defaultRegistry registryDirectory -// SetDefaultRegistryDirectory ... +// SetDefaultRegistryDirectory sets the default registryDirectory func SetDefaultRegistryDirectory(v registryDirectory) { defaultRegistry = v } -// GetDefaultRegistryDirectory ... +// GetDefaultRegistryDirectory finds the registryDirectory with url and registry func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) { if defaultRegistry == nil { panic("registry directory is not existing, make sure you have import the package.") diff --git a/common/extension/rest_client.go b/common/extension/rest_client.go index 514d1fdfd2efb5c291fdb47df4dd69da26fa90b1..9caf8c67df76bb160d5e2c3100f83e2d198b6381 100644 --- a/common/extension/rest_client.go +++ b/common/extension/rest_client.go @@ -25,10 +25,12 @@ var ( restClients = make(map[string]func(restOptions *client.RestOptions) client.RestClient, 8) ) +// SetRestClient sets the RestClient with @name func SetRestClient(name string, fun func(restOptions *client.RestOptions) client.RestClient) { restClients[name] = fun } +// GetNewRestClient finds the RestClient with @name func GetNewRestClient(name string, restOptions *client.RestOptions) client.RestClient { if restClients[name] == nil { panic("restClient for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/rest_server.go b/common/extension/rest_server.go index fa8d435a5c976a4c95b036810fa2916a327a73b9..37a231a57c861ae49aab244eb9fa8b611ae63f6d 100644 --- a/common/extension/rest_server.go +++ b/common/extension/rest_server.go @@ -25,10 +25,12 @@ var ( restServers = make(map[string]func() server.RestServer, 8) ) +// SetRestServer sets the RestServer with @name func SetRestServer(name string, fun func() server.RestServer) { restServers[name] = fun } +// GetNewRestServer finds the RestServer with @name func GetNewRestServer(name string) server.RestServer { if restServers[name] == nil { panic("restServer for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index 1339228618def41ccebc8d54cdebb5a623e605fa..21a49d2681b500bf4e4942d1b92e5b23bc7cf6b7 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -31,12 +31,12 @@ var ( fileRouterFactories = make(map[string]router.FileRouterFactory) ) -// SetRouterFactory Set create router factory function by name +// SetRouterFactory sets create router factory function with @name func SetRouterFactory(name string, fun func() router.RouterFactory) { routers[name] = fun } -// GetRouterFactory Get create router factory function by name +// GetRouterFactory gets create router factory function by @name func GetRouterFactory(name string) router.RouterFactory { if routers[name] == nil { panic("router_factory for " + name + " is not existing, make sure you have import the package.") @@ -44,12 +44,12 @@ func GetRouterFactory(name string) router.RouterFactory { return routers[name]() } -// GetRouterFactories Get all create router factory function +// GetRouterFactories gets all create router factory function func GetRouterFactories() map[string]func() router.RouterFactory { return routers } -// GetFileRouterFactories Get all create file router factory instance +// GetFileRouterFactories gets all create file router factory instance func GetFileRouterFactories() map[string]router.FileRouterFactory { l := len(routers) if l == 0 { diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index 25b80cf3353505c058bea40cc4c80712ad923d2d..d70b032306f567fca5b9b57213f29bde3e7a937b 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -29,7 +29,7 @@ var ( discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4) ) -// SetServiceDiscovery will store the creator and name +// SetServiceDiscovery will store the @creator and @name func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) { discoveryCreatorMap[name] = creator } diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index c72c2b030fc0f391362189bfe18a65582543693a..d25821deee626cb75c94af2257f877c9983023de 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -26,12 +26,12 @@ var ( tpsLimiter = make(map[string]func() filter.TpsLimiter) ) -// SetTpsLimiter ... +// SetTpsLimiter sets the TpsLimiter with @name func SetTpsLimiter(name string, creator func() filter.TpsLimiter) { tpsLimiter[name] = creator } -// GetTpsLimiter ... +// GetTpsLimiter finds the TpsLimiter with @name func GetTpsLimiter(name string) filter.TpsLimiter { creator, ok := tpsLimiter[name] if !ok { @@ -41,12 +41,12 @@ func GetTpsLimiter(name string) filter.TpsLimiter { return creator() } -// SetTpsLimitStrategy ... +// SetTpsLimitStrategy sets the TpsLimitStrategyCreator with @name func SetTpsLimitStrategy(name string, creator filter.TpsLimitStrategyCreator) { tpsLimitStrategy[name] = creator } -// GetTpsLimitStrategyCreator ... +// GetTpsLimitStrategyCreator finds the TpsLimitStrategyCreator with @name func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator { creator, ok := tpsLimitStrategy[name] if !ok { diff --git a/common/logger/logger.go b/common/logger/logger.go index 016afe69808f2007541c617f406db64beb511f1c..9bc6a461003d086e8951ebac3d6997774ac69b90 100644 --- a/common/logger/logger.go +++ b/common/logger/logger.go @@ -40,13 +40,13 @@ var ( logger Logger ) -// DubboLogger ... +// nolint type DubboLogger struct { Logger dynamicLevel zap.AtomicLevel } -// Logger ... +// Logger is the interface for Logger types type Logger interface { Info(args ...interface{}) Warn(args ...interface{}) @@ -67,7 +67,7 @@ func init() { } } -// InitLog ... +// InitLog use for init logger by call InitLogger func InitLog(logConfFile string) error { if logConfFile == "" { InitLogger(nil) @@ -96,7 +96,7 @@ func InitLog(logConfFile string) error { return nil } -// InitLogger ... +// InitLogger use for init logger by @conf func InitLogger(conf *zap.Config) { var zapLoggerConfig zap.Config if conf == nil { @@ -125,18 +125,18 @@ func InitLogger(conf *zap.Config) { getty.SetLogger(logger) } -// SetLogger ... +// SetLogger sets logger for dubbo and getty func SetLogger(log Logger) { logger = log getty.SetLogger(logger) } -// GetLogger ... +// GetLogger gets the logger func GetLogger() Logger { return logger } -// SetLoggerLevel ... +// SetLoggerLevel use for set logger level func SetLoggerLevel(level string) bool { if l, ok := logger.(OpsLogger); ok { l.SetLoggerLevel(level) @@ -145,13 +145,13 @@ func SetLoggerLevel(level string) bool { return false } -// OpsLogger ... +// OpsLogger use for the SetLoggerLevel type OpsLogger interface { Logger SetLoggerLevel(level string) } -// SetLoggerLevel ... +// SetLoggerLevel use for set logger level func (dl *DubboLogger) SetLoggerLevel(level string) { l := new(zapcore.Level) l.Set(level) diff --git a/common/logger/logging.go b/common/logger/logging.go index 36d48ee61e8a4a986abfbaa79f3d361cd81494f4..7a31ece203815287384ade282b2a4f12e11abc2a 100644 --- a/common/logger/logging.go +++ b/common/logger/logging.go @@ -17,42 +17,42 @@ package logger -// Info ... +// Info is info level func Info(args ...interface{}) { logger.Info(args...) } -// Warn ... +// Warn is warning level func Warn(args ...interface{}) { logger.Warn(args...) } -// Error ... +// Error is error level func Error(args ...interface{}) { logger.Error(args...) } -// Debug ... +// Debug is debug level func Debug(args ...interface{}) { logger.Debug(args...) } -// Infof ... +// Infof is format info level func Infof(fmt string, args ...interface{}) { logger.Infof(fmt, args...) } -// Warnf ... +// Warnf is format warning level func Warnf(fmt string, args ...interface{}) { logger.Warnf(fmt, args...) } -// Errorf ... +// Errorf is format error level func Errorf(fmt string, args ...interface{}) { logger.Errorf(fmt, args...) } -// Debugf ... +// Debugf is format debug level func Debugf(fmt string, args ...interface{}) { logger.Debugf(fmt, args...) } diff --git a/common/node.go b/common/node.go index 979eee31ef3a63eb21af6c9045aee7f6d784f2ba..4febd78536126c67bdc65fc09d4be47fb869ef5e 100644 --- a/common/node.go +++ b/common/node.go @@ -17,7 +17,7 @@ package common -// Node ... +// Node use for process dubbo node type Node interface { GetUrl() URL IsAvailable() bool diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index a77d26d1103e605f5d0b38ac14c8bb3e20fc27b8..abcf87cd9d297769bf8aff6fa07d6a4659091eb6 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -31,7 +31,7 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) -// Proxy struct +// nolint type Proxy struct { rpc common.RPCService invoke protocol.Invoker @@ -205,12 +205,12 @@ func (p *Proxy) Implement(v common.RPCService) { } -// Get get rpc service instance. +// Get gets rpc service instance. func (p *Proxy) Get() common.RPCService { return p.rpc } -// GetCallback get callback. +// GetCallback gets callback. func (p *Proxy) GetCallback() interface{} { return p.callBack } diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 34fa3fd07eacae95351f302158d7da68165ea5cf..117428cb253e1ad4a4ceee59aa620d7097b41a75 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -29,5 +29,5 @@ type ProxyFactory interface { GetInvoker(url common.URL) protocol.Invoker } -// Option ... +// Option will define a function of handling ProxyFactory type Option func(ProxyFactory) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 013b39911054ef5bd89d38cd50116a204b117872..1bb1e29c5ced78ad9e2e2483b73379c66328050a 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -40,7 +40,7 @@ func init() { extension.SetProxyFactory("default", NewDefaultProxyFactory) } -// DefaultProxyFactory ... +// DefaultProxyFactory is the default proxy factory type DefaultProxyFactory struct { //delegate ProxyFactory } @@ -53,17 +53,17 @@ type DefaultProxyFactory struct { // } //} -// NewDefaultProxyFactory ... +// NewDefaultProxyFactory returns a proxy factory instance func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } -// GetProxy ... +// GetProxy gets a proxy func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { return factory.GetAsyncProxy(invoker, nil, url) } -// GetAsyncProxy ... +// GetAsyncProxy gets a async proxy func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} @@ -71,19 +71,19 @@ func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, call return proxy.NewProxy(invoker, callBack, attachments) } -// GetInvoker ... +// GetInvoker gets a invoker func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), } } -// ProxyInvoker ... +// ProxyInvoker is a invoker struct type ProxyInvoker struct { protocol.BaseInvoker } -// Invoke ... +// Invoke is used to call service method by invocation func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { result := &protocol.RPCResult{} result.SetAttachments(invocation.Attachments()) diff --git a/common/rpc_service.go b/common/rpc_service.go index d7d900718e911fe12915225c04338d52e5f084f5..211e4eae9c395bbeb0e2a46f2d3f8c7d460c3f40 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -35,23 +35,23 @@ import ( ) // RPCService -//rpc service interface +// rpc service interface type RPCService interface { // Reference: // rpc service id or reference id Reference() string } -//AsyncCallbackService callback interface for async +// AsyncCallbackService callback interface for async type AsyncCallbackService interface { // Callback: callback CallBack(response CallbackResponse) } -//CallbackResponse for different protocol +// CallbackResponse for different protocol type CallbackResponse interface{} -//AsyncCallback async callback method +// AsyncCallback async callback method type AsyncCallback func(response CallbackResponse) // for lowercase func @@ -87,27 +87,27 @@ type MethodType struct { replyType reflect.Type // return value, otherwise it is nil } -// Method get @m.method. +// Method gets @m.method. func (m *MethodType) Method() reflect.Method { return m.method } -// CtxType get @m.ctxType. +// CtxType gets @m.ctxType. func (m *MethodType) CtxType() reflect.Type { return m.ctxType } -// ArgsType get @m.argsType. +// ArgsType gets @m.argsType. func (m *MethodType) ArgsType() []reflect.Type { return m.argsType } -// ReplyType get @m.replyType. +// ReplyType gets @m.replyType. func (m *MethodType) ReplyType() reflect.Type { return m.replyType } -// SuiteContext tranfer @ctx to reflect.Value type or get it from @m.ctxType. +// SuiteContext tranfers @ctx to reflect.Value type or get it from @m.ctxType. func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value { if contextv := reflect.ValueOf(ctx); contextv.IsValid() { return contextv @@ -127,17 +127,17 @@ type Service struct { methods map[string]*MethodType } -// Method get @s.methods. +// Method gets @s.methods. func (s *Service) Method() map[string]*MethodType { return s.methods } -// RcvrType get @s.rcvrType. +// RcvrType gets @s.rcvrType. func (s *Service) RcvrType() reflect.Type { return s.rcvrType } -// Rcvr get @s.rcvr. +// Rcvr gets @s.rcvr. func (s *Service) Rcvr() reflect.Value { return s.rcvr } @@ -152,7 +152,7 @@ type serviceMap struct { interfaceMap map[string][]*Service // interface -> service } -// GetService get a service defination by protocol and name +// GetService gets a service defination by protocol and name func (sm *serviceMap) GetService(protocol, name string) *Service { sm.mutex.RLock() defer sm.mutex.RUnlock() @@ -165,7 +165,7 @@ func (sm *serviceMap) GetService(protocol, name string) *Service { return nil } -// GetInterface get an interface defination by interface name +// GetInterface gets an interface defination by interface name func (sm *serviceMap) GetInterface(interfaceName string) []*Service { sm.mutex.RLock() defer sm.mutex.RUnlock() @@ -175,7 +175,7 @@ func (sm *serviceMap) GetInterface(interfaceName string) []*Service { return nil } -// Register register a service by @interfaceName and @protocol +// Register registers a service by @interfaceName and @protocol func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService) (string, error) { if sm.serviceMap[protocol] == nil { sm.serviceMap[protocol] = make(map[string]*Service) @@ -223,7 +223,7 @@ func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService) return strings.TrimSuffix(methods, ","), nil } -// UnRegister cancel a service by @interfaceName, @protocol and @serviceId +// UnRegister cancels a service by @interfaceName, @protocol and @serviceId func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) error { if protocol == "" || serviceId == "" { return perrors.New("protocol or serviceName is nil") diff --git a/common/url.go b/common/url.go index a70ac7dc9dd6f415aa458689864604a793c8e256..1cfa47ae28451a6ab6c00029247ba7179b43371a 100644 --- a/common/url.go +++ b/common/url.go @@ -18,7 +18,6 @@ package common import ( - "bytes" "encoding/base64" "fmt" "math" @@ -26,7 +25,6 @@ import ( "net/url" "strconv" "strings" - "sync" ) import ( @@ -40,136 +38,140 @@ import ( "github.com/apache/dubbo-go/common/constant" ) -///////////////////////////////// +// /////////////////////////////// // dubbo role type -///////////////////////////////// +// /////////////////////////////// // role constant const ( - // CONSUMER ... + // CONSUMER is consumer role CONSUMER = iota - // CONFIGURATOR ... + // CONFIGURATOR is configurator role CONFIGURATOR - // ROUTER ... + // ROUTER is router role ROUTER - // PROVIDER ... + // PROVIDER is provider role PROVIDER ) var ( - // DubboNodes ... + // DubboNodes Dubbo service node DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} // DubboRole Dubbo service role DubboRole = [...]string{"consumer", "", "routers", "provider"} ) -// RoleType ... +// nolint type RoleType int func (t RoleType) String() string { return DubboNodes[t] } -// Role ... +// Role returns role by @RoleType func (t RoleType) Role() string { return DubboRole[t] } type baseUrl struct { - Protocol string - Location string // ip+port - Ip string - Port string - //url.Values is not safe map, add to avoid concurrent map read and map write error - paramsLock sync.RWMutex + Protocol string + Location string // ip+port + Ip string + Port string params url.Values PrimitiveURL string } -// URL ... +// URL is not thread-safe. +// we fail to define this struct to be immutable object. +// but, those method which will update the URL, including SetParam, SetParams +// are only allowed to be invoked in creating URL instance +// Please keep in mind that this struct is immutable after it has been created and initialized. type URL struct { baseUrl Path string // like /com.ikurento.dubbo.UserProvider3 Username string Password string Methods []string - //special for registry + // special for registry SubURL *URL } +// Option accepts url +// Option will define a function of handling URL type option func(*URL) -// WithUsername ... +// WithUsername sets username for url func WithUsername(username string) option { return func(url *URL) { url.Username = username } } -// WithPassword ... +// WithPassword sets password for url func WithPassword(pwd string) option { return func(url *URL) { url.Password = pwd } } -// WithMethods ... +// WithMethods sets methods for url func WithMethods(methods []string) option { return func(url *URL) { url.Methods = methods } } -// WithParams ... +// WithParams sets params for url func WithParams(params url.Values) option { return func(url *URL) { url.params = params } } -// WithParamsValue ... +// WithParamsValue sets params field for url func WithParamsValue(key, val string) option { return func(url *URL) { url.SetParam(key, val) } } -// WithProtocol ... +// WithProtocol sets protocol for url func WithProtocol(proto string) option { return func(url *URL) { url.Protocol = proto } } -// WithIp ... +// WithIp sets ip for url func WithIp(ip string) option { return func(url *URL) { url.Ip = ip } } -// WithPort ... +// WithPort sets port for url func WithPort(port string) option { return func(url *URL) { url.Port = port } } -// WithPath ... +// WithPath sets path for url func WithPath(path string) option { return func(url *URL) { url.Path = "/" + strings.TrimPrefix(path, "/") } } -// WithLocation ... +// WithLocation sets location for url func WithLocation(location string) option { return func(url *URL) { url.Location = location } } -// WithToken ... +// WithToken sets token for url func WithToken(token string) option { return func(url *URL) { if len(token) > 0 { @@ -182,7 +184,7 @@ func WithToken(token string) option { } } -// NewURLWithOptions ... +// NewURLWithOptions will create a new url with options func NewURLWithOptions(opts ...option) *URL { url := &URL{} for _, opt := range opts { @@ -212,7 +214,7 @@ func NewURL(urlString string, opts ...option) (URL, error) { return s, perrors.Errorf("url.QueryUnescape(%s), error{%v}", urlString, err) } - //rawUrlString = "//" + rawUrlString + // rawUrlString = "//" + rawUrlString if strings.Index(rawUrlString, "//") < 0 { t := URL{baseUrl: baseUrl{}} for _, opt := range opts { @@ -275,7 +277,7 @@ func (c URL) URLEqual(url URL) bool { return false } - //TODO :may need add interface key any value condition + // TODO :may need add interface key any value condition return isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) } @@ -292,23 +294,21 @@ func isMatchCategory(category1 string, category2 string) bool { } func (c URL) String() string { - var buildString string + var buf strings.Builder if len(c.Username) == 0 && len(c.Password) == 0 { - buildString = fmt.Sprintf( + buf.WriteString(fmt.Sprintf( "%s://%s:%s%s?", - c.Protocol, c.Ip, c.Port, c.Path) + c.Protocol, c.Ip, c.Port, c.Path)) } else { - buildString = fmt.Sprintf( + buf.WriteString(fmt.Sprintf( "%s://%s:%s@%s:%s%s?", - c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path) + c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path)) } - c.paramsLock.RLock() - buildString += c.params.Encode() - c.paramsLock.RUnlock() - return buildString + buf.WriteString(c.params.Encode()) + return buf.String() } -// Key ... +// Key gets key func (c URL) Key() string { buildString := fmt.Sprintf( "%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s", @@ -316,13 +316,13 @@ func (c URL) Key() string { return buildString } -// ServiceKey get a unique key of a service. +// ServiceKey gets a unique key of a service. func (c URL) ServiceKey() string { intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if intf == "" { return "" } - buf := &bytes.Buffer{} + var buf strings.Builder group := c.GetParam(constant.GROUP_KEY, "") if group != "" { buf.WriteString(group) @@ -347,7 +347,7 @@ func (c *URL) ColonSeparatedKey() string { if intf == "" { return "" } - buf := &bytes.Buffer{} + var buf strings.Builder buf.WriteString(intf) buf.WriteString(":") version := c.GetParam(constant.VERSION_KEY, "") @@ -362,44 +362,44 @@ func (c *URL) ColonSeparatedKey() string { return buf.String() } -// EncodedServiceKey ... +// EncodedServiceKey encode the service key func (c *URL) EncodedServiceKey() string { serviceKey := c.ServiceKey() return strings.Replace(serviceKey, "/", "*", 1) } -// Service ... +// Service gets service func (c URL) Service() string { service := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if service != "" { return service } else if c.SubURL != nil { service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) - if service != "" { //if url.path is "" then return suburl's path, special for registry url + if service != "" { // if url.path is "" then return suburl's path, special for registry url return service } } return "" } -// AddParam ... +// AddParam will add the key-value pair +// Not thread-safe +// think twice before using it. func (c *URL) AddParam(key string, value string) { - c.paramsLock.Lock() c.params.Add(key, value) - c.paramsLock.Unlock() } -// SetParam ... +// SetParam will put the key-value pair into url +// it's not thread safe. +// think twice before you want to use this method +// usually it should only be invoked when you want to initialized an url func (c *URL) SetParam(key string, value string) { - c.paramsLock.Lock() c.params.Set(key, value) - c.paramsLock.Unlock() } -// RangeParams ... +// RangeParams will iterate the params +// it's not thread-safe func (c *URL) RangeParams(f func(key, value string) bool) { - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() for k, v := range c.params { if !f(k, v[0]) { break @@ -407,10 +407,8 @@ func (c *URL) RangeParams(f func(key, value string) bool) { } } -// GetParam ... +// GetParam gets value by key func (c URL) GetParam(s string, d string) string { - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() r := c.params.Get(s) if len(r) == 0 { r = d @@ -418,21 +416,19 @@ func (c URL) GetParam(s string, d string) string { return r } -// GetParams ... +// GetParams gets values func (c URL) GetParams() url.Values { return c.params } -// GetParamAndDecoded ... +// GetParamAndDecoded gets values and decode func (c URL) GetParamAndDecoded(key string) (string, error) { - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() ruleDec, err := base64.URLEncoding.DecodeString(c.GetParam(key, "")) value := string(ruleDec) return value, err } -// GetRawParam ... +// GetRawParam gets raw param func (c URL) GetRawParam(key string) string { switch key { case "protocol": @@ -452,25 +448,25 @@ func (c URL) GetRawParam(key string) string { } } -// GetParamBool ... -func (c URL) GetParamBool(s string, d bool) bool { - r, err := strconv.ParseBool(c.GetParam(s, "")) +// GetParamBool judge whether @key exists or not +func (c URL) GetParamBool(key string, d bool) bool { + r, err := strconv.ParseBool(c.GetParam(key, "")) if err != nil { return d } return r } -// GetParamInt ... -func (c URL) GetParamInt(s string, d int64) int64 { - r, err := strconv.Atoi(c.GetParam(s, "")) +// GetParamInt gets int value by @key +func (c URL) GetParamInt(key string, d int64) int64 { + r, err := strconv.Atoi(c.GetParam(key, "")) if r == 0 || err != nil { return d } return int64(r) } -// GetMethodParamInt ... +// GetMethodParamInt gets int method param func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { r, err := strconv.Atoi(c.GetParam("methods."+method+"."+key, "")) if r == 0 || err != nil { @@ -479,7 +475,7 @@ func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { return int64(r) } -// GetMethodParamInt64 ... +// GetMethodParamInt64 gets int64 method param func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 { r := c.GetMethodParamInt(method, key, math.MinInt64) if r == math.MinInt64 { @@ -488,7 +484,7 @@ func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 { return r } -// GetMethodParam ... +// GetMethodParam gets method param func (c URL) GetMethodParam(method string, key string, d string) string { r := c.GetParam("methods."+method+"."+key, "") if r == "" { @@ -497,23 +493,16 @@ func (c URL) GetMethodParam(method string, key string, d string) string { return r } -// GetMethodParamBool ... +// GetMethodParamBool judge whether @method param exists or not func (c URL) GetMethodParamBool(method string, key string, d bool) bool { r := c.GetParamBool("methods."+method+"."+key, d) return r } -// RemoveParams ... -func (c *URL) RemoveParams(set *gxset.HashSet) { - c.paramsLock.Lock() - defer c.paramsLock.Unlock() - for k := range set.Items { - s := k.(string) - delete(c.params, s) - } -} - -// SetParams ... +// SetParams will put all key-value pair into url. +// 1. if there already has same key, the value will be override +// 2. it's not thread safe +// 3. think twice when you want to invoke this method func (c *URL) SetParams(m url.Values) { for k := range m { c.SetParam(k, m.Get(k)) @@ -562,29 +551,35 @@ func (c URL) ToMap() map[string]string { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. -//TODO configuration merge, in the future , the configuration center's config should merge too. - -// MergeUrl ... +// TODO configuration merge, in the future , the configuration center's config should merge too. + +// MergeUrl will merge those two url +// the result is based on serviceUrl, and the key which si only contained in referenceUrl +// will be added into result. +// for example, if serviceUrl contains params (a1->v1, b1->v2) and referenceUrl contains params(a2->v3, b1 -> v4) +// the params of result will be (a1->v1, b1->v2, a2->v3). +// You should notice that the value of b1 is v2, not v4. +// due to URL is not thread-safe, so this method is not thread-safe func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { mergedUrl := serviceUrl.Clone() - //iterator the referenceUrl if serviceUrl not have the key ,merge in + // iterator the referenceUrl if serviceUrl not have the key ,merge in referenceUrl.RangeParams(func(key, value string) bool { if v := mergedUrl.GetParam(key, ""); len(v) == 0 { mergedUrl.SetParam(key, value) } return true }) - //loadBalance,cluster,retries strategy config + // loadBalance,cluster,retries strategy config methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY}) - //remote timestamp + // remote timestamp if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 { mergedUrl.SetParam(constant.REMOTE_TIMESTAMP_KEY, v) mergedUrl.SetParam(constant.TIMESTAMP_KEY, referenceUrl.GetParam(constant.TIMESTAMP_KEY, "")) } - //finally execute methodConfigMergeFcn + // finally execute methodConfigMergeFcn for _, method := range referenceUrl.Methods { for _, fcn := range methodConfigMergeFcn { fcn("methods." + method) @@ -594,7 +589,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { return mergedUrl } -// Clone ... +// Clone will copy the url func (c *URL) Clone() *URL { newUrl := &URL{} copier.Copy(newUrl, c) @@ -606,7 +601,20 @@ func (c *URL) Clone() *URL { return newUrl } -// Copy url based on the reserved parameters' keys. +func (c *URL) CloneExceptParams(excludeParams *gxset.HashSet) *URL { + newUrl := &URL{} + copier.Copy(newUrl, c) + newUrl.params = url.Values{} + c.RangeParams(func(key, value string) bool { + if !excludeParams.Contains(key) { + newUrl.SetParam(key, value) + } + return true + }) + return newUrl +} + +// Copy url based on the reserved parameter's keys. func (c *URL) CloneWithParams(reserveParams []string) *URL { params := url.Values{} for _, reserveParam := range reserveParams { diff --git a/common/yaml/yaml.go b/common/yaml/yaml.go index 93ebb166144510236aff27a67422a6377ccb5c9f..5edda1b3c7751e8171528d121148b6c3c60fe128 100644 --- a/common/yaml/yaml.go +++ b/common/yaml/yaml.go @@ -40,7 +40,7 @@ func LoadYMLConfig(confProFile string) ([]byte, error) { return ioutil.ReadFile(confProFile) } -// unmarshalYMLConfig Load yml config byte from file , then unmarshal to object +// unmarshalYMLConfig Load yml config byte from file, then unmarshal to object func UnmarshalYMLConfig(confProFile string, out interface{}) ([]byte, error) { confFileStream, err := LoadYMLConfig(confProFile) if err != nil { diff --git a/config/base_config_test.go b/config/base_config_test.go index d16b2420922ece60ef2135729cd47d5aa73a3760..bc422d018946017a2c50dccefe54357d786a7589 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -21,9 +21,11 @@ import ( "reflect" "testing" ) + import ( "github.com/stretchr/testify/assert" ) + import ( "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/extension" @@ -481,7 +483,6 @@ func Test_refreshProvider(t *testing.T) { } func Test_startConfigCenter(t *testing.T) { - extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory { return &config_center.MockDynamicConfigurationFactory{} }) @@ -499,21 +500,21 @@ func Test_startConfigCenter(t *testing.T) { } func Test_initializeStruct(t *testing.T) { - consumerConfig := &ConsumerConfig{} + testConsumerConfig := &ConsumerConfig{} tp := reflect.TypeOf(ConsumerConfig{}) v := reflect.New(tp) initializeStruct(tp, v.Elem()) - fmt.Println(reflect.ValueOf(consumerConfig).Elem().Type().String()) + fmt.Println(reflect.ValueOf(testConsumerConfig).Elem().Type().String()) fmt.Println(v.Elem().Type().String()) - reflect.ValueOf(consumerConfig).Elem().Set(v.Elem()) + reflect.ValueOf(testConsumerConfig).Elem().Set(v.Elem()) assert.Condition(t, func() (success bool) { - return consumerConfig.ApplicationConfig != nil + return testConsumerConfig.ApplicationConfig != nil }) assert.Condition(t, func() (success bool) { - return consumerConfig.Registries != nil + return testConsumerConfig.Registries != nil }) assert.Condition(t, func() (success bool) { - return consumerConfig.References != nil + return testConsumerConfig.References != nil }) } diff --git a/config/consumer_config.go b/config/consumer_config.go index debcd79fa281c40e5526f60f5c5cdb66688688f4..1b563054ec15a3abdab45981bc58a452d0fea244 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -129,7 +129,7 @@ func configCenterRefreshConsumer() error { var err error if consumerConfig.ConfigCenterConfig != nil { consumerConfig.SetFatherConfig(consumerConfig) - if err := consumerConfig.startConfigCenter(); err != nil { + if err = consumerConfig.startConfigCenter(); err != nil { return perrors.Errorf("start config center error , error message is {%v}", perrors.WithStack(err)) } consumerConfig.fresh() @@ -144,6 +144,5 @@ func configCenterRefreshConsumer() error { return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout) } } - return nil } diff --git a/config/instance/metedata_report.go b/config/instance/metadata_report.go similarity index 100% rename from config/instance/metedata_report.go rename to config/instance/metadata_report.go diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 41fb6b4769e59784d8d18c3f82b956fd029d4ff7..2b926c0b94d2eeb421a33c455dc3dbe02f167acd 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -103,7 +103,7 @@ func startMetadataReport(metadataType string, metadataReportConfig *MetadataRepo if url, err := metadataReportConfig.ToUrl(); err == nil { instance.GetMetadataReportInstance(url) } else { - return perrors.New("MetadataConfig is invalid!") + return perrors.Wrap(err, "Start MetadataReport failed.") } return nil diff --git a/config/service_config.go b/config/service_config.go index 45e7df6306fc016f014497868eb45ec3be768a11..09308d032e73ce5addb01661cef312948b759d7a 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -154,9 +154,9 @@ func (c *ServiceConfig) Export() error { // registry the service reflect methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService) if err != nil { - err := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", c.InterfaceName, proto.Name, err.Error()) - logger.Errorf(err.Error()) - return err + formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", c.InterfaceName, proto.Name, err.Error()) + logger.Errorf(formatErr.Error()) + return formatErr } port := proto.Port diff --git a/config_center/apollo/listener.go b/config_center/apollo/listener.go index fb257a4828aed077f61568685ee7823e9c215cf9..1cf65ed22ba0a1f765af66191ed19a04f81b0fe6 100644 --- a/config_center/apollo/listener.go +++ b/config_center/apollo/listener.go @@ -29,7 +29,7 @@ type apolloListener struct { listeners map[config_center.ConfigurationListener]struct{} } -// NewApolloListener ... +// NewApolloListener creates a new apolloListener func NewApolloListener() *apolloListener { return &apolloListener{ listeners: make(map[config_center.ConfigurationListener]struct{}, 0), @@ -49,7 +49,7 @@ func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { } } -// AddListener ... +// AddListener adds a listener for apollo func (a *apolloListener) AddListener(l config_center.ConfigurationListener) { if _, ok := a.listeners[l]; !ok { a.listeners[l] = struct{}{} @@ -57,7 +57,7 @@ func (a *apolloListener) AddListener(l config_center.ConfigurationListener) { } } -// RemoveListener ... +// RemoveListener removes listeners of apollo func (a *apolloListener) RemoveListener(l config_center.ConfigurationListener) { delete(a.listeners, l) } diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go index e70e4f68075c51c33f1110ef44a7b703e36fb78d..541cc09286bb83fa5b66db3745e45ad0a9df5e2f 100644 --- a/config_center/configuration_listener.go +++ b/config_center/configuration_listener.go @@ -25,12 +25,12 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// ConfigurationListener ... +// ConfigurationListener for changing listener's event type ConfigurationListener interface { Process(*ConfigChangeEvent) } -// ConfigChangeEvent ... +// ConfigChangeEvent for changing listener's event type ConfigChangeEvent struct { Key string Value interface{} diff --git a/config_center/configurator.go b/config_center/configurator.go index ffa9034e05c4c3d4cc254886e2ed19576f155dec..9db4804365689d8eb357965973a2916e86383cf8 100644 --- a/config_center/configurator.go +++ b/config_center/configurator.go @@ -21,7 +21,7 @@ import ( "github.com/apache/dubbo-go/common" ) -// Configurator ... +// Configurator supports GetUrl and constructor type Configurator interface { GetUrl() *common.URL Configure(url *common.URL) diff --git a/config_center/configurator/mock.go b/config_center/configurator/mock.go index d294b9195db9cfe60056bc29ec26816f740ea396..7ec7179634cfd967cd27e85ed248e2075c387cb5 100644 --- a/config_center/configurator/mock.go +++ b/config_center/configurator/mock.go @@ -23,7 +23,7 @@ import ( "github.com/apache/dubbo-go/config_center" ) -// NewMockConfigurator ... +// NewMockConfigurator creates a new mockConfigurator func NewMockConfigurator(url *common.URL) config_center.Configurator { return &mockConfigurator{configuratorUrl: url} } @@ -32,12 +32,12 @@ type mockConfigurator struct { configuratorUrl *common.URL } -// GetUrl ... +// GetUrl gets a configuratorUrl func (c *mockConfigurator) GetUrl() *common.URL { return c.configuratorUrl } -// Configure ... +// Configure sets up param CLUSTER_KEY and cluster for url func (c *mockConfigurator) Configure(url *common.URL) { if cluster := c.GetUrl().GetParam(constant.CLUSTER_KEY, ""); cluster != "" { url.SetParam(constant.CLUSTER_KEY, cluster) diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index 18415bee3a28b37ffc2f3f73cc7309b685de5408..ebd3dc601b2821f3f4e1e4405720e4ebc55b607e 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -50,12 +50,12 @@ func (c *overrideConfigurator) GetUrl() *common.URL { } func (c *overrideConfigurator) Configure(url *common.URL) { - //remove configuratorUrl some param that can not be configured + // remove configuratorUrl some param that can not be configured if c.configuratorUrl.GetParam(constant.ENABLED_KEY, "true") == "false" || len(c.configuratorUrl.Location) == 0 { return } - //branch for version 2.7.x + // branch for version 2.7.x apiVersion := c.configuratorUrl.GetParam(constant.CONFIG_VERSION_KEY, "") if len(apiVersion) != 0 { currentSide := url.GetParam(constant.SIDE_KEY, "") @@ -67,12 +67,12 @@ func (c *overrideConfigurator) Configure(url *common.URL) { c.configureIfMatch(url.Ip, url) } } else { - //branch for version 2.6.x and less + // branch for version 2.6.x and less c.configureDeprecated(url) } } -//translate from java, compatible rules in java +// configureIfMatch translate from java, compatible rules in java func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { if constant.ANYHOST_VALUE == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip { providers := c.configuratorUrl.GetParam(constant.OVERRIDE_PROVIDERS_KEY, "") @@ -105,8 +105,7 @@ func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { if returnUrl { return } - configUrl := c.configuratorUrl.Clone() - configUrl.RemoveParams(conditionKeys) + configUrl := c.configuratorUrl.CloneExceptParams(conditionKeys) url.SetParams(configUrl.GetParams()) } } diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 9013d7140e757520f2e8f048ce53a5ac2a13f982..540febc9d38e164afcc62538478df140b7d671c7 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -40,7 +40,7 @@ const ( DEFAULT_CONFIG_TIMEOUT = "10s" ) -// DynamicConfiguration ... +// DynamicConfiguration for modify listener and get properties file type DynamicConfiguration interface { Parser() parser.ConfigurationParser SetParser(parser.ConfigurationParser) @@ -71,14 +71,14 @@ type Options struct { // Option ... type Option func(*Options) -// WithGroup ... +// WithGroup assigns group to opt.Group func WithGroup(group string) Option { return func(opt *Options) { opt.Group = group } } -// WithTimeout ... +// WithTimeout assigns time to opt.Timeout func WithTimeout(time time.Duration) Option { return func(opt *Options) { opt.Timeout = time diff --git a/config_center/dynamic_configuration_factory.go b/config_center/dynamic_configuration_factory.go index 9f9b13227f6623a02b0261c46d8d1e43624005f8..46faf864443b7f8780584213b758f26395224956 100644 --- a/config_center/dynamic_configuration_factory.go +++ b/config_center/dynamic_configuration_factory.go @@ -21,7 +21,7 @@ import ( "github.com/apache/dubbo-go/common" ) -// DynamicConfigurationFactory ... +// DynamicConfigurationFactory gets the DynamicConfiguration type DynamicConfigurationFactory interface { GetDynamicConfiguration(*common.URL) (DynamicConfiguration, error) } diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 59c788b65bce2a4773975ea1a96a314649781832..9cfb9e6078be60fbe2072e8e293143e8b111df58 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -43,7 +43,7 @@ var ( dynamicConfiguration *MockDynamicConfiguration ) -// GetDynamicConfiguration ... +// GetDynamicConfiguration returns a DynamicConfiguration func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(_ *common.URL) (DynamicConfiguration, error) { var err error once.Do(func() { @@ -99,16 +99,16 @@ type MockDynamicConfiguration struct { listener map[string]ConfigurationListener } -// AddListener ... +// AddListener adds a listener for MockDynamicConfiguration func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, _ ...Option) { c.listener[key] = listener } -// RemoveListener ... +// RemoveListener removes the listener for MockDynamicConfiguration func (c *MockDynamicConfiguration) RemoveListener(_ string, _ ConfigurationListener, _ ...Option) { } -// GetConfig ... +// GetConfig returns content of MockDynamicConfiguration func (c *MockDynamicConfiguration) GetConfig(_ string, _ ...Option) (string, error) { return c.content, nil @@ -119,17 +119,17 @@ func (c *MockDynamicConfiguration) GetConfigs(key string, opts ...Option) (strin return c.GetConfig(key, opts...) } -// Parser ... +// Parser returns a parser of MockDynamicConfiguration func (c *MockDynamicConfiguration) Parser() parser.ConfigurationParser { return c.parser } -// SetParser ... +// SetParser sets parser of MockDynamicConfiguration func (c *MockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } -// GetProperties ... +// GetProperties gets content of MockDynamicConfiguration func (c *MockDynamicConfiguration) GetProperties(_ string, _ ...Option) (string, error) { return c.content, nil } @@ -139,7 +139,7 @@ func (c *MockDynamicConfiguration) GetInternalProperty(key string, opts ...Optio return c.GetProperties(key, opts...) } -// GetRule ... +// GetRule gets properties of MockDynamicConfiguration func (c *MockDynamicConfiguration) GetRule(key string, opts ...Option) (string, error) { return c.GetProperties(key, opts...) } diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go index f33b4ba866da69e1d23b493f42152bbb0f437878..6fbdc27d4339150bfec624f7dc5ea0f6a608d7a7 100644 --- a/config_center/parser/configuration_parser.go +++ b/config_center/parser/configuration_parser.go @@ -47,7 +47,7 @@ type ConfigurationParser interface { ParseToUrls(content string) ([]*common.URL, error) } -// DefaultConfigurationParser for support properties file in config center +// DefaultConfigurationParser for supporting properties file in config center type DefaultConfigurationParser struct{} // ConfiguratorConfig ... @@ -71,7 +71,7 @@ type ConfigItem struct { Side string `yaml:"side"` } -// Parse ... +// Parse load content func (parser *DefaultConfigurationParser) Parse(content string) (map[string]string, error) { pps, err := properties.LoadString(content) if err != nil { diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index 122dfaf4f268a706151de6acdaa78bb46e59f8fb..747c4be352add3f549eaf02e83da6442a8a84c6a 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -33,12 +33,12 @@ type CacheListener struct { rootPath string } -// NewCacheListener ... +// NewCacheListener creates a new CacheListener func NewCacheListener(rootPath string) *CacheListener { return &CacheListener{rootPath: rootPath} } -// AddListener ... +// AddListener will add a listener if loaded func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure @@ -50,7 +50,7 @@ func (l *CacheListener) AddListener(key string, listener config_center.Configura } } -// RemoveListener ... +// RemoveListener will delete a listener if loaded func (l *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) { listeners, loaded := l.keyListeners.Load(key) if loaded { @@ -58,7 +58,7 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config } } -// DataChange ... +// DataChange changes all listeners' event func (l *CacheListener) DataChange(event remoting.Event) bool { if event.Content == "" { //meanings new node diff --git a/filter/filter_impl/hystrix_filter.go b/filter/filter_impl/hystrix_filter.go index 9fd97b57b677c9aa8ec492151df9aace6dc78b62..4c872bed3e7fef8eca47f51422525a4918d6c1d8 100644 --- a/filter/filter_impl/hystrix_filter.go +++ b/filter/filter_impl/hystrix_filter.go @@ -124,7 +124,7 @@ func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i _, _, err := hystrix.GetCircuit(cmdName) configLoadMutex.RUnlock() if err != nil { - logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: ", cmdName, err) + logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: %+v", cmdName, err) return invoker.Invoke(ctx, invocation) } logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName) diff --git a/filter/filter_impl/token_filter_test.go b/filter/filter_impl/token_filter_test.go index d7a7f20a5d7648f6ee18fd26f041acb313dd97fe..b8b297e67267640a1c294541afdd4e062bfebb25 100644 --- a/filter/filter_impl/token_filter_test.go +++ b/filter/filter_impl/token_filter_test.go @@ -53,10 +53,10 @@ func TestTokenFilter_Invoke(t *testing.T) { func TestTokenFilter_InvokeEmptyToken(t *testing.T) { filter := GetTokenFilter() - url := common.URL{} + testUrl := common.URL{} attch := make(map[string]string, 0) attch[constant.TOKEN_KEY] = "ori_key" - result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } @@ -64,23 +64,23 @@ func TestTokenFilter_InvokeEmptyToken(t *testing.T) { func TestTokenFilter_InvokeEmptyAttach(t *testing.T) { filter := GetTokenFilter() - url := common.NewURLWithOptions( + testUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) attch := make(map[string]string, 0) - result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(*url), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(*testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.NotNil(t, result.Error()) } func TestTokenFilter_InvokeNotEqual(t *testing.T) { filter := GetTokenFilter() - url := common.NewURLWithOptions( + testUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) attch := make(map[string]string, 0) attch[constant.TOKEN_KEY] = "err_key" result := filter.Invoke(context.Background(), - protocol.NewBaseInvoker(*url), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + protocol.NewBaseInvoker(*testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.NotNil(t, result.Error()) } diff --git a/integrate_test.sh b/integrate_test.sh new file mode 100644 index 0000000000000000000000000000000000000000..c9c2f23b5b07f0baf96260d8092e7464d4d15659 --- /dev/null +++ b/integrate_test.sh @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash + +set -e +set -x + +echo 'start integrate-test' + +# set root workspace +ROOT_DIR=$(pwd) +echo "integrate-test root work-space -> ${ROOT_DIR}" + +# show all travis-env +echo "travis current commit id -> ${TRAVIS_COMMIT}" +echo "travis pull request -> ${TRAVIS_PULL_REQUEST}" +echo "travis pull request branch -> ${TRAVIS_PULL_REQUEST_BRANCH}" +echo "travis pull request slug -> ${TRAVIS_PULL_REQUEST_SLUG}" +echo "travis pull request sha -> ${TRAVIS_PULL_REQUEST_SHA}" +echo "travis pull request repo slug -> ${TRAVIS_REPO_SLUG}" + + +# #start etcd registry insecure listen in [:]:2379 +# docker run -d --network host k8s.gcr.io/etcd:3.3.10 etcd +# echo "etcdv3 listen in [:]2379" + +# #start consul registry insecure listen in [:]:8500 +# docker run -d --network host consul +# echo "consul listen in [:]8500" + +# #start nacos registry insecure listen in [:]:8848 +# docker run -d --network host nacos/nacos-server:latest +# echo "ncacos listen in [:]8848" + +# default use zk as registry +#start zookeeper registry insecure listen in [:]:2181 +docker run -d --network host zookeeper +echo "zookeeper listen in [:]2181" + +# build go-server image +cd ./test/integrate/dubbo/go-server +docker build . -t ci-provider --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_SLUG} --build-arg PR_ORIGIN_COMMITID=${TRAVIS_PULL_REQUEST_SHA} +cd ${ROOT_DIR} +docker run -d --network host ci-provider + +# build go-client image +cd ./test/integrate/dubbo/go-client +docker build . -t ci-consumer --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_SLUG} --build-arg PR_ORIGIN_COMMITID=${TRAVIS_PULL_REQUEST_SHA} +cd ${ROOT_DIR} +# run provider +# check consumer status +docker run -it --network host ci-consumer diff --git a/metadata/report.go b/metadata/report.go index 3fcc71241411d4a8f9577bb5fb3233e67942cd52..f2380f50cd0eb15182c137f02e5f78b4ba8e4fd2 100644 --- a/metadata/report.go +++ b/metadata/report.go @@ -25,7 +25,7 @@ import ( type MetadataReport interface { StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) - StoreConsumeretadata(*identifier.MetadataIdentifier, map[string]string) + StoreConsumerMetadata(*identifier.MetadataIdentifier, map[string]string) SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, *common.URL) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index 1636b14da2fe5ab714853aa662eaa774ddbc1791..bd1e7986ca709a4e10dfcad04d2380931308d568 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -68,9 +68,8 @@ func init() { extension.SetMetricReporter(reporterName, newPrometheusReporter) } -// PrometheusReporter -// it will collect the data for Prometheus -// if you want to use this, you should initialize your prometheus. +// PrometheusReporter will collect the data for Prometheus +// if you want to use this feature, you need to initialize your prometheus. // https://prometheus.io/docs/guides/go-application/ type PrometheusReporter struct { @@ -85,7 +84,7 @@ type PrometheusReporter struct { consumerHistogramVec *prometheus.HistogramVec } -// Report report the duration to Prometheus +// Report reports the duration to Prometheus // the role in url must be consumer or provider // or it will be ignored func (reporter *PrometheusReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { @@ -99,7 +98,7 @@ func (reporter *PrometheusReporter) Report(ctx context.Context, invoker protocol sumVec = reporter.consumerSummaryVec hisVec = reporter.consumerHistogramVec } else { - logger.Warnf("The url is not the consumer's or provider's, "+ + logger.Warnf("The url belongs neither the consumer nor the provider, "+ "so the invocation will be ignored. url: %s", url.String()) return } diff --git a/metrics/reporter.go b/metrics/reporter.go index 85ef1dcdf0dad275edecc1f3a85502c1493c1395..9a7094fa62d9c0fa3e6ee0a8ef373f91c28d2c90 100644 --- a/metrics/reporter.go +++ b/metrics/reporter.go @@ -29,7 +29,7 @@ const ( NameSpace = "dubbo" ) -// it will be use to report the invocation's duration +// Reporter will be used to report the invocation's duration type Reporter interface { // report the duration of an invocation Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 1f4cc0068efb5688b545fa35b784b4fb2e923dc7..f57d89d1a716d2a6056e0e4a581926dc237934e4 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -143,7 +143,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { // OnCron ... func (h *RpcClientHandler) OnCron(session getty.Session) { - rpcSession, err := h.conn.getClientRpcSession(session) + clientRpcSession, err := h.conn.getClientRpcSession(session) if err != nil { logger.Errorf("client.getClientSession(session{%s}) = error{%v}", session.Stat(), perrors.WithStack(err)) @@ -151,7 +151,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { } if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() { logger.Warnf("session{%s} timeout{%s}, reqNum{%d}", - session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum) + session.Stat(), time.Since(session.GetActive()).String(), clientRpcSession.reqNum) h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn) return } diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index 918514c2676cfc69336a9f53e6d16d7f23cf7dca..f0bd09ba7c3392dd1dbe10306c7c70cc0eab8ccb 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -219,25 +219,25 @@ func (c *gettyRPCClient) updateSession(session getty.Session) { func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, error) { var ( - err error - rpcSession rpcSession + err error + rpcClientSession rpcSession ) c.lock.RLock() defer c.lock.RUnlock() if c.sessions == nil { - return rpcSession, errClientClosed + return rpcClientSession, errClientClosed } err = errSessionNotExist for _, s := range c.sessions { if s.session == session { - rpcSession = *s + rpcClientSession = *s err = nil break } } - return rpcSession, perrors.WithStack(err) + return rpcClientSession, perrors.WithStack(err) } func (c *gettyRPCClient) isAvailable() bool { @@ -319,7 +319,8 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC conn, err := p.get() if err == nil && conn == nil { // create new conn - rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr) + var rpcClientConn *gettyRPCClient + rpcClientConn, err = newGettyRPCClientConn(p, protocol, addr) return rpcClientConn, perrors.WithStack(err) } return conn, perrors.WithStack(err) diff --git a/protocol/invocation.go b/protocol/invocation.go index f32f2c3449ac063ecb89952bd4653312a07a3df4..eedf5f0253c2b76a3e0e1b52a00124d648351cfc 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -30,5 +30,8 @@ type Invocation interface { Reply() interface{} Attachments() map[string]string AttachmentsByKey(string, string) string + // Refer to dubbo 2.7.6. It is different from attachment. It is used in internal process. + Attributes() map[string]interface{} + AttributeByKey(string, interface{}) interface{} Invoker() Invoker } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index b207fd0b0cc4eb7de8409a8c46c6fc9ef0baa5c7..68fe7b92042e6b4cf4a253c9ce354184f79af558 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -40,8 +40,10 @@ type RPCInvocation struct { reply interface{} callBack interface{} attachments map[string]string - invoker protocol.Invoker - lock sync.RWMutex + // Refer to dubbo 2.7.6. It is different from attachment. It is used in internal process. + attributes map[string]interface{} + invoker protocol.Invoker + lock sync.RWMutex } // NewRPCInvocation ... @@ -50,6 +52,7 @@ func NewRPCInvocation(methodName string, arguments []interface{}, attachments ma methodName: methodName, arguments: arguments, attachments: attachments, + attributes: make(map[string]interface{}, 8), } } @@ -59,6 +62,9 @@ func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation { for _, opt := range opts { opt(invo) } + if invo.attributes == nil { + invo.attributes = make(map[string]interface{}) + } return invo } @@ -111,6 +117,22 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string return defaultValue } +// get attributes +func (r *RPCInvocation) Attributes() map[string]interface{} { + return r.attributes +} + +// get attribute by key. If it is not exist, it will return default value +func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) interface{} { + r.lock.RLock() + defer r.lock.RUnlock() + value, ok := r.attributes[key] + if ok { + return value + } + return defaultValue +} + // SetAttachments ... func (r *RPCInvocation) SetAttachments(key string, value string) { r.lock.Lock() @@ -121,6 +143,13 @@ func (r *RPCInvocation) SetAttachments(key string, value string) { r.attachments[key] = value } +// SetAttribute. If Attributes is nil, it will be inited. +func (r *RPCInvocation) SetAttribute(key string, value interface{}) { + r.lock.Lock() + defer r.lock.Unlock() + r.attributes[key] = value +} + // Invoker ... func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index ba7197dbc857c2ed7acda1a9f246a5b826e86915..70b3abd24f9451b4fa81d02eb9390823e6714470 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -172,7 +172,7 @@ func (c *HTTPClient) Do(addr, path string, httpHeader http.Header, body []byte) httpReq.Close = true reqBuf := bytes.NewBuffer(make([]byte, 0)) - if err := httpReq.Write(reqBuf); err != nil { + if err = httpReq.Write(reqBuf); err != nil { return nil, perrors.WithStack(err) } @@ -191,7 +191,7 @@ func (c *HTTPClient) Do(addr, path string, httpHeader http.Header, body []byte) } setNetConnTimeout(tcpConn, c.options.HTTPTimeout) - if _, err := reqBuf.WriteTo(tcpConn); err != nil { + if _, err = reqBuf.WriteTo(tcpConn); err != nil { return nil, perrors.WithStack(err) } diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index d1c2a858b4e4223ac32fc1160b56f6ee1862c8ce..3176e193816bf95882539374672eeed7f9cddc44 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -67,8 +67,8 @@ type Error struct { func (e *Error) Error() string { buf, err := json.Marshal(e) if err != nil { - msg, err := json.Marshal(err.Error()) - if err != nil { + msg, retryErr := json.Marshal(err.Error()) + if retryErr != nil { msg = []byte("jsonrpc2.Error: json.Marshal failed") } return fmt.Sprintf(`{"code":%d,"message":%s}`, -32001, string(msg)) @@ -133,7 +133,7 @@ func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) { } case reflect.Array, reflect.Struct: case reflect.Ptr: - switch k := reflect.TypeOf(param).Elem().Kind(); k { + switch ptrK := reflect.TypeOf(param).Elem().Kind(); ptrK { case reflect.Map: if reflect.TypeOf(param).Elem().Key().Kind() == reflect.String { if reflect.ValueOf(param).Elem().IsNil() { @@ -146,7 +146,7 @@ func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) { } case reflect.Array, reflect.Struct: default: - return nil, perrors.New("unsupported param type: Ptr to " + k.String()) + return nil, perrors.New("unsupported param type: Ptr to " + ptrK.String()) } default: return nil, perrors.New("unsupported param type: " + k.String()) diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index bed7099ab60a6c05c3799f993c0bb348a4b00f02..64f708652d8cb4bf2a6d53488c9fe17e64f10ad0 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -109,8 +109,8 @@ func (jp *JsonrpcProtocol) Destroy() { func (jp *JsonrpcProtocol) openServer(url common.URL) { _, ok := jp.serverMap[url.Location] if !ok { - _, ok := jp.ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) - if !ok { + _, loadOk := jp.ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + if !loadOk { panic("[JsonrpcProtocol]" + url.Key() + "is not existing") } diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 8600f02dad3d32d797613823de0bbe40261d2e71..fcea66632e787083823c1d06ca6c1698c45d5b23 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -349,9 +349,9 @@ func serveRequest(ctx context.Context, constant.PATH_KEY: path, constant.VERSION_KEY: codec.req.Version})) if err := result.Error(); err != nil { - rspStream, err := codec.Write(err.Error(), invalidRequest) - if err != nil { - return perrors.WithStack(err) + rspStream, codecErr := codec.Write(err.Error(), invalidRequest) + if codecErr != nil { + return perrors.WithStack(codecErr) } if errRsp := sendErrorResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s", diff --git a/registry/consul/listener.go b/registry/consul/listener.go index b047a4c08c9f6c809ed3dca8bd0d06ceaa576cae..5fac9ec0f9b6c08620021de9d0b92e3b94773c12 100644 --- a/registry/consul/listener.go +++ b/registry/consul/listener.go @@ -142,7 +142,6 @@ func (l *consulListener) run() { func (l *consulListener) handler(idx uint64, raw interface{}) { var ( service *consul.ServiceEntry - event *registry.ServiceEvent url common.URL ok bool err error @@ -183,7 +182,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) { } l.urls = newUrls - for _, event = range events { + for _, event := range events { l.eventCh <- event } } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index bf2d0e2dc2d5bc2022c9fd418b7425d19dca5744..49b0027f433861c76cc8d827f85091f47dc4217f 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -153,9 +153,11 @@ func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { } func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { - newInvokersList := []protocol.Invoker{} + var ( + err error + newInvokersList []protocol.Invoker + ) groupInvokersMap := make(map[string][]protocol.Invoker) - groupInvokersList := []protocol.Invoker{} dir.cacheInvokersMap.Range(func(key, value interface{}) bool { newInvokersList = append(newInvokersList, value.(protocol.Invoker)) @@ -171,6 +173,7 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { groupInvokersMap[group] = []protocol.Invoker{invoker} } } + groupInvokersList := make([]protocol.Invoker, 0, len(groupInvokersMap)) if len(groupInvokersMap) == 1 { //len is 1 it means no group setting ,so do not need cluster again for _, invokers := range groupInvokersMap { @@ -179,9 +182,13 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { } else { for _, invokers := range groupInvokersMap { staticDir := directory.NewStaticDirectory(invokers) - cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) - staticDir.BuildRouterChain(invokers) - groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) + cst := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) + err = staticDir.BuildRouterChain(invokers) + if err != nil { + logger.Error(err) + continue + } + groupInvokersList = append(groupInvokersList, cst.Join(staticDir)) } } diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index f1d5ce434aa00185f784f208eefe603274f05ab0..ac3f7124c12788959f529193b871652085fe6303 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -25,6 +25,7 @@ import ( ) import ( + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -37,6 +38,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/remoting" @@ -169,7 +171,21 @@ Loop1: break Loop1 } } +} +func Test_toGroupInvokers(t *testing.T) { + registryDirectory, _ := normalRegistryDir() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + invoker := mock.NewMockInvoker(ctrl) + newUrl, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + invoker.EXPECT().GetUrl().Return(newUrl).AnyTimes() + + registryDirectory.cacheInvokersMap.Store("group1", invoker) + registryDirectory.cacheInvokersMap.Store("group2", invoker) + registryDirectory.cacheInvokersMap.Store("group1", invoker) + groupInvokers := registryDirectory.toGroupInvokers() + assert.Len(t, groupInvokers, 2) } func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) { diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go index c50b5b670a5491b9813652f7aa46bec18a35a7d7..1c9d8bdd5e0b713d61764163eff3b9fd3d5f320a 100644 --- a/registry/kubernetes/listener_test.go +++ b/registry/kubernetes/listener_test.go @@ -191,7 +191,6 @@ type KubernetesRegistryTestSuite struct { } func (s *KubernetesRegistryTestSuite) initRegistry() *kubernetesRegistry { - t := s.T() regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) @@ -204,7 +203,7 @@ func (s *KubernetesRegistryTestSuite) initRegistry() *kubernetesRegistry { out := fake.NewSimpleClientset() // mock current pod - if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil { + if _, err = out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil { t.Fatal(err) } return out, nil diff --git a/registry/kubernetes/registry_test.go b/registry/kubernetes/registry_test.go index ea6d7663a9ceeab241c7a94a91f94288ab2990fc..a650b189c39b94849dee4fbf7fc292e33e87b829 100644 --- a/registry/kubernetes/registry_test.go +++ b/registry/kubernetes/registry_test.go @@ -68,9 +68,9 @@ func (s *KubernetesRegistryTestSuite) TestSubscribe() { time.Sleep(1e9) go func() { - err := r.Register(url) - if err != nil { - t.Fatal(err) + registerErr := r.Register(url) + if registerErr != nil { + t.Fatal(registerErr) } }() diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go index 7475b455c0dda09da65012465711ece264bb3dd5..d0311b200b27081c60bc97b2307a54774ca977bd 100644 --- a/registry/nacos/registry_test.go +++ b/registry/nacos/registry_test.go @@ -42,7 +42,7 @@ func TestNacosRegistry_Register(t *testing.T) { urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") urlMap.Set(constant.VERSION_KEY, "1.0.0") urlMap.Set(constant.CLUSTER_KEY, "mock") - url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + testUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) reg, err := newNacosRegistry(®url) assert.Nil(t, err) @@ -50,7 +50,7 @@ func TestNacosRegistry_Register(t *testing.T) { t.Errorf("new nacos registry error:%s \n", err.Error()) return } - err = reg.Register(url) + err = reg.Register(testUrl) assert.Nil(t, err) if err != nil { t.Errorf("register error:%s \n", err.Error()) @@ -72,10 +72,10 @@ func TestNacosRegistry_Subscribe(t *testing.T) { urlMap.Set(constant.VERSION_KEY, "1.0.0") urlMap.Set(constant.CLUSTER_KEY, "mock") urlMap.Set(constant.NACOS_PATH_KEY, "") - url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + testUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) reg, _ := newNacosRegistry(®url) - err := reg.Register(url) + err := reg.Register(testUrl) assert.Nil(t, err) if err != nil { t.Errorf("new nacos registry error:%s \n", err.Error()) @@ -84,7 +84,7 @@ func TestNacosRegistry_Subscribe(t *testing.T) { regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) reg2, _ := newNacosRegistry(®url) - listener, err := reg2.(*nacosRegistry).subscribe(&url) + listener, err := reg2.(*nacosRegistry).subscribe(&testUrl) assert.Nil(t, err) if err != nil { t.Errorf("subscribe error:%s \n", err.Error()) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index aa8fbcbe7d6eca682892d4627878fe6bfc3756fe..a936db80bf2c3b46ba389142cc40686ed3df17b1 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -56,8 +56,8 @@ type registryProtocol struct { invokers []protocol.Invoker // Registry Map<RegistryAddress, Registry> registries *sync.Map - //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. - //providerurl <--> exporter + // To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. + // providerurl <--> exporter bounds *sync.Map overrideListeners *sync.Map serviceConfigurationListeners *sync.Map @@ -70,10 +70,8 @@ func init() { } func getCacheKey(url *common.URL) string { - newUrl := url.Clone() delKeys := gxset.NewSet("dynamic", "enabled") - newUrl.RemoveParams(delKeys) - return newUrl.String() + return url.CloneExceptParams(delKeys).String() } func newRegistryProtocol() *registryProtocol { @@ -103,16 +101,14 @@ func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common. // filterHideKey filter the parameters that do not need to be output in url(Starting with .) func filterHideKey(url *common.URL) *common.URL { - //be careful params maps in url is map type - cloneURL := url.Clone() + // be careful params maps in url is map type removeSet := gxset.NewSet() - for k, _ := range cloneURL.GetParams() { + for k, _ := range url.GetParams() { if strings.HasPrefix(k, ".") { removeSet.Add(k) } } - cloneURL.RemoveParams(removeSet) - return cloneURL + return url.CloneExceptParams(removeSet) } func (proto *registryProtocol) initConfigurationListeners() { @@ -138,7 +134,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { reg = regI.(registry.Registry) } - //new registry directory for store service url from registry + // new registry directory for store service url from registry directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", @@ -152,7 +148,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { serviceUrl.String(), registryUrl.String(), err.Error()) } - //new cluster invoker + // new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) invoker := cluster.Join(directory) @@ -217,7 +213,7 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common oldExporter.(protocol.Exporter).Unexport() proto.bounds.Delete(key) proto.Export(wrappedNewInvoker) - //TODO: unregister & unsubscribe + // TODO: unregister & unsubscribe } } @@ -300,7 +296,7 @@ func isMatched(providerUrl *common.URL, consumerUrl *common.URL) bool { providerGroup := providerUrl.GetParam(constant.GROUP_KEY, "") providerVersion := providerUrl.GetParam(constant.VERSION_KEY, "") providerClassifier := providerUrl.GetParam(constant.CLASSIFIER_KEY, "") - //todo: public static boolean isContains(String values, String value) { + // todo: public static boolean isContains(String values, String value) { // return isNotEmpty(values) && isContains(COMMA_SPLIT_PATTERN.split(values), value); // } return (consumerGroup == constant.ANY_VALUE || consumerGroup == providerGroup || @@ -353,9 +349,9 @@ func (proto *registryProtocol) Destroy() { } func getRegistryUrl(invoker protocol.Invoker) *common.URL { - //here add * for return a new url + // here add * for return a new url url := invoker.GetUrl() - //if the protocol == registry ,set protocol the registry value in url.params + // if the protocol == registry ,set protocol the registry value in url.params if url.Protocol == constant.REGISTRY_PROTOCOL { protocol := url.GetParam(constant.REGISTRY_KEY, "") url.Protocol = protocol @@ -365,7 +361,7 @@ func getRegistryUrl(invoker protocol.Invoker) *common.URL { func getProviderUrl(invoker protocol.Invoker) *common.URL { url := invoker.GetUrl() - //be careful params maps in url is map type + // be careful params maps in url is map type return url.SubURL.Clone() } diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index bd1da547766abb12dc742234787262212e3db314..92ea76046f002cbdf6dbe754453ef8ebb4a14de2 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -87,8 +87,6 @@ func StateToString(state zk.State) string { default: return state.String() } - - return "zookeeper unknown state" } // Options ... @@ -111,12 +109,10 @@ func WithZkName(name string) Option { // ValidateZookeeperClient ... func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { - var ( - err error - ) - opions := &Options{} + var err error + options := &Options{} for _, opt := range opts { - opt(opions) + opt(options) } connected := false err = nil @@ -129,17 +125,18 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { if container.ZkClient() == nil { //in dubbo ,every registry only connect one node ,so this is []string{r.Address} - timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + var timeout time.Duration + timeout, err = time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) if err != nil { logger.Errorf("timeout config %v is invalid ,err is %v", url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) } zkAddresses := strings.Split(url.Location, ",") - newClient, err := newZookeeperClient(opions.zkName, zkAddresses, timeout) + newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout) if err != nil { logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}", - opions.zkName, url.Location, timeout.String(), err) + options.zkName, url.Location, timeout.String(), err) return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) } container.SetZkClient(newClient) @@ -157,7 +154,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { } if connected { - logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location) + logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location) container.WaitGroup().Add(1) //zk client start successful, then registry wg +1 } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 84877667763ce870e76202844e9dc9dc1c3f008c..b3f6e29bf81d4cddef058940e8942219427ac400 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -18,7 +18,6 @@ package zookeeper import ( - "github.com/apache/dubbo-go/common" "path" "strings" "sync" @@ -32,6 +31,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/remoting" @@ -96,8 +96,6 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remo return false } } - - return false } func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) { diff --git a/test/integrate/dubbo/go-client/Dockerfile b/test/integrate/dubbo/go-client/Dockerfile index 1c683613f5cd4c4b92964360996d9e38a079f131..d48df36dc72d7e75f8c2c8c91d5acbb01e39757d 100644 --- a/test/integrate/dubbo/go-client/Dockerfile +++ b/test/integrate/dubbo/go-client/Dockerfile @@ -28,8 +28,8 @@ ARG PR_ORIGIN_COMMITID ADD . /go/src/github.com/apache/dubbo-go/test/integrate/dubbo/go-client # update dubbo-go to current commit id -RUN echo "github.com/apache/dubbo-go will be replace to github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID}" -RUN go mod edit -replace=github.com/apache/dubbo-go=github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID} +RUN test ${PR_ORIGIN_REPO} && echo "github.com/apache/dubbo-go will be replace to github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID}" || echo 'go get github.com/apache/dubbo-go@develop' +RUN test ${PR_ORIGIN_REPO} && go mod edit -replace=github.com/apache/dubbo-go=github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID} || go get -u github.com/apache/dubbo-go@develop RUN go install github.com/apache/dubbo-go/test/integrate/dubbo/go-client diff --git a/test/integrate/dubbo/go-server/Dockerfile b/test/integrate/dubbo/go-server/Dockerfile index 05596980c3e899dcdafc158d2c54af0252e58a63..c2f2d63462d94df7624ac100023e8b8c24e23e11 100644 --- a/test/integrate/dubbo/go-server/Dockerfile +++ b/test/integrate/dubbo/go-server/Dockerfile @@ -27,8 +27,8 @@ ARG PR_ORIGIN_COMMITID ADD . /go/src/github.com/apache/dubbo-go/test/integrate/dubbo/go-server # update dubbo-go to current commit id -RUN echo "github.com/apache/dubbo-go will be replace to github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID}" -RUN go mod edit -replace=github.com/apache/dubbo-go=github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID} +RUN test ${PR_ORIGIN_REPO} && echo "github.com/apache/dubbo-go will be replace to github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID}" || echo 'go get github.com/apache/dubbo-go@develop' +RUN test ${PR_ORIGIN_REPO} && go mod edit -replace=github.com/apache/dubbo-go=github.com/${PR_ORIGIN_REPO}@${PR_ORIGIN_COMMITID} || go get -u github.com/apache/dubbo-go@develop RUN go install github.com/apache/dubbo-go/test/integrate/dubbo/go-server