From 6f2a2edc202279fd6c6397d1aad2d3112e29385d Mon Sep 17 00:00:00 2001 From: "vito.he" <hxmhlt@163.com> Date: Mon, 3 Feb 2020 18:05:25 +0800 Subject: [PATCH] Mod:refactor to add base_registry --- config_center/zookeeper/impl.go | 2 +- registry/base_registry.go | 358 ++++++++++++++++++++++++++++ registry/etcdv3/listener.go | 8 +- registry/etcdv3/registry.go | 264 ++------------------ registry/etcdv3/registry_test.go | 4 +- registry/zookeeper/listener.go | 6 +- registry/zookeeper/registry.go | 346 +++------------------------ registry/zookeeper/registry_test.go | 4 +- remoting/etcdv3/facade.go | 6 +- remoting/zookeeper/facade.go | 6 +- remoting/zookeeper/facade_test.go | 2 +- 11 files changed, 423 insertions(+), 583 deletions(-) create mode 100644 registry/base_registry.go diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 6842d9e37..a20f5496d 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -175,7 +175,7 @@ func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { return &c.wg } -func (c *zookeeperDynamicConfiguration) GetDone() chan struct{} { +func (c *zookeeperDynamicConfiguration) Done() chan struct{} { return c.done } diff --git a/registry/base_registry.go b/registry/base_registry.go new file mode 100644 index 000000000..33eba5e8e --- /dev/null +++ b/registry/base_registry.go @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + "context" + "fmt" + "net/url" + "os" + "strconv" + "strings" + "sync" + "time" +) + +import ( + gxnet "github.com/dubbogo/gost/net" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +const ( + // RegistryConnDelay connection delay + RegistryConnDelay = 3 + // MaxWaitInterval max wait interval + MaxWaitInterval = 3 * time.Second +) + +var ( + processID = "" + localIP = "" +) + +func init() { + processID = fmt.Sprintf("%d", os.Getpid()) + localIP, _ = gxnet.GetLocalIP() +} + +type FacadeBasedRegistry interface { + Registry + CreatePath(string) error + DoRegister(string, string) error + DoSubscribe(conf *common.URL) (Listener, error) + CloseAndNilClient() + CloseListener() + InitListeners() +} + +type BaseRegistry struct { + context context.Context + facadeBasedRegistry FacadeBasedRegistry + *common.URL + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + + cltLock sync.Mutex + services map[string]common.URL // service name + protocol -> service config +} + +func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry { + r.URL = url + r.birth = time.Now().UnixNano() + r.done = make(chan struct{}) + r.services = make(map[string]common.URL) + r.facadeBasedRegistry = facadeRegistry + r.wg.Add(1) + return r +} + +func (r *BaseRegistry) GetUrl() common.URL { + return *r.URL +} +func (r *BaseRegistry) Destroy() { + r.facadeBasedRegistry.CloseListener() + close(r.done) + r.wg.Wait() + r.closeRegisters() +} + +func (r *BaseRegistry) Register(conf common.URL) error { + var ( + ok bool + err error + ) + role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) + switch role { + case common.CONSUMER: + r.cltLock.Lock() + _, ok = r.services[conf.Key()] + r.cltLock.Unlock() + if ok { + return perrors.Errorf("Path{%s} has been registered", conf.Path) + } + + err = r.register(conf) + if err != nil { + return perrors.WithStack(err) + } + + r.cltLock.Lock() + r.services[conf.Key()] = conf + r.cltLock.Unlock() + logger.Debugf("(consumerRegistry)Register(conf{%#v})", conf) + + case common.PROVIDER: + + // Check if the service has been registered + r.cltLock.Lock() + // Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path). + // Because the consumer wants to provide monitoring functions for the selector, + // the provider allows multiple groups or versions of the same service to be registered. + _, ok = r.services[conf.Key()] + r.cltLock.Unlock() + if ok { + return perrors.Errorf("Path{%s} has been registered", conf.Key()) + } + + err = r.register(conf) + if err != nil { + return perrors.WithMessagef(err, "register(conf:%+v)", conf) + } + + r.cltLock.Lock() + r.services[conf.Key()] = conf + r.cltLock.Unlock() + + logger.Debugf("(ProviderRegistry)Register(conf{%#v})", conf) + } + + return nil +} + +func (r *BaseRegistry) service(c common.URL) string { + return url.QueryEscape(c.Service()) +} + +func (r *BaseRegistry) RestartCallBack() bool { + + // copy r.services + services := []common.URL{} + for _, confIf := range r.services { + services = append(services, confIf) + } + + flag := true + for _, confIf := range services { + err := r.register(confIf) + if err != nil { + logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf, perrors.WithStack(err)) + flag = false + break + } + logger.Infof("success to re-register service :%v", confIf.Key()) + } + r.facadeBasedRegistry.InitListeners() + + return flag +} + +func (r *BaseRegistry) register(c common.URL) error { + var ( + err error + //revision string + params url.Values + rawURL string + encodedURL string + dubboPath string + //conf config.URL + ) + params = url.Values{} + + c.RangeParams(func(key, value string) bool { + params.Add(key, value) + return true + }) + + params.Add("pid", processID) + params.Add("ip", localIP) + //params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6)) + + role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) + switch role { + + case common.PROVIDER: + + if c.Path == "" || len(c.Methods) == 0 { + return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) + r.cltLock.Lock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) + return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath) + } + params.Add("anyhost", "true") + + // Dubbo java consumer to start looking for the provider url,because the category does not match, + // the provider will not find, causing the consumer can not start, so we use consumers. + // DubboRole = [...]string{"consumer", "", "", "provider"} + // params.Add("category", (RoleType(PROVIDER)).Role()) + params.Add("category", (common.RoleType(common.PROVIDER)).String()) + params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) + + params.Add("side", (common.RoleType(common.PROVIDER)).Role()) + + if len(c.Methods) == 0 { + params.Add("methods", strings.Join(c.Methods, ",")) + } + logger.Debugf("provider zk url params:%#v", params) + var host string + if c.Ip == "" { + host = localIP + ":" + c.Port + } else { + host = c.Ip + ":" + c.Port + } + + rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode()) + encodedURL = url.QueryEscape(rawURL) + + // Print your own registration service providers. + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String()) + logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) + + case common.CONSUMER: + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) + r.cltLock.Lock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) + return perrors.WithStack(err) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) + r.cltLock.Lock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) + return perrors.WithStack(err) + } + + params.Add("protocol", c.Protocol) + + params.Add("category", (common.RoleType(common.CONSUMER)).String()) + params.Add("dubbo", "dubbogo-consumer-"+constant.Version) + + rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) + encodedURL = url.QueryEscape(rawURL) + + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String()) + logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) + + default: + return perrors.Errorf("@c{%v} type is not referencer or provider", c) + } + + dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") + err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL) + + if err != nil { + return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL) + } + return nil +} + +func sleepWait(n int) { + wait := time.Duration((n + 1) * 2e8) + if wait > MaxWaitInterval { + wait = MaxWaitInterval + } + time.Sleep(wait) +} + +//subscribe from registry +func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { + n := 0 + for { + n++ + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return + } + + listener, err := r.facadeBasedRegistry.DoSubscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return + } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + continue + } + + for { + if serviceEvent, err := listener.Next(); err != nil { + logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) + listener.Close() + break + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) + } + + } + sleepWait(n) + } +} + +func (r *BaseRegistry) closeRegisters() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + logger.Infof("begin to close provider client") + // Close the old client first to close the tmp node. + r.facadeBasedRegistry.CloseAndNilClient() + r.services = nil +} + +func (r *BaseRegistry) IsAvailable() bool { + select { + case <-r.done: + return false + default: + return true + } +} + +func (r *BaseRegistry) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *BaseRegistry) Done() chan struct{} { + return r.done +} diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 30e0cec67..de769e8ad 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -81,7 +81,7 @@ type configurationListener struct { // NewConfigurationListener ... func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { // add a new waiter - reg.wg.Add(1) + reg.WaitGroup().Add(1) return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { @@ -91,7 +91,7 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { - case <-l.registry.done: + case <-l.registry.Done(): logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") return nil, perrors.New("listener stopped") @@ -99,7 +99,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { logger.Infof("got etcd event %#v", e) if e.ConfigType == remoting.EventTypeDel { select { - case <-l.registry.done: + case <-l.registry.Done(): logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) default: } @@ -110,5 +110,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } func (l *configurationListener) Close() { - l.registry.wg.Done() + l.registry.WaitGroup().Done() } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 032057928..f66abe3ef 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -19,17 +19,13 @@ package etcdv3 import ( "fmt" - "net/url" - "os" "path" - "strconv" "strings" "sync" "time" ) import ( - gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) @@ -42,39 +38,23 @@ import ( "github.com/apache/dubbo-go/remoting/etcdv3" ) -var ( - processID = "" - localIP = "" -) - const ( // Name module name Name = "etcdv3" - // RegistryConnDelay ... - RegistryConnDelay = 3 ) func init() { - processID = fmt.Sprintf("%d", os.Getpid()) - localIP, _ = gxnet.GetLocalIP() extension.SetRegistry(Name, newETCDV3Registry) } type etcdV3Registry struct { - *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - - cltLock sync.Mutex - client *etcdv3.Client - services map[string]common.URL // service name + protocol -> service config - + registry.BaseRegistry + cltLock sync.Mutex + client *etcdv3.Client listenerLock sync.Mutex listener *etcdv3.EventListener dataListener *dataListener configListener *configurationListener - - wg sync.WaitGroup // wg+done for etcd client restart - done chan struct{} } // Client get the etcdv3 client @@ -92,38 +72,6 @@ func (r *etcdV3Registry) ClientLock() *sync.Mutex { return &r.cltLock } -//WaitGroup return the wait group handle -func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup { - return &r.wg -} - -// GetDone return the done channel -func (r *etcdV3Registry) GetDone() chan struct{} { - return r.done -} - -//RestartCallBack restart callback -func (r *etcdV3Registry) RestartCallBack() bool { - - services := []common.URL{} - for _, confIf := range r.services { - services = append(services, confIf) - } - - flag := true - for _, confIf := range services { - err := r.Register(confIf) - if err != nil { - logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) - flag = false - break - } - logger.Infof("success to re-register service :%v", confIf.Key()) - } - return flag -} - func newETCDV3Registry(url *common.URL) (registry.Registry, error) { timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) @@ -135,12 +83,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String()) - r := &etcdV3Registry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - } + r := &etcdV3Registry{} + + r.InitBaseRegistry(url, r) if err := etcdv3.ValidateClient( r, @@ -151,92 +96,35 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { return nil, err } - r.wg.Add(1) go etcdv3.HandleClientRestart(r) - r.listener = etcdv3.NewEventListener(r.client) - r.configListener = NewConfigurationListener(r) - r.dataListener = NewRegistryDataListener(r.configListener) + r.InitListeners() return r, nil } -// GetUrl get registry url -func (r *etcdV3Registry) GetUrl() common.URL { - return *r.URL -} - -// IsAvailable check the register client is available -func (r *etcdV3Registry) IsAvailable() bool { - - select { - case <-r.done: - return false - default: - return true - } +func (r *etcdV3Registry) InitListeners() { + r.listener = etcdv3.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) } -// Destroy destroy client -func (r *etcdV3Registry) Destroy() { - - if r.configListener != nil { - r.configListener.Close() - } - r.stop() +func (r *etcdV3Registry) DoRegister(root string, node string) error { + return r.client.Create(path.Join(root, node), "") } -func (r *etcdV3Registry) stop() { - - close(r.done) - - // close current client +func (r *etcdV3Registry) CloseAndNilClient() { r.client.Close() - - r.cltLock.Lock() r.client = nil - r.services = nil - r.cltLock.Unlock() } -// Register ... -func (r *etcdV3Registry) Register(svc common.URL) error { - - role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - if err != nil { - return perrors.WithMessage(err, "get registry role") - } - - r.cltLock.Lock() - if _, ok := r.services[svc.Key()]; ok { - r.cltLock.Unlock() - return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path)) - } - r.cltLock.Unlock() - - switch role { - case common.PROVIDER: - logger.Debugf("(provider register )Register(conf{%#v})", svc) - if err := r.registerProvider(svc); err != nil { - return perrors.WithMessage(err, "register provider") - } - case common.CONSUMER: - logger.Debugf("(consumer register )Register(conf{%#v})", svc) - if err := r.registerConsumer(svc); err != nil { - return perrors.WithMessage(err, "register consumer") - } - default: - return perrors.New(fmt.Sprintf("unknown role %d", role)) +func (r *etcdV3Registry) CloseListener() { + if r.configListener != nil { + r.configListener.Close() } - - r.cltLock.Lock() - r.services[svc.Key()] = svc - r.cltLock.Unlock() - return nil } -func (r *etcdV3Registry) createDirIfNotExist(k string) error { - +func (r *etcdV3Registry) CreatePath(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) @@ -248,89 +136,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error { return nil } -func (r *etcdV3Registry) registerConsumer(svc common.URL) error { - - consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER]) - if err := r.createDirIfNotExist(consumersNode); err != nil { - logger.Errorf("etcd client create path %s: %v", consumersNode, err) - return perrors.WithMessage(err, "etcd create consumer nodes") - } - providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) - if err := r.createDirIfNotExist(providersNode); err != nil { - return perrors.WithMessage(err, "create provider node") - } - - params := url.Values{} - - params.Add("protocol", svc.Protocol) - - params.Add("category", (common.RoleType(common.CONSUMER)).String()) - params.Add("dubbo", "dubbogo-consumer-"+constant.Version) - - encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) - dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) - if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) - } - - return nil -} - -func (r *etcdV3Registry) registerProvider(svc common.URL) error { - - if len(svc.Path) == 0 || len(svc.Methods) == 0 { - return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods)) - } - - var ( - urlPath string - encodedURL string - dubboPath string - ) - - providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) - if err := r.createDirIfNotExist(providersNode); err != nil { - return perrors.WithMessage(err, "create provider node") - } - - params := url.Values{} - - svc.RangeParams(func(key, value string) bool { - params[key] = []string{value} - return true - }) - params.Add("pid", processID) - params.Add("ip", localIP) - params.Add("anyhost", "true") - params.Add("category", (common.RoleType(common.PROVIDER)).String()) - params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) - params.Add("side", (common.RoleType(common.PROVIDER)).Role()) - - if len(svc.Methods) == 0 { - params.Add("methods", strings.Join(svc.Methods, ",")) - } - - logger.Debugf("provider url params:%#v", params) - var host string - if len(svc.Ip) == 0 { - host = localIP + ":" + svc.Port - } else { - host = svc.Ip + ":" + svc.Port - } - - urlPath = svc.Path - - encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode())) - dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) - - if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) - } - - return nil -} - -func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { +func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( configListener *configurationListener @@ -363,35 +169,3 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { return configListener, nil } - -//Subscribe from registry -func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { - for { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - - listener, err := r.subscribe(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } - - for { - serviceEvent, err := listener.Next() - if err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - return - } - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) - } - } -} diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index ab997b291..87cf24016 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -87,7 +87,7 @@ func (suite *RegistryTestSuite) TestSubscribe() { err = reg2.Register(url) assert.NoError(t, err) - listener, err := reg2.subscribe(&url) + listener, err := reg2.DoSubscribe(&url) if err != nil { t.Fatal(err) } @@ -105,7 +105,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() { url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) reg := initRegistry(t) - _, err := reg.subscribe(&url) + _, err := reg.DoSubscribe(&url) if err != nil { t.Fatal(err) } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 010508775..6dbf588ca 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -87,7 +87,7 @@ type RegistryConfigurationListener struct { // NewRegistryConfigurationListener ... func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { - reg.wg.Add(1) + reg.WaitGroup().Add(1) return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } @@ -104,7 +104,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.") return nil, perrors.New("listener stopped") - case <-l.registry.done: + case <-l.registry.Done(): logger.Warnf("zk consumer register has quit, so zk event listener exit now.") return nil, perrors.New("listener stopped") @@ -127,7 +127,7 @@ func (l *RegistryConfigurationListener) Close() { // ensure that the listener will be closed at most once. l.closeOnce.Do(func() { l.isClosed = true - l.registry.wg.Done() + l.registry.WaitGroup().Done() }) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index bc815c3ae..30b3ab29b 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -18,18 +18,14 @@ package zookeeper import ( - "context" "fmt" "net/url" - "os" - "strconv" "strings" "sync" "time" ) import ( - gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "github.com/samuel/go-zookeeper/zk" ) @@ -46,21 +42,9 @@ import ( const ( // RegistryZkClient zk client name RegistryZkClient = "zk registry" - // RegistryConnDelay connection delay - RegistryConnDelay = 3 - // MaxWaitInterval max wait interval - MaxWaitInterval = 3 * time.Second -) - -var ( - processID = "" - localIP = "" ) func init() { - processID = fmt.Sprintf("%d", os.Getpid()) - localIP, _ = gxnet.GetLocalIP() - //plugins.PluggableRegistries["zookeeper"] = newZkRegistry extension.SetRegistry("zookeeper", newZkRegistry) } @@ -69,20 +53,13 @@ func init() { ///////////////////////////////////// type zkRegistry struct { - context context.Context - *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - wg sync.WaitGroup // wg+done for zk restart - done chan struct{} - - cltLock sync.Mutex - client *zookeeper.ZookeeperClient - services map[string]common.URL // service name + protocol -> service config - + registry.BaseRegistry + client *zookeeper.ZookeeperClient listenerLock sync.Mutex listener *zookeeper.ZkEventListener dataListener *RegistryDataListener configListener *RegistryConfigurationListener + cltLock sync.Mutex //for provider zkPath map[string]int // key = protocol://ip:port/interface } @@ -92,21 +69,16 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { err error r *zkRegistry ) - r = &zkRegistry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - zkPath: make(map[string]int), + zkPath: make(map[string]int), } + r.InitBaseRegistry(url, r) err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient)) if err != nil { return nil, err } - r.wg.Add(1) go zookeeper.HandleClientRestart(r) r.listener = zookeeper.NewZkEventListener(r.client) @@ -133,27 +105,40 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust ) r = &zkRegistry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - zkPath: make(map[string]int), + zkPath: make(map[string]int), } - + r.InitBaseRegistry(url, r) c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...) if err != nil { return nil, nil, err } - r.wg.Add(1) go zookeeper.HandleClientRestart(r) + r.InitListeners() + return c, r, nil +} +func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) r.configListener = NewRegistryConfigurationListener(r.client, r) r.dataListener = NewRegistryDataListener(r.configListener) +} - return c, r, nil +func (r *zkRegistry) CreatePath(path string) error { + return r.ZkClient().Create(path) +} + +func (r *zkRegistry) DoRegister(root string, node string) error { + return r.registerTempZookeeperNode(root, node) +} + +func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { + return r.getListener(conf) } +func (r *zkRegistry) CloseAndNilClient() { + r.client.Close() + r.client = nil +} func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { return r.client } @@ -166,221 +151,10 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex { return &r.cltLock } -func (r *zkRegistry) WaitGroup() *sync.WaitGroup { - return &r.wg -} - -func (r *zkRegistry) GetDone() chan struct{} { - return r.done -} - -func (r *zkRegistry) GetUrl() common.URL { - return *r.URL -} - -func (r *zkRegistry) Destroy() { +func (r *zkRegistry) CloseListener() { if r.configListener != nil { r.configListener.Close() } - close(r.done) - r.wg.Wait() - r.closeRegisters() -} - -func (r *zkRegistry) RestartCallBack() bool { - - // copy r.services - services := []common.URL{} - for _, confIf := range r.services { - services = append(services, confIf) - } - - flag := true - for _, confIf := range services { - err := r.register(confIf) - if err != nil { - logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) - flag = false - break - } - logger.Infof("success to re-register service :%v", confIf.Key()) - } - r.listener = zookeeper.NewZkEventListener(r.client) - r.configListener = NewRegistryConfigurationListener(r.client, r) - r.dataListener = NewRegistryDataListener(r.configListener) - - return flag -} - -func (r *zkRegistry) Register(conf common.URL) error { - var ( - ok bool - err error - ) - role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - switch role { - case common.CONSUMER: - r.cltLock.Lock() - _, ok = r.services[conf.Key()] - r.cltLock.Unlock() - if ok { - return perrors.Errorf("Path{%s} has been registered", conf.Path) - } - - err = r.register(conf) - if err != nil { - return perrors.WithStack(err) - } - - r.cltLock.Lock() - r.services[conf.Key()] = conf - r.cltLock.Unlock() - logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) - - case common.PROVIDER: - - // Check if the service has been registered - r.cltLock.Lock() - // Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path). - // Because the consumer wants to provide monitoring functions for the selector, - // the provider allows multiple groups or versions of the same service to be registered. - _, ok = r.services[conf.Key()] - r.cltLock.Unlock() - if ok { - return perrors.Errorf("Path{%s} has been registered", conf.Key()) - } - - err = r.register(conf) - if err != nil { - return perrors.WithMessagef(err, "register(conf:%+v)", conf) - } - - r.cltLock.Lock() - r.services[conf.Key()] = conf - r.cltLock.Unlock() - - logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf) - } - - return nil -} - -func (r *zkRegistry) service(c common.URL) string { - return url.QueryEscape(c.Service()) -} - -func (r *zkRegistry) register(c common.URL) error { - var ( - err error - //revision string - params url.Values - rawURL string - encodedURL string - dubboPath string - //conf config.URL - ) - - err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient)) - if err != nil { - return perrors.WithStack(err) - } - params = url.Values{} - - c.RangeParams(func(key, value string) bool { - params.Add(key, value) - return true - }) - - params.Add("pid", processID) - params.Add("ip", localIP) - //params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6)) - - role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - switch role { - - case common.PROVIDER: - - if c.Path == "" || len(c.Methods) == 0 { - return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) - } - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - r.cltLock.Lock() - err = r.client.Create(dubboPath) - r.cltLock.Unlock() - if err != nil { - logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) - return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath) - } - params.Add("anyhost", "true") - - // Dubbo java consumer to start looking for the provider url,because the category does not match, - // the provider will not find, causing the consumer can not start, so we use consumers. - // DubboRole = [...]string{"consumer", "", "", "provider"} - // params.Add("category", (RoleType(PROVIDER)).Role()) - params.Add("category", (common.RoleType(common.PROVIDER)).String()) - params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) - - params.Add("side", (common.RoleType(common.PROVIDER)).Role()) - - if len(c.Methods) == 0 { - params.Add("methods", strings.Join(c.Methods, ",")) - } - logger.Debugf("provider zk url params:%#v", params) - var host string - if c.Ip == "" { - host = localIP + ":" + c.Port - } else { - host = c.Ip + ":" + c.Port - } - - rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode()) - encodedURL = url.QueryEscape(rawURL) - - // Print your own registration service providers. - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String()) - logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) - - case common.CONSUMER: - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) - r.cltLock.Lock() - err = r.client.Create(dubboPath) - r.cltLock.Unlock() - if err != nil { - logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) - return perrors.WithStack(err) - } - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - r.cltLock.Lock() - err = r.client.Create(dubboPath) - r.cltLock.Unlock() - if err != nil { - logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) - return perrors.WithStack(err) - } - - params.Add("protocol", c.Protocol) - - params.Add("category", (common.RoleType(common.CONSUMER)).String()) - params.Add("dubbo", "dubbogo-consumer-"+constant.Version) - - rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) - encodedURL = url.QueryEscape(rawURL) - - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String()) - logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) - - default: - return perrors.Errorf("@c{%v} type is not referencer or provider", c) - } - - dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") - err = r.registerTempZookeeperNode(dubboPath, encodedURL) - - if err != nil { - return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL) - } - return nil } func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { @@ -410,53 +184,6 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { return nil } -func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { - return r.getListener(conf) -} -func sleepWait(n int) { - wait := time.Duration((n + 1) * 2e8) - if wait > MaxWaitInterval { - wait = MaxWaitInterval - } - time.Sleep(wait) -} - -//subscribe from registry -func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { - n := 0 - for { - n++ - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - - listener, err := r.subscribe(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } - - for { - if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - break - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) - } - - } - sleepWait(n) - } -} - func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) { var ( zkListener *RegistryConfigurationListener @@ -493,22 +220,3 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen return zkListener, nil } - -func (r *zkRegistry) closeRegisters() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - logger.Infof("begin to close provider zk client") - // Close the old client first to close the tmp node. - r.client.Close() - r.client = nil - r.services = nil -} - -func (r *zkRegistry) IsAvailable() bool { - select { - case <-r.done: - return false - default: - return true - } -} diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 2c7bb9061..5e5189c27 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -64,7 +64,7 @@ func Test_Subscribe(t *testing.T) { _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) reg2.Register(url) - listener, _ := reg2.subscribe(&url) + listener, _ := reg2.DoSubscribe(&url) serviceEvent, _ := listener.Next() assert.NoError(t, err) @@ -85,7 +85,7 @@ func Test_ConsumerDestory(t *testing.T) { assert.NoError(t, err) err = reg.Register(url) assert.NoError(t, err) - _, err = reg.subscribe(&url) + _, err = reg.DoSubscribe(&url) assert.NoError(t, err) //listener.Close() diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index d00620661..35befc85e 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -38,7 +38,7 @@ type clientFacade interface { SetClient(*Client) ClientLock() *sync.Mutex WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container - GetDone() chan struct{} //for etcd client control + Done() chan struct{} //for etcd client control RestartCallBack() bool common.Node } @@ -55,7 +55,7 @@ func HandleClientRestart(r clientFacade) { LOOP: for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...") break LOOP // re-register all services @@ -72,7 +72,7 @@ LOOP: failTimes = 0 for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") break LOOP case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index 18f1a0498..055db4f71 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -35,7 +35,7 @@ type zkClientFacade interface { SetZkClient(*ZookeeperClient) ZkClientLock() *sync.Mutex WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container - GetDone() chan struct{} //for zk client control + Done() chan struct{} //for zk client control RestartCallBack() bool common.Node } @@ -52,7 +52,7 @@ func HandleClientRestart(r zkClientFacade) { LOOP: for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP // re-register all services @@ -68,7 +68,7 @@ LOOP: failTimes = 0 for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk. diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go index 58e0d69dc..9c17bd485 100644 --- a/remoting/zookeeper/facade_test.go +++ b/remoting/zookeeper/facade_test.go @@ -55,7 +55,7 @@ func (r *mockFacade) WaitGroup() *sync.WaitGroup { return &r.wg } -func (r *mockFacade) GetDone() chan struct{} { +func (r *mockFacade) Done() chan struct{} { return r.done } -- GitLab