diff --git a/common/constant/key.go b/common/constant/key.go index a2f893875719171ace6cdd40f2ce0d5e1c609656..6e91750ddd0309545f72ce71d4317af17fa7993e 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -254,9 +254,7 @@ const ( ) // service discovery - const ( - NACOS_GROUP = "nacos.group" SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services" PROVIDER_BY = "provided-by" EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision" @@ -267,5 +265,10 @@ const ( METADATA_SERVICE_PREFIX = "dubbo.metadata-service." METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" - SERVICE_NAME_MAPPING_KEY = "service-name-mapping" + + // used by URL + // SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used + SERVICE_NAME_MAPPING_KEY = "name_mapping" + // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used + SERVICE_DISCOVERY_KEY = "service_discovery" ) diff --git a/common/extension/service_name_mapping.go b/common/extension/service_name_mapping.go index 1598430752d3f54d08374a9e5367217b52bf6850..795595905d7dd5178e9107e535fca4fc8c945535 100644 --- a/common/extension/service_name_mapping.go +++ b/common/extension/service_name_mapping.go @@ -18,18 +18,21 @@ package extension import ( - "github.com/apache/dubbo-go/metadata/mapping" + "github.com/apache/dubbo-go/metadata" ) var ( - nameMappings = make(map[string]func() mapping.ServiceNameMapping) + nameMappings = make(map[string]func() metadata.ServiceNameMapping) ) -func SetServiceNameMapping(name string, creator func() mapping.ServiceNameMapping) { - // TODO(@邓大明) +func SetServiceNameMapping(name string, creator func() metadata.ServiceNameMapping) { + nameMappings[name] = creator } -func GetServiceNameMapping(name string) mapping.ServiceNameMapping { - // TODO(@邓大明) - return nil +func GetServiceNameMapping(name string) metadata.ServiceNameMapping { + creator, ok := nameMappings[name] + if !ok { + panic("Can not find the target service name mapping: " + name) + } + return creator() } diff --git a/config/base_config.go b/config/base_config.go index d95ea27ee6f20d51144ded8904a0051c448cce9e..2f51690d30d847871fe547055b76309a93f88066 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -43,7 +43,7 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` - Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"` + Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"` ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"` // application config ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` diff --git a/config/service_discovery_config.go b/config/service_discovery_config.go index 343c366ec5dd429f5cf2b4e862393536696bde82..d3dc697a9b31e6949375d74aa92234093da22378 100644 --- a/config/service_discovery_config.go +++ b/config/service_discovery_config.go @@ -21,10 +21,10 @@ package config type ServiceDiscoveryConfig struct { // Protocol indicate which implementation will be used. // for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery - Protocol string + Protocol string `yaml:"protocol" json:"protocol,omitempty"` // Group, usually you don't need to config this field. // you can use this to do some isolation - Group string + Group string `yaml:"group" json:"group,omitempty"` // RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances. - RemoteRef string + RemoteRef string `yaml:"remote_ref" json:"remote_ref,omitempty"` } diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 5e406929050e5cfd388d2a69bb2f9d6e4cc229af..bf578d54aca1c22ec1564791d1f86caabe6b5bab 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -29,27 +29,19 @@ import ( ) import ( - env "github.com/apache/dubbo-go/common/config" + common_cfg "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config_center" - "github.com/apache/dubbo-go/metadata/mapping" + "github.com/apache/dubbo-go/metadata" ) const ( defaultGroup = config_center.DEFAULT_GROUP slash = "/" - name = "dynamic" ) -func init() { - extension.SetServiceNameMapping(name, GetServiceNameMappingInstance) - extension.SetServiceNameMapping(constant.DEFAULT_KEY, GetServiceNameMappingInstance) -} - // DynamicConfigurationServiceNameMapping is the implementation based on config center -// It could be thought as singleton pattern. type DynamicConfigurationServiceNameMapping struct { dc config_center.DynamicConfiguration } @@ -87,21 +79,15 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str } var ( - instance *DynamicConfigurationServiceNameMapping - initOnce sync.Once + serviceNameMappingInstance *DynamicConfigurationServiceNameMapping + serviceNameMappingInitOnce sync.Once ) -// newServiceNameMapping will create an instance of DynamicConfigurationServiceNameMapping -func newServiceNameMapping(dc config_center.DynamicConfiguration) *DynamicConfigurationServiceNameMapping { - return &DynamicConfigurationServiceNameMapping{dc: dc} -} - -// GetServiceNameMappingInstance will return the instance. -// If the instance is not initiated, it will create one -func GetServiceNameMappingInstance() mapping.ServiceNameMapping { - initOnce.Do(func() { - dc := env.GetEnvInstance().GetDynamicConfiguration() - instance = newServiceNameMapping(dc) +// GetServiceNameMappingInstance will return an instance of DynamicConfigurationServiceNameMapping +func GetServiceNameMappingInstance() metadata.ServiceNameMapping { + serviceNameMappingInitOnce.Do(func() { + dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() + serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc} }) - return instance + return serviceNameMappingInstance } diff --git a/metadata/mapping/dynamic/service_name_mapping_test.go b/metadata/mapping/dynamic/service_name_mapping_test.go index 31aaf3f11cf63071b405e3cf1f9b2fb43c4f2c1f..647a15ae78c3a29a3ea58c507ca6b84e8d54b466 100644 --- a/metadata/mapping/dynamic/service_name_mapping_test.go +++ b/metadata/mapping/dynamic/service_name_mapping_test.go @@ -41,7 +41,7 @@ func TestDynamicConfigurationServiceNameMapping(t *testing.T) { }).GetDynamicConfiguration(nil) config.GetApplicationConfig().Name = appName - mapping := newServiceNameMapping(dc) + mapping := &DynamicConfigurationServiceNameMapping{dc: dc} intf := constant.METADATA_SERVICE_NAME group := "myGroup" version := "myVersion" diff --git a/metadata/mapping/memory/service_name_mapping.go b/metadata/mapping/memory/service_name_mapping.go index 8a891491bdb97808b77422092a1043c1c0ffafbf..cf051a11e49b3e3ba8e5346ffd0acc10340f861c 100644 --- a/metadata/mapping/memory/service_name_mapping.go +++ b/metadata/mapping/memory/service_name_mapping.go @@ -17,14 +17,24 @@ package memory +import ( + "sync" +) + import ( gxset "github.com/dubbogo/gost/container/set" ) import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata" ) +func init() { + extension.SetServiceNameMapping("in-memory", GetInMemoryServiceNameMappingInstance) +} + type InMemoryServiceNameMapping struct{} func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { @@ -34,3 +44,15 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { return gxset.NewSet(config.GetApplicationConfig().Name), nil } + +var ( + nameMappingInstance *InMemoryServiceNameMapping + nameMappingInitOnce sync.Once +) + +func GetInMemoryServiceNameMappingInstance() metadata.ServiceNameMapping { + nameMappingInitOnce.Do(func() { + nameMappingInstance = &InMemoryServiceNameMapping{} + }) + return nameMappingInstance +} diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index e9230195f6fe4191064c7aa308db1494d8635eec..7d3406cac233dd5293c7522b4f12148fdcdd704e 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -18,21 +18,15 @@ package nacos import ( - "fmt" - "sync" - "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" - "github.com/nacos-group/nacos-sdk-go/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/model" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" - - "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/remoting/nacos" ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -53,12 +47,8 @@ func init() { // There is a problem, the go client for nacos does not support the id field. // we will use the metadata to store the id of ServiceInstance type nacosServiceDiscovery struct { + nacosBaseRegistry group string - // descriptor is a short string about the basic information of this instance - descriptor string - - // namingClient is the Nacos' client - namingClient naming_client.INamingClient } // Destroy will close the service discovery. @@ -247,7 +237,7 @@ func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, in // DispatchEvent will dispatch the event func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { - extension.GetGlobalDispatcher().Dispatch(event) + // TODO(waiting for event dispatcher, another task) return nil } @@ -281,58 +271,15 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn } } -func (n *nacosServiceDiscovery) String() string { - return n.descriptor -} - -var ( - // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition - instanceMap = make(map[string]registry.ServiceDiscovery, 16) - initLock sync.Mutex -) - -// newNacosServiceDiscovery will create new service discovery instance -// use double-check pattern to reduce race condition -func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { - - instance, ok := instanceMap[name] - if ok { - return instance, nil - } - - initLock.Lock() - defer initLock.Unlock() +// toDeregisterInstance will create new service discovery instance +func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - // double check - instance, ok = instanceMap[name] - if ok { - return instance, nil - } - - sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) - if !ok || len(sdc.RemoteRef) == 0 { - return nil, perrors.New("could not init the instance because the config is invalid") - } - - remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) - if !ok { - return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) - } - group := sdc.Group - if len(group) == 0 { - group = defaultGroup - } - - client, err := nacos.NewNacosClient(remoteConfig) + base, err := newBaseRegistry(url) if err != nil { - return nil, perrors.WithMessage(err, "create nacos client failed.") + return nil, perrors.WithStack(err) } - - descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) - return &nacosServiceDiscovery{ - group: group, - namingClient: client, - descriptor: descriptor, + nacosBaseRegistry: base, + group: url.GetParam(constant.NACOS_GROUP, defaultGroup), }, nil } diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index b2a4fa53cd96f7f736c0d4f2b00f44b6ba647a89..ca0f24976e988af63297c524101c24d40e55b1f8 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -40,6 +40,35 @@ var ( testName = "test" ) +func Test_newNacosServiceDiscovery(t *testing.T) { + name := "nacos1" + _, err := newNacosServiceDiscovery(name) + + // the ServiceDiscoveryConfig not found + assert.NotNil(t, err) + + sdc := &config.ServiceDiscoveryConfig{ + Protocol: "nacos", + RemoteRef: "mock", + } + config.GetBaseConfig().ServiceDiscoveries[name] = sdc + + _, err = newNacosServiceDiscovery(name) + + // RemoteConfig not found + assert.NotNil(t, err) + + config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ + Address: "console.nacos.io:80", + Timeout: 10 * time.Second, + } + + res, err := newNacosServiceDiscovery(name) + assert.Nil(t, err) + assert.NotNil(t, res) + +} + func TestNacosServiceDiscovery_Destroy(t *testing.T) { prepareData() serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 0adbee203bedb5d26b46fed0cfe0af6ccdb80903..d32227b4f96c75d5ce526745f42317c9decb399d 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -23,6 +23,10 @@ import ( "strconv" "strings" "sync" + + perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/config" ) import ( @@ -48,16 +52,22 @@ const ( protocolName = "service-discovery" ) +var ( + registryInstance *serviceDiscoveryRegistry + registryInitOnce sync.Once +) + func init() { extension.SetRegistry(protocolName, newServiceDiscoveryRegistry) } // serviceDiscoveryRegistry is the implementation of application-level registry. // It's completely different from other registry implementations +// The serviceDiscoveryRegistry should be singleton // This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping // In order to keep compatible with interface-level registry, // 1. when we registry the service, we should create the mapping from service name to application name -// 2. when we sub +// 2. when we subscribe the service, we should find out related application and then find application's information type serviceDiscoveryRegistry struct { lock sync.RWMutex url *common.URL @@ -65,38 +75,51 @@ type serviceDiscoveryRegistry struct { subscribedServices *gxset.HashSet serviceNameMapping mapping.ServiceNameMapping metaDataService service.MetadataService - //cache the registered listen + // cache the registered listen registeredListeners *gxset.HashSet - //all synthesize + // all synthesize subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer - //cache exported urls, serviceName->revision->[]URL + // cache exported urls, serviceName->revision->[]URL serviceRevisionExportedURLsCache map[string]map[string][]common.URL } -func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { - serviceDiscovery, err := creatServiceDiscovery(url) - if err != nil { - return nil, err - } - subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) - subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() - serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) - //TODO it's need to get implement by factory - metaDataService := inmemory.NewMetadataService() - return &serviceDiscoveryRegistry{ - url: url, - serviceDiscovery: serviceDiscovery, - subscribedServices: subscribedServices, - subscribedURLsSynthesizers: subscribedURLsSynthesizers, - registeredListeners: gxset.NewSet(), - serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), - serviceNameMapping: serviceNameMapping, - metaDataService: metaDataService, - }, nil +// newServiceDiscoveryRegistry will return the instance +// if not found, it will create one +func newServiceDiscoveryRegistry(url *common.URL) (res registry.Registry, err error) { + + registryInitOnce.Do(func() { + serviceDiscovery, err := creatServiceDiscovery(url) + if err != nil { + return + } + subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) + subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() + serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) + // TODO it's need to get implement by factory + metaDataService := inmemory.NewMetadataService() + registryInstance = &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: serviceDiscovery, + subscribedServices: subscribedServices, + subscribedURLsSynthesizers: subscribedURLsSynthesizers, + registeredListeners: gxset.NewSet(), + serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), + serviceNameMapping: serviceNameMapping, + metaDataService: metaDataService, + } + }) + res = registryInstance + return } func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - return extension.GetServiceDiscovery(url.Protocol, "TODO") + discovery := url.GetParam(constant.SERVICE_DISCOVERY_KEY, constant.DEFAULT_KEY) + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(discovery) + + if !ok { + return nil, perrors.New("could not find the ServiceDiscoverConfig with name: " + discovery) + } + return extension.GetServiceDiscovery(sdc.Protocol, discovery) } func parseServices(literalServices string) *gxset.HashSet { @@ -113,22 +136,22 @@ func parseServices(literalServices string) *gxset.HashSet { return set } -//GetServiceDiscovery for get serviceDiscovery of the registry +// GetServiceDiscovery for get serviceDiscovery of the registry func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { return s.serviceDiscovery } -//GetUrl for get url of the registry +// GetUrl for get url of the registry func (s *serviceDiscoveryRegistry) GetUrl() common.URL { return *s.url } -//IsAvailable for make sure is't available +// IsAvailable for make sure is't available func (s *serviceDiscoveryRegistry) IsAvailable() bool { return true } -//Destroy for destroy graceful down +// Destroy for destroy graceful down func (s *serviceDiscoveryRegistry) Destroy() { err := s.serviceDiscovery.Destroy() if err != nil { @@ -162,7 +185,7 @@ func shouldRegister(url common.URL) bool { return false } -//Subscribe for listen the change of services that from the exported url +// Subscribe for listen the change of services that from the exported url func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) { if !shouldSubscribe(*url) { return @@ -243,7 +266,7 @@ func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.No if len(subscribedURLs) == 0 { subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances) } - //TODO make sure it's workable + // TODO make sure it's workable for _, url := range subscribedURLs { notify.Notify(®istry.ServiceEvent{ Action: remoting.EventTypeAdd, @@ -461,7 +484,7 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc serviceInstance.GetHost(), serviceInstance.GetPort(), revision) } } else { - //Else, The cache is hit + // Else, The cache is hit logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+ "[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(), @@ -499,8 +522,8 @@ func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsa u.RemoveParams(removeParamSet) port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol)) if u.Location != host || u.Port != port { - u.Port = port //reset port - u.Location = host //reset host + u.Port = port // reset port + u.Location = host // reset host } clonedExportedURLs = append(clonedExportedURLs, u) }