diff --git a/go.mod b/go.mod index fee215bf35b02c889a821bcda4357e2051f124d4..128056a1f1bb5ea315f29a054fa5c20d64222bda 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/etcd v3.3.25+incompatible github.com/creasty/defaults v1.5.1 github.com/dubbogo/go-zookeeper v1.0.3 - github.com/dubbogo/gost v1.11.2 + github.com/dubbogo/gost v1.11.3 github.com/dubbogo/triple v0.0.0-20210403061850-372f2dc47e02 github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.4.0 diff --git a/go.sum b/go.sum index 9e23621ce28f08b824c2545c6bdeaa09d92d2124..5d2070f0f8c63dc2f61686f75fb8e1927f00d94b 100644 --- a/go.sum +++ b/go.sum @@ -180,8 +180,9 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8 github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI= -github.com/dubbogo/gost v1.11.2 h1:NanyHmvzE1HrgI2T9H/jE/N1wkxFEj+IbM1A4RT9H7Q= github.com/dubbogo/gost v1.11.2/go.mod h1:3QQEj50QOhkWTERT785YZ5ZxIRGNdR11FCLP7FzHsMc= +github.com/dubbogo/gost v1.11.3 h1:PSP9KQyuRJugmPLqC18MFgoIL0g1G4n/9FTKgQYjjbE= +github.com/dubbogo/gost v1.11.3/go.mod h1:3QQEj50QOhkWTERT785YZ5ZxIRGNdR11FCLP7FzHsMc= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dubbogo/net v0.0.2-0.20210326124702-e6a866993192 h1:CBEicrrVwR6u8ty+kL68ItxXVk1jaVYThrsx5ARhxUc= github.com/dubbogo/net v0.0.2-0.20210326124702-e6a866993192/go.mod h1:B6/ka3g8VzcyrmdCH4VkHP1K0aHeI37FmclS+TCwIBU= @@ -194,7 +195,6 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index 1939b911822b6b5db3d41f999d4dcb4ca3fae1f2..604e6da470ab9f33e7f146a5d3e283fdc0b8c0a3 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -22,6 +22,10 @@ import ( "time" ) +import ( + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -30,7 +34,6 @@ import ( "github.com/apache/dubbo-go/metadata/identifier" "github.com/apache/dubbo-go/metadata/report" "github.com/apache/dubbo-go/metadata/report/factory" - "github.com/apache/dubbo-go/remoting/etcdv3" ) const DEFAULT_ROOT = "dubbo" @@ -43,7 +46,7 @@ func init() { // etcdMetadataReport is the implementation of MetadataReport based etcd type etcdMetadataReport struct { - client *etcdv3.Client + client *gxetcd.Client root string } @@ -121,7 +124,7 @@ type etcdMetadataReportFactory struct{} func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport { timeout, _ := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) addresses := strings.Split(url.Location, ",") - client, err := etcdv3.NewClient(etcdv3.MetadataETCDV3Client, addresses, timeout, 1) + client, err := gxetcd.NewClient(gxetcd.MetadataETCDV3Client, addresses, timeout, 1) if err != nil { logger.Errorf("Could not create etcd metadata report. URL: %s,error:{%v}", url.String(), err) return nil diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 46ba6b678a5695ffcc6ac735bb81c839d44b0b8a..c51d1fc5100a742cdd7aeb4b152a6f39168f6b03 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -26,6 +26,7 @@ import ( ) import ( + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" ) @@ -50,7 +51,7 @@ func init() { type etcdV3Registry struct { registry.BaseRegistry cltLock sync.Mutex - client *etcdv3.Client + client *gxetcd.Client listenerLock sync.RWMutex listener *etcdv3.EventListener dataListener *dataListener @@ -58,12 +59,12 @@ type etcdV3Registry struct { } // Client gets the etcdv3 client -func (r *etcdV3Registry) Client() *etcdv3.Client { +func (r *etcdV3Registry) Client() *gxetcd.Client { return r.client } // SetClient sets the etcdv3 client -func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { +func (r *etcdV3Registry) SetClient(client *gxetcd.Client) { r.client = client } @@ -88,9 +89,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { if err := etcdv3.ValidateClient( r, - etcdv3.WithName(etcdv3.RegistryETCDV3Client), - etcdv3.WithTimeout(timeout), - etcdv3.WithEndpoints(strings.Split(url.Location, ",")...), + gxetcd.WithName(gxetcd.RegistryETCDV3Client), + gxetcd.WithTimeout(timeout), + gxetcd.WithEndpoints(strings.Split(url.Location, ",")...), ); err != nil { return nil, err } diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index c9c3d4366c4ac99c37388c43026b3cc382d367af..5e3e98f5dce4813443e7ec803da2256c8a36676a 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -26,6 +26,7 @@ import ( import ( gxset "github.com/dubbogo/gost/container/set" + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" gxpage "github.com/dubbogo/gost/hash/page" "github.com/hashicorp/vault/sdk/helper/jsonutil" perrors "github.com/pkg/errors" @@ -56,7 +57,7 @@ type etcdV3ServiceDiscovery struct { // descriptor is a short string about the basic information of this instance descriptor string // client is current Etcdv3 client - client *etcdv3.Client + client *gxetcd.Client // serviceInstance is current serviceInstance serviceInstance *registry.ServiceInstance // services is when register or update will add service name @@ -307,9 +308,9 @@ func newEtcdV3ServiceDiscovery(name string) (registry.ServiceDiscovery, error) { logger.Infof("etcd address is: %v,timeout is:%s", remoteConfig.Address, timeout.String()) client := etcdv3.NewServiceDiscoveryClient( - etcdv3.WithName(etcdv3.RegistryETCDV3Client), - etcdv3.WithTimeout(timeout), - etcdv3.WithEndpoints(strings.Split(remoteConfig.Address, ",")...), + gxetcd.WithName(gxetcd.RegistryETCDV3Client), + gxetcd.WithTimeout(timeout), + gxetcd.WithEndpoints(strings.Split(remoteConfig.Address, ",")...), ) descriptor := fmt.Sprintf("etcd-service-discovery[%s]", remoteConfig.Address) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index bef0ae5bb26516ae7368816569720786183a3875..12c78d9c147ba3f283fd734f842ea928691abd74 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -79,6 +79,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { return nil, err } + r.WaitGroup().Add(1) //zk client start successful, then wg +1 go zookeeper.HandleClientRestart(r) r.listener = zookeeper.NewZkEventListener(r.client) diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 34ee31bd94afcc48776bfc81d73bbd5c87a2b7f8..3acdbc4a4b944f3a97971f414d6d14d0f11e0e29 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -18,110 +18,42 @@ package etcdv3 import ( - "context" - "sync" - "time" -) - -import ( - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/concurrency" + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" - "google.golang.org/grpc" ) import ( "github.com/apache/dubbo-go/common/logger" ) -const ( - // ConnDelay connection delay - ConnDelay = 3 - // MaxFailTimes max failure times - MaxFailTimes = 15 - // RegistryETCDV3Client client name - RegistryETCDV3Client = "etcd registry" - // metadataETCDV3Client client name - MetadataETCDV3Client = "etcd metadata" -) - -var ( - // Defines related errors - ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR - ErrKVPairNotFound = perrors.New("k/v pair not found") -) - -// nolint -type Options struct { - name string - endpoints []string - client *Client - timeout time.Duration - heartbeat int // heartbeat second -} - -// Option will define a function of handling Options -type Option func(*Options) - -// WithEndpoints sets etcd client endpoints -func WithEndpoints(endpoints ...string) Option { - return func(opt *Options) { - opt.endpoints = endpoints - } -} - -// WithName sets etcd client name -func WithName(name string) Option { - return func(opt *Options) { - opt.name = name - } -} - -// WithTimeout sets etcd client timeout -func WithTimeout(timeout time.Duration) Option { - return func(opt *Options) { - opt.timeout = timeout - } -} - -// WithHeartbeat sets etcd client heartbeat -func WithHeartbeat(heartbeat int) Option { - return func(opt *Options) { - opt.heartbeat = heartbeat - } -} - // ValidateClient validates client and sets options -func ValidateClient(container clientFacade, opts ...Option) error { - options := &Options{ - heartbeat: 1, // default heartbeat - } +func ValidateClient(container clientFacade, opts ...gxetcd.Option) error { + options := &gxetcd.Options{} for _, opt := range opts { opt(options) } - lock := container.ClientLock() lock.Lock() defer lock.Unlock() // new Client if container.Client() == nil { - newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) + newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat) if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", - options.name, options.endpoints, options.timeout, err) - return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) + options.Name, options.Endpoints, options.Timeout, err) + return perrors.WithMessagef(err, "new client (address:%+v)", options.Endpoints) } container.SetClient(newClient) } // Client lose connection with etcd server - if container.Client().rawClient == nil { - newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) + if container.Client().GetRawClient() == nil { + newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat) if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", - options.name, options.endpoints, options.timeout, err) - return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) + options.Name, options.Endpoints, options.Timeout, err) + return perrors.WithMessagef(err, "new client (address:%+v)", options.Endpoints) } container.SetClient(newClient) } @@ -130,368 +62,18 @@ func ValidateClient(container clientFacade, opts ...Option) error { } // nolint -func NewServiceDiscoveryClient(opts ...Option) *Client { - options := &Options{ - heartbeat: 1, // default heartbeat +func NewServiceDiscoveryClient(opts ...gxetcd.Option) *gxetcd.Client { + options := &gxetcd.Options{ + Heartbeat: 1, // default heartbeat } for _, opt := range opts { opt(options) } - newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) + newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat) if err != nil { logger.Errorf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", - options.name, options.endpoints, options.timeout, err) + options.Name, options.Endpoints, options.Timeout, err) } return newClient } - -// Client represents etcd client Configuration -type Client struct { - lock sync.RWMutex - - // these properties are only set once when they are started. - name string - endpoints []string - timeout time.Duration - heartbeat int - - ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg - cancel context.CancelFunc // cancel the ctx, all watcher will stopped - rawClient *clientv3.Client - - exit chan struct{} - Wait sync.WaitGroup -} - -// nolint -func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) { - ctx, cancel := context.WithCancel(context.Background()) - rawClient, err := clientv3.New(clientv3.Config{ - Context: ctx, - Endpoints: endpoints, - DialTimeout: timeout, - DialOptions: []grpc.DialOption{grpc.WithBlock()}, - }) - if err != nil { - return nil, perrors.WithMessage(err, "new raw client block connect to server") - } - - c := &Client{ - name: name, - timeout: timeout, - endpoints: endpoints, - heartbeat: heartbeat, - - ctx: ctx, - cancel: cancel, - rawClient: rawClient, - - exit: make(chan struct{}), - } - - if err := c.maintenanceStatus(); err != nil { - return nil, perrors.WithMessage(err, "client maintenance status") - } - return c, nil -} - -// NOTICE: need to get the lock before calling this method -func (c *Client) clean() { - // close raw client - c.rawClient.Close() - - // cancel ctx for raw client - c.cancel() - - // clean raw client - c.rawClient = nil -} - -func (c *Client) stop() bool { - select { - case <-c.exit: - return true - default: - close(c.exit) - } - return false -} - -// nolint -func (c *Client) Close() { - if c == nil { - return - } - - // stop the client - c.stop() - - // wait client maintenance status stop - c.Wait.Wait() - - c.lock.Lock() - defer c.lock.Unlock() - if c.rawClient != nil { - c.clean() - } - logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints) -} - -func (c *Client) maintenanceStatus() error { - s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) - if err != nil { - return perrors.WithMessage(err, "new session with server") - } - - // must add wg before go maintenance status goroutine - c.Wait.Add(1) - go c.maintenanceStatusLoop(s) - return nil -} - -func (c *Client) maintenanceStatusLoop(s *concurrency.Session) { - defer func() { - c.Wait.Done() - logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name) - }() - - for { - select { - case <-c.Done(): - // Client be stopped, will clean the client hold resources - return - case <-s.Done(): - logger.Warn("etcd server stopped") - c.lock.Lock() - // when etcd server stopped, cancel ctx, stop all watchers - c.clean() - // when connection lose, stop client, trigger reconnect to etcd - c.stop() - c.lock.Unlock() - return - } - } -} - -// if k not exist will put k/v in etcd, otherwise return nil -func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return ErrNilETCDV3Client - } - - _, err := c.rawClient.Txn(c.ctx). - If(clientv3.Compare(clientv3.Version(k), "<", 1)). - Then(clientv3.OpPut(k, v, opts...)). - Commit() - return err -} - -// if k not exist will put k/v in etcd -// if k is already exist in etcd, replace it -func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return ErrNilETCDV3Client - } - - _, err := c.rawClient.Txn(c.ctx). - If(clientv3.Compare(clientv3.Version(k), "!=", -1)). - Then(clientv3.OpPut(k, v, opts...)). - Commit() - return err -} - -func (c *Client) delete(k string) error { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return ErrNilETCDV3Client - } - - _, err := c.rawClient.Delete(c.ctx, k) - return err -} - -func (c *Client) get(k string) (string, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return "", ErrNilETCDV3Client - } - - resp, err := c.rawClient.Get(c.ctx, k) - if err != nil { - return "", err - } - - if len(resp.Kvs) == 0 { - return "", ErrKVPairNotFound - } - - return string(resp.Kvs[0].Value), nil -} - -// nolint -func (c *Client) CleanKV() error { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return ErrNilETCDV3Client - } - - _, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix()) - return err -} - -func (c *Client) getChildren(k string) ([]string, []string, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return nil, nil, ErrNilETCDV3Client - } - - resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix()) - if err != nil { - return nil, nil, err - } - - if len(resp.Kvs) == 0 { - return nil, nil, ErrKVPairNotFound - } - - kList := make([]string, 0, len(resp.Kvs)) - vList := make([]string, 0, len(resp.Kvs)) - for _, kv := range resp.Kvs { - kList = append(kList, string(kv.Key)) - vList = append(vList, string(kv.Value)) - } - return kList, vList, nil -} - -func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return nil, ErrNilETCDV3Client - } - - return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil -} - -func (c *Client) watch(k string) (clientv3.WatchChan, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return nil, ErrNilETCDV3Client - } - - return c.rawClient.Watch(c.ctx, k), nil -} - -func (c *Client) keepAliveKV(k string, v string) error { - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil { - return ErrNilETCDV3Client - } - - // make lease time longer, since 1 second is too short - lease, err := c.rawClient.Grant(c.ctx, int64(30*time.Second.Seconds())) - if err != nil { - return perrors.WithMessage(err, "grant lease") - } - - keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) - if err != nil || keepAlive == nil { - if _, revokeErr := c.rawClient.Revoke(c.ctx, lease.ID); revokeErr != nil { - logger.Warnf("rawClient.Revoke() = error:%v", revokeErr) - } - if err != nil { - return perrors.WithMessage(err, "keep alive lease") - } else { - return perrors.New("keep alive lease") - } - } - - _, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID)) - return perrors.WithMessage(err, "put k/v with lease") -} - -// nolint -func (c *Client) Done() <-chan struct{} { - return c.exit -} - -// nolint -func (c *Client) Valid() bool { - select { - case <-c.exit: - return false - default: - } - - c.lock.RLock() - defer c.lock.RUnlock() - return c.rawClient != nil -} - -// nolint -func (c *Client) Create(k string, v string) error { - err := c.put(k, v) - return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) -} - -// Update key value ... -func (c *Client) Update(k, v string) error { - err := c.update(k, v) - return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v) -} - -// nolint -func (c *Client) Delete(k string) error { - err := c.delete(k) - return perrors.WithMessagef(err, "delete k/v (key %s)", k) -} - -// RegisterTemp registers a temporary node -func (c *Client) RegisterTemp(k, v string) error { - err := c.keepAliveKV(k, v) - return perrors.WithMessagef(err, "keepalive kv (key %s)", k) -} - -// GetChildrenKVList gets children kv list by @k -func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { - kList, vList, err := c.getChildren(k) - return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k) -} - -// Get gets value by @k -func (c *Client) Get(k string) (string, error) { - v, err := c.get(k) - return v, perrors.WithMessagef(err, "get key value (key %s)", k) -} - -// Watch watches on spec key -func (c *Client) Watch(k string) (clientv3.WatchChan, error) { - wc, err := c.watch(k) - return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k) -} - -// WatchWithPrefix watches on spec prefix -func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { - wc, err := c.watchWithPrefix(prefix) - return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix) -} diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go deleted file mode 100644 index 4f4fa21dad085dc1f82fb4157cd1209fedb16cde..0000000000000000000000000000000000000000 --- a/remoting/etcdv3/client_test.go +++ /dev/null @@ -1,396 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package etcdv3 - -import ( - "net/url" - "os" - "path" - "reflect" - "strings" - "sync" - "testing" - "time" -) - -import ( - "github.com/coreos/etcd/embed" - "github.com/coreos/etcd/mvcc/mvccpb" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" - "google.golang.org/grpc/connectivity" -) - -const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd" - -// tests dataset -var tests = []struct { - input struct { - k string - v string - } -}{ - {input: struct { - k string - v string - }{k: "name", v: "scott.wang"}}, - {input: struct { - k string - v string - }{k: "namePrefix", v: "prefix.scott.wang"}}, - {input: struct { - k string - v string - }{k: "namePrefix1", v: "prefix1.scott.wang"}}, - {input: struct { - k string - v string - }{k: "age", v: "27"}}, -} - -// test dataset prefix -const prefix = "name" - -type ClientTestSuite struct { - suite.Suite - - etcdConfig struct { - name string - endpoints []string - timeout time.Duration - heartbeat int - } - - etcd *embed.Etcd - - client *Client -} - -// start etcd server -func (suite *ClientTestSuite) SetupSuite() { - t := suite.T() - - DefaultListenPeerURLs := "http://localhost:2382" - DefaultListenClientURLs := "http://localhost:2381" - lpurl, _ := url.Parse(DefaultListenPeerURLs) - lcurl, _ := url.Parse(DefaultListenClientURLs) - cfg := embed.NewConfig() - cfg.LPUrls = []url.URL{*lpurl} - cfg.LCUrls = []url.URL{*lcurl} - cfg.Dir = defaultEtcdV3WorkDir - e, err := embed.StartEtcd(cfg) - if err != nil { - t.Fatal(err) - } - select { - case <-e.Server.ReadyNotify(): - t.Log("Server is ready!") - case <-time.After(60 * time.Second): - e.Server.Stop() // trigger a shutdown - t.Logf("Server took too long to start!") - } - - suite.etcd = e -} - -// stop etcd server -func (suite *ClientTestSuite) TearDownSuite() { - suite.etcd.Close() - if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil { - suite.FailNow(err.Error()) - } -} - -func (suite *ClientTestSuite) setUpClient() *Client { - c, err := NewClient(suite.etcdConfig.name, - suite.etcdConfig.endpoints, - suite.etcdConfig.timeout, - suite.etcdConfig.heartbeat) - if err != nil { - suite.T().Fatal(err) - } - return c -} - -// set up a client for suite -func (suite *ClientTestSuite) SetupTest() { - c := suite.setUpClient() - err := c.CleanKV() - suite.Nil(err) - suite.client = c -} - -func (suite *ClientTestSuite) TestClientClose() { - c := suite.client - t := suite.T() - - defer c.Close() - if c.rawClient.ActiveConnection().GetState() != connectivity.Ready { - t.Fatal(suite.client.rawClient.ActiveConnection().GetState()) - } -} - -func (suite *ClientTestSuite) TestClientValid() { - c := suite.client - t := suite.T() - - if !c.Valid() { - t.Fatal("client is not valid") - } - c.Close() - if suite.client.Valid() != false { - t.Fatal("client is valid") - } -} - -func (suite *ClientTestSuite) TestClientDone() { - c := suite.client - - go func() { - time.Sleep(2 * time.Second) - c.Close() - }() - - c.Wait.Wait() - - if c.Valid() { - suite.T().Fatal("client should be invalid then") - } -} - -func (suite *ClientTestSuite) TestClientCreateKV() { - tests := tests - - c := suite.client - t := suite.T() - - defer suite.client.Close() - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - expect := tc.input.v - - if err := c.Create(k, v); err != nil { - t.Fatal(err) - } - - value, err := c.Get(k) - if err != nil { - t.Fatal(err) - } - - if value != expect { - t.Fatalf("expect %v but get %v", expect, value) - } - - } -} - -func (suite *ClientTestSuite) TestClientDeleteKV() { - tests := tests - c := suite.client - t := suite.T() - - defer c.Close() - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - expect := ErrKVPairNotFound - - if err := c.Create(k, v); err != nil { - t.Fatal(err) - } - - if err := c.Delete(k); err != nil { - t.Fatal(err) - } - - _, err := c.Get(k) - if perrors.Cause(err) == expect { - continue - } - - if err != nil { - t.Fatal(err) - } - } -} - -func (suite *ClientTestSuite) TestClientGetChildrenKVList() { - tests := tests - - c := suite.client - t := suite.T() - - var expectKList []string - var expectVList []string - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - - if strings.Contains(k, prefix) { - expectKList = append(expectKList, k) - expectVList = append(expectVList, v) - } - - if err := c.Create(k, v); err != nil { - t.Fatal(err) - } - } - - kList, vList, err := c.GetChildrenKVList(prefix) - if err != nil { - t.Fatal(err) - } - - if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) { - return - } - - t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList) -} - -func (suite *ClientTestSuite) TestClientWatch() { - tests := tests - - c := suite.client - t := suite.T() - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer wg.Done() - - wc, err := c.watch(prefix) - if err != nil { - t.Error(err) - } - - events := make([]mvccpb.Event, 0) - var eCreate, eDelete mvccpb.Event - - for e := range wc { - for _, event := range e.Events { - events = append(events, (mvccpb.Event)(*event)) - if event.Type == mvccpb.PUT { - eCreate = (mvccpb.Event)(*event) - } - if event.Type == mvccpb.DELETE { - eDelete = (mvccpb.Event)(*event) - } - t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value) - } - } - - assert.Equal(t, 2, len(events)) - assert.Contains(t, events, eCreate) - assert.Contains(t, events, eDelete) - }() - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - - if err := c.Create(k, v); err != nil { - t.Fatal(err) - } - - if err := c.delete(k); err != nil { - t.Fatal(err) - } - } - - c.Close() - - wg.Wait() -} - -func (suite *ClientTestSuite) TestClientRegisterTemp() { - c := suite.client - observeC := suite.setUpClient() - t := suite.T() - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer wg.Done() - - completePath := path.Join("scott", "wang") - wc, err := observeC.watch(completePath) - if err != nil { - t.Error(err) - } - - events := make([]mvccpb.Event, 0) - var eCreate, eDelete mvccpb.Event - - for e := range wc { - for _, event := range e.Events { - events = append(events, (mvccpb.Event)(*event)) - if event.Type == mvccpb.DELETE { - eDelete = (mvccpb.Event)(*event) - t.Logf("complete key (%s) is delete", completePath) - observeC.Close() - break - } - eCreate = (mvccpb.Event)(*event) - t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value) - } - } - - assert.Equal(t, 2, len(events)) - assert.Contains(t, events, eCreate) - assert.Contains(t, events, eDelete) - }() - - err := c.RegisterTemp("scott/wang", "test") - if err != nil { - t.Fatal(err) - } - - time.Sleep(2 * time.Second) - c.Close() - - wg.Wait() -} - -func TestClientSuite(t *testing.T) { - suite.Run(t, &ClientTestSuite{ - etcdConfig: struct { - name string - endpoints []string - timeout time.Duration - heartbeat int - }{ - name: "test", - endpoints: []string{"localhost:2381"}, - timeout: time.Second, - heartbeat: 1, - }, - }) -} diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index 7a176911514146a829e3fa0b6547d8bf1af7b031..8bd1e3a23c82a1daaa3441927d8c46a29b013dc8 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -24,6 +24,7 @@ import ( import ( "github.com/apache/dubbo-getty" + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" ) @@ -34,8 +35,8 @@ import ( ) type clientFacade interface { - Client() *Client - SetClient(*Client) + Client() *gxetcd.Client + SetClient(client *gxetcd.Client) ClientLock() *sync.Mutex WaitGroup() *sync.WaitGroup // for wait group control, etcd client listener & etcd client container Done() chan struct{} // for etcd client control @@ -60,9 +61,9 @@ LOOP: // re-register all services case <-r.Client().Done(): r.ClientLock().Lock() - clientName := RegistryETCDV3Client + clientName := gxetcd.RegistryETCDV3Client timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - endpoints := r.Client().endpoints + endpoints := r.Client().GetEndPoints() r.Client().Close() r.SetClient(nil) r.ClientLock().Unlock() @@ -74,13 +75,14 @@ LOOP: case <-r.Done(): logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") break LOOP - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * gxetcd.ConnDelay)): // avoid connect frequent } err = ValidateClient( r, - WithName(clientName), - WithEndpoints(endpoints...), - WithTimeout(timeout), + gxetcd.WithName(clientName), + gxetcd.WithEndpoints(endpoints...), + gxetcd.WithTimeout(timeout), + gxetcd.WithHeartbeat(1), ) logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", endpoints, perrors.WithStack(err)) @@ -88,8 +90,8 @@ LOOP: break } failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes + if gxetcd.MaxFailTimes <= failTimes { + failTimes = gxetcd.MaxFailTimes } } } diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index 23ee727737e5182cf0481150f38b58122036f877..e4ae69442c8c94799227f9ab70ce82b60cd97b85 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -25,6 +25,7 @@ import ( import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" ) @@ -35,14 +36,14 @@ import ( // nolint type EventListener struct { - client *Client + client *gxetcd.Client keyMapLock sync.RWMutex keyMap map[string]struct{} wg sync.WaitGroup } // NewEventListener returns a EventListener instance -func NewEventListener(client *Client) *EventListener { +func NewEventListener(client *gxetcd.Client) *EventListener { return &EventListener{ client: client, keyMap: make(map[string]struct{}), @@ -69,7 +70,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting. return false // client ctx stop - case <-l.client.ctx.Done(): + case <-l.client.GetCtx().Done(): logger.Warnf("etcd client ctx cancel") return false @@ -147,7 +148,7 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener return // client ctx stop - case <-l.client.ctx.Done(): + case <-l.client.GetCtx().Done(): logger.Warnf("etcd client ctx cancel") return @@ -191,7 +192,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis l.keyMap[key] = struct{}{} l.keyMapLock.Unlock() - keyList, valueList, err := l.client.getChildren(key) + keyList, valueList, err := l.client.GetChildren(key) if err != nil { logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) } diff --git a/remoting/etcdv3/listener_test.go b/remoting/etcdv3/listener_test.go index cfd8bffd0ae80f0001a691d1663165c5464a32c8..6117123ee812a4c3ad581618bb8bf6f7a5a708f4 100644 --- a/remoting/etcdv3/listener_test.go +++ b/remoting/etcdv3/listener_test.go @@ -18,10 +18,15 @@ package etcdv3 import ( + "net/url" + "os" + "testing" "time" ) import ( + "github.com/coreos/etcd/embed" + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" "github.com/stretchr/testify/assert" ) @@ -29,6 +34,8 @@ import ( "github.com/apache/dubbo-go/remoting" ) +const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd" + var changedData = ` dubbo.consumer.request_timeout=3s dubbo.consumer.connect_timeout=5s @@ -51,7 +58,40 @@ var changedData = ` dubbo.service.com.ikurento.user.UserProvider.cluster=failover ` -func (suite *ClientTestSuite) TestListener() { +var etcd *embed.Etcd + +func SetUpEtcdServer(t *testing.T) { + var err error + DefaultListenPeerURLs := "http://localhost:2382" + DefaultListenClientURLs := "http://localhost:2381" + lpurl, _ := url.Parse(DefaultListenPeerURLs) + lcurl, _ := url.Parse(DefaultListenClientURLs) + cfg := embed.NewConfig() + cfg.LPUrls = []url.URL{*lpurl} + cfg.LCUrls = []url.URL{*lcurl} + cfg.Dir = defaultEtcdV3WorkDir + etcd, err = embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + select { + case <-etcd.Server.ReadyNotify(): + t.Log("Server is ready!") + case <-time.After(60 * time.Second): + etcd.Server.Stop() // trigger a shutdown + t.Logf("Server took too long to start!") + } +} + +func ClearEtcdServer(t *testing.T) { + etcd.Close() + if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil { + t.Fail() + } +} + +func TestListener(t *testing.T) { + tests := []struct { input struct { k string @@ -63,9 +103,9 @@ func (suite *ClientTestSuite) TestListener() { v string }{k: "/dubbo", v: changedData}}, } - - c := suite.client - t := suite.T() + SetUpEtcdServer(t) + c, err := gxetcd.NewClient("test", []string{"localhost:2381"}, time.Second, 1) + assert.NoError(t, err) listener := NewEventListener(c) dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} @@ -84,11 +124,12 @@ func (suite *ClientTestSuite) TestListener() { } msg := <-dataListener.rc assert.Equal(t, changedData, msg.Content) + ClearEtcdServer(t) } type mockDataListener struct { eventList []remoting.Event - client *Client + client *gxetcd.Client changedData string rc chan remoting.Event diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go index 8a247e267daa02d748c9a5e47ced698f617dfe9a..b5c6530d6c9d7fc51250ee9c179522e113bbc366 100644 --- a/remoting/nacos/builder.go +++ b/remoting/nacos/builder.go @@ -124,6 +124,7 @@ func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error clientConfig.Username = rc.Username clientConfig.Password = rc.Password clientConfig.NotLoadCacheAtStart = true + clientConfig.NamespaceId = rc.GetParam(constant.NACOS_NAMESPACE_ID, "") configMap["clientConfig"] = clientConfig return clients.CreateNamingClient(configMap)