diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index d0220ddf0dc415ad0d593d0f0eed34cd698b1879..c6324596e57d8679600e87d69b9db33887c86975 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -37,7 +37,7 @@ import ( // RegistryDataListener contains all URL information subscribed by zookeeper registry type RegistryDataListener struct { - subscribed map[*common.URL]config_center.ConfigurationListener + subscribed map[string]config_center.ConfigurationListener mutex sync.Mutex closed bool } @@ -45,7 +45,7 @@ type RegistryDataListener struct { // NewRegistryDataListener constructs a new RegistryDataListener func NewRegistryDataListener() *RegistryDataListener { return &RegistryDataListener{ - subscribed: make(map[*common.URL]config_center.ConfigurationListener)} + subscribed: make(map[string]config_center.ConfigurationListener)} } // SubscribeURL is used to set a watch listener for url @@ -53,7 +53,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen if l.closed { return } - l.subscribed[url] = listener + l.subscribed[url.Key()] = listener } // UnSubscribeURL is used to set a watch listener for url @@ -61,8 +61,8 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con if l.closed { return nil } - listener := l.subscribed[url] - delete(l.subscribed, url) + listener := l.subscribed[url.Key()] + delete(l.subscribed, url.Key()) return listener } @@ -86,7 +86,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { return false } for url, listener := range l.subscribed { - if serviceURL.URLEqual(*url) { + if serviceURL.Key() == url { listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index e68265068bc2f3c60b57fc134e49ec08baef7900..305b763c3ea09ce45b323ea2c1ee6177c08a874e 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -129,12 +129,17 @@ func (r *zkRegistry) InitListeners() { recoverd := r.dataListener.subscribed if recoverd != nil && len(recoverd) > 0 { // recover all subscribed url - for conf, oldListener := range recoverd { + for recoveredURL, oldListener := range recoverd { if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok { regConfigListener.Close() } - newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r)) - go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener) + serviceURL, err := common.NewURL(recoveredURL) + if err != nil { + logger.Errorf("Listen NewURL(r{%s}) = error{%v}", recoveredURL, err) + continue + } + newDataListener.SubscribeURL(&serviceURL, NewRegistryConfigurationListener(r.client, r)) + go r.listener.ListenServiceEvent(&serviceURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(serviceURL.Service())), newDataListener) } } @@ -231,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen dataListener := r.dataListener dataListener.mutex.Lock() defer dataListener.mutex.Unlock() - if r.dataListener.subscribed[conf] != nil { + if r.dataListener.subscribed[conf.Key()] != nil { - zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) + zkListener, _ := r.dataListener.subscribed[conf.Key()].(*RegistryConfigurationListener) if zkListener != nil { r.listenerLock.Lock() defer r.listenerLock.Unlock() @@ -274,7 +279,7 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL var zkListener *RegistryConfigurationListener r.dataListener.mutex.Lock() - configurationListener := r.dataListener.subscribed[conf] + configurationListener := r.dataListener.subscribed[conf.Key()] if configurationListener != nil { zkListener, _ := configurationListener.(*RegistryConfigurationListener) diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index d4141e8012f8ce1291175bc4b4ed7b1a85e502e2..2e2602e64c008d1944223078d29886e443267246 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -99,7 +99,7 @@ func Test_Subscribe(t *testing.T) { defer ts.Stop() } -func Test_NoSubscribe(t *testing.T) { +func Test_UnSubscribe(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, _ := newMockZkRegistry(®url)