diff --git a/registry/base_registry.go b/registry/base_registry.go index 27feaef91f46f534cde8fc9f3077453e9fd490c7..75c0f45d4235f27da3f075f16e1345b79346b9ba 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -161,11 +161,13 @@ func (r *BaseRegistry) Register(conf common.URL) error { // UnRegister func (r *BaseRegistry) UnRegister(conf common.URL) error { var ( - ok bool - err error + ok bool + err error + oldURL common.URL ) r.cltLock.Lock() - _, ok = r.services[conf.Key()] + oldURL, ok = r.services[conf.Key()] + delete(r.services, conf.Key()) r.cltLock.Unlock() if !ok { @@ -174,13 +176,12 @@ func (r *BaseRegistry) UnRegister(conf common.URL) error { err = r.unregister(conf) if err != nil { + r.cltLock.Lock() + r.services[conf.Key()] = oldURL + r.cltLock.Unlock() return perrors.WithMessagef(err, "register(conf:%+v)", conf) } - r.cltLock.Lock() - delete(r.services, conf.Key()) - r.cltLock.Unlock() - return nil } @@ -219,15 +220,15 @@ func (r *BaseRegistry) RestartCallBack() bool { // register for register url to registry, include init params func (r *BaseRegistry) register(c common.URL) error { - return r.processURL(c, r.facadeBasedRegistry.DoRegister) + return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath) } // unregister for unregister url to registry, include init params func (r *BaseRegistry) unregister(c common.URL) error { - return r.processURL(c, r.facadeBasedRegistry.DoUnregister) + return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil) } -func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error) error { +func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, createPathFunc func(dubboPath string) error) error { if f == nil { panic(" Must provide a `function(string, string) error` to process URL. ") } @@ -255,9 +256,9 @@ func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error) er switch role { case common.PROVIDER: - dubboPath, rawURL, err = r.providerRegistry(c, params) + dubboPath, rawURL, err = r.providerRegistry(c, params, createPathFunc) case common.CONSUMER: - dubboPath, rawURL, err = r.consumerRegistry(c, params) + dubboPath, rawURL, err = r.consumerRegistry(c, params, createPathFunc) default: return perrors.Errorf("@c{%v} type is not referencer or provider", c) } @@ -271,8 +272,15 @@ func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error) er return nil } +// createPath will create dubbo path in register +func (r *BaseRegistry) createPath(dubboPath string) error { + r.cltLock.Lock() + defer r.cltLock.Unlock() + return r.facadeBasedRegistry.CreatePath(dubboPath) +} + // providerRegistry for provider role do -func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) { +func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, createPathFunc func(dubboPath string) error) (string, string, error) { var ( dubboPath string rawURL string @@ -282,11 +290,9 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - }() + if createPathFunc != nil { + err = createPathFunc(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath) @@ -316,7 +322,7 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string } // consumerRegistry for consumer role do -func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) { +func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, createPathFunc func(dubboPath string) error) (string, string, error) { var ( dubboPath string rawURL string @@ -324,23 +330,18 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string ) dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - - }() + if createPathFunc != nil { + err = createPathFunc(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithStack(err) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - }() + if createPathFunc != nil { + err = createPathFunc(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) @@ -366,16 +367,16 @@ func sleepWait(n int) { // Subscribe :subscribe from registry, event will notify by notifyListener func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { - r.processSubscribe(url, notifyListener, r.facadeBasedRegistry.DoSubscribe) + r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoSubscribe) } -// UnSubscribe : +// UnSubscribe :UnSubscribeURL func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) { - r.processSubscribe(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe) + r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe) } -// process Subscribe or UnSubscribe -func (r *BaseRegistry) processSubscribe(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) { +// processNotify can process notify listener when Subscribe or UnSubscribe +func (r *BaseRegistry) processNotify(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) { n := 0 for { n++ diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 61a69c587fc3b93f927ef80a30d750c8b8347658..10e37d412514c46c6788eed8669fe82953f6b315 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -56,7 +56,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen l.subscribed[url] = listener } -// SubscribeURL is used to set a watch listener for url +// UnSubscribeURL is used to set a watch listener for url func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { l.mutex.Lock() defer l.mutex.Unlock() diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 06197920bb2182d80a6f2a7daf703c24718e0b33..a021cbae4daa1ca310ec5c3804bb827b40679858 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -151,6 +151,11 @@ func (r *zkRegistry) DoRegister(root string, node string) error { } func (r *zkRegistry) DoUnregister(root string, node string) error { + r.cltLock.Lock() + defer r.cltLock.Unlock() + if !r.ZkClient().ZkConnValid() { + return perrors.Errorf("zk client is not valid.") + } return r.ZkClient().Delete(path.Join(root, node)) }