Skip to content
Snippets Groups Projects
Commit 8e92ca17 authored by 邹毅贤's avatar 邹毅贤
Browse files

change subscribe service key in registrydatalistener

parent be426b6a
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ import (
// RegistryDataListener contains all URL information subscribed by zookeeper registry
type RegistryDataListener struct {
subscribed map[*common.URL]config_center.ConfigurationListener
subscribed map[string]config_center.ConfigurationListener
mutex sync.Mutex
closed bool
}
......@@ -45,7 +45,7 @@ type RegistryDataListener struct {
// NewRegistryDataListener constructs a new RegistryDataListener
func NewRegistryDataListener() *RegistryDataListener {
return &RegistryDataListener{
subscribed: make(map[*common.URL]config_center.ConfigurationListener)}
subscribed: make(map[string]config_center.ConfigurationListener)}
}
// SubscribeURL is used to set a watch listener for url
......@@ -53,7 +53,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
if l.closed {
return
}
l.subscribed[url] = listener
l.subscribed[url.Key()] = listener
}
// UnSubscribeURL is used to set a watch listener for url
......@@ -61,8 +61,8 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
if l.closed {
return nil
}
listener := l.subscribed[url]
delete(l.subscribed, url)
listener := l.subscribed[url.Key()]
delete(l.subscribed, url.Key())
return listener
}
......@@ -86,7 +86,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
return false
}
for url, listener := range l.subscribed {
if serviceURL.URLEqual(*url) {
if serviceURL.Key() == url {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
......
......@@ -129,12 +129,17 @@ func (r *zkRegistry) InitListeners() {
recoverd := r.dataListener.subscribed
if recoverd != nil && len(recoverd) > 0 {
// recover all subscribed url
for conf, oldListener := range recoverd {
for recoveredURL, oldListener := range recoverd {
if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok {
regConfigListener.Close()
}
newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r))
go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener)
serviceURL, err := common.NewURL(recoveredURL)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", recoveredURL, err)
continue
}
newDataListener.SubscribeURL(&serviceURL, NewRegistryConfigurationListener(r.client, r))
go r.listener.ListenServiceEvent(&serviceURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(serviceURL.Service())), newDataListener)
}
}
......@@ -231,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
dataListener := r.dataListener
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf] != nil {
if r.dataListener.subscribed[conf.Key()] != nil {
zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener)
zkListener, _ := r.dataListener.subscribed[conf.Key()].(*RegistryConfigurationListener)
if zkListener != nil {
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
......@@ -274,7 +279,7 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
var zkListener *RegistryConfigurationListener
r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf]
configurationListener := r.dataListener.subscribed[conf.Key()]
if configurationListener != nil {
zkListener, _ := configurationListener.(*RegistryConfigurationListener)
......
......@@ -99,7 +99,7 @@ func Test_Subscribe(t *testing.T) {
defer ts.Stop()
}
func Test_NoSubscribe(t *testing.T) {
func Test_UnSubscribe(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, _ := newMockZkRegistry(&regurl)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment