diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index b5f02281e9dfc5b5b6f1a289164e85d2b457d1a8..0c70caa7fa9f8c6283616d66ffd459349833df39 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -43,6 +43,7 @@ const ( var ( errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") + errNilChildren = perrors.Errorf("has none children") ) // ZookeeperClient ... @@ -513,7 +514,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, return nil, nil, perrors.Errorf("path{%s} has none children", path) } if len(children) == 0 { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, errNilChildren } return children, event, nil @@ -544,7 +545,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { return nil, perrors.Errorf("path{%s} has none children", path) } if len(children) == 0 { - return nil, perrors.Errorf("path{%s} has none children", path) + return nil, errNilChildren } return children, nil diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 407cd8a230d730724dc6a40c2a885723f5087d60..34c75fceae061bea38908ceeb6a9cccab926d63a 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -19,7 +19,6 @@ package zookeeper import ( "path" - "strings" "sync" "time" ) @@ -111,8 +110,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newChildren, err := l.client.GetChildren(zkPath) if err != nil { - logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) - return + if err == errNilChildren { + content, _, err := l.client.Conn.Get(zkPath) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err)) + } + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)}) + + } else { + logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) + } } // a node was added -- listen the new node @@ -272,50 +279,50 @@ func timeSecondDuration(sec int) time.Duration { // | // --------> ListenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { - var ( - err error - dubboPath string - children []string - ) - - zkPath = strings.ReplaceAll(zkPath, "$", "%24") - l.pathMapLock.Lock() - _, ok := l.pathMap[zkPath] - l.pathMapLock.Unlock() - if ok { - logger.Warnf("@zkPath %s has already been listened.", zkPath) - return - } - - l.pathMapLock.Lock() - l.pathMap[zkPath] = struct{}{} - l.pathMapLock.Unlock() - - logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) - children, err = l.client.GetChildren(zkPath) - if err != nil { - children = nil - logger.Warnf("fail to get children of zk path{%s}", zkPath) - } - - for _, c := range children { - // listen l service node - dubboPath = path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string, listener remoting.DataListener) { - if l.ListenServiceNodeEvent(zkPath) { - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) - } + //var ( + // err error + // dubboPath string + // children []string + //) + // + //zkPath = strings.ReplaceAll(zkPath, "$", "%24") + //l.pathMapLock.Lock() + //_, ok := l.pathMap[zkPath] + //l.pathMapLock.Unlock() + //if ok { + // logger.Warnf("@zkPath %s has already been listened.", zkPath) + // return + //} + // + //l.pathMapLock.Lock() + //l.pathMap[zkPath] = struct{}{} + //l.pathMapLock.Unlock() + // + //logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) + //children, err = l.client.GetChildren(zkPath) + //if err != nil { + // children = nil + // logger.Warnf("fail to get children of zk path{%s}", zkPath) + //} + // + //for _, c := range children { + // // listen l service node + // dubboPath = path.Join(zkPath, c) + // content, _, err := l.client.Conn.Get(dubboPath) + // if err != nil { + // logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) + // } + // if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { + // continue + // } + // logger.Infof("listen dubbo service key{%s}", dubboPath) + // go func(zkPath string, listener remoting.DataListener) { + // if l.ListenServiceNodeEvent(zkPath) { + // listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) + // } + // logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + // }(dubboPath, listener) + //} logger.Infof("listen dubbo path{%s}", zkPath) go func(zkPath string, listener remoting.DataListener) { diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go index aa627c7e8a53ef87fb39446b05d4001bcf18cf3f..1276e5363f5e3f1fcde9a36d8a94e671e030e6c3 100644 --- a/remoting/zookeeper/listener_test.go +++ b/remoting/zookeeper/listener_test.go @@ -97,12 +97,11 @@ func TestListener(t *testing.T) { listener := NewZkEventListener(client) dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait} listener.ListenServiceEvent("/dubbo", dataListener) - + time.Sleep(1 * time.Second) _, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1) assert.NoError(t, err) wait.Wait() assert.Equal(t, changedData, dataListener.eventList[1].Content) - client.Close() }