diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 75d9ef26567df0fbd83f5d9f94c8548d1e8e633d..a3e207f3022a4d882d1b90f2756b51ab4e0b2775 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -22,7 +22,6 @@ import ( ) import ( - "github.com/dubbogo/gost/container/set" "go.uber.org/atomic" ) @@ -35,8 +34,6 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -var routerURLSet = gxset.NewSet() - // BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers type BaseDirectory struct { url *common.URL @@ -120,14 +117,3 @@ func (dir *BaseDirectory) Destroy(doDestroy func()) { func (dir *BaseDirectory) IsAvailable() bool { return !dir.destroyed.Load() } - -// GetRouterURLSet Return router URL -func GetRouterURLSet() *gxset.HashSet { - return routerURLSet -} - -// AddRouterURLSet Add router URL -// Router URL will init in config/config_loader.go -func AddRouterURLSet(url *common.URL) { - routerURLSet.Add(url) -} diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index ac2fe672916d0ca093b652e0ae7334c0edfe58d0..f820721515d9091c454067cf059bc29664e04808 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -20,27 +20,29 @@ package extension import ( perrors "github.com/pkg/errors" ) - import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/registry" ) var ( - discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4) + discoveryCreatorMap = make(map[string]func(name string) (registry.ServiceDiscovery, error), 4) ) // SetServiceDiscovery will store the creator and name -func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) { - discoveryCreatorMap[name] = creator +// protocol indicate the implementation, like nacos +// the name like nacos-1... +func SetServiceDiscovery(protocol string, creator func(name string) (registry.ServiceDiscovery, error)) { + discoveryCreatorMap[protocol] = creator } // GetServiceDiscovery will return the registry.ServiceDiscovery +// protocol indicate the implementation, like nacos +// the name like nacos-1... // if not found, or initialize instance failed, it will return error. -func GetServiceDiscovery(name string, url *common.URL) (registry.ServiceDiscovery, error) { - creator, ok := discoveryCreatorMap[name] +func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) { + creator, ok := discoveryCreatorMap[protocol] if !ok { return nil, perrors.New("Could not find the service discovery with name: " + name) } - return creator(url) + return creator(name) } diff --git a/config/base_config.go b/config/base_config.go index f58138d2e58a1b3da06894d3afa4728ea2ecf8fb..d95ea27ee6f20d51144ded8904a0051c448cce9e 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -42,7 +42,12 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { - ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"` + ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"` + // application config + ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` + configCenterUrl *common.URL prefix string fatherConfig interface{} @@ -51,6 +56,17 @@ type BaseConfig struct { fileStream *bytes.Buffer } +func (c *BaseConfig) GetServiceDiscoveries(name string) (config *ServiceDiscoveryConfig, ok bool) { + config, ok = c.ServiceDiscoveries[name] + return +} + +// GetRemoteConfig will return the remote's config with the name if found +func (c *BaseConfig) GetRemoteConfig(name string) (config *RemoteConfig, ok bool) { + config, ok = c.Remotes[name] + return +} + // startConfigCenter will start the config center. // it will prepare the environment func (c *BaseConfig) startConfigCenter() error { @@ -63,7 +79,7 @@ func (c *BaseConfig) startConfigCenter() error { if c.prepareEnvironment() != nil { return perrors.WithMessagef(err, "start config center error!") } - //c.fresh() + // c.fresh() return err } @@ -101,14 +117,14 @@ func (c *BaseConfig) prepareEnvironment() error { return perrors.WithStack(err) } } - //global config file + // global config file mapContent, err := dynamicConfig.Parser().Parse(content) if err != nil { return perrors.WithStack(err) } config.GetEnvInstance().UpdateExternalConfigMap(mapContent) - //appGroup config file + // appGroup config file if len(appContent) != 0 { appMapConent, err := dynamicConfig.Parser().Parse(appContent) if err != nil { @@ -264,7 +280,7 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC if f.Kind() == reflect.Map { if f.Type().Elem().Kind() == reflect.Ptr { - //initiate config + // initiate config s := reflect.New(f.Type().Elem().Elem()) prefix := s.MethodByName("Prefix").Call(nil)[0].String() for _, pfx := range strings.Split(prefix, "|") { @@ -279,7 +295,7 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC } - //iter := f.MapRange() + // iter := f.MapRange() for _, k := range f.MapKeys() { v := f.MapIndex(k) @@ -314,7 +330,7 @@ func (c *BaseConfig) fresh() { } func (c *BaseConfig) freshInternalConfig(config *config.InmemoryConfiguration) { - //reflect to init struct + // reflect to init struct tp := reflect.ValueOf(c.fatherConfig).Elem().Type() initializeStruct(tp, reflect.ValueOf(c.fatherConfig).Elem()) diff --git a/config/base_config_test.go b/config/base_config_test.go index 60eccfb1836dccbec0e8dc593a0954005117c28e..4a4b2b06b985971b7a397daa52107fa812fc88e9 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -45,13 +45,15 @@ func Test_refresh(t *testing.T) { father := &ConsumerConfig{ Check: &[]bool{true}[0], - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{ "shanghai_reg2": { Protocol: "mock", @@ -139,13 +141,15 @@ func Test_appExternal_refresh(t *testing.T) { config.GetEnvInstance().UpdateExternalConfigMap(mockMap) father := &ConsumerConfig{ Check: &[]bool{true}[0], - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{ "shanghai_reg2": { Protocol: "mock", @@ -225,13 +229,15 @@ func Test_appExternalWithoutId_refresh(t *testing.T) { config.GetEnvInstance().UpdateExternalConfigMap(mockMap) father := &ConsumerConfig{ Check: &[]bool{true}[0], - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{ "shanghai_reg2": { Protocol: "mock", @@ -310,13 +316,15 @@ func Test_refresh_singleRegistry(t *testing.T) { father := &ConsumerConfig{ Check: &[]bool{true}[0], - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{}, Registry: &RegistryConfig{}, References: map[string]*ReferenceConfig{ @@ -373,13 +381,15 @@ func Test_refreshProvider(t *testing.T) { config.GetEnvInstance().UpdateExternalConfigMap(mockMap) father := &ProviderConfig{ - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{ "shanghai_reg2": { Protocol: "mock", @@ -472,7 +482,7 @@ func Test_initializeStruct(t *testing.T) { reflect.ValueOf(consumerConfig).Elem().Set(v.Elem()) assert.Condition(t, func() (success bool) { - return consumerConfig.ApplicationConfig != nil + return consumerConfig.Registry != nil }) assert.Condition(t, func() (success bool) { return consumerConfig.Registries != nil diff --git a/config/config_loader.go b/config/config_loader.go index 1ed43ededdf9c8bfeb30c0c8f62b8c9a414246e6..0922a7a28569bdbc97a39d2f7b3f9d1e2673f004 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -21,6 +21,7 @@ import ( "fmt" "log" "os" + "sync" "time" ) @@ -37,12 +38,19 @@ import ( ) var ( - consumerConfig *ConsumerConfig - providerConfig *ProviderConfig - metricConfig *MetricConfig - applicationConfig *ApplicationConfig - maxWait = 3 - confRouterFile string + consumerConfig *ConsumerConfig + providerConfig *ProviderConfig + // baseConfig = providerConfig.BaseConfig or consumerConfig + baseConfig *BaseConfig + // baseConfigOnce is used to make sure that we only create it once. + baseConfigOnce sync.Once + + // configAccessMutex is used to make sure that BaseConfig.xxxxConfig will only be created once if needed. + // it should be used combine with double-check to avoid the race condition + configAccessMutex sync.Mutex + + maxWait = 3 + confRouterFile string ) // loaded consumer & provider config from xxx.yml, and log config from xxx.xml @@ -100,12 +108,6 @@ func loadConsumerConfig() { } } - metricConfig = consumerConfig.MetricConfig - applicationConfig = consumerConfig.ApplicationConfig - extension.SetAndInitGlobalDispatcher(consumerConfig.eventDispatcherType) - - extension.SetAndInitGlobalDispatcher(consumerConfig.eventDispatcherType) - checkApplicationName(consumerConfig.ApplicationConfig) if err := configCenterRefreshConsumer(); err != nil { logger.Errorf("[consumer config center refresh] %#v", err) @@ -126,14 +128,14 @@ func loadConsumerConfig() { ref.Implement(rpcService) } - //wait for invoker is available, if wait over default 3s, then panic + // wait for invoker is available, if wait over default 3s, then panic var count int checkok := true for { for _, refconfig := range consumerConfig.References { if (refconfig.Check != nil && *refconfig.Check) || (refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) || - (refconfig.Check == nil && consumerConfig.Check == nil) { //default to true + (refconfig.Check == nil && consumerConfig.Check == nil) { // default to true if refconfig.invoker != nil && !refconfig.invoker.IsAvailable() { @@ -178,13 +180,6 @@ func loadProviderConfig() { } } - // so, you should know that the consumer's config will be override - metricConfig = providerConfig.MetricConfig - applicationConfig = providerConfig.ApplicationConfig - extension.SetAndInitGlobalDispatcher(providerConfig.eventDispatcherType) - - extension.SetAndInitGlobalDispatcher(consumerConfig.eventDispatcherType) - checkApplicationName(providerConfig.ApplicationConfig) if err := configCenterRefreshProvider(); err != nil { logger.Errorf("[provider config center refresh] %#v", err) @@ -225,6 +220,9 @@ func Load() { // service config loadProviderConfig() + // common part + extension.SetAndInitGlobalDispatcher(providerConfig.eventDispatcherType) + // init the shutdown callback GracefulShutdownInit() } @@ -241,39 +239,84 @@ func RPCService(service common.RPCService) { // GetMetricConfig find the MetricConfig // if it is nil, create a new one +// we use double-check to reduce race condition +// In general, it will be locked 0 or 1 time. +// So you don't need to worry about the race condition func GetMetricConfig() *MetricConfig { - if metricConfig == nil { - metricConfig = &MetricConfig{} + if GetBaseConfig().MetricConfig == nil { + configAccessMutex.Lock() + defer configAccessMutex.Unlock() + if GetBaseConfig().MetricConfig == nil { + GetBaseConfig().MetricConfig = &MetricConfig{} + } } - return metricConfig + return GetBaseConfig().MetricConfig } // GetApplicationConfig find the application config // if not, we will create one // Usually applicationConfig will be initialized when system start +// we use double-check to reduce race condition +// In general, it will be locked 0 or 1 time. +// So you don't need to worry about the race condition func GetApplicationConfig() *ApplicationConfig { - if applicationConfig == nil { - applicationConfig = &ApplicationConfig{} + if GetBaseConfig().ApplicationConfig == nil { + configAccessMutex.Lock() + defer configAccessMutex.Unlock() + if GetBaseConfig().ApplicationConfig == nil { + GetBaseConfig().ApplicationConfig = &ApplicationConfig{} + } } - return applicationConfig + return GetBaseConfig().ApplicationConfig } // GetProviderConfig find the provider config // if not found, create new one +// we use double-check to reduce race condition +// In general, it will be locked 0 or 1 time. +// So you don't need to worry about the race condition func GetProviderConfig() ProviderConfig { if providerConfig == nil { - logger.Warnf("providerConfig is nil!") - return ProviderConfig{} + logger.Warnf("providerConfig is nil! we will try to create one") + configAccessMutex.Lock() + defer configAccessMutex.Unlock() + if providerConfig == nil { + logger.Warnf("creating empty provider config. You should see this log only once.") + providerConfig = &ProviderConfig{} + } } return *providerConfig } // GetConsumerConfig find the consumer config // if not found, create new one +// we use double-check to reduce race condition +// In general, it will be locked 0 or 1 time. +// So you don't need to worry about the race condition func GetConsumerConfig() ConsumerConfig { if consumerConfig == nil { - logger.Warnf("consumerConfig is nil!") - return ConsumerConfig{} + logger.Warnf("consumerConfig is nil! we will try to create one") + configAccessMutex.Lock() + defer configAccessMutex.Unlock() + if consumerConfig == nil { + logger.Warnf("creating empty consumer config. You should see this log only once.") + consumerConfig = &ConsumerConfig{} + } } return *consumerConfig } + +func GetBaseConfig() *BaseConfig { + if baseConfig == nil { + baseConfigOnce.Do(func() { + baseConfig = &BaseConfig{ + MetricConfig: &MetricConfig{}, + ConfigCenterConfig: &ConfigCenterConfig{}, + Remotes: make(map[string]*RemoteConfig, 0), + ApplicationConfig: &ApplicationConfig{}, + ServiceDiscoveries: make(map[string]*ServiceDiscoveryConfig, 0), + } + }) + } + return baseConfig +} diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 0192b4c8a06263266cb80b344a0792ea2f6af8c8..f653f5e7598756514fd1e50dd2077329cbb3c9c0 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -235,16 +235,25 @@ func TestConfigLoaderWithConfigCenterSingleRegistry(t *testing.T) { } +func TestGetBaseConfig(t *testing.T) { + bc := GetBaseConfig() + assert.NotNil(t, bc) + _, found := bc.GetRemoteConfig("mock") + assert.False(t, found) +} + // mockInitProviderWithSingleRegistry will init a mocked providerConfig func mockInitProviderWithSingleRegistry() { providerConfig = &ProviderConfig{ - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "1.0.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "1.0.0", + Owner: "dubbo", + Environment: "test"}, + }, Registry: &RegistryConfig{ Address: "mock://127.0.0.1:2181", Username: "user1", diff --git a/config/consumer_config.go b/config/consumer_config.go index debcd79fa281c40e5526f60f5c5cdb66688688f4..76476511fbf74a5e973d59e560f65cab978d4527 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -42,9 +42,6 @@ import ( type ConsumerConfig struct { BaseConfig `yaml:",inline"` Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - // application - ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` - // client Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty" property:"connect_timeout"` ConnectTimeout time.Duration diff --git a/config/provider_config.go b/config/provider_config.go index 79569917455773653750d1d5921a722daf079b0a..81f20e864d6e10424c66ee60f501bd41ec6db12f 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -42,17 +42,15 @@ type ProviderConfig struct { Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"` // metadata-report - MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"` - - ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` - Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"` - Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"` - Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` - Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"` - ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` - FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` - ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` - ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` + MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"` + Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"` + Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"` + Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` + Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"` + ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` + FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` + ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` + ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` } // UnmarshalYAML ... diff --git a/config/reference_config.go b/config/reference_config.go index 3710cbc4bc62a01a014e91bcb978742c4a93c5cb..f343a9a2362928d4d87874d7d1ddc5d7ad40316c 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -144,6 +144,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) { regUrl = u } } + + // TODO(decouple from directory, config should not depend on directory module) if regUrl != nil { cluster := extension.GetCluster("registryAware") c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 7a65e55f09c997cb49b83f1f185faf9338cf0f5a..faaa461a75a5c887f6fa1cc568d7e809c42a6ac4 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -38,13 +38,15 @@ var regProtocol protocol.Protocol func doInitConsumer() { consumerConfig = &ConsumerConfig{ - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{ "shanghai_reg1": { Protocol: "mock", @@ -135,13 +137,15 @@ func doInitConsumerAsync() { func doInitConsumerWithSingleRegistry() { consumerConfig = &ConsumerConfig{ - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registry: &RegistryConfig{ Address: "mock://27.0.0.1:2181", Username: "user1", diff --git a/config/remote_config.go b/config/remote_config.go new file mode 100644 index 0000000000000000000000000000000000000000..ed9dab37a43954ad1fd5061748a0419d21d4304d --- /dev/null +++ b/config/remote_config.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "time" +) + +type RemoteConfig struct { + Address string `yaml:"address" json:"address,omitempty"` + Timeout time.Duration `default:"10s" yaml:"timeout" json:"timeout,omitempty"` + Params map[string]string `yaml:"params" json:"address,omitempty"` +} + +// GetParam will return the value of the key. If not found, def will be return; +// def => default value +func (rc *RemoteConfig) GetParam(key string, def string) string { + param, ok := rc.Params[key] + if !ok { + return def + } + return param +} diff --git a/config/remote_config_test.go b/config/remote_config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..99facb7dda98ba46ac7b5a6f86de070de8ca3d78 --- /dev/null +++ b/config/remote_config_test.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRemoteConfig_GetParam(t *testing.T) { + rc := &RemoteConfig{ + Params: make(map[string]string, 1), + } + + def := "default value" + key := "key" + value := rc.GetParam(key, def) + assert.Equal(t, def, value) + + actualVal := "actual value" + rc.Params[key] = actualVal + + value = rc.GetParam(key, def) + assert.Equal(t, actualVal, value) +} diff --git a/config/router_config.go b/config/router_config.go index 0670ee9c20f618021d1d574344a0df85d837bd66..16943d96be76f93c2d540e2ccf16670b7424298f 100644 --- a/config/router_config.go +++ b/config/router_config.go @@ -18,16 +18,20 @@ package config import ( + gxset "github.com/dubbogo/gost/container/set" perrors "github.com/pkg/errors" ) import ( - "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/yaml" ) +var ( + routerURLSet = gxset.NewSet() +) + // RouterInit Load config file to init router config func RouterInit(confRouterFile string) error { fileRouterFactories := extension.GetFileRouterFactories() @@ -40,10 +44,14 @@ func RouterInit(confRouterFile string) error { r, e := factory.NewFileRouter(bytes) if e == nil { url := r.URL() - directory.AddRouterURLSet(&url) + routerURLSet.Add(url) return nil } logger.Warnf("router config type %s create fail {%v}\n", k, e) } return perrors.Errorf("no file router exists for parse %s , implement router.FIleRouterFactory please.", confRouterFile) } + +func GetRouterURLSet() *gxset.HashSet { + return routerURLSet +} diff --git a/config/router_config_test.go b/config/router_config_test.go index 2f0a38b2fdf59578c77076680c05b3eca5c26a1c..bf189b600f1135e4059c8833a3de042bba5427ff 100644 --- a/config/router_config_test.go +++ b/config/router_config_test.go @@ -27,7 +27,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/cluster/directory" _ "github.com/apache/dubbo-go/cluster/router/condition" ) @@ -53,15 +52,3 @@ func TestString(t *testing.T) { assert.Equal(t, n2[0], "a1") assert.Equal(t, n2[1], "") } - -func TestRouterInit(t *testing.T) { - errPro := RouterInit(errorTestYML) - assert.Error(t, errPro) - - assert.Equal(t, 0, directory.GetRouterURLSet().Size()) - - errPro = RouterInit(testYML) - assert.NoError(t, errPro) - - assert.Equal(t, 1, directory.GetRouterURLSet().Size()) -} diff --git a/config/service_config.go b/config/service_config.go index 4597c4a0a52ff6cdb11944d92f118373f84fb118..d233e2b8a55facd2ca62f86a7af9f5d6e7e309d5 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -113,6 +113,16 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { } } +// InitExported will set exported as false atom bool +func (c *ServiceConfig) InitExported() { + c.exported = atomic.NewBool(false) +} + +// IsExport will return whether the service config is exported or not +func (c *ServiceConfig) IsExport() bool { + return c.exported.Load() +} + // Get Random Port func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { ports := list.New() @@ -131,16 +141,6 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { return ports } -// InitExported will set exported as false atom bool -func (c *ServiceConfig) InitExported() { - c.exported = atomic.NewBool(false) -} - -// IsExport will return whether the service config is exported or not -func (c *ServiceConfig) IsExport() bool { - return c.exported.Load() -} - // Export ... func (c *ServiceConfig) Export() error { // TODO: config center start here diff --git a/config/service_config_test.go b/config/service_config_test.go index efa2cafa3669a406157bd22af8d02930ec71ba48..949566f82a30e92c049dcf7b4063e26397df0a3c 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -24,9 +24,6 @@ import ( import ( gxnet "github.com/dubbogo/gost/net" "github.com/stretchr/testify/assert" -) - -import ( "go.uber.org/atomic" ) @@ -36,13 +33,15 @@ import ( func doInitProvider() { providerConfig = &ProviderConfig{ - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: BaseConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + }, Registries: map[string]*RegistryConfig{ "shanghai_reg1": { Protocol: "mock", @@ -146,6 +145,7 @@ func Test_Export(t *testing.T) { for i := range providerConfig.Services { service := providerConfig.Services[i] service.Implement(&MockService{}) + service.Protocols = providerConfig.Protocols service.Export() } providerConfig = nil diff --git a/config/service_discovery_config.go b/config/service_discovery_config.go new file mode 100644 index 0000000000000000000000000000000000000000..343c366ec5dd429f5cf2b4e862393536696bde82 --- /dev/null +++ b/config/service_discovery_config.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +// ServiceDiscoveryConfig will be used to create +type ServiceDiscoveryConfig struct { + // Protocol indicate which implementation will be used. + // for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery + Protocol string + // Group, usually you don't need to config this field. + // you can use this to do some isolation + Group string + // RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances. + RemoteRef string +} diff --git a/metadata/service/exporter/configurable/exporter_test.go b/metadata/service/exporter/configurable/exporter_test.go index 220ef71daca47f46bdcd4b88b215970399a5da31..364169b3174969267627a61476164dd736cf6f36 100644 --- a/metadata/service/exporter/configurable/exporter_test.go +++ b/metadata/service/exporter/configurable/exporter_test.go @@ -66,13 +66,15 @@ func TestConfigurableExporter(t *testing.T) { // mockInitProviderWithSingleRegistry will init a mocked providerConfig func mockInitProviderWithSingleRegistry() { providerConfig := &config.ProviderConfig{ - ApplicationConfig: &config.ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "1.0.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "1.0.0", + Owner: "dubbo", + Environment: "test"}, + }, Registry: &config.RegistryConfig{ Address: "mock://127.0.0.1:2181", Username: "user1", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 552aa57061c99bf92ff986b6e672743ebb375e76..d689bc6237c77fef9eb43412817bf30f6ea863c2 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -52,7 +52,7 @@ type RegistryDirectory struct { listenerLock sync.Mutex serviceType string registry registry.Registry - cacheInvokersMap *sync.Map //use sync.map + cacheInvokersMap *sync.Map // use sync.map cacheOriginUrl *common.URL configurators []config_center.Configurator consumerConfigurationListener *consumerConfigurationListener @@ -79,7 +79,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. return dir, nil } -//subscribe from registry +// subscribe from registry func (dir *RegistryDirectory) subscribe(url *common.URL) { dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) @@ -105,15 +105,15 @@ func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { url *common.URL oldInvoker protocol.Invoker = nil ) - //judge is override or others + // judge is override or others if res != nil { url = &res.Service - //1.for override url in 2.6.x + // 1.for override url in 2.6.x if url.Protocol == constant.OVERRIDE_PROTOCOL || url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY { dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url)) url = nil - } else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router + } else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { url = nil @@ -123,7 +123,7 @@ func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { logger.Infof("selector add service url{%s}", res.Service) var urls []*common.URL - for _, v := range directory.GetRouterURLSet().Values() { + for _, v := range config.GetRouterURLSet().Values() { urls = append(urls, v.(*common.URL)) } @@ -171,7 +171,7 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { } } if len(groupInvokersMap) == 1 { - //len is 1 it means no group setting ,so do not need cluster again + // len is 1 it means no group setting ,so do not need cluster again for _, invokers := range groupInvokersMap { groupInvokersList = invokers } @@ -211,7 +211,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { logger.Error("URL is nil ,pls check if service url is subscribe successfully!") return nil } - //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol + // check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { newUrl := common.MergeUrl(url, referenceUrl) dir.overrideUrl(newUrl) @@ -259,7 +259,7 @@ func (dir *RegistryDirectory) IsAvailable() bool { } func (dir *RegistryDirectory) Destroy() { - //TODO:unregister & unsubscribe + // TODO:unregister & unsubscribe dir.BaseDirectory.Destroy(func() { invokers := dir.cacheInvokers dir.cacheInvokers = []protocol.Invoker{} diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index f1d5ce434aa00185f784f208eefe603274f05ab0..1e3d611bf7c213894edbcbcedde79a93900572b4 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -43,7 +43,11 @@ import ( ) func init() { - config.SetConsumerConfig(config.ConsumerConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) + config.SetConsumerConfig(config.ConsumerConfig{ + BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}, + }, + }) } func TestSubscribe(t *testing.T) { diff --git a/registry/inmemory/service_discovery.go b/registry/inmemory/service_discovery.go index 3dac35cd381a7e0e1c0694d6376c85eb7762afc4..f7c3ef3bb566e81587d3845c33ce7fb799b2cd43 100644 --- a/registry/inmemory/service_discovery.go +++ b/registry/inmemory/service_discovery.go @@ -23,7 +23,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/registry" ) @@ -39,7 +38,7 @@ func init() { listeners: make([]*registry.ServiceInstancesChangedListener, 0, 2), } - extension.SetServiceDiscovery(name, func(url *common.URL) (discovery registry.ServiceDiscovery, err error) { + extension.SetServiceDiscovery(name, func(name string) (discovery registry.ServiceDiscovery, err error) { return instance, nil }) } diff --git a/registry/inmemory/service_discovery_test.go b/registry/inmemory/service_discovery_test.go index a934dbabfff7f4041df6dcca77ecc825b3ce391b..fac4699913000c44a566e6a84f850150046f8ce0 100644 --- a/registry/inmemory/service_discovery_test.go +++ b/registry/inmemory/service_discovery_test.go @@ -31,7 +31,7 @@ import ( ) func TestInMemoryServiceDiscovery(t *testing.T) { - discovery, _ := extension.GetServiceDiscovery(name, nil) + discovery, _ := extension.GetServiceDiscovery(name, "in") serviceName := "my-service" err := discovery.Register(®istry.DefaultServiceInstance{ ServiceName: serviceName, diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index a436b85064829b9f42c9dcc45545e5bf2fd2fefe..61a0fee8f660106379cc505f491ee432de416890 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -19,9 +19,14 @@ package nacos import ( "bytes" + "net" "strconv" "strings" "time" + + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" ) import ( @@ -43,7 +48,7 @@ var ( ) const ( - //RegistryConnDelay registry connection delay + // RegistryConnDelay registry connection delay RegistryConnDelay = 3 ) @@ -53,18 +58,8 @@ func init() { } type nacosRegistry struct { - nacosBaseRegistry -} - -// newNacosRegistry will create an instance -func newNacosRegistry(url *common.URL) (registry.Registry, error) { - base, err := newBaseRegistry(url) - if err != nil { - return nil, perrors.WithStack(err) - } - return &nacosRegistry{ - base, - }, nil + *common.URL + namingClient naming_client.INamingClient } func getCategory(url common.URL) string { @@ -140,7 +135,7 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) return NewNacosListener(*conf, nr.namingClient) } -//subscribe from registry +// subscribe from registry func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { for { if !nr.IsAvailable() { @@ -179,9 +174,69 @@ func (nr *nacosRegistry) GetUrl() common.URL { } func (nr *nacosRegistry) IsAvailable() bool { + // TODO return true } func (nr *nacosRegistry) Destroy() { return } + +// newNacosRegistry will create new instance +func newNacosRegistry(url *common.URL) (registry.Registry, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return &nacosRegistry{}, err + } + client, err := clients.CreateNamingClient(nacosConfig) + if err != nil { + return &nacosRegistry{}, err + } + registry := &nacosRegistry{ + URL: url, + namingClient: client, + } + return registry, nil +} + +// getNacosConfig will return the nacos config +// TODO support RemoteRef +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if len(url.Location) == 0 { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}, 2) + + addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index fbd84ac44d5c76fba4d7607c4c7e8d7103e2fc35..e9230195f6fe4191064c7aa308db1494d8635eec 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -18,15 +18,21 @@ package nacos import ( + "fmt" + "sync" + "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/model" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/remoting/nacos" ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -47,8 +53,12 @@ func init() { // There is a problem, the go client for nacos does not support the id field. // we will use the metadata to store the id of ServiceInstance type nacosServiceDiscovery struct { - nacosBaseRegistry group string + // descriptor is a short string about the basic information of this instance + descriptor string + + // namingClient is the Nacos' client + namingClient naming_client.INamingClient } // Destroy will close the service discovery. @@ -271,15 +281,58 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn } } -// toDeregisterInstance will create new service discovery instance -func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { +func (n *nacosServiceDiscovery) String() string { + return n.descriptor +} + +var ( + // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition + instanceMap = make(map[string]registry.ServiceDiscovery, 16) + initLock sync.Mutex +) + +// newNacosServiceDiscovery will create new service discovery instance +// use double-check pattern to reduce race condition +func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + + instance, ok := instanceMap[name] + if ok { + return instance, nil + } + + initLock.Lock() + defer initLock.Unlock() - base, err := newBaseRegistry(url) + // double check + instance, ok = instanceMap[name] + if ok { + return instance, nil + } + + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || len(sdc.RemoteRef) == 0 { + return nil, perrors.New("could not init the instance because the config is invalid") + } + + remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) + if !ok { + return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) + } + group := sdc.Group + if len(group) == 0 { + group = defaultGroup + } + + client, err := nacos.NewNacosClient(remoteConfig) if err != nil { - return nil, perrors.WithStack(err) + return nil, perrors.WithMessage(err, "create nacos client failed.") } + + descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) + return &nacosServiceDiscovery{ - nacosBaseRegistry: base, - group: url.GetParam(constant.NACOS_GROUP, defaultGroup), + group: group, + namingClient: client, + descriptor: descriptor, }, nil } diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index 0ac46cb9a214ba3317cb8ede80df5cf9e7a095f2..b2a4fa53cd96f7f736c0d4f2b00f44b6ba647a89 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -18,8 +18,10 @@ package nacos import ( - "strconv" "testing" + "time" + + "github.com/apache/dubbo-go/config" ) import ( @@ -27,7 +29,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" @@ -35,8 +36,13 @@ import ( "github.com/apache/dubbo-go/registry" ) +var ( + testName = "test" +) + func TestNacosServiceDiscovery_Destroy(t *testing.T) { - serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + prepareData() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) assert.Nil(t, err) assert.NotNil(t, serviceDiscovery) err = serviceDiscovery.Destroy() @@ -45,7 +51,7 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) { } func TestNacosServiceDiscovery_CRUD(t *testing.T) { - + prepareData() extension.SetEventDispatcher("mock", func() observer.EventDispatcher { return &dispatcher.MockEventDispatcher{} }) @@ -68,7 +74,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) { // clean data - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) // clean data for local test serviceDiscovry.Unregister(®istry.DefaultServiceInstance{ @@ -121,11 +127,19 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) { } func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) { - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + prepareData() + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) } -func mockUrl() *common.URL { - regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) - return ®url +func prepareData() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "nacos", + RemoteRef: testName, + } + + config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ + Address: "console.nacos.io:80", + Timeout: 10 * time.Second, + } } diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index cee2a6a625368f655d1b9bc5fe8cc37031e1aef7..6adc0fc7d7cfa3a9c966166e622550ad05d7da45 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -42,7 +42,9 @@ import ( ) func init() { - config.SetProviderConfig(config.ProviderConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) + config.SetProviderConfig(config.ProviderConfig{BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}, + }}) } func referNormal(t *testing.T, regProtocol *registryProtocol) { @@ -66,8 +68,9 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) { func TestRefer(t *testing.T) { config.SetConsumerConfig( - config.ConsumerConfig{ - ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) + config.ConsumerConfig{BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}, + }}) regProtocol := newRegistryProtocol() referNormal(t, regProtocol) } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 1cb36f6c733f0edb1f54be7e85f6109191823cd5..0adbee203bedb5d26b46fed0cfe0af6ccdb80903 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -96,7 +96,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { } func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - return extension.GetServiceDiscovery(url.Protocol, url) + return extension.GetServiceDiscovery(url.Protocol, "TODO") } func parseServices(literalServices string) *gxset.HashSet { diff --git a/registry/nacos/base_registry.go b/remoting/nacos/builder.go similarity index 55% rename from registry/nacos/base_registry.go rename to remoting/nacos/builder.go index 63f4999675470853d0f48d1a22b709efdc1c9d26..578fef49eaad0caae608dabcc69f1bd2d1e45209 100644 --- a/registry/nacos/base_registry.go +++ b/remoting/nacos/builder.go @@ -21,56 +21,23 @@ import ( "net" "strconv" "strings" - "time" -) -import ( "github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/clients/naming_client" nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" perrors "github.com/pkg/errors" -) -import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" ) -// baseRegistry is the parent of both interface-level registry -// and service discovery(related to application-level registry) -type nacosBaseRegistry struct { - *common.URL - namingClient naming_client.INamingClient -} - -// newBaseRegistry will create new instance -func newBaseRegistry(url *common.URL) (nacosBaseRegistry, error) { - nacosConfig, err := getNacosConfig(url) - if err != nil { - return nacosBaseRegistry{}, err - } - client, err := clients.CreateNamingClient(nacosConfig) - if err != nil { - return nacosBaseRegistry{}, err - } - registry := nacosBaseRegistry{ - URL: url, - namingClient: client, - } - return registry, nil -} - -// getNacosConfig will return the nacos config -func getNacosConfig(url *common.URL) (map[string]interface{}, error) { - if url == nil { - return nil, perrors.New("url is empty!") - } - if len(url.Location) == 0 { - return nil, perrors.New("url.location is empty!") +func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error) { + if len(rc.Address) == 0 { + return nil, perrors.New("nacos address is empty!") } configMap := make(map[string]interface{}, 2) - addresses := strings.Split(url.Location, ",") + addresses := strings.Split(rc.Address, ",") serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) for _, addr := range addresses { ip, portStr, err := net.SplitHostPort(addr) @@ -86,17 +53,14 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) { configMap["serverConfigs"] = serverConfigs var clientConfig nacosConstant.ClientConfig - timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - if err != nil { - return nil, err - } - clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + timeout := rc.Timeout + clientConfig.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate) clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs - clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") - clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") - clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = rc.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = rc.GetParam(constant.NACOS_ENDPOINT, "") clientConfig.NotLoadCacheAtStart = true configMap["clientConfig"] = clientConfig - return configMap, nil + return clients.CreateNamingClient(configMap) } diff --git a/remoting/nacos/builder_test.go b/remoting/nacos/builder_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bbfadef71bf1fe98446dbe0eaa34540b3504ba52 --- /dev/null +++ b/remoting/nacos/builder_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/config" +) + +func TestNewNacosClient(t *testing.T) { + rc := &config.RemoteConfig{} + client, err := NewNacosClient(rc) + + // address is nil + assert.NotNil(t, err) + + rc.Address = "console.nacos.io:80:123" + client, err = NewNacosClient(rc) + // invalid address + assert.NotNil(t, err) + + rc.Address = "console.nacos.io:80" + rc.Timeout = 10 * time.Second + client, err = NewNacosClient(rc) + assert.NotNil(t, client) + assert.Nil(t, err) +}