Skip to content
Snippets Groups Projects
Commit 603c1b93 authored by scott's avatar scott
Browse files

Fix wg bug, add(1) out of goroutine

parent 83d39758
No related branches found
No related tags found
No related merge requests found
......@@ -53,7 +53,6 @@ func NewEventListener(client *Client) *EventListener {
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.Watch(key)
......@@ -138,8 +137,6 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.WatchWithPrefix(prefix)
......@@ -217,12 +214,14 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
l.wg.Add(1)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
l.wg.Add(1)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
......
......@@ -48,7 +48,6 @@ func NewEventListener(client *Client) *EventListener {
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, done, err := l.client.Watch(key)
......@@ -124,7 +123,6 @@ func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, done, err := l.client.WatchWithPrefix(prefix)
......@@ -197,12 +195,15 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key)
l.wg.Add(1)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
l.wg.Add(1)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
......
......@@ -59,7 +59,6 @@ func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
// ListenServiceNodeEvent ...
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
var zkEvent zk.Event
for {
......@@ -145,6 +144,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
continue
}
// listen l service node
l.wg.Add(1)
go func(node string, zkPath string, listener remoting.DataListener) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
......@@ -174,7 +174,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
var (
......@@ -261,6 +260,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
//if zkPath is end of "providers/ & consumers/" we do not listen children dir
if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 &&
strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
......@@ -292,6 +292,7 @@ func timeSecondDuration(sec int) time.Duration {
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
logger.Infof("listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment