diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 75d9ef26567df0fbd83f5d9f94c8548d1e8e633d..a3e207f3022a4d882d1b90f2756b51ab4e0b2775 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -22,7 +22,6 @@ import ( ) import ( - "github.com/dubbogo/gost/container/set" "go.uber.org/atomic" ) @@ -35,8 +34,6 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -var routerURLSet = gxset.NewSet() - // BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers type BaseDirectory struct { url *common.URL @@ -120,14 +117,3 @@ func (dir *BaseDirectory) Destroy(doDestroy func()) { func (dir *BaseDirectory) IsAvailable() bool { return !dir.destroyed.Load() } - -// GetRouterURLSet Return router URL -func GetRouterURLSet() *gxset.HashSet { - return routerURLSet -} - -// AddRouterURLSet Add router URL -// Router URL will init in config/config_loader.go -func AddRouterURLSet(url *common.URL) { - routerURLSet.Add(url) -} diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index 488d94ebf13162a135a1e38b3375cb69532a5f25..f820721515d9091c454067cf059bc29664e04808 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -18,28 +18,31 @@ package extension import ( - "github.com/apache/dubbo-go/registry" perrors "github.com/pkg/errors" ) import ( - "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" ) var ( - discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4) + discoveryCreatorMap = make(map[string]func(name string) (registry.ServiceDiscovery, error), 4) ) // SetServiceDiscovery will store the creator and name -func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) { - discoveryCreatorMap[name] = creator +// protocol indicate the implementation, like nacos +// the name like nacos-1... +func SetServiceDiscovery(protocol string, creator func(name string) (registry.ServiceDiscovery, error)) { + discoveryCreatorMap[protocol] = creator } // GetServiceDiscovery will return the registry.ServiceDiscovery +// protocol indicate the implementation, like nacos +// the name like nacos-1... // if not found, or initialize instance failed, it will return error. -func GetServiceDiscovery(name string, url *common.URL) (registry.ServiceDiscovery, error) { - creator, ok := discoveryCreatorMap[name] +func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) { + creator, ok := discoveryCreatorMap[protocol] if !ok { return nil, perrors.New("Could not find the service discovery with name: " + name) } - return creator(url) + return creator(name) } diff --git a/config/base_config.go b/config/base_config.go index de0f9e7f8e9b7e8e01cab00a86d80ee3f6840cea..d95ea27ee6f20d51144ded8904a0051c448cce9e 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -42,9 +42,10 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { - ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` - Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"` - // application + ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"` + ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"` + // application config ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` configCenterUrl *common.URL @@ -55,6 +56,11 @@ type BaseConfig struct { fileStream *bytes.Buffer } +func (c *BaseConfig) GetServiceDiscoveries(name string) (config *ServiceDiscoveryConfig, ok bool) { + config, ok = c.ServiceDiscoveries[name] + return +} + // GetRemoteConfig will return the remote's config with the name if found func (c *BaseConfig) GetRemoteConfig(name string) (config *RemoteConfig, ok bool) { config, ok = c.Remotes[name] diff --git a/config/base_config_test.go b/config/base_config_test.go index 9c4b4f903dc7dac6aca60fd6ceb1eb1af0fd6c69..4a4b2b06b985971b7a397daa52107fa812fc88e9 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -482,7 +482,7 @@ func Test_initializeStruct(t *testing.T) { reflect.ValueOf(consumerConfig).Elem().Set(v.Elem()) assert.Condition(t, func() (success bool) { - return consumerConfig.ApplicationConfig != nil + return consumerConfig.Registry != nil }) assert.Condition(t, func() (success bool) { return consumerConfig.Registries != nil diff --git a/config/config_loader.go b/config/config_loader.go index 8552caef89bc33210d29e2cfea0cc3715883d83d..0922a7a28569bdbc97a39d2f7b3f9d1e2673f004 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -314,6 +314,7 @@ func GetBaseConfig() *BaseConfig { ConfigCenterConfig: &ConfigCenterConfig{}, Remotes: make(map[string]*RemoteConfig, 0), ApplicationConfig: &ApplicationConfig{}, + ServiceDiscoveries: make(map[string]*ServiceDiscoveryConfig, 0), } }) } diff --git a/config/provider_config.go b/config/provider_config.go index bdb9bd2f96eecfc0e9d3fc0e1f72ad95879e264a..81f20e864d6e10424c66ee60f501bd41ec6db12f 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -42,15 +42,15 @@ type ProviderConfig struct { Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"` // metadata-report - MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"` - Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"` - Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"` - Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` - Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"` - ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` - FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` - ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` - ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` + MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"` + Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"` + Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"` + Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` + Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"` + ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` + FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` + ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` + ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` } // UnmarshalYAML ... diff --git a/config/reference_config.go b/config/reference_config.go index 3710cbc4bc62a01a014e91bcb978742c4a93c5cb..f343a9a2362928d4d87874d7d1ddc5d7ad40316c 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -144,6 +144,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) { regUrl = u } } + + // TODO(decouple from directory, config should not depend on directory module) if regUrl != nil { cluster := extension.GetCluster("registryAware") c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) diff --git a/config/remote_config.go b/config/remote_config.go index f126afc90f1147ab323460104b53308e30a0d1df..ed9dab37a43954ad1fd5061748a0419d21d4304d 100644 --- a/config/remote_config.go +++ b/config/remote_config.go @@ -17,9 +17,22 @@ package config +import ( + "time" +) + type RemoteConfig struct { - Address string `yaml:"address" json:"address,omitempty"` - Params map[string]string `yaml:"params" json:"address,omitempty"` + Address string `yaml:"address" json:"address,omitempty"` + Timeout time.Duration `default:"10s" yaml:"timeout" json:"timeout,omitempty"` + Params map[string]string `yaml:"params" json:"address,omitempty"` } - +// GetParam will return the value of the key. If not found, def will be return; +// def => default value +func (rc *RemoteConfig) GetParam(key string, def string) string { + param, ok := rc.Params[key] + if !ok { + return def + } + return param +} diff --git a/config/remote_config_test.go b/config/remote_config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..99facb7dda98ba46ac7b5a6f86de070de8ca3d78 --- /dev/null +++ b/config/remote_config_test.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRemoteConfig_GetParam(t *testing.T) { + rc := &RemoteConfig{ + Params: make(map[string]string, 1), + } + + def := "default value" + key := "key" + value := rc.GetParam(key, def) + assert.Equal(t, def, value) + + actualVal := "actual value" + rc.Params[key] = actualVal + + value = rc.GetParam(key, def) + assert.Equal(t, actualVal, value) +} diff --git a/config/router_config.go b/config/router_config.go index 0670ee9c20f618021d1d574344a0df85d837bd66..16943d96be76f93c2d540e2ccf16670b7424298f 100644 --- a/config/router_config.go +++ b/config/router_config.go @@ -18,16 +18,20 @@ package config import ( + gxset "github.com/dubbogo/gost/container/set" perrors "github.com/pkg/errors" ) import ( - "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/yaml" ) +var ( + routerURLSet = gxset.NewSet() +) + // RouterInit Load config file to init router config func RouterInit(confRouterFile string) error { fileRouterFactories := extension.GetFileRouterFactories() @@ -40,10 +44,14 @@ func RouterInit(confRouterFile string) error { r, e := factory.NewFileRouter(bytes) if e == nil { url := r.URL() - directory.AddRouterURLSet(&url) + routerURLSet.Add(url) return nil } logger.Warnf("router config type %s create fail {%v}\n", k, e) } return perrors.Errorf("no file router exists for parse %s , implement router.FIleRouterFactory please.", confRouterFile) } + +func GetRouterURLSet() *gxset.HashSet { + return routerURLSet +} diff --git a/config/router_config_test.go b/config/router_config_test.go index 2f0a38b2fdf59578c77076680c05b3eca5c26a1c..bf189b600f1135e4059c8833a3de042bba5427ff 100644 --- a/config/router_config_test.go +++ b/config/router_config_test.go @@ -27,7 +27,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/cluster/directory" _ "github.com/apache/dubbo-go/cluster/router/condition" ) @@ -53,15 +52,3 @@ func TestString(t *testing.T) { assert.Equal(t, n2[0], "a1") assert.Equal(t, n2[1], "") } - -func TestRouterInit(t *testing.T) { - errPro := RouterInit(errorTestYML) - assert.Error(t, errPro) - - assert.Equal(t, 0, directory.GetRouterURLSet().Size()) - - errPro = RouterInit(testYML) - assert.NoError(t, errPro) - - assert.Equal(t, 1, directory.GetRouterURLSet().Size()) -} diff --git a/config/service_config_test.go b/config/service_config_test.go index eff9735b3de2c4bae340cecf8ec601b57bf3b303..949566f82a30e92c049dcf7b4063e26397df0a3c 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -145,6 +145,7 @@ func Test_Export(t *testing.T) { for i := range providerConfig.Services { service := providerConfig.Services[i] service.Implement(&MockService{}) + service.Protocols = providerConfig.Protocols service.Export() } providerConfig = nil diff --git a/config/service_discovery_config.go b/config/service_discovery_config.go new file mode 100644 index 0000000000000000000000000000000000000000..343c366ec5dd429f5cf2b4e862393536696bde82 --- /dev/null +++ b/config/service_discovery_config.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +// ServiceDiscoveryConfig will be used to create +type ServiceDiscoveryConfig struct { + // Protocol indicate which implementation will be used. + // for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery + Protocol string + // Group, usually you don't need to config this field. + // you can use this to do some isolation + Group string + // RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances. + RemoteRef string +} diff --git a/metadata/service/exporter/configurable/exporter_test.go b/metadata/service/exporter/configurable/exporter_test.go index 220ef71daca47f46bdcd4b88b215970399a5da31..364169b3174969267627a61476164dd736cf6f36 100644 --- a/metadata/service/exporter/configurable/exporter_test.go +++ b/metadata/service/exporter/configurable/exporter_test.go @@ -66,13 +66,15 @@ func TestConfigurableExporter(t *testing.T) { // mockInitProviderWithSingleRegistry will init a mocked providerConfig func mockInitProviderWithSingleRegistry() { providerConfig := &config.ProviderConfig{ - ApplicationConfig: &config.ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "1.0.0", - Owner: "dubbo", - Environment: "test"}, + BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "1.0.0", + Owner: "dubbo", + Environment: "test"}, + }, Registry: &config.RegistryConfig{ Address: "mock://127.0.0.1:2181", Username: "user1", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 552aa57061c99bf92ff986b6e672743ebb375e76..d689bc6237c77fef9eb43412817bf30f6ea863c2 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -52,7 +52,7 @@ type RegistryDirectory struct { listenerLock sync.Mutex serviceType string registry registry.Registry - cacheInvokersMap *sync.Map //use sync.map + cacheInvokersMap *sync.Map // use sync.map cacheOriginUrl *common.URL configurators []config_center.Configurator consumerConfigurationListener *consumerConfigurationListener @@ -79,7 +79,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. return dir, nil } -//subscribe from registry +// subscribe from registry func (dir *RegistryDirectory) subscribe(url *common.URL) { dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) @@ -105,15 +105,15 @@ func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { url *common.URL oldInvoker protocol.Invoker = nil ) - //judge is override or others + // judge is override or others if res != nil { url = &res.Service - //1.for override url in 2.6.x + // 1.for override url in 2.6.x if url.Protocol == constant.OVERRIDE_PROTOCOL || url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY { dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url)) url = nil - } else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router + } else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { url = nil @@ -123,7 +123,7 @@ func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { logger.Infof("selector add service url{%s}", res.Service) var urls []*common.URL - for _, v := range directory.GetRouterURLSet().Values() { + for _, v := range config.GetRouterURLSet().Values() { urls = append(urls, v.(*common.URL)) } @@ -171,7 +171,7 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { } } if len(groupInvokersMap) == 1 { - //len is 1 it means no group setting ,so do not need cluster again + // len is 1 it means no group setting ,so do not need cluster again for _, invokers := range groupInvokersMap { groupInvokersList = invokers } @@ -211,7 +211,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { logger.Error("URL is nil ,pls check if service url is subscribe successfully!") return nil } - //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol + // check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { newUrl := common.MergeUrl(url, referenceUrl) dir.overrideUrl(newUrl) @@ -259,7 +259,7 @@ func (dir *RegistryDirectory) IsAvailable() bool { } func (dir *RegistryDirectory) Destroy() { - //TODO:unregister & unsubscribe + // TODO:unregister & unsubscribe dir.BaseDirectory.Destroy(func() { invokers := dir.cacheInvokers dir.cacheInvokers = []protocol.Invoker{} diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index f1d5ce434aa00185f784f208eefe603274f05ab0..1e3d611bf7c213894edbcbcedde79a93900572b4 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -43,7 +43,11 @@ import ( ) func init() { - config.SetConsumerConfig(config.ConsumerConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) + config.SetConsumerConfig(config.ConsumerConfig{ + BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}, + }, + }) } func TestSubscribe(t *testing.T) { diff --git a/registry/inmemory/service_discovery.go b/registry/inmemory/service_discovery.go index 3dac35cd381a7e0e1c0694d6376c85eb7762afc4..f7c3ef3bb566e81587d3845c33ce7fb799b2cd43 100644 --- a/registry/inmemory/service_discovery.go +++ b/registry/inmemory/service_discovery.go @@ -23,7 +23,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/registry" ) @@ -39,7 +38,7 @@ func init() { listeners: make([]*registry.ServiceInstancesChangedListener, 0, 2), } - extension.SetServiceDiscovery(name, func(url *common.URL) (discovery registry.ServiceDiscovery, err error) { + extension.SetServiceDiscovery(name, func(name string) (discovery registry.ServiceDiscovery, err error) { return instance, nil }) } diff --git a/registry/inmemory/service_discovery_test.go b/registry/inmemory/service_discovery_test.go index a934dbabfff7f4041df6dcca77ecc825b3ce391b..fac4699913000c44a566e6a84f850150046f8ce0 100644 --- a/registry/inmemory/service_discovery_test.go +++ b/registry/inmemory/service_discovery_test.go @@ -31,7 +31,7 @@ import ( ) func TestInMemoryServiceDiscovery(t *testing.T) { - discovery, _ := extension.GetServiceDiscovery(name, nil) + discovery, _ := extension.GetServiceDiscovery(name, "in") serviceName := "my-service" err := discovery.Register(®istry.DefaultServiceInstance{ ServiceName: serviceName, diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index a8d4916c6ebf990dc951963f05fe80ee2320add2..61a0fee8f660106379cc505f491ee432de416890 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -19,9 +19,14 @@ package nacos import ( "bytes" + "net" "strconv" "strings" "time" + + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" ) import ( @@ -43,7 +48,7 @@ var ( ) const ( - //RegistryConnDelay registry connection delay + // RegistryConnDelay registry connection delay RegistryConnDelay = 3 ) @@ -53,18 +58,8 @@ func init() { } type nacosRegistry struct { - nacosBaseRegistry -} - -// newNacosRegistry will create an instance -func newNacosRegistry(url *common.URL) (registry.Registry, error) { - base, err := newBaseRegistry(url) - if err != nil { - return nil, perrors.WithStack(err) - } - return &nacosRegistry{ - base, - }, nil + *common.URL + namingClient naming_client.INamingClient } func getCategory(url common.URL) string { @@ -186,3 +181,62 @@ func (nr *nacosRegistry) IsAvailable() bool { func (nr *nacosRegistry) Destroy() { return } + +// newNacosRegistry will create new instance +func newNacosRegistry(url *common.URL) (registry.Registry, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return &nacosRegistry{}, err + } + client, err := clients.CreateNamingClient(nacosConfig) + if err != nil { + return &nacosRegistry{}, err + } + registry := &nacosRegistry{ + URL: url, + namingClient: client, + } + return registry, nil +} + +// getNacosConfig will return the nacos config +// TODO support RemoteRef +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if len(url.Location) == 0 { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}, 2) + + addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index fbd84ac44d5c76fba4d7607c4c7e8d7103e2fc35..e9230195f6fe4191064c7aa308db1494d8635eec 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -18,15 +18,21 @@ package nacos import ( + "fmt" + "sync" + "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/model" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/remoting/nacos" ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -47,8 +53,12 @@ func init() { // There is a problem, the go client for nacos does not support the id field. // we will use the metadata to store the id of ServiceInstance type nacosServiceDiscovery struct { - nacosBaseRegistry group string + // descriptor is a short string about the basic information of this instance + descriptor string + + // namingClient is the Nacos' client + namingClient naming_client.INamingClient } // Destroy will close the service discovery. @@ -271,15 +281,58 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn } } -// toDeregisterInstance will create new service discovery instance -func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { +func (n *nacosServiceDiscovery) String() string { + return n.descriptor +} + +var ( + // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition + instanceMap = make(map[string]registry.ServiceDiscovery, 16) + initLock sync.Mutex +) + +// newNacosServiceDiscovery will create new service discovery instance +// use double-check pattern to reduce race condition +func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + + instance, ok := instanceMap[name] + if ok { + return instance, nil + } + + initLock.Lock() + defer initLock.Unlock() - base, err := newBaseRegistry(url) + // double check + instance, ok = instanceMap[name] + if ok { + return instance, nil + } + + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || len(sdc.RemoteRef) == 0 { + return nil, perrors.New("could not init the instance because the config is invalid") + } + + remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) + if !ok { + return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) + } + group := sdc.Group + if len(group) == 0 { + group = defaultGroup + } + + client, err := nacos.NewNacosClient(remoteConfig) if err != nil { - return nil, perrors.WithStack(err) + return nil, perrors.WithMessage(err, "create nacos client failed.") } + + descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) + return &nacosServiceDiscovery{ - nacosBaseRegistry: base, - group: url.GetParam(constant.NACOS_GROUP, defaultGroup), + group: group, + namingClient: client, + descriptor: descriptor, }, nil } diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index 0ac46cb9a214ba3317cb8ede80df5cf9e7a095f2..b2a4fa53cd96f7f736c0d4f2b00f44b6ba647a89 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -18,8 +18,10 @@ package nacos import ( - "strconv" "testing" + "time" + + "github.com/apache/dubbo-go/config" ) import ( @@ -27,7 +29,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" @@ -35,8 +36,13 @@ import ( "github.com/apache/dubbo-go/registry" ) +var ( + testName = "test" +) + func TestNacosServiceDiscovery_Destroy(t *testing.T) { - serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + prepareData() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) assert.Nil(t, err) assert.NotNil(t, serviceDiscovery) err = serviceDiscovery.Destroy() @@ -45,7 +51,7 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) { } func TestNacosServiceDiscovery_CRUD(t *testing.T) { - + prepareData() extension.SetEventDispatcher("mock", func() observer.EventDispatcher { return &dispatcher.MockEventDispatcher{} }) @@ -68,7 +74,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) { // clean data - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) // clean data for local test serviceDiscovry.Unregister(®istry.DefaultServiceInstance{ @@ -121,11 +127,19 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) { } func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) { - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + prepareData() + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) } -func mockUrl() *common.URL { - regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) - return ®url +func prepareData() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "nacos", + RemoteRef: testName, + } + + config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ + Address: "console.nacos.io:80", + Timeout: 10 * time.Second, + } } diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index cee2a6a625368f655d1b9bc5fe8cc37031e1aef7..6adc0fc7d7cfa3a9c966166e622550ad05d7da45 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -42,7 +42,9 @@ import ( ) func init() { - config.SetProviderConfig(config.ProviderConfig{ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) + config.SetProviderConfig(config.ProviderConfig{BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}, + }}) } func referNormal(t *testing.T, regProtocol *registryProtocol) { @@ -66,8 +68,9 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) { func TestRefer(t *testing.T) { config.SetConsumerConfig( - config.ConsumerConfig{ - ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}}) + config.ConsumerConfig{BaseConfig: config.BaseConfig{ + ApplicationConfig: &config.ApplicationConfig{Name: "test-application"}, + }}) regProtocol := newRegistryProtocol() referNormal(t, regProtocol) } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 1734899f8bb025538e732c4b05e9f445298eefe9..c110eb22828330adc42cfa2f027dc7b55e6071fe 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -87,7 +87,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { } func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - return extension.GetServiceDiscovery(url.Protocol, url) + return extension.GetServiceDiscovery(url.Protocol, "TODO") } func parseServices(literalServices string) *gxset.HashSet { diff --git a/registry/nacos/base_registry.go b/remoting/nacos/builder.go similarity index 55% rename from registry/nacos/base_registry.go rename to remoting/nacos/builder.go index 63f4999675470853d0f48d1a22b709efdc1c9d26..578fef49eaad0caae608dabcc69f1bd2d1e45209 100644 --- a/registry/nacos/base_registry.go +++ b/remoting/nacos/builder.go @@ -21,56 +21,23 @@ import ( "net" "strconv" "strings" - "time" -) -import ( "github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/clients/naming_client" nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" perrors "github.com/pkg/errors" -) -import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" ) -// baseRegistry is the parent of both interface-level registry -// and service discovery(related to application-level registry) -type nacosBaseRegistry struct { - *common.URL - namingClient naming_client.INamingClient -} - -// newBaseRegistry will create new instance -func newBaseRegistry(url *common.URL) (nacosBaseRegistry, error) { - nacosConfig, err := getNacosConfig(url) - if err != nil { - return nacosBaseRegistry{}, err - } - client, err := clients.CreateNamingClient(nacosConfig) - if err != nil { - return nacosBaseRegistry{}, err - } - registry := nacosBaseRegistry{ - URL: url, - namingClient: client, - } - return registry, nil -} - -// getNacosConfig will return the nacos config -func getNacosConfig(url *common.URL) (map[string]interface{}, error) { - if url == nil { - return nil, perrors.New("url is empty!") - } - if len(url.Location) == 0 { - return nil, perrors.New("url.location is empty!") +func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error) { + if len(rc.Address) == 0 { + return nil, perrors.New("nacos address is empty!") } configMap := make(map[string]interface{}, 2) - addresses := strings.Split(url.Location, ",") + addresses := strings.Split(rc.Address, ",") serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) for _, addr := range addresses { ip, portStr, err := net.SplitHostPort(addr) @@ -86,17 +53,14 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) { configMap["serverConfigs"] = serverConfigs var clientConfig nacosConstant.ClientConfig - timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - if err != nil { - return nil, err - } - clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + timeout := rc.Timeout + clientConfig.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate) clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs - clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") - clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") - clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = rc.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = rc.GetParam(constant.NACOS_ENDPOINT, "") clientConfig.NotLoadCacheAtStart = true configMap["clientConfig"] = clientConfig - return configMap, nil + return clients.CreateNamingClient(configMap) } diff --git a/remoting/nacos/builder_test.go b/remoting/nacos/builder_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bbfadef71bf1fe98446dbe0eaa34540b3504ba52 --- /dev/null +++ b/remoting/nacos/builder_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/config" +) + +func TestNewNacosClient(t *testing.T) { + rc := &config.RemoteConfig{} + client, err := NewNacosClient(rc) + + // address is nil + assert.NotNil(t, err) + + rc.Address = "console.nacos.io:80:123" + client, err = NewNacosClient(rc) + // invalid address + assert.NotNil(t, err) + + rc.Address = "console.nacos.io:80" + rc.Timeout = 10 * time.Second + client, err = NewNacosClient(rc) + assert.NotNil(t, client) + assert.Nil(t, err) +}