diff --git a/go.mod b/go.mod index 61ce25b5cd909ffb50298e35122eb78fde1365ea..b63c2060ed86373ee910efc789f62f3166309ab0 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,40 @@ module github.com/apache/dubbo-go require ( github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 + github.com/coreos/bbolt v1.3.3 // indirect + github.com/coreos/etcd v3.3.13+incompatible + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 + github.com/gogo/protobuf v1.2.1 // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/google/btree v1.0.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect + github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect + github.com/jonboulle/clockwork v0.1.0 // indirect + github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 + github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect + github.com/juju/testing v0.0.0-20190613124551-e81189438503 // indirect github.com/magiconair/properties v1.8.1 github.com/pkg/errors v0.8.1 + github.com/prometheus/client_golang v1.0.0 // indirect + github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90 // indirect github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec + github.com/soheilhy/cmux v0.1.4 // indirect github.com/stretchr/testify v1.3.0 + github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect + github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/bbolt v1.3.3 // indirect + go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 + golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect + golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 // indirect + google.golang.org/grpc v1.22.0 + gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index 7718fce852388f9fa896b891924a0662de6c157f..9d0e0450ed02c0ceed65569cfbb2c3c60106309d 100644 --- a/go.sum +++ b/go.sum @@ -1,42 +1,185 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY= +github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= +github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE= github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= +github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= +github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 h1:aQ4ahUIm1FHeZ2XsL1lbzkn9NxhkOUeiMWITOA7OOms= +github.com/juju/errors v1.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= +github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifGWef4WUqvXk0iRqdhdy/2uzI= +github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= +github.com/juju/testing v0.0.0-20190613124551-e81189438503 h1:ZUgTbk8oHgP0jpMieifGC9Lv47mHn8Pb3mFX3/Ew4iY= +github.com/juju/testing v0.0.0-20190613124551-e81189438503/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90 h1:Cov9QkEXNhh8QeXoICvORjJ4RrpyvXmSf7rHSpS+ZfI= +github.com/prometheus/client_model v0.0.0-20190220174349-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= +go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 h1:RMGusaKverhgGR5KBERIKiTyWoWHRd84GCtsNlvLvIo= +golang.org/x/time v0.0.0-20190513212739-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw= +google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU= +gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..6c67b18b119658523959fa16ab520a32599fd026 --- /dev/null +++ b/registry/etcdv3/listener.go @@ -0,0 +1,88 @@ +package etcdv3 + +import ( + "context" + "strings" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" +) + +import ( + "github.com/juju/errors" +) + +type dataListener struct { + interestedURL []*common.URL + listener remoting.ConfigurationListener +} + +func NewRegistryDataListener(listener remoting.ConfigurationListener) *dataListener { + return &dataListener{listener: listener, interestedURL: []*common.URL{}} +} + +func (l *dataListener) AddInterestedURL(url *common.URL) { + l.interestedURL = append(l.interestedURL, url) +} + +func (l *dataListener) DataChange(eventType remoting.Event) bool { + + url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):] + serviceURL, err := common.NewURL(context.TODO(), url) + if err != nil { + logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) + return false + } + + for _, v := range l.interestedURL { + if serviceURL.URLEqual(*v) { + l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path, Value: serviceURL, ConfigType: eventType.Action}) + return true + } + } + + return false +} + +type configurationListener struct { + registry *etcdV3Registry + events chan *remoting.ConfigChangeEvent +} + +func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { + // add a new waiter + reg.wg.Add(1) + return &configurationListener{registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)} +} +func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) { + l.events <- configType +} + +func (l *configurationListener) Next() (*registry.ServiceEvent, error) { + for { + select { + case <-l.registry.done: + logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") + return nil, errors.New("listener stopped") + + case e := <-l.events: + logger.Warnf("got etcd event %#s", e) + if e.ConfigType == remoting.EventTypeDel { + select { + case <-l.registry.done: + logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) + default: + } + continue + } + return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil + } + } +} +func (l *configurationListener) Close() { + l.registry.wg.Done() +} diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a2637a01e46680e2615ca560938d12ded57ae968 --- /dev/null +++ b/registry/etcdv3/listener_test.go @@ -0,0 +1,68 @@ +package etcdv3 + +import ( + "context" + "testing" + "time" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/remoting" + "github.com/stretchr/testify/suite" + "go.etcd.io/etcd/embed" +) + +type RegistryTestSuite struct { + suite.Suite + etcd *embed.Etcd +} + +// start etcd server +func (suite *RegistryTestSuite) SetupSuite() { + + t := suite.T() + + cfg := embed.NewConfig() + cfg.Dir = "/tmp/default.etcd" + e, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + select { + case <-e.Server.ReadyNotify(): + t.Log("Server is ready!") + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + t.Logf("Server took too long to start!") + } + + suite.etcd = e + return +} + +// stop etcd server +func (suite *RegistryTestSuite) TearDownSuite() { + suite.etcd.Close() +} + +func (suite *RegistryTestSuite) TestDataChange() { + + t := suite.T() + + listener := NewRegistryDataListener(&MockDataListener{}) + url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") + listener.AddInterestedURL(&url) + if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) { + t.Fatal("data change not ok") + } +} + +func TestRegistrySuite(t *testing.T) { + suite.Run(t, &RegistryTestSuite{}) +} + +type MockDataListener struct { +} + +func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) { + +} diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go new file mode 100644 index 0000000000000000000000000000000000000000..f6424a6a0e65daad7fb7f9d04c046aeac9d1f374 --- /dev/null +++ b/registry/etcdv3/registry.go @@ -0,0 +1,324 @@ +package etcdv3 + +import ( + "fmt" + "net/url" + "os" + "path" + "strconv" + "strings" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/utils" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/etcdv3" + "github.com/apache/dubbo-go/version" +) + +import ( + "github.com/juju/errors" + perrors "github.com/pkg/errors" +) + +var ( + processID = "" + localIP = "" +) + +func init() { + processID = fmt.Sprintf("%d", os.Getpid()) + localIP, _ = utils.GetLocalIP() + extension.SetRegistry("etcdv3", newETCDV3Registry) +} + +type etcdV3Registry struct { + *common.URL + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + + cltLock sync.Mutex + client *etcdv3.Client + services map[string]common.URL // service name + protocol -> service config + + listenerLock sync.Mutex + listener *etcdv3.EventListener + dataListener *dataListener + configListener *configurationListener + + wg sync.WaitGroup // wg+done for etcd client restart + done chan struct{} +} + +func (r *etcdV3Registry) Client() *etcdv3.Client { + return r.client +} +func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { + r.client = client +} +func (r *etcdV3Registry) ClientLock() *sync.Mutex { + return &r.cltLock +} +func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup { + return &r.wg +} +func (r *etcdV3Registry) GetDone() chan struct{} { + return r.done +} +func (r *etcdV3Registry) RestartCallBack() bool { + + services := []common.URL{} + for _, confIf := range r.services { + services = append(services, confIf) + } + + flag := true + for _, confIf := range services { + err := r.Register(confIf) + if err != nil { + logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf, perrors.WithStack(err)) + flag = false + break + } + logger.Infof("success to re-register service :%v", confIf.Key()) + } + return flag +} + +func newETCDV3Registry(url *common.URL) (registry.Registry, error) { + + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + logger.Errorf("timeout config %v is invalid ,err is %v", + url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) + return nil, errors.Annotatef(err, "new etcd registry(address:%+v)", url.Location) + } + + logger.Infof("etcd address is: %v", url.Location) + logger.Infof("time-out is: %v", timeout.String()) + + r := &etcdV3Registry{ + URL: url, + birth: time.Now().UnixNano(), + done: make(chan struct{}), + services: make(map[string]common.URL), + } + + if err := etcdv3.ValidateClient(r, + etcdv3.WithName(etcdv3.RegistryETCDV3Client), + etcdv3.WithTimeout(timeout), + etcdv3.WithEndpoints(url.Location), + ); err != nil { + return nil, err + } + + r.wg.Add(1) + go etcdv3.HandleClientRestart(r) + + r.listener = etcdv3.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) + + return r, nil +} + +func (r *etcdV3Registry) GetUrl() common.URL { + return *r.URL +} + +func (r *etcdV3Registry) IsAvailable() bool { + + select { + case <-r.done: + return false + default: + return true + } +} + +func (r *etcdV3Registry) Destroy() { + + if r.configListener != nil { + r.configListener.Close() + } + r.stop() +} + +func (r *etcdV3Registry) stop() { + + close(r.done) + + // close current client + r.client.Close() + + r.cltLock.Lock() + r.client = nil + r.services = nil + r.cltLock.Unlock() +} + +func (r *etcdV3Registry) Register(svc common.URL) error { + + role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) + if err != nil { + return errors.Annotate(err, "get registry role") + } + + r.cltLock.Lock() + if _, ok := r.services[svc.Key()]; ok { + return errors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path)) + } + r.cltLock.Unlock() + + switch role { + case common.PROVIDER: + logger.Debugf("(provider register )Register(conf{%#v})", svc) + if err := r.registerProvider(svc); err != nil { + return errors.Annotate(err, "register provider") + } + case common.CONSUMER: + logger.Debugf("(consumer register )Register(conf{%#v})", svc) + if err := r.registerConsumer(svc); err != nil { + return errors.Annotate(err, "register consumer") + } + default: + return errors.New(fmt.Sprintf("unknown role %d", role)) + } + + r.cltLock.Lock() + r.services[svc.Key()] = svc + r.cltLock.Unlock() + return nil +} + +func (r *etcdV3Registry) createDirIfNotExist(k string) error { + + var tmpPath string + for _, str := range strings.Split(k, "/")[1:] { + tmpPath = path.Join(tmpPath, "/", str) + if err := r.client.Create(tmpPath, ""); err != nil { + return errors.Annotatef(err, "create path %s in etcd", tmpPath) + } + } + + return nil +} + +func (r *etcdV3Registry) registerConsumer(svc common.URL) error { + + consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER]) + if err := r.createDirIfNotExist(consumersNode); err != nil { + logger.Errorf("etcd client create path %s: %v", consumersNode, err) + return errors.Annotate(err, "etcd create consumer nodes") + } + providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) + if err := r.createDirIfNotExist(providersNode); err != nil { + return errors.Annotate(err, "create provider node") + } + + params := url.Values{} + + params.Add("protocol", svc.Protocol) + + params.Add("category", (common.RoleType(common.CONSUMER)).String()) + params.Add("dubbo", "dubbogo-consumer-"+version.Version) + + encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) + dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) + if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { + return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) + } + + return nil +} + +func (r *etcdV3Registry) registerProvider(svc common.URL) error { + + if svc.Path == "" || len(svc.Methods) == 0 { + return errors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods)) + } + + var ( + urlPath string + encodedURL string + dubboPath string + ) + + providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) + if err := r.createDirIfNotExist(providersNode); err != nil { + return errors.Annotate(err, "create provider node") + } + + params := url.Values{} + for k, v := range svc.Params { + params[k] = v + } + + params.Add("pid", processID) + params.Add("ip", localIP) + params.Add("anyhost", "true") + params.Add("category", (common.RoleType(common.PROVIDER)).String()) + params.Add("dubbo", "dubbo-provider-golang-"+version.Version) + params.Add("side", (common.RoleType(common.PROVIDER)).Role()) + + if len(svc.Methods) == 0 { + params.Add("methods", strings.Join(svc.Methods, ",")) + } + + logger.Debugf("provider url params:%#v", params) + var host string + if svc.Ip == "" { + host = localIP + ":" + svc.Port + } else { + host = svc.Ip + ":" + svc.Port + } + + urlPath = svc.Path + + encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode())) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) + + if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { + return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) + } + + return nil +} + +func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) { + + var ( + configListener *configurationListener + ) + + r.listenerLock.Lock() + configListener = r.configListener + r.listenerLock.Unlock() + if r.listener == nil { + r.cltLock.Lock() + client := r.client + r.cltLock.Unlock() + if client == nil { + return nil, perrors.New("etcd client broken") + } + + // new client & listener + listener := etcdv3.NewEventListener(r.client) + + r.listenerLock.Lock() + r.listener = listener + r.listenerLock.Unlock() + } + + //娉ㄥ唽鍒癲ataconfig鐨刬nterested + r.dataListener.AddInterestedURL(&svc) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener) + + return configListener, nil +} diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cfe046a98364849f5cedf2c0de9c4e108c4ec30d --- /dev/null +++ b/registry/etcdv3/registry_test.go @@ -0,0 +1,112 @@ +package etcdv3 + +import ( + "context" + "strconv" + "testing" + "time" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func initRegistry(t *testing.T) *etcdV3Registry { + + regurl, err := common.NewURL(context.TODO(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + if err != nil { + t.Fatal(err) + } + + reg, err := newETCDV3Registry(®url) + if err != nil { + t.Fatal(err) + } + + out := reg.(*etcdV3Registry) + out.client.CleanKV() + return out +} + +func (suite *RegistryTestSuite) TestRegister() { + + t := suite.T() + + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg := initRegistry(t) + err := reg.Register(url) + children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers") + if err != nil { + t.Fatal(err) + } + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) + assert.NoError(t, err) +} + +func (suite *RegistryTestSuite) TestSubscribe() { + + t := suite.T() + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg := initRegistry(t) + //provider register + err := reg.Register(url) + if err != nil { + t.Fatal(err) + } + + //consumer register + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + reg2 := initRegistry(t) + + reg2.Register(url) + listener, err := reg2.Subscribe(url) + if err != nil { + t.Fatal(err) + } + + serviceEvent, err := listener.Next() + if err != nil { + t.Fatal(err) + } + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) +} + +func (suite *RegistryTestSuite) TestConsumerDestory() { + + t := suite.T() + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg := initRegistry(t) + _, err := reg.Subscribe(url) + if err != nil { + t.Fatal(err) + } + + //listener.Close() + time.Sleep(1e9) + reg.Destroy() + + assert.Equal(t, false, reg.IsAvailable()) + +} + +func (suite *RegistryTestSuite) TestProviderDestory() { + + t := suite.T() + reg := initRegistry(t) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + reg.Register(url) + + //listener.Close() + time.Sleep(1e9) + reg.Destroy() + assert.Equal(t, false, reg.IsAvailable()) +} diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go new file mode 100644 index 0000000000000000000000000000000000000000..224b12e8a00fbae2e940d338386994e4df1bf409 --- /dev/null +++ b/remoting/etcdv3/client.go @@ -0,0 +1,476 @@ +package etcdv3 + +import ( + "context" + "path" + "sync" + "time" +) +import ( + "github.com/apache/dubbo-go/common/logger" +) +import ( + "github.com/coreos/etcd/clientv3" + "github.com/juju/errors" + "go.etcd.io/etcd/clientv3/concurrency" + "google.golang.org/grpc" +) + +const ( + ConnDelay = 3 + MaxFailTimes = 15 + RegistryETCDV3Client = "etcd registry" +) + +var ( + ErrNilETCDV3Client = errors.New("etcd raw client is nil") // full describe the ERR + ErrKVPairNotFound = errors.New("k/v pair not found") +) + +type Options struct { + name string + endpoints []string + client *Client + timeout time.Duration + heartbeat int // heartbeat second +} + +type Option func(*Options) + +func WithEndpoints(endpoints ...string) Option { + return func(opt *Options) { + opt.endpoints = endpoints + } +} +func WithName(name string) Option { + return func(opt *Options) { + opt.name = name + } +} +func WithTimeout(timeout time.Duration) Option { + return func(opt *Options) { + opt.timeout = timeout + } +} + +func WithHeartbeat(heartbeat int) Option { + return func(opt *Options) { + opt.heartbeat = heartbeat + } +} + +func ValidateClient(container clientFacade, opts ...Option) error { + + options := &Options{ + heartbeat: 1, // default heartbeat + } + for _, opt := range opts { + opt(options) + } + + lock := container.ClientLock() + lock.Lock() + defer lock.Unlock() + + // new Client + if container.Client() == nil { + newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) + if err != nil { + logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + options.name, options.endpoints, options.timeout, err) + return errors.Annotatef(err, "new client (address:%+v)", options.endpoints) + } + container.SetClient(newClient) + } + + // Client lose connection with etcd server + if container.Client().rawClient == nil { + + newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) + if err != nil { + logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + options.name, options.endpoints, options.timeout, err) + return errors.Annotatef(err, "new client (address:%+v)", options.endpoints) + } + container.SetClient(newClient) + } + + return nil +} + +type Client struct { + lock sync.RWMutex + + // these properties are only set once when they are started. + name string + endpoints []string + timeout time.Duration + heartbeat int + + ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg + cancel context.CancelFunc // cancel the ctx, all watcher will stopped + rawClient *clientv3.Client + + exit chan struct{} + Wait sync.WaitGroup +} + +func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) { + + ctx, cancel := context.WithCancel(context.Background()) + rawClient, err := clientv3.New(clientv3.Config{ + Context: ctx, + Endpoints: endpoints, + DialTimeout: timeout, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + }) + if err != nil { + return nil, errors.Annotate(err, "new raw client block connect to server") + } + + c := &Client{ + + name: name, + timeout: timeout, + endpoints: endpoints, + heartbeat: heartbeat, + + ctx: ctx, + cancel: cancel, + rawClient: rawClient, + + exit: make(chan struct{}), + } + + if err := c.maintenanceStatus(); err != nil { + return nil, errors.Annotate(err, "client maintenance status") + } + return c, nil +} + +// NOTICE: need to get the lock before calling this method +func (c *Client) clean() { + + // close raw client + c.rawClient.Close() + + // cancel ctx for raw client + c.cancel() + + // clean raw client + c.rawClient = nil +} + +func (c *Client) stop() bool { + + select { + case <-c.exit: + return true + default: + close(c.exit) + } + return false +} + +func (c *Client) Close() { + + if c == nil { + return + } + + // stop the client + c.stop() + + // wait client maintenance status stop + c.Wait.Wait() + + c.lock.Lock() + if c.rawClient != nil { + c.clean() + } + c.lock.Unlock() + logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints) +} + +func (c *Client) maintenanceStatus() error { + + s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) + if err != nil { + return errors.Annotate(err, "new session with server") + } + + // must add wg before go maintenance status goroutine + c.Wait.Add(1) + go c.maintenanceStatusLoop(s) + return nil +} + +func (c *Client) maintenanceStatusLoop(s *concurrency.Session) { + + defer func() { + c.Wait.Done() + logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name) + }() + + for { + select { + case <-c.Done(): + // Client be stopped, will clean the client hold resources + return + case <-s.Done(): + logger.Warn("etcd server stopped") + c.lock.Lock() + // when etcd server stopped, cancel ctx, stop all watchers + c.clean() + c.lock.Unlock() + return + } + } +} + +// if k not exist will put k/v in etcd +// if k is already exist in etcd, return nil +func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3Client + } + + _, err := c.rawClient.Txn(c.ctx). + If(clientv3.Compare(clientv3.Version(k), "<", 1)). + Then(clientv3.OpPut(k, v, opts...)). + Commit() + if err != nil { + return err + + } + return nil +} + +func (c *Client) delete(k string) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3Client + } + + _, err := c.rawClient.Delete(c.ctx, k) + if err != nil { + return err + + } + return nil +} + +func (c *Client) get(k string) (string, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return "", ErrNilETCDV3Client + } + + resp, err := c.rawClient.Get(c.ctx, k) + if err != nil { + return "", err + } + + if len(resp.Kvs) == 0 { + return "", ErrKVPairNotFound + } + + return string(resp.Kvs[0].Value), nil +} + +func (c *Client) CleanKV() error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3Client + } + + _, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix()) + if err != nil { + return err + } + + return nil +} + +func (c *Client) getChildren(k string) ([]string, []string, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, nil, ErrNilETCDV3Client + } + + resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix()) + if err != nil { + return nil, nil, err + } + + if len(resp.Kvs) == 0 { + return nil, nil, ErrKVPairNotFound + } + + var ( + kList []string + vList []string + ) + + for _, kv := range resp.Kvs { + kList = append(kList, string(kv.Key)) + vList = append(vList, string(kv.Value)) + } + + return kList, vList, nil +} + +func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, ErrNilETCDV3Client + } + + return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil +} + +func (c *Client) watch(k string) (clientv3.WatchChan, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, ErrNilETCDV3Client + } + + return c.rawClient.Watch(c.ctx, k), nil +} + +func (c *Client) keepAliveKV(k string, v string) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3Client + } + + lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) + if err != nil { + return errors.Annotatef(err, "grant lease") + } + + keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) + if err != nil || keepAlive == nil { + c.rawClient.Revoke(c.ctx, lease.ID) + return errors.Annotate(err, "keep alive lease") + } + + _, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID)) + if err != nil { + return errors.Annotate(err, "put k/v with lease") + } + return nil +} + +func (c *Client) Done() <-chan struct{} { + return c.exit +} + +func (c *Client) Valid() bool { + select { + case <-c.exit: + return false + default: + } + + c.lock.RLock() + if c.rawClient == nil { + c.lock.RUnlock() + return false + } + c.lock.RUnlock() + return true +} + +func (c *Client) Create(k string, v string) error { + + err := c.put(k, v) + if err != nil { + return errors.Annotatef(err, "put k/v (key: %s value %s)", k, v) + } + return nil +} + +func (c *Client) Delete(k string) error { + + err := c.delete(k) + if err != nil { + return errors.Annotatef(err, "delete k/v (key %s)", k) + } + + return nil +} + +func (c *Client) RegisterTemp(basePath string, node string) (string, error) { + + completeKey := path.Join(basePath, node) + + err := c.keepAliveKV(completeKey, "") + if err != nil { + return "", errors.Annotatef(err, "keepalive kv (key %s)", completeKey) + } + + return completeKey, nil +} + +func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { + + kList, vList, err := c.getChildren(k) + if err != nil { + return nil, nil, errors.Annotatef(err, "get key children (key %s)", k) + } + return kList, vList, nil +} + +func (c *Client) Get(k string) (string, error) { + + v, err := c.get(k) + if err != nil { + return "", errors.Annotatef(err, "get key value (key %s)", k) + } + + return v, nil +} + +func (c *Client) Watch(k string) (clientv3.WatchChan, error) { + + wc, err := c.watch(k) + if err != nil { + return nil, errors.Annotatef(err, "watch prefix (key %s)", k) + } + return wc, nil +} + +func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { + + wc, err := c.watchWithPrefix(prefix) + if err != nil { + return nil, errors.Annotatef(err, "watch prefix (key %s)", prefix) + } + return wc, nil +} diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c4e4e904b0d4a54ab1146f3b6529de01ae6fd047 --- /dev/null +++ b/remoting/etcdv3/client_test.go @@ -0,0 +1,365 @@ +package etcdv3 + +import ( + "fmt" + "go.etcd.io/etcd/embed" + "net/url" + "path" + "reflect" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/juju/errors" + "google.golang.org/grpc/connectivity" +) + +// tests dataset +var tests = []struct { + input struct { + k string + v string + } +}{ + {input: struct { + k string + v string + }{k: "name", v: "scott.wang"}}, + {input: struct { + k string + v string + }{k: "namePrefix", v: "prefix.scott.wang"}}, + {input: struct { + k string + v string + }{k: "namePrefix1", v: "prefix1.scott.wang"}}, + {input: struct { + k string + v string + }{k: "age", v: "27"}}, +} + +// test dataset prefix +const prefix = "name" + +type ClientTestSuite struct { + suite.Suite + + etcdConfig struct { + name string + endpoints []string + timeout time.Duration + heartbeat int + } + + etcd *embed.Etcd + + client *Client +} + +// start etcd server +func (suite *ClientTestSuite) SetupSuite() { + + t := suite.T() + + DefaultListenPeerURLs := "http://localhost:2382" + DefaultListenClientURLs := "http://localhost:2381" + lpurl, _ := url.Parse(DefaultListenPeerURLs) + lcurl, _ := url.Parse(DefaultListenClientURLs) + cfg := embed.NewConfig() + cfg.LPUrls = []url.URL{*lpurl} + cfg.LCUrls = []url.URL{*lcurl} + cfg.Dir = "/tmp/default.etcd" + e, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + select { + case <-e.Server.ReadyNotify(): + t.Log("Server is ready!") + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + t.Logf("Server took too long to start!") + } + + suite.etcd = e + return +} + +// stop etcd server +func (suite *ClientTestSuite) TearDownSuite() { + suite.etcd.Close() +} + +func (suite *ClientTestSuite) setUpClient() *Client { + c, err := newClient(suite.etcdConfig.name, + suite.etcdConfig.endpoints, + suite.etcdConfig.timeout, + suite.etcdConfig.heartbeat) + if err != nil { + suite.T().Fatal(err) + } + return c +} + +// set up a client for suite +func (suite *ClientTestSuite) SetupTest() { + c := suite.setUpClient() + c.CleanKV() + suite.client = c + return +} + +func (suite *ClientTestSuite) TestClientClose() { + + fmt.Println("called client close") + + c := suite.client + t := suite.T() + + defer c.Close() + if c.rawClient.ActiveConnection().GetState() != connectivity.Ready { + t.Fatal(suite.client.rawClient.ActiveConnection().GetState()) + } +} + +func (suite *ClientTestSuite) TestClientValid() { + + fmt.Println("called client valid") + + c := suite.client + t := suite.T() + + if c.Valid() != true { + t.Fatal("client is not valid") + } + c.Close() + if suite.client.Valid() != false { + t.Fatal("client is valid") + } +} + +func (suite *ClientTestSuite) TestClientDone() { + + c := suite.client + + go func() { + time.Sleep(2 * time.Second) + c.Close() + }() + + c.Wait.Wait() +} + +func (suite *ClientTestSuite) TestClientCreateKV() { + + tests := tests + + c := suite.client + t := suite.T() + + defer suite.client.Close() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + expect := tc.input.v + + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + + value, err := c.Get(k) + if err != nil { + t.Fatal(err) + } + + if value != expect { + t.Fatalf("expect %v but get %v", expect, value) + } + + } +} + +func (suite *ClientTestSuite) TestClientDeleteKV() { + + tests := tests + c := suite.client + t := suite.T() + + defer c.Close() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + expect := ErrKVPairNotFound + + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + + if err := c.Delete(k); err != nil { + t.Fatal(err) + } + + _, err := c.Get(k) + if errors.Cause(err) == expect { + continue + } + + if err != nil { + t.Fatal(err) + } + } + +} + +func (suite *ClientTestSuite) TestClientGetChildrenKVList() { + + tests := tests + + c := suite.client + t := suite.T() + + var expectKList []string + var expectVList []string + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if strings.Contains(k, prefix) { + expectKList = append(expectKList, k) + expectVList = append(expectVList, v) + } + + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + } + + kList, vList, err := c.GetChildrenKVList(prefix) + if err != nil { + t.Fatal(err) + } + + if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) { + return + } + + t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList) + +} + +func (suite *ClientTestSuite) TestClientWatch() { + + tests := tests + + c := suite.client + t := suite.T() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + + defer wg.Done() + + wc, err := c.watch(prefix) + if err != nil { + t.Fatal(err) + } + + for e := range wc { + + for _, event := range e.Events { + t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value) + } + } + + }() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + + if err := c.delete(k); err != nil { + t.Fatal(err) + } + } + + c.Close() + + wg.Wait() + +} + +func (suite *ClientTestSuite) TestClientRegisterTemp() { + + c := suite.client + observeC := suite.setUpClient() + t := suite.T() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + completePath := path.Join("scott", "wang") + wc, err := observeC.watch(completePath) + if err != nil { + t.Fatal(err) + } + + for e := range wc { + + for _, event := range e.Events { + + if event.Type == mvccpb.DELETE { + t.Logf("complete key (%s) is delete", completePath) + wg.Done() + observeC.Close() + return + } + t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value) + } + } + }() + + _, err := c.RegisterTemp("scott", "wang") + if err != nil { + t.Fatal(err) + } + + time.Sleep(2 * time.Second) + c.Close() + + wg.Wait() +} + +func TestClientSuite(t *testing.T) { + suite.Run(t, &ClientTestSuite{ + etcdConfig: struct { + name string + endpoints []string + timeout time.Duration + heartbeat int + }{ + name: "test", + endpoints: []string{"localhost:2381"}, + timeout: time.Second, + heartbeat: 1, + }, + }) +} diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go new file mode 100644 index 0000000000000000000000000000000000000000..b205f7f01a71e75657236dcae085a0d12bef0d13 --- /dev/null +++ b/remoting/etcdv3/facade.go @@ -0,0 +1,81 @@ +package etcdv3 + +import ( + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +import ( + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" +) + +type clientFacade interface { + Client() *Client + SetClient(*Client) + ClientLock() *sync.Mutex + WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container + GetDone() chan struct{} //for zk client control + RestartCallBack() bool + common.Node +} + +func HandleClientRestart(r clientFacade) { + + var ( + err error + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for { + select { + case <-r.GetDone(): + logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...") + break LOOP + // re-register all services + case <-r.Client().Done(): + r.ClientLock().Lock() + clientName := RegistryETCDV3Client + timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + endpoint := r.GetUrl().Location + r.Client().Close() + r.SetClient(nil) + r.ClientLock().Unlock() + + // 鎺k锛岀洿鑷虫垚鍔� + failTimes = 0 + for { + select { + case <-r.GetDone(): + logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 闃叉鐤媯閲嶈繛etcd + } + err = ValidateClient(r, + WithName(clientName), + WithEndpoints(endpoint), + WithTimeout(timeout), + ) + logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", + endpoint, perrors.WithStack(err)) + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} diff --git a/remoting/etcdv3/facede_test.go b/remoting/etcdv3/facede_test.go new file mode 100644 index 0000000000000000000000000000000000000000..94e9056c5990e210d1e7a70b09b42034cde326e4 --- /dev/null +++ b/remoting/etcdv3/facede_test.go @@ -0,0 +1,69 @@ +package etcdv3 + +import ( + "context" + "sync" + "time" + + "github.com/apache/dubbo-go/common" +) + +type mockFacade struct { + client *Client + cltLock sync.Mutex + wg sync.WaitGroup + URL *common.URL + done chan struct{} +} + +func (r *mockFacade) Client() *Client { + return r.client +} + +func (r *mockFacade) SetClient(client *Client) { + r.client = client +} + +func (r *mockFacade) ClientLock() *sync.Mutex { + return &r.cltLock +} + +func (r *mockFacade) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *mockFacade) GetDone() chan struct{} { + return r.done +} + +func (r *mockFacade) GetUrl() common.URL { + return *r.URL +} + +func (r *mockFacade) Destroy() { + close(r.done) + r.wg.Wait() +} + +func (r *mockFacade) RestartCallBack() bool { + return true +} +func (r *mockFacade) IsAvailable() bool { + return true +} + +func (suite *ClientTestSuite) TestFacade() { + + c := suite.client + t := suite.T() + + url, err := common.NewURL(context.Background(), "mock://127.0.0.1:2379") + if err != nil { + t.Fatal(err) + } + mock := &mockFacade{client: c, URL: &url} + go HandleClientRestart(mock) + c.Close() + + time.Sleep(2 * time.Second) +} diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..6402385b8936a8badf4ad1da82ff9e6f7e5f2bcc --- /dev/null +++ b/remoting/etcdv3/listener.go @@ -0,0 +1,216 @@ +package etcdv3 + +import ( + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" +) + +import ( + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/juju/errors" +) + +type EventListener struct { + client *Client + keyMapLock sync.Mutex + keyMap map[string]struct{} + wg sync.WaitGroup +} + +func NewEventListener(client *Client) *EventListener { + return &EventListener{ + client: client, + keyMap: make(map[string]struct{}), + } +} + +// Listen on a spec key +// this method will return true when spec key deleted, +// this method will return false when deep layer connection lose +func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { + l.wg.Add(1) + defer l.wg.Done() + for { + wc, err := l.client.Watch(key) + if err != nil { + logger.Warnf("WatchExist{key:%s} = error{%v}", key, err) + return false + } + + select { + + // client stopped + case <-l.client.Done(): + logger.Warnf("etcd client stopped") + return false + + // client ctx stop + case <-l.client.ctx.Done(): + logger.Warnf("etcd client ctx cancel") + return false + + // handle etcd events + case e, ok := <-wc: + if !ok { + logger.Warnf("etcd watch-chan closed") + return false + } + + if e.Err() != nil { + logger.Errorf("etcd watch ERR {err: %s}", e.Err()) + continue + } + for _, event := range e.Events { + if l.handleEvents(event, listener...) { + // if event is delete + return true + } + } + } + } + + return false +} + +// return true mean the event type is DELETE +// return false mean the event type is CREATE || UPDATE +func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool { + + logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key) + + switch event.Type { + // the etcdv3 event just include PUT && DELETE + case mvccpb.PUT: + for _, listener := range listeners { + switch event.IsCreate() { + case true: + logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EventTypeAdd, + Content: string(event.Kv.Value), + }) + case false: + logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EvnetTypeUpdate, + Content: string(event.Kv.Value), + }) + } + } + return false + case mvccpb.DELETE: + logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + return true + } + + panic("unreachable") +} + +// Listen on a set of key with spec prefix +func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { + + l.wg.Add(1) + defer l.wg.Done() + for { + wc, err := l.client.WatchWithPrefix(prefix) + if err != nil { + logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) + } + + select { + + // client stopped + case <-l.client.Done(): + logger.Warnf("etcd client stopped") + return + + // client ctx stop + case <-l.client.ctx.Done(): + logger.Warnf("etcd client ctx cancel") + return + + // etcd event stream + case e, ok := <-wc: + + if !ok { + logger.Warnf("etcd watch-chan closed") + return + } + + if e.Err() != nil { + logger.Errorf("etcd watch ERR {err: %s}", e.Err()) + continue + } + for _, event := range e.Events { + l.handleEvents(event, listener...) + } + } + } +} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + +// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener +// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent +// | +// --------> ListenServiceNodeEvent +func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { + + l.keyMapLock.Lock() + _, ok := l.keyMap[key] + l.keyMapLock.Unlock() + if ok { + logger.Warnf("etcdv3 key %s has already been listened.", key) + return + } + + l.keyMapLock.Lock() + l.keyMap[key] = struct{}{} + l.keyMapLock.Unlock() + + keyList, valueList, err := l.client.getChildren(key) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, errors.Annotate(err, "get children")) + } + + logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) + + for i, k := range keyList { + logger.Infof("got children list key -> %s", k) + if !listener.DataChange(remoting.Event{ + Path: k, + Action: remoting.EventTypeAdd, + Content: valueList[i], + }) { + continue + } + } + + logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + go func(key string, listener remoting.DataListener) { + l.ListenServiceNodeEventWithPrefix(key, listener) + logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) + }(key, listener) + + logger.Infof("listen dubbo service key{%s}", key) + go func(key string) { + if l.ListenServiceNodeEvent(key) { + listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) + } + logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key) + }(key) +} + +func (l *EventListener) Close() { + l.wg.Wait() +} diff --git a/remoting/etcdv3/listener_test.go b/remoting/etcdv3/listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..648cf49fb990a6a63dacf8f8e7f40104f68fa7d8 --- /dev/null +++ b/remoting/etcdv3/listener_test.go @@ -0,0 +1,82 @@ +package etcdv3 + +import ( + "time" + + "github.com/apache/dubbo-go/remoting" + "github.com/stretchr/testify/assert" +) + +var changedData = ` + dubbo.consumer.request_timeout=3s + dubbo.consumer.connect_timeout=5s + dubbo.application.organization=ikurento.com + dubbo.application.name=BDTService + dubbo.application.module=dubbogo user-info server + dubbo.application.version=0.0.1 + dubbo.application.owner=ZX + dubbo.application.environment=dev + dubbo.registries.hangzhouzk.protocol=zookeeper + dubbo.registries.hangzhouzk.timeout=3s + dubbo.registries.hangzhouzk.address=127.0.0.1:2181 + dubbo.registries.shanghaizk.protocol=zookeeper + dubbo.registries.shanghaizk.timeout=3s + dubbo.registries.shanghaizk.address=127.0.0.1:2182 + dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo + dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider + dubbo.service.com.ikurento.user.UserProvider.loadbalance=random + dubbo.service.com.ikurento.user.UserProvider.warmup=100 + dubbo.service.com.ikurento.user.UserProvider.cluster=failover +` + +func (suite *ClientTestSuite) TestListener() { + + var tests = []struct { + input struct { + k string + v string + } + }{ + {input: struct { + k string + v string + }{k: "/dubbo", v: changedData}}, + } + + c := suite.client + t := suite.T() + + listener := NewEventListener(c) + dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} + listener.ListenServiceEvent("/dubbo", dataListener) + + // NOTICE: direct listen will lose create msg + time.Sleep(time.Second) + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + + } + msg := <-dataListener.rc + assert.Equal(t, changedData, msg.Content) +} + +type mockDataListener struct { + eventList []remoting.Event + client *Client + changedData string + + rc chan remoting.Event +} + +func (m *mockDataListener) DataChange(eventType remoting.Event) bool { + m.eventList = append(m.eventList, eventType) + if eventType.Content == m.changedData { + m.rc <- eventType + } + return true +}