diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go index 9bc784246927e298db160bd3c0e1b4a33bd67ace..d2349e0db8d033863d01320baf47a24043c9b951 100644 --- a/common/extension/event_dispatcher.go +++ b/common/extension/event_dispatcher.go @@ -17,16 +17,42 @@ package extension -import "github.com/apache/dubbo-go/common/observer" +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/prometheus/common/log" +) -func GetDispatcherEvent(name string) { +var eventListeners []observer.EventListener -} +var globalEventDispatcher observer.EventDispatcher -func SetDefaultDispatcherEvent() { +var ( + dispatchers = make(map[string]func() observer.EventDispatcher, 8) +) +func SetEventDispatcher(name string, v func() observer.EventDispatcher) { + dispatchers[name] = v } func AddEventListener(listener observer.EventListener) { + eventListeners = append(eventListeners, listener) +} + +// s +func SetAndInitGlobalDispatcher(name string) { + if len(name) == 0 { + name = "direct" + } + if globalEventDispatcher != nil { + log.Warnf("EventDispatcher already init. It will be replaced") + } + if dispatchers[name] == nil { + panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.") + } + globalEventDispatcher = dispatchers[name]() + globalEventDispatcher.AddEventListeners(eventListeners) +} +func GetGlobalDispatcher() observer.EventDispatcher { + return globalEventDispatcher } diff --git a/common/observer/direct_event_dispatcher.go b/common/observer/dispatcher/direct_event_dispatcher.go similarity index 72% rename from common/observer/direct_event_dispatcher.go rename to common/observer/dispatcher/direct_event_dispatcher.go index ddd999c2a585d840a2eec8ede8da4f58cd98c249..ab53ef0b2f52c6f107b2400b7b2c9af74d473c19 100644 --- a/common/observer/direct_event_dispatcher.go +++ b/common/observer/dispatcher/direct_event_dispatcher.go @@ -15,46 +15,42 @@ * limitations under the License. */ -package observer +package dispatcher import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" "reflect" ) -var directEventDispatcher *DirectEventDispatcher +func init() { + extension.SetEventDispatcher("direct", NewDirectEventDispatcher) +} // DirectEventDispatcher is align with DirectEventDispatcher interface in Java. // it's the top abstraction // Align with 2.7.5 // Dispatcher event to listener direct type DirectEventDispatcher struct { - BaseListenable - EventDispatcher + observer.BaseListenable + observer.EventDispatcher } -func NewDirectEventDispatcher() *DirectEventDispatcher { +func NewDirectEventDispatcher() observer.EventDispatcher { return &DirectEventDispatcher{} } -func (ded *DirectEventDispatcher) Dispatch(event Event) { +func (ded *DirectEventDispatcher) Dispatch(event observer.Event) { eventType := reflect.TypeOf(event).Elem() - value, loaded := ded.listenersCache.Load(eventType) + value, loaded := ded.ListenersCache.Load(eventType) if !loaded { return } - listenersSlice := value.([]EventListener) + listenersSlice := value.([]observer.EventListener) for _, listener := range listenersSlice { if err := listener.OnEvent(event); err != nil { logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err) } } } - -// GetSingleDirectEventDispatcher ... -func GetSingleDirectEventDispatcher() EventDispatcher { - if directEventDispatcher == nil { - directEventDispatcher = NewDirectEventDispatcher() - } - return directEventDispatcher -} diff --git a/common/observer/listenable.go b/common/observer/listenable.go index f91e96902c5dfd7a9a91cfbccce2d88ff8dc2280..4ceb03b0aeec5f57b4674f70199d5aac96c71a21 100644 --- a/common/observer/listenable.go +++ b/common/observer/listenable.go @@ -33,16 +33,16 @@ type Listenable interface { type BaseListenable struct { Listenable - listenersCache sync.Map - mutex sync.Mutex + ListenersCache sync.Map + Mutex sync.Mutex } func (bl *BaseListenable) AddEventListener(listener EventListener) { eventType := listener.GetEventType() var listenersSlice []EventListener - bl.mutex.Lock() - defer bl.mutex.Unlock() - if value, loaded := bl.listenersCache.Load(eventType); loaded { + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + if value, loaded := bl.ListenersCache.Load(eventType); loaded { listenersSlice = value.([]EventListener) if !containListener(listenersSlice, listener) { listenersSlice = append(listenersSlice, listener) @@ -54,12 +54,10 @@ func (bl *BaseListenable) AddEventListener(listener EventListener) { sort.Slice(listenersSlice, func(i, j int) bool { return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority() }) - bl.listenersCache.Store(eventType, listenersSlice) + bl.ListenersCache.Store(eventType, listenersSlice) } func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { - bl.mutex.Lock() - defer bl.mutex.Unlock() for _, listener := range listenersSlice { bl.AddEventListener(listener) } @@ -67,9 +65,9 @@ func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { func (bl *BaseListenable) RemoveEventListener(listener EventListener) { eventType := listener.GetEventType() - bl.mutex.Lock() - defer bl.mutex.Unlock() - value, loaded := bl.listenersCache.Load(eventType) + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + value, loaded := bl.ListenersCache.Load(eventType) if !loaded { return } @@ -82,24 +80,20 @@ func (bl *BaseListenable) RemoveEventListener(listener EventListener) { } func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) { - bl.mutex.Lock() - defer bl.mutex.Unlock() for _, listener := range listenersSlice { bl.RemoveEventListener(listener) } } func (bl *BaseListenable) RemoveAllEventListeners() { - bl.mutex.Lock() - defer bl.mutex.Unlock() - bl.listenersCache = *new(sync.Map) + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + bl.ListenersCache = *new(sync.Map) } func (bl *BaseListenable) GetAllEventListeners() []EventListener { - bl.mutex.Lock() - defer bl.mutex.Unlock() allListenersSlice := make([]EventListener, 0, 16) - bl.listenersCache.Range(func(_, value interface{}) bool { + bl.ListenersCache.Range(func(_, value interface{}) bool { listenersSlice := value.([]EventListener) allListenersSlice = append(allListenersSlice, listenersSlice...) return true diff --git a/common/observer/listenable_test.go b/common/observer/listenable_test.go index 35843bc91a5d7d0f7c093fba4188b1ea5b62d340..d1a16176f03d7187b870f992e1444e3a71dfdddd 100644 --- a/common/observer/listenable_test.go +++ b/common/observer/listenable_test.go @@ -18,13 +18,14 @@ package observer import ( + "github.com/apache/dubbo-go/registry/listener" "github.com/stretchr/testify/assert" "testing" ) func TestListenable(t *testing.T) { - var b EventListener = &ServiceInstancesChangedListener{} - var a EventListener = &ServiceInstancesChangedListener{} + var b EventListener = &listener.ServiceInstancesChangedListener{} + var a EventListener = &listener.ServiceInstancesChangedListener{} assert.True(t, b == a) } diff --git a/config/base_config.go b/config/base_config.go index 93c0ce6a6692193e7ea7b1b9f2f74e9eaed0c858..74634a5f63c6fab36fb1597c8f843c22d2ec5ee1 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -42,14 +42,13 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { - ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` - configCenterUrl *common.URL - prefix string - fatherConfig interface{} - - MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` - - fileStream *bytes.Buffer + ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + configCenterUrl *common.URL + prefix string + fatherConfig interface{} + eventDispatcherType string `yaml:"direct" json:"direct,omitempty"` + MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` + fileStream *bytes.Buffer } // startConfigCenter will start the config center. diff --git a/config/config_loader.go b/config/config_loader.go index c0687d8fc162331afc5098e347d4bbba6a1750c6..7af073923e77c424870003215bee17070e1f962d 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -33,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + _ "github.com/apache/dubbo-go/common/observer/dispatcher" ) var ( @@ -91,6 +92,17 @@ func Load() { } } + var eventDispatcherType string + if consumerConfig != nil { + eventDispatcherType = consumerConfig.eventDispatcherType + } + // notice consumerConfig.eventDispatcherType will be replaced + if providerConfig != nil { + eventDispatcherType = providerConfig.eventDispatcherType + } + // init EventDispatcher + extension.SetAndInitGlobalDispatcher(eventDispatcherType) + // reference config if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") diff --git a/common/observer/listener/ServiceInstancesChangedListener.go b/registry/listener/ServiceInstancesChangedListener.go similarity index 91% rename from common/observer/listener/ServiceInstancesChangedListener.go rename to registry/listener/ServiceInstancesChangedListener.go index 6ba680e2a72de30f3e3d59d11cc1f6deeb82726c..cd48ceb4f2fa7bc135bfb8fab89e1a137a8fd772 100644 --- a/common/observer/listener/ServiceInstancesChangedListener.go +++ b/registry/listener/ServiceInstancesChangedListener.go @@ -18,11 +18,16 @@ package listener import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/common/observer/event" "reflect" ) +func init() { + extension.AddEventListener(&ServiceInstancesChangedListener{}) +} + // TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { observer.EventListener diff --git a/registry/service_discovery.go b/registry/service_discovery.go index 5a65e9f679645fc1d52602ee9b8f6e7c22b38461..b3ebc05564d385607674f8f0a9caf79bbf5bd0a3 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -29,7 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/observer/event" - eventlistener "github.com/apache/dubbo-go/common/observer/listener" + eventlistener "github.com/apache/dubbo-go/registry/listener" ) type ServiceDiscovery interface {