diff --git a/registry/base_registry.go b/registry/base_registry.go index 75c0f45d4235f27da3f075f16e1345b79346b9ba..17dfdcbe19e424dad47bf585370bb932645991f6 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -98,7 +98,7 @@ type BaseRegistry struct { birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart done chan struct{} - cltLock sync.Mutex //ctl lock is a lock for services map + cltLock sync.RWMutex //ctl lock is a lock for services map services map[string]common.URL // service name + protocol -> service config, for store the service registered } @@ -138,9 +138,9 @@ func (r *BaseRegistry) Register(conf common.URL) error { ) role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) // Check if the service has been registered - r.cltLock.Lock() + r.cltLock.RLock() _, ok = r.services[conf.Key()] - r.cltLock.Unlock() + r.cltLock.RUnlock() if ok { return perrors.Errorf("Path{%s} has been registered", conf.Key()) } @@ -165,10 +165,17 @@ func (r *BaseRegistry) UnRegister(conf common.URL) error { err error oldURL common.URL ) - r.cltLock.Lock() - oldURL, ok = r.services[conf.Key()] - delete(r.services, conf.Key()) - r.cltLock.Unlock() + func() { + r.cltLock.RLock() + defer r.cltLock.RUnlock() + oldURL, ok = r.services[conf.Key()] + }() + + func() { + r.cltLock.RLock() + defer r.cltLock.Unlock() + delete(r.services, conf.Key()) + }() if !ok { return perrors.Errorf("Path{%s} has not registered", conf.Key()) @@ -176,9 +183,11 @@ func (r *BaseRegistry) UnRegister(conf common.URL) error { err = r.unregister(conf) if err != nil { - r.cltLock.Lock() - r.services[conf.Key()] = oldURL - r.cltLock.Unlock() + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + r.services[conf.Key()] = oldURL + }() return perrors.WithMessagef(err, "register(conf:%+v)", conf) } @@ -366,49 +375,54 @@ func sleepWait(n int) { } // Subscribe :subscribe from registry, event will notify by notifyListener -func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { - r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoSubscribe) +func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { + n := 0 + for { + n++ + err := r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoSubscribe) + if err == nil { + return nil + } + sleepWait(n) + } } // UnSubscribe :UnSubscribeURL -func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) { - r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe) +func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { + return r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe) } // processNotify can process notify listener when Subscribe or UnSubscribe -func (r *BaseRegistry) processNotify(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) { - n := 0 - for { - n++ +func (r *BaseRegistry) processNotify(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) error { + + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return nil + } + + listener, err := f(url) + if err != nil { if !r.IsAvailable() { logger.Warnf("event listener game over.") - return + return nil } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + return perrors.WithStack(err) + } - listener, err := f(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue + for { + if serviceEvent, err := listener.Next(); err != nil { + logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) + listener.Close() + break + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) } - for { - if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - break - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) - } - - } - sleepWait(n) } + return nil } // closeRegisters close and remove registry client and reset services map diff --git a/registry/consul/registry.go b/registry/consul/registry.go index 5a4524c047eddfbe201d4cf767989a0b3a7d073b..c9e0718346258b6b38f2a793dc215bcf8e65cdb7 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -112,16 +112,17 @@ func (r *consulRegistry) unregister(url common.URL) error { return r.client.Agent().ServiceDeregister(buildId(url)) } -func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { +func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) if role == common.CONSUMER { r.subscribe(url, notifyListener) } + return nil } // UnSubscribe : -func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) { - panic(" UnSubscribe not support in consulRegistry ") +func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { + return perrors.New("UnSubscribe not support in consulRegistry") } func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) { diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 60a2eadd936d5e1e7ac42e9481e4ebadf3d055e2..a2722a2e67c22f3e3d9380849a9a256fd2ebbaa0 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -174,6 +174,5 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) } func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { - panic("DoUnsubscribe is not support in etcdV3Registry") - return nil, nil + return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry") } diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index db3e6ee440192985c42e9772ea25bf4e4e7fa107..e64f9acb07b68275edb464bcd983d507658acc2f 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -145,8 +145,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er } func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { - panic("DoUnsubscribe is not support in kubernetesRegistry") - return nil, nil + return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry") } func (r *kubernetesRegistry) InitListeners() { diff --git a/registry/mock_registry.go b/registry/mock_registry.go index e661981d0d9d943470cb8d8aef107c339d82a944..ddf445ac8a713f1c4de84b592f6af584ebfb6ce6 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -78,7 +78,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { } // Subscribe ... -func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { +func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { go func() { for { if !r.IsAvailable() { @@ -110,11 +110,12 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } }() + return nil } // UnSubscribe : -func (r *MockRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) { - +func (r *MockRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { + return nil } type listener struct { diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index e70de75708a11d222fa4df1b621c04f007865e54..6c8749ff381de6d8ce98704b800dbae574f10cae 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -138,8 +138,7 @@ func (nr *nacosRegistry) Register(url common.URL) error { // UnRegister func (nr *nacosRegistry) UnRegister(conf common.URL) error { - panic(" UnRegister not support in nacosRegistry ") - return nil + return perrors.New("UnRegister is not support in nacosRegistry") } func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) { @@ -147,7 +146,7 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) } //subscribe from registry -func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { +func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { for { if !nr.IsAvailable() { logger.Warnf("event listener game over.") @@ -178,11 +177,12 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } } + return nil } // UnSubscribe : -func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) { - panic(" UnSubscribe not support in nacosRegistry ") +func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { + return perrors.New("UnSubscribe not support in nacosRegistry") } func (nr *nacosRegistry) GetUrl() common.URL { diff --git a/registry/registry.go b/registry/registry.go index 740296937fa978a9b4d4da31f7b35b1166d101cb..74e63aa66ebdc674261ce4109b27a067ce769007 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -48,14 +48,14 @@ type Registry interface { //Will relace mode1 in dubbogo version v1.1.0 //mode2 : callback mode, subscribe with notify(notify listener). - Subscribe(*common.URL, NotifyListener) + Subscribe(*common.URL, NotifyListener) error // UnSubscribe is required to support the contract: // 1. If don't subscribe, ignore it directly. // 2. Unsubscribe by full URL match. // url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin // listener A listener of the change event, not allowed to be empty - UnSubscribe(*common.URL, NotifyListener) + UnSubscribe(*common.URL, NotifyListener) error } // NotifyListener ... diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 10e37d412514c46c6788eed8669fe82953f6b315..d0220ddf0dc415ad0d593d0f0eed34cd698b1879 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -58,8 +58,6 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen // UnSubscribeURL is used to set a watch listener for url func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { - l.mutex.Lock() - defer l.mutex.Unlock() if l.closed { return nil } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index a021cbae4daa1ca310ec5c3804bb827b40679858..f51ec671861ecb5fd894f0a7cbbe8883536b491f 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -273,24 +273,24 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) { var zkListener *RegistryConfigurationListener - dataListener := r.dataListener - dataListener.mutex.Lock() - defer dataListener.mutex.Unlock() - if r.dataListener.subscribed[conf] != nil { + r.dataListener.mutex.Lock() + configurationListener := r.dataListener.subscribed[conf] + if configurationListener != nil { - zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) + zkListener, _ := configurationListener.(*RegistryConfigurationListener) if zkListener != nil { r.listenerLock.Lock() - defer r.listenerLock.Unlock() if zkListener.isClosed { + r.listenerLock.Unlock() return nil, perrors.New("configListener already been closed") - } else { - return zkListener, nil } + r.listenerLock.Unlock() } } - zkListener = dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener) + zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener) + r.dataListener.mutex.Unlock() + if r.listener == nil { return nil, perrors.New("listener is null can not close.") } @@ -299,9 +299,8 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL r.listenerLock.Lock() listener := r.listener r.listener = nil - r.listenerLock.Unlock() - dataListener.Close() + r.dataListener.Close() listener.Close() return zkListener, nil diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 9ae0027daf342343a5bd57b1fdae5634a3fd4c9e..b7451f9543518b8f366bb026b4c0b06d7df8474b 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -92,6 +92,39 @@ func Test_Subscribe(t *testing.T) { defer ts.Stop() } +func Test_NoSubscribe(t *testing.T) { + regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + ts, reg, _ := newMockZkRegistry(®url) + + //provider register + err := reg.Register(url) + assert.NoError(t, err) + + if err != nil { + return + } + + //consumer register + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + + reg2.Register(url) + listener, _ := reg2.DoSubscribe(&url) + + serviceEvent, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + return + } + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) + + reg2.DoUnsubscribe(&url) + assert.Nil(t, reg2.listener) + + defer ts.Stop() +} + func Test_ConsumerDestory(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 84877667763ce870e76202844e9dc9dc1c3f008c..6c9d072385bd0a2ac87eabd3ad4ae13663ea22e9 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -316,5 +316,6 @@ func (l *ZkEventListener) valid() bool { // Close ... func (l *ZkEventListener) Close() { + close(l.client.exit) l.wg.Wait() }