Newer
Older
)
import (
"github.com/coreos/etcd/mvcc/mvccpb"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
// EventListener relays etcd v3 key events to remoting.DataListener
// implementations and remembers which keys already have a listener.
type EventListener struct {
	// client is the etcd client whose Done/Valid state the listener consults.
	client *Client

	// keyMapLock guards keyMap.
	keyMapLock sync.Mutex
	// keyMap is the set of keys that have already been registered for listening.
	keyMap map[string]struct{}

	// NOTE(review): wg is not referenced in the code visible here; presumably
	// it tracks the listener goroutines -- confirm against the full file.
	wg sync.WaitGroup
}
// NewEventListener returns an EventListener bound to client, starting with
// an empty set of listened keys.
func NewEventListener(client *Client) *EventListener {
	listener := new(EventListener)
	listener.client = client
	listener.keyMap = map[string]struct{}{}
	return listener
}
// SetClient replaces the etcd client used by the listener with client.
func (l *EventListener) SetClient(client *Client) {
	l.client = client
}
// ListenServiceNodeEvent listens for etcd events on one specific key and
// hands every received event to handleEvents together with the listeners.
// It returns true when the key is deleted (handleEvents reported a DELETE)
// and false when the watch reports an error or the client or its watch
// context is done, i.e. the underlying connection is lost.
// NOTE(review): the body visible here looks truncated -- err and keyEventCh
// are used without a visible declaration and `continue` has no visible
// enclosing loop; presumably the watch call and a `for` loop were lost.
// Confirm against the full file.
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
// presumably reached when setting up the watch failed: log and give up
logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
return false
}
select {
// client watch ctx stopped
// server stopped
case <-l.client.cs.ctx.Done():
return false
// client stopped
case <-l.client.Done():
return false
// etcd event stream
case e := <-keyEventCh:
if e.Err() != nil {
// log the stream error and skip this response
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
continue
for _, event := range e.Events {
// stop listening as soon as one event reports a deletion
if l.handleEvents(event, listener...) {
return true
}
}
}
}
return false
}
// handleEvents logs a single etcd v3 event and, for PUT events, notifies
// every listener through DataChange.
// It returns true when the event type is DELETE.
// It returns false when the event type is PUT (a create or an update).
// NOTE(review): the closing braces of this function are not visible here,
// so some trailing lines may be missing from this view.
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {
logger.Warnf("get a etcdv3 Event {type: %s, key: %s}", event.Type, event.Kv.Key)
switch event.Type {
// etcd v3 events only include PUT and DELETE
case mvccpb.PUT:
for _, listener := range listeners {
// IsCreate separates a newly created key from an update of an existing one
switch event.IsCreate() {
case true:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Kv.Value),
})
case false:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EvnetTypeUpdate,
Content: string(event.Kv.Value),
})
// in the lines visible here a deletion is only logged; it is reported to
// the caller through the true return value rather than to the listeners
case mvccpb.DELETE:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
return true
// ListenServiceNodeEventWithPrefix listens for etcd events on the set of
// keys that share the given prefix and hands every received event to
// handleEvents together with the listeners. It returns when the client's
// watch context is done.
// NOTE(review): the body visible here looks truncated -- err and wc are used
// without a visible declaration and the enclosing `for`/`select` are not
// shown. Confirm against the full file.
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
// presumably reached when setting up the prefix watch failed
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
// client watch ctx stopped
// server stopped
case <-l.client.cs.ctx.Done():
logger.Warn("etcd listener service node with prefix etcd server stopped")
return
// client stopped (NOTE(review): the matching case is not visible here)
// etcd event stream
case e := <-wc:
if e.Err() != nil {
// log the stream error and skip this response
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
continue
}
for _, event := range e.Events {
// the DELETE indication returned by handleEvents is ignored here
l.handleEvents(event, listener...)
}
}
}
}
// timeSecondDuration converts a number of seconds into a time.Duration.
func timeSecondDuration(sec int) time.Duration {
	seconds := time.Duration(sec)
	return seconds * time.Second
}
// ListenServiceEvent records key in keyMap and starts two goroutines: one
// listening on every key under the prefix key, one listening on key itself.
// When the listener on key itself reports a deletion, listener receives a
// remoting.EventTypeDel event for key.
//
// This func is invoked by etcdv3 ConsumerRegistry::Registe / etcdv3 ConsumerRegistry::get / etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
//                            |
//                            --------> ListenServiceNodeEvent
//
// NOTE(review): the body visible here looks truncated -- `ok` is read but
// never tested and several closing braces are missing; presumably the
// "already been listened" warning sits in an early-return branch. Confirm
// against the full file.
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
// look the key up in keyMap while holding the lock
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
logger.Warnf("etcdv3 key %s has already been listened.", key)
// record the key in keyMap while holding the lock
l.keyMapLock.Lock()
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
// listen on every key under the prefix in a separate goroutine
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
// listen on the key itself in a separate goroutine; a true result means
// the key was deleted, so the listener is told with an EventTypeDel event
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)
}(key)
func (l *EventListener) valid() bool {
return l.client.Valid()