Skip to content
Snippets Groups Projects
Commit 6619560a authored by LaurenceLiZhixin's avatar LaurenceLiZhixin
Browse files

fix: conflict

parents 329a2923 f7ae84a4
No related branches found
No related tags found
No related merge requests found
......@@ -13,7 +13,7 @@ require (
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.2
github.com/dubbogo/gost v1.11.3
github.com/dubbogo/triple v0.0.0-20210403061850-372f2dc47e02
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.4.0
......
......@@ -180,8 +180,9 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.2 h1:NanyHmvzE1HrgI2T9H/jE/N1wkxFEj+IbM1A4RT9H7Q=
github.com/dubbogo/gost v1.11.2/go.mod h1:3QQEj50QOhkWTERT785YZ5ZxIRGNdR11FCLP7FzHsMc=
github.com/dubbogo/gost v1.11.3 h1:PSP9KQyuRJugmPLqC18MFgoIL0g1G4n/9FTKgQYjjbE=
github.com/dubbogo/gost v1.11.3/go.mod h1:3QQEj50QOhkWTERT785YZ5ZxIRGNdR11FCLP7FzHsMc=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.2-0.20210326124702-e6a866993192 h1:CBEicrrVwR6u8ty+kL68ItxXVk1jaVYThrsx5ARhxUc=
github.com/dubbogo/net v0.0.2-0.20210326124702-e6a866993192/go.mod h1:B6/ka3g8VzcyrmdCH4VkHP1K0aHeI37FmclS+TCwIBU=
......@@ -194,7 +195,6 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
......
......@@ -22,6 +22,10 @@ import (
"time"
)
import (
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -30,7 +34,6 @@ import (
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
"github.com/apache/dubbo-go/remoting/etcdv3"
)
const DEFAULT_ROOT = "dubbo"
......@@ -43,7 +46,7 @@ func init() {
// etcdMetadataReport is the implementation of MetadataReport based etcd
type etcdMetadataReport struct {
client *etcdv3.Client
client *gxetcd.Client
root string
}
......@@ -121,7 +124,7 @@ type etcdMetadataReportFactory struct{}
func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
timeout, _ := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
addresses := strings.Split(url.Location, ",")
client, err := etcdv3.NewClient(etcdv3.MetadataETCDV3Client, addresses, timeout, 1)
client, err := gxetcd.NewClient(gxetcd.MetadataETCDV3Client, addresses, timeout, 1)
if err != nil {
logger.Errorf("Could not create etcd metadata report. URL: %s,error:{%v}", url.String(), err)
return nil
......
......@@ -26,6 +26,7 @@ import (
)
import (
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)
......@@ -50,7 +51,7 @@ func init() {
type etcdV3Registry struct {
registry.BaseRegistry
cltLock sync.Mutex
client *etcdv3.Client
client *gxetcd.Client
listenerLock sync.RWMutex
listener *etcdv3.EventListener
dataListener *dataListener
......@@ -58,12 +59,12 @@ type etcdV3Registry struct {
}
// Client gets the etcdv3 client
func (r *etcdV3Registry) Client() *etcdv3.Client {
func (r *etcdV3Registry) Client() *gxetcd.Client {
return r.client
}
// SetClient sets the etcdv3 client
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
func (r *etcdV3Registry) SetClient(client *gxetcd.Client) {
r.client = client
}
......@@ -88,9 +89,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
if err := etcdv3.ValidateClient(
r,
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
etcdv3.WithEndpoints(strings.Split(url.Location, ",")...),
gxetcd.WithName(gxetcd.RegistryETCDV3Client),
gxetcd.WithTimeout(timeout),
gxetcd.WithEndpoints(strings.Split(url.Location, ",")...),
); err != nil {
return nil, err
}
......
......@@ -26,6 +26,7 @@ import (
import (
gxset "github.com/dubbogo/gost/container/set"
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
perrors "github.com/pkg/errors"
......@@ -56,7 +57,7 @@ type etcdV3ServiceDiscovery struct {
// descriptor is a short string about the basic information of this instance
descriptor string
// client is current Etcdv3 client
client *etcdv3.Client
client *gxetcd.Client
// serviceInstance is current serviceInstance
serviceInstance *registry.ServiceInstance
// services is when register or update will add service name
......@@ -307,9 +308,9 @@ func newEtcdV3ServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
logger.Infof("etcd address is: %v,timeout is:%s", remoteConfig.Address, timeout.String())
client := etcdv3.NewServiceDiscoveryClient(
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
etcdv3.WithEndpoints(strings.Split(remoteConfig.Address, ",")...),
gxetcd.WithName(gxetcd.RegistryETCDV3Client),
gxetcd.WithTimeout(timeout),
gxetcd.WithEndpoints(strings.Split(remoteConfig.Address, ",")...),
)
descriptor := fmt.Sprintf("etcd-service-discovery[%s]", remoteConfig.Address)
......
......@@ -79,6 +79,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
return nil, err
}
r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
......
......@@ -18,110 +18,42 @@
package etcdv3
import (
"context"
"sync"
"time"
)
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
// ConnDelay connection delay
ConnDelay = 3
// MaxFailTimes max failure times
MaxFailTimes = 15
// RegistryETCDV3Client client name
RegistryETCDV3Client = "etcd registry"
// metadataETCDV3Client client name
MetadataETCDV3Client = "etcd metadata"
)
var (
// Defines related errors
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
// nolint
type Options struct {
name string
endpoints []string
client *Client
timeout time.Duration
heartbeat int // heartbeat second
}
// Option will define a function of handling Options
type Option func(*Options)
// WithEndpoints sets etcd client endpoints
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
}
}
// WithName sets etcd client name
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
}
}
// WithTimeout sets etcd client timeout
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.timeout = timeout
}
}
// WithHeartbeat sets etcd client heartbeat
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.heartbeat = heartbeat
}
}
// ValidateClient validates client and sets options
func ValidateClient(container clientFacade, opts ...Option) error {
options := &Options{
heartbeat: 1, // default heartbeat
}
func ValidateClient(container clientFacade, opts ...gxetcd.Option) error {
options := &gxetcd.Options{}
for _, opt := range opts {
opt(options)
}
lock := container.ClientLock()
lock.Lock()
defer lock.Unlock()
// new Client
if container.Client() == nil {
newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
options.Name, options.Endpoints, options.Timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.Endpoints)
}
container.SetClient(newClient)
}
// Client lose connection with etcd server
if container.Client().rawClient == nil {
newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if container.Client().GetRawClient() == nil {
newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
options.Name, options.Endpoints, options.Timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.Endpoints)
}
container.SetClient(newClient)
}
......@@ -130,368 +62,18 @@ func ValidateClient(container clientFacade, opts ...Option) error {
}
// nolint
func NewServiceDiscoveryClient(opts ...Option) *Client {
options := &Options{
heartbeat: 1, // default heartbeat
func NewServiceDiscoveryClient(opts ...gxetcd.Option) *gxetcd.Client {
options := &gxetcd.Options{
Heartbeat: 1, // default heartbeat
}
for _, opt := range opts {
opt(options)
}
newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
newClient, err := gxetcd.NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
logger.Errorf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
options.Name, options.Endpoints, options.Timeout, err)
}
return newClient
}
// Client represents etcd client Configuration
type Client struct {
lock sync.RWMutex
// these properties are only set once when they are started.
name string
endpoints []string
timeout time.Duration
heartbeat int
ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
rawClient *clientv3.Client
exit chan struct{}
Wait sync.WaitGroup
}
// nolint
func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
rawClient, err := clientv3.New(clientv3.Config{
Context: ctx,
Endpoints: endpoints,
DialTimeout: timeout,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return nil, perrors.WithMessage(err, "new raw client block connect to server")
}
c := &Client{
name: name,
timeout: timeout,
endpoints: endpoints,
heartbeat: heartbeat,
ctx: ctx,
cancel: cancel,
rawClient: rawClient,
exit: make(chan struct{}),
}
if err := c.maintenanceStatus(); err != nil {
return nil, perrors.WithMessage(err, "client maintenance status")
}
return c, nil
}
// NOTICE: need to get the lock before calling this method
func (c *Client) clean() {
// close raw client
c.rawClient.Close()
// cancel ctx for raw client
c.cancel()
// clean raw client
c.rawClient = nil
}
func (c *Client) stop() bool {
select {
case <-c.exit:
return true
default:
close(c.exit)
}
return false
}
// nolint
func (c *Client) Close() {
if c == nil {
return
}
// stop the client
c.stop()
// wait client maintenance status stop
c.Wait.Wait()
c.lock.Lock()
defer c.lock.Unlock()
if c.rawClient != nil {
c.clean()
}
logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)
}
func (c *Client) maintenanceStatus() error {
s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
if err != nil {
return perrors.WithMessage(err, "new session with server")
}
// must add wg before go maintenance status goroutine
c.Wait.Add(1)
go c.maintenanceStatusLoop(s)
return nil
}
func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
defer func() {
c.Wait.Done()
logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name)
}()
for {
select {
case <-c.Done():
// Client be stopped, will clean the client hold resources
return
case <-s.Done():
logger.Warn("etcd server stopped")
c.lock.Lock()
// when etcd server stopped, cancel ctx, stop all watchers
c.clean()
// when connection lose, stop client, trigger reconnect to etcd
c.stop()
c.lock.Unlock()
return
}
}
}
// if k not exist will put k/v in etcd, otherwise return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
return err
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, replace it
func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "!=", -1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
return err
}
func (c *Client) delete(k string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Delete(c.ctx, k)
return err
}
func (c *Client) get(k string) (string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return "", ErrNilETCDV3Client
}
resp, err := c.rawClient.Get(c.ctx, k)
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", ErrKVPairNotFound
}
return string(resp.Kvs[0].Value), nil
}
// nolint
func (c *Client) CleanKV() error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix())
return err
}
func (c *Client) getChildren(k string) ([]string, []string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, nil, ErrNilETCDV3Client
}
resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix())
if err != nil {
return nil, nil, err
}
if len(resp.Kvs) == 0 {
return nil, nil, ErrKVPairNotFound
}
kList := make([]string, 0, len(resp.Kvs))
vList := make([]string, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
kList = append(kList, string(kv.Key))
vList = append(vList, string(kv.Value))
}
return kList, vList, nil
}
func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3Client
}
return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
}
func (c *Client) watch(k string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3Client
}
return c.rawClient.Watch(c.ctx, k), nil
}
func (c *Client) keepAliveKV(k string, v string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
// make lease time longer, since 1 second is too short
lease, err := c.rawClient.Grant(c.ctx, int64(30*time.Second.Seconds()))
if err != nil {
return perrors.WithMessage(err, "grant lease")
}
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
if _, revokeErr := c.rawClient.Revoke(c.ctx, lease.ID); revokeErr != nil {
logger.Warnf("rawClient.Revoke() = error:%v", revokeErr)
}
if err != nil {
return perrors.WithMessage(err, "keep alive lease")
} else {
return perrors.New("keep alive lease")
}
}
_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
return perrors.WithMessage(err, "put k/v with lease")
}
// nolint
func (c *Client) Done() <-chan struct{} {
return c.exit
}
// nolint
func (c *Client) Valid() bool {
select {
case <-c.exit:
return false
default:
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.rawClient != nil
}
// nolint
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}
// Update key value ...
func (c *Client) Update(k, v string) error {
err := c.update(k, v)
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
}
// nolint
func (c *Client) Delete(k string) error {
err := c.delete(k)
return perrors.WithMessagef(err, "delete k/v (key %s)", k)
}
// RegisterTemp registers a temporary node
func (c *Client) RegisterTemp(k, v string) error {
err := c.keepAliveKV(k, v)
return perrors.WithMessagef(err, "keepalive kv (key %s)", k)
}
// GetChildrenKVList gets children kv list by @k
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k)
}
// Get gets value by @k
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
return v, perrors.WithMessagef(err, "get key value (key %s)", k)
}
// Watch watches on spec key
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}
// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcdv3
import (
"net/url"
"os"
"path"
"reflect"
"strings"
"sync"
"testing"
"time"
)
import (
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/mvcc/mvccpb"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/connectivity"
)
const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd"
// tests dataset
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "name", v: "scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix", v: "prefix.scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix1", v: "prefix1.scott.wang"}},
{input: struct {
k string
v string
}{k: "age", v: "27"}},
}
// test dataset prefix
const prefix = "name"
type ClientTestSuite struct {
suite.Suite
etcdConfig struct {
name string
endpoints []string
timeout time.Duration
heartbeat int
}
etcd *embed.Etcd
client *Client
}
// start etcd server
func (suite *ClientTestSuite) SetupSuite() {
t := suite.T()
DefaultListenPeerURLs := "http://localhost:2382"
DefaultListenClientURLs := "http://localhost:2381"
lpurl, _ := url.Parse(DefaultListenPeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs)
cfg := embed.NewConfig()
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-time.After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}
suite.etcd = e
}
// stop etcd server
func (suite *ClientTestSuite) TearDownSuite() {
suite.etcd.Close()
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
suite.FailNow(err.Error())
}
}
func (suite *ClientTestSuite) setUpClient() *Client {
c, err := NewClient(suite.etcdConfig.name,
suite.etcdConfig.endpoints,
suite.etcdConfig.timeout,
suite.etcdConfig.heartbeat)
if err != nil {
suite.T().Fatal(err)
}
return c
}
// set up a client for suite
func (suite *ClientTestSuite) SetupTest() {
c := suite.setUpClient()
err := c.CleanKV()
suite.Nil(err)
suite.client = c
}
func (suite *ClientTestSuite) TestClientClose() {
c := suite.client
t := suite.T()
defer c.Close()
if c.rawClient.ActiveConnection().GetState() != connectivity.Ready {
t.Fatal(suite.client.rawClient.ActiveConnection().GetState())
}
}
func (suite *ClientTestSuite) TestClientValid() {
c := suite.client
t := suite.T()
if !c.Valid() {
t.Fatal("client is not valid")
}
c.Close()
if suite.client.Valid() != false {
t.Fatal("client is valid")
}
}
func (suite *ClientTestSuite) TestClientDone() {
c := suite.client
go func() {
time.Sleep(2 * time.Second)
c.Close()
}()
c.Wait.Wait()
if c.Valid() {
suite.T().Fatal("client should be invalid then")
}
}
func (suite *ClientTestSuite) TestClientCreateKV() {
tests := tests
c := suite.client
t := suite.T()
defer suite.client.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
value, err := c.Get(k)
if err != nil {
t.Fatal(err)
}
if value != expect {
t.Fatalf("expect %v but get %v", expect, value)
}
}
}
func (suite *ClientTestSuite) TestClientDeleteKV() {
tests := tests
c := suite.client
t := suite.T()
defer c.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := ErrKVPairNotFound
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
if err := c.Delete(k); err != nil {
t.Fatal(err)
}
_, err := c.Get(k)
if perrors.Cause(err) == expect {
continue
}
if err != nil {
t.Fatal(err)
}
}
}
func (suite *ClientTestSuite) TestClientGetChildrenKVList() {
tests := tests
c := suite.client
t := suite.T()
var expectKList []string
var expectVList []string
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if strings.Contains(k, prefix) {
expectKList = append(expectKList, k)
expectVList = append(expectVList, v)
}
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
}
kList, vList, err := c.GetChildrenKVList(prefix)
if err != nil {
t.Fatal(err)
}
if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) {
return
}
t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList)
}
func (suite *ClientTestSuite) TestClientWatch() {
tests := tests
c := suite.client
t := suite.T()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
wc, err := c.watch(prefix)
if err != nil {
t.Error(err)
}
events := make([]mvccpb.Event, 0)
var eCreate, eDelete mvccpb.Event
for e := range wc {
for _, event := range e.Events {
events = append(events, (mvccpb.Event)(*event))
if event.Type == mvccpb.PUT {
eCreate = (mvccpb.Event)(*event)
}
if event.Type == mvccpb.DELETE {
eDelete = (mvccpb.Event)(*event)
}
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
assert.Equal(t, 2, len(events))
assert.Contains(t, events, eCreate)
assert.Contains(t, events, eDelete)
}()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
if err := c.delete(k); err != nil {
t.Fatal(err)
}
}
c.Close()
wg.Wait()
}
func (suite *ClientTestSuite) TestClientRegisterTemp() {
c := suite.client
observeC := suite.setUpClient()
t := suite.T()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
completePath := path.Join("scott", "wang")
wc, err := observeC.watch(completePath)
if err != nil {
t.Error(err)
}
events := make([]mvccpb.Event, 0)
var eCreate, eDelete mvccpb.Event
for e := range wc {
for _, event := range e.Events {
events = append(events, (mvccpb.Event)(*event))
if event.Type == mvccpb.DELETE {
eDelete = (mvccpb.Event)(*event)
t.Logf("complete key (%s) is delete", completePath)
observeC.Close()
break
}
eCreate = (mvccpb.Event)(*event)
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
assert.Equal(t, 2, len(events))
assert.Contains(t, events, eCreate)
assert.Contains(t, events, eDelete)
}()
err := c.RegisterTemp("scott/wang", "test")
if err != nil {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
c.Close()
wg.Wait()
}
func TestClientSuite(t *testing.T) {
suite.Run(t, &ClientTestSuite{
etcdConfig: struct {
name string
endpoints []string
timeout time.Duration
heartbeat int
}{
name: "test",
endpoints: []string{"localhost:2381"},
timeout: time.Second,
heartbeat: 1,
},
})
}
......@@ -24,6 +24,7 @@ import (
import (
"github.com/apache/dubbo-getty"
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)
......@@ -34,8 +35,8 @@ import (
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
Client() *gxetcd.Client
SetClient(client *gxetcd.Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup // for wait group control, etcd client listener & etcd client container
Done() chan struct{} // for etcd client control
......@@ -60,9 +61,9 @@ LOOP:
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
clientName := RegistryETCDV3Client
clientName := gxetcd.RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoints := r.Client().endpoints
endpoints := r.Client().GetEndPoints()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
......@@ -74,13 +75,14 @@ LOOP:
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * gxetcd.ConnDelay)): // avoid connect frequent
}
err = ValidateClient(
r,
WithName(clientName),
WithEndpoints(endpoints...),
WithTimeout(timeout),
gxetcd.WithName(clientName),
gxetcd.WithEndpoints(endpoints...),
gxetcd.WithTimeout(timeout),
gxetcd.WithHeartbeat(1),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
......@@ -88,8 +90,8 @@ LOOP:
break
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
if gxetcd.MaxFailTimes <= failTimes {
failTimes = gxetcd.MaxFailTimes
}
}
}
......
......@@ -25,6 +25,7 @@ import (
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)
......@@ -35,14 +36,14 @@ import (
// nolint
type EventListener struct {
client *Client
client *gxetcd.Client
keyMapLock sync.RWMutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
// NewEventListener returns a EventListener instance
func NewEventListener(client *Client) *EventListener {
func NewEventListener(client *gxetcd.Client) *EventListener {
return &EventListener{
client: client,
keyMap: make(map[string]struct{}),
......@@ -69,7 +70,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
return false
// client ctx stop
case <-l.client.ctx.Done():
case <-l.client.GetCtx().Done():
logger.Warnf("etcd client ctx cancel")
return false
......@@ -147,7 +148,7 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
return
// client ctx stop
case <-l.client.ctx.Done():
case <-l.client.GetCtx().Done():
logger.Warnf("etcd client ctx cancel")
return
......@@ -191,7 +192,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
keyList, valueList, err := l.client.getChildren(key)
keyList, valueList, err := l.client.GetChildren(key)
if err != nil {
logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
......
......@@ -18,10 +18,15 @@
package etcdv3
import (
"net/url"
"os"
"testing"
"time"
)
import (
"github.com/coreos/etcd/embed"
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
"github.com/stretchr/testify/assert"
)
......@@ -29,6 +34,8 @@ import (
"github.com/apache/dubbo-go/remoting"
)
const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd"
var changedData = `
dubbo.consumer.request_timeout=3s
dubbo.consumer.connect_timeout=5s
......@@ -51,7 +58,40 @@ var changedData = `
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
func (suite *ClientTestSuite) TestListener() {
var etcd *embed.Etcd
func SetUpEtcdServer(t *testing.T) {
var err error
DefaultListenPeerURLs := "http://localhost:2382"
DefaultListenClientURLs := "http://localhost:2381"
lpurl, _ := url.Parse(DefaultListenPeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs)
cfg := embed.NewConfig()
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = defaultEtcdV3WorkDir
etcd, err = embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-etcd.Server.ReadyNotify():
t.Log("Server is ready!")
case <-time.After(60 * time.Second):
etcd.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}
}
func ClearEtcdServer(t *testing.T) {
etcd.Close()
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
t.Fail()
}
}
func TestListener(t *testing.T) {
tests := []struct {
input struct {
k string
......@@ -63,9 +103,9 @@ func (suite *ClientTestSuite) TestListener() {
v string
}{k: "/dubbo", v: changedData}},
}
c := suite.client
t := suite.T()
SetUpEtcdServer(t)
c, err := gxetcd.NewClient("test", []string{"localhost:2381"}, time.Second, 1)
assert.NoError(t, err)
listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
......@@ -84,11 +124,12 @@ func (suite *ClientTestSuite) TestListener() {
}
msg := <-dataListener.rc
assert.Equal(t, changedData, msg.Content)
ClearEtcdServer(t)
}
type mockDataListener struct {
eventList []remoting.Event
client *Client
client *gxetcd.Client
changedData string
rc chan remoting.Event
......
......@@ -124,6 +124,7 @@ func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error
clientConfig.Username = rc.Username
clientConfig.Password = rc.Password
clientConfig.NotLoadCacheAtStart = true
clientConfig.NamespaceId = rc.GetParam(constant.NACOS_NAMESPACE_ID, "")
configMap["clientConfig"] = clientConfig
return clients.CreateNamingClient(configMap)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment