Skip to content
Snippets Groups Projects
Commit 6352cd07 authored by Joe Zou's avatar Joe Zou Committed by GitHub
Browse files

Merge pull request #535 from zouyx/feature/addRegistryUnpub

Rft : Change subscribe service key in registrydatalistener
parents bb0177ff 2c60fede
No related branches found
No related tags found
No related merge requests found
...@@ -37,7 +37,7 @@ import ( ...@@ -37,7 +37,7 @@ import (
// RegistryDataListener contains all URL information subscribed by zookeeper registry // RegistryDataListener contains all URL information subscribed by zookeeper registry
type RegistryDataListener struct { type RegistryDataListener struct {
subscribed map[*common.URL]config_center.ConfigurationListener subscribed map[string]config_center.ConfigurationListener
mutex sync.Mutex mutex sync.Mutex
closed bool closed bool
} }
...@@ -45,7 +45,7 @@ type RegistryDataListener struct { ...@@ -45,7 +45,7 @@ type RegistryDataListener struct {
// NewRegistryDataListener constructs a new RegistryDataListener // NewRegistryDataListener constructs a new RegistryDataListener
func NewRegistryDataListener() *RegistryDataListener { func NewRegistryDataListener() *RegistryDataListener {
return &RegistryDataListener{ return &RegistryDataListener{
subscribed: make(map[*common.URL]config_center.ConfigurationListener)} subscribed: make(map[string]config_center.ConfigurationListener)}
} }
// SubscribeURL is used to set a watch listener for url // SubscribeURL is used to set a watch listener for url
...@@ -53,7 +53,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen ...@@ -53,7 +53,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
if l.closed { if l.closed {
return return
} }
l.subscribed[url] = listener l.subscribed[url.ServiceKey()] = listener
} }
// UnSubscribeURL is used to set a watch listener for url // UnSubscribeURL is used to set a watch listener for url
...@@ -61,8 +61,8 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con ...@@ -61,8 +61,8 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
if l.closed { if l.closed {
return nil return nil
} }
listener := l.subscribed[url] listener := l.subscribed[url.ServiceKey()]
delete(l.subscribed, url) delete(l.subscribed, url.ServiceKey())
return listener return listener
} }
...@@ -85,8 +85,8 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { ...@@ -85,8 +85,8 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if l.closed { if l.closed {
return false return false
} }
for url, listener := range l.subscribed { for serviceKey, listener := range l.subscribed {
if serviceURL.URLEqual(*url) { if serviceURL.ServiceKey() == serviceKey {
listener.Process( listener.Process(
&config_center.ConfigChangeEvent{ &config_center.ConfigChangeEvent{
Key: eventType.Path, Key: eventType.Path,
...@@ -111,18 +111,25 @@ func (l *RegistryDataListener) Close() { ...@@ -111,18 +111,25 @@ func (l *RegistryDataListener) Close() {
// RegistryConfigurationListener represent the processor of zookeeper watcher // RegistryConfigurationListener represent the processor of zookeeper watcher
type RegistryConfigurationListener struct { type RegistryConfigurationListener struct {
client *zk.ZookeeperClient client *zk.ZookeeperClient
registry *zkRegistry registry *zkRegistry
events chan *config_center.ConfigChangeEvent events chan *config_center.ConfigChangeEvent
isClosed bool isClosed bool
close chan struct{} close chan struct{}
closeOnce sync.Once closeOnce sync.Once
subscribeURL *common.URL
} }
// NewRegistryConfigurationListener for listening the event of zk. // NewRegistryConfigurationListener for listening the event of zk.
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry, conf *common.URL) *RegistryConfigurationListener {
reg.WaitGroup().Add(1) reg.WaitGroup().Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)} return &RegistryConfigurationListener{
client: client,
registry: reg,
events: make(chan *config_center.ConfigChangeEvent, 32),
isClosed: false,
close: make(chan struct{}, 1),
subscribeURL: conf}
} }
// Process submit the ConfigChangeEvent to the event chan to notify all observer // Process submit the ConfigChangeEvent to the event chan to notify all observer
......
...@@ -129,12 +129,17 @@ func (r *zkRegistry) InitListeners() { ...@@ -129,12 +129,17 @@ func (r *zkRegistry) InitListeners() {
recoverd := r.dataListener.subscribed recoverd := r.dataListener.subscribed
if recoverd != nil && len(recoverd) > 0 { if recoverd != nil && len(recoverd) > 0 {
// recover all subscribed url // recover all subscribed url
for conf, oldListener := range recoverd { for _, oldListener := range recoverd {
if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok { var (
regConfigListener *RegistryConfigurationListener
ok bool
)
if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok {
regConfigListener.Close() regConfigListener.Close()
} }
newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r)) newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL))
go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener) go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener)
} }
} }
...@@ -231,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen ...@@ -231,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
dataListener := r.dataListener dataListener := r.dataListener
dataListener.mutex.Lock() dataListener.mutex.Lock()
defer dataListener.mutex.Unlock() defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf] != nil { if r.dataListener.subscribed[conf.ServiceKey()] != nil {
zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener)
if zkListener != nil { if zkListener != nil {
r.listenerLock.Lock() r.listenerLock.Lock()
defer r.listenerLock.Unlock() defer r.listenerLock.Unlock()
...@@ -245,7 +250,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen ...@@ -245,7 +250,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
} }
} }
zkListener = NewRegistryConfigurationListener(r.client, r) zkListener = NewRegistryConfigurationListener(r.client, r, conf)
if r.listener == nil { if r.listener == nil {
r.cltLock.Lock() r.cltLock.Lock()
client := r.client client := r.client
...@@ -274,7 +279,7 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL ...@@ -274,7 +279,7 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
var zkListener *RegistryConfigurationListener var zkListener *RegistryConfigurationListener
r.dataListener.mutex.Lock() r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf] configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
if configurationListener != nil { if configurationListener != nil {
zkListener, _ := configurationListener.(*RegistryConfigurationListener) zkListener, _ := configurationListener.(*RegistryConfigurationListener)
......
...@@ -99,7 +99,7 @@ func Test_Subscribe(t *testing.T) { ...@@ -99,7 +99,7 @@ func Test_Subscribe(t *testing.T) {
defer ts.Stop() defer ts.Stop()
} }
func Test_NoSubscribe(t *testing.T) { func Test_UnSubscribe(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, _ := newMockZkRegistry(&regurl) ts, reg, _ := newMockZkRegistry(&regurl)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment