diff --git a/common/constant/key.go b/common/constant/key.go
index da21a3a9e1254d5a22d670a11c5c01022892e096..e62beffc44c40044691f8285262a47e7d2080578 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -212,9 +212,9 @@ const (
// consumer
CONSUMER = "consumer"
// key of access key id
- ACCESS_KEY_ID_KEY = "accessKeyId"
+ ACCESS_KEY_ID_KEY = ".accessKeyId"
// key of secret access key
- SECRET_ACCESS_KEY_KEY = "secretAccessKey"
+ SECRET_ACCESS_KEY_KEY = ".secretAccessKey"
)
// metadata report
diff --git a/go.sum b/go.sum
index 326b4e68974fe8a851f73d27fbc7e14ee1045c19..9bcb1fe45700401e938c0caa1aca94e65711fe0b 100644
--- a/go.sum
+++ b/go.sum
@@ -35,11 +35,8 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vaj
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
-github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0=
-github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
-github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s=
github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
@@ -53,7 +50,6 @@ github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f h1:/8NcnxL6
github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.15.24 h1:xLAdTA/ore6xdPAljzZRed7IGqQgC+nY+ERS5vaj4Ro=
github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
-github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -389,8 +385,6 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
-github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo=
-github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg=
github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8=
github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s=
diff --git a/registry/base_registry.go b/registry/base_registry.go
index 3e1bddf233310871182544b6415c10c8df27e622..ad1a3b61741e003625612ad58409eb8615271a84 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -56,6 +56,8 @@ func init() {
localIP, _ = gxnet.GetLocalIP()
}
+type createPathFunc func(dubboPath string) error
+
/*
* -----------------------------------NOTICE---------------------------------------------
* If there is no special case, you'd better inherit BaseRegistry and implement the
@@ -74,8 +76,12 @@ type FacadeBasedRegistry interface {
CreatePath(string) error
// DoRegister actually do the register job
DoRegister(string, string) error
+ // DoUnregister do the unregister job
+ DoUnregister(string, string) error
// DoSubscribe actually subscribe the URL
DoSubscribe(conf *common.URL) (Listener, error)
+ // DoUnsubscribe does unsubscribe the URL
+ DoUnsubscribe(conf *common.URL) (Listener, error)
// CloseAndNilClient close the client and then reset the client in registry to nil
// you should notice that this method will be invoked inside a lock.
// So you should implement this method as light weighted as you can.
@@ -94,7 +100,7 @@ type BaseRegistry struct {
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
- cltLock sync.Mutex //ctl lock is a lock for services map
+ cltLock sync.RWMutex //ctl lock is a lock for services map
services map[string]common.URL // service name + protocol -> service config, for store the service registered
}
@@ -154,6 +160,43 @@ func (r *BaseRegistry) Register(conf common.URL) error {
return nil
}
+// UnRegister implement interface registry to unregister
+func (r *BaseRegistry) UnRegister(conf common.URL) error {
+ var (
+ ok bool
+ err error
+ oldURL common.URL
+ )
+
+ func() {
+ r.cltLock.Lock()
+ defer r.cltLock.Unlock()
+ oldURL, ok = r.services[conf.Key()]
+
+ if !ok {
+ err = perrors.Errorf("Path{%s} has not registered", conf.Key())
+ }
+
+ delete(r.services, conf.Key())
+ }()
+
+ if err != nil {
+ return err
+ }
+
+ err = r.unregister(conf)
+ if err != nil {
+ func() {
+ r.cltLock.Lock()
+ defer r.cltLock.Unlock()
+ r.services[conf.Key()] = oldURL
+ }()
+ return perrors.WithMessagef(err, "register(conf:%+v)", conf)
+ }
+
+ return nil
+}
+
// service is for getting service path stored in url
func (r *BaseRegistry) service(c common.URL) string {
return url.QueryEscape(c.Service())
@@ -189,6 +232,18 @@ func (r *BaseRegistry) RestartCallBack() bool {
// register for register url to registry, include init params
func (r *BaseRegistry) register(c common.URL) error {
+ return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath)
+}
+
+// unregister for unregister url to registry, include init params
+func (r *BaseRegistry) unregister(c common.URL) error {
+ return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil)
+}
+
+func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, cpf createPathFunc) error {
+ if f == nil {
+ panic(" Must provide a `function(string, string) error` to process URL. ")
+ }
var (
err error
//revision string
@@ -213,15 +268,15 @@ func (r *BaseRegistry) register(c common.URL) error {
switch role {
case common.PROVIDER:
- dubboPath, rawURL, err = r.providerRegistry(c, params)
+ dubboPath, rawURL, err = r.providerRegistry(c, params, cpf)
case common.CONSUMER:
- dubboPath, rawURL, err = r.consumerRegistry(c, params)
+ dubboPath, rawURL, err = r.consumerRegistry(c, params, cpf)
default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
encodedURL = url.QueryEscape(rawURL)
dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
- err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL)
+ err = f(dubboPath, encodedURL)
if err != nil {
return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL)
@@ -229,8 +284,15 @@ func (r *BaseRegistry) register(c common.URL) error {
return nil
}
+// createPath will create dubbo path in register
+func (r *BaseRegistry) createPath(dubboPath string) error {
+ r.cltLock.Lock()
+ defer r.cltLock.Unlock()
+ return r.facadeBasedRegistry.CreatePath(dubboPath)
+}
+
// providerRegistry for provider role do
-func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) {
+func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) {
var (
dubboPath string
rawURL string
@@ -240,11 +302,9 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string
return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
}
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
- func() {
- r.cltLock.Lock()
- defer r.cltLock.Unlock()
- err = r.facadeBasedRegistry.CreatePath(dubboPath)
- }()
+ if f != nil {
+ err = f(dubboPath)
+ }
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
@@ -274,7 +334,7 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string
}
// consumerRegistry for consumer role do
-func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) {
+func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) {
var (
dubboPath string
rawURL string
@@ -282,23 +342,18 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string
)
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
- func() {
- r.cltLock.Lock()
- defer r.cltLock.Unlock()
- err = r.facadeBasedRegistry.CreatePath(dubboPath)
-
- }()
+ if f != nil {
+ err = f(dubboPath)
+ }
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
return "", "", perrors.WithStack(err)
}
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
- func() {
- r.cltLock.Lock()
- defer r.cltLock.Unlock()
- err = r.facadeBasedRegistry.CreatePath(dubboPath)
- }()
+ if f != nil {
+ err = f(dubboPath)
+ }
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
@@ -323,20 +378,20 @@ func sleepWait(n int) {
}
// Subscribe :subscribe from registry, event will notify by notifyListener
-func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
+func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
n := 0
for {
n++
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
- return
+ return perrors.New("BaseRegistry is not available.")
}
listener, err := r.facadeBasedRegistry.DoSubscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
- return
+ return err
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
@@ -358,6 +413,37 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
+// UnSubscribe URL
+func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error {
+ if !r.IsAvailable() {
+ logger.Warnf("event listener game over.")
+ return perrors.New("BaseRegistry is not available.")
+ }
+
+ listener, err := r.facadeBasedRegistry.DoUnsubscribe(url)
+ if err != nil {
+ if !r.IsAvailable() {
+ logger.Warnf("event listener game over.")
+ return perrors.New("BaseRegistry is not available.")
+ }
+ logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
+ return perrors.WithStack(err)
+ }
+
+ for {
+ if serviceEvent, err := listener.Next(); err != nil {
+ logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
+ listener.Close()
+ break
+ } else {
+ logger.Infof("update begin, service event: %v", serviceEvent.String())
+ notifyListener.Notify(serviceEvent)
+ }
+
+ }
+ return nil
+}
+
// closeRegisters close and remove registry client and reset services map
func (r *BaseRegistry) closeRegisters() {
logger.Infof("begin to close provider client")
diff --git a/registry/consul/registry.go b/registry/consul/registry.go
index c5b8510a6c87068a5b4f1ce52203d401a896a6c2..c9e0718346258b6b38f2a793dc215bcf8e65cdb7 100644
--- a/registry/consul/registry.go
+++ b/registry/consul/registry.go
@@ -95,7 +95,7 @@ func (r *consulRegistry) register(url common.URL) error {
return r.client.Agent().ServiceRegister(service)
}
-func (r *consulRegistry) Unregister(url common.URL) error {
+func (r *consulRegistry) UnRegister(url common.URL) error {
var err error
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
@@ -112,11 +112,17 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url))
}
-func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
+func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER {
r.subscribe(url, notifyListener)
}
+ return nil
+}
+
+// UnSubscribe :
+func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
+ return perrors.New("UnSubscribe not support in consulRegistry")
}
func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) {
diff --git a/registry/consul/registry_test.go b/registry/consul/registry_test.go
index bb6842cd8fb67dd2cc70b1a7530fbb94f618a9b0..94718f5ab657c198882f065a50e5d5a2c9d4bc6f 100644
--- a/registry/consul/registry_test.go
+++ b/registry/consul/registry_test.go
@@ -44,7 +44,7 @@ func (suite *consulRegistryTestSuite) testRegister() {
func (suite *consulRegistryTestSuite) testUnregister() {
consulProviderRegistry, _ := suite.providerRegistry.(*consulRegistry)
- err := consulProviderRegistry.Unregister(suite.providerUrl)
+ err := consulProviderRegistry.UnRegister(suite.providerUrl)
assert.NoError(suite.t, err)
}
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index 5d389c36374fe9de5561418bc90d44a7d780fd48..a65d090349b40d473c769e3130e4f000ee03bd00 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -114,6 +114,10 @@ func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
+func (r *etcdV3Registry) DoUnregister(root string, node string) error {
+ return perrors.New("DoUnregister is not support in etcdV3Registry")
+}
+
func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
r.client = nil
@@ -168,3 +172,7 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error)
return configListener, nil
}
+
+func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
+ return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry")
+}
diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go
index 8a02d0e3e693b58946a97e7b47238e0be4272dcf..7ee0f6b0eeb83181bfd20e1abe4685e8319cd09b 100644
--- a/registry/kubernetes/registry.go
+++ b/registry/kubernetes/registry.go
@@ -107,6 +107,10 @@ func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
+func (r *kubernetesRegistry) DoUnregister(root string, node string) error {
+ return perrors.New("DoUnregister is not support in kubernetesRegistry")
+}
+
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
@@ -139,6 +143,10 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil
}
+func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
+ return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry")
+}
+
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
diff --git a/registry/mock_registry.go b/registry/mock_registry.go
index 9591928eebd22bf2a99ec9dcfeb285c4519a3b90..f39490a26755a96aab1438d965bd8ee6fc75006f 100644
--- a/registry/mock_registry.go
+++ b/registry/mock_registry.go
@@ -51,6 +51,11 @@ func (*MockRegistry) Register(url common.URL) error {
return nil
}
+// UnRegister
+func (r *MockRegistry) UnRegister(conf common.URL) error {
+ return nil
+}
+
// Destroy ...
func (r *MockRegistry) Destroy() {
if r.destroyed.CAS(false, true) {
@@ -72,7 +77,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
}
// Subscribe ...
-func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
+func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
go func() {
for {
if !r.IsAvailable() {
@@ -104,6 +109,12 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
}()
+ return nil
+}
+
+// UnSubscribe :
+func (r *MockRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error {
+ return nil
}
type listener struct {
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index a436b85064829b9f42c9dcc45545e5bf2fd2fefe..c98bbc7843d4317d9f7d74040481052b28c0f493 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -136,23 +136,28 @@ func (nr *nacosRegistry) Register(url common.URL) error {
return nil
}
+// UnRegister
+func (nr *nacosRegistry) UnRegister(conf common.URL) error {
+ return perrors.New("UnRegister is not support in nacosRegistry")
+}
+
func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return NewNacosListener(*conf, nr.namingClient)
}
//subscribe from registry
-func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
+func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
- return
+ return perrors.New("nacosRegistry is not available.")
}
listener, err := nr.subscribe(url)
if err != nil {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
- return
+ return err
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
@@ -164,7 +169,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
- return
+ return err
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
@@ -172,6 +177,12 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}
}
+ return nil
+}
+
+// UnSubscribe :
+func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
+ return perrors.New("UnSubscribe not support in nacosRegistry")
}
func (nr *nacosRegistry) GetUrl() common.URL {
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 52a7dcbfc77fd576ef8d2917ce51cc09f3cd0b97..aa8fbcbe7d6eca682892d4627878fe6bfc3756fe 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -22,9 +22,8 @@ import (
"strings"
"sync"
)
-
import (
- "github.com/dubbogo/gost/container/set"
+ gxset "github.com/dubbogo/gost/container/set"
)
import (
@@ -96,8 +95,24 @@ func getRegistry(regUrl *common.URL) registry.Registry {
func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.URL {
if registryUrl.GetParamBool("simplified", false) {
return providerUrl.CloneWithParams(reserveParams)
+ } else {
+ return filterHideKey(providerUrl)
+ }
+}
+
+// filterHideKey filter the parameters that do not need to be output in url(Starting with .)
+func filterHideKey(url *common.URL) *common.URL {
+
+ //be careful params maps in url is map type
+ cloneURL := url.Clone()
+ removeSet := gxset.NewSet()
+ for k, _ := range cloneURL.GetParams() {
+ if strings.HasPrefix(k, ".") {
+ removeSet.Add(k)
+ }
}
- return providerUrl
+ cloneURL.RemoveParams(removeSet)
+ return cloneURL
}
func (proto *registryProtocol) initConfigurationListeners() {
diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go
index cee2a6a625368f655d1b9bc5fe8cc37031e1aef7..15fd3cacfacad36309e0ad4deb3c7c7441e47e26 100644
--- a/registry/protocol/protocol_test.go
+++ b/registry/protocol/protocol_test.go
@@ -284,3 +284,12 @@ func TestExportWithApplicationConfig(t *testing.T) {
v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
assert.NotNil(t, v2)
}
+
+func TestGetProviderUrlWithHideKey(t *testing.T) {
+ url, _ := common.NewURL("dubbo://127.0.0.1:1111?a=a1&b=b1&.c=c1&.d=d1&e=e1&protocol=registry")
+ providerUrl := getUrlToRegistry(&url, &url)
+ assert.NotContains(t, providerUrl.GetParams(), ".c")
+ assert.NotContains(t, providerUrl.GetParams(), ".d")
+ assert.Contains(t, providerUrl.GetParams(), "a")
+
+}
diff --git a/registry/registry.go b/registry/registry.go
index d673864700e6ba99e8f0283247d53760b85598aa..74e63aa66ebdc674261ce4109b27a067ce769007 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -34,6 +34,12 @@ type Registry interface {
//And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring.
Register(url common.URL) error
+ // UnRegister is required to support the contract:
+ // 1. If it is the persistent stored data of dynamic=false, the registration data can not be found, then the IllegalStateException is thrown, otherwise it is ignored.
+ // 2. Unregister according to the full url match.
+ // url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
+ UnRegister(url common.URL) error
+
//When creating new registry extension,pls select one of the following modes.
//Will remove in dubbogo version v1.1.0
//mode1 : return Listener with Next function which can return subscribe service event from registry
@@ -42,7 +48,14 @@ type Registry interface {
//Will relace mode1 in dubbogo version v1.1.0
//mode2 : callback mode, subscribe with notify(notify listener).
- Subscribe(*common.URL, NotifyListener)
+ Subscribe(*common.URL, NotifyListener) error
+
+ // UnSubscribe is required to support the contract:
+ // 1. If don't subscribe, ignore it directly.
+ // 2. Unsubscribe by full URL match.
+ // url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
+ // listener A listener of the change event, not allowed to be empty
+ UnSubscribe(*common.URL, NotifyListener) error
}
// NotifyListener ...
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index c5b2f33c6107e82aa172c818c0d8aca1483248c6..d0220ddf0dc415ad0d593d0f0eed34cd698b1879 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -56,6 +56,16 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
l.subscribed[url] = listener
}
+// UnSubscribeURL is used to set a watch listener for url
+func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
+ if l.closed {
+ return nil
+ }
+ listener := l.subscribed[url]
+ delete(l.subscribed, url)
+ return listener
+}
+
// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
// Intercept the last bit
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 88d5d6221b4bc7136ba4c3e7c95fb53ba35a9a58..e68265068bc2f3c60b57fc134e49ec08baef7900 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -20,6 +20,7 @@ package zookeeper
import (
"fmt"
"net/url"
+ "path"
"sync"
"time"
)
@@ -149,10 +150,23 @@ func (r *zkRegistry) DoRegister(root string, node string) error {
return r.registerTempZookeeperNode(root, node)
}
+func (r *zkRegistry) DoUnregister(root string, node string) error {
+ r.cltLock.Lock()
+ defer r.cltLock.Unlock()
+ if !r.ZkClient().ZkConnValid() {
+ return perrors.Errorf("zk client is not valid.")
+ }
+ return r.ZkClient().Delete(path.Join(root, node))
+}
+
func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
return r.getListener(conf)
}
+func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
+ return r.getCloseListener(conf)
+}
+
func (r *zkRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
@@ -255,3 +269,37 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
return zkListener, nil
}
+
+func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) {
+
+ var zkListener *RegistryConfigurationListener
+ r.dataListener.mutex.Lock()
+ configurationListener := r.dataListener.subscribed[conf]
+ if configurationListener != nil {
+
+ zkListener, _ := configurationListener.(*RegistryConfigurationListener)
+ if zkListener != nil {
+ if zkListener.isClosed {
+ return nil, perrors.New("configListener already been closed")
+ }
+ }
+ }
+
+ zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener)
+ r.dataListener.mutex.Unlock()
+
+ if r.listener == nil {
+ return nil, perrors.New("listener is null can not close.")
+ }
+
+ //Interested register to dataconfig.
+ r.listenerLock.Lock()
+ listener := r.listener
+ r.listener = nil
+ r.listenerLock.Unlock()
+
+ r.dataListener.Close()
+ listener.Close()
+
+ return zkListener, nil
+}
diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go
index 688deccfbec67771c4071f6307802a16e4e0fc8b..d4141e8012f8ce1291175bc4b4ed7b1a85e502e2 100644
--- a/registry/zookeeper/registry_test.go
+++ b/registry/zookeeper/registry_test.go
@@ -45,6 +45,31 @@ func Test_Register(t *testing.T) {
assert.NoError(t, err)
}
+func Test_UnRegister(t *testing.T) {
+ // register
+ regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
+ url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
+
+ ts, reg, _ := newMockZkRegistry(®url)
+ defer ts.Stop()
+ err := reg.Register(url)
+ children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
+ assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children)
+ assert.NoError(t, err)
+
+ err = reg.UnRegister(url)
+ children, err = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
+ assert.Equal(t, 0, len(children))
+ assert.Error(t, err)
+ assert.True(t, reg.IsAvailable())
+
+ err = reg.Register(url)
+ children, _ = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
+ assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children)
+ assert.NoError(t, err)
+
+}
+
func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
@@ -74,6 +99,39 @@ func Test_Subscribe(t *testing.T) {
defer ts.Stop()
}
+func Test_NoSubscribe(t *testing.T) {
+ regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
+ url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
+ ts, reg, _ := newMockZkRegistry(®url)
+
+ //provider register
+ err := reg.Register(url)
+ assert.NoError(t, err)
+
+ if err != nil {
+ return
+ }
+
+ //consumer register
+ regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
+ _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts))
+
+ reg2.Register(url)
+ listener, _ := reg2.DoSubscribe(&url)
+
+ serviceEvent, _ := listener.Next()
+ assert.NoError(t, err)
+ if err != nil {
+ return
+ }
+ assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
+
+ reg2.DoUnsubscribe(&url)
+ assert.Nil(t, reg2.listener)
+
+ defer ts.Stop()
+}
+
func Test_ConsumerDestory(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 84877667763ce870e76202844e9dc9dc1c3f008c..097106acf6b44d03708362d587b5faa8281edeab 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -314,7 +314,8 @@ func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
-// Close ...
+// Close will let client listen exit
func (l *ZkEventListener) Close() {
+ close(l.client.exit)
l.wg.Wait()
}