diff --git a/common/rpc_service.go b/common/rpc_service.go
index b138b4f300387a512932ee8869db493c24f4cb27..ebd1d02f843bc339c3a37d977e2138798307475d 100644
--- a/common/rpc_service.go
+++ b/common/rpc_service.go
@@ -274,7 +274,9 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
}
}
delete(svcs, serviceId)
- delete(sm.serviceMap, protocol)
+ if len(sm.serviceMap) == 0 {
+ delete(sm.serviceMap, protocol)
+ }
return nil
}
diff --git a/config/reference_config.go b/config/reference_config.go
index dcdba958058edb7c1f59f49f20a4cb738ea102f8..3710cbc4bc62a01a014e91bcb978742c4a93c5cb 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -188,6 +188,7 @@ func (c *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, c.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(c.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
+
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.CONSUMER)).Role())
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index 404243d4751146d1edc9a61d51cbb81d73c2ffb1..5ffbb5f95dd5830534599c689681f94db4365660 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -74,7 +74,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
c.cacheListener = NewCacheListener(c.rootPath)
err = c.client.Create(c.rootPath)
- c.listener.ListenServiceEvent(c.rootPath, c.cacheListener)
+ c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
return c, err
}
@@ -100,7 +100,7 @@ func newMockZookeeperDynamicConfiguration(url *common.URL, opts ...zookeeper.Opt
c.cacheListener = NewCacheListener(c.rootPath)
err = c.client.Create(c.rootPath)
- go c.listener.ListenServiceEvent(c.rootPath, c.cacheListener)
+ go c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
return tc, c, err
}
diff --git a/registry/base_registry.go b/registry/base_registry.go
index 04694da25a38899cfc6f56a8092a15547f142e9c..3e1bddf233310871182544b6415c10c8df27e622 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -121,6 +121,7 @@ func (r *BaseRegistry) Destroy() {
close(r.done)
// wait waitgroup done (wait listeners outside close over)
r.wg.Wait()
+
//close registry client
r.closeRegisters()
}
@@ -178,7 +179,10 @@ func (r *BaseRegistry) RestartCallBack() bool {
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
- r.facadeBasedRegistry.InitListeners()
+
+ if flag {
+ r.facadeBasedRegistry.InitListeners()
+ }
return flag
}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index a7678ba4e2f38cfeb77f202103e03066a7efdbef..b47b9f372da8948d1add97c7b354503e18e23511 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -45,6 +45,7 @@ import (
var (
regProtocol *registryProtocol
+ once sync.Once
)
type registryProtocol struct {
@@ -346,12 +347,12 @@ func setProviderUrl(regURL *common.URL, providerURL *common.URL) {
regURL.SubURL = providerURL
}
-// GetProtocol ...
+// GetProtocol return the singleton RegistryProtocol
func GetProtocol() protocol.Protocol {
- if regProtocol != nil {
- return regProtocol
- }
- return newRegistryProtocol()
+ once.Do(func() {
+ regProtocol = newRegistryProtocol()
+ })
+ return regProtocol
}
type wrappedInvoker struct {
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index bef1760e04fd6597721fd19b5d19820f45ed2bf0..c5b2f33c6107e82aa172c818c0d8aca1483248c6 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -35,23 +35,28 @@ import (
zk "github.com/apache/dubbo-go/remoting/zookeeper"
)
-// RegistryDataListener ...
+// RegistryDataListener contains all URL information subscribed by zookeeper registry
type RegistryDataListener struct {
- interestedURL []*common.URL
- listener config_center.ConfigurationListener
+ subscribed map[*common.URL]config_center.ConfigurationListener
+ mutex sync.Mutex
+ closed bool
}
-// NewRegistryDataListener ...
-func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener {
- return &RegistryDataListener{listener: listener}
+// NewRegistryDataListener constructs a new RegistryDataListener
+func NewRegistryDataListener() *RegistryDataListener {
+ return &RegistryDataListener{
+ subscribed: make(map[*common.URL]config_center.ConfigurationListener)}
}
-// AddInterestedURL ...
-func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
- l.interestedURL = append(l.interestedURL, url)
+// SubscribeURL is used to set a watch listener for url
+func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) {
+ if l.closed {
+ return
+ }
+ l.subscribed[url] = listener
}
-// DataChange ...
+// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
@@ -65,10 +70,14 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
return false
}
-
- for _, v := range l.interestedURL {
- if serviceURL.URLEqual(*v) {
- l.listener.Process(
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ if l.closed {
+ return false
+ }
+ for url, listener := range l.subscribed {
+ if serviceURL.URLEqual(*url) {
+ listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
@@ -81,38 +90,48 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
return false
}
-// RegistryConfigurationListener ...
+// Close all RegistryConfigurationListener in subscribed
+func (l *RegistryDataListener) Close() {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ for _, listener := range l.subscribed {
+ listener.(*RegistryConfigurationListener).Close()
+ }
+}
+
+// RegistryConfigurationListener represent the processor of zookeeper watcher
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
isClosed bool
+ close chan struct{}
closeOnce sync.Once
}
// NewRegistryConfigurationListener for listening the event of zk.
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.WaitGroup().Add(1)
- return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
+ return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)}
}
-// Process ...
+// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
-// Next ...
+// Next will observe the registry state and events chan
func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.Done():
- logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
- return nil, perrors.New("listener stopped")
-
+ logger.Warnf("listener's zk client connection (address {%s}) is broken, so zk event listener exit now.", l.client.ZkAddrs)
+ return nil, perrors.New("zookeeper client stopped")
+ case <-l.close:
+ return nil, perrors.New("listener have been closed")
case <-l.registry.Done():
- logger.Warnf("zk consumer register has quit, so zk event listener exit now.")
- return nil, perrors.New("listener stopped")
-
+ logger.Warnf("zk consumer register has quit, so zk event listener exit now. (registry url {%v}", l.registry.BaseRegistry.URL)
+ return nil, perrors.New("zookeeper registry, (registry url{%v}) stopped")
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.ConfigType == remoting.EventTypeDel && !l.valid() {
@@ -127,15 +146,17 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
}
}
-// Close ...
+// Close RegistryConfigurationListener only once
func (l *RegistryConfigurationListener) Close() {
// ensure that the listener will be closed at most once.
l.closeOnce.Do(func() {
l.isClosed = true
+ l.close <- struct{}{}
l.registry.WaitGroup().Done()
})
}
+// valid return the true if the client conn isn't nil
func (l *RegistryConfigurationListener) valid() bool {
return l.client.ZkConnValid()
}
diff --git a/registry/zookeeper/listener_test.go b/registry/zookeeper/listener_test.go
index 1a76b29a6f64e0329b289ce50218032a25f6f5cd..a0e9147a9e0ee8767efcf78d5e2aa536140f6a8b 100644
--- a/registry/zookeeper/listener_test.go
+++ b/registry/zookeeper/listener_test.go
@@ -32,15 +32,15 @@ import (
)
func Test_DataChange(t *testing.T) {
- listener := NewRegistryDataListener(&MockDataListener{})
+ listener := NewRegistryDataListener()
url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-1.3.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
- listener.AddInterestedURL(&url)
+ listener.SubscribeURL(&url, &MockConfigurationListener{})
int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-1.3.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"})
assert.Equal(t, true, int)
}
-type MockDataListener struct {
+type MockConfigurationListener struct {
}
-func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {
+func (*MockConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
}
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 4fd58e9e4d9ecd285675fc416f3d7a36bf19fd54..88d5d6221b4bc7136ba4c3e7c95fb53ba35a9a58 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -53,12 +53,11 @@ func init() {
type zkRegistry struct {
registry.BaseRegistry
- client *zookeeper.ZookeeperClient
- listenerLock sync.Mutex
- listener *zookeeper.ZkEventListener
- dataListener *RegistryDataListener
- configListener *RegistryConfigurationListener
- cltLock sync.Mutex
+ client *zookeeper.ZookeeperClient
+ listenerLock sync.Mutex
+ listener *zookeeper.ZkEventListener
+ dataListener *RegistryDataListener
+ cltLock sync.Mutex
//for provider
zkPath map[string]int // key = protocol://ip:port/interface
}
@@ -77,13 +76,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
if err != nil {
return nil, err
}
- r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
- r.configListener = NewRegistryConfigurationListener(r.client, r)
- r.dataListener = NewRegistryDataListener(r.configListener)
+
+ r.dataListener = NewRegistryDataListener()
return r, nil
}
@@ -120,8 +118,27 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust
func (r *zkRegistry) InitListeners() {
r.listener = zookeeper.NewZkEventListener(r.client)
- r.configListener = NewRegistryConfigurationListener(r.client, r)
- r.dataListener = NewRegistryDataListener(r.configListener)
+ newDataListener := NewRegistryDataListener()
+ // should recover if dataListener isn't nil before
+ if r.dataListener != nil {
+ // close all old listener
+ oldDataListener := r.dataListener
+ oldDataListener.mutex.Lock()
+ defer oldDataListener.mutex.Unlock()
+ recoverd := r.dataListener.subscribed
+ if recoverd != nil && len(recoverd) > 0 {
+ // recover all subscribed url
+ for conf, oldListener := range recoverd {
+ if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok {
+ regConfigListener.Close()
+ }
+ newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r))
+ go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener)
+
+ }
+ }
+ }
+ r.dataListener = newDataListener
}
func (r *zkRegistry) CreatePath(path string) error {
@@ -154,8 +171,8 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex {
}
func (r *zkRegistry) CloseListener() {
- if r.configListener != nil {
- r.configListener.Close()
+ if r.dataListener != nil {
+ r.dataListener.Close()
}
}
@@ -172,32 +189,49 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
return perrors.WithStack(err)
}
+
+ // try to register the node
zkPath, err = r.client.RegisterTemp(root, node)
if err != nil {
- if err == zk.ErrNodeExists {
- logger.Warnf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
- } else {
- logger.Errorf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
+ logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
+ if perrors.Cause(err) == zk.ErrNodeExists {
+ // should delete the old node
+ logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node)
+ if err = r.client.Delete(zkPath); err == nil {
+ _, err = r.client.RegisterTemp(root, node)
+ }
+ if err != nil {
+ logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
+ }
}
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}
- logger.Debugf("create a zookeeper node:%s", zkPath)
+ logger.Debugf("Create a zookeeper node:%s", zkPath)
return nil
}
func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
- var (
- zkListener *RegistryConfigurationListener
- )
- r.listenerLock.Lock()
- if r.configListener.isClosed {
- r.listenerLock.Unlock()
- return nil, perrors.New("configListener already been closed")
+ var zkListener *RegistryConfigurationListener
+ dataListener := r.dataListener
+ dataListener.mutex.Lock()
+ defer dataListener.mutex.Unlock()
+ if r.dataListener.subscribed[conf] != nil {
+
+ zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener)
+ if zkListener != nil {
+ r.listenerLock.Lock()
+ defer r.listenerLock.Unlock()
+ if zkListener.isClosed {
+ return nil, perrors.New("configListener already been closed")
+ } else {
+ return zkListener, nil
+ }
+ }
}
- zkListener = r.configListener
- r.listenerLock.Unlock()
+
+ zkListener = NewRegistryConfigurationListener(r.client, r)
if r.listener == nil {
r.cltLock.Lock()
client := r.client
@@ -215,8 +249,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
}
//Interested register to dataconfig.
- r.dataListener.AddInterestedURL(conf)
- go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener)
+ r.dataListener.SubscribeURL(conf, zkListener)
+
+ go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener)
return zkListener, nil
}
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 21486aab59c3f9b44c25b68d7433f864a990149a..c788bc4c1157bc0c8e10fa5bb723cc0cc56f791d 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -118,7 +118,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
for _, opt := range opts {
opt(opions)
}
-
+ connected := false
err = nil
lock := container.ZkClientLock()
@@ -143,6 +143,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
+ connected = true
}
if container.ZkClient().Conn == nil {
@@ -150,10 +151,16 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout)
if err == nil {
container.ZkClient().Wait.Add(1)
+ connected = true
go container.ZkClient().HandleZkEvent(event)
}
}
+ if connected {
+ logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location)
+ container.WaitGroup().Add(1) //zk client start successful, then registry wg +1
+ }
+
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
@@ -386,6 +393,7 @@ func (z *ZookeeperClient) Close() {
z.Conn = nil
z.Unlock()
if conn != nil {
+ logger.Warnf("zkClient Conn{name:%s, zk addr:%s} exit now.", z.name, conn.SessionID())
conn.Close()
}
@@ -462,7 +470,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
- return "", perrors.WithStack(err)
+ return zkPath, perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go
index 055db4f716a914354d1bada653fbc0a850b615b5..4e3945388ff402f60a02150615a8914f9cba2435 100644
--- a/remoting/zookeeper/facade.go
+++ b/remoting/zookeeper/facade.go
@@ -48,11 +48,11 @@ func HandleClientRestart(r zkClientFacade) {
failTimes int
)
- defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.Done():
+ r.WaitGroup().Done() // dec the wg when registry is closed
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
@@ -63,12 +63,14 @@ LOOP:
zkAddress := r.ZkClient().ZkAddrs
r.SetZkClient(nil)
r.ZkClientLock().Unlock()
+ r.WaitGroup().Done() // dec the wg when zk client is closed
// Connect zk until success.
failTimes = 0
for {
select {
case <-r.Done():
+ r.WaitGroup().Done() // dec the wg when registry is closed
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk.
diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go
index a41f6cd3230700332519ce1c2d3489bfcc4b6ef0..01d46da6cc1abae90210a323d32ac84bad80249b 100644
--- a/remoting/zookeeper/facade_test.go
+++ b/remoting/zookeeper/facade_test.go
@@ -38,6 +38,16 @@ type mockFacade struct {
done chan struct{}
}
+func newMockFacade(client *ZookeeperClient, url *common.URL) zkClientFacade {
+ mock := &mockFacade{
+ client: client,
+ URL: url,
+ }
+
+ mock.wg.Add(1)
+ return mock
+}
+
func (r *mockFacade) ZkClient() *ZookeeperClient {
return r.client
}
@@ -80,7 +90,7 @@ func Test_Facade(t *testing.T) {
assert.NoError(t, err)
defer ts.Stop()
url, _ := common.NewURL("mock://127.0.0.1")
- mock := &mockFacade{client: z, URL: &url}
+ mock := newMockFacade(z, &url)
go HandleClientRestart(mock)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index eaf259f4417201c95172e95d7a87476575e004d5..84877667763ce870e76202844e9dc9dc1c3f008c 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
package zookeeper
import (
+ "github.com/apache/dubbo-go/common"
"path"
"strings"
"sync"
@@ -173,7 +174,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
}
-func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
+func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
defer l.wg.Done()
var (
@@ -224,7 +225,16 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
}
failTimes = 0
for _, c := range children {
- // listen l service node
+
+ // Only need to compare Path when subscribing to provider
+ if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) != -1 {
+ provider, _ := common.NewURL(c)
+ if provider.Path != conf.Path {
+ continue
+ }
+ }
+
+ //listen l service node
dubboPath := path.Join(zkPath, c)
//Save the path to avoid listen repeatedly
@@ -232,7 +242,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
_, ok := l.pathMap[dubboPath]
l.pathMapLock.Unlock()
if ok {
- logger.Warnf("@zkPath %s has already been listened.", zkPath)
+ logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}
@@ -263,7 +273,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
- l.listenDirEvent(zkPath, listener)
+ l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
@@ -291,11 +301,11 @@ func timeSecondDuration(sec int) time.Duration {
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
-func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
+func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
logger.Infof("listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
- l.listenDirEvent(zkPath, listener)
+ l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go
index 7301cd52c392b6950b3a49f78e8124eae532b083..ba7d6ba81b6af97dc5ad3788e8399d08cbe5b2bb 100644
--- a/remoting/zookeeper/listener_test.go
+++ b/remoting/zookeeper/listener_test.go
@@ -97,7 +97,7 @@ func TestListener(t *testing.T) {
go client.HandleZkEvent(event)
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
- listener.ListenServiceEvent("/dubbo", dataListener)
+ listener.ListenServiceEvent(nil, "/dubbo", dataListener)
time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)