Skip to content
Snippets Groups Projects
Commit c47af058 authored by flycash's avatar flycash
Browse files

Merge branch 'develop' of github.com:apache/dubbo-go into feature/prometheus

parents f29402bd 3977bc6c
No related branches found
No related tags found
No related merge requests found
......@@ -304,7 +304,6 @@ func (r *zkRegistry) register(c common.URL) error {
if c.Path == "" || len(c.Methods) == 0 {
return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
}
// 先创建服务下面的provider node
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
r.cltLock.Lock()
err = r.client.Create(dubboPath)
......
......@@ -43,6 +43,7 @@ const (
var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
)
// ZookeeperClient ...
......@@ -513,7 +514,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
return nil, nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, nil, perrors.Errorf("path{%s} has none children", path)
return nil, nil, errNilChildren
}
return children, event, nil
......@@ -544,7 +545,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, perrors.Errorf("path{%s} has none children", path)
return nil, errNilChildren
}
return children, nil
......
......@@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) {
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func Test_UnregisterEvent(t *testing.T) {
client := &ZookeeperClient{}
client.eventRegistry = make(map[string][]*chan struct{})
array := []*chan struct{}{}
array = append(array, new(chan struct{}))
client.eventRegistry["test"] = array
client.UnregisterEvent("test", new(chan struct{}))
}
......@@ -19,7 +19,6 @@ package zookeeper
import (
"path"
"strings"
"sync"
"time"
)
......@@ -111,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
if err == errNilChildren {
content, _, err := l.client.Conn.Get(zkPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err))
} else {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}
} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
}
// a node was added -- listen the new node
......@@ -272,51 +280,6 @@ func timeSecondDuration(sec int) time.Duration {
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)
zkPath = strings.ReplaceAll(zkPath, "$", "%24")
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, listener remoting.DataListener) {
if l.ListenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
......
......@@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)
time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
client.Close()
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment