Skip to content
Snippets Groups Projects
Commit f8e27d5f authored by vito.he's avatar vito.he
Browse files

Merge branch 'feature/dubbo-2.7.5' into metadata_report

parents d544ae81 2643562d
No related branches found
No related tags found
No related merge requests found
......@@ -41,6 +41,7 @@ const (
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
PATH_SEPARATOR = "/"
DUBBO_KEY = "dubbo"
RELEASE_KEY = "release"
ANYHOST_KEY = "anyhost"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)
var (
globalEventDispatcher observer.EventDispatcher
initEventListeners []observer.EventListener
)
var (
dispatchers = make(map[string]func() observer.EventDispatcher, 8)
)
// SetEventDispatcher by name
func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
dispatchers[name] = v
}
// SetAndInitGlobalDispatcher
func SetAndInitGlobalDispatcher(name string) {
if len(name) == 0 {
name = "direct"
}
if globalEventDispatcher != nil {
logger.Warnf("EventDispatcher already init. It will be replaced")
}
if dp, ok := dispatchers[name]; !ok || dp == nil {
panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
}
globalEventDispatcher = dispatchers[name]()
globalEventDispatcher.AddEventListeners(initEventListeners)
}
// GetGlobalDispatcher
func GetGlobalDispatcher() observer.EventDispatcher {
return globalEventDispatcher
}
// AddEventListener it will be added in global event dispatcher
func AddEventListener(listener observer.EventListener) {
initEventListeners = append(initEventListeners, listener)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dispatcher
import (
"reflect"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)
func init() {
extension.SetEventDispatcher("direct", NewDirectEventDispatcher)
}
// DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
// Dispatcher event to listener direct
type DirectEventDispatcher struct {
observer.BaseListenable
}
// NewDirectEventDispatcher ac constructor of DirectEventDispatcher
func NewDirectEventDispatcher() observer.EventDispatcher {
return &DirectEventDispatcher{}
}
// Dispatch event directly
func (ded *DirectEventDispatcher) Dispatch(event observer.Event) {
if event == nil {
logger.Warnf("[DirectEventDispatcher] dispatch event nil")
return
}
eventType := reflect.TypeOf(event).Elem()
value, loaded := ded.ListenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]observer.EventListener)
for _, listener := range listenersSlice {
if err := listener.OnEvent(event); err != nil {
logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dispatcher
import (
"fmt"
"reflect"
"testing"
)
import (
"github.com/apache/dubbo-go/common/observer"
)
func TestDirectEventDispatcher_Dispatch(t *testing.T) {
ded := NewDirectEventDispatcher()
ded.AddEventListener(&TestEventListener{})
ded.AddEventListener(&TestEventListener1{})
ded.Dispatch(&TestEvent{})
ded.Dispatch(nil)
}
type TestEvent struct {
observer.BaseEvent
}
type TestEventListener struct {
observer.BaseListenable
observer.EventListener
}
func (tel *TestEventListener) OnEvent(e observer.Event) error {
fmt.Println("TestEventListener")
return nil
}
func (tel *TestEventListener) GetPriority() int {
return -1
}
func (tel *TestEventListener) GetEventType() reflect.Type {
return reflect.TypeOf(&TestEvent{})
}
type TestEventListener1 struct {
observer.EventListener
}
func (tel *TestEventListener1) OnEvent(e observer.Event) error {
fmt.Println("TestEventListener1")
return nil
}
func (tel *TestEventListener1) GetPriority() int {
return 1
}
func (tel *TestEventListener1) GetEventType() reflect.Type {
return reflect.TypeOf(TestEvent{})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Event is align with Event interface in Java.
// it's the top abstraction
// Align with 2.7.5
type Event interface {
fmt.Stringer
GetSource() interface{}
GetTimestamp() time.Time
}
// BaseEvent is the base implementation of Event
// You should never use it directly
type BaseEvent struct {
Source interface{}
Timestamp time.Time
}
// GetSource return the source
func (b *BaseEvent) GetSource() interface{} {
return b.Source
}
// GetTimestamp return the Timestamp when the event is created
func (b *BaseEvent) GetTimestamp() time.Time {
return b.Timestamp
}
// String return a human readable string representing this event
func (b *BaseEvent) String() string {
return fmt.Sprintf("BaseEvent[source = %#v]", b.Source)
}
func newBaseEvent(source interface{}) *BaseEvent {
return &BaseEvent{
Source: source,
Timestamp: time.Now(),
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
// EventDispatcher is align with EventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
type EventDispatcher interface {
Listenable
// Dispatch event
Dispatch(event Event)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"reflect"
)
import (
gxsort "github.com/dubbogo/gost/sort"
)
// EventListener is an new interface used to align with dubbo 2.7.5
// It contains the Prioritized means that the listener has its priority
type EventListener interface {
gxsort.Prioritizer
// OnEvent handle this event
OnEvent(e Event) error
// GetEventType listen which event type
GetEventType() reflect.Type
}
// ConditionalEventListener only handle the event which it can handle
type ConditionalEventListener interface {
EventListener
// Accept will make the decision whether it should handle this event
Accept(e Event) bool
}
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"reflect"
"sort"
"sync"
)
// Listenable could add and remove the event listener
type Listenable interface {
AddEventListener(listener EventListener)
AddEventListeners(listenersSlice []EventListener)
RemoveEventListener(listener EventListener)
RemoveEventListeners(listenersSlice []EventListener)
GetAllEventListeners() []EventListener
RemoveAllEventListeners()
}
// BaseListenable base listenable
type BaseListenable struct {
Listenable
ListenersCache sync.Map
Mutex sync.Mutex
}
// NewBaseListenable a constructor of base listenable
func NewBaseListenable() Listenable {
return &BaseListenable{}
}
// AddEventListener add event listener
func (bl *BaseListenable) AddEventListener(listener EventListener) {
eventType := listener.GetEventType()
if eventType.Kind() == reflect.Ptr {
eventType = eventType.Elem()
}
bl.Mutex.Lock()
defer bl.Mutex.Unlock()
value, loaded := bl.ListenersCache.LoadOrStore(eventType, make([]EventListener, 0, 8))
listenersSlice := value.([]EventListener)
// return if listenersSlice already has this listener
if loaded && containListener(listenersSlice, listener) {
return
}
listenersSlice = append(listenersSlice, listener)
sort.Slice(listenersSlice, func(i, j int) bool {
return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority()
})
bl.ListenersCache.Store(eventType, listenersSlice)
}
// AddEventListeners add the slice of event listener
func (bl *BaseListenable) AddEventListeners(listenersSlice []EventListener) {
for _, listener := range listenersSlice {
bl.AddEventListener(listener)
}
}
// RemoveEventListener remove the event listener
func (bl *BaseListenable) RemoveEventListener(listener EventListener) {
eventType := listener.GetEventType()
if eventType.Kind() == reflect.Ptr {
eventType = eventType.Elem()
}
bl.Mutex.Lock()
defer bl.Mutex.Unlock()
value, loaded := bl.ListenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]EventListener)
for i, l := range listenersSlice {
if l == listener {
listenersSlice = append(listenersSlice[:i], listenersSlice[i+1:]...)
}
}
bl.ListenersCache.Store(eventType, listenersSlice)
}
// RemoveEventListeners remove the slice of event listener
func (bl *BaseListenable) RemoveEventListeners(listenersSlice []EventListener) {
for _, listener := range listenersSlice {
bl.RemoveEventListener(listener)
}
}
// RemoveAllEventListeners remove all
func (bl *BaseListenable) RemoveAllEventListeners() {
bl.Mutex.Lock()
defer bl.Mutex.Unlock()
bl.ListenersCache = sync.Map{}
}
// GetAllEventListeners get all
func (bl *BaseListenable) GetAllEventListeners() []EventListener {
allListenersSlice := make([]EventListener, 0, 16)
bl.ListenersCache.Range(func(_, value interface{}) bool {
listenersSlice := value.([]EventListener)
allListenersSlice = append(allListenersSlice, listenersSlice...)
return true
})
sort.Slice(allListenersSlice, func(i, j int) bool {
return allListenersSlice[i].GetPriority() < allListenersSlice[j].GetPriority()
})
return allListenersSlice
}
// containListener true if contain listener
func containListener(listenersSlice []EventListener, listener EventListener) bool {
for _, loadListener := range listenersSlice {
if loadListener == listener {
return true
}
}
return false
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observer
import (
"reflect"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestListenable(t *testing.T) {
el := &TestEventListener{}
b := &BaseListenable{}
b.AddEventListener(el)
b.AddEventListener(el)
al := b.GetAllEventListeners()
assert.Equal(t, len(al), 1)
assert.Equal(t, al[0].GetEventType(), reflect.TypeOf(TestEvent{}))
b.RemoveEventListener(el)
assert.Equal(t, len(b.GetAllEventListeners()), 0)
var ts []EventListener
ts = append(ts, el)
b.AddEventListeners(ts)
assert.Equal(t, len(al), 1)
}
type TestEvent struct {
BaseEvent
}
type TestEventListener struct {
EventListener
}
func (tel *TestEventListener) OnEvent(e Event) error {
return nil
}
func (tel *TestEventListener) GetPriority() int {
return -1
}
func (tel *TestEventListener) GetEventType() reflect.Type {
return reflect.TypeOf(TestEvent{})
}
......@@ -42,14 +42,13 @@ type multiConfiger interface {
// BaseConfig is the common configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
fileStream *bytes.Buffer
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
eventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"`
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
fileStream *bytes.Buffer
}
// startConfigCenter will start the config center.
......
......@@ -33,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
_ "github.com/apache/dubbo-go/common/observer/dispatcher"
)
var (
......@@ -91,6 +92,17 @@ func Load() {
}
}
var eventDispatcherType string
if consumerConfig != nil {
eventDispatcherType = consumerConfig.eventDispatcherType
}
// notice consumerConfig.eventDispatcherType will be replaced
if providerConfig != nil {
eventDispatcherType = providerConfig.eventDispatcherType
}
// init EventDispatcher should before everything
extension.SetAndInitGlobalDispatcher(eventDispatcherType)
// reference config
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
......
......@@ -25,6 +25,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/remoting"
)
......@@ -47,47 +48,9 @@ func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service)
}
// Event is align with Event interface in Java.
// it's the top abstraction
// Align with 2.7.5
type Event interface {
fmt.Stringer
GetSource() interface{}
GetTimestamp() time.Time
}
// baseEvent is the base implementation of Event
// You should never use it directly
type baseEvent struct {
source interface{}
timestamp time.Time
}
// GetSource return the source
func (b *baseEvent) GetSource() interface{} {
return b.source
}
// GetTimestamp return the timestamp when the event is created
func (b *baseEvent) GetTimestamp() time.Time {
return b.timestamp
}
// String return a human readable string representing this event
func (b *baseEvent) String() string {
return fmt.Sprintf("baseEvent[source = %#v]", b.source)
}
func newBaseEvent(source interface{}) *baseEvent {
return &baseEvent{
source: source,
timestamp: time.Now(),
}
}
// ServiceInstancesChangedEvent represents service instances make some changing
type ServiceInstancesChangedEvent struct {
baseEvent
observer.BaseEvent
ServiceName string
Instances []ServiceInstance
}
......@@ -100,9 +63,9 @@ func (s *ServiceInstancesChangedEvent) String() string {
// NewServiceInstancesChangedEvent will create the ServiceInstanceChangedEvent instance
func NewServiceInstancesChangedEvent(serviceName string, instances []ServiceInstance) *ServiceInstancesChangedEvent {
return &ServiceInstancesChangedEvent{
baseEvent: baseEvent{
source: serviceName,
timestamp: time.Now(),
BaseEvent: observer.BaseEvent{
Source: serviceName,
Timestamp: time.Now(),
},
ServiceName: serviceName,
Instances: instances,
......
......@@ -18,25 +18,27 @@
package registry
import (
gxsort "github.com/dubbogo/gost/sort"
"reflect"
)
// EventListener is an new interface used to align with dubbo 2.7.5
// It contains the Prioritized means that the listener has its priority
type EventListener interface {
gxsort.Prioritizer
// OnEvent handle this event
OnEvent(e Event) error
}
// ConditionalEventListener only handle the event which it can handle
type ConditionalEventListener interface {
EventListener
// Accept will make the decision whether it should handle this event
Accept(e Event) bool
}
import (
"github.com/apache/dubbo-go/common/observer"
)
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
observer.EventListener
}
func (sicl *ServiceInstancesChangedListener) OnEvent(e observer.Event) error {
return nil
}
func (sicl *ServiceInstancesChangedListener) GetPriority() int {
return -1
}
func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type {
return reflect.TypeOf(&ServiceInstancesChangedEvent{})
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment