Skip to content
Snippets Groups Projects
Commit 37772947 authored by Patrick's avatar Patrick
Browse files

align 2.7.8: direct event dispatcher

parent 8f876683
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import "github.com/apache/dubbo-go/common/observer"
func GetDispatcherEvent(name string) {
}
func SetDefaultDispatcherEvent() {
}
func AddEventListener(listener observer.EventListener) {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"github.com/apache/dubbo-go/common/logger"
"reflect"
)
var directEventDispatcher *DirectEventDispatcher
// DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
// Dispatcher event to listener direct
type DirectEventDispatcher struct {
BaseListenable
EventDispatcher
}
func NewDirectEventDispatcher() *DirectEventDispatcher {
return &DirectEventDispatcher{}
}
func (ded *DirectEventDispatcher) Dispatch(event Event) {
eventType := reflect.TypeOf(event).Elem()
value, loaded := ded.listenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]EventListener)
for _, listener := range listenersSlice {
if err := listener.OnEvent(event); err != nil {
logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err)
}
}
}
// GetSingleDirectEventDispatcher ...
func GetSingleDirectEventDispatcher() EventDispatcher {
if directEventDispatcher == nil {
directEventDispatcher = NewDirectEventDispatcher()
}
return directEventDispatcher
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Event is align with Event interface in Java.
// it's the top abstraction
// Align with 2.7.5
type Event interface {
fmt.Stringer
GetSource() interface{}
GetTimestamp() time.Time
}
// baseEvent is the base implementation of Event
// You should never use it directly
type BaseEvent struct {
source interface{}
timestamp time.Time
}
// GetSource return the source
func (b *BaseEvent) GetSource() interface{} {
return b.source
}
// GetTimestamp return the timestamp when the event is created
func (b *BaseEvent) GetTimestamp() time.Time {
return b.timestamp
}
// String return a human readable string representing this event
func (b *BaseEvent) String() string {
return fmt.Sprintf("baseEvent[source = %#v]", b.source)
}
func newBaseEvent(source interface{}) *BaseEvent {
return &BaseEvent{
source: source,
timestamp: time.Now(),
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"fmt"
"github.com/apache/dubbo-go/common/observer"
)
// ServiceInstancesChangedEvent represents service instances make some changing
type ServiceInstancesChangedEvent struct {
fmt.Stringer
observer.BaseEvent
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
// EventDispatcher is align with EventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
type EventDispatcher interface {
Listenable
// Dispatch event
Dispatch(event Event)
}
......@@ -15,10 +15,11 @@
* limitations under the License.
*/
package registry
package observer
import (
gxsort "github.com/dubbogo/gost/sort"
"reflect"
)
// EventListener is an new interface used to align with dubbo 2.7.5
......@@ -27,6 +28,8 @@ type EventListener interface {
gxsort.Prioritizer
// OnEvent handle this event
OnEvent(e Event) error
// GetEventType listen which event type
GetEventType() reflect.Type
}
// ConditionalEventListener only handle the event which it can handle
......@@ -35,7 +38,3 @@ type ConditionalEventListener interface {
// Accept will make the decision whether it should handle this event
Accept(e Event) bool
}
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"sort"
"sync"
)
type Listenable interface {
AddEventListener(listener EventListener)
AddEventListeners(listenersSlice []EventListener)
RemoveEventListener(listener EventListener)
RemoveEventListeners(listenersSlice []EventListener)
GetAllEventListeners() []EventListener
RemoveAllEventListeners()
}
type BaseListenable struct {
Listenable
listenersCache sync.Map
mutex sync.Mutex
}
func (bl *BaseListenable) AddEventListener(listener EventListener) {
eventType := listener.GetEventType()
var listenersSlice []EventListener
bl.mutex.Lock()
defer bl.mutex.Unlock()
if value, loaded := bl.listenersCache.Load(eventType); loaded {
listenersSlice = value.([]EventListener)
if !containListener(listenersSlice, listener) {
listenersSlice = append(listenersSlice, listener)
}
} else {
listenersSlice = make([]EventListener, 0, 8)
listenersSlice = append(listenersSlice, listener)
}
sort.Slice(listenersSlice, func(i, j int) bool {
return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority()
})
bl.listenersCache.Store(eventType, listenersSlice)
}
func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) {
bl.mutex.Lock()
defer bl.mutex.Unlock()
for _, listener := range listenersSlice {
bl.AddEventListener(listener)
}
}
func (bl *BaseListenable) RemoveEventListener(listener EventListener) {
eventType := listener.GetEventType()
bl.mutex.Lock()
defer bl.mutex.Unlock()
value, loaded := bl.listenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]EventListener)
for i, listener := range listenersSlice {
if listener == listener {
listenersSlice = append(listenersSlice[:i], listenersSlice[i+1:]...)
}
}
}
func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) {
bl.mutex.Lock()
defer bl.mutex.Unlock()
for _, listener := range listenersSlice {
bl.RemoveEventListener(listener)
}
}
func (bl *BaseListenable) RemoveAllEventListeners() {
bl.mutex.Lock()
defer bl.mutex.Unlock()
bl.listenersCache = *new(sync.Map)
}
func (bl *BaseListenable) GetAllEventListeners() []EventListener {
bl.mutex.Lock()
defer bl.mutex.Unlock()
allListenersSlice := make([]EventListener, 0, 16)
bl.listenersCache.Range(func(_, value interface{}) bool {
listenersSlice := value.([]EventListener)
allListenersSlice = append(allListenersSlice, listenersSlice...)
return true
})
sort.Slice(allListenersSlice, func(i, j int) bool {
return allListenersSlice[i].GetPriority() < allListenersSlice[j].GetPriority()
})
return allListenersSlice
}
func containListener(listenersSlice []EventListener, listener EventListener) bool {
for _, loadListener := range listenersSlice {
if loadListener == listener {
return true
}
}
return false
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestListenable(t *testing.T) {
var b EventListener = &ServiceInstancesChangedListener{}
var a EventListener = &ServiceInstancesChangedListener{}
assert.True(t, b == a)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package listener
import (
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/event"
"reflect"
)
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
observer.EventListener
}
func (sicl *ServiceInstancesChangedListener) OnEvent(e observer.Event) error {
return nil
}
func (sicl *ServiceInstancesChangedListener) GetPriority() int {
return -1
}
func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type {
return reflect.TypeOf(&event.ServiceInstancesChangedEvent{})
}
......@@ -45,47 +45,3 @@ type ServiceEvent struct {
func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service)
}
// Event is align with Event interface in Java.
// it's the top abstraction
// Align with 2.7.5
type Event interface {
fmt.Stringer
GetSource() interface{}
GetTimestamp() time.Time
}
// baseEvent is the base implementation of Event
// You should never use it directly
type baseEvent struct {
source interface{}
timestamp time.Time
}
// GetSource return the source
func (b *baseEvent) GetSource() interface{} {
return b.source
}
// GetTimestamp return the timestamp when the event is created
func (b *baseEvent) GetTimestamp() time.Time {
return b.timestamp
}
// String return a human readable string representing this event
func (b *baseEvent) String() string {
return fmt.Sprintf("baseEvent[source = %#v]", b.source)
}
func newBaseEvent(source interface{}) *baseEvent {
return &baseEvent{
source: source,
timestamp: time.Now(),
}
}
// ServiceInstancesChangedEvent represents service instances make some changing
type ServiceInstancesChangedEvent struct {
fmt.Stringer
baseEvent
}
......@@ -28,6 +28,8 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/observer/event"
eventlistener "github.com/apache/dubbo-go/common/observer/listener"
)
type ServiceDiscovery interface {
......@@ -78,7 +80,7 @@ type ServiceDiscovery interface {
// ----------------- event ----------------------
// AddListener adds a new ServiceInstancesChangedListener
AddListener(listener *ServiceInstancesChangedListener) error
AddListener(listener *eventlistener.ServiceInstancesChangedListener) error
// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
DispatchEventByServiceName(serviceName string) error
......@@ -87,5 +89,5 @@ type ServiceDiscovery interface {
DispatchEventForInstances(serviceName string, instances []ServiceInstance) error
// DispatchEvent dispatches the event
DispatchEvent(event ServiceInstancesChangedEvent) error
DispatchEvent(event event.ServiceInstancesChangedEvent) error
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment