diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index bef1760e04fd6597721fd19b5d19820f45ed2bf0..d2adc44d39533d3f93fc4c2667f0213825261e09 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -37,18 +37,19 @@ import ( // RegistryDataListener ... type RegistryDataListener struct { - interestedURL []*common.URL - listener config_center.ConfigurationListener + subscribed map[*common.URL]config_center.ConfigurationListener + listener config_center.ConfigurationListener } // NewRegistryDataListener ... -func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener { - return &RegistryDataListener{listener: listener} +func NewRegistryDataListener() *RegistryDataListener { + return &RegistryDataListener{ + subscribed: make(map[*common.URL]config_center.ConfigurationListener)} } -// AddInterestedURL ... -func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { - l.interestedURL = append(l.interestedURL, url) +// SubscribeURL is used to set a watch listener for url +func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) { + l.subscribed[url] = listener } // DataChange ... @@ -65,10 +66,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path) return false } - - for _, v := range l.interestedURL { - if serviceURL.URLEqual(*v) { - l.listener.Process( + for url, listener := range l.subscribed { + if serviceURL.URLEqual(*url) { + listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, Value: serviceURL, @@ -81,6 +81,12 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { return false } +func (l *RegistryDataListener) Close(){ + for _, listener := range l.subscribed { + listener.(*RegistryConfigurationListener).Close() + } +} + // RegistryConfigurationListener ... type RegistryConfigurationListener struct { client *zk.ZookeeperClient diff --git a/registry/zookeeper/listener_test.go b/registry/zookeeper/listener_test.go index 1a76b29a6f64e0329b289ce50218032a25f6f5cd..a0e9147a9e0ee8767efcf78d5e2aa536140f6a8b 100644 --- a/registry/zookeeper/listener_test.go +++ b/registry/zookeeper/listener_test.go @@ -32,15 +32,15 @@ import ( ) func Test_DataChange(t *testing.T) { - listener := NewRegistryDataListener(&MockDataListener{}) + listener := NewRegistryDataListener() url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-1.3.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") - listener.AddInterestedURL(&url) + listener.SubscribeURL(&url, &MockConfigurationListener{}) int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-1.3.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) assert.Equal(t, true, int) } -type MockDataListener struct { +type MockConfigurationListener struct { } -func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) { +func (*MockConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 4fd58e9e4d9ecd285675fc416f3d7a36bf19fd54..7c776aeedd6715b476735b7e9439394e172a77c2 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -20,6 +20,7 @@ package zookeeper import ( "fmt" "net/url" + "strings" "sync" "time" ) @@ -53,12 +54,11 @@ func init() { type zkRegistry struct { registry.BaseRegistry - client *zookeeper.ZookeeperClient - listenerLock sync.Mutex - listener *zookeeper.ZkEventListener - dataListener *RegistryDataListener - configListener *RegistryConfigurationListener - cltLock sync.Mutex + client *zookeeper.ZookeeperClient + listenerLock sync.Mutex + listener *zookeeper.ZkEventListener + dataListener *RegistryDataListener + cltLock sync.Mutex //for provider zkPath map[string]int // key = protocol://ip:port/interface } @@ -82,8 +82,8 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { go zookeeper.HandleClientRestart(r) r.listener = zookeeper.NewZkEventListener(r.client) - r.configListener = NewRegistryConfigurationListener(r.client, r) - r.dataListener = NewRegistryDataListener(r.configListener) + + r.dataListener = NewRegistryDataListener() return r, nil } @@ -120,8 +120,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) - r.configListener = NewRegistryConfigurationListener(r.client, r) - r.dataListener = NewRegistryDataListener(r.configListener) + r.dataListener = NewRegistryDataListener() } func (r *zkRegistry) CreatePath(path string) error { @@ -154,8 +153,8 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex { } func (r *zkRegistry) CloseListener() { - if r.configListener != nil { - r.configListener.Close() + if r.dataListener != nil { + r.dataListener.Close() } } @@ -187,17 +186,7 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { } func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) { - var ( - zkListener *RegistryConfigurationListener - ) - - r.listenerLock.Lock() - if r.configListener.isClosed { - r.listenerLock.Unlock() - return nil, perrors.New("configListener already been closed") - } - zkListener = r.configListener - r.listenerLock.Unlock() + zkListener := NewRegistryConfigurationListener(r.client, r) if r.listener == nil { r.cltLock.Lock() client := r.client @@ -215,8 +204,10 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen } //Interested register to dataconfig. - r.dataListener.AddInterestedURL(conf) + r.dataListener.SubscribeURL(conf, zkListener) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener) + return zkListener, nil }