Skip to content
Snippets Groups Projects
Commit a7c2c8b7 authored by xg.gao's avatar xg.gao
Browse files

add lock for event registry map to avoid concurrent read write

parent 4f382b03
No related branches found
No related tags found
No related merge requests found
......@@ -49,14 +49,16 @@ var (
// ZookeeperClient ...
type ZookeeperClient struct {
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
eventRegistryLock sync.RWMutex
}
// StateToString ...
......@@ -269,7 +271,7 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
z.RLock()
z.eventRegistryLock.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
......@@ -279,16 +281,18 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
}
}
}
z.RUnlock()
z.eventRegistryLock.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
if state == (int)(zk.StateHasSession) {
continue
}
z.eventRegistryLock.RLock()
if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
for _, e := range a {
*e <- struct{}{}
}
}
z.eventRegistryLock.RUnlock()
}
state = (int)(event.State)
}
......@@ -301,8 +305,8 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
return
}
z.Lock()
defer z.Unlock()
z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
a := z.eventRegistry[zkPath]
a = append(a, event)
z.eventRegistry[zkPath] = a
......@@ -314,8 +318,9 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
z.Lock()
defer z.Unlock()
z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
infoList, ok := z.eventRegistry[zkPath]
if !ok {
return
......
......@@ -111,7 +111,7 @@ func TestCreateDelete(t *testing.T) {
require.NoError(t, err)
err = z.Delete("/test1/test2/test3/test4")
require.NoError(t, err)
//verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
// verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}
func TestRegisterTemp(t *testing.T) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment