diff --git a/common/constant/key.go b/common/constant/key.go index a2f893875719171ace6cdd40f2ce0d5e1c609656..6e91750ddd0309545f72ce71d4317af17fa7993e 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -254,9 +254,7 @@ const ( ) // service discovery - const ( - NACOS_GROUP = "nacos.group" SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services" PROVIDER_BY = "provided-by" EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision" @@ -267,5 +265,10 @@ const ( METADATA_SERVICE_PREFIX = "dubbo.metadata-service." METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" - SERVICE_NAME_MAPPING_KEY = "service-name-mapping" + + // used by URL + // SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used + SERVICE_NAME_MAPPING_KEY = "name_mapping" + // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used + SERVICE_DISCOVERY_KEY = "service_discovery" ) diff --git a/config/base_config.go b/config/base_config.go index d95ea27ee6f20d51144ded8904a0051c448cce9e..2f51690d30d847871fe547055b76309a93f88066 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -43,7 +43,7 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` - Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"` + Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"` ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"` // application config ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` diff --git a/config/service_discovery_config.go b/config/service_discovery_config.go index 343c366ec5dd429f5cf2b4e862393536696bde82..d3dc697a9b31e6949375d74aa92234093da22378 100644 --- a/config/service_discovery_config.go +++ b/config/service_discovery_config.go @@ -21,10 +21,10 @@ package config type ServiceDiscoveryConfig struct { // Protocol indicate which implementation will be used. // for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery - Protocol string + Protocol string `yaml:"protocol" json:"protocol,omitempty"` // Group, usually you don't need to config this field. // you can use this to do some isolation - Group string + Group string `yaml:"group" json:"group,omitempty"` // RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances. - RemoteRef string + RemoteRef string `yaml:"remote_ref" json:"remote_ref,omitempty"` } diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index e9230195f6fe4191064c7aa308db1494d8635eec..be558a815b5d258b0c13bf64f15dc37040f68c95 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -281,6 +281,7 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn } } +// String will return the description of the instance func (n *nacosServiceDiscovery) String() string { return n.descriptor } @@ -291,7 +292,7 @@ var ( initLock sync.Mutex ) -// newNacosServiceDiscovery will create new service discovery instance +// newNacosServiceDiscovery will try to find the instance withe name, if not found, a new one will be created. // use double-check pattern to reduce race condition func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { @@ -330,9 +331,12 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) - return &nacosServiceDiscovery{ + instance = &nacosServiceDiscovery{ group: group, namingClient: client, descriptor: descriptor, - }, nil + } + + instanceMap[name] = instance + return instance, nil } diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index b2a4fa53cd96f7f736c0d4f2b00f44b6ba647a89..ca0f24976e988af63297c524101c24d40e55b1f8 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -40,6 +40,35 @@ var ( testName = "test" ) +func Test_newNacosServiceDiscovery(t *testing.T) { + name := "nacos1" + _, err := newNacosServiceDiscovery(name) + + // the ServiceDiscoveryConfig not found + assert.NotNil(t, err) + + sdc := &config.ServiceDiscoveryConfig{ + Protocol: "nacos", + RemoteRef: "mock", + } + config.GetBaseConfig().ServiceDiscoveries[name] = sdc + + _, err = newNacosServiceDiscovery(name) + + // RemoteConfig not found + assert.NotNil(t, err) + + config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ + Address: "console.nacos.io:80", + Timeout: 10 * time.Second, + } + + res, err := newNacosServiceDiscovery(name) + assert.Nil(t, err) + assert.NotNil(t, res) + +} + func TestNacosServiceDiscovery_Destroy(t *testing.T) { prepareData() serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 0adbee203bedb5d26b46fed0cfe0af6ccdb80903..d32227b4f96c75d5ce526745f42317c9decb399d 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -23,6 +23,10 @@ import ( "strconv" "strings" "sync" + + perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/config" ) import ( @@ -48,16 +52,22 @@ const ( protocolName = "service-discovery" ) +var ( + registryInstance *serviceDiscoveryRegistry + registryInitOnce sync.Once +) + func init() { extension.SetRegistry(protocolName, newServiceDiscoveryRegistry) } // serviceDiscoveryRegistry is the implementation of application-level registry. // It's completely different from other registry implementations +// The serviceDiscoveryRegistry should be singleton // This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping // In order to keep compatible with interface-level registry, // 1. when we registry the service, we should create the mapping from service name to application name -// 2. when we sub +// 2. when we subscribe the service, we should find out related application and then find application's information type serviceDiscoveryRegistry struct { lock sync.RWMutex url *common.URL @@ -65,38 +75,51 @@ type serviceDiscoveryRegistry struct { subscribedServices *gxset.HashSet serviceNameMapping mapping.ServiceNameMapping metaDataService service.MetadataService - //cache the registered listen + // cache the registered listen registeredListeners *gxset.HashSet - //all synthesize + // all synthesize subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer - //cache exported urls, serviceName->revision->[]URL + // cache exported urls, serviceName->revision->[]URL serviceRevisionExportedURLsCache map[string]map[string][]common.URL } -func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { - serviceDiscovery, err := creatServiceDiscovery(url) - if err != nil { - return nil, err - } - subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) - subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() - serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) - //TODO it's need to get implement by factory - metaDataService := inmemory.NewMetadataService() - return &serviceDiscoveryRegistry{ - url: url, - serviceDiscovery: serviceDiscovery, - subscribedServices: subscribedServices, - subscribedURLsSynthesizers: subscribedURLsSynthesizers, - registeredListeners: gxset.NewSet(), - serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), - serviceNameMapping: serviceNameMapping, - metaDataService: metaDataService, - }, nil +// newServiceDiscoveryRegistry will return the instance +// if not found, it will create one +func newServiceDiscoveryRegistry(url *common.URL) (res registry.Registry, err error) { + + registryInitOnce.Do(func() { + serviceDiscovery, err := creatServiceDiscovery(url) + if err != nil { + return + } + subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) + subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() + serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) + // TODO it's need to get implement by factory + metaDataService := inmemory.NewMetadataService() + registryInstance = &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: serviceDiscovery, + subscribedServices: subscribedServices, + subscribedURLsSynthesizers: subscribedURLsSynthesizers, + registeredListeners: gxset.NewSet(), + serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), + serviceNameMapping: serviceNameMapping, + metaDataService: metaDataService, + } + }) + res = registryInstance + return } func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - return extension.GetServiceDiscovery(url.Protocol, "TODO") + discovery := url.GetParam(constant.SERVICE_DISCOVERY_KEY, constant.DEFAULT_KEY) + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(discovery) + + if !ok { + return nil, perrors.New("could not find the ServiceDiscoverConfig with name: " + discovery) + } + return extension.GetServiceDiscovery(sdc.Protocol, discovery) } func parseServices(literalServices string) *gxset.HashSet { @@ -113,22 +136,22 @@ func parseServices(literalServices string) *gxset.HashSet { return set } -//GetServiceDiscovery for get serviceDiscovery of the registry +// GetServiceDiscovery for get serviceDiscovery of the registry func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { return s.serviceDiscovery } -//GetUrl for get url of the registry +// GetUrl for get url of the registry func (s *serviceDiscoveryRegistry) GetUrl() common.URL { return *s.url } -//IsAvailable for make sure is't available +// IsAvailable for make sure is't available func (s *serviceDiscoveryRegistry) IsAvailable() bool { return true } -//Destroy for destroy graceful down +// Destroy for destroy graceful down func (s *serviceDiscoveryRegistry) Destroy() { err := s.serviceDiscovery.Destroy() if err != nil { @@ -162,7 +185,7 @@ func shouldRegister(url common.URL) bool { return false } -//Subscribe for listen the change of services that from the exported url +// Subscribe for listen the change of services that from the exported url func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) { if !shouldSubscribe(*url) { return @@ -243,7 +266,7 @@ func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.No if len(subscribedURLs) == 0 { subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances) } - //TODO make sure it's workable + // TODO make sure it's workable for _, url := range subscribedURLs { notify.Notify(®istry.ServiceEvent{ Action: remoting.EventTypeAdd, @@ -461,7 +484,7 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc serviceInstance.GetHost(), serviceInstance.GetPort(), revision) } } else { - //Else, The cache is hit + // Else, The cache is hit logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+ "[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(), @@ -499,8 +522,8 @@ func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsa u.RemoveParams(removeParamSet) port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol)) if u.Location != host || u.Port != port { - u.Port = port //reset port - u.Location = host //reset host + u.Port = port // reset port + u.Location = host // reset host } clonedExportedURLs = append(clonedExportedURLs, u) }