/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package etcdv3 import ( "sync" "time" ) import ( gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) import ( "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/remoting" ) // nolint type EventListener struct { client *gxetcd.Client keyMapLock sync.RWMutex keyMap map[string]struct{} wg sync.WaitGroup } // NewEventListener returns a EventListener instance func NewEventListener(client *gxetcd.Client) *EventListener { return &EventListener{ client: client, keyMap: make(map[string]struct{}), } } // listenServiceNodeEvent Listen on a spec key // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { defer l.wg.Done() for { wc, err := l.client.Watch(key) if err != nil { logger.Warnf("WatchExist{key:%s} = error{%v}", key, err) return false } select { // client stopped case <-l.client.Done(): logger.Warnf("etcd client stopped") return false // client ctx stop case <-l.client.GetCtx().Done(): logger.Warnf("etcd client ctx cancel") return false // handle etcd events case e, ok := <-wc: if !ok { logger.Warnf("etcd watch-chan closed") return false } if e.Err() != nil { logger.Errorf("etcd watch ERR {err: %s}", e.Err()) continue } for _, event := range e.Events { if l.handleEvents(event, listener...) { // if event is delete return true } } } } } // return true means the event type is DELETE // return false means the event type is CREATE || UPDATE func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool { logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key) switch event.Type { // the etcdv3 event just include PUT && DELETE case mvccpb.PUT: for _, listener := range listeners { switch event.IsCreate() { case true: logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) listener.DataChange(remoting.Event{ Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value), }) case false: logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) listener.DataChange(remoting.Event{ Path: string(event.Kv.Key), Action: remoting.EventTypeUpdate, Content: string(event.Kv.Value), }) } } return false case mvccpb.DELETE: logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key) return true default: return false } } // ListenServiceNodeEventWithPrefix listens on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { defer l.wg.Done() for { wc, err := l.client.WatchWithPrefix(prefix) if err != nil { logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) } select { // client stopped case <-l.client.Done(): logger.Warnf("etcd client stopped") return // client ctx stop case <-l.client.GetCtx().Done(): logger.Warnf("etcd client ctx cancel") return // etcd event stream case e, ok := <-wc: if !ok { logger.Warnf("etcd watch-chan closed") return } if e.Err() != nil { logger.Errorf("etcd watch ERR {err: %s}", e.Err()) continue } for _, event := range e.Events { l.handleEvents(event, listener...) } } } } func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } // ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | // --------> listenServiceNodeEvent func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { l.keyMapLock.RLock() _, ok := l.keyMap[key] l.keyMapLock.RUnlock() if ok { logger.Warnf("etcdv3 key %s has already been listened.", key) return } l.keyMapLock.Lock() l.keyMap[key] = struct{}{} l.keyMapLock.Unlock() keyList, valueList, err := l.client.GetChildren(key) if err != nil { logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) } logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) for i, k := range keyList { logger.Infof("got children list key -> %s", k) listener.DataChange(remoting.Event{ Path: k, Action: remoting.EventTypeAdd, Content: valueList[i], }) } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) l.wg.Add(1) go func(key string, listener remoting.DataListener) { l.ListenServiceNodeEventWithPrefix(key, listener) logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) }(key, listener) logger.Infof("listen dubbo service key{%s}", key) l.wg.Add(1) go func(key string) { if l.ListenServiceNodeEvent(key) { listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) } logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key) }(key) } // nolint func (l *EventListener) Close() { l.wg.Wait() }