diff --git a/common/constant/key.go b/common/constant/key.go index 1479af2305b1ce4280c2aa7f4016e314ac358513..16986c7421345318d3477adca59de800b9d9c415 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -40,6 +40,7 @@ const ( TOKEN_KEY = "token" LOCAL_ADDR = "local-addr" REMOTE_ADDR = "remote-addr" + PATH_SEPARATOR = "/" DUBBO_KEY = "dubbo" RELEASE_KEY = "release" ANYHOST_KEY = "anyhost" diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go new file mode 100644 index 0000000000000000000000000000000000000000..2d33528259e85f50a0784aceba37ecd87be61620 --- /dev/null +++ b/common/extension/event_dispatcher.go @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" +) + +var ( + globalEventDispatcher observer.EventDispatcher + initEventListeners []observer.EventListener +) + +var ( + dispatchers = make(map[string]func() observer.EventDispatcher, 8) +) + +// SetEventDispatcher by name +func SetEventDispatcher(name string, v func() observer.EventDispatcher) { + dispatchers[name] = v +} + +// SetAndInitGlobalDispatcher +func SetAndInitGlobalDispatcher(name string) { + if len(name) == 0 { + name = "direct" + } + if globalEventDispatcher != nil { + logger.Warnf("EventDispatcher already init. It will be replaced") + } + if dp, ok := dispatchers[name]; !ok || dp == nil { + panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.") + } + globalEventDispatcher = dispatchers[name]() + globalEventDispatcher.AddEventListeners(initEventListeners) +} + +// GetGlobalDispatcher +func GetGlobalDispatcher() observer.EventDispatcher { + return globalEventDispatcher +} + +// AddEventListener it will be added in global event dispatcher +func AddEventListener(listener observer.EventListener) { + initEventListeners = append(initEventListeners, listener) +} diff --git a/common/observer/dispatcher/direct_event_dispatcher.go b/common/observer/dispatcher/direct_event_dispatcher.go new file mode 100644 index 0000000000000000000000000000000000000000..2b7567b47ed40caa8867901ff0a05e0a2497cd31 --- /dev/null +++ b/common/observer/dispatcher/direct_event_dispatcher.go @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dispatcher + +import ( + "reflect" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" +) + +func init() { + extension.SetEventDispatcher("direct", NewDirectEventDispatcher) +} + +// DirectEventDispatcher is align with DirectEventDispatcher interface in Java. +// it's the top abstraction +// Align with 2.7.5 +// Dispatcher event to listener direct +type DirectEventDispatcher struct { + observer.BaseListenable +} + +// NewDirectEventDispatcher ac constructor of DirectEventDispatcher +func NewDirectEventDispatcher() observer.EventDispatcher { + return &DirectEventDispatcher{} +} + +// Dispatch event directly +func (ded *DirectEventDispatcher) Dispatch(event observer.Event) { + if event == nil { + logger.Warnf("[DirectEventDispatcher] dispatch event nil") + return + } + eventType := reflect.TypeOf(event).Elem() + value, loaded := ded.ListenersCache.Load(eventType) + if !loaded { + return + } + listenersSlice := value.([]observer.EventListener) + for _, listener := range listenersSlice { + if err := listener.OnEvent(event); err != nil { + logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err) + } + } +} diff --git a/common/observer/dispatcher/direct_event_dispatcher_test.go b/common/observer/dispatcher/direct_event_dispatcher_test.go new file mode 100644 index 0000000000000000000000000000000000000000..355c930a9e86dbe6e82adc2795f8590c15a473c2 --- /dev/null +++ b/common/observer/dispatcher/direct_event_dispatcher_test.go @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dispatcher + +import ( + "fmt" + "reflect" + "testing" +) + +import ( + "github.com/apache/dubbo-go/common/observer" +) + +func TestDirectEventDispatcher_Dispatch(t *testing.T) { + ded := NewDirectEventDispatcher() + ded.AddEventListener(&TestEventListener{}) + ded.AddEventListener(&TestEventListener1{}) + ded.Dispatch(&TestEvent{}) + ded.Dispatch(nil) +} + +type TestEvent struct { + observer.BaseEvent +} + +type TestEventListener struct { + observer.BaseListenable + observer.EventListener +} + +func (tel *TestEventListener) OnEvent(e observer.Event) error { + fmt.Println("TestEventListener") + return nil +} + +func (tel *TestEventListener) GetPriority() int { + return -1 +} + +func (tel *TestEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(&TestEvent{}) +} + +type TestEventListener1 struct { + observer.EventListener +} + +func (tel *TestEventListener1) OnEvent(e observer.Event) error { + fmt.Println("TestEventListener1") + return nil +} + +func (tel *TestEventListener1) GetPriority() int { + return 1 +} + +func (tel *TestEventListener1) GetEventType() reflect.Type { + return reflect.TypeOf(TestEvent{}) +} diff --git a/common/observer/event.go b/common/observer/event.go new file mode 100644 index 0000000000000000000000000000000000000000..8c3362feeee3d62315eb734460f486dcdbfe2f36 --- /dev/null +++ b/common/observer/event.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package observer + +import ( + "fmt" + "math/rand" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Event is align with Event interface in Java. +// it's the top abstraction +// Align with 2.7.5 +type Event interface { + fmt.Stringer + GetSource() interface{} + GetTimestamp() time.Time +} + +// BaseEvent is the base implementation of Event +// You should never use it directly +type BaseEvent struct { + Source interface{} + Timestamp time.Time +} + +// GetSource return the source +func (b *BaseEvent) GetSource() interface{} { + return b.Source +} + +// GetTimestamp return the Timestamp when the event is created +func (b *BaseEvent) GetTimestamp() time.Time { + return b.Timestamp +} + +// String return a human readable string representing this event +func (b *BaseEvent) String() string { + return fmt.Sprintf("BaseEvent[source = %#v]", b.Source) +} + +func newBaseEvent(source interface{}) *BaseEvent { + return &BaseEvent{ + Source: source, + Timestamp: time.Now(), + } +} diff --git a/common/observer/event_dispatcher.go b/common/observer/event_dispatcher.go new file mode 100644 index 0000000000000000000000000000000000000000..17745e68c07065f954d456914fb276fadf6d975d --- /dev/null +++ b/common/observer/event_dispatcher.go @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package observer + +// EventDispatcher is align with EventDispatcher interface in Java. +// it's the top abstraction +// Align with 2.7.5 +type EventDispatcher interface { + Listenable + // Dispatch event + Dispatch(event Event) +} diff --git a/common/observer/event_listener.go b/common/observer/event_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..8db60d8475da49262947329fc71fd8e364d8d0af --- /dev/null +++ b/common/observer/event_listener.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package observer + +import ( + "reflect" +) + +import ( + gxsort "github.com/dubbogo/gost/sort" +) + +// EventListener is an new interface used to align with dubbo 2.7.5 +// It contains the Prioritized means that the listener has its priority +type EventListener interface { + gxsort.Prioritizer + // OnEvent handle this event + OnEvent(e Event) error + // GetEventType listen which event type + GetEventType() reflect.Type +} + +// ConditionalEventListener only handle the event which it can handle +type ConditionalEventListener interface { + EventListener + // Accept will make the decision whether it should handle this event + Accept(e Event) bool +} + +// TODO (implement ConditionalEventListener) +type ServiceInstancesChangedListener struct { + ServiceName string +} diff --git a/common/observer/listenable.go b/common/observer/listenable.go new file mode 100644 index 0000000000000000000000000000000000000000..7b64aa8f2d263793c7aa4dc5e294466c48cd7b36 --- /dev/null +++ b/common/observer/listenable.go @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package observer + +import ( + "reflect" + "sort" + "sync" +) + +// Listenable could add and remove the event listener +type Listenable interface { + AddEventListener(listener EventListener) + AddEventListeners(listenersSlice []EventListener) + RemoveEventListener(listener EventListener) + RemoveEventListeners(listenersSlice []EventListener) + GetAllEventListeners() []EventListener + RemoveAllEventListeners() +} + +// BaseListenable base listenable +type BaseListenable struct { + Listenable + ListenersCache sync.Map + Mutex sync.Mutex +} + +// NewBaseListenable a constructor of base listenable +func NewBaseListenable() Listenable { + return &BaseListenable{} +} + +// AddEventListener add event listener +func (bl *BaseListenable) AddEventListener(listener EventListener) { + eventType := listener.GetEventType() + if eventType.Kind() == reflect.Ptr { + eventType = eventType.Elem() + } + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + value, loaded := bl.ListenersCache.LoadOrStore(eventType, make([]EventListener, 0, 8)) + listenersSlice := value.([]EventListener) + // return if listenersSlice already has this listener + if loaded && containListener(listenersSlice, listener) { + return + } + listenersSlice = append(listenersSlice, listener) + sort.Slice(listenersSlice, func(i, j int) bool { + return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority() + }) + bl.ListenersCache.Store(eventType, listenersSlice) +} + +// AddEventListeners add the slice of event listener +func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) { + for _, listener := range listenersSlice { + bl.AddEventListener(listener) + } +} + +// RemoveEventListener remove the event listener +func (bl *BaseListenable) RemoveEventListener(listener EventListener) { + eventType := listener.GetEventType() + if eventType.Kind() == reflect.Ptr { + eventType = eventType.Elem() + } + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + value, loaded := bl.ListenersCache.Load(eventType) + if !loaded { + return + } + listenersSlice := value.([]EventListener) + for i, l := range listenersSlice { + if l == listener { + listenersSlice = append(listenersSlice[:i], listenersSlice[i+1:]...) + } + } + bl.ListenersCache.Store(eventType, listenersSlice) +} + +// RemoveEventListeners remove the slice of event listener +func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) { + for _, listener := range listenersSlice { + bl.RemoveEventListener(listener) + } +} + +// RemoveAllEventListeners remove all +func (bl *BaseListenable) RemoveAllEventListeners() { + bl.Mutex.Lock() + defer bl.Mutex.Unlock() + bl.ListenersCache = sync.Map{} +} + +// GetAllEventListeners get all +func (bl *BaseListenable) GetAllEventListeners() []EventListener { + allListenersSlice := make([]EventListener, 0, 16) + bl.ListenersCache.Range(func(_, value interface{}) bool { + listenersSlice := value.([]EventListener) + allListenersSlice = append(allListenersSlice, listenersSlice...) + return true + }) + sort.Slice(allListenersSlice, func(i, j int) bool { + return allListenersSlice[i].GetPriority() < allListenersSlice[j].GetPriority() + }) + return allListenersSlice +} + +// containListener true if contain listener +func containListener(listenersSlice []EventListener, listener EventListener) bool { + for _, loadListener := range listenersSlice { + if loadListener == listener { + return true + } + } + return false +} diff --git a/common/observer/listenable_test.go b/common/observer/listenable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..df46bfc2ba47f6e447074b44208a809949f7ae3d --- /dev/null +++ b/common/observer/listenable_test.go @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package observer + +import ( + "reflect" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestListenable(t *testing.T) { + el := &TestEventListener{} + b := &BaseListenable{} + b.AddEventListener(el) + b.AddEventListener(el) + al := b.GetAllEventListeners() + assert.Equal(t, len(al), 1) + assert.Equal(t, al[0].GetEventType(), reflect.TypeOf(TestEvent{})) + b.RemoveEventListener(el) + assert.Equal(t, len(b.GetAllEventListeners()), 0) + var ts []EventListener + ts = append(ts, el) + b.AddEventListeners(ts) + assert.Equal(t, len(al), 1) + +} + +type TestEvent struct { + BaseEvent +} + +type TestEventListener struct { + EventListener +} + +func (tel *TestEventListener) OnEvent(e Event) error { + return nil +} + +func (tel *TestEventListener) GetPriority() int { + return -1 +} + +func (tel *TestEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(TestEvent{}) +} diff --git a/config/base_config.go b/config/base_config.go index 93c0ce6a6692193e7ea7b1b9f2f74e9eaed0c858..f58138d2e58a1b3da06894d3afa4728ea2ecf8fb 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -42,14 +42,13 @@ type multiConfiger interface { // BaseConfig is the common configuration for provider and consumer type BaseConfig struct { - ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` - configCenterUrl *common.URL - prefix string - fatherConfig interface{} - - MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` - - fileStream *bytes.Buffer + ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` + configCenterUrl *common.URL + prefix string + fatherConfig interface{} + eventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` + MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` + fileStream *bytes.Buffer } // startConfigCenter will start the config center. diff --git a/config/config_loader.go b/config/config_loader.go index 61cb49457b7a2435cb22f02a9df0a02a71ae68cb..11107e83e1c6660c5265bdbe31851e2380dfdd00 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -33,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + _ "github.com/apache/dubbo-go/common/observer/dispatcher" ) var ( @@ -91,6 +92,17 @@ func Load() { } } + var eventDispatcherType string + if consumerConfig != nil { + eventDispatcherType = consumerConfig.eventDispatcherType + } + // notice consumerConfig.eventDispatcherType will be replaced + if providerConfig != nil { + eventDispatcherType = providerConfig.eventDispatcherType + } + // init EventDispatcher should before everything + extension.SetAndInitGlobalDispatcher(eventDispatcherType) + // reference config if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") diff --git a/config/metadata_report_config_test.go b/config/metadata_report_config_test.go index d6b08d5fb0c51495940d4dc021a0796c1d577923..635feecc2d433366534566d184e058eb54a881ed 100644 --- a/config/metadata_report_config_test.go +++ b/config/metadata_report_config_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package config import "testing" diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go index fd3a290b41e870674366943e12a396c3dae7e238..321a216a3e3ad3f2390ab832782924a81e226160 100644 --- a/metadata/identifier/subscribe_metadata_identifier.go +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package identifier type SubscriberMetadataIdentifier struct { diff --git a/registry/event.go b/registry/event.go index be9f11d00bb5a70b0d787d15bcdc98471aad0a4b..6f647185cc213b80b9ab25b4702f91b36aa8ad4b 100644 --- a/registry/event.go +++ b/registry/event.go @@ -25,6 +25,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/remoting" ) @@ -47,47 +48,9 @@ func (e ServiceEvent) String() string { return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service) } -// Event is align with Event interface in Java. -// it's the top abstraction -// Align with 2.7.5 -type Event interface { - fmt.Stringer - GetSource() interface{} - GetTimestamp() time.Time -} - -// baseEvent is the base implementation of Event -// You should never use it directly -type baseEvent struct { - source interface{} - timestamp time.Time -} - -// GetSource return the source -func (b *baseEvent) GetSource() interface{} { - return b.source -} - -// GetTimestamp return the timestamp when the event is created -func (b *baseEvent) GetTimestamp() time.Time { - return b.timestamp -} - -// String return a human readable string representing this event -func (b *baseEvent) String() string { - return fmt.Sprintf("baseEvent[source = %#v]", b.source) -} - -func newBaseEvent(source interface{}) *baseEvent { - return &baseEvent{ - source: source, - timestamp: time.Now(), - } -} - // ServiceInstancesChangedEvent represents service instances make some changing type ServiceInstancesChangedEvent struct { - baseEvent + observer.BaseEvent ServiceName string Instances []ServiceInstance } @@ -100,9 +63,9 @@ func (s *ServiceInstancesChangedEvent) String() string { // NewServiceInstancesChangedEvent will create the ServiceInstanceChangedEvent instance func NewServiceInstancesChangedEvent(serviceName string, instances []ServiceInstance) *ServiceInstancesChangedEvent { return &ServiceInstancesChangedEvent{ - baseEvent: baseEvent{ - source: serviceName, - timestamp: time.Now(), + BaseEvent: observer.BaseEvent{ + Source: serviceName, + Timestamp: time.Now(), }, ServiceName: serviceName, Instances: instances, diff --git a/registry/event_listener.go b/registry/event_listener.go index b8d6148442d9e10e210958dead690c4a95b33fb6..0aaa081a4488de5c291485277cdc0fc14ef4e380 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -18,25 +18,27 @@ package registry import ( - gxsort "github.com/dubbogo/gost/sort" + "reflect" ) -// EventListener is an new interface used to align with dubbo 2.7.5 -// It contains the Prioritized means that the listener has its priority -type EventListener interface { - gxsort.Prioritizer - // OnEvent handle this event - OnEvent(e Event) error -} - -// ConditionalEventListener only handle the event which it can handle -type ConditionalEventListener interface { - EventListener - // Accept will make the decision whether it should handle this event - Accept(e Event) bool -} +import ( + "github.com/apache/dubbo-go/common/observer" +) // TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { ServiceName string + observer.EventListener +} + +func (sicl *ServiceInstancesChangedListener) OnEvent(e observer.Event) error { + return nil +} + +func (sicl *ServiceInstancesChangedListener) GetPriority() int { + return -1 +} + +func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type { + return reflect.TypeOf(&ServiceInstancesChangedEvent{}) }