diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index e69f7269879138d7579460fd059d28c297960d7e..e3cb74e4f676efa1f325ac45e32b21b39d1bbd6a 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -53,7 +53,6 @@ func NewEventListener(client *Client) *EventListener { // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() for { wc, err := l.client.Watch(key) @@ -138,8 +137,6 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin // ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { - - l.wg.Add(1) defer l.wg.Done() for { wc, err := l.client.WatchWithPrefix(prefix) @@ -217,12 +214,14 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + l.wg.Add(1) go func(key string, listener remoting.DataListener) { l.ListenServiceNodeEventWithPrefix(key, listener) logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) }(key, listener) logger.Infof("listen dubbo service key{%s}", key) + l.wg.Add(1) go func(key string) { if l.ListenServiceNodeEvent(key) { listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index c0f9e9f119d5b3cb007033767b53313475310150..87deeb38c922780d778bec41d5a8239e2e99cd5e 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -48,7 +48,6 @@ func NewEventListener(client *Client) *EventListener { // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() for { wc, done, err := l.client.Watch(key) @@ -124,7 +123,6 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting. // Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { - l.wg.Add(1) defer l.wg.Done() for { wc, done, err := l.client.WatchWithPrefix(prefix) @@ -197,12 +195,15 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key) + + l.wg.Add(1) go func(key string, listener remoting.DataListener) { l.ListenServiceNodeEventWithPrefix(key, listener) logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) }(key, listener) logger.Infof("listen dubbo service key{%s}", key) + l.wg.Add(1) go func(key string) { if l.ListenServiceNodeEvent(key) { listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 77aa05ee9eada327475fa5bf86c7af2c65de0ef2..6c8b1720a728f6bf4441450d04215f919deb2efe 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -59,7 +59,6 @@ func (l *ZkEventListener) SetClient(client *ZookeeperClient) { // ListenServiceNodeEvent ... func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() var zkEvent zk.Event for { @@ -145,6 +144,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li continue } // listen l service node + l.wg.Add(1) go func(node string, zkPath string, listener remoting.DataListener) { logger.Infof("delete zkNode{%s}", node) if l.ListenServiceNodeEvent(node, listener) { @@ -174,7 +174,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { - l.wg.Add(1) defer l.wg.Done() var ( @@ -261,6 +260,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi //if zkPath is end of "providers/ & consumers/" we do not listen children dir if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 && strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 { + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) @@ -292,6 +292,7 @@ func timeSecondDuration(sec int) time.Duration { // --------> ListenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { logger.Infof("listen dubbo path{%s}", zkPath) + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)