From 613e5e777cfdae26913d88ab4f4ecca87bdfd6e9 Mon Sep 17 00:00:00 2001 From: Patrick <dreamlike.sky@foxmail.com> Date: Thu, 26 Mar 2020 20:33:32 +0800 Subject: [PATCH] dispatcher in extension --- common/extension/event_dispatcher.go | 34 ++++++++++++++++--- .../direct_event_dispatcher.go | 28 +++++++-------- common/observer/listenable.go | 32 +++++++---------- common/observer/listenable_test.go | 5 +-- config/base_config.go | 15 ++++---- config/config_loader.go | 12 +++++++ .../ServiceInstancesChangedListener.go | 5 +++ registry/service_discovery.go | 2 +- 8 files changed, 83 insertions(+), 50 deletions(-) rename common/observer/{ => dispatcher}/direct_event_dispatcher.go (72%) rename {common/observer => registry}/listener/ServiceInstancesChangedListener.go (91%) diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go index 9bc784246..d2349e0db 100644 --- a/common/extension/event_dispatcher.go +++ b/common/extension/event_dispatcher.go @@ -17,16 +17,42 @@ package extension -import "github.com/apache/dubbo-go/common/observer" +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/prometheus/common/log" +) -func GetDispatcherEvent(name string) { +var eventListeners []observer.EventListener -} +var globalEventDispatcher observer.EventDispatcher -func SetDefaultDispatcherEvent() { +var ( + dispatchers = make(map[string]func() observer.EventDispatcher, 8) +) +func SetEventDispatcher(name string, v func() observer.EventDispatcher) { + dispatchers[name] = v } func AddEventListener(listener observer.EventListener) { + eventListeners = append(eventListeners, listener) +} + +// s +func SetAndInitGlobalDispatcher(name string) { + if len(name) == 0 { + name = "direct" + } + if globalEventDispatcher != nil { + log.Warnf("EventDispatcher already init. It will be replaced") + } + if dispatchers[name] == nil { + panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.") + } + globalEventDispatcher = dispatchers[name]() + globalEventDispatcher.AddEventListeners(eventListeners) +} +func GetGlobalDispatcher() observer.EventDispatcher { + return globalEventDispatcher } diff --git a/common/observer/direct_event_dispatcher.go b/common/observer/dispatcher/direct_event_dispatcher.go similarity index 72% rename from common/observer/direct_event_dispatcher.go rename to common/observer/dispatcher/direct_event_dispatcher.go index ddd999c2a..ab53ef0b2 100644 --- a/common/observer/direct_event_dispatcher.go +++ b/common/observer/dispatcher/direct_event_dispatcher.go @@ -15,46 +15,42 @@ * limitations under the License. */ -package observer +package dispatcher import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" "reflect" ) -var directEventDispatcher *DirectEventDispatcher +func init() { + extension.SetEventDispatcher("direct", NewDirectEventDispatcher) +} // DirectEventDispatcher is align with DirectEventDispatcher interface in Java. // it's the top abstraction // Align with 2.7.5 // Dispatcher event to listener direct type DirectEventDispatcher struct { - BaseListenable - EventDispatcher + observer.BaseListenable + observer.EventDispatcher } -func NewDirectEventDispatcher() *DirectEventDispatcher { +func NewDirectEventDispatcher() observer.EventDispatcher { return &DirectEventDispatcher{} } -func (ded *DirectEventDispatcher) Dispatch(event Event) { +func (ded *DirectEventDispatcher) Dispatch(event observer.Event) { eventType := reflect.TypeOf(event).Elem() - value, loaded := ded.listenersCache.Load(eventType) + value, loaded := ded.ListenersCache.Load(eventType) if !loaded { return } - listenersSlice := value.([]EventListener) + listenersSlice := value.([]observer.EventListener) for _, listener := range listenersSlice { if err := listener.OnEvent(event); err != nil { logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err) } } } - -// GetSingleDirectEventDispatcher ... -func GetSingleDirectEventDispatcher() EventDispatcher { - if directEventDispatcher == nil { - directEventDispatcher = NewDirectEventDispatcher() - } - return directEventDispatcher -} diff --git a/common/observer/listenable.go b/common/observer/listenable.go index f91e96902..4ceb03b0a 100644 --- a/common/observer/listenable.go +++ b/common/observer/listenable.go @@ -33,16 +33,16 @@ type Listenable interface { type BaseListenable struct { Listenable - listenersCache sync.Map - mutex sync.Mutex + ListenersCache sync.Map + Mutex sync.Mutex } func (bl *BaseListenable) AddEventListener(listener EventListener) { eventType := listener.GetEventType() var listenersSlice []EventListener - bl.mutex.Lock() - defer bl.mutex.Unlock() - if value, loaded := bl.listenersCache.Load(eventType); loaded { + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + if value, loaded := bl.ListenersCache.Load(eventType); loaded { listenersSlice = value.([]EventListener) if !containListener(listenersSlice, listener) { listenersSlice = append(listenersSlice, listener) @@ -54,12 +54,10 @@ func (bl *BaseListenable) AddEventListener(listener EventListener) { sort.Slice(listenersSlice, func(i, j int) bool { return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority() }) - bl.listenersCache.Store(eventType, listenersSlice) + bl.ListenersCache.Store(eventType, listenersSlice) } func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { - bl.mutex.Lock() - defer bl.mutex.Unlock() for _, listener := range listenersSlice { bl.AddEventListener(listener) } @@ -67,9 +65,9 @@ func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { func (bl *BaseListenable) RemoveEventListener(listener EventListener) { eventType := listener.GetEventType() - bl.mutex.Lock() - defer bl.mutex.Unlock() - value, loaded := bl.listenersCache.Load(eventType) + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + value, loaded := bl.ListenersCache.Load(eventType) if !loaded { return } @@ -82,24 +80,20 @@ func (bl *BaseListenable) RemoveEventListener(listener EventListener) { } func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) { - bl.mutex.Lock() - defer bl.mutex.Unlock() for _, listener := range listenersSlice { bl.RemoveEventListener(listener) } } func (bl *BaseListenable) RemoveAllEventListeners() { - bl.mutex.Lock() - defer bl.mutex.Unlock() - bl.listenersCache = *new(sync.Map) + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + bl.ListenersCache = *new(sync.Map) } func (bl *BaseListenable) GetAllEventListeners() []EventListener { - bl.mutex.Lock() - defer bl.mutex.Unlock() allListenersSlice := make([]EventListener, 0, 16) - bl.listenersCache.Range(func(_, value interface{}) bool { + bl.ListenersCache.Range(func(_, value interface{}) bool { listenersSlice := value.([]EventListener) allListenersSlice = append(allListenersSlice, listenersSlice...) return true diff --git a/common/observer/listenable_test.go b/common/observer/listenable_test.go index 35843bc91..d1a16176f 100644 --- a/common/observer/listenable_test.go +++ b/common/observer/listenable_test.go @@ -18,13 +18,14 @@ package observer import ( + "github.com/apache/dubbo-go/registry/listener" "github.com/stretchr/testify/assert" "testing" ) func TestListenable(t *testing.T) { - var b EventListener = &ServiceInstancesChangedListener{} - var a EventListener = &ServiceInstancesChangedListener{} + var b EventListener = &listener.ServiceInstancesChangedListener{} + var a EventListener = &listener.ServiceInstancesChangedListener{} assert.True(t, b == a) } diff --git a/config/base_config.go b/config/base_config.go index 93c0ce6a6..74634a5f6 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -42,14 +42,13 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { - ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` - configCenterUrl *common.URL - prefix string - fatherConfig interface{} - - MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` - - fileStream *bytes.Buffer + ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + configCenterUrl *common.URL + prefix string + fatherConfig interface{} + eventDispatcherType string `yaml:"direct" json:"direct,omitempty"` + MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` + fileStream *bytes.Buffer } // startConfigCenter will start the config center. diff --git a/config/config_loader.go b/config/config_loader.go index c0687d8fc..7af073923 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -33,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + _ "github.com/apache/dubbo-go/common/observer/dispatcher" ) var ( @@ -91,6 +92,17 @@ func Load() { } } + var eventDispatcherType string + if consumerConfig != nil { + eventDispatcherType = consumerConfig.eventDispatcherType + } + // notice consumerConfig.eventDispatcherType will be replaced + if providerConfig != nil { + eventDispatcherType = providerConfig.eventDispatcherType + } + // init EventDispatcher + extension.SetAndInitGlobalDispatcher(eventDispatcherType) + // reference config if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") diff --git a/common/observer/listener/ServiceInstancesChangedListener.go b/registry/listener/ServiceInstancesChangedListener.go similarity index 91% rename from common/observer/listener/ServiceInstancesChangedListener.go rename to registry/listener/ServiceInstancesChangedListener.go index 6ba680e2a..cd48ceb4f 100644 --- a/common/observer/listener/ServiceInstancesChangedListener.go +++ b/registry/listener/ServiceInstancesChangedListener.go @@ -18,11 +18,16 @@ package listener import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/common/observer/event" "reflect" ) +func init() { + extension.AddEventListener(&ServiceInstancesChangedListener{}) +} + // TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { observer.EventListener diff --git a/registry/service_discovery.go b/registry/service_discovery.go index 5a65e9f67..b3ebc0556 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -29,7 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/observer/event" - eventlistener "github.com/apache/dubbo-go/common/observer/listener" + eventlistener "github.com/apache/dubbo-go/registry/listener" ) type ServiceDiscovery interface { -- GitLab