diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 7957eb8c2da3f7054120610ce857a4392e19abf0..963f01b28b540e3864e6966e833f793ae3417d55 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -6,7 +6,7 @@ import ( ) import ( - "github.com/pkg/errors" + perrors "github.com/pkg/errors" ) import ( @@ -32,7 +32,7 @@ func (l *dataListener) AddInterestedURL(url *common.URL) { func (l *dataListener) DataChange(eventType remoting.Event) bool { url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):] - serviceURL, err := common.NewURL(context.TODO(), url) + serviceURL, err := common.NewURL(context.Background(), url) if err != nil { logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) return false @@ -67,10 +67,10 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { select { case <-l.registry.done: logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") - return nil, errors.New("listener stopped") + return nil, perrors.New("listener stopped") case e := <-l.events: - logger.Warnf("got etcd event %#s", e) + logger.Infof("got etcd event %#s", e) if e.ConfigType == remoting.EventTypeDel { select { case <-l.registry.done: diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go index 56000e8c868c585f1e8235fdd832d522da7cb51b..0ac8fc475e454e737e0ea03301709457561e961c 100644 --- a/registry/etcdv3/listener_test.go +++ b/registry/etcdv3/listener_test.go @@ -55,7 +55,7 @@ func (suite *RegistryTestSuite) TestDataChange() { t := suite.T() listener := NewRegistryDataListener(&MockDataListener{}) - url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") + url, _ := common.NewURL(context.Background(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") listener.AddInterestedURL(&url) if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) { t.Fatal("data change not ok") diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 2bbbaa0134cc6332a401e62c0af075c4fa4bd2fd..c716653cf9220256f28468f8e322a499eea61299 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -12,7 +12,7 @@ import ( ) import ( - "github.com/pkg/errors" + perrors "github.com/pkg/errors" ) import ( @@ -31,10 +31,12 @@ var ( localIP = "" ) +const Name = "etcdv3" + func init() { processID = fmt.Sprintf("%d", os.Getpid()) localIP, _ = utils.GetLocalIP() - extension.SetRegistry("etcdv3", newETCDV3Registry) + extension.SetRegistry(Name, newETCDV3Registry) } type etcdV3Registry struct { @@ -81,7 +83,7 @@ func (r *etcdV3Registry) RestartCallBack() bool { err := r.Register(confIf) if err != nil { logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, errors.WithStack(err)) + confIf, perrors.WithStack(err)) flag = false break } @@ -96,11 +98,10 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { if err != nil { logger.Errorf("timeout config %v is invalid ,err is %v", url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) - return nil, errors.WithMessagef(err, "new etcd registry(address:%+v)", url.Location) + return nil, perrors.WithMessagef(err, "new etcd registry(address:%+v)", url.Location) } - logger.Infof("etcd address is: %v", url.Location) - logger.Infof("time-out is: %v", timeout.String()) + logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String()) r := &etcdV3Registry{ URL: url, @@ -109,7 +110,8 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { services: make(map[string]common.URL), } - if err := etcdv3.ValidateClient(r, + if err := etcdv3.ValidateClient( + r, etcdv3.WithName(etcdv3.RegistryETCDV3Client), etcdv3.WithTimeout(timeout), etcdv3.WithEndpoints(url.Location), @@ -166,12 +168,13 @@ func (r *etcdV3Registry) Register(svc common.URL) error { role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) if err != nil { - return errors.WithMessage(err, "get registry role") + return perrors.WithMessage(err, "get registry role") } r.cltLock.Lock() if _, ok := r.services[svc.Key()]; ok { - return errors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path)) + r.cltLock.Unlock() + return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path)) } r.cltLock.Unlock() @@ -179,15 +182,15 @@ func (r *etcdV3Registry) Register(svc common.URL) error { case common.PROVIDER: logger.Debugf("(provider register )Register(conf{%#v})", svc) if err := r.registerProvider(svc); err != nil { - return errors.WithMessage(err, "register provider") + return perrors.WithMessage(err, "register provider") } case common.CONSUMER: logger.Debugf("(consumer register )Register(conf{%#v})", svc) if err := r.registerConsumer(svc); err != nil { - return errors.WithMessage(err, "register consumer") + return perrors.WithMessage(err, "register consumer") } default: - return errors.New(fmt.Sprintf("unknown role %d", role)) + return perrors.New(fmt.Sprintf("unknown role %d", role)) } r.cltLock.Lock() @@ -202,7 +205,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error { for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) if err := r.client.Create(tmpPath, ""); err != nil { - return errors.WithMessagef(err, "create path %s in etcd", tmpPath) + return perrors.WithMessagef(err, "create path %s in etcd", tmpPath) } } @@ -214,11 +217,11 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER]) if err := r.createDirIfNotExist(consumersNode); err != nil { logger.Errorf("etcd client create path %s: %v", consumersNode, err) - return errors.WithMessage(err, "etcd create consumer nodes") + return perrors.WithMessage(err, "etcd create consumer nodes") } providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) if err := r.createDirIfNotExist(providersNode); err != nil { - return errors.WithMessage(err, "create provider node") + return perrors.WithMessage(err, "create provider node") } params := url.Values{} @@ -231,7 +234,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return errors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) + return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } return nil @@ -239,8 +242,8 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { func (r *etcdV3Registry) registerProvider(svc common.URL) error { - if svc.Path == "" || len(svc.Methods) == 0 { - return errors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods)) + if len(svc.Path) == 0 || len(svc.Methods) == 0 { + return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods)) } var ( @@ -251,7 +254,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) if err := r.createDirIfNotExist(providersNode); err != nil { - return errors.WithMessage(err, "create provider node") + return perrors.WithMessage(err, "create provider node") } params := url.Values{} @@ -272,7 +275,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { logger.Debugf("provider url params:%#v", params) var host string - if svc.Ip == "" { + if len(svc.Ip) == 0 { host = localIP + ":" + svc.Port } else { host = svc.Ip + ":" + svc.Port @@ -284,7 +287,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return errors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) + return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } return nil @@ -304,7 +307,7 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) { client := r.client r.cltLock.Unlock() if client == nil { - return nil, errors.New("etcd client broken") + return nil, perrors.New("etcd client broken") } // new client & listener @@ -315,7 +318,7 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) { r.listenerLock.Unlock() } - //注册到dataconfig的interested + //register the svc to dataListener r.dataListener.AddInterestedURL(&svc) go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener) diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index cfe046a98364849f5cedf2c0de9c4e108c4ec30d..26204c74ad4305278e33d9c8b50199cfa578bf8a 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -8,17 +8,17 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" + "github.com/stretchr/testify/assert" ) import ( - "github.com/stretchr/testify/assert" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" ) func initRegistry(t *testing.T) *etcdV3Registry { - regurl, err := common.NewURL(context.TODO(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + regurl, err := common.NewURL(context.Background(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) if err != nil { t.Fatal(err) } @@ -37,7 +37,7 @@ func (suite *RegistryTestSuite) TestRegister() { t := suite.T() - url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) reg := initRegistry(t) err := reg.Register(url) @@ -52,8 +52,8 @@ func (suite *RegistryTestSuite) TestRegister() { func (suite *RegistryTestSuite) TestSubscribe() { t := suite.T() - regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) - url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + regurl, _ := common.NewURL(context.Background(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) reg := initRegistry(t) //provider register @@ -82,7 +82,7 @@ func (suite *RegistryTestSuite) TestSubscribe() { func (suite *RegistryTestSuite) TestConsumerDestory() { t := suite.T() - url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) reg := initRegistry(t) _, err := reg.Subscribe(url) @@ -102,7 +102,7 @@ func (suite *RegistryTestSuite) TestProviderDestory() { t := suite.T() reg := initRegistry(t) - url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) reg.Register(url) //listener.Close() diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index fff2271b0f09bc2f83335fc3323de64e49219920..57d1211fe30e00dcb1ad16733f36b7969ebaf505 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -10,7 +10,7 @@ import ( import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" - "github.com/pkg/errors" + perrors "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -25,8 +25,8 @@ const ( ) var ( - ErrNilETCDV3Client = errors.New("etcd raw client is nil") // full describe the ERR - ErrKVPairNotFound = errors.New("k/v pair not found") + ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR + ErrKVPairNotFound = perrors.New("k/v pair not found") ) type Options struct { @@ -80,7 +80,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) - return errors.WithMessagef(err, "new client (address:%+v)", options.endpoints) + return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) } container.SetClient(newClient) } @@ -92,7 +92,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) - return errors.WithMessagef(err, "new client (address:%+v)", options.endpoints) + return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) } container.SetClient(newClient) } @@ -127,7 +127,7 @@ func newClient(name string, endpoints []string, timeout time.Duration, heartbeat DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) if err != nil { - return nil, errors.WithMessage(err, "new raw client block connect to server") + return nil, perrors.WithMessage(err, "new raw client block connect to server") } c := &Client{ @@ -145,7 +145,7 @@ func newClient(name string, endpoints []string, timeout time.Duration, heartbeat } if err := c.maintenanceStatus(); err != nil { - return nil, errors.WithMessage(err, "client maintenance status") + return nil, perrors.WithMessage(err, "client maintenance status") } return c, nil } @@ -198,7 +198,7 @@ func (c *Client) maintenanceStatus() error { s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) if err != nil { - return errors.WithMessage(err, "new session with server") + return perrors.WithMessage(err, "new session with server") } // must add wg before go maintenance status goroutine @@ -375,18 +375,18 @@ func (c *Client) keepAliveKV(k string, v string) error { lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) if err != nil { - return errors.WithMessage(err, "grant lease") + return perrors.WithMessage(err, "grant lease") } keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) if err != nil || keepAlive == nil { c.rawClient.Revoke(c.ctx, lease.ID) - return errors.WithMessage(err, "keep alive lease") + return perrors.WithMessage(err, "keep alive lease") } _, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID)) if err != nil { - return errors.WithMessage(err, "put k/v with lease") + return perrors.WithMessage(err, "put k/v with lease") } return nil } @@ -415,7 +415,7 @@ func (c *Client) Create(k string, v string) error { err := c.put(k, v) if err != nil { - return errors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) + return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) } return nil } @@ -424,7 +424,7 @@ func (c *Client) Delete(k string) error { err := c.delete(k) if err != nil { - return errors.WithMessagef(err, "delete k/v (key %s)", k) + return perrors.WithMessagef(err, "delete k/v (key %s)", k) } return nil @@ -436,7 +436,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { err := c.keepAliveKV(completeKey, "") if err != nil { - return "", errors.WithMessagef(err, "keepalive kv (key %s)", completeKey) + return "", perrors.WithMessagef(err, "keepalive kv (key %s)", completeKey) } return completeKey, nil @@ -446,7 +446,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { kList, vList, err := c.getChildren(k) if err != nil { - return nil, nil, errors.WithMessagef(err, "get key children (key %s)", k) + return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k) } return kList, vList, nil } @@ -455,7 +455,7 @@ func (c *Client) Get(k string) (string, error) { v, err := c.get(k) if err != nil { - return "", errors.WithMessagef(err, "get key value (key %s)", k) + return "", perrors.WithMessagef(err, "get key value (key %s)", k) } return v, nil @@ -465,7 +465,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) { wc, err := c.watch(k) if err != nil { - return nil, errors.WithMessagef(err, "watch prefix (key %s)", k) + return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k) } return wc, nil } @@ -474,7 +474,7 @@ func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { wc, err := c.watchWithPrefix(prefix) if err != nil { - return nil, errors.WithMessagef(err, "watch prefix (key %s)", prefix) + return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix) } return wc, nil } diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go index 38535e69b39c4caa567ac988825852e69f616907..187789e0abfac6a0e195bebd68ce4b91e0f9bdec 100644 --- a/remoting/etcdv3/client_test.go +++ b/remoting/etcdv3/client_test.go @@ -13,7 +13,7 @@ import ( import ( "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/pkg/errors" + perrors "github.com/pkg/errors" "github.com/stretchr/testify/suite" "go.etcd.io/etcd/embed" "google.golang.org/grpc/connectivity" @@ -210,7 +210,7 @@ func (suite *ClientTestSuite) TestClientDeleteKV() { } _, err := c.Get(k) - if errors.Cause(err) == expect { + if perrors.Cause(err) == expect { continue } diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index 25fdbfdfe24ce5b54c3110a9ef5acf646af62b1f..e75b39d6bcd7f67f7606c6b212f59e7a42178fd8 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -7,7 +7,7 @@ import ( import ( "github.com/dubbogo/getty" - "github.com/pkg/errors" + perrors "github.com/pkg/errors" ) import ( @@ -50,22 +50,23 @@ LOOP: r.SetClient(nil) r.ClientLock().Unlock() - // 接etcd,直至成功 + // try to connect to etcd, failTimes = 0 for { select { case <-r.GetDone(): logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") break LOOP - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 防止疯狂重连etcd + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent } - err = ValidateClient(r, + err = ValidateClient( + r, WithName(clientName), WithEndpoints(endpoint), WithTimeout(timeout), ) logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", - endpoint, errors.WithStack(err)) + endpoint, perrors.WithStack(err)) if err == nil { if r.RestartCallBack() { break diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index 12307698f2c6506ced78efd8ece38a749fbcd669..59273af554a63e5fc907ba5a30bb1e18bb22c0f5 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -8,7 +8,7 @@ import ( import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/pkg/errors" + perrors "github.com/pkg/errors" ) import ( @@ -183,20 +183,18 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis keyList, valueList, err := l.client.getChildren(key) if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, errors.WithMessage(err, "get children")) + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) } logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) for i, k := range keyList { logger.Infof("got children list key -> %s", k) - if !listener.DataChange(remoting.Event{ + listener.DataChange(remoting.Event{ Path: k, Action: remoting.EventTypeAdd, Content: valueList[i], - }) { - continue - } + }) } logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)