Skip to content
Snippets Groups Projects
Commit 0804c09c authored by vito.he's avatar vito.he
Browse files

modify zk ListenServiceEvent

parent 82fe0798
No related branches found
No related tags found
No related merge requests found
......@@ -43,6 +43,7 @@ const (
var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
)
// ZookeeperClient ...
......@@ -513,7 +514,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
return nil, nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, nil, perrors.Errorf("path{%s} has none children", path)
return nil, nil, errNilChildren
}
return children, event, nil
......@@ -544,7 +545,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, perrors.Errorf("path{%s} has none children", path)
return nil, errNilChildren
}
return children, nil
......
......@@ -19,7 +19,6 @@ package zookeeper
import (
"path"
"strings"
"sync"
"time"
)
......@@ -111,8 +110,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
if err == errNilChildren {
content, _, err := l.client.Conn.Get(zkPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err))
}
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
}
// a node was added -- listen the new node
......@@ -272,50 +279,50 @@ func timeSecondDuration(sec int) time.Duration {
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)
zkPath = strings.ReplaceAll(zkPath, "$", "%24")
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, listener remoting.DataListener) {
if l.ListenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
//var (
// err error
// dubboPath string
// children []string
//)
//
//zkPath = strings.ReplaceAll(zkPath, "$", "%24")
//l.pathMapLock.Lock()
//_, ok := l.pathMap[zkPath]
//l.pathMapLock.Unlock()
//if ok {
// logger.Warnf("@zkPath %s has already been listened.", zkPath)
// return
//}
//
//l.pathMapLock.Lock()
//l.pathMap[zkPath] = struct{}{}
//l.pathMapLock.Unlock()
//
//logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
//children, err = l.client.GetChildren(zkPath)
//if err != nil {
// children = nil
// logger.Warnf("fail to get children of zk path{%s}", zkPath)
//}
//
//for _, c := range children {
// // listen l service node
// dubboPath = path.Join(zkPath, c)
// content, _, err := l.client.Conn.Get(dubboPath)
// if err != nil {
// logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
// }
// if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
// continue
// }
// logger.Infof("listen dubbo service key{%s}", dubboPath)
// go func(zkPath string, listener remoting.DataListener) {
// if l.ListenServiceNodeEvent(zkPath) {
// listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
// }
// logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
// }(dubboPath, listener)
//}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
......
......@@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)
time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
client.Close()
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment