Skip to content
Snippets Groups Projects
Commit c06c8a27 authored by scott.wang's avatar scott.wang
Browse files

etcdv3 basic complete

parent 549f4ae8
No related branches found
No related tags found
No related merge requests found
......@@ -41,7 +41,7 @@ import (
_ "github.com/apache/dubbo-go/cluster/cluster_impl"
_ "github.com/apache/dubbo-go/cluster/loadbalance"
_ "github.com/apache/dubbo-go/registry/zookeeper"
_ "github.com/apache/dubbo-go/registry/etcd"
)
var (
......
......@@ -18,15 +18,9 @@ application_config:
registries :
"hangzhouzk":
protocol: "zookeeper"
protocol: "etcdv3"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
"shanghaizk":
protocol: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2182"
address: "127.0.0.1:2379"
username: ""
password: ""
......
......@@ -40,7 +40,7 @@ import (
_ "github.com/apache/dubbo-go/cluster/cluster_impl"
_ "github.com/apache/dubbo-go/cluster/loadbalance"
_ "github.com/apache/dubbo-go/registry/zookeeper"
_ "github.com/apache/dubbo-go/registry/etcd"
)
var (
......
......@@ -105,7 +105,7 @@ func init() {
userMap.user["A000"] = DefaultUser
userMap.user["A001"] = User{Id: "001", Name: "ZhangSheng", Age: 18, Sex: Gender(MAN)}
userMap.user["A002"] = User{Id: "002", Name: "Lily", Age: 20, Sex: Gender(WOMAN)}
userMap.user["A003"] = User{Id: "113", Name: "Moorse", Age: 30, Sex: Gender(WOMAN)}
userMap.user["A003"] = User{Id: "113", Name: "ScottWang", Age: 30, Sex: Gender(WOMAN)}
for k, v := range userMap.user {
v.Time = time.Now()
userMap.user[k] = v
......
......@@ -12,15 +12,9 @@ application_config:
registries :
"hangzhouzk":
protocol: "zookeeper"
protocol: "etcdv3"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
"shanghaizk":
protocol: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2182"
address: "127.0.0.1:2379"
username: ""
password: ""
......
module github.com/apache/dubbo-go
require (
github.com/AlexStocks/goext v0.3.2
github.com/AlexStocks/log4go v1.0.2 // indirect
github.com/coreos/etcd v3.3.13+incompatible // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dubbogo/getty v1.2.0
github.com/dubbogo/gost v1.1.1
github.com/dubbogo/hessian2 v1.2.0
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect
github.com/juju/testing v0.0.0-20190613124551-e81189438503 // indirect
github.com/magiconair/properties v1.8.1
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect
github.com/mattn/go-isatty v0.0.8 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.0.0 // indirect
github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90 // indirect
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.0 // indirect
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect
golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 // indirect
google.golang.org/grpc v1.22.0
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/yaml.v2 v2.2.2
)
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0=
github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkglDGSl3Hc=
github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk=
github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dubbogo/getty v1.2.0 h1:LmP/R/yCYp5/ubUTm5Xa7QwE3EnFFSjguJH8DJruJ70=
github.com/dubbogo/getty v1.2.0/go.mod h1:p9Pyk18a+5Aa0GZ546aCzlehEfvEof0jAF0+QQcEPYQ=
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
......@@ -17,36 +28,101 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw=
github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 h1:aQ4ahUIm1FHeZ2XsL1lbzkn9NxhkOUeiMWITOA7OOms=
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifGWef4WUqvXk0iRqdhdy/2uzI=
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U=
github.com/juju/testing v0.0.0-20190613124551-e81189438503 h1:ZUgTbk8oHgP0jpMieifGC9Lv47mHn8Pb3mFX3/Ew4iY=
github.com/juju/testing v0.0.0-20190613124551-e81189438503/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e h1:hB2xlXdHp/pmPZq0y3QnmWAArdw9PqbmotexnWx/FU8=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90 h1:Cov9QkEXNhh8QeXoICvORjJ4RrpyvXmSf7rHSpS+ZfI=
github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw=
go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
......@@ -55,29 +131,56 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 h1:RMGusaKverhgGR5KBERIKiTyWoWHRd84GCtsNlvLvIo=
golang.org/x/time v0.0.0-20190513212739-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw=
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
......@@ -54,7 +54,7 @@ func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent)
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.ctx.Done():
case <-l.registry.done:
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, errors.New("listener stopped")
......@@ -62,7 +62,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Debugf("got etcd event %s", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.ctx.Done():
case <-l.registry.done:
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
......@@ -76,4 +76,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
func (l *configurationListener) Close() {
l.registry.wg.Done()
}
package etcd
import (
"context"
"fmt"
"github.com/apache/dubbo-go/remoting"
"net/url"
"os"
"path"
......@@ -12,16 +10,16 @@ import (
"sync"
"time"
etcd "github.com/AlexStocks/goext/database/etcd"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/etcdv3"
"github.com/apache/dubbo-go/version"
"github.com/juju/errors"
"go.etcd.io/etcd/clientv3"
perrors "github.com/pkg/errors"
)
var (
......@@ -32,23 +30,62 @@ var (
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = utils.GetLocalIP()
extension.SetRegistry("etcd", newETCDV3Registry)
extension.SetRegistry("etcdv3", newETCDV3Registry)
}
type etcdV3Registry struct {
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
ctx context.Context
cancel context.CancelFunc
cltLock sync.Mutex
client *etcdv3.Client
services map[string]common.URL // service name + protocol -> service config
rawClient *clientv3.Client
client *etcd.Client
dataListener remoting.DataListener
configListener remoting.ConfigurationListener
listenerLock sync.Mutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
servicesCache sync.Map // service name + protocol -> service config
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
}
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *etcdV3Registry) GetDone() chan struct{} {
return r.done
}
func (r *etcdV3Registry) RestartCallBack() bool {
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
flag := true
for _, confIf := range services {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return flag
}
func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
......@@ -63,56 +100,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
logger.Infof("etcd address is: %v", url.Location)
logger.Infof("time-out is: %v", timeout.String())
rawClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{url.Location},
DialTimeout: timeout,
//DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return nil, errors.Annotate(err, "block connect to etcd server")
}
rawClient.ActiveConnection()
rootCtx, cancel := context.WithCancel(context.Background())
client, err := etcd.NewClient(rawClient, etcd.WithTTL(time.Second), etcd.WithContext(rootCtx))
if err != nil {
return nil, errors.Annotate(err, "new etcd client")
r := &etcdV3Registry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
}
r := etcdV3Registry{
URL: url,
ctx: rootCtx,
cancel: cancel,
rawClient: rawClient,
client: client,
servicesCache: sync.Map{},
if err := etcdv3.ValidateClient(r, etcdv3.WithName(etcdv3.RegistryETCDV3Client)); err != nil {
return nil, err
}
go r.keepAlive()
return &r, nil
}
r.wg.Add(1)
go etcdv3.HandleClientRestart(r)
func (r *etcdV3Registry) keepAlive() error {
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
resp, err := r.client.KeepAlive()
if err != nil {
return errors.Annotate(err, "keep alive")
}
go func() {
for {
select {
case _, ok := <-resp:
if !ok {
logger.Errorf("etcd server stop")
r.cancel()
return
}
}
}
}()
return nil
return r, nil
}
func (r *etcdV3Registry) GetUrl() common.URL {
......@@ -122,7 +127,7 @@ func (r *etcdV3Registry) GetUrl() common.URL {
func (r *etcdV3Registry) IsAvailable() bool {
select {
case <-r.ctx.Done():
case <-r.done:
return false
default:
return true
......@@ -130,20 +135,21 @@ func (r *etcdV3Registry) IsAvailable() bool {
}
func (r *etcdV3Registry) Destroy() {
if r.configListener != nil {
r.configListener.Close()
}
r.stop()
}
func (r *etcdV3Registry) stop() {
// close current client
r.rawClient.Close()
close(r.done)
// cancel ctx
r.cancel()
// close current client
r.client.Close()
r.rawClient = nil
r.ctx = nil
r.cancel = nil
r.client = nil
r.servicesCache.Range(func(key, value interface{}) bool {
r.servicesCache.Delete(key)
return true
......@@ -180,24 +186,12 @@ func (r *etcdV3Registry) Register(svc common.URL) error {
return nil
}
func (r *etcdV3Registry) createKVIfNotExist(k string, v string) error {
_, err := r.rawClient.Txn(r.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
Then(clientv3.OpPut(k, v)).
Commit()
if err != nil {
return errors.Annotatef(err, "etcd create k %s v %s", k, v)
}
return nil
}
func (r *etcdV3Registry) createDirIfNotExist(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
if err := r.createKVIfNotExist(tmpPath, ""); err != nil {
if err := r.client.Create(tmpPath, ""); err != nil {
return errors.Annotatef(err, "create path %s in etcd", tmpPath)
}
}
......@@ -226,7 +220,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil {
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
......@@ -279,7 +273,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil {
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
......@@ -288,8 +282,33 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
)
r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
// new client & listener
listener := etcdv3.NewEventListener(r.client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
//注册到dataconfig的interested
r.dataListener.AddInterestedURL(&svc)
logger.Infof("subscribe svc: %s", svc)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener)
return nil, nil
return configListener, nil
}
......@@ -2,13 +2,13 @@ package etcdv3
import (
"context"
"fmt"
"path"
"sync"
"time"
)
import (
"github.com/AlexStocks/goext/database/etcd"
"github.com/juju/errors"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
......@@ -24,10 +24,11 @@ import (
const (
ConnDelay = 3
MaxFailTimes = 15
RegistryETCDV3Client = "etcd registry"
)
var (
ErrNilETCDV3ClientConn = errors.New("etcdv3client{conn} is nil")
ErrNilETCDV3ClientConn = errors.New("etcd clientset {conn} is nil") // full describe the ERR
ErrKVPairNotFound = errors.New("k/v pair not found")
)
......@@ -36,7 +37,7 @@ type clientSet struct {
lock sync.RWMutex // protect all element in
// clientSet
gxClient *gxetcd.Client
//gxClient *gxetcd.Client
rawClient *clientv3.Client
// client controller used to change client behave
......@@ -60,17 +61,17 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error {
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return errors.Annotate(err, "block connect to etcd server")
return errors.Annotate(err, "new raw client block connect to server")
}
// share context
gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx))
if err != nil {
return errors.Annotate(err, "new etcd client")
}
//gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx))
//if err != nil {
// return errors.Annotate(err, "new gxetcd client")
//}
out := &clientSet{
gxClient: gxClient,
//gxClient: gxClient,
rawClient: client,
ctx: rootCtx,
cancel: cancel,
......@@ -92,12 +93,20 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error {
func (c *clientSet) maintenanceStatus() error {
c.c.Wait.Add(1)
aliveResp, err := c.gxClient.KeepAlive()
lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
if err != nil {
return errors.Annotatef(err, "etcd keep alive")
return errors.Annotatef(err, "grant lease")
}
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
c.rawClient.Revoke(c.ctx, lease.ID)
return errors.Annotate(err, "keep alive lease")
}
// start maintenance the connection status
go c.maintenanceStatusLoop(aliveResp)
go c.maintenanceStatusLoop(keepAlive)
return nil
}
......@@ -105,7 +114,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl
defer func() {
c.c.Wait.Done()
logger.Infof("etcd {path:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name)
logger.Infof("etcdv3 clientset {endpoints:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name)
}()
// get signal, will start maintenanceStatusLoop
......@@ -118,7 +127,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl
return
case <-c.ctx.Done():
// client context exit
logger.Warn("etcd clientSet context done")
logger.Warn("etcdv3 clientset context done")
return
case msg, ok := <-aliveResp:
// etcd connection lose
......@@ -126,7 +135,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl
// if clientSet.Client is nil, it will panic
if !ok {
logger.Warnf("etcd server stop at term: %#v", msg)
logger.Warnf("etcdv3 server stop at term: %#v", msg)
c.c.Lock() // hold the c.Client lock
c.c.cs.clean()
......@@ -213,10 +222,25 @@ func (c *clientSet) getChildrenW(k string) ([]string, []string, clientv3.WatchCh
return nil, nil, nil, ErrNilETCDV3ClientConn
}
wc := c.rawClient.Watch(c.ctx, k, clientv3.WithPrefix())
wc,err := c.watchWithPrefix(k)
if err != nil{
return nil, nil, nil,errors.Annotate(err, "watch with prefix")
}
return kList, vList, wc, nil
}
func (c *clientSet) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3ClientConn
}
return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
}
func (c *clientSet) watch(k string) (clientv3.WatchChan, error) {
c.lock.RLock()
......@@ -228,7 +252,7 @@ func (c *clientSet) watch(k string) (clientv3.WatchChan, error) {
_, err := c.get(k)
if err != nil {
return nil, errors.Annotatef(err, "watch pre check key %s", k)
return nil, errors.Annotatef(err, "pre check key %s", k)
}
return c.rawClient.Watch(c.ctx, k), nil
......@@ -296,11 +320,10 @@ func (c *clientSet) keepAliveKV(k string, v string) error {
// this method will hold clientset lock
func (c *clientSet) clean() {
c.lock.Lock()
if c.gxClient != nil {
if c.rawClient != nil {
// close gx client, it will close raw etcdv3 client
c.gxClient.Close()
c.gxClient = nil
// close raw etcdv3 client
c.rawClient.Close()
c.rawClient = nil
// cancel all context
......@@ -373,8 +396,6 @@ func ValidateClient(container clientFacade, opts ...Option) error {
opt(options)
}
err = nil
lock := container.ClientLock()
url := container.GetUrl()
......@@ -388,13 +409,13 @@ func ValidateClient(container clientFacade, opts ...Option) error {
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location)
return errors.Annotate(err, "timeout parse")
}
newClient, err := newClient(options.name, []string{url.Location}, timeout)
if err != nil {
logger.Warnf("newETCDV3Client(name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}",
logger.Warnf("new client (name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}",
options.name, url.Location, timeout.String(), err)
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location)
return errors.Annotatef(err, "new client (address:%+v)", url.Location)
}
container.SetClient(newClient)
}
......@@ -403,12 +424,12 @@ func ValidateClient(container clientFacade, opts ...Option) error {
err = newClientSet(container.Client().endpoints, container.Client().timeout, container.Client())
if err != nil {
return errors.Annotate(err, "new client set")
return errors.Annotate(err, "new clientset")
}
container.Client().cs.startMaintenanceChan <- struct{}{}
}
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.PrimitiveURL)
return nil
}
func newClient(name string, endpoints []string, timeout time.Duration) (*Client, error) {
......@@ -427,7 +448,7 @@ func newClient(name string, endpoints []string, timeout time.Duration) (*Client,
err = newClientSet(endpoints, timeout, out)
if err != nil {
return nil, errors.Annotate(err, "new client set")
return nil, errors.Annotate(err, "new clientset")
}
// start maintenanceChan
......@@ -445,19 +466,25 @@ func (c *Client) stop() bool {
return false
}
func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) {
func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) error {
if key == "" || wc == nil {
return
return errors.New(fmt.Sprintf("key is %s, wc is %v", key, wc))
}
wcc, err := c.cs.watch(key)
if err != nil {
return errors.Annotatef(err, "clientset watch %s", key)
}
c.Lock()
a := c.eventRegistry[key]
a = append(a, wc)
c.eventRegistry[key] = a
c.Unlock()
go func() {
wcc := c.cs.rawClient.Watch(c.cs.ctx, key)
for msg := range wcc {
wc <- msg
}
......@@ -465,8 +492,8 @@ func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) {
close(wc)
}()
logger.Debugf("etcdClient{%s} register event{path:%s, ptr:%p}", c.name, key, wc)
c.Unlock()
logger.Debugf("etcdv3 client{%s} register event{key:%s, ptr:%p}", c.name, key, wc)
return nil
}
func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) {
......@@ -485,10 +512,10 @@ func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse)
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
logger.Debugf("etcdClient{%s} unregister event{path:%s, event:%p}", c.name, key, event)
logger.Debugf("etcdv3 client{%s} unregister event{key:%s, event:%p}", c.name, key, event)
}
}
logger.Debugf("after etcdClient{%s} unregister event{path:%s, event:%p}, array length %d",
logger.Debugf("after etcdv3 client{%s} unregister event{key:%s, event:%p}, array length %d",
c.name, key, event, len(infoList))
if len(infoList) == 0 {
delete(c.eventRegistry, key)
......@@ -531,7 +558,7 @@ func (c *Client) Close() {
c.cs = nil
}
c.Unlock()
logger.Warnf("etcd client{name:%s, etcd addr:%s} exit now.", c.name, c.endpoints)
logger.Warnf("etcdv3 client{name:%s, etcdv3 addr:%s} exit now.", c.name, c.endpoints)
}
func (c *Client) Create(k string, v string) error {
......@@ -543,7 +570,7 @@ func (c *Client) Create(k string, v string) error {
err = c.cs.put(k, v)
}
c.Unlock()
return errors.Annotatef(err, "etcd client put key %s value %s", k, v)
return errors.Annotatef(err, "clientset put key %s value %s", k, v)
}
func (c *Client) Delete(key string) error {
......@@ -554,7 +581,7 @@ func (c *Client) Delete(key string) error {
err = c.cs.delete(key)
}
c.Unlock()
return errors.Annotatef(err, "etcd client delete (basePath:%s)", key)
return errors.Annotatef(err, "clientset delete (key:%s)", key)
}
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
......@@ -566,40 +593,39 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
err = c.cs.keepAliveKV(completePath, "")
}
c.Unlock()
logger.Debugf("etcdClient{%s} create a tmp node:%s\n", c.name, completePath)
logger.Debugf("etcdv3 client{%s} create a tmp node:%s\n", c.name, completePath)
if err != nil {
return "", errors.Annotatef(err, "etcd client create tmp k %s", completePath)
return "", errors.Annotatef(err, "client create tmp key %s", completePath)
}
return completePath, nil
}
func (c *Client) GetChildrenW(path string) ([]string, clientv3.WatchChan, error) {
func (c *Client) WatchChildren(key string) ([]string, []string, clientv3.WatchChan, error) {
var (
children []string
err error
wc clientv3.WatchChan
err error
childrenKeys []string
childrenValues []string
wc clientv3.WatchChan
)
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, wc, err = c.cs.getChildrenW(path)
childrenKeys, childrenValues, wc, err = c.cs.getChildrenW(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, nil, errors.Annotatef(err,"path{%s} has none children", path)
}
logger.Errorf("etcdv3.ChildrenW(path{%s}) = error(%v)", path, err)
return nil, nil, errors.Annotatef(err, "etcdv3.ChildrenW(path:%s)", path)
logger.Errorf("etcdv3 client Children(key{%s}) = error(%v)", key, perrors.WithStack(err))
return nil, nil, nil, errors.Annotatef(err, "client ChildrenW(key:%s)", key)
}
return children, wc, nil
return childrenKeys, childrenValues, wc, nil
}
func (c *Client) GetChildren(path string) ([]string, error) {
func (c *Client) GetChildren(key string) ([]string, error) {
var (
err error
children []string
......@@ -608,20 +634,20 @@ func (c *Client) GetChildren(path string) ([]string, error) {
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, err = c.cs.getChildren(path)
children, _, err = c.cs.getChildren(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err,"path{%s} has none children", path)
return nil, errors.Annotatef(err, "key{%s} has none children", key)
}
logger.Errorf("clientv3.Children(path{%s}) = error(%v)", path, perrors.WithStack(err))
return nil, errors.Annotatef(err, "clientv3.Children(path:%s)", path)
logger.Errorf("clientv3.Children(key{%s}) = error(%v)", key, perrors.WithStack(err))
return nil, errors.Annotatef(err, "client GetChildren(key:%s)", key)
}
return children, nil
}
func (c *Client) ExistW(path string) (clientv3.WatchChan, error) {
func (c *Client) WatchExist(key string) (clientv3.WatchChan, error) {
var (
err = ErrNilETCDV3ClientConn
......@@ -630,25 +656,25 @@ func (c *Client) ExistW(path string) (clientv3.WatchChan, error) {
c.Lock()
if c.cs != nil {
_, err = c.cs.watch(path)
out, err = c.cs.watch(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err, "path{%s} not exist", path)
return nil, errors.Annotatef(err, "key{%s} not exist", key)
}
return nil, errors.Annotatef(err, "clientv3.ExistW(path:%s)", path)
return nil, errors.Annotatef(err, "client WatchExist(key:%s)", key)
}
return out, nil
}
func (c *Client) GetContent(path string) ([]byte, error) {
func (c *Client) GetContent(key string) ([]byte, error) {
c.Lock()
value, err := c.cs.get(path)
value, err := c.cs.get(key)
if err != nil {
return nil, errors.Annotatef(err, "client set get: %s", path)
return nil, errors.Annotatef(err, "clientset get(key: %s)", key)
}
c.Unlock()
......
package etcdv3
import (
"gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path"
pathlib "path"
"sync"
"time"
"go.etcd.io/etcd/clientv3"
)
import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
import (
......@@ -20,31 +17,32 @@ import (
)
type EventListener struct {
client *Client
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
client *Client
keyMapLock sync.Mutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
pathMap: make(map[string]struct{}),
client: client,
keyMap: make(map[string]struct{}),
}
}
func (l *EventListener) SetClient(client *Client) {
l.client = client
}
// this method will return true when spec path deleted,
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting.DataListener) bool {
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
keyEventCh, err := l.client.ExistW(path)
keyEventCh, err := l.client.WatchExist(key)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", path, err)
logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
return false
}
......@@ -61,25 +59,12 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting
// etcd event stream
case e := <-keyEventCh:
if e.Err() != nil{
logger.Warnf("get a etcd event {err: %s}", e.Err())
if e.Err() != nil {
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
continue
}
for _, event := range e.Events{
logger.Warnf("get a etcd Event{type:%s, path:%s,}",
event.Type.String(), event.Kv.Key )
switch event.Type {
case mvccpb.PUT:
if len(listener) > 0 {
if event.IsCreate(){
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value)})
}else{
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EvnetTypeUpdate, Content: string(event.Kv.Value)})
}
}
case mvccpb.DELETE:
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
for _, event := range e.Events {
if l.handleEvents(event, listener...) {
return true
}
}
......@@ -89,285 +74,120 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting
return false
}
func (l *EventListener) handleNodeEvent(path string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {
logger.Warnf("get a etcdv3 Event {type: %s, key: %s}", event.Type, event.Kv.Key)
switch event.Type {
// the etcdv3 event just include PUT && DELETE
case mvccpb.PUT:
for _, listener := range listeners {
switch event.IsCreate() {
case true:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Kv.Value),
})
case false:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EvnetTypeUpdate,
Content: string(event.Kv.Value),
})
}
}
return false
case mvccpb.DELETE:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
return true
}
newChildren, err := l.client.GetChildren(path)
if err != nil {
logger.Errorf("path{%s} child nodes changed, etcdV3.Children() = error{%v}", path, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = pathlib.Join(path, n)
logger.Infof("add zkNode{%s}", newNode)
content, _, err := l.client.Conn.Get(newNode)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
go func(node, childNode string) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", childNode)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, n)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
panic("unreachable")
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.GetChildrenW(zkPath)
_, _, wc, err := l.client.WatchChildren(prefix)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
select {
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
continue
}
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}
failTimes = 0
for _, c := range children {
// listen l service node
dubboPath := path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
logger.Infof("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
//liten sub path recursive
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
// client watch ctx stop
// server stopped
case <-l.client.cs.ctx.Done():
logger.Warn("etcd listener service node with prefix etcd server stopped")
return
// client stopped
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
logger.Warn("etcdv3 client stopped")
return
// etcd event stream
case e := <-wc:
if e.Err() != nil {
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
continue
}
for _, event := range e.Events {
l.handleEvents(event, listener...)
}
}
}
}
//
//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) {
// l.wg.EventTypeAdd(1)
// defer l.wg.Done()
//
// var (
// failTimes int
// event chan struct{}
// zkEvent zk.Event
// )
// event = make(chan struct{}, 4)
// defer close(event)
// for {
// // get current children for a zkPath
// content,_, eventCh, err := l.client.Conn.GetW(zkPath)
// if err != nil {
// failTimes++
// if MaxFailTimes <= failTimes {
// failTimes = MaxFailTimes
// }
// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err)
// // clear the event channel
// CLEAR:
// for {
// select {
// case <-event:
// default:
// break CLEAR
// }
// }
// l.client.RegisterEvent(zkPath, &event)
// select {
// case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
// l.client.UnregisterEvent(zkPath, &event)
// continue
// case <-l.client.Done():
// l.client.UnregisterEvent(zkPath, &event)
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// case <-event:
// logger.Infof("get zk.EventNodeDataChange notify event")
// l.client.UnregisterEvent(zkPath, &event)
// l.handleZkNodeEvent(zkPath, nil, listener)
// continue
// }
// }
// failTimes = 0
//
// select {
// case zkEvent = <-eventCh:
// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
//
// l.handleZkNodeEvent(zkEvent.Path, children, listener)
// case <-l.client.Done():
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// }
// }
//}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
l.keyMapLock.Lock()
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
logger.Infof("listen dubbo service key{%s}", key)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)
}(key)
}
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
func (l *EventListener) valid() bool {
return l.client.Valid()
}
func (l *ZkEventListener) Close() {
func (l *EventListener) Close() {
l.wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment