diff --git a/go.mod b/go.mod index 422489896dee87b214c73b05b66b503ddd0019a2..aa7ade2f9d5778f088c5ffbb172d4e695f36ad55 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/apache/dubbo-go require ( + github.com/Workiva/go-datastructures v1.0.50 // indirect github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible @@ -11,15 +12,12 @@ require ( github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 github.com/gogo/protobuf v1.2.1 // indirect - github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/golang/mock v1.3.1 // indirect github.com/google/btree v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect github.com/jonboulle/clockwork v0.1.0 // indirect - github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 - github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect - github.com/juju/testing v0.0.0-20190723135506-ce30eb24acd2 // indirect github.com/magiconair/properties v1.8.1 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect @@ -30,12 +28,10 @@ require ( github.com/stretchr/testify v1.3.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect - go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect google.golang.org/grpc v1.22.1 - gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index de3c00236115aaeb28fc28b263bd1ecdd2cb4bb0..57651bf482aad9a10bf314e1b6fac77a53eb5b66 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= @@ -19,7 +20,6 @@ github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+ github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -38,11 +38,9 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -68,22 +66,14 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 h1:dMIPRDg6gi7CUp0Kj2+HxqJ5kTr1iAdzsXYIrLCNSmU= -github.com/juju/errors v0.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= -github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifGWef4WUqvXk0iRqdhdy/2uzI= -github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= -github.com/juju/testing v0.0.0-20190723135506-ce30eb24acd2 h1:Pp8RxiF4rSoXP9SED26WCfNB28/dwTDpPXS8XMJR8rc= -github.com/juju/testing v0.0.0-20190723135506-ce30eb24acd2/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -131,8 +121,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= -go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= @@ -158,7 +146,6 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -175,6 +162,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= @@ -183,12 +171,8 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU= -gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 6c67b18b119658523959fa16ab520a32599fd026..7957eb8c2da3f7054120610ce857a4392e19abf0 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -5,6 +5,10 @@ import ( "strings" ) +import ( + "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" @@ -12,10 +16,6 @@ import ( "github.com/apache/dubbo-go/remoting" ) -import ( - "github.com/juju/errors" -) - type dataListener struct { interestedURL []*common.URL listener remoting.ConfigurationListener diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go index a2637a01e46680e2615ca560938d12ded57ae968..56000e8c868c585f1e8235fdd832d522da7cb51b 100644 --- a/registry/etcdv3/listener_test.go +++ b/registry/etcdv3/listener_test.go @@ -4,13 +4,19 @@ import ( "context" "testing" "time" +) - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/remoting" +import ( + "github.com/dubbogo/getty" "github.com/stretchr/testify/suite" "go.etcd.io/etcd/embed" ) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/remoting" +) + type RegistryTestSuite struct { suite.Suite etcd *embed.Etcd @@ -30,7 +36,7 @@ func (suite *RegistryTestSuite) SetupSuite() { select { case <-e.Server.ReadyNotify(): t.Log("Server is ready!") - case <-time.After(60 * time.Second): + case <-getty.GetTimeWheel().After(60 * time.Second): e.Server.Stop() // trigger a shutdown t.Logf("Server took too long to start!") } @@ -60,9 +66,6 @@ func TestRegistrySuite(t *testing.T) { suite.Run(t, &RegistryTestSuite{}) } -type MockDataListener struct { -} - -func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) { +type MockDataListener struct{} -} +func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) {} diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index f6424a6a0e65daad7fb7f9d04c046aeac9d1f374..2bbbaa0134cc6332a401e62c0af075c4fa4bd2fd 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -11,6 +11,10 @@ import ( "time" ) +import ( + "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -22,11 +26,6 @@ import ( "github.com/apache/dubbo-go/version" ) -import ( - "github.com/juju/errors" - perrors "github.com/pkg/errors" -) - var ( processID = "" localIP = "" @@ -82,7 +81,7 @@ func (r *etcdV3Registry) RestartCallBack() bool { err := r.Register(confIf) if err != nil { logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) + confIf, errors.WithStack(err)) flag = false break } @@ -97,7 +96,7 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { if err != nil { logger.Errorf("timeout config %v is invalid ,err is %v", url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) - return nil, errors.Annotatef(err, "new etcd registry(address:%+v)", url.Location) + return nil, errors.WithMessagef(err, "new etcd registry(address:%+v)", url.Location) } logger.Infof("etcd address is: %v", url.Location) @@ -167,7 +166,7 @@ func (r *etcdV3Registry) Register(svc common.URL) error { role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) if err != nil { - return errors.Annotate(err, "get registry role") + return errors.WithMessage(err, "get registry role") } r.cltLock.Lock() @@ -180,12 +179,12 @@ func (r *etcdV3Registry) Register(svc common.URL) error { case common.PROVIDER: logger.Debugf("(provider register )Register(conf{%#v})", svc) if err := r.registerProvider(svc); err != nil { - return errors.Annotate(err, "register provider") + return errors.WithMessage(err, "register provider") } case common.CONSUMER: logger.Debugf("(consumer register )Register(conf{%#v})", svc) if err := r.registerConsumer(svc); err != nil { - return errors.Annotate(err, "register consumer") + return errors.WithMessage(err, "register consumer") } default: return errors.New(fmt.Sprintf("unknown role %d", role)) @@ -203,7 +202,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error { for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) if err := r.client.Create(tmpPath, ""); err != nil { - return errors.Annotatef(err, "create path %s in etcd", tmpPath) + return errors.WithMessagef(err, "create path %s in etcd", tmpPath) } } @@ -215,11 +214,11 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER]) if err := r.createDirIfNotExist(consumersNode); err != nil { logger.Errorf("etcd client create path %s: %v", consumersNode, err) - return errors.Annotate(err, "etcd create consumer nodes") + return errors.WithMessage(err, "etcd create consumer nodes") } providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) if err := r.createDirIfNotExist(providersNode); err != nil { - return errors.Annotate(err, "create provider node") + return errors.WithMessage(err, "create provider node") } params := url.Values{} @@ -232,7 +231,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) + return errors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } return nil @@ -252,7 +251,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) if err := r.createDirIfNotExist(providersNode); err != nil { - return errors.Annotate(err, "create provider node") + return errors.WithMessage(err, "create provider node") } params := url.Values{} @@ -285,7 +284,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) + return errors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } return nil @@ -305,7 +304,7 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) { client := r.client r.cltLock.Unlock() if client == nil { - return nil, perrors.New("etcd client broken") + return nil, errors.New("etcd client broken") } // new client & listener diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 65b70999e9d67ce14819513fedae70c643d2454f..db50b3d46dce2dca8b29582f97fee37fef6db093 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -6,16 +6,18 @@ import ( "sync" "time" ) -import ( - "github.com/apache/dubbo-go/common/logger" -) + import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" - "github.com/juju/errors" + "github.com/pkg/errors" "google.golang.org/grpc" ) +import ( + "github.com/apache/dubbo-go/common/logger" +) + const ( ConnDelay = 3 MaxFailTimes = 15 @@ -78,7 +80,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) - return errors.Annotatef(err, "new client (address:%+v)", options.endpoints) + return errors.WithMessagef(err, "new client (address:%+v)", options.endpoints) } container.SetClient(newClient) } @@ -90,7 +92,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) - return errors.Annotatef(err, "new client (address:%+v)", options.endpoints) + return errors.WithMessagef(err, "new client (address:%+v)", options.endpoints) } container.SetClient(newClient) } @@ -125,7 +127,7 @@ func newClient(name string, endpoints []string, timeout time.Duration, heartbeat DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) if err != nil { - return nil, errors.Annotate(err, "new raw client block connect to server") + return nil, errors.WithMessage(err, "new raw client block connect to server") } c := &Client{ @@ -143,7 +145,7 @@ func newClient(name string, endpoints []string, timeout time.Duration, heartbeat } if err := c.maintenanceStatus(); err != nil { - return nil, errors.Annotate(err, "client maintenance status") + return nil, errors.WithMessage(err, "client maintenance status") } return c, nil } @@ -196,7 +198,7 @@ func (c *Client) maintenanceStatus() error { s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) if err != nil { - return errors.Annotate(err, "new session with server") + return errors.WithMessage(err, "new session with server") } // must add wg before go maintenance status goroutine @@ -222,6 +224,8 @@ func (c *Client) maintenanceStatusLoop(s *concurrency.Session) { c.lock.Lock() // when etcd server stopped, cancel ctx, stop all watchers c.clean() + // when connection lose, stop client, trigger reconnect to etcd + c.stop() c.lock.Unlock() return } @@ -371,18 +375,18 @@ func (c *Client) keepAliveKV(k string, v string) error { lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) if err != nil { - return errors.Annotatef(err, "grant lease") + return errors.WithMessage(err, "grant lease") } keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) if err != nil || keepAlive == nil { c.rawClient.Revoke(c.ctx, lease.ID) - return errors.Annotate(err, "keep alive lease") + return errors.WithMessage(err, "keep alive lease") } _, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID)) if err != nil { - return errors.Annotate(err, "put k/v with lease") + return errors.WithMessage(err, "put k/v with lease") } return nil } @@ -411,7 +415,7 @@ func (c *Client) Create(k string, v string) error { err := c.put(k, v) if err != nil { - return errors.Annotatef(err, "put k/v (key: %s value %s)", k, v) + return errors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) } return nil } @@ -420,7 +424,7 @@ func (c *Client) Delete(k string) error { err := c.delete(k) if err != nil { - return errors.Annotatef(err, "delete k/v (key %s)", k) + return errors.WithMessagef(err, "delete k/v (key %s)", k) } return nil @@ -432,7 +436,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { err := c.keepAliveKV(completeKey, "") if err != nil { - return "", errors.Annotatef(err, "keepalive kv (key %s)", completeKey) + return "", errors.WithMessagef(err, "keepalive kv (key %s)", completeKey) } return completeKey, nil @@ -442,7 +446,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { kList, vList, err := c.getChildren(k) if err != nil { - return nil, nil, errors.Annotatef(err, "get key children (key %s)", k) + return nil, nil, errors.WithMessagef(err, "get key children (key %s)", k) } return kList, vList, nil } @@ -451,7 +455,7 @@ func (c *Client) Get(k string) (string, error) { v, err := c.get(k) if err != nil { - return "", errors.Annotatef(err, "get key value (key %s)", k) + return "", errors.WithMessagef(err, "get key value (key %s)", k) } return v, nil @@ -461,7 +465,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) { wc, err := c.watch(k) if err != nil { - return nil, errors.Annotatef(err, "watch prefix (key %s)", k) + return nil, errors.WithMessagef(err, "watch prefix (key %s)", k) } return wc, nil } @@ -470,7 +474,7 @@ func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { wc, err := c.watchWithPrefix(prefix) if err != nil { - return nil, errors.Annotatef(err, "watch prefix (key %s)", prefix) + return nil, errors.WithMessagef(err, "watch prefix (key %s)", prefix) } return wc, nil } diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go index ca9efdf7f679a26ed2f87718a6f8e52e3945c83b..38535e69b39c4caa567ac988825852e69f616907 100644 --- a/remoting/etcdv3/client_test.go +++ b/remoting/etcdv3/client_test.go @@ -2,7 +2,6 @@ package etcdv3 import ( "fmt" - "go.etcd.io/etcd/embed" "net/url" "path" "reflect" @@ -10,11 +9,13 @@ import ( "sync" "testing" "time" +) - "github.com/stretchr/testify/suite" - +import ( "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/juju/errors" + "github.com/pkg/errors" + "github.com/stretchr/testify/suite" + "go.etcd.io/etcd/embed" "google.golang.org/grpc/connectivity" ) diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index b205f7f01a71e75657236dcae085a0d12bef0d13..25fdbfdfe24ce5b54c3110a9ef5acf646af62b1f 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -6,22 +6,22 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" + "github.com/dubbogo/getty" + "github.com/pkg/errors" ) import ( - "github.com/dubbogo/getty" - perrors "github.com/pkg/errors" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" ) type clientFacade interface { Client() *Client SetClient(*Client) ClientLock() *sync.Mutex - WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container - GetDone() chan struct{} //for zk client control + WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container + GetDone() chan struct{} //for etcd client control RestartCallBack() bool common.Node } @@ -50,7 +50,7 @@ LOOP: r.SetClient(nil) r.ClientLock().Unlock() - // 接zk,直至成功 + // 接etcd,直至成功 failTimes = 0 for { select { @@ -65,7 +65,7 @@ LOOP: WithTimeout(timeout), ) logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", - endpoint, perrors.WithStack(err)) + endpoint, errors.WithStack(err)) if err == nil { if r.RestartCallBack() { break diff --git a/remoting/etcdv3/facede_test.go b/remoting/etcdv3/facede_test.go index 94e9056c5990e210d1e7a70b09b42034cde326e4..8718fc3b7456ff74a3e071713fbcd56356bbc8a6 100644 --- a/remoting/etcdv3/facede_test.go +++ b/remoting/etcdv3/facede_test.go @@ -4,7 +4,9 @@ import ( "context" "sync" "time" +) +import( "github.com/apache/dubbo-go/common" ) diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index 6402385b8936a8badf4ad1da82ff9e6f7e5f2bcc..12307698f2c6506ced78efd8ece38a749fbcd669 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -6,14 +6,14 @@ import ( ) import ( - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/remoting" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/pkg/errors" ) import ( - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/juju/errors" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" ) type EventListener struct { @@ -109,6 +109,9 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin case mvccpb.DELETE: logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key) return true + + default: + return false } panic("unreachable") @@ -180,7 +183,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis keyList, valueList, err := l.client.getChildren(key) if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, errors.Annotate(err, "get children")) + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, errors.WithMessage(err, "get children")) } logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) diff --git a/remoting/etcdv3/listener_test.go b/remoting/etcdv3/listener_test.go index 648cf49fb990a6a63dacf8f8e7f40104f68fa7d8..33904a21345ec0ac7ee1adbb239a0a7a44852387 100644 --- a/remoting/etcdv3/listener_test.go +++ b/remoting/etcdv3/listener_test.go @@ -2,11 +2,16 @@ package etcdv3 import ( "time" +) - "github.com/apache/dubbo-go/remoting" +import ( "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/remoting" +) + var changedData = ` dubbo.consumer.request_timeout=3s dubbo.consumer.connect_timeout=5s