From 603c1b93eccf8dc6732ce9aa91f08b109b34b4da Mon Sep 17 00:00:00 2001 From: scott <scottwangsxll@gmail.com> Date: Sun, 15 Mar 2020 21:09:22 +0800 Subject: [PATCH] Fix wg bug, add(1) out of goroutine --- remoting/etcdv3/listener.go | 5 ++--- remoting/kubernetes/listener.go | 5 +++-- remoting/zookeeper/listener.go | 5 +++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index e69f72698..e3cb74e4f 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -53,7 +53,6 @@ func NewEventListener(client *Client) *EventListener { // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() for { wc, err := l.client.Watch(key) @@ -138,8 +137,6 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin // ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { - - l.wg.Add(1) defer l.wg.Done() for { wc, err := l.client.WatchWithPrefix(prefix) @@ -217,12 +214,14 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + l.wg.Add(1) go func(key string, listener remoting.DataListener) { l.ListenServiceNodeEventWithPrefix(key, listener) logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) }(key, listener) logger.Infof("listen dubbo service key{%s}", key) + l.wg.Add(1) go func(key string) { if l.ListenServiceNodeEvent(key) { listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index c0f9e9f11..87deeb38c 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -48,7 +48,6 @@ func NewEventListener(client *Client) *EventListener { // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() for { wc, done, err := l.client.Watch(key) @@ -124,7 +123,6 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting. // Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { - l.wg.Add(1) defer l.wg.Done() for { wc, done, err := l.client.WatchWithPrefix(prefix) @@ -197,12 +195,15 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key) + + l.wg.Add(1) go func(key string, listener remoting.DataListener) { l.ListenServiceNodeEventWithPrefix(key, listener) logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) }(key, listener) logger.Infof("listen dubbo service key{%s}", key) + l.wg.Add(1) go func(key string) { if l.ListenServiceNodeEvent(key) { listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 77aa05ee9..6c8b1720a 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -59,7 +59,6 @@ func (l *ZkEventListener) SetClient(client *ZookeeperClient) { // ListenServiceNodeEvent ... func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { - l.wg.Add(1) defer l.wg.Done() var zkEvent zk.Event for { @@ -145,6 +144,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li continue } // listen l service node + l.wg.Add(1) go func(node string, zkPath string, listener remoting.DataListener) { logger.Infof("delete zkNode{%s}", node) if l.ListenServiceNodeEvent(node, listener) { @@ -174,7 +174,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { - l.wg.Add(1) defer l.wg.Done() var ( @@ -261,6 +260,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi //if zkPath is end of "providers/ & consumers/" we do not listen children dir if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 && strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 { + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) @@ -292,6 +292,7 @@ func timeSecondDuration(sec int) time.Duration { // --------> ListenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { logger.Infof("listen dubbo path{%s}", zkPath) + l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) -- GitLab