Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"github.com/apache/dubbo-go/common/logger"
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}

vito.he
committed
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
keyEventCh, err := l.client.ExistW(zkPath)

vito.he
committed
logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EvnetTypeUpdate, Content: string(content)})
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd, Content: string(content)})
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
case <-l.client.Done():
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
newChildren, err := l.client.GetChildren(zkPath)
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = path.Join(zkPath, n)
content, _, err := l.client.Conn.Get(newNode)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err))
}

vito.he
committed
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
go func(node string) {

vito.he
committed
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", n)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
children, childEventCh, err := l.client.GetChildrenW(zkPath)
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes

vito.he
committed
logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)

vito.he
committed
for _, c := range children {
// listen l service node
dubboPath := path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
logger.Infof("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {

vito.he
committed
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})

vito.he
committed
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)

vito.he
committed
//liten sub path recursive
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)

vito.he
committed
//
//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) {

vito.he
committed
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// defer l.wg.Done()
//
// var (
// failTimes int
// event chan struct{}
// zkEvent zk.Event
// )
// event = make(chan struct{}, 4)
// defer close(event)
// for {
// // get current children for a zkPath
// content,_, eventCh, err := l.client.Conn.GetW(zkPath)
// if err != nil {
// failTimes++
// if MaxFailTimes <= failTimes {
// failTimes = MaxFailTimes
// }
// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err)
// // clear the event channel
// CLEAR:
// for {
// select {
// case <-event:
// default:
// break CLEAR
// }
// }
// l.client.RegisterEvent(zkPath, &event)
// select {
// case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
// l.client.UnregisterEvent(zkPath, &event)
// continue
// case <-l.client.Done():
// l.client.UnregisterEvent(zkPath, &event)
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// case <-event:
// logger.Infof("get zk.EventNodeDataChange notify event")
// l.client.UnregisterEvent(zkPath, &event)
// l.handleZkNodeEvent(zkPath, nil, listener)
// continue
// }
// }
// failTimes = 0
//
// select {
// case zkEvent = <-eventCh:
// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
//
// l.handleZkNodeEvent(zkEvent.Path, children, listener)
// case <-l.client.Done():
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// }
// }
//}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener

vito.he
committed
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent

vito.he
committed
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {

vito.he
committed
err error
dubboPath string
children []string
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
logger.Warnf("@zkPath %s has already been listened.", zkPath)
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)

vito.he
committed
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
logger.Infof("listen dubbo service key{%s}", dubboPath)

vito.he
committed
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)

vito.he
committed
}(dubboPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
func (l *ZkEventListener) Close() {