Skip to content
Snippets Groups Projects
Commit 613e5e77 authored by Patrick's avatar Patrick
Browse files

dispatcher in extension

parent 37772947
No related branches found
No related tags found
No related merge requests found
...@@ -17,16 +17,42 @@ ...@@ -17,16 +17,42 @@
package extension package extension
import "github.com/apache/dubbo-go/common/observer" import (
"github.com/apache/dubbo-go/common/observer"
"github.com/prometheus/common/log"
)
func GetDispatcherEvent(name string) { var eventListeners []observer.EventListener
} var globalEventDispatcher observer.EventDispatcher
func SetDefaultDispatcherEvent() { var (
dispatchers = make(map[string]func() observer.EventDispatcher, 8)
)
func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
dispatchers[name] = v
} }
func AddEventListener(listener observer.EventListener) { func AddEventListener(listener observer.EventListener) {
eventListeners = append(eventListeners, listener)
}
// s
func SetAndInitGlobalDispatcher(name string) {
if len(name) == 0 {
name = "direct"
}
if globalEventDispatcher != nil {
log.Warnf("EventDispatcher already init. It will be replaced")
}
if dispatchers[name] == nil {
panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
}
globalEventDispatcher = dispatchers[name]()
globalEventDispatcher.AddEventListeners(eventListeners)
}
func GetGlobalDispatcher() observer.EventDispatcher {
return globalEventDispatcher
} }
...@@ -15,46 +15,42 @@ ...@@ -15,46 +15,42 @@
* limitations under the License. * limitations under the License.
*/ */
package observer package dispatcher
import ( import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
"reflect" "reflect"
) )
var directEventDispatcher *DirectEventDispatcher func init() {
extension.SetEventDispatcher("direct", NewDirectEventDispatcher)
}
// DirectEventDispatcher is align with DirectEventDispatcher interface in Java. // DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
// it's the top abstraction // it's the top abstraction
// Align with 2.7.5 // Align with 2.7.5
// Dispatcher event to listener direct // Dispatcher event to listener direct
type DirectEventDispatcher struct { type DirectEventDispatcher struct {
BaseListenable observer.BaseListenable
EventDispatcher observer.EventDispatcher
} }
func NewDirectEventDispatcher() *DirectEventDispatcher { func NewDirectEventDispatcher() observer.EventDispatcher {
return &DirectEventDispatcher{} return &DirectEventDispatcher{}
} }
func (ded *DirectEventDispatcher) Dispatch(event Event) { func (ded *DirectEventDispatcher) Dispatch(event observer.Event) {
eventType := reflect.TypeOf(event).Elem() eventType := reflect.TypeOf(event).Elem()
value, loaded := ded.listenersCache.Load(eventType) value, loaded := ded.ListenersCache.Load(eventType)
if !loaded { if !loaded {
return return
} }
listenersSlice := value.([]EventListener) listenersSlice := value.([]observer.EventListener)
for _, listener := range listenersSlice { for _, listener := range listenersSlice {
if err := listener.OnEvent(event); err != nil { if err := listener.OnEvent(event); err != nil {
logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err) logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err)
} }
} }
} }
// GetSingleDirectEventDispatcher ...
func GetSingleDirectEventDispatcher() EventDispatcher {
if directEventDispatcher == nil {
directEventDispatcher = NewDirectEventDispatcher()
}
return directEventDispatcher
}
...@@ -33,16 +33,16 @@ type Listenable interface { ...@@ -33,16 +33,16 @@ type Listenable interface {
type BaseListenable struct { type BaseListenable struct {
Listenable Listenable
listenersCache sync.Map ListenersCache sync.Map
mutex sync.Mutex Mutex sync.Mutex
} }
func (bl *BaseListenable) AddEventListener(listener EventListener) { func (bl *BaseListenable) AddEventListener(listener EventListener) {
eventType := listener.GetEventType() eventType := listener.GetEventType()
var listenersSlice []EventListener var listenersSlice []EventListener
bl.mutex.Lock() bl.Mutex.Lock()
defer bl.mutex.Unlock() defer bl.Mutex.Unlock()
if value, loaded := bl.listenersCache.Load(eventType); loaded { if value, loaded := bl.ListenersCache.Load(eventType); loaded {
listenersSlice = value.([]EventListener) listenersSlice = value.([]EventListener)
if !containListener(listenersSlice, listener) { if !containListener(listenersSlice, listener) {
listenersSlice = append(listenersSlice, listener) listenersSlice = append(listenersSlice, listener)
...@@ -54,12 +54,10 @@ func (bl *BaseListenable) AddEventListener(listener EventListener) { ...@@ -54,12 +54,10 @@ func (bl *BaseListenable) AddEventListener(listener EventListener) {
sort.Slice(listenersSlice, func(i, j int) bool { sort.Slice(listenersSlice, func(i, j int) bool {
return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority() return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority()
}) })
bl.listenersCache.Store(eventType, listenersSlice) bl.ListenersCache.Store(eventType, listenersSlice)
} }
func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) {
bl.mutex.Lock()
defer bl.mutex.Unlock()
for _, listener := range listenersSlice { for _, listener := range listenersSlice {
bl.AddEventListener(listener) bl.AddEventListener(listener)
} }
...@@ -67,9 +65,9 @@ func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { ...@@ -67,9 +65,9 @@ func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) {
func (bl *BaseListenable) RemoveEventListener(listener EventListener) { func (bl *BaseListenable) RemoveEventListener(listener EventListener) {
eventType := listener.GetEventType() eventType := listener.GetEventType()
bl.mutex.Lock() bl.Mutex.Lock()
defer bl.mutex.Unlock() defer bl.Mutex.Unlock()
value, loaded := bl.listenersCache.Load(eventType) value, loaded := bl.ListenersCache.Load(eventType)
if !loaded { if !loaded {
return return
} }
...@@ -82,24 +80,20 @@ func (bl *BaseListenable) RemoveEventListener(listener EventListener) { ...@@ -82,24 +80,20 @@ func (bl *BaseListenable) RemoveEventListener(listener EventListener) {
} }
func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) { func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) {
bl.mutex.Lock()
defer bl.mutex.Unlock()
for _, listener := range listenersSlice { for _, listener := range listenersSlice {
bl.RemoveEventListener(listener) bl.RemoveEventListener(listener)
} }
} }
func (bl *BaseListenable) RemoveAllEventListeners() { func (bl *BaseListenable) RemoveAllEventListeners() {
bl.mutex.Lock() bl.Mutex.Lock()
defer bl.mutex.Unlock() defer bl.Mutex.Unlock()
bl.listenersCache = *new(sync.Map) bl.ListenersCache = *new(sync.Map)
} }
func (bl *BaseListenable) GetAllEventListeners() []EventListener { func (bl *BaseListenable) GetAllEventListeners() []EventListener {
bl.mutex.Lock()
defer bl.mutex.Unlock()
allListenersSlice := make([]EventListener, 0, 16) allListenersSlice := make([]EventListener, 0, 16)
bl.listenersCache.Range(func(_, value interface{}) bool { bl.ListenersCache.Range(func(_, value interface{}) bool {
listenersSlice := value.([]EventListener) listenersSlice := value.([]EventListener)
allListenersSlice = append(allListenersSlice, listenersSlice...) allListenersSlice = append(allListenersSlice, listenersSlice...)
return true return true
......
...@@ -18,13 +18,14 @@ ...@@ -18,13 +18,14 @@
package observer package observer
import ( import (
"github.com/apache/dubbo-go/registry/listener"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing" "testing"
) )
func TestListenable(t *testing.T) { func TestListenable(t *testing.T) {
var b EventListener = &ServiceInstancesChangedListener{} var b EventListener = &listener.ServiceInstancesChangedListener{}
var a EventListener = &ServiceInstancesChangedListener{} var a EventListener = &listener.ServiceInstancesChangedListener{}
assert.True(t, b == a) assert.True(t, b == a)
} }
...@@ -42,14 +42,13 @@ type multiConfiger interface { ...@@ -42,14 +42,13 @@ type multiConfiger interface {
// BaseConfig is the common configuration for provider and consumer // BaseConfig is the common configuration for provider and consumer
type BaseConfig struct { type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL configCenterUrl *common.URL
prefix string prefix string
fatherConfig interface{} fatherConfig interface{}
eventDispatcherType string `yaml:"direct" json:"direct,omitempty"`
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
fileStream *bytes.Buffer
fileStream *bytes.Buffer
} }
// startConfigCenter will start the config center. // startConfigCenter will start the config center.
......
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
_ "github.com/apache/dubbo-go/common/observer/dispatcher"
) )
var ( var (
...@@ -91,6 +92,17 @@ func Load() { ...@@ -91,6 +92,17 @@ func Load() {
} }
} }
var eventDispatcherType string
if consumerConfig != nil {
eventDispatcherType = consumerConfig.eventDispatcherType
}
// notice consumerConfig.eventDispatcherType will be replaced
if providerConfig != nil {
eventDispatcherType = providerConfig.eventDispatcherType
}
// init EventDispatcher
extension.SetAndInitGlobalDispatcher(eventDispatcherType)
// reference config // reference config
if consumerConfig == nil { if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!") logger.Warnf("consumerConfig is nil!")
......
...@@ -18,11 +18,16 @@ ...@@ -18,11 +18,16 @@
package listener package listener
import ( import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/event" "github.com/apache/dubbo-go/common/observer/event"
"reflect" "reflect"
) )
func init() {
extension.AddEventListener(&ServiceInstancesChangedListener{})
}
// TODO (implement ConditionalEventListener) // TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct { type ServiceInstancesChangedListener struct {
observer.EventListener observer.EventListener
......
...@@ -29,7 +29,7 @@ import ( ...@@ -29,7 +29,7 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/observer/event" "github.com/apache/dubbo-go/common/observer/event"
eventlistener "github.com/apache/dubbo-go/common/observer/listener" eventlistener "github.com/apache/dubbo-go/registry/listener"
) )
type ServiceDiscovery interface { type ServiceDiscovery interface {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment