diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index d0220ddf0dc415ad0d593d0f0eed34cd698b1879..ec82fa0309118fba4b5c21772d4dfd356f3b0c5c 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -37,7 +37,7 @@ import ( // RegistryDataListener contains all URL information subscribed by zookeeper registry type RegistryDataListener struct { - subscribed map[*common.URL]config_center.ConfigurationListener + subscribed map[string]config_center.ConfigurationListener mutex sync.Mutex closed bool } @@ -45,7 +45,7 @@ type RegistryDataListener struct { // NewRegistryDataListener constructs a new RegistryDataListener func NewRegistryDataListener() *RegistryDataListener { return &RegistryDataListener{ - subscribed: make(map[*common.URL]config_center.ConfigurationListener)} + subscribed: make(map[string]config_center.ConfigurationListener)} } // SubscribeURL is used to set a watch listener for url @@ -53,7 +53,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen if l.closed { return } - l.subscribed[url] = listener + l.subscribed[url.ServiceKey()] = listener } // UnSubscribeURL is used to set a watch listener for url @@ -61,8 +61,8 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con if l.closed { return nil } - listener := l.subscribed[url] - delete(l.subscribed, url) + listener := l.subscribed[url.ServiceKey()] + delete(l.subscribed, url.ServiceKey()) return listener } @@ -85,8 +85,8 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { if l.closed { return false } - for url, listener := range l.subscribed { - if serviceURL.URLEqual(*url) { + for serviceKey, listener := range l.subscribed { + if serviceURL.ServiceKey() == serviceKey { listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, @@ -111,18 +111,25 @@ func (l *RegistryDataListener) Close() { // RegistryConfigurationListener represent the processor of zookeeper watcher type RegistryConfigurationListener struct { - client *zk.ZookeeperClient - registry *zkRegistry - events chan *config_center.ConfigChangeEvent - isClosed bool - close chan struct{} - closeOnce sync.Once + client *zk.ZookeeperClient + registry *zkRegistry + events chan *config_center.ConfigChangeEvent + isClosed bool + close chan struct{} + closeOnce sync.Once + subscribeURL *common.URL } // NewRegistryConfigurationListener for listening the event of zk. -func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { +func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry, conf *common.URL) *RegistryConfigurationListener { reg.WaitGroup().Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)} + return &RegistryConfigurationListener{ + client: client, + registry: reg, + events: make(chan *config_center.ConfigChangeEvent, 32), + isClosed: false, + close: make(chan struct{}, 1), + subscribeURL: conf} } // Process submit the ConfigChangeEvent to the event chan to notify all observer diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index e68265068bc2f3c60b57fc134e49ec08baef7900..5d5f9e0526b7b8a9c5a2e2524f27f03573d758a8 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -129,12 +129,17 @@ func (r *zkRegistry) InitListeners() { recoverd := r.dataListener.subscribed if recoverd != nil && len(recoverd) > 0 { // recover all subscribed url - for conf, oldListener := range recoverd { - if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok { + for _, oldListener := range recoverd { + var ( + regConfigListener *RegistryConfigurationListener + ok bool + ) + + if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok { regConfigListener.Close() } - newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r)) - go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener) + newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL)) + go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener) } } @@ -231,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen dataListener := r.dataListener dataListener.mutex.Lock() defer dataListener.mutex.Unlock() - if r.dataListener.subscribed[conf] != nil { + if r.dataListener.subscribed[conf.ServiceKey()] != nil { - zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) + zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener) if zkListener != nil { r.listenerLock.Lock() defer r.listenerLock.Unlock() @@ -245,7 +250,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen } } - zkListener = NewRegistryConfigurationListener(r.client, r) + zkListener = NewRegistryConfigurationListener(r.client, r, conf) if r.listener == nil { r.cltLock.Lock() client := r.client @@ -274,7 +279,7 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL var zkListener *RegistryConfigurationListener r.dataListener.mutex.Lock() - configurationListener := r.dataListener.subscribed[conf] + configurationListener := r.dataListener.subscribed[conf.ServiceKey()] if configurationListener != nil { zkListener, _ := configurationListener.(*RegistryConfigurationListener) diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index d4141e8012f8ce1291175bc4b4ed7b1a85e502e2..2e2602e64c008d1944223078d29886e443267246 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -99,7 +99,7 @@ func Test_Subscribe(t *testing.T) { defer ts.Stop() } -func Test_NoSubscribe(t *testing.T) { +func Test_UnSubscribe(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, _ := newMockZkRegistry(®url)