diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 453cb574187fd2095b121971240446eb6d27b208..2c61a4970521f098941ca6e7e02a2cb5cc616901 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -56,12 +56,13 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen l.subscribed[url.ServiceKey()] = listener } -// UnSubscribeURL is used to set a watch listener for url +// UnSubscribeURL is used to unset a watch listener for url func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { if l.closed { return nil } listener := l.subscribed[url.ServiceKey()] + listener.(*RegistryConfigurationListener).Close() delete(l.subscribed, url.ServiceKey()) return listener } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index bce8eb531ca49e805a543a438484217a5f4338ce..fe492c2b12712e0935ccd6fdd264e0fe8f24e213 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -77,7 +77,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { if err != nil { return nil, err } - r.WaitGroup().Add(1) //zk client start successful, then wg +1 + go zookeeper.HandleClientRestart(r) r.listener = zookeeper.NewZkEventListener(r.client) @@ -129,7 +129,7 @@ func (r *zkRegistry) InitListeners() { defer oldDataListener.mutex.Unlock() r.dataListener.closed = true recovered := r.dataListener.subscribed - if len(recovered) > 0 { + if recovered != nil && len(recovered) > 0 { // recover all subscribed url for _, oldListener := range recovered { var ( @@ -178,6 +178,7 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) // CloseAndNilClient closes listeners and clear client func (r *zkRegistry) CloseAndNilClient() { + r.client.Close() r.client = nil } @@ -250,7 +251,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen dataListener.mutex.Lock() defer dataListener.mutex.Unlock() if r.dataListener.subscribed[conf.ServiceKey()] != nil { - zkListener, _ = r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener) + + zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener) if zkListener != nil { r.listenerLock.Lock() defer r.listenerLock.Unlock() @@ -282,11 +284,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen //Interested register to dataconfig. r.dataListener.SubscribeURL(conf, zkListener) - go r.listener.ListenServiceEvent( - conf, - fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), - r.dataListener, - ) + go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener) return zkListener, nil } @@ -297,9 +295,9 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL r.dataListener.mutex.Lock() configurationListener := r.dataListener.subscribed[conf.ServiceKey()] if configurationListener != nil { - rcListener, _ := configurationListener.(*RegistryConfigurationListener) - if rcListener != nil { - if rcListener.isClosed { + zkListener, _ := configurationListener.(*RegistryConfigurationListener) + if zkListener != nil { + if zkListener.isClosed { r.dataListener.mutex.Unlock() return nil, perrors.New("configListener already been closed") } diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 5959a903bc883141f158c5462b0a775c711f8f51..e093175a29185a4daec1715c7833cda8ff1ef3e6 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -37,11 +37,14 @@ func Test_Register(t *testing.T) { regURL, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, _ := newMockZkRegistry(regURL) + ts, reg, err := newMockZkRegistry(regURL) + if err != nil { + assert.NoError(t, err) + } defer func() { _ = ts.Stop() }() - err := reg.Register(url) + err = reg.Register(url) children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children) assert.NoError(t, err) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index d752cdafc4f7e250e2ba5f5c6209f83348100906..a0f3f28027cc063f885041f59728e3c0d204bb80 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -47,6 +47,7 @@ type ZkEventListener struct { pathMapLock sync.Mutex pathMap map[string]struct{} wg sync.WaitGroup + exit chan struct{} } // NewZkEventListener returns a EventListener instance @@ -54,6 +55,7 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { return &ZkEventListener{ client: client, pathMap: make(map[string]struct{}), + exit: make(chan struct{}), } } @@ -116,6 +118,8 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath) return true } + case <-l.exit: + return false } } } @@ -244,6 +248,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): l.client.UnregisterEvent(zkPath, &event) continue + case <-l.exit: + l.client.UnregisterEvent(zkPath, &event) + logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath) + return case <-event: logger.Infof("get zk.EventNodeDataChange notify event") l.client.UnregisterEvent(zkPath, &event) @@ -331,6 +339,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen } l.handleZkNodeEvent(zkEvent.Path, children, listener) break WATCH + case <-l.exit: + logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath) + ticker.Stop() + return } } @@ -360,5 +372,6 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li // Close will let client listen exit func (l *ZkEventListener) Close() { + close(l.exit) l.wg.Wait() }