Skip to content
Snippets Groups Projects
Commit b0f3e195 authored by scott.wang's avatar scott.wang
Browse files

Fix deadlock && len(string) ==0 && rename errors-> perrors

parent 86a197a0
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,7 @@ import (
)
import (
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
)
import (
......@@ -32,7 +32,7 @@ func (l *dataListener) AddInterestedURL(url *common.URL) {
func (l *dataListener) DataChange(eventType remoting.Event) bool {
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.TODO(), url)
serviceURL, err := common.NewURL(context.Background(), url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
......@@ -67,10 +67,10 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
select {
case <-l.registry.done:
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, errors.New("listener stopped")
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Warnf("got etcd event %#s", e)
logger.Infof("got etcd event %#s", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.done:
......
......@@ -55,7 +55,7 @@ func (suite *RegistryTestSuite) TestDataChange() {
t := suite.T()
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
url, _ := common.NewURL(context.Background(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) {
t.Fatal("data change not ok")
......
......@@ -12,7 +12,7 @@ import (
)
import (
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
)
import (
......@@ -31,10 +31,12 @@ var (
localIP = ""
)
const Name = "etcdv3"
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = utils.GetLocalIP()
extension.SetRegistry("etcdv3", newETCDV3Registry)
extension.SetRegistry(Name, newETCDV3Registry)
}
type etcdV3Registry struct {
......@@ -81,7 +83,7 @@ func (r *etcdV3Registry) RestartCallBack() bool {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, errors.WithStack(err))
confIf, perrors.WithStack(err))
flag = false
break
}
......@@ -96,11 +98,10 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return nil, errors.WithMessagef(err, "new etcd registry(address:%+v)", url.Location)
return nil, perrors.WithMessagef(err, "new etcd registry(address:%+v)", url.Location)
}
logger.Infof("etcd address is: %v", url.Location)
logger.Infof("time-out is: %v", timeout.String())
logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String())
r := &etcdV3Registry{
URL: url,
......@@ -109,7 +110,8 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
services: make(map[string]common.URL),
}
if err := etcdv3.ValidateClient(r,
if err := etcdv3.ValidateClient(
r,
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
etcdv3.WithEndpoints(url.Location),
......@@ -166,12 +168,13 @@ func (r *etcdV3Registry) Register(svc common.URL) error {
role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if err != nil {
return errors.WithMessage(err, "get registry role")
return perrors.WithMessage(err, "get registry role")
}
r.cltLock.Lock()
if _, ok := r.services[svc.Key()]; ok {
return errors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
r.cltLock.Unlock()
return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
}
r.cltLock.Unlock()
......@@ -179,15 +182,15 @@ func (r *etcdV3Registry) Register(svc common.URL) error {
case common.PROVIDER:
logger.Debugf("(provider register )Register(conf{%#v})", svc)
if err := r.registerProvider(svc); err != nil {
return errors.WithMessage(err, "register provider")
return perrors.WithMessage(err, "register provider")
}
case common.CONSUMER:
logger.Debugf("(consumer register )Register(conf{%#v})", svc)
if err := r.registerConsumer(svc); err != nil {
return errors.WithMessage(err, "register consumer")
return perrors.WithMessage(err, "register consumer")
}
default:
return errors.New(fmt.Sprintf("unknown role %d", role))
return perrors.New(fmt.Sprintf("unknown role %d", role))
}
r.cltLock.Lock()
......@@ -202,7 +205,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error {
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
if err := r.client.Create(tmpPath, ""); err != nil {
return errors.WithMessagef(err, "create path %s in etcd", tmpPath)
return perrors.WithMessagef(err, "create path %s in etcd", tmpPath)
}
}
......@@ -214,11 +217,11 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
if err := r.createDirIfNotExist(consumersNode); err != nil {
logger.Errorf("etcd client create path %s: %v", consumersNode, err)
return errors.WithMessage(err, "etcd create consumer nodes")
return perrors.WithMessage(err, "etcd create consumer nodes")
}
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return errors.WithMessage(err, "create provider node")
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
......@@ -231,7 +234,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return errors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
......@@ -239,8 +242,8 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
func (r *etcdV3Registry) registerProvider(svc common.URL) error {
if svc.Path == "" || len(svc.Methods) == 0 {
return errors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
if len(svc.Path) == 0 || len(svc.Methods) == 0 {
return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
}
var (
......@@ -251,7 +254,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return errors.WithMessage(err, "create provider node")
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
......@@ -272,7 +275,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
logger.Debugf("provider url params:%#v", params)
var host string
if svc.Ip == "" {
if len(svc.Ip) == 0 {
host = localIP + ":" + svc.Port
} else {
host = svc.Ip + ":" + svc.Port
......@@ -284,7 +287,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return errors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
......@@ -304,7 +307,7 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, errors.New("etcd client broken")
return nil, perrors.New("etcd client broken")
}
// new client & listener
......@@ -315,7 +318,7 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
r.listenerLock.Unlock()
}
//注册到dataconfig的interested
//register the svc to dataListener
r.dataListener.AddInterestedURL(&svc)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener)
......
......@@ -8,17 +8,17 @@ import (
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/stretchr/testify/assert"
)
import (
"github.com/stretchr/testify/assert"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func initRegistry(t *testing.T) *etcdV3Registry {
regurl, err := common.NewURL(context.TODO(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
regurl, err := common.NewURL(context.Background(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
......@@ -37,7 +37,7 @@ func (suite *RegistryTestSuite) TestRegister() {
t := suite.T()
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
err := reg.Register(url)
......@@ -52,8 +52,8 @@ func (suite *RegistryTestSuite) TestRegister() {
func (suite *RegistryTestSuite) TestSubscribe() {
t := suite.T()
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
regurl, _ := common.NewURL(context.Background(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
//provider register
......@@ -82,7 +82,7 @@ func (suite *RegistryTestSuite) TestSubscribe() {
func (suite *RegistryTestSuite) TestConsumerDestory() {
t := suite.T()
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
_, err := reg.Subscribe(url)
......@@ -102,7 +102,7 @@ func (suite *RegistryTestSuite) TestProviderDestory() {
t := suite.T()
reg := initRegistry(t)
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg.Register(url)
//listener.Close()
......
......@@ -10,7 +10,7 @@ import (
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
"google.golang.org/grpc"
)
......@@ -25,8 +25,8 @@ const (
)
var (
ErrNilETCDV3Client = errors.New("etcd raw client is nil") // full describe the ERR
ErrKVPairNotFound = errors.New("k/v pair not found")
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
type Options struct {
......@@ -80,7 +80,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return errors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
}
container.SetClient(newClient)
}
......@@ -92,7 +92,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return errors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
}
container.SetClient(newClient)
}
......@@ -127,7 +127,7 @@ func newClient(name string, endpoints []string, timeout time.Duration, heartbeat
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return nil, errors.WithMessage(err, "new raw client block connect to server")
return nil, perrors.WithMessage(err, "new raw client block connect to server")
}
c := &Client{
......@@ -145,7 +145,7 @@ func newClient(name string, endpoints []string, timeout time.Duration, heartbeat
}
if err := c.maintenanceStatus(); err != nil {
return nil, errors.WithMessage(err, "client maintenance status")
return nil, perrors.WithMessage(err, "client maintenance status")
}
return c, nil
}
......@@ -198,7 +198,7 @@ func (c *Client) maintenanceStatus() error {
s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
if err != nil {
return errors.WithMessage(err, "new session with server")
return perrors.WithMessage(err, "new session with server")
}
// must add wg before go maintenance status goroutine
......@@ -375,18 +375,18 @@ func (c *Client) keepAliveKV(k string, v string) error {
lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
if err != nil {
return errors.WithMessage(err, "grant lease")
return perrors.WithMessage(err, "grant lease")
}
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
c.rawClient.Revoke(c.ctx, lease.ID)
return errors.WithMessage(err, "keep alive lease")
return perrors.WithMessage(err, "keep alive lease")
}
_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
if err != nil {
return errors.WithMessage(err, "put k/v with lease")
return perrors.WithMessage(err, "put k/v with lease")
}
return nil
}
......@@ -415,7 +415,7 @@ func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
if err != nil {
return errors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}
return nil
}
......@@ -424,7 +424,7 @@ func (c *Client) Delete(k string) error {
err := c.delete(k)
if err != nil {
return errors.WithMessagef(err, "delete k/v (key %s)", k)
return perrors.WithMessagef(err, "delete k/v (key %s)", k)
}
return nil
......@@ -436,7 +436,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
err := c.keepAliveKV(completeKey, "")
if err != nil {
return "", errors.WithMessagef(err, "keepalive kv (key %s)", completeKey)
return "", perrors.WithMessagef(err, "keepalive kv (key %s)", completeKey)
}
return completeKey, nil
......@@ -446,7 +446,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
if err != nil {
return nil, nil, errors.WithMessagef(err, "get key children (key %s)", k)
return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k)
}
return kList, vList, nil
}
......@@ -455,7 +455,7 @@ func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
if err != nil {
return "", errors.WithMessagef(err, "get key value (key %s)", k)
return "", perrors.WithMessagef(err, "get key value (key %s)", k)
}
return v, nil
......@@ -465,7 +465,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
if err != nil {
return nil, errors.WithMessagef(err, "watch prefix (key %s)", k)
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}
return wc, nil
}
......@@ -474,7 +474,7 @@ func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
if err != nil {
return nil, errors.WithMessagef(err, "watch prefix (key %s)", prefix)
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}
return wc, nil
}
......@@ -13,7 +13,7 @@ import (
import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/embed"
"google.golang.org/grpc/connectivity"
......@@ -210,7 +210,7 @@ func (suite *ClientTestSuite) TestClientDeleteKV() {
}
_, err := c.Get(k)
if errors.Cause(err) == expect {
if perrors.Cause(err) == expect {
continue
}
......
......@@ -7,7 +7,7 @@ import (
import (
"github.com/dubbogo/getty"
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
)
import (
......@@ -50,22 +50,23 @@ LOOP:
r.SetClient(nil)
r.ClientLock().Unlock()
// 接etcd,直至成功
// try to connect to etcd,
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 防止疯狂重连etcd
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
}
err = ValidateClient(r,
err = ValidateClient(
r,
WithName(clientName),
WithEndpoints(endpoint),
WithTimeout(timeout),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoint, errors.WithStack(err))
endpoint, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
......
......@@ -8,7 +8,7 @@ import (
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
)
import (
......@@ -183,20 +183,18 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
keyList, valueList, err := l.client.getChildren(key)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, errors.WithMessage(err, "get children"))
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList {
logger.Infof("got children list key -> %s", k)
if !listener.DataChange(remoting.Event{
listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
Content: valueList[i],
}) {
continue
}
})
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment