Skip to content
Snippets Groups Projects
Commit d532083b authored by 邹毅贤's avatar 邹毅贤
Browse files

fix review problems

parent 0e3199d1
No related branches found
No related tags found
No related merge requests found
......@@ -161,11 +161,13 @@ func (r *BaseRegistry) Register(conf common.URL) error {
// UnRegister
func (r *BaseRegistry) UnRegister(conf common.URL) error {
var (
ok bool
err error
ok bool
err error
oldURL common.URL
)
r.cltLock.Lock()
_, ok = r.services[conf.Key()]
oldURL, ok = r.services[conf.Key()]
delete(r.services, conf.Key())
r.cltLock.Unlock()
if !ok {
......@@ -174,13 +176,12 @@ func (r *BaseRegistry) UnRegister(conf common.URL) error {
err = r.unregister(conf)
if err != nil {
r.cltLock.Lock()
r.services[conf.Key()] = oldURL
r.cltLock.Unlock()
return perrors.WithMessagef(err, "register(conf:%+v)", conf)
}
r.cltLock.Lock()
delete(r.services, conf.Key())
r.cltLock.Unlock()
return nil
}
......@@ -219,15 +220,15 @@ func (r *BaseRegistry) RestartCallBack() bool {
// register for register url to registry, include init params
func (r *BaseRegistry) register(c common.URL) error {
return r.processURL(c, r.facadeBasedRegistry.DoRegister)
return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath)
}
// unregister for unregister url to registry, include init params
func (r *BaseRegistry) unregister(c common.URL) error {
return r.processURL(c, r.facadeBasedRegistry.DoUnregister)
return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil)
}
func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error) error {
func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, createPathFunc func(dubboPath string) error) error {
if f == nil {
panic(" Must provide a `function(string, string) error` to process URL. ")
}
......@@ -255,9 +256,9 @@ func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error) er
switch role {
case common.PROVIDER:
dubboPath, rawURL, err = r.providerRegistry(c, params)
dubboPath, rawURL, err = r.providerRegistry(c, params, createPathFunc)
case common.CONSUMER:
dubboPath, rawURL, err = r.consumerRegistry(c, params)
dubboPath, rawURL, err = r.consumerRegistry(c, params, createPathFunc)
default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
......@@ -271,8 +272,15 @@ func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error) er
return nil
}
// createPath will create dubbo path in register
func (r *BaseRegistry) createPath(dubboPath string) error {
r.cltLock.Lock()
defer r.cltLock.Unlock()
return r.facadeBasedRegistry.CreatePath(dubboPath)
}
// providerRegistry for provider role do
func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) {
func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, createPathFunc func(dubboPath string) error) (string, string, error) {
var (
dubboPath string
rawURL string
......@@ -282,11 +290,9 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string
return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
}
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
err = r.facadeBasedRegistry.CreatePath(dubboPath)
}()
if createPathFunc != nil {
err = createPathFunc(dubboPath)
}
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
......@@ -316,7 +322,7 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string
}
// consumerRegistry for consumer role do
func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) {
func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, createPathFunc func(dubboPath string) error) (string, string, error) {
var (
dubboPath string
rawURL string
......@@ -324,23 +330,18 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string
)
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
err = r.facadeBasedRegistry.CreatePath(dubboPath)
}()
if createPathFunc != nil {
err = createPathFunc(dubboPath)
}
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
return "", "", perrors.WithStack(err)
}
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
err = r.facadeBasedRegistry.CreatePath(dubboPath)
}()
if createPathFunc != nil {
err = createPathFunc(dubboPath)
}
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
......@@ -366,16 +367,16 @@ func sleepWait(n int) {
// Subscribe :subscribe from registry, event will notify by notifyListener
func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
r.processSubscribe(url, notifyListener, r.facadeBasedRegistry.DoSubscribe)
r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoSubscribe)
}
// UnSubscribe :
// UnSubscribe :UnSubscribeURL
func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) {
r.processSubscribe(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe)
r.processNotify(url, notifyListener, r.facadeBasedRegistry.DoUnsubscribe)
}
// process Subscribe or UnSubscribe
func (r *BaseRegistry) processSubscribe(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) {
// processNotify can process notify listener when Subscribe or UnSubscribe
func (r *BaseRegistry) processNotify(url *common.URL, notifyListener NotifyListener, f func(conf *common.URL) (Listener, error)) {
n := 0
for {
n++
......
......@@ -56,7 +56,7 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
l.subscribed[url] = listener
}
// SubscribeURL is used to set a watch listener for url
// UnSubscribeURL is used to set a watch listener for url
func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
l.mutex.Lock()
defer l.mutex.Unlock()
......
......@@ -151,6 +151,11 @@ func (r *zkRegistry) DoRegister(root string, node string) error {
}
func (r *zkRegistry) DoUnregister(root string, node string) error {
r.cltLock.Lock()
defer r.cltLock.Unlock()
if !r.ZkClient().ZkConnValid() {
return perrors.Errorf("zk client is not valid.")
}
return r.ZkClient().Delete(path.Join(root, node))
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment