Skip to content
Snippets Groups Projects
Commit bb0177ff authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #510 from zouyx/feature/addRegistryUnpub

Ftr : add UnRegister and UnSubscribe for zookeeper Registry
parents 4317860d be426b6a
No related branches found
No related tags found
No related merge requests found
Showing with 324 additions and 46 deletions
......@@ -212,9 +212,9 @@ const (
// consumer
CONSUMER = "consumer"
// key of access key id
ACCESS_KEY_ID_KEY = "accessKeyId"
ACCESS_KEY_ID_KEY = ".accessKeyId"
// key of secret access key
SECRET_ACCESS_KEY_KEY = "secretAccessKey"
SECRET_ACCESS_KEY_KEY = ".secretAccessKey"
)
// metadata report
......
......@@ -35,11 +35,8 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vaj
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s=
github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
......@@ -53,7 +50,6 @@ github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f h1:/8NcnxL6
github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.15.24 h1:xLAdTA/ore6xdPAljzZRed7IGqQgC+nY+ERS5vaj4Ro=
github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
......@@ -389,8 +385,6 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo=
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg=
github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8=
github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s=
......
......@@ -56,6 +56,8 @@ func init() {
localIP, _ = gxnet.GetLocalIP()
}
type createPathFunc func(dubboPath string) error
/*
* -----------------------------------NOTICE---------------------------------------------
* If there is no special case, you'd better inherit BaseRegistry and implement the
......@@ -74,8 +76,12 @@ type FacadeBasedRegistry interface {
CreatePath(string) error
// DoRegister actually do the register job
DoRegister(string, string) error
// DoUnregister do the unregister job
DoUnregister(string, string) error
// DoSubscribe actually subscribe the URL
DoSubscribe(conf *common.URL) (Listener, error)
// DoUnsubscribe does unsubscribe the URL
DoUnsubscribe(conf *common.URL) (Listener, error)
// CloseAndNilClient close the client and then reset the client in registry to nil
// you should notice that this method will be invoked inside a lock.
// So you should implement this method as light weighted as you can.
......@@ -94,7 +100,7 @@ type BaseRegistry struct {
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
cltLock sync.Mutex //ctl lock is a lock for services map
cltLock sync.RWMutex //ctl lock is a lock for services map
services map[string]common.URL // service name + protocol -> service config, for store the service registered
}
......@@ -154,6 +160,43 @@ func (r *BaseRegistry) Register(conf common.URL) error {
return nil
}
// UnRegister implement interface registry to unregister
func (r *BaseRegistry) UnRegister(conf common.URL) error {
var (
ok bool
err error
oldURL common.URL
)
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
oldURL, ok = r.services[conf.Key()]
if !ok {
err = perrors.Errorf("Path{%s} has not registered", conf.Key())
}
delete(r.services, conf.Key())
}()
if err != nil {
return err
}
err = r.unregister(conf)
if err != nil {
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
r.services[conf.Key()] = oldURL
}()
return perrors.WithMessagef(err, "register(conf:%+v)", conf)
}
return nil
}
// service is for getting service path stored in url
func (r *BaseRegistry) service(c common.URL) string {
return url.QueryEscape(c.Service())
......@@ -189,6 +232,18 @@ func (r *BaseRegistry) RestartCallBack() bool {
// register for register url to registry, include init params
func (r *BaseRegistry) register(c common.URL) error {
return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath)
}
// unregister for unregister url to registry, include init params
func (r *BaseRegistry) unregister(c common.URL) error {
return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil)
}
func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, cpf createPathFunc) error {
if f == nil {
panic(" Must provide a `function(string, string) error` to process URL. ")
}
var (
err error
//revision string
......@@ -213,15 +268,15 @@ func (r *BaseRegistry) register(c common.URL) error {
switch role {
case common.PROVIDER:
dubboPath, rawURL, err = r.providerRegistry(c, params)
dubboPath, rawURL, err = r.providerRegistry(c, params, cpf)
case common.CONSUMER:
dubboPath, rawURL, err = r.consumerRegistry(c, params)
dubboPath, rawURL, err = r.consumerRegistry(c, params, cpf)
default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
encodedURL = url.QueryEscape(rawURL)
dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL)
err = f(dubboPath, encodedURL)
if err != nil {
return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL)
......@@ -229,8 +284,15 @@ func (r *BaseRegistry) register(c common.URL) error {
return nil
}
// createPath will create dubbo path in register
func (r *BaseRegistry) createPath(dubboPath string) error {
r.cltLock.Lock()
defer r.cltLock.Unlock()
return r.facadeBasedRegistry.CreatePath(dubboPath)
}
// providerRegistry for provider role do
func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) {
func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) {
var (
dubboPath string
rawURL string
......@@ -240,11 +302,9 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string
return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
}
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
err = r.facadeBasedRegistry.CreatePath(dubboPath)
}()
if f != nil {
err = f(dubboPath)
}
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
......@@ -274,7 +334,7 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string
}
// consumerRegistry for consumer role do
func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) {
func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) {
var (
dubboPath string
rawURL string
......@@ -282,23 +342,18 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string
)
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
err = r.facadeBasedRegistry.CreatePath(dubboPath)
}()
if f != nil {
err = f(dubboPath)
}
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
return "", "", perrors.WithStack(err)
}
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
func() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
err = r.facadeBasedRegistry.CreatePath(dubboPath)
}()
if f != nil {
err = f(dubboPath)
}
if err != nil {
logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
......@@ -323,20 +378,20 @@ func sleepWait(n int) {
}
// Subscribe :subscribe from registry, event will notify by notifyListener
func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
n := 0
for {
n++
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
return perrors.New("BaseRegistry is not available.")
}
listener, err := r.facadeBasedRegistry.DoSubscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
return err
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
......@@ -358,6 +413,37 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
// UnSubscribe URL
func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return perrors.New("BaseRegistry is not available.")
}
listener, err := r.facadeBasedRegistry.DoUnsubscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return perrors.New("BaseRegistry is not available.")
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
return perrors.WithStack(err)
}
for {
if serviceEvent, err := listener.Next(); err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
break
} else {
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
return nil
}
// closeRegisters close and remove registry client and reset services map
func (r *BaseRegistry) closeRegisters() {
logger.Infof("begin to close provider client")
......
......@@ -95,7 +95,7 @@ func (r *consulRegistry) register(url common.URL) error {
return r.client.Agent().ServiceRegister(service)
}
func (r *consulRegistry) Unregister(url common.URL) error {
func (r *consulRegistry) UnRegister(url common.URL) error {
var err error
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
......@@ -112,11 +112,17 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url))
}
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER {
r.subscribe(url, notifyListener)
}
return nil
}
// UnSubscribe :
func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
return perrors.New("UnSubscribe not support in consulRegistry")
}
func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) {
......
......@@ -44,7 +44,7 @@ func (suite *consulRegistryTestSuite) testRegister() {
func (suite *consulRegistryTestSuite) testUnregister() {
consulProviderRegistry, _ := suite.providerRegistry.(*consulRegistry)
err := consulProviderRegistry.Unregister(suite.providerUrl)
err := consulProviderRegistry.UnRegister(suite.providerUrl)
assert.NoError(suite.t, err)
}
......
......@@ -114,6 +114,10 @@ func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
func (r *etcdV3Registry) DoUnregister(root string, node string) error {
return perrors.New("DoUnregister is not support in etcdV3Registry")
}
func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
r.client = nil
......@@ -168,3 +172,7 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error)
return configListener, nil
}
func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry")
}
......@@ -107,6 +107,10 @@ func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
func (r *kubernetesRegistry) DoUnregister(root string, node string) error {
return perrors.New("DoUnregister is not support in kubernetesRegistry")
}
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
......@@ -139,6 +143,10 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil
}
func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry")
}
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
......
......@@ -51,6 +51,11 @@ func (*MockRegistry) Register(url common.URL) error {
return nil
}
// UnRegister
func (r *MockRegistry) UnRegister(conf common.URL) error {
return nil
}
// Destroy ...
func (r *MockRegistry) Destroy() {
if r.destroyed.CAS(false, true) {
......@@ -72,7 +77,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
}
// Subscribe ...
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
go func() {
for {
if !r.IsAvailable() {
......@@ -104,6 +109,12 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
}()
return nil
}
// UnSubscribe :
func (r *MockRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error {
return nil
}
type listener struct {
......
......@@ -136,23 +136,28 @@ func (nr *nacosRegistry) Register(url common.URL) error {
return nil
}
// UnRegister
func (nr *nacosRegistry) UnRegister(conf common.URL) error {
return perrors.New("UnRegister is not support in nacosRegistry")
}
func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return NewNacosListener(*conf, nr.namingClient)
}
//subscribe from registry
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return
return perrors.New("nacosRegistry is not available.")
}
listener, err := nr.subscribe(url)
if err != nil {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return
return err
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
......@@ -164,7 +169,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return
return err
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
......@@ -172,6 +177,12 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}
}
return nil
}
// UnSubscribe :
func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
return perrors.New("UnSubscribe not support in nacosRegistry")
}
func (nr *nacosRegistry) GetUrl() common.URL {
......
......@@ -22,9 +22,8 @@ import (
"strings"
"sync"
)
import (
"github.com/dubbogo/gost/container/set"
gxset "github.com/dubbogo/gost/container/set"
)
import (
......@@ -96,8 +95,24 @@ func getRegistry(regUrl *common.URL) registry.Registry {
func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.URL {
if registryUrl.GetParamBool("simplified", false) {
return providerUrl.CloneWithParams(reserveParams)
} else {
return filterHideKey(providerUrl)
}
}
// filterHideKey filter the parameters that do not need to be output in url(Starting with .)
func filterHideKey(url *common.URL) *common.URL {
//be careful params maps in url is map type
cloneURL := url.Clone()
removeSet := gxset.NewSet()
for k, _ := range cloneURL.GetParams() {
if strings.HasPrefix(k, ".") {
removeSet.Add(k)
}
}
return providerUrl
cloneURL.RemoveParams(removeSet)
return cloneURL
}
func (proto *registryProtocol) initConfigurationListeners() {
......
......@@ -284,3 +284,12 @@ func TestExportWithApplicationConfig(t *testing.T) {
v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl))
assert.NotNil(t, v2)
}
func TestGetProviderUrlWithHideKey(t *testing.T) {
url, _ := common.NewURL("dubbo://127.0.0.1:1111?a=a1&b=b1&.c=c1&.d=d1&e=e1&protocol=registry")
providerUrl := getUrlToRegistry(&url, &url)
assert.NotContains(t, providerUrl.GetParams(), ".c")
assert.NotContains(t, providerUrl.GetParams(), ".d")
assert.Contains(t, providerUrl.GetParams(), "a")
}
......@@ -34,6 +34,12 @@ type Registry interface {
//And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring.
Register(url common.URL) error
// UnRegister is required to support the contract:
// 1. If it is the persistent stored data of dynamic=false, the registration data can not be found, then the IllegalStateException is thrown, otherwise it is ignored.
// 2. Unregister according to the full url match.
// url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
UnRegister(url common.URL) error
//When creating new registry extension,pls select one of the following modes.
//Will remove in dubbogo version v1.1.0
//mode1 : return Listener with Next function which can return subscribe service event from registry
......@@ -42,7 +48,14 @@ type Registry interface {
//Will relace mode1 in dubbogo version v1.1.0
//mode2 : callback mode, subscribe with notify(notify listener).
Subscribe(*common.URL, NotifyListener)
Subscribe(*common.URL, NotifyListener) error
// UnSubscribe is required to support the contract:
// 1. If don't subscribe, ignore it directly.
// 2. Unsubscribe by full URL match.
// url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
// listener A listener of the change event, not allowed to be empty
UnSubscribe(*common.URL, NotifyListener) error
}
// NotifyListener ...
......
......@@ -56,6 +56,16 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
l.subscribed[url] = listener
}
// UnSubscribeURL is used to set a watch listener for url
func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
if l.closed {
return nil
}
listener := l.subscribed[url]
delete(l.subscribed, url)
return listener
}
// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
// Intercept the last bit
......
......@@ -20,6 +20,7 @@ package zookeeper
import (
"fmt"
"net/url"
"path"
"sync"
"time"
)
......@@ -149,10 +150,23 @@ func (r *zkRegistry) DoRegister(root string, node string) error {
return r.registerTempZookeeperNode(root, node)
}
func (r *zkRegistry) DoUnregister(root string, node string) error {
r.cltLock.Lock()
defer r.cltLock.Unlock()
if !r.ZkClient().ZkConnValid() {
return perrors.Errorf("zk client is not valid.")
}
return r.ZkClient().Delete(path.Join(root, node))
}
func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
return r.getListener(conf)
}
func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
return r.getCloseListener(conf)
}
func (r *zkRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
......@@ -255,3 +269,37 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
return zkListener, nil
}
func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) {
var zkListener *RegistryConfigurationListener
r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf]
if configurationListener != nil {
zkListener, _ := configurationListener.(*RegistryConfigurationListener)
if zkListener != nil {
if zkListener.isClosed {
return nil, perrors.New("configListener already been closed")
}
}
}
zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener)
r.dataListener.mutex.Unlock()
if r.listener == nil {
return nil, perrors.New("listener is null can not close.")
}
//Interested register to dataconfig.
r.listenerLock.Lock()
listener := r.listener
r.listener = nil
r.listenerLock.Unlock()
r.dataListener.Close()
listener.Close()
return zkListener, nil
}
......@@ -45,6 +45,31 @@ func Test_Register(t *testing.T) {
assert.NoError(t, err)
}
func Test_UnRegister(t *testing.T) {
// register
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, _ := newMockZkRegistry(&regurl)
defer ts.Stop()
err := reg.Register(url)
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children)
assert.NoError(t, err)
err = reg.UnRegister(url)
children, err = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Equal(t, 0, len(children))
assert.Error(t, err)
assert.True(t, reg.IsAvailable())
err = reg.Register(url)
children, _ = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children)
assert.NoError(t, err)
}
func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
......@@ -74,6 +99,39 @@ func Test_Subscribe(t *testing.T) {
defer ts.Stop()
}
func Test_NoSubscribe(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, _ := newMockZkRegistry(&regurl)
//provider register
err := reg.Register(url)
assert.NoError(t, err)
if err != nil {
return
}
//consumer register
regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
_, reg2, _ := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
reg2.Register(url)
listener, _ := reg2.DoSubscribe(&url)
serviceEvent, _ := listener.Next()
assert.NoError(t, err)
if err != nil {
return
}
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
reg2.DoUnsubscribe(&url)
assert.Nil(t, reg2.listener)
defer ts.Stop()
}
func Test_ConsumerDestory(t *testing.T) {
regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
......
......@@ -314,7 +314,8 @@ func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
// Close ...
// Close will let client listen exit
func (l *ZkEventListener) Close() {
close(l.client.exit)
l.wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment