From d6a68d5c04c3b12f03c3a89b58a85fb357d49efb Mon Sep 17 00:00:00 2001 From: flycash <mingflycash@gmail.com> Date: Mon, 15 Jun 2020 20:28:06 +0800 Subject: [PATCH] Fix Review And Add UT --- common/extension/event_dispatcher.go | 13 +- common/extension/event_dispatcher_test.go | 111 ++++++++++++++++++ common/extension/metadata_service.go | 5 + .../metadata_service_proxy_factory.go | 5 + .../extension/service_instance_customizer.go | 9 +- .../service_instance_selector_factory.go | 3 + common/extension/service_name_mapping.go | 4 +- .../dispatcher/direct_event_dispatcher.go | 13 +- .../direct_event_dispatcher_test.go | 6 +- common/observer/event.go | 2 + common/observer/listenable.go | 59 ++++++---- common/observer/listenable_test.go | 3 +- common/rpc_service.go | 3 + common/url.go | 9 +- config/base_config.go | 3 +- config/config_loader.go | 10 +- config/instance/metadata_report_test.go | 85 ++++++++++++++ metadata/service/inmemory/service_proxy.go | 14 +-- metadata/service/remote/service_proxy.go | 4 + metadata/service/service.go | 9 ++ ...vent_publishing_service_deiscovery_test.go | 8 +- .../service_discovery_registry.go | 16 ++- 22 files changed, 333 insertions(+), 61 deletions(-) create mode 100644 common/extension/event_dispatcher_test.go create mode 100644 config/instance/metadata_report_test.go diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go index a543910cc..ac71e3b5e 100644 --- a/common/extension/event_dispatcher.go +++ b/common/extension/event_dispatcher.go @@ -19,7 +19,9 @@ package extension import ( "sync" +) +import ( "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/observer" ) @@ -34,12 +36,15 @@ var ( dispatchers = make(map[string]func() observer.EventDispatcher, 8) ) -// SetEventDispatcher by name +// SetEventDispatcher, actually, it doesn't really init the global dispatcher func SetEventDispatcher(name string, v func() observer.EventDispatcher) { dispatchers[name] = v } -// SetAndInitGlobalDispatcher +// SetAndInitGlobalDispatcher will actually init the global dispatcher +// if there is already a global dispatcher, +// it will be override +// if the dispatcher with the name not found, it will panic func SetAndInitGlobalDispatcher(name string) { if len(name) == 0 { name = "direct" @@ -47,8 +52,10 @@ func SetAndInitGlobalDispatcher(name string) { if globalEventDispatcher != nil { logger.Warnf("EventDispatcher already init. It will be replaced") } + if dp, ok := dispatchers[name]; !ok || dp == nil { - panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.") + panic("EventDispatcher for " + name + " is not found, make sure you have import the package, " + + "like github.com/apache/dubbo-go/common/observer/dispatcher ") } globalEventDispatcher = dispatchers[name]() } diff --git a/common/extension/event_dispatcher_test.go b/common/extension/event_dispatcher_test.go new file mode 100644 index 000000000..472360cea --- /dev/null +++ b/common/extension/event_dispatcher_test.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "reflect" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/common/observer" +) + +func TestSetAndInitGlobalDispatcher(t *testing.T) { + mock := &mockEventDispatcher{} + SetEventDispatcher("mock", func() observer.EventDispatcher { + return mock + }) + + SetAndInitGlobalDispatcher("mock") + dispatcher := GetGlobalDispatcher() + assert.NotNil(t, dispatcher) + assert.Equal(t, mock, dispatcher) + + mock1 := &mockEventDispatcher{} + + SetEventDispatcher("mock1", func() observer.EventDispatcher { + return mock1 + }) + + SetAndInitGlobalDispatcher("mock1") + dispatcher = GetGlobalDispatcher() + assert.NotNil(t, dispatcher) + assert.Equal(t, mock1, dispatcher) +} + +func TestAddEventListener(t *testing.T) { + AddEventListener(func() observer.EventListener { + return &mockEventListener{} + }) + + AddEventListener(func() observer.EventListener { + return &mockEventListener{} + }) + + assert.Equal(t, 2, len(initEventListeners)) +} + +type mockEventListener struct { +} + +func (m mockEventListener) GetPriority() int { + panic("implement me") +} + +func (m mockEventListener) OnEvent(e observer.Event) error { + panic("implement me") +} + +func (m mockEventListener) GetEventType() reflect.Type { + panic("implement me") +} + +type mockEventDispatcher struct { +} + +func (m mockEventDispatcher) AddEventListener(listener observer.EventListener) { + panic("implement me") +} + +func (m mockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { + panic("implement me") +} + +func (m mockEventDispatcher) RemoveEventListener(listener observer.EventListener) { + panic("implement me") +} + +func (m mockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { + panic("implement me") +} + +func (m mockEventDispatcher) GetAllEventListeners() []observer.EventListener { + panic("implement me") +} + +func (m mockEventDispatcher) RemoveAllEventListeners() { + panic("implement me") +} + +func (m mockEventDispatcher) Dispatch(event observer.Event) { + panic("implement me") +} diff --git a/common/extension/metadata_service.go b/common/extension/metadata_service.go index 93ff40aa7..1823273b8 100644 --- a/common/extension/metadata_service.go +++ b/common/extension/metadata_service.go @@ -19,7 +19,9 @@ package extension import ( "fmt" +) +import ( "github.com/apache/dubbo-go/metadata/service" ) @@ -28,10 +30,13 @@ var ( metadataServiceInsMap = make(map[string]func() (service.MetadataService, error), 2) ) +// SetMetadataService will store the msType => creator pair func SetMetadataService(msType string, creator func() (service.MetadataService, error)) { metadataServiceInsMap[msType] = creator } +// GetMetadataService will create a MetadataService instance +// it will panic if msType not found func GetMetadataService(msType string) (service.MetadataService, error) { if creator, ok := metadataServiceInsMap[msType]; ok { return creator() diff --git a/common/extension/metadata_service_proxy_factory.go b/common/extension/metadata_service_proxy_factory.go index 35f63c5fa..e8c9e73d7 100644 --- a/common/extension/metadata_service_proxy_factory.go +++ b/common/extension/metadata_service_proxy_factory.go @@ -19,7 +19,9 @@ package extension import ( "fmt" +) +import ( "github.com/apache/dubbo-go/metadata/service" ) @@ -27,10 +29,13 @@ var ( metadataServiceProxyFactoryMap = make(map[string]func() service.MetadataServiceProxyFactory) ) +// SetMetadataServiceProxyFactory store the name-creator pair func SetMetadataServiceProxyFactory(name string, creator func() service.MetadataServiceProxyFactory) { metadataServiceProxyFactoryMap[name] = creator } +// GetMetadataServiceProxyFactory will create an instance. +// it will panic if the factory with name not found func GetMetadataServiceProxyFactory(name string) service.MetadataServiceProxyFactory { if f, ok := metadataServiceProxyFactoryMap[name]; ok { return f() diff --git a/common/extension/service_instance_customizer.go b/common/extension/service_instance_customizer.go index a0e443aff..3ebb3e40f 100644 --- a/common/extension/service_instance_customizer.go +++ b/common/extension/service_instance_customizer.go @@ -19,7 +19,9 @@ package extension import ( "sort" +) +import ( "github.com/apache/dubbo-go/registry" ) @@ -35,15 +37,20 @@ func AddCustomizers(cus registry.ServiceInstanceCustomizer) { } // GetCustomizers will return the sorted customizer +// the result won't be nil func GetCustomizers() []registry.ServiceInstanceCustomizer { return customizers } type customizerSlice []registry.ServiceInstanceCustomizer +// nolint func (c customizerSlice) Len() int { return len(c) } -func (c customizerSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +// nolint +func (c customizerSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] } + +// nolint func (c customizerSlice) Less(i, j int) bool { return c[i].GetPriority() < c[j].GetPriority() } diff --git a/common/extension/service_instance_selector_factory.go b/common/extension/service_instance_selector_factory.go index 3ba3db46e..7776a408d 100644 --- a/common/extension/service_instance_selector_factory.go +++ b/common/extension/service_instance_selector_factory.go @@ -29,10 +29,13 @@ var ( serviceInstanceSelectorMappings = make(map[string]func() instance.ServiceInstanceSelector) ) +// nolint func SetServiceInstanceSelector(name string, f func() instance.ServiceInstanceSelector) { serviceInstanceSelectorMappings[name] = f } +// GetServiceInstanceSelector will create an instance +// it will panic if selector with the @name not found func GetServiceInstanceSelector(name string) (instance.ServiceInstanceSelector, error) { serviceInstanceSelector, ok := serviceInstanceSelectorMappings[name] if !ok { diff --git a/common/extension/service_name_mapping.go b/common/extension/service_name_mapping.go index cd2630198..9e5aac52f 100644 --- a/common/extension/service_name_mapping.go +++ b/common/extension/service_name_mapping.go @@ -17,7 +17,9 @@ package extension -import "github.com/apache/dubbo-go/metadata/mapping" +import ( + "github.com/apache/dubbo-go/metadata/mapping" +) var ( globalNameMappingCreator func() mapping.ServiceNameMapping diff --git a/common/observer/dispatcher/direct_event_dispatcher.go b/common/observer/dispatcher/direct_event_dispatcher.go index 2b7567b47..a2d334ce2 100644 --- a/common/observer/dispatcher/direct_event_dispatcher.go +++ b/common/observer/dispatcher/direct_event_dispatcher.go @@ -36,26 +36,31 @@ func init() { // Align with 2.7.5 // Dispatcher event to listener direct type DirectEventDispatcher struct { - observer.BaseListenable + observer.BaseListener } // NewDirectEventDispatcher ac constructor of DirectEventDispatcher func NewDirectEventDispatcher() observer.EventDispatcher { - return &DirectEventDispatcher{} + return &DirectEventDispatcher{ + BaseListener: observer.NewBaseListener(), + } } // Dispatch event directly +// it lookup the listener by event's type. +// if listener not found, it just return and do nothing func (ded *DirectEventDispatcher) Dispatch(event observer.Event) { if event == nil { logger.Warnf("[DirectEventDispatcher] dispatch event nil") return } eventType := reflect.TypeOf(event).Elem() - value, loaded := ded.ListenersCache.Load(eventType) + ded.Mutex.RLock() + defer ded.Mutex.RUnlock() + listenersSlice, loaded := ded.ListenersCache[eventType] if !loaded { return } - listenersSlice := value.([]observer.EventListener) for _, listener := range listenersSlice { if err := listener.OnEvent(event); err != nil { logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err) diff --git a/common/observer/dispatcher/direct_event_dispatcher_test.go b/common/observer/dispatcher/direct_event_dispatcher_test.go index 355c930a9..12facbb9c 100644 --- a/common/observer/dispatcher/direct_event_dispatcher_test.go +++ b/common/observer/dispatcher/direct_event_dispatcher_test.go @@ -29,7 +29,9 @@ import ( func TestDirectEventDispatcher_Dispatch(t *testing.T) { ded := NewDirectEventDispatcher() - ded.AddEventListener(&TestEventListener{}) + ded.AddEventListener(&TestEventListener{ + BaseListener: observer.NewBaseListener(), + }) ded.AddEventListener(&TestEventListener1{}) ded.Dispatch(&TestEvent{}) ded.Dispatch(nil) @@ -40,7 +42,7 @@ type TestEvent struct { } type TestEventListener struct { - observer.BaseListenable + observer.BaseListener observer.EventListener } diff --git a/common/observer/event.go b/common/observer/event.go index d78179043..209a50c78 100644 --- a/common/observer/event.go +++ b/common/observer/event.go @@ -58,6 +58,8 @@ func (b *BaseEvent) String() string { return fmt.Sprintf("BaseEvent[source = %#v]", b.Source) } +// NewBaseEvent create an BaseEvent instance +// and the Timestamp will be current timestamp func NewBaseEvent(source interface{}) *BaseEvent { return &BaseEvent{ Source: source, diff --git a/common/observer/listenable.go b/common/observer/listenable.go index 7b64aa8f2..887f7a377 100644 --- a/common/observer/listenable.go +++ b/common/observer/listenable.go @@ -33,28 +33,32 @@ type Listenable interface { RemoveAllEventListeners() } -// BaseListenable base listenable -type BaseListenable struct { +// BaseListener base listenable +type BaseListener struct { Listenable - ListenersCache sync.Map - Mutex sync.Mutex + ListenersCache map[reflect.Type][]EventListener + Mutex sync.RWMutex } -// NewBaseListenable a constructor of base listenable -func NewBaseListenable() Listenable { - return &BaseListenable{} +// NewBaseListener a constructor of base listenable +func NewBaseListener() BaseListener { + return BaseListener{ + ListenersCache: make(map[reflect.Type][]EventListener, 8), + } } // AddEventListener add event listener -func (bl *BaseListenable) AddEventListener(listener EventListener) { +func (bl *BaseListener) AddEventListener(listener EventListener) { eventType := listener.GetEventType() if eventType.Kind() == reflect.Ptr { eventType = eventType.Elem() } bl.Mutex.Lock() defer bl.Mutex.Unlock() - value, loaded := bl.ListenersCache.LoadOrStore(eventType, make([]EventListener, 0, 8)) - listenersSlice := value.([]EventListener) + listenersSlice, loaded := bl.ListenersCache[eventType] + if !loaded { + listenersSlice = make([]EventListener, 0, 8) + } // return if listenersSlice already has this listener if loaded && containListener(listenersSlice, listener) { return @@ -63,59 +67,62 @@ func (bl *BaseListenable) AddEventListener(listener EventListener) { sort.Slice(listenersSlice, func(i, j int) bool { return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority() }) - bl.ListenersCache.Store(eventType, listenersSlice) + bl.ListenersCache[eventType] = listenersSlice } // AddEventListeners add the slice of event listener -func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { +func (bl *BaseListener) AddEventListeners(listenersSlice []EventListener) { for _, listener := range listenersSlice { bl.AddEventListener(listener) } } // RemoveEventListener remove the event listener -func (bl *BaseListenable) RemoveEventListener(listener EventListener) { +func (bl *BaseListener) RemoveEventListener(listener EventListener) { eventType := listener.GetEventType() if eventType.Kind() == reflect.Ptr { eventType = eventType.Elem() } bl.Mutex.Lock() defer bl.Mutex.Unlock() - value, loaded := bl.ListenersCache.Load(eventType) + listenersSlice, loaded := bl.ListenersCache[eventType] if !loaded { return } - listenersSlice := value.([]EventListener) for i, l := range listenersSlice { if l == listener { listenersSlice = append(listenersSlice[:i], listenersSlice[i+1:]...) } } - bl.ListenersCache.Store(eventType, listenersSlice) + bl.ListenersCache[eventType] = listenersSlice } // RemoveEventListeners remove the slice of event listener -func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) { +// it will iterate all listener and remove it one by one +func (bl *BaseListener) RemoveEventListeners(listenersSlice []EventListener) { for _, listener := range listenersSlice { bl.RemoveEventListener(listener) } } // RemoveAllEventListeners remove all -func (bl *BaseListenable) RemoveAllEventListeners() { +// using Lock +func (bl *BaseListener) RemoveAllEventListeners() { bl.Mutex.Lock() defer bl.Mutex.Unlock() - bl.ListenersCache = sync.Map{} + bl.ListenersCache = make(map[reflect.Type][]EventListener) } -// GetAllEventListeners get all -func (bl *BaseListenable) GetAllEventListeners() []EventListener { +// GetAllEventListeners get all listener +// using RLock +func (bl *BaseListener) GetAllEventListeners() []EventListener { allListenersSlice := make([]EventListener, 0, 16) - bl.ListenersCache.Range(func(_, value interface{}) bool { - listenersSlice := value.([]EventListener) + + bl.Mutex.RLock() + defer bl.Mutex.RUnlock() + for _, listenersSlice := range bl.ListenersCache { allListenersSlice = append(allListenersSlice, listenersSlice...) - return true - }) + } sort.Slice(allListenersSlice, func(i, j int) bool { return allListenersSlice[i].GetPriority() < allListenersSlice[j].GetPriority() }) @@ -123,6 +130,8 @@ func (bl *BaseListenable) GetAllEventListeners() []EventListener { } // containListener true if contain listener +// it's not thread safe +// usually it should be use in lock scope func containListener(listenersSlice []EventListener, listener EventListener) bool { for _, loadListener := range listenersSlice { if loadListener == listener { diff --git a/common/observer/listenable_test.go b/common/observer/listenable_test.go index df46bfc2b..5a03382a9 100644 --- a/common/observer/listenable_test.go +++ b/common/observer/listenable_test.go @@ -28,7 +28,8 @@ import ( func TestListenable(t *testing.T) { el := &TestEventListener{} - b := &BaseListenable{} + bl := NewBaseListener() + b := &bl b.AddEventListener(el) b.AddEventListener(el) al := b.GetAllEventListeners() diff --git a/common/rpc_service.go b/common/rpc_service.go index 1127e6c01..f9fb145c8 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -343,6 +343,9 @@ func suiteMethod(method reflect.Method) *MethodType { argsType []reflect.Type ) + // this method is in RPCService + // we force users must implement RPCService interface in their provider + // see RPCService if mname == "Reference" { return nil } diff --git a/common/url.go b/common/url.go index 9c3f06549..e0e15739d 100644 --- a/common/url.go +++ b/common/url.go @@ -25,8 +25,6 @@ import ( "net/url" "strconv" "strings" - - "github.com/apache/dubbo-go/common/logger" ) import ( @@ -38,6 +36,7 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" ) // /////////////////////////////// @@ -182,6 +181,7 @@ func WithToken(token string) option { u, err := uuid.NewV4() if err != nil { logger.Errorf("could not generator UUID: %v", err) + return } value = u.String() } @@ -657,16 +657,21 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f return methodConfigMergeFcn } +// URLSlice will be used to sort URL instance +// Instances will be order by URL.String() type URLSlice []URL +// nolint func (s URLSlice) Len() int { return len(s) } +// nolint func (s URLSlice) Less(i, j int) bool { return s[i].String() < s[j].String() } +// nolint func (s URLSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } diff --git a/config/base_config.go b/config/base_config.go index 67313a14f..46ff5fb17 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -40,7 +40,7 @@ type multiConfiger interface { Prefix() string } -// BaseConfig is the event configuration for provider and consumer +// BaseConfig is the common configuration for provider and consumer type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` @@ -60,6 +60,7 @@ type BaseConfig struct { fileStream *bytes.Buffer } +// nolint func (c *BaseConfig) GetServiceDiscoveries(name string) (config *ServiceDiscoveryConfig, ok bool) { config, ok = c.ServiceDiscoveries[name] return diff --git a/config/config_loader.go b/config/config_loader.go index 10a338701..e0cb09d94 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -146,9 +146,9 @@ func loadConsumerConfig() { checkok = false count++ if count > maxWait { - // errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version) - // logger.Error(errMsg) - // panic(errMsg) + errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version) + logger.Error(errMsg) + panic(errMsg) } time.Sleep(time.Second * 1) break @@ -219,7 +219,7 @@ func Load() { // init router initRouter() - // event part + // init the global event dispatcher extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType) // start the metadata report if config set @@ -285,7 +285,6 @@ func GetApplicationConfig() *ApplicationConfig { // if not found, create new one func GetProviderConfig() ProviderConfig { if providerConfig == nil { - logger.Warnf("providerConfig is nil! we will try to create one") if providerConfig == nil { return ProviderConfig{} } @@ -308,7 +307,6 @@ func GetConsumerConfig() ConsumerConfig { } func GetBaseConfig() *BaseConfig { - if baseConfig == nil { baseConfigOnce.Do(func() { baseConfig = &BaseConfig{ diff --git a/config/instance/metadata_report_test.go b/config/instance/metadata_report_test.go new file mode 100644 index 000000000..51780a53f --- /dev/null +++ b/config/instance/metadata_report_test.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package instance + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" +) + +func TestGetMetadataReportInstance(t *testing.T) { + extension.SetMetadataReportFactory("mock", func() factory.MetadataReportFactory { + return &mockMetadataReportFactory{} + }) + u, _ := common.NewURL("mock://127.0.0.1") + rpt := GetMetadataReportInstance(&u) + assert.NotNil(t, rpt) +} + +type mockMetadataReportFactory struct { +} + +func (m *mockMetadataReportFactory) CreateMetadataReport(*common.URL) report.MetadataReport { + return &mockMetadataReport{} +} + +type mockMetadataReport struct { +} + +func (m mockMetadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error { + panic("implement me") +} + +func (m mockMetadataReport) StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error { + panic("implement me") +} + +func (m mockMetadataReport) SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error { + panic("implement me") +} + +func (m mockMetadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error { + panic("implement me") +} + +func (m mockMetadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string { + panic("implement me") +} + +func (m mockMetadataReport) SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error { + panic("implement me") +} + +func (m mockMetadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string { + panic("implement me") +} + +func (m mockMetadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string { + panic("implement me") +} diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go index a62d14457..69e349a0f 100644 --- a/metadata/service/inmemory/service_proxy.go +++ b/metadata/service/inmemory/service_proxy.go @@ -49,15 +49,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st vV := reflect.ValueOf(version) pV := reflect.ValueOf(protocol) - // this is a strange logic - // we should notice that - // when we call java server, the method was register as "getExportedURLs" - // however, if we call golang server, the method was register as "GetExportedURLs" - // it's so tricky... - methodName := "getExportedURLs" - if m.golangServer { - methodName = "GetExportedURLs" - } + const methodName = "getExportedURLs" inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName), invocation.WithArguments([]interface{}{siV.Interface(), gV.Interface(), vV.Interface(), pV.Interface()}), @@ -84,6 +76,10 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st return ret, nil } +func (m *MetadataServiceProxy) MethodMapper() map[string]string { + return map[string]string{} +} + func (m *MetadataServiceProxy) Reference() string { logger.Error("you should never invoke this implementation") return constant.METADATA_SERVICE_NAME diff --git a/metadata/service/remote/service_proxy.go b/metadata/service/remote/service_proxy.go index 8fe2e51d4..5e37d39fd 100644 --- a/metadata/service/remote/service_proxy.go +++ b/metadata/service/remote/service_proxy.go @@ -92,6 +92,10 @@ func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group st return service.ConvertURLArrToIntfArr(res), nil } +func (m *metadataServiceProxy) MethodMapper() map[string]string { + return map[string]string{} +} + func (m *metadataServiceProxy) GetSubscribedURLs() ([]common.URL, error) { logger.Error("you should never invoke this implementation") return []common.URL{}, nil diff --git a/metadata/service/service.go b/metadata/service/service.go index 803a84773..ae0485824 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -46,6 +46,9 @@ type MetadataService interface { // due to dubbo-go only support return array []interface{} in RPCService, so we should declare the return type as []interface{} // actually, it's []String GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) + + MethodMapper() map[string]string + // GetExportedURLs will get the target subscribed url in metadata // the url should be unique GetSubscribedURLs() ([]common.URL, error) @@ -70,6 +73,12 @@ func NewBaseMetadataService(serviceName string) BaseMetadataService { } } +func (mts *BaseMetadataService) MethodMapper() map[string]string { + return map[string]string{ + "GetExportedURLs": "getExportedURLs", + } +} + // ServiceName can get the service's name in meta service , which is application name func (mts *BaseMetadataService) ServiceName() (string, error) { return mts.serviceName, nil diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index ea6a97a32..21bddb7f0 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -46,7 +46,9 @@ func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { config.GetApplicationConfig().MetadataType = "local" dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{}) - tsd := &TestServiceDiscoveryDestroyingEventListener{} + tsd := &TestServiceDiscoveryDestroyingEventListener{ + BaseListener: observer.NewBaseListener(), + } tsd.SetT(t) tsi := &TestServiceInstancePreRegisteredEventListener{} tsi.SetT(t) @@ -68,7 +70,7 @@ func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { type TestServiceDiscoveryDestroyingEventListener struct { suite.Suite - observer.BaseListenable + observer.BaseListener } func (tel *TestServiceDiscoveryDestroyingEventListener) OnEvent(e observer.Event) error { @@ -89,7 +91,7 @@ func (tel *TestServiceDiscoveryDestroyingEventListener) GetEventType() reflect.T type TestServiceInstancePreRegisteredEventListener struct { suite.Suite - observer.BaseListenable + observer.BaseListener } func (tel *TestServiceInstancePreRegisteredEventListener) OnEvent(e observer.Event) error { diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index c6145e663..79194fb08 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -28,6 +28,7 @@ import ( gxset "github.com/dubbogo/gost/container/set" gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" + "go.uber.org/atomic" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -674,8 +675,12 @@ func (icn *InstanceChangeNotify) Notify(event observer.Event) { } } -var exporting = false +var ( + exporting = &atomic.Bool{} +) +// tryInitMetadataService will try to initialize metadata service +// TODO (move to somewhere) func tryInitMetadataService() { ms, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType) @@ -683,11 +688,16 @@ func tryInitMetadataService() { logger.Errorf("could not init metadata service", err) } - if !config.IsProvider() || exporting { + if !config.IsProvider() || exporting.Load() { return } - exporting = true + // In theory, we can use sync.Once + // But sync.Once is not reentrant. + // Now the invocation chain is createRegistry -> tryInitMetadataService -> metadataServiceExporter.export + // -> createRegistry -> initMetadataService... + // So using sync.Once will result in dead lock + exporting.Store(true) expt := configurable.NewMetadataServiceExporter(ms) -- GitLab