diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 7918e2df0fbb2c488569791beefa6d872ca22564..3178f4b42eabee6c4bb6181a3afc65d3ea30fb7f 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -41,7 +41,7 @@ import ( _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" - _ "github.com/apache/dubbo-go/registry/zookeeper" + _ "github.com/apache/dubbo-go/registry/etcd" ) var ( diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index 7a193ea64d783220f016c6b6220d602a5c586129..532d40dae4c7f65ef3fa90687744bd8c43a86ee4 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -18,15 +18,9 @@ application_config: registries : "hangzhouzk": - protocol: "zookeeper" + protocol: "etcdv3" timeout : "3s" - address: "127.0.0.1:2181" - username: "" - password: "" - "shanghaizk": - protocol: "zookeeper" - timeout : "3s" - address: "127.0.0.1:2182" + address: "127.0.0.1:2379" username: "" password: "" diff --git a/examples/dubbo/go-server/app/server.go b/examples/dubbo/go-server/app/server.go index 788fc665b872d4f7f1429122cee581a6ab4979e7..ede44ad0191bcb958cd9097478b0f2ded0c58588 100644 --- a/examples/dubbo/go-server/app/server.go +++ b/examples/dubbo/go-server/app/server.go @@ -40,7 +40,7 @@ import ( _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" - _ "github.com/apache/dubbo-go/registry/zookeeper" + _ "github.com/apache/dubbo-go/registry/etcd" ) var ( diff --git a/examples/dubbo/go-server/app/user.go b/examples/dubbo/go-server/app/user.go index 3c261dc029022fe8a3a80a4007e5aa132643eb7c..6e4474124c47610b11e95cf77d40f9084c2b9d58 100644 --- a/examples/dubbo/go-server/app/user.go +++ b/examples/dubbo/go-server/app/user.go @@ -105,7 +105,7 @@ func init() { userMap.user["A000"] = DefaultUser userMap.user["A001"] = User{Id: "001", Name: "ZhangSheng", Age: 18, Sex: Gender(MAN)} userMap.user["A002"] = User{Id: "002", Name: "Lily", Age: 20, Sex: Gender(WOMAN)} - userMap.user["A003"] = User{Id: "113", Name: "Moorse", Age: 30, Sex: Gender(WOMAN)} + userMap.user["A003"] = User{Id: "113", Name: "ScottWang", Age: 30, Sex: Gender(WOMAN)} for k, v := range userMap.user { v.Time = time.Now() userMap.user[k] = v diff --git a/examples/dubbo/go-server/profiles/release/server.yml b/examples/dubbo/go-server/profiles/release/server.yml index 3db12902f1d349e1703f2f977c682c61b0d35938..e3457972ad47f282ea18dc1c2f551503456c1739 100644 --- a/examples/dubbo/go-server/profiles/release/server.yml +++ b/examples/dubbo/go-server/profiles/release/server.yml @@ -12,15 +12,9 @@ application_config: registries : "hangzhouzk": - protocol: "zookeeper" + protocol: "etcdv3" timeout : "3s" - address: "127.0.0.1:2181" - username: "" - password: "" - "shanghaizk": - protocol: "zookeeper" - timeout : "3s" - address: "127.0.0.1:2182" + address: "127.0.0.1:2379" username: "" password: "" diff --git a/go.mod b/go.mod index d31dc9f3b9f87c787e2ad0bef26f082d74cb8186..42ed1f0df35aa7e9ec9147e8c114bfbcd32bf6e5 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,41 @@ module github.com/apache/dubbo-go require ( - github.com/AlexStocks/goext v0.3.2 - github.com/AlexStocks/log4go v1.0.2 // indirect - github.com/coreos/etcd v3.3.13+incompatible // indirect + github.com/coreos/bbolt v1.3.3 // indirect + github.com/coreos/etcd v3.3.13+incompatible + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/dubbogo/getty v1.2.0 github.com/dubbogo/gost v1.1.1 github.com/dubbogo/hessian2 v1.2.0 github.com/gogo/protobuf v1.2.1 // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/google/btree v1.0.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect + github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect + github.com/jonboulle/clockwork v0.1.0 // indirect github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 + github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect + github.com/juju/testing v0.0.0-20190613124551-e81189438503 // indirect github.com/magiconair/properties v1.8.1 - github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect - github.com/mattn/go-isatty v0.0.8 // indirect github.com/pkg/errors v0.8.1 + github.com/prometheus/client_golang v1.0.0 // indirect + github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90 // indirect github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec + github.com/soheilhy/cmux v0.1.4 // indirect github.com/stretchr/testify v1.3.0 + github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect + github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 - google.golang.org/grpc v1.22.0 // indirect + golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect + golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 // indirect + google.golang.org/grpc v1.22.0 + gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index d0dd872d571c2ff77dfbba410a09f9361c459cc1..e75ef64986fc772f2396a53c1cfc581ebb4f1257 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,26 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0= -github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkglDGSl3Hc= -github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk= -github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY= +github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dubbogo/getty v1.2.0 h1:LmP/R/yCYp5/ubUTm5Xa7QwE3EnFFSjguJH8DJruJ70= github.com/dubbogo/getty v1.2.0/go.mod h1:p9Pyk18a+5Aa0GZ546aCzlehEfvEof0jAF0+QQcEPYQ= github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= @@ -17,36 +28,101 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw= github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= +github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= +github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 h1:aQ4ahUIm1FHeZ2XsL1lbzkn9NxhkOUeiMWITOA7OOms= github.com/juju/errors v1.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= +github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifGWef4WUqvXk0iRqdhdy/2uzI= +github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= +github.com/juju/testing v0.0.0-20190613124551-e81189438503 h1:ZUgTbk8oHgP0jpMieifGC9Lv47mHn8Pb3mFX3/Ew4iY= +github.com/juju/testing v0.0.0-20190613124551-e81189438503/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= -github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e h1:hB2xlXdHp/pmPZq0y3QnmWAArdw9PqbmotexnWx/FU8= -github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= -github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= -github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90 h1:Cov9QkEXNhh8QeXoICvORjJ4RrpyvXmSf7rHSpS+ZfI= +github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= @@ -55,29 +131,56 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 h1:RMGusaKverhgGR5KBERIKiTyWoWHRd84GCtsNlvLvIo= +golang.org/x/time v0.0.0-20190513212739-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU= +gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/registry/etcd/listener.go b/registry/etcd/listener.go index a58612da2f4a052750f55428158357de50abd733..c0a367b1cc3f5541d1fec928b88e4cb07a20ccbd 100644 --- a/registry/etcd/listener.go +++ b/registry/etcd/listener.go @@ -54,7 +54,7 @@ func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { - case <-l.registry.ctx.Done(): + case <-l.registry.done: logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") return nil, errors.New("listener stopped") @@ -62,7 +62,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { logger.Debugf("got etcd event %s", e) if e.ConfigType == remoting.EventTypeDel { select { - case <-l.registry.ctx.Done(): + case <-l.registry.done: logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) default: } @@ -76,4 +76,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } func (l *configurationListener) Close() { + l.registry.wg.Done() } diff --git a/registry/etcd/registry.go b/registry/etcd/registry.go index 677fab8bd158e16f08d928cec1d52159ab125f7d..b1508449b11462a8b9ca3b012b9196cbd5bb9052 100644 --- a/registry/etcd/registry.go +++ b/registry/etcd/registry.go @@ -1,9 +1,7 @@ package etcd import ( - "context" "fmt" - "github.com/apache/dubbo-go/remoting" "net/url" "os" "path" @@ -12,16 +10,16 @@ import ( "sync" "time" - etcd "github.com/AlexStocks/goext/database/etcd" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/etcdv3" "github.com/apache/dubbo-go/version" "github.com/juju/errors" - "go.etcd.io/etcd/clientv3" + perrors "github.com/pkg/errors" ) var ( @@ -32,23 +30,62 @@ var ( func init() { processID = fmt.Sprintf("%d", os.Getpid()) localIP, _ = utils.GetLocalIP() - extension.SetRegistry("etcd", newETCDV3Registry) + extension.SetRegistry("etcdv3", newETCDV3Registry) } type etcdV3Registry struct { *common.URL birth int64 // time of file birth, seconds since Epoch; 0 if unknown - ctx context.Context - cancel context.CancelFunc + cltLock sync.Mutex + client *etcdv3.Client + services map[string]common.URL // service name + protocol -> service config - rawClient *clientv3.Client - client *etcd.Client - - dataListener remoting.DataListener - configListener remoting.ConfigurationListener + listenerLock sync.Mutex + listener *etcdv3.EventListener + dataListener *dataListener + configListener *configurationListener servicesCache sync.Map // service name + protocol -> service config + + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} +} + +func (r *etcdV3Registry) Client() *etcdv3.Client { + return r.client +} +func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { + r.client = client +} +func (r *etcdV3Registry) ClientLock() *sync.Mutex { + return &r.cltLock +} +func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup { + return &r.wg +} +func (r *etcdV3Registry) GetDone() chan struct{} { + return r.done +} +func (r *etcdV3Registry) RestartCallBack() bool { + + services := []common.URL{} + for _, confIf := range r.services { + services = append(services, confIf) + } + + flag := true + for _, confIf := range services { + err := r.Register(confIf) + if err != nil { + logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf, perrors.WithStack(err)) + flag = false + break + } + logger.Infof("success to re-register service :%v", confIf.Key()) + } + return flag } func newETCDV3Registry(url *common.URL) (registry.Registry, error) { @@ -63,56 +100,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { logger.Infof("etcd address is: %v", url.Location) logger.Infof("time-out is: %v", timeout.String()) - rawClient, err := clientv3.New(clientv3.Config{ - Endpoints: []string{url.Location}, - DialTimeout: timeout, - //DialOptions: []grpc.DialOption{grpc.WithBlock()}, - }) - if err != nil { - return nil, errors.Annotate(err, "block connect to etcd server") - } - - rawClient.ActiveConnection() - - rootCtx, cancel := context.WithCancel(context.Background()) - client, err := etcd.NewClient(rawClient, etcd.WithTTL(time.Second), etcd.WithContext(rootCtx)) - if err != nil { - return nil, errors.Annotate(err, "new etcd client") + r := &etcdV3Registry{ + URL: url, + birth: time.Now().UnixNano(), + done: make(chan struct{}), } - r := etcdV3Registry{ - URL: url, - ctx: rootCtx, - cancel: cancel, - rawClient: rawClient, - client: client, - servicesCache: sync.Map{}, + if err := etcdv3.ValidateClient(r, etcdv3.WithName(etcdv3.RegistryETCDV3Client)); err != nil { + return nil, err } - go r.keepAlive() - return &r, nil -} + r.wg.Add(1) + go etcdv3.HandleClientRestart(r) -func (r *etcdV3Registry) keepAlive() error { + r.listener = etcdv3.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) - resp, err := r.client.KeepAlive() - if err != nil { - return errors.Annotate(err, "keep alive") - } - go func() { - for { - select { - case _, ok := <-resp: - if !ok { - logger.Errorf("etcd server stop") - r.cancel() - return - } - - } - } - }() - return nil + return r, nil } func (r *etcdV3Registry) GetUrl() common.URL { @@ -122,7 +127,7 @@ func (r *etcdV3Registry) GetUrl() common.URL { func (r *etcdV3Registry) IsAvailable() bool { select { - case <-r.ctx.Done(): + case <-r.done: return false default: return true @@ -130,20 +135,21 @@ func (r *etcdV3Registry) IsAvailable() bool { } func (r *etcdV3Registry) Destroy() { + + if r.configListener != nil { + r.configListener.Close() + } r.stop() } func (r *etcdV3Registry) stop() { - // close current client - r.rawClient.Close() + close(r.done) - // cancel ctx - r.cancel() + // close current client + r.client.Close() - r.rawClient = nil - r.ctx = nil - r.cancel = nil + r.client = nil r.servicesCache.Range(func(key, value interface{}) bool { r.servicesCache.Delete(key) return true @@ -180,24 +186,12 @@ func (r *etcdV3Registry) Register(svc common.URL) error { return nil } -func (r *etcdV3Registry) createKVIfNotExist(k string, v string) error { - - _, err := r.rawClient.Txn(r.ctx). - If(clientv3.Compare(clientv3.Version(k), "<", 1)). - Then(clientv3.OpPut(k, v)). - Commit() - if err != nil { - return errors.Annotatef(err, "etcd create k %s v %s", k, v) - } - return nil -} - func (r *etcdV3Registry) createDirIfNotExist(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) - if err := r.createKVIfNotExist(tmpPath, ""); err != nil { + if err := r.client.Create(tmpPath, ""); err != nil { return errors.Annotatef(err, "create path %s in etcd", tmpPath) } } @@ -226,7 +220,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) - if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil { + if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } @@ -279,7 +273,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode())) dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) - if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil { + if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } @@ -288,8 +282,33 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) { + var ( + configListener *configurationListener + ) + + r.listenerLock.Lock() + configListener = r.configListener + r.listenerLock.Unlock() + if r.listener == nil { + r.cltLock.Lock() + client := r.client + r.cltLock.Unlock() + if client == nil { + return nil, perrors.New("zk connection broken") + } + + // new client & listener + listener := etcdv3.NewEventListener(r.client) + + r.listenerLock.Lock() + r.listener = listener + r.listenerLock.Unlock() + } + + //注册到dataconfig的interested + r.dataListener.AddInterestedURL(&svc) - logger.Infof("subscribe svc: %s", svc) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener) - return nil, nil + return configListener, nil } diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 8cfb47f7cf5c6433d648b3ad3f76935f48bd376b..a6eaee8af323a1506a7d385bf3dbc72fc1ab4b76 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -2,13 +2,13 @@ package etcdv3 import ( "context" + "fmt" "path" "sync" "time" ) import ( - "github.com/AlexStocks/goext/database/etcd" "github.com/juju/errors" perrors "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -24,10 +24,11 @@ import ( const ( ConnDelay = 3 MaxFailTimes = 15 + RegistryETCDV3Client = "etcd registry" ) var ( - ErrNilETCDV3ClientConn = errors.New("etcdv3client{conn} is nil") + ErrNilETCDV3ClientConn = errors.New("etcd clientset {conn} is nil") // full describe the ERR ErrKVPairNotFound = errors.New("k/v pair not found") ) @@ -36,7 +37,7 @@ type clientSet struct { lock sync.RWMutex // protect all element in // clientSet - gxClient *gxetcd.Client + //gxClient *gxetcd.Client rawClient *clientv3.Client // client controller used to change client behave @@ -60,17 +61,17 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error { DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) if err != nil { - return errors.Annotate(err, "block connect to etcd server") + return errors.Annotate(err, "new raw client block connect to server") } // share context - gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx)) - if err != nil { - return errors.Annotate(err, "new etcd client") - } + //gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx)) + //if err != nil { + // return errors.Annotate(err, "new gxetcd client") + //} out := &clientSet{ - gxClient: gxClient, + //gxClient: gxClient, rawClient: client, ctx: rootCtx, cancel: cancel, @@ -92,12 +93,20 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error { func (c *clientSet) maintenanceStatus() error { c.c.Wait.Add(1) - aliveResp, err := c.gxClient.KeepAlive() + + lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) if err != nil { - return errors.Annotatef(err, "etcd keep alive") + return errors.Annotatef(err, "grant lease") } + + keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) + if err != nil || keepAlive == nil { + c.rawClient.Revoke(c.ctx, lease.ID) + return errors.Annotate(err, "keep alive lease") + } + // start maintenance the connection status - go c.maintenanceStatusLoop(aliveResp) + go c.maintenanceStatusLoop(keepAlive) return nil } @@ -105,7 +114,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl defer func() { c.c.Wait.Done() - logger.Infof("etcd {path:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name) + logger.Infof("etcdv3 clientset {endpoints:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name) }() // get signal, will start maintenanceStatusLoop @@ -118,7 +127,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl return case <-c.ctx.Done(): // client context exit - logger.Warn("etcd clientSet context done") + logger.Warn("etcdv3 clientset context done") return case msg, ok := <-aliveResp: // etcd connection lose @@ -126,7 +135,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl // if clientSet.Client is nil, it will panic if !ok { - logger.Warnf("etcd server stop at term: %#v", msg) + logger.Warnf("etcdv3 server stop at term: %#v", msg) c.c.Lock() // hold the c.Client lock c.c.cs.clean() @@ -213,10 +222,25 @@ func (c *clientSet) getChildrenW(k string) ([]string, []string, clientv3.WatchCh return nil, nil, nil, ErrNilETCDV3ClientConn } - wc := c.rawClient.Watch(c.ctx, k, clientv3.WithPrefix()) + wc,err := c.watchWithPrefix(k) + if err != nil{ + return nil, nil, nil,errors.Annotate(err, "watch with prefix") + } return kList, vList, wc, nil } +func (c *clientSet) watchWithPrefix(prefix string) (clientv3.WatchChan, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, ErrNilETCDV3ClientConn + } + + return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil +} + func (c *clientSet) watch(k string) (clientv3.WatchChan, error) { c.lock.RLock() @@ -228,7 +252,7 @@ func (c *clientSet) watch(k string) (clientv3.WatchChan, error) { _, err := c.get(k) if err != nil { - return nil, errors.Annotatef(err, "watch pre check key %s", k) + return nil, errors.Annotatef(err, "pre check key %s", k) } return c.rawClient.Watch(c.ctx, k), nil @@ -296,11 +320,10 @@ func (c *clientSet) keepAliveKV(k string, v string) error { // this method will hold clientset lock func (c *clientSet) clean() { c.lock.Lock() - if c.gxClient != nil { + if c.rawClient != nil { - // close gx client, it will close raw etcdv3 client - c.gxClient.Close() - c.gxClient = nil + // close raw etcdv3 client + c.rawClient.Close() c.rawClient = nil // cancel all context @@ -373,8 +396,6 @@ func ValidateClient(container clientFacade, opts ...Option) error { opt(options) } - err = nil - lock := container.ClientLock() url := container.GetUrl() @@ -388,13 +409,13 @@ func ValidateClient(container clientFacade, opts ...Option) error { if err != nil { logger.Errorf("timeout config %v is invalid ,err is %v", url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) - return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location) + return errors.Annotate(err, "timeout parse") } newClient, err := newClient(options.name, []string{url.Location}, timeout) if err != nil { - logger.Warnf("newETCDV3Client(name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}", + logger.Warnf("new client (name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}", options.name, url.Location, timeout.String(), err) - return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location) + return errors.Annotatef(err, "new client (address:%+v)", url.Location) } container.SetClient(newClient) } @@ -403,12 +424,12 @@ func ValidateClient(container clientFacade, opts ...Option) error { err = newClientSet(container.Client().endpoints, container.Client().timeout, container.Client()) if err != nil { - return errors.Annotate(err, "new client set") + return errors.Annotate(err, "new clientset") } container.Client().cs.startMaintenanceChan <- struct{}{} } - return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.PrimitiveURL) + return nil } func newClient(name string, endpoints []string, timeout time.Duration) (*Client, error) { @@ -427,7 +448,7 @@ func newClient(name string, endpoints []string, timeout time.Duration) (*Client, err = newClientSet(endpoints, timeout, out) if err != nil { - return nil, errors.Annotate(err, "new client set") + return nil, errors.Annotate(err, "new clientset") } // start maintenanceChan @@ -445,19 +466,25 @@ func (c *Client) stop() bool { return false } -func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) { + +func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) error { if key == "" || wc == nil { - return + return errors.New(fmt.Sprintf("key is %s, wc is %v", key, wc)) + } + + wcc, err := c.cs.watch(key) + if err != nil { + return errors.Annotatef(err, "clientset watch %s", key) } c.Lock() a := c.eventRegistry[key] a = append(a, wc) c.eventRegistry[key] = a + c.Unlock() go func() { - wcc := c.cs.rawClient.Watch(c.cs.ctx, key) for msg := range wcc { wc <- msg } @@ -465,8 +492,8 @@ func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) { close(wc) }() - logger.Debugf("etcdClient{%s} register event{path:%s, ptr:%p}", c.name, key, wc) - c.Unlock() + logger.Debugf("etcdv3 client{%s} register event{key:%s, ptr:%p}", c.name, key, wc) + return nil } func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) { @@ -485,10 +512,10 @@ func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) if e == event { arr := infoList infoList = append(arr[:i], arr[i+1:]...) - logger.Debugf("etcdClient{%s} unregister event{path:%s, event:%p}", c.name, key, event) + logger.Debugf("etcdv3 client{%s} unregister event{key:%s, event:%p}", c.name, key, event) } } - logger.Debugf("after etcdClient{%s} unregister event{path:%s, event:%p}, array length %d", + logger.Debugf("after etcdv3 client{%s} unregister event{key:%s, event:%p}, array length %d", c.name, key, event, len(infoList)) if len(infoList) == 0 { delete(c.eventRegistry, key) @@ -531,7 +558,7 @@ func (c *Client) Close() { c.cs = nil } c.Unlock() - logger.Warnf("etcd client{name:%s, etcd addr:%s} exit now.", c.name, c.endpoints) + logger.Warnf("etcdv3 client{name:%s, etcdv3 addr:%s} exit now.", c.name, c.endpoints) } func (c *Client) Create(k string, v string) error { @@ -543,7 +570,7 @@ func (c *Client) Create(k string, v string) error { err = c.cs.put(k, v) } c.Unlock() - return errors.Annotatef(err, "etcd client put key %s value %s", k, v) + return errors.Annotatef(err, "clientset put key %s value %s", k, v) } func (c *Client) Delete(key string) error { @@ -554,7 +581,7 @@ func (c *Client) Delete(key string) error { err = c.cs.delete(key) } c.Unlock() - return errors.Annotatef(err, "etcd client delete (basePath:%s)", key) + return errors.Annotatef(err, "clientset delete (key:%s)", key) } func (c *Client) RegisterTemp(basePath string, node string) (string, error) { @@ -566,40 +593,39 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { err = c.cs.keepAliveKV(completePath, "") } c.Unlock() - logger.Debugf("etcdClient{%s} create a tmp node:%s\n", c.name, completePath) + logger.Debugf("etcdv3 client{%s} create a tmp node:%s\n", c.name, completePath) if err != nil { - return "", errors.Annotatef(err, "etcd client create tmp k %s", completePath) + return "", errors.Annotatef(err, "client create tmp key %s", completePath) } return completePath, nil } -func (c *Client) GetChildrenW(path string) ([]string, clientv3.WatchChan, error) { +func (c *Client) WatchChildren(key string) ([]string, []string, clientv3.WatchChan, error) { var ( - children []string - err error - wc clientv3.WatchChan + err error + childrenKeys []string + childrenValues []string + wc clientv3.WatchChan ) + err = ErrNilETCDV3ClientConn c.Lock() if c.cs != nil { - children, _, wc, err = c.cs.getChildrenW(path) + childrenKeys, childrenValues, wc, err = c.cs.getChildrenW(key) } c.Unlock() if err != nil { - if errors.Cause(err) == ErrKVPairNotFound { - return nil, nil, errors.Annotatef(err,"path{%s} has none children", path) - } - logger.Errorf("etcdv3.ChildrenW(path{%s}) = error(%v)", path, err) - return nil, nil, errors.Annotatef(err, "etcdv3.ChildrenW(path:%s)", path) + logger.Errorf("etcdv3 client Children(key{%s}) = error(%v)", key, perrors.WithStack(err)) + return nil, nil, nil, errors.Annotatef(err, "client ChildrenW(key:%s)", key) } - return children, wc, nil + return childrenKeys, childrenValues, wc, nil } -func (c *Client) GetChildren(path string) ([]string, error) { +func (c *Client) GetChildren(key string) ([]string, error) { var ( err error children []string @@ -608,20 +634,20 @@ func (c *Client) GetChildren(path string) ([]string, error) { err = ErrNilETCDV3ClientConn c.Lock() if c.cs != nil { - children, _, err = c.cs.getChildren(path) + children, _, err = c.cs.getChildren(key) } c.Unlock() if err != nil { if errors.Cause(err) == ErrKVPairNotFound { - return nil, errors.Annotatef(err,"path{%s} has none children", path) + return nil, errors.Annotatef(err, "key{%s} has none children", key) } - logger.Errorf("clientv3.Children(path{%s}) = error(%v)", path, perrors.WithStack(err)) - return nil, errors.Annotatef(err, "clientv3.Children(path:%s)", path) + logger.Errorf("clientv3.Children(key{%s}) = error(%v)", key, perrors.WithStack(err)) + return nil, errors.Annotatef(err, "client GetChildren(key:%s)", key) } return children, nil } -func (c *Client) ExistW(path string) (clientv3.WatchChan, error) { +func (c *Client) WatchExist(key string) (clientv3.WatchChan, error) { var ( err = ErrNilETCDV3ClientConn @@ -630,25 +656,25 @@ func (c *Client) ExistW(path string) (clientv3.WatchChan, error) { c.Lock() if c.cs != nil { - _, err = c.cs.watch(path) + out, err = c.cs.watch(key) } c.Unlock() if err != nil { if errors.Cause(err) == ErrKVPairNotFound { - return nil, errors.Annotatef(err, "path{%s} not exist", path) + return nil, errors.Annotatef(err, "key{%s} not exist", key) } - return nil, errors.Annotatef(err, "clientv3.ExistW(path:%s)", path) + return nil, errors.Annotatef(err, "client WatchExist(key:%s)", key) } return out, nil } -func (c *Client) GetContent(path string) ([]byte, error) { +func (c *Client) GetContent(key string) ([]byte, error) { c.Lock() - value, err := c.cs.get(path) + value, err := c.cs.get(key) if err != nil { - return nil, errors.Annotatef(err, "client set get: %s", path) + return nil, errors.Annotatef(err, "clientset get(key: %s)", key) } c.Unlock() diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index ea71233db7c32964fca0dd884fc62c9afdf4606d..bc5dcd4445f5b4cad9519465388147a07d6283d9 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -1,17 +1,14 @@ package etcdv3 import ( - "gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path" - pathlib "path" "sync" "time" + + "go.etcd.io/etcd/clientv3" ) import ( "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/dubbogo/getty" - perrors "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" ) import ( @@ -20,31 +17,32 @@ import ( ) type EventListener struct { - client *Client - pathMapLock sync.Mutex - pathMap map[string]struct{} - wg sync.WaitGroup + client *Client + keyMapLock sync.Mutex + keyMap map[string]struct{} + wg sync.WaitGroup } func NewEventListener(client *Client) *EventListener { return &EventListener{ - client: client, - pathMap: make(map[string]struct{}), + client: client, + keyMap: make(map[string]struct{}), } } func (l *EventListener) SetClient(client *Client) { l.client = client } -// this method will return true when spec path deleted, +// Listen on a spec key +// this method will return true when spec key deleted, // this method will return false when deep layer connection lose -func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting.DataListener) bool { +func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { l.wg.Add(1) defer l.wg.Done() for { - keyEventCh, err := l.client.ExistW(path) + keyEventCh, err := l.client.WatchExist(key) if err != nil { - logger.Warnf("existW{key:%s} = error{%v}", path, err) + logger.Warnf("WatchExist{key:%s} = error{%v}", key, err) return false } @@ -61,25 +59,12 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting // etcd event stream case e := <-keyEventCh: - if e.Err() != nil{ - logger.Warnf("get a etcd event {err: %s}", e.Err()) + if e.Err() != nil { + logger.Errorf("get a etcdv3 event {err: %s}", e.Err()) + continue } - for _, event := range e.Events{ - logger.Warnf("get a etcd Event{type:%s, path:%s,}", - event.Type.String(), event.Kv.Key ) - switch event.Type { - case mvccpb.PUT: - if len(listener) > 0 { - if event.IsCreate(){ - logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) - listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value)}) - }else{ - logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) - listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EvnetTypeUpdate, Content: string(event.Kv.Value)}) - } - } - case mvccpb.DELETE: - logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + for _, event := range e.Events { + if l.handleEvents(event, listener...) { return true } } @@ -89,285 +74,120 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting return false } - -func (l *EventListener) handleNodeEvent(path string, children []string, listener remoting.DataListener) { - contains := func(s []string, e string) bool { - for _, a := range s { - if a == e { - return true +// return true mean the event type is DELETE +// return false mean the event type is CREATE || UPDATE +func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool { + + logger.Warnf("get a etcdv3 Event {type: %s, key: %s}", event.Type, event.Kv.Key) + + switch event.Type { + // the etcdv3 event just include PUT && DELETE + case mvccpb.PUT: + for _, listener := range listeners { + switch event.IsCreate() { + case true: + logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EventTypeAdd, + Content: string(event.Kv.Value), + }) + case false: + logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EvnetTypeUpdate, + Content: string(event.Kv.Value), + }) } } - return false + case mvccpb.DELETE: + logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + return true } - newChildren, err := l.client.GetChildren(path) - if err != nil { - logger.Errorf("path{%s} child nodes changed, etcdV3.Children() = error{%v}", path, perrors.WithStack(err)) - return - } - - // a node was added -- listen the new node - var ( - newNode string - ) - for _, n := range newChildren { - if contains(children, n) { - continue - } - - newNode = pathlib.Join(path, n) - logger.Infof("add zkNode{%s}", newNode) - content, _, err := l.client.Conn.Get(newNode) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err)) - } - - if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - // listen l service node - go func(node, childNode string) { - logger.Infof("delete zkNode{%s}", node) - if l.ListenServiceNodeEvent(node, listener) { - logger.Infof("delete content{%s}", childNode) - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(newNode, n) - } - - // old node was deleted - var oldNode string - for _, n := range children { - if contains(newChildren, n) { - continue - } - - oldNode = path.Join(zkPath, n) - logger.Warnf("delete zkPath{%s}", oldNode) - - if err != nil { - logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) - continue - } - listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) - } + panic("unreachable") } -func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { +// Listen on a set of key with spec prefix +func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { + l.wg.Add(1) defer l.wg.Done() - - var ( - failTimes int - event chan struct{} - zkEvent zk.Event - ) - event = make(chan struct{}, 4) - defer close(event) for { - // get current children for a zkPath - children, childEventCh, err := l.client.GetChildrenW(zkPath) + _, _, wc, err := l.client.WatchChildren(prefix) if err != nil { - failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes - } - logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) - // clear the event channel - CLEAR: - for { - select { - case <-event: - default: - break CLEAR - } - } - l.client.RegisterEvent(zkPath, &event) - select { - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): - l.client.UnregisterEvent(zkPath, &event) - continue - case <-l.client.Done(): - l.client.UnregisterEvent(zkPath, &event) - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return - case <-event: - logger.Infof("get zk.EventNodeDataChange notify event") - l.client.UnregisterEvent(zkPath, &event) - l.handleZkNodeEvent(zkPath, nil, listener) - continue - } + logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) } - failTimes = 0 - for _, c := range children { - // listen l service node - dubboPath := path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - logger.Infof("Get children!{%s}", dubboPath) - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string) { - if l.ListenServiceNodeEvent(dubboPath) { - listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath) - - //liten sub path recursive - go func(zkPath string, listener remoting.DataListener) { - l.listenDirEvent(zkPath, listener) - logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) - } select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue - } - l.handleZkNodeEvent(zkEvent.Path, children, listener) + + // client watch ctx stop + // server stopped + case <-l.client.cs.ctx.Done(): + logger.Warn("etcd listener service node with prefix etcd server stopped") + return + + // client stopped case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + logger.Warn("etcdv3 client stopped") return + + // etcd event stream + case e := <-wc: + if e.Err() != nil { + logger.Errorf("get a etcdv3 event {err: %s}", e.Err()) + continue + } + for _, event := range e.Events { + l.handleEvents(event, listener...) + } } } } -// -//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) { -// l.wg.EventTypeAdd(1) -// defer l.wg.Done() -// -// var ( -// failTimes int -// event chan struct{} -// zkEvent zk.Event -// ) -// event = make(chan struct{}, 4) -// defer close(event) -// for { -// // get current children for a zkPath -// content,_, eventCh, err := l.client.Conn.GetW(zkPath) -// if err != nil { -// failTimes++ -// if MaxFailTimes <= failTimes { -// failTimes = MaxFailTimes -// } -// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err) -// // clear the event channel -// CLEAR: -// for { -// select { -// case <-event: -// default: -// break CLEAR -// } -// } -// l.client.RegisterEvent(zkPath, &event) -// select { -// case <-time.After(timeSecondDuration(failTimes * ConnDelay)): -// l.client.UnregisterEvent(zkPath, &event) -// continue -// case <-l.client.Done(): -// l.client.UnregisterEvent(zkPath, &event) -// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) -// return -// case <-event: -// logger.Infof("get zk.EventNodeDataChange notify event") -// l.client.UnregisterEvent(zkPath, &event) -// l.handleZkNodeEvent(zkPath, nil, listener) -// continue -// } -// } -// failTimes = 0 -// -// select { -// case zkEvent = <-eventCh: -// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", -// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) -// -// l.handleZkNodeEvent(zkEvent.Path, children, listener) -// case <-l.client.Done(): -// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) -// return -// } -// } -//} - func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener +// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent -func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { - var ( - err error - dubboPath string - children []string - ) +func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { - l.pathMapLock.Lock() - _, ok := l.pathMap[zkPath] - l.pathMapLock.Unlock() + l.keyMapLock.Lock() + _, ok := l.keyMap[key] + l.keyMapLock.Unlock() if ok { - logger.Warnf("@zkPath %s has already been listened.", zkPath) + logger.Warnf("etcdv3 key %s has already been listened.", key) return } - l.pathMapLock.Lock() - l.pathMap[zkPath] = struct{}{} - l.pathMapLock.Unlock() + l.keyMapLock.Lock() + l.keyMap[key] = struct{}{} + l.keyMapLock.Unlock() - logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) - children, err = l.client.GetChildren(zkPath) - if err != nil { - children = nil - logger.Warnf("fail to get children of zk path{%s}", zkPath) - } - - for _, c := range children { + logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + go func(key string, listener remoting.DataListener) { + l.ListenServiceNodeEventWithPrefix(key, listener) + logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) + }(key, listener) - // listen l service node - dubboPath = path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue + logger.Infof("listen dubbo service key{%s}", key) + go func(key string) { + if l.ListenServiceNodeEvent(key) { + listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string) { - if l.ListenServiceNodeEvent(dubboPath) { - listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath) - } - - logger.Infof("listen dubbo path{%s}", zkPath) - go func(zkPath string, listener remoting.DataListener) { - l.listenDirEvent(zkPath, listener) - logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(zkPath, listener) + logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key) + }(key) } -func (l *ZkEventListener) valid() bool { - return l.client.ZkConnValid() +func (l *EventListener) valid() bool { + return l.client.Valid() } -func (l *ZkEventListener) Close() { +func (l *EventListener) Close() { l.wg.Wait() }