Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

AlexStocks
committed
"github.com/apache/dubbo-go/common/logger"

AlexStocks
committed
"github.com/apache/dubbo-go/common"
// ZkEvent is the value handed to DataListener.DataChange; it carries the
// registry event in Res.
// NOTE(review): ZkEvent.String reads e.err and the literals in this file pass a
// second positional value (nil), so an err field is expected here, but its
// declaration is not visible in this view — confirm.
type ZkEvent struct {
// Res is the wrapped *common.Event (path, action and content of the change).
Res *common.Event
// String renders the event as "err:<err>, res:<Res>", formatting both fields
// with the %s verb.
// NOTE(review): events in this file are built with a nil second field; if that
// field is err, %s prints "%!s(<nil>)" for it — %v may be the intended verb.
func (e ZkEvent) String() string {
return fmt.Sprintf("err:%s, res:%s", e.err, e.Res)
// ZkEventListener watches zookeeper paths through a ZookeeperClient and
// reports the changes it observes to DataListener callbacks.
type ZkEventListener struct {
// client is the zookeeper client used for every watch and query.
client *ZookeeperClient
// pathMapLock is held around each read and write of pathMap.
pathMapLock sync.Mutex
// pathMap is the set of zk paths already passed to ListenServiceEvent.
pathMap map[string]struct{}
// wg is incremented by each listenDirEvent call and decremented when it returns.
wg sync.WaitGroup
// NewZkEventListener returns a ZkEventListener that uses client and starts
// with an empty, non-nil pathMap.
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
// SetClient replaces the zookeeper client the listener uses for subsequent
// watches and queries. The assignment is not synchronised with the listener's
// other methods.
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
	l.client = client
}
// listenServiceNodeEvent places an exist-watch on zkPath via l.client.ExistW,
// logs an error if that call fails, and logs the zookeeper event it receives
// (EventNodeDataChanged, EventNodeCreated, EventNotWatching or
// EventNodeDeleted). It also selects on l.client.Done().
// NOTE(review): the branch structure and the bool returned from each branch
// are not visible in this view — confirm the meaning of the result against the
// full file.
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
// presumably keyEventCh delivers the single watch event for zkPath — the
// receive itself is not visible here.
keyEventCh, err := l.client.ExistW(zkPath)
logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
// The client's Done channel is one of the select cases.
case <-l.client.Done():
// handleZkNodeEvent diffs the previously known children of zkPath against the
// children currently reported by zookeeper and notifies listener about the
// difference.
//
// zkPath is the parent node, children is the previously known child list (nil
// makes every current child count as new), and listener receives a ZkEvent
// per changed child with Path set to zkPath and Content set to the child name.
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener DataListener) {
// contains reports whether e is an element of s (linear scan).
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
// Fetch the current child list; on failure the error is logged and the
// function returns without notifying the listener.
newChildren, err := l.client.GetChildren(zkPath)
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {
// Children that were already known are not new.
if contains(children, n) {
continue
}
newNode = path.Join(zkPath, n)
// Announce the new child; the goroutine below belongs to this branch.
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) {
// NOTE(review): the goroutine receives the full path as node but reads the
// loop variable n for the Del event; before Go 1.22 n is shared across
// iterations, so a later iteration may change what is reported — confirm.
go func(node string) {
logger.Infof("delete content{%s}", n)
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil})
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
}
// old node was deleted
var oldNode string
for _, n := range children {
// Children still present in zookeeper were not deleted.
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
// NOTE(review): this loop handles removed children, yet the visible call sends
// common.Add before the common.Del below — confirm this is intended.
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) {
logger.Warnf("delete content{%s}", n)
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil})
// listenDirEvent watches the children of zkPath and hands every change to
// handleZkNodeEvent together with listener. While it runs it is counted in
// l.wg. When GetChildrenW fails it backs off for failTimes*ConnDelay seconds
// (failTimes is capped at MaxFailTimes) and it stops waiting when
// l.client.Done() fires.
// NOTE(review): the loop and if/select headers are not visible in this view;
// the comments below describe only the statements that are.
func (l *ZkEventListener) listenDirEvent(zkPath string, listener DataListener) {
// Register this call with the wait group for its whole lifetime.
l.wg.Add(1)
defer l.wg.Done()
var (
// failTimes drives the retry delay.
failTimes int
// event is the channel registered with the client for zkPath.
event chan struct{}
// zkEvent holds the latest child-watch event.
zkEvent zk.Event
)
// Read the current children and set a child-watch on zkPath.
children, childEventCh, err := l.client.GetChildrenW(zkPath)
// Cap the failure counter so the retry delay stops growing.
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
logger.Errorf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
// (non-blocking drain: stops as soon as no value is ready)
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
// Register event with the client for zkPath; each visible case below
// unregisters it again.
l.client.RegisterEvent(zkPath, &event)
// Retry delay of failTimes*ConnDelay seconds.
case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
// A nil previous-children list makes every current child count as newly added.
l.handleZkNodeEvent(zkPath, nil, listener)
continue
}
}
// Reset the failure counter.
failTimes = 0
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
// Diff against the child list read before the watch fired; note the path
// comes from the event, not from zkPath.
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// timeSecondDuration converts a whole number of seconds into the equivalent
// time.Duration. Negative values yield negative durations.
func timeSecondDuration(sec int) time.Duration {
	seconds := time.Duration(sec)
	return seconds * time.Second
}
// ListenServiceEvent starts listening on zkPath once per path: it records the
// path in pathMap, reports each child currently under zkPath to listener as a
// common.Add event, starts a goroutine per accepted child, and finally starts
// a goroutine running listenDirEvent for zkPath itself.
//
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener DataListener) {
// Look the path up in pathMap; a warning is logged for a path that is
// already listened.
// NOTE(review): the lookup and the insert below use two separate lock
// sections, so two concurrent calls for the same path could both pass the
// check — confirm whether callers can race here.
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
logger.Warnf("@zkPath %s has already been listened.", zkPath)
// Record the path as listened.
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
// Read the children that exist right now.
children, err = l.client.GetChildren(zkPath)
logger.Errorf("fail to get children of zk path{%s}", zkPath)
// Report each existing child; children the listener rejects are skipped.
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: c}, nil}) {
continue
}
// listen l service node
dubboPath = path.Join(zkPath, c)
logger.Infof("listen dubbo service key{%s}", dubboPath)
// NOTE(review): this goroutine reads the loop variable c for the Del event;
// before Go 1.22 c is shared across iterations — confirm.
go func(zkPath string, serviceURL common.URL) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: c}, nil})
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
// Watch the directory itself for child changes in a separate goroutine.
go func(zkPath string, listener DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
// valid returns the result of l.client.ZkConnValid().
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
func (l *ZkEventListener) Close() {