From 16b650049f2a711e7c582406c3ba96f6be25f588 Mon Sep 17 00:00:00 2001 From: flycash <mingflycash@gmail.com> Date: Sun, 7 Jun 2020 22:57:39 +0800 Subject: [PATCH] it can be found by java consumer --- common/extension/event_dispatcher.go | 18 +++++++++---- common/extension/service_name_mapping.go | 8 +++--- common/observer/event_listener.go | 1 + .../mapping/dynamic/service_name_mapping.go | 20 ++++++++++++--- .../mapping/memory/service_name_mapping.go | 16 +++++++++++- metadata/report/nacos/report_test.go | 6 ----- metadata/service/inmemory/service.go | 25 +++++++++++++------ .../customizable_service_instance_listener.go | 14 ++++++++++- ...vent_publishing_service_deiscovery_test.go | 16 ++++++++++-- registry/event/log_event_listener.go | 14 ++++++++++- .../metadata_service_url_params_customizer.go | 2 +- .../event/service_config_exported_event.go | 7 +++--- .../event/service_name_mapping_listener.go | 19 +++++++++++--- registry/nacos/service_discovery.go | 10 +++++--- .../service_discovery_registry.go | 3 +-- 15 files changed, 133 insertions(+), 46 deletions(-) diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go index 2d3352825..a543910cc 100644 --- a/common/extension/event_dispatcher.go +++ b/common/extension/event_dispatcher.go @@ -18,13 +18,16 @@ package extension import ( + "sync" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/observer" ) var ( globalEventDispatcher observer.EventDispatcher - initEventListeners []observer.EventListener + initEventListeners []func() observer.EventListener + initEventOnce sync.Once ) var ( @@ -48,15 +51,20 @@ func SetAndInitGlobalDispatcher(name string) { panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.") } globalEventDispatcher = dispatchers[name]() - globalEventDispatcher.AddEventListeners(initEventListeners) } -// GetGlobalDispatcher +// GetGlobalDispatcher will init all listener and then return dispatcher func GetGlobalDispatcher() observer.EventDispatcher { + initEventOnce.Do(func() { + // we should delay to add the listeners to avoid some listeners left + for _, l := range initEventListeners { + globalEventDispatcher.AddEventListener(l()) + } + }) return globalEventDispatcher } // AddEventListener it will be added in global event dispatcher -func AddEventListener(listener observer.EventListener) { - initEventListeners = append(initEventListeners, listener) +func AddEventListener(creator func() observer.EventListener) { + initEventListeners = append(initEventListeners, creator) } diff --git a/common/extension/service_name_mapping.go b/common/extension/service_name_mapping.go index 0330b8a4d..cd2630198 100644 --- a/common/extension/service_name_mapping.go +++ b/common/extension/service_name_mapping.go @@ -20,13 +20,13 @@ package extension import "github.com/apache/dubbo-go/metadata/mapping" var ( - globalNameMapping mapping.ServiceNameMapping + globalNameMappingCreator func() mapping.ServiceNameMapping ) -func SetGlobalServiceNameMapping(nameMapping mapping.ServiceNameMapping) { - globalNameMapping = nameMapping +func SetGlobalServiceNameMapping(nameMappingCreator func() mapping.ServiceNameMapping) { + globalNameMappingCreator = nameMappingCreator } func GetGlobalServiceNameMapping() mapping.ServiceNameMapping { - return globalNameMapping + return globalNameMappingCreator() } diff --git a/common/observer/event_listener.go b/common/observer/event_listener.go index f4423d7dd..faa8705a4 100644 --- a/common/observer/event_listener.go +++ b/common/observer/event_listener.go @@ -29,6 +29,7 @@ import ( // It contains the Prioritized means that the listener has its priority // Usually the priority of your custom implementation should be between [100, 9000] // the number outside the range will be though as system reserve number +// usually implementation should be singleton type EventListener interface { gxsort.Prioritizer // OnEvent handle this event diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 9f65adf73..9a8d8299a 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -19,11 +19,12 @@ package dynamic import ( "strconv" + "sync" "time" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/mapping" ) import ( @@ -39,16 +40,16 @@ import ( ) const ( - defaultGroup = config_center.DEFAULT_GROUP + defaultGroup = "mapping" slash = "/" ) func init() { - dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() - extension.SetGlobalServiceNameMapping(&DynamicConfigurationServiceNameMapping{dc: dc}) + extension.SetGlobalServiceNameMapping(GetNameMappingInstance) } // DynamicConfigurationServiceNameMapping is the implementation based on config center +// it's a singleton type DynamicConfigurationServiceNameMapping struct { dc config_center.DynamicConfiguration } @@ -85,3 +86,14 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str // so other params are ignored and remove, including group string, version string, protocol string return defaultGroup + slash + serviceInterface } + +var serviceNameMappingInstance *DynamicConfigurationServiceNameMapping +var serviceNameMappingOnce sync.Once + +func GetNameMappingInstance() mapping.ServiceNameMapping { + serviceNameMappingOnce.Do(func() { + dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() + serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc} + }) + return serviceNameMappingInstance +} diff --git a/metadata/mapping/memory/service_name_mapping.go b/metadata/mapping/memory/service_name_mapping.go index 293d97c84..ef2e5fa06 100644 --- a/metadata/mapping/memory/service_name_mapping.go +++ b/metadata/mapping/memory/service_name_mapping.go @@ -18,7 +18,11 @@ package memory import ( + "sync" + gxset "github.com/dubbogo/gost/container/set" + + "github.com/apache/dubbo-go/metadata/mapping" ) import ( @@ -27,7 +31,7 @@ import ( ) func init() { - extension.SetGlobalServiceNameMapping(&InMemoryServiceNameMapping{}) + extension.SetGlobalServiceNameMapping(GetNameMappingInstance) } type InMemoryServiceNameMapping struct{} @@ -39,3 +43,13 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { return gxset.NewSet(config.GetApplicationConfig().Name), nil } + +var serviceNameMappingInstance *InMemoryServiceNameMapping +var serviceNameMappingOnce sync.Once + +func GetNameMappingInstance() mapping.ServiceNameMapping { + serviceNameMappingOnce.Do(func() { + serviceNameMappingInstance = &InMemoryServiceNameMapping{} + }) + return serviceNameMappingInstance +} diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go index f23ed8a8d..88aec72a0 100644 --- a/metadata/report/nacos/report_test.go +++ b/metadata/report/nacos/report_test.go @@ -49,18 +49,12 @@ func TestNacosMetadataReport_CRUD(t *testing.T) { err = rpt.SaveServiceMetadata(serviceMi, serviceUrl) assert.Nil(t, err) - exportedUrls := rpt.GetExportedURLs(serviceMi) - assert.Equal(t, 1, len(exportedUrls)) - subMi := newSubscribeMetadataIdentifier() urlList := make([]common.URL, 0, 1) urlList = append(urlList, serviceUrl) err = rpt.SaveSubscribedData(subMi, urlList) assert.Nil(t, err) - subscribeUrl := rpt.GetSubscribedURLs(subMi) - assert.Equal(t, 1, len(subscribeUrl)) - err = rpt.RemoveServiceMetadata(serviceMi) assert.Nil(t, err) diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index 4327304ad..c9d2ed47b 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -53,15 +53,24 @@ type MetadataService struct { lock *sync.RWMutex } +var ( + metadataServiceInstance *MetadataService + metadataServiceInitOnce sync.Once +) + // NewMetadataService: initiate a metadata service +// it should be singleton func NewMetadataService() (service.MetadataService, error) { - return &MetadataService{ - BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name), - exportedServiceURLs: &sync.Map{}, - subscribedServiceURLs: &sync.Map{}, - serviceDefinitions: &sync.Map{}, - lock: &sync.RWMutex{}, - }, nil + metadataServiceInitOnce.Do(func() { + metadataServiceInstance = &MetadataService{ + BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name), + exportedServiceURLs: &sync.Map{}, + subscribedServiceURLs: &sync.Map{}, + serviceDefinitions: &sync.Map{}, + lock: &sync.RWMutex{}, + } + }) + return metadataServiceInstance, nil } // Comparator is defined as Comparator for skip list to compare the URL @@ -149,7 +158,7 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s urls := serviceList.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { url := common.URL(urls.ByPosition(i).(Comparator)) - if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol { + if len(protocol) == 0 || protocol == constant.ANY_VALUE || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol { res = append(res, url) } } diff --git a/registry/event/customizable_service_instance_listener.go b/registry/event/customizable_service_instance_listener.go index cd4a8d2db..89d162197 100644 --- a/registry/event/customizable_service_instance_listener.go +++ b/registry/event/customizable_service_instance_listener.go @@ -19,15 +19,17 @@ package event import ( "reflect" + "sync" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" ) func init() { - extension.AddEventListener(&customizableServiceInstanceListener{}) + extension.AddEventListener(GetCustomizableServiceInstanceListener) } +// customizableServiceInstanceListener is singleton type customizableServiceInstanceListener struct { } @@ -53,3 +55,13 @@ func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error { func (c *customizableServiceInstanceListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{}) } + +var customizableServiceInstanceListenerInstance *customizableServiceInstanceListener +var customizableServiceInstanceListenerOnce sync.Once + +func GetCustomizableServiceInstanceListener() observer.EventListener { + customizableServiceInstanceListenerOnce.Do(func() { + customizableServiceInstanceListenerInstance = &customizableServiceInstanceListener{} + }) + return customizableServiceInstanceListenerInstance +} diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index 27d2482d0..ea6a97a32 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -20,6 +20,9 @@ package event import ( "reflect" "testing" + + "github.com/apache/dubbo-go/config" + _ "github.com/apache/dubbo-go/metadata/service/inmemory" ) import ( @@ -37,13 +40,22 @@ import ( ) func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { + + // extension.SetMetadataService("local", inmemory.NewMetadataService) + + config.GetApplicationConfig().MetadataType = "local" + dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{}) tsd := &TestServiceDiscoveryDestroyingEventListener{} tsd.SetT(t) tsi := &TestServiceInstancePreRegisteredEventListener{} tsi.SetT(t) - extension.AddEventListener(tsd) - extension.AddEventListener(tsi) + extension.AddEventListener(func() observer.EventListener { + return tsd + }) + extension.AddEventListener(func() observer.EventListener { + return tsi + }) extension.SetEventDispatcher("direct", dispatcher2.NewDirectEventDispatcher) extension.SetAndInitGlobalDispatcher("direct") err := dc.Destroy() diff --git a/registry/event/log_event_listener.go b/registry/event/log_event_listener.go index c52b02a78..a06d5e449 100644 --- a/registry/event/log_event_listener.go +++ b/registry/event/log_event_listener.go @@ -19,6 +19,7 @@ package event import ( "reflect" + "sync" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -26,9 +27,10 @@ import ( ) func init() { - extension.AddEventListener(&logEventListener{}) + extension.AddEventListener(GetLogEventListener) } +// logEventListener is singleton type logEventListener struct { } @@ -44,3 +46,13 @@ func (l *logEventListener) OnEvent(e observer.Event) error { func (l *logEventListener) GetEventType() reflect.Type { return reflect.TypeOf(&observer.BaseEvent{}) } + +var logEventListenerInstance *logEventListener +var logEventListenerOnce sync.Once + +func GetLogEventListener() observer.EventListener { + logEventListenerOnce.Do(func() { + logEventListenerInstance = &logEventListener{} + }) + return logEventListenerInstance +} diff --git a/registry/event/metadata_service_url_params_customizer.go b/registry/event/metadata_service_url_params_customizer.go index e5ff2bc26..811d9308e 100644 --- a/registry/event/metadata_service_url_params_customizer.go +++ b/registry/event/metadata_service_url_params_customizer.go @@ -84,7 +84,7 @@ func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []comm p := make(map[string]string, len(u.GetParams())) for k, v := range u.GetParams() { // we will ignore that - if m.exceptKeys.Contains(k) || len(v) == 0 { + if m.exceptKeys.Contains(k) || len(v) == 0 || len(v[0]) == 0 { continue } p[k] = v[0] diff --git a/registry/event/service_config_exported_event.go b/registry/event/service_config_exported_event.go index c3d8eee92..7946609ac 100644 --- a/registry/event/service_config_exported_event.go +++ b/registry/event/service_config_exported_event.go @@ -31,11 +31,10 @@ type ServiceConfigExportedEvent struct { func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) *ServiceConfigExportedEvent { return &ServiceConfigExportedEvent{ - BaseEvent: observer.BaseEvent{ - Source:serviceConfig, - Timestamp:time.Now(), + BaseEvent: observer.BaseEvent{ + Source: serviceConfig, + Timestamp: time.Now(), }, ServiceConfig: serviceConfig, } } - diff --git a/registry/event/service_name_mapping_listener.go b/registry/event/service_name_mapping_listener.go index 480c51cc5..68cf588c6 100644 --- a/registry/event/service_name_mapping_listener.go +++ b/registry/event/service_name_mapping_listener.go @@ -19,6 +19,7 @@ package event import ( "reflect" + "sync" perrors "github.com/pkg/errors" @@ -29,9 +30,7 @@ import ( ) func init() { - extension.AddEventListener(&serviceNameMappingListener{ - nameMapping: extension.GetGlobalServiceNameMapping(), - }) + extension.AddEventListener(GetCustomizableServiceInstanceListener) } type serviceNameMappingListener struct { @@ -64,3 +63,17 @@ func (s *serviceNameMappingListener) OnEvent(e observer.Event) error { func (s *serviceNameMappingListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceConfigExportedEvent{}) } + +var ( + serviceNameMappingListenerInstance *serviceNameMappingListener + serviceNameMappingListenerOnce sync.Once +) + +func GetServiceNameMappingListener() observer.EventListener { + serviceNameMappingListenerOnce.Do(func() { + serviceNameMappingListenerInstance = &serviceNameMappingListener{ + nameMapping: extension.GetGlobalServiceNameMapping(), + } + }) + return serviceNameMappingListenerInstance +} diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 6b1d3c23d..5b952e254 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -264,10 +264,12 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInst Ip: instance.GetHost(), Port: uint64(instance.GetPort()), Metadata: metadata, - Enable: instance.IsEnable(), - Healthy: instance.IsHealthy(), - GroupName: n.group, - Ephemeral: true, + // We must specify the weight since Java nacos client will ignore the instance whose weight is 0 + Weight: 1, + Enable: instance.IsEnable(), + Healthy: instance.IsHealthy(), + GroupName: n.group, + Ephemeral: true, } } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 39b2b1c28..aad4b2641 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -38,7 +38,6 @@ import ( "github.com/apache/dubbo-go/metadata/mapping" "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/exporter/configurable" - "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry/event" "github.com/apache/dubbo-go/registry/servicediscovery/proxy" @@ -89,7 +88,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() serviceNameMapping := extension.GetGlobalServiceNameMapping() - metaDataService, err := remote.NewMetadataService() + metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType) if err != nil { return nil, perrors.WithMessage(err, "could not init metadata service") } -- GitLab