Skip to content
Snippets Groups Projects
Commit 2c60fede authored by 邹毅贤's avatar 邹毅贤
Browse files

change to service key

parent 8e92ca17
No related branches found
No related tags found
No related merge requests found
......@@ -53,7 +53,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
if l.closed {
return
}
l.subscribed[url.Key()] = listener
l.subscribed[url.ServiceKey()] = listener
}
// UnSubscribeURL is used to set a watch listener for url
......@@ -61,8 +61,8 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
if l.closed {
return nil
}
listener := l.subscribed[url.Key()]
delete(l.subscribed, url.Key())
listener := l.subscribed[url.ServiceKey()]
delete(l.subscribed, url.ServiceKey())
return listener
}
......@@ -85,8 +85,8 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if l.closed {
return false
}
for url, listener := range l.subscribed {
if serviceURL.Key() == url {
for serviceKey, listener := range l.subscribed {
if serviceURL.ServiceKey() == serviceKey {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
......@@ -111,18 +111,25 @@ func (l *RegistryDataListener) Close() {
// RegistryConfigurationListener represent the processor of zookeeper watcher
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
isClosed bool
close chan struct{}
closeOnce sync.Once
client *zk.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
isClosed bool
close chan struct{}
closeOnce sync.Once
subscribeURL *common.URL
}
// NewRegistryConfigurationListener for listening the event of zk.
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry, conf *common.URL) *RegistryConfigurationListener {
reg.WaitGroup().Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)}
return &RegistryConfigurationListener{
client: client,
registry: reg,
events: make(chan *config_center.ConfigChangeEvent, 32),
isClosed: false,
close: make(chan struct{}, 1),
subscribeURL: conf}
}
// Process submit the ConfigChangeEvent to the event chan to notify all observer
......
......@@ -129,17 +129,17 @@ func (r *zkRegistry) InitListeners() {
recoverd := r.dataListener.subscribed
if recoverd != nil && len(recoverd) > 0 {
// recover all subscribed url
for recoveredURL, oldListener := range recoverd {
if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok {
for _, oldListener := range recoverd {
var (
regConfigListener *RegistryConfigurationListener
ok bool
)
if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok {
regConfigListener.Close()
}
serviceURL, err := common.NewURL(recoveredURL)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", recoveredURL, err)
continue
}
newDataListener.SubscribeURL(&serviceURL, NewRegistryConfigurationListener(r.client, r))
go r.listener.ListenServiceEvent(&serviceURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(serviceURL.Service())), newDataListener)
newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL))
go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener)
}
}
......@@ -236,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
dataListener := r.dataListener
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf.Key()] != nil {
if r.dataListener.subscribed[conf.ServiceKey()] != nil {
zkListener, _ := r.dataListener.subscribed[conf.Key()].(*RegistryConfigurationListener)
zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener)
if zkListener != nil {
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
......@@ -250,7 +250,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
}
}
zkListener = NewRegistryConfigurationListener(r.client, r)
zkListener = NewRegistryConfigurationListener(r.client, r, conf)
if r.listener == nil {
r.cltLock.Lock()
client := r.client
......@@ -279,7 +279,7 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
var zkListener *RegistryConfigurationListener
r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf.Key()]
configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
if configurationListener != nil {
zkListener, _ := configurationListener.(*RegistryConfigurationListener)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment