diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 3df87ed4f603956a653cec692031dee94639de16..4b7ad3b86f1ca296de03772adfd25e962430b6bb 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -47,9 +47,11 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { pathMap: make(map[string]struct{}), } } + func (l *ZkEventListener) SetClient(client *ZookeeperClient) { l.client = client } + func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { l.wg.Add(1) defer l.wg.Done() @@ -72,7 +74,6 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remo content, _, _ := l.client.Conn.Get(zkEvent.Path) listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EvnetTypeUpdate, Content: string(content)}) } - case zk.EventNodeCreated: logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath) if len(listener) > 0 { @@ -100,7 +101,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li return true } } - return false } @@ -130,14 +130,14 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li continue } // listen l service node - go func(node string) { + go func(node string, zkPath string, listener remoting.DataListener) { logger.Infof("delete zkNode{%s}", node) if l.ListenServiceNodeEvent(node, listener) { logger.Infof("delete content{%s}", node) listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(newNode) + }(newNode, zkPath, listener) } // old node was deleted @@ -205,7 +205,6 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi } failTimes = 0 for _, c := range children { - // listen l service node dubboPath := path.Join(zkPath, c) content, _, err := l.client.Conn.Get(dubboPath) @@ -217,14 +216,14 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi continue } logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string) { - if l.ListenServiceNodeEvent(dubboPath) { - listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) + go func(zkPath string, listener remoting.DataListener) { + if l.ListenServiceNodeEvent(zkPath) { + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath) + }(dubboPath, listener) - //liten sub path recursive + //listen sub path recursive go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) @@ -310,7 +309,7 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener +// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent @@ -341,7 +340,6 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da } for _, c := range children { - // listen l service node dubboPath = path.Join(zkPath, c) content, _, err := l.client.Conn.Get(dubboPath) @@ -352,12 +350,12 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da continue } logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string) { - if l.ListenServiceNodeEvent(dubboPath) { - listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) + go func(zkPath string, listener remoting.DataListener) { + if l.ListenServiceNodeEvent(zkPath) { + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath) + }(dubboPath, listener) } logger.Infof("listen dubbo path{%s}", zkPath)