diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index ebd63694b7772715c1f6140b12389bfa526d21c6..bc815c3ae0eaf1b5700d0e9d613e3189d64a2da2 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -304,7 +304,6 @@ func (r *zkRegistry) register(c common.URL) error { if c.Path == "" || len(c.Methods) == 0 { return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } - // 鍏堝垱寤烘湇鍔′笅闈㈢殑provider node dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index b5f02281e9dfc5b5b6f1a289164e85d2b457d1a8..0c70caa7fa9f8c6283616d66ffd459349833df39 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -43,6 +43,7 @@ const ( var ( errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") + errNilChildren = perrors.Errorf("has none children") ) // ZookeeperClient ... @@ -513,7 +514,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, return nil, nil, perrors.Errorf("path{%s} has none children", path) } if len(children) == 0 { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, errNilChildren } return children, event, nil @@ -544,7 +545,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { return nil, perrors.Errorf("path{%s} has none children", path) } if len(children) == 0 { - return nil, perrors.Errorf("path{%s} has none children", path) + return nil, errNilChildren } return children, nil diff --git a/remoting/zookeeper/client_test.go b/remoting/zookeeper/client_test.go index f1bd0c2cb38669ad968bd83efae166a4432c6e2d..17b9f0f33128eab7fc682fd37aeb454c8c2c5066 100644 --- a/remoting/zookeeper/client_test.go +++ b/remoting/zookeeper/client_test.go @@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) { states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} verifyEventStateOrder(t, event, states, "event channel") } + +func Test_UnregisterEvent(t *testing.T) { + client := &ZookeeperClient{} + client.eventRegistry = make(map[string][]*chan struct{}) + array := []*chan struct{}{} + array = append(array, new(chan struct{})) + client.eventRegistry["test"] = array + client.UnregisterEvent("test", new(chan struct{})) +} diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 407cd8a230d730724dc6a40c2a885723f5087d60..6850aa3ffac28fbf52c2924649ef8ed1b149f88b 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -19,7 +19,6 @@ package zookeeper import ( "path" - "strings" "sync" "time" ) @@ -111,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newChildren, err := l.client.GetChildren(zkPath) if err != nil { - logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) - return + if err == errNilChildren { + content, _, err := l.client.Conn.Get(zkPath) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err)) + } else { + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)}) + } + + } else { + logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) + } } // a node was added -- listen the new node @@ -272,51 +280,6 @@ func timeSecondDuration(sec int) time.Duration { // | // --------> ListenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { - var ( - err error - dubboPath string - children []string - ) - - zkPath = strings.ReplaceAll(zkPath, "$", "%24") - l.pathMapLock.Lock() - _, ok := l.pathMap[zkPath] - l.pathMapLock.Unlock() - if ok { - logger.Warnf("@zkPath %s has already been listened.", zkPath) - return - } - - l.pathMapLock.Lock() - l.pathMap[zkPath] = struct{}{} - l.pathMapLock.Unlock() - - logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) - children, err = l.client.GetChildren(zkPath) - if err != nil { - children = nil - logger.Warnf("fail to get children of zk path{%s}", zkPath) - } - - for _, c := range children { - // listen l service node - dubboPath = path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string, listener remoting.DataListener) { - if l.ListenServiceNodeEvent(zkPath) { - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) - } - logger.Infof("listen dubbo path{%s}", zkPath) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go index aa627c7e8a53ef87fb39446b05d4001bcf18cf3f..1276e5363f5e3f1fcde9a36d8a94e671e030e6c3 100644 --- a/remoting/zookeeper/listener_test.go +++ b/remoting/zookeeper/listener_test.go @@ -97,12 +97,11 @@ func TestListener(t *testing.T) { listener := NewZkEventListener(client) dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait} listener.ListenServiceEvent("/dubbo", dataListener) - + time.Sleep(1 * time.Second) _, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1) assert.NoError(t, err) wait.Wait() assert.Equal(t, changedData, dataListener.eventList[1].Content) - client.Close() }