Skip to content
Snippets Groups Projects
listener.go 5.24 KiB
Newer Older
package etcdv3

import (
	"sync"
	"time"
scott.wang's avatar
scott.wang committed
	"github.com/coreos/etcd/clientv3"
)

import (
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/juju/errors"
)

import (
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/remoting"
)

type EventListener struct {
scott.wang's avatar
scott.wang committed
	client     *Client
	keyMapLock sync.Mutex
	keyMap     map[string]struct{}
	wg         sync.WaitGroup
}

func NewEventListener(client *Client) *EventListener {
	return &EventListener{
scott.wang's avatar
scott.wang committed
		client: client,
		keyMap: make(map[string]struct{}),
scott.wang's avatar
scott.wang committed
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
scott.wang's avatar
scott.wang committed
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
	l.wg.Add(1)
	defer l.wg.Done()
	for {
scott.wang's avatar
scott.wang committed
		wc, err := l.client.Watch(key)
		if err != nil {
scott.wang's avatar
scott.wang committed
			logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
			return false
		}

		select {

		// client stopped
		case <-l.client.Done():
scott.wang's avatar
scott.wang committed
			logger.Warnf("etcd client stopped")
			return false

		// client ctx stop
		case <-l.client.ctx.Done():
			logger.Warnf("etcd client ctx cancel")
			return false

		// handle etcd events
scott.wang's avatar
scott.wang committed
		case e, ok := <-wc:
			if !ok {
				logger.Warnf("etcd watch-chan closed")
				return false
			}
scott.wang's avatar
scott.wang committed
			if e.Err() != nil {
scott.wang's avatar
scott.wang committed
				logger.Errorf("etcd watch ERR {err: %s}", e.Err())
scott.wang's avatar
scott.wang committed
				continue
scott.wang's avatar
scott.wang committed
			for _, event := range e.Events {
				if l.handleEvents(event, listener...) {
					// if event is delete
					return true
				}
			}
		}
	}

	return false
}

scott.wang's avatar
scott.wang committed
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {

scott.wang's avatar
scott.wang committed
	logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key)
scott.wang's avatar
scott.wang committed

	switch event.Type {
	// the etcdv3 event just include PUT && DELETE
	case mvccpb.PUT:
		for _, listener := range listeners {
			switch event.IsCreate() {
			case true:
scott.wang's avatar
scott.wang committed
				logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
scott.wang's avatar
scott.wang committed
				listener.DataChange(remoting.Event{
					Path:    string(event.Kv.Key),
					Action:  remoting.EventTypeAdd,
					Content: string(event.Kv.Value),
				})
			case false:
scott.wang's avatar
scott.wang committed
				logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
scott.wang's avatar
scott.wang committed
				listener.DataChange(remoting.Event{
					Path:    string(event.Kv.Key),
					Action:  remoting.EvnetTypeUpdate,
					Content: string(event.Kv.Value),
				})
			}
		}
		return false
scott.wang's avatar
scott.wang committed
	case mvccpb.DELETE:
scott.wang's avatar
scott.wang committed
		logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
scott.wang's avatar
scott.wang committed
		return true
scott.wang's avatar
scott.wang committed
	panic("unreachable")
scott.wang's avatar
scott.wang committed
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {

	l.wg.Add(1)
	defer l.wg.Done()
	for {
scott.wang's avatar
scott.wang committed
		wc, err := l.client.WatchWithPrefix(prefix)
		if err != nil {
scott.wang's avatar
scott.wang committed
			logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
scott.wang's avatar
scott.wang committed

		// client stopped
		case <-l.client.Done():
scott.wang's avatar
scott.wang committed
			logger.Warnf("etcd client stopped")
			return

			// client ctx stop
		case <-l.client.ctx.Done():
			logger.Warnf("etcd client ctx cancel")
			return
scott.wang's avatar
scott.wang committed

			// etcd event stream
scott.wang's avatar
scott.wang committed
		case e, ok := <-wc:

			if !ok {
				logger.Warnf("etcd watch-chan closed")
				return
			}

scott.wang's avatar
scott.wang committed
			if e.Err() != nil {
scott.wang's avatar
scott.wang committed
				logger.Errorf("etcd watch ERR {err: %s}", e.Err())
scott.wang's avatar
scott.wang committed
				continue
			}
			for _, event := range e.Events {
				l.handleEvents(event, listener...)
			}
		}
	}
}

// timeSecondDuration converts a whole number of seconds into a time.Duration.
func timeSecondDuration(sec int) time.Duration {
	seconds := time.Duration(sec)
	return seconds * time.Second
}

scott.wang's avatar
scott.wang committed
// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
//                            |
//                            --------> ListenServiceNodeEvent
scott.wang's avatar
scott.wang committed
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
scott.wang's avatar
scott.wang committed
	l.keyMapLock.Lock()
	_, ok := l.keyMap[key]
	l.keyMapLock.Unlock()
	if ok {
scott.wang's avatar
scott.wang committed
		logger.Warnf("etcdv3 key %s has already been listened.", key)
scott.wang's avatar
scott.wang committed
	l.keyMapLock.Lock()
	l.keyMap[key] = struct{}{}
	l.keyMapLock.Unlock()
scott.wang's avatar
scott.wang committed
	keyList, valueList, err := l.client.getChildren(key)
	if err != nil {
		logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", key, errors.Annotate(err, "get children"))
	}

	logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)

scott.wang's avatar
scott.wang committed
	for i, k := range keyList {
		logger.Warnf("get children list key -> %s", k)
		if !listener.DataChange(remoting.Event{
scott.wang's avatar
scott.wang committed
			Path:    k,
			Action:  remoting.EventTypeAdd,
			Content: valueList[i],
		}) {
			continue
		}
	}

scott.wang's avatar
scott.wang committed
	logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
	go func(key string, listener remoting.DataListener) {
		l.ListenServiceNodeEventWithPrefix(key, listener)
		logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
	}(key, listener)
scott.wang's avatar
scott.wang committed
	logger.Infof("listen dubbo service key{%s}", key)
	go func(key string) {
		if l.ListenServiceNodeEvent(key) {
			listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
scott.wang's avatar
scott.wang committed
		logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)
	}(key)
scott.wang's avatar
scott.wang committed
// Close blocks until every listen loop counted in the wait group
// (ListenServiceNodeEvent and ListenServiceNodeEventWithPrefix) has returned.
// It does not stop those loops itself.
//
// NOTE(review): the loops call wg.Add after they have started running, so a
// Close racing with ListenServiceEvent may return before they are counted.
func (l *EventListener) Close() {
	l.wg.Wait()
}