diff --git a/registry/base_registry.go b/registry/base_registry.go index 83ceb60e742859e703a69420747ac9cc05b585ef..27feaef91f46f534cde8fc9f3077453e9fd490c7 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -29,7 +29,6 @@ import ( ) import ( - gxset "github.com/dubbogo/gost/container/set" gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) @@ -80,7 +79,7 @@ type FacadeBasedRegistry interface { // DoSubscribe actually subscribe the URL DoSubscribe(conf *common.URL) (Listener, error) // DoUnsubscribe does unsubscribe the URL - DoUnsubscribe(svc *common.URL) error + DoUnsubscribe(conf *common.URL) (Listener, error) // CloseAndNilClient close the client and then reset the client in registry to nil // you should notice that this method will be invoked inside a lock. // So you should implement this method as light weighted as you can. @@ -96,12 +95,11 @@ type BaseRegistry struct { context context.Context facadeBasedRegistry FacadeBasedRegistry *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - wg sync.WaitGroup // wg+done for zk restart - done chan struct{} - cltLock sync.Mutex //ctl lock is a lock for services map - services map[string]common.URL // service name + protocol -> service config, for store the service registered - serviceListener map[string]*gxset.HashSet // service name + protocol -> service listener, for store the service listener registered + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + cltLock sync.Mutex //ctl lock is a lock for services map + services map[string]common.URL // service name + protocol -> service config, for store the service registered } // InitBaseRegistry for init some local variables and set BaseRegistry's subclass to it @@ -368,14 +366,17 @@ func sleepWait(n int) { // Subscribe :subscribe from registry, event will notify by notifyListener func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { - n := 0 + r.processSubscribe(url, notifyListener, r.facadeBasedRegistry.DoSubscribe) +} - r.cltLock.Lock() - set := r.serviceListener[url.Key()] - if set == nil { - r.serviceListener[url.Key()] = gxset.NewSet() - } - r.cltLock.Unlock() +// UnSubscribe : +func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) { + r.processSubscribe(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe) +} + +// process Subscribe or UnSubscribe +func (r *BaseRegistry) processSubscribe(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) { + n := 0 for { n++ if !r.IsAvailable() { @@ -383,7 +384,7 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) return } - listener, err := r.facadeBasedRegistry.DoSubscribe(url) + listener, err := f(url) if err != nil { if !r.IsAvailable() { logger.Warnf("event listener game over.") @@ -401,12 +402,7 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) break } else { logger.Infof("update begin, service event: %v", serviceEvent.String()) - listener := notifyListener - listener.Notify(serviceEvent) - r.cltLock.Lock() - set := r.serviceListener[url.Key()] - set.Add(listener) - r.cltLock.Unlock() + notifyListener.Notify(serviceEvent) } } @@ -414,16 +410,6 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } -// UnSubscribe : -func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) { - ////r.serviceListener[url.Key()] = notifyListener - //for index, configuration := range configurations { - // if configuration == a2 { - // configurations = append(configurations[:index], configurations[index+1:]...) - // } - //} -} - // closeRegisters close and remove registry client and reset services map func (r *BaseRegistry) closeRegisters() { logger.Infof("begin to close provider client") diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 9b4bd4430b33664ddfa75bb4debcb4ff271a7b93..60a2eadd936d5e1e7ac42e9481e4ebadf3d055e2 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -173,7 +173,7 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) return configListener, nil } -func (r *etcdV3Registry) DoUnsubscribe(svc *common.URL) error { +func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { panic("DoUnsubscribe is not support in etcdV3Registry") - return nil + return nil, nil } diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 61d2616b775da1de3436f1716f7eca6fa95947f7..db3e6ee440192985c42e9772ea25bf4e4e7fa107 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -144,9 +144,9 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er return configListener, nil } -func (r *kubernetesRegistry) DoUnsubscribe(svc *common.URL) error { +func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { panic("DoUnsubscribe is not support in kubernetesRegistry") - return nil + return nil, nil } func (r *kubernetesRegistry) InitListeners() { diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index c5b2f33c6107e82aa172c818c0d8aca1483248c6..f428f0f719e5c038e39711d91e5779a8f4f5cf70 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -56,6 +56,18 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen l.subscribed[url] = listener } +// SubscribeURL is used to set a watch listener for url +func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { + if l.closed { + return nil + } + l.mutex.Lock() + listener := l.subscribed[url] + delete(l.subscribed, url) + l.mutex.Unlock() + return listener +} + // DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { // Intercept the last bit diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index b4ec7be9dabc70a96c66f1b886cd56171945c5fc..3811811bf769f91f59433574c2dfc38f17e3dc73 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -158,8 +158,8 @@ func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } -func (r *zkRegistry) DoUnsubscribe(svc *common.URL) error { - return nil +func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return r.getCloseListener(conf) } func (r *zkRegistry) CloseAndNilClient() { @@ -264,3 +264,37 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen return zkListener, nil } + +func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) { + + var zkListener *RegistryConfigurationListener + dataListener := r.dataListener + dataListener.mutex.Lock() + defer dataListener.mutex.Unlock() + if r.dataListener.subscribed[conf] != nil { + + zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) + if zkListener != nil { + r.listenerLock.Lock() + defer r.listenerLock.Unlock() + if zkListener.isClosed { + return nil, perrors.New("configListener already been closed") + } else { + return zkListener, nil + } + } + } + + zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener) + if r.listener == nil { + return nil, perrors.New("listener is null can not close.") + } + + //Interested register to dataconfig. + r.listenerLock.Lock() + r.dataListener.Close() + r.listener.Close() + r.listenerLock.Unlock() + + return zkListener, nil +}