diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index ec4371da4768298fe0928ba6ef88c2be7060832e..4232c9d90bee5a48ed988fa5449c3acbaf64f100 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -85,22 +85,23 @@ func (s *TestService1) Version() string { func TestServiceMap_Register(t *testing.T) { // lowercase s0 := &testService{} - methods, err := ServiceMap.Register("testporotocol", s0) + // methods, err := ServiceMap.Register("testporotocol", s0) + _, err := ServiceMap.Register("testporotocol", s0) assert.EqualError(t, err, "type testService is not exported") // succ s := &TestService{} - methods, err = ServiceMap.Register("testporotocol", s) + methods, err := ServiceMap.Register("testporotocol", s) assert.NoError(t, err) assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods) // repeat - methods, err = ServiceMap.Register("testporotocol", s) + _, err = ServiceMap.Register("testporotocol", s) assert.EqualError(t, err, "service already defined: com.test.Path") // no method s1 := &TestService1{} - methods, err = ServiceMap.Register("testporotocol", s1) + _, err = ServiceMap.Register("testporotocol", s1) assert.EqualError(t, err, "type com.test.Path1 has no exported methods of suitable type") ServiceMap = &serviceMap{ diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 6b526b0c67df2d693214295c94dfeb35200ffec4..ef0761efcd0d2cee47425dd5e1099200d3be6a70 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -69,7 +69,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu err = c.client.Create(c.rootPath) c.listener.ListenServiceEvent(c.rootPath, c.cacheListener) - return c, nil + return c, err } @@ -95,7 +95,7 @@ func newMockZookeeperDynamicConfiguration(url *common.URL, opts ...zookeeper.Opt err = c.client.Create(c.rootPath) go c.listener.ListenServiceEvent(c.rootPath, c.cacheListener) - return tc, c, nil + return tc, c, err } diff --git a/go.mod b/go.mod index 8fd3c215fbb38b9c54a6570630f5ef32ad8f56c1..d31dc9f3b9f87c787e2ad0bef26f082d74cb8186 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,23 @@ module github.com/apache/dubbo-go require ( - github.com/dubbogo/getty v1.0.7 - github.com/dubbogo/gost v1.0.0 - github.com/dubbogo/hessian2 v1.1.2 + github.com/AlexStocks/goext v0.3.2 + github.com/AlexStocks/log4go v1.0.2 // indirect + github.com/coreos/etcd v3.3.13+incompatible // indirect + github.com/dubbogo/getty v1.2.0 + github.com/dubbogo/gost v1.1.1 + github.com/dubbogo/hessian2 v1.2.0 + github.com/gogo/protobuf v1.2.1 // indirect + github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 github.com/magiconair/properties v1.8.1 + github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect + github.com/mattn/go-isatty v0.0.8 // indirect github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 + go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 + google.golang.org/grpc v1.22.0 // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index 859e9aab1c882f3864668b45c6ac47ce7f01b262..d0dd872d571c2ff77dfbba410a09f9361c459cc1 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,43 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0= +github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkglDGSl3Hc= +github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk= +github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= +github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= -github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/gost v1.0.0 h1:obKvpJYdrIY2BidHYwYoj2E50OtwCDqVVVTcH2nnhAY= -github.com/dubbogo/gost v1.0.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= -github.com/dubbogo/hessian2 v1.1.2 h1:SRkPzIwVv2D+ZUOCE2XuI5kANoL01ShhAheLcc3usJE= -github.com/dubbogo/hessian2 v1.1.2/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= +github.com/dubbogo/getty v1.2.0 h1:LmP/R/yCYp5/ubUTm5Xa7QwE3EnFFSjguJH8DJruJ70= +github.com/dubbogo/getty v1.2.0/go.mod h1:p9Pyk18a+5Aa0GZ546aCzlehEfvEof0jAF0+QQcEPYQ= +github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= +github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw= +github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 h1:aQ4ahUIm1FHeZ2XsL1lbzkn9NxhkOUeiMWITOA7OOms= +github.com/juju/errors v1.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e h1:hB2xlXdHp/pmPZq0y3QnmWAArdw9PqbmotexnWx/FU8= +github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -22,6 +47,8 @@ github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= +go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -29,13 +56,28 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw= +google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index f4a5f4a8474b30b13bf7598bc634ac722955d91b..a39b0aef743d18c24f41b41655abfada996503e8 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -62,13 +62,13 @@ func TestClient_Call(t *testing.T) { } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - user := &User{} + // user := &User{} //err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) //assert.NoError(t, err) //assert.NotEqual(t, "", user.Id) //assert.NotEqual(t, "", user.Name) - user = &User{} + user := &User{} err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index 8d3ac5dabb69acbfc3ca9846ebd1575a53b32eb8..ade74246121b5f275c8dbeaa5923228dbab2804f 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -42,7 +42,7 @@ func TestJsonClientCodec_Write(t *testing.T) { assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n", string(data)) cd.Args = 1 - data, err = codec.Write(cd) + _, err = codec.Write(cd) assert.EqualError(t, err, "unsupported param type: int") } diff --git a/registry/etcd/registry.go b/registry/etcd/registry.go index 16aa103e8919b10c64cb1ea9649ede4ff4309437..677fab8bd158e16f08d928cec1d52159ab125f7d 100644 --- a/registry/etcd/registry.go +++ b/registry/etcd/registry.go @@ -72,6 +72,8 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { return nil, errors.Annotate(err, "block connect to etcd server") } + rawClient.ActiveConnection() + rootCtx, cancel := context.WithCancel(context.Background()) client, err := etcd.NewClient(rawClient, etcd.WithTTL(time.Second), etcd.WithContext(rootCtx)) if err != nil { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 6b15133319975ee6cb305482b1cfd17aacbca5b6..c272c5bba615c68c452f36819edef0f2a821a5dc 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -211,7 +211,6 @@ func (r *zkRegistry) Register(conf common.URL) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) switch role { case common.CONSUMER: - ok = false r.cltLock.Lock() _, ok = r.services[conf.Key()] r.cltLock.Unlock() @@ -232,7 +231,6 @@ func (r *zkRegistry) Register(conf common.URL) error { case common.PROVIDER: // 检验服务是否已经注册过 - ok = false r.cltLock.Lock() // 注意此处与consumerZookeeperRegistry的差异,consumer用的是conf.Path, // 因为consumer要提供watch功能给selector使用, provider允许注册同一个service的多个group or version diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index ba2755fa9923d9e6c11a1908594a176ace458691..22658e1adcd57d6b88bdbf3949cae08f90f1cfb5 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -38,9 +38,9 @@ func Test_Register(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, err := newMockZkRegistry(®url) + ts, reg, _ := newMockZkRegistry(®url) defer ts.Stop() - err = reg.Register(url) + err := reg.Register(url) children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) assert.NoError(t, err) @@ -49,10 +49,10 @@ func Test_Register(t *testing.T) { func Test_Subscribe(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, err := newMockZkRegistry(®url) + ts, reg, _ := newMockZkRegistry(®url) //provider register - err = reg.Register(url) + err := reg.Register(url) assert.NoError(t, err) if err != nil { @@ -61,12 +61,12 @@ func Test_Subscribe(t *testing.T) { //consumer register regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) - _, reg2, err := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) - err = reg2.Register(url) - listener, err := reg2.Subscribe(url) + reg2.Register(url) + listener, _ := reg2.Subscribe(url) - serviceEvent, err := listener.Next() + serviceEvent, _ := listener.Next() assert.NoError(t, err) if err != nil { return @@ -103,7 +103,7 @@ func Test_ProviderDestory(t *testing.T) { defer ts.Stop() assert.NoError(t, err) - err = reg.Register(url) + reg.Register(url) //listener.Close() time.Sleep(1e9) diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go new file mode 100644 index 0000000000000000000000000000000000000000..8cfb47f7cf5c6433d648b3ad3f76935f48bd376b --- /dev/null +++ b/remoting/etcdv3/client.go @@ -0,0 +1,656 @@ +package etcdv3 + +import ( + "context" + "path" + "sync" + "time" +) + +import ( + "github.com/AlexStocks/goext/database/etcd" + "github.com/juju/errors" + perrors "github.com/pkg/errors" + "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +const ( + ConnDelay = 3 + MaxFailTimes = 15 +) + +var ( + ErrNilETCDV3ClientConn = errors.New("etcdv3client{conn} is nil") + ErrKVPairNotFound = errors.New("k/v pair not found") +) + +// clientSet for etcdv3 +type clientSet struct { + lock sync.RWMutex // protect all element in + + // clientSet + gxClient *gxetcd.Client + rawClient *clientv3.Client + + // client controller used to change client behave + ctx context.Context // if etcd connection lose, the ctx.Done will be sent msg + cancel context.CancelFunc + + // c was filled, start maintenanceStatus + startMaintenanceChan chan struct{} + + c *Client +} + +func newClientSet(endpoints []string, timeout time.Duration, c *Client) error { + + rootCtx, cancel := context.WithCancel(context.Background()) + + // connect to etcd + client, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: timeout, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + }) + if err != nil { + return errors.Annotate(err, "block connect to etcd server") + } + + // share context + gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx)) + if err != nil { + return errors.Annotate(err, "new etcd client") + } + + out := &clientSet{ + gxClient: gxClient, + rawClient: client, + ctx: rootCtx, + cancel: cancel, + startMaintenanceChan: make(chan struct{}), + c: c, + } + + err = out.maintenanceStatus() + if err != nil { + return errors.Annotate(err, "maintenance connection status") + } + + // set clientset to client + c.cs = out + + return nil +} + +func (c *clientSet) maintenanceStatus() error { + + c.c.Wait.Add(1) + aliveResp, err := c.gxClient.KeepAlive() + if err != nil { + return errors.Annotatef(err, "etcd keep alive") + } + // start maintenance the connection status + go c.maintenanceStatusLoop(aliveResp) + return nil +} + +func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAliveResponse) { + + defer func() { + c.c.Wait.Done() + logger.Infof("etcd {path:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name) + }() + + // get signal, will start maintenanceStatusLoop + <-c.startMaintenanceChan + + for { + select { + case <-c.c.Done(): + // client done + return + case <-c.ctx.Done(): + // client context exit + logger.Warn("etcd clientSet context done") + return + case msg, ok := <-aliveResp: + // etcd connection lose + // NOTICE + // if clientSet.Client is nil, it will panic + if !ok { + + logger.Warnf("etcd server stop at term: %#v", msg) + + c.c.Lock() // hold the c.Client lock + c.c.cs.clean() + + // NOTICE + // uninstall the cs from client + c.c.cs = nil + c.c.Unlock() + return + } + } + } +} + +func (c *clientSet) put(k string, v string, opts ...clientv3.OpOption) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3ClientConn + } + + _, err := c.rawClient.Txn(c.ctx). + If(clientv3.Compare(clientv3.Version(k), "<", 1)). + Then(clientv3.OpPut(k, v, opts...)). + Commit() + if err != nil { + return err + + } + return nil +} + +func (c *clientSet) delete(k string) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3ClientConn + } + + _, err := c.rawClient.Delete(c.ctx, k) + if err != nil { + return err + + } + return nil +} + +func (c *clientSet) get(k string) (string, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return "", ErrNilETCDV3ClientConn + } + + resp, err := c.rawClient.Get(c.ctx, k) + if err != nil { + return "", err + } + + if len(resp.Kvs) == 0 { + return "", ErrKVPairNotFound + } + + return string(resp.Kvs[0].Value), nil +} + +func (c *clientSet) getChildrenW(k string) ([]string, []string, clientv3.WatchChan, error) { + + kList, vList, err := c.getChildren(k) + if err != nil { + return nil, nil, nil, errors.Annotatef(err, "get children %s", k) + } + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, nil, nil, ErrNilETCDV3ClientConn + } + + wc := c.rawClient.Watch(c.ctx, k, clientv3.WithPrefix()) + return kList, vList, wc, nil +} + +func (c *clientSet) watch(k string) (clientv3.WatchChan, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, ErrNilETCDV3ClientConn + } + + _, err := c.get(k) + if err != nil { + return nil, errors.Annotatef(err, "watch pre check key %s", k) + } + + return c.rawClient.Watch(c.ctx, k), nil +} + +func (c *clientSet) getChildren(k string) ([]string, []string, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, nil, ErrNilETCDV3ClientConn + } + + resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix()) + if err != nil { + return nil, nil, err + } + + if len(resp.Kvs) == 0 { + return nil, nil, ErrKVPairNotFound + } + + var ( + kList []string + vList []string + ) + + for _, kv := range resp.Kvs { + kList = append(kList, string(kv.Key)) + vList = append(vList, string(kv.Value)) + } + + return kList, vList, nil +} + +func (c *clientSet) keepAliveKV(k string, v string) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3ClientConn + } + + lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) + if err != nil { + return errors.Annotatef(err, "grant lease") + } + + keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) + if err != nil || keepAlive == nil { + c.rawClient.Revoke(c.ctx, lease.ID) + return errors.Annotate(err, "keep alive lease") + } + + err = c.put(k, v, clientv3.WithLease(lease.ID)) + if err != nil { + return errors.Annotate(err, "put k/v with lease") + } + return nil +} + +// because this method will be called by more than one goroutine +// this method will hold clientset lock +func (c *clientSet) clean() { + c.lock.Lock() + if c.gxClient != nil { + + // close gx client, it will close raw etcdv3 client + c.gxClient.Close() + c.gxClient = nil + c.rawClient = nil + + // cancel all context + c.cancel() + c.ctx = nil + + c.lock.Unlock() + return + } + c.lock.Unlock() +} + +type Client struct { + name string + endpoints []string + timeout time.Duration + + sync.Mutex // for control clientSet && event registry + cs *clientSet + + done chan struct{} + Wait sync.WaitGroup + eventRegistry map[string][]clientv3.WatchChan +} + +type Options struct { + name string + endpoints []string + client *Client +} + +type Option func(*Options) + +func WithEndpoints(endpoints ...string) Option { + return func(opt *Options) { + opt.endpoints = endpoints + } +} +func WithName(name string) Option { + return func(opt *Options) { + opt.name = name + } +} + +func StateToString(state connectivity.State) string { + switch state { + case connectivity.Shutdown: + return "etcdv3 disconnected" + case connectivity.TransientFailure: + return "etcdv3 transient failure" + case connectivity.Idle: + return "etcdv3 connect idle" + case connectivity.Ready: + return "etcdv3 client ready" + case connectivity.Connecting: + return "etcdv3 client connecting" + default: + return state.String() + } + + return "etcdv3 unknown state" +} + +func ValidateClient(container clientFacade, opts ...Option) error { + var ( + err error + ) + options := &Options{} + for _, opt := range opts { + opt(options) + } + + err = nil + + lock := container.ClientLock() + url := container.GetUrl() + + lock.Lock() + defer lock.Unlock() + + // bootstrap all clientset + if container.Client() == nil { + //in dubbp ,every registry only connect one node ,so this is []string{r.Address} + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + logger.Errorf("timeout config %v is invalid ,err is %v", + url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) + return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location) + } + newClient, err := newClient(options.name, []string{url.Location}, timeout) + if err != nil { + logger.Warnf("newETCDV3Client(name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}", + options.name, url.Location, timeout.String(), err) + return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location) + } + container.SetClient(newClient) + } + + if container.Client().cs == nil { + + err = newClientSet(container.Client().endpoints, container.Client().timeout, container.Client()) + if err != nil { + return errors.Annotate(err, "new client set") + } + container.Client().cs.startMaintenanceChan <- struct{}{} + } + + return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.PrimitiveURL) +} + +func newClient(name string, endpoints []string, timeout time.Duration) (*Client, error) { + + var ( + err error + out *Client + ) + out = &Client{ + name: name, + endpoints: endpoints, + timeout: timeout, + done: make(chan struct{}), + eventRegistry: make(map[string][]clientv3.WatchChan), + } + + err = newClientSet(endpoints, timeout, out) + if err != nil { + return nil, errors.Annotate(err, "new client set") + } + + // start maintenanceChan + out.cs.startMaintenanceChan <- struct{}{} + return out, nil +} + +func (c *Client) stop() bool { + select { + case <-c.done: + return true + default: + close(c.done) + } + + return false +} +func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) { + + if key == "" || wc == nil { + return + } + + c.Lock() + a := c.eventRegistry[key] + a = append(a, wc) + c.eventRegistry[key] = a + + go func() { + wcc := c.cs.rawClient.Watch(c.cs.ctx, key) + for msg := range wcc { + wc <- msg + } + // when wcc close, close the wc + close(wc) + }() + + logger.Debugf("etcdClient{%s} register event{path:%s, ptr:%p}", c.name, key, wc) + c.Unlock() +} + +func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) { + + if key == "" { + return + } + + c.Lock() + defer c.Unlock() + infoList, ok := c.eventRegistry[key] + if !ok { + return + } + for i, e := range infoList { + if e == event { + arr := infoList + infoList = append(arr[:i], arr[i+1:]...) + logger.Debugf("etcdClient{%s} unregister event{path:%s, event:%p}", c.name, key, event) + } + } + logger.Debugf("after etcdClient{%s} unregister event{path:%s, event:%p}, array length %d", + c.name, key, event, len(infoList)) + if len(infoList) == 0 { + delete(c.eventRegistry, key) + } else { + c.eventRegistry[key] = infoList + } +} + +func (c *Client) Done() <-chan struct{} { + return c.done +} + +func (c *Client) Valid() bool { + select { + case <-c.done: + return false + default: + } + + valid := true + c.Lock() + if c.cs == nil { + valid = false + } + c.Unlock() + + return valid +} + +func (c *Client) Close() { + if c == nil { + return + } + + c.stop() + c.Wait.Wait() + c.Lock() + if c.cs != nil { + c.cs.clean() + c.cs = nil + } + c.Unlock() + logger.Warnf("etcd client{name:%s, etcd addr:%s} exit now.", c.name, c.endpoints) +} + +func (c *Client) Create(k string, v string) error { + + err := ErrNilETCDV3ClientConn + + c.Lock() + if c.cs != nil { + err = c.cs.put(k, v) + } + c.Unlock() + return errors.Annotatef(err, "etcd client put key %s value %s", k, v) +} + +func (c *Client) Delete(key string) error { + + err := ErrNilETCDV3ClientConn + c.Lock() + if c.cs != nil { + err = c.cs.delete(key) + } + c.Unlock() + return errors.Annotatef(err, "etcd client delete (basePath:%s)", key) +} + +func (c *Client) RegisterTemp(basePath string, node string) (string, error) { + + err := ErrNilETCDV3ClientConn + completePath := path.Join(basePath, node) + c.Lock() + if c.cs != nil { + err = c.cs.keepAliveKV(completePath, "") + } + c.Unlock() + logger.Debugf("etcdClient{%s} create a tmp node:%s\n", c.name, completePath) + + if err != nil { + return "", errors.Annotatef(err, "etcd client create tmp k %s", completePath) + } + + return completePath, nil +} + +func (c *Client) GetChildrenW(path string) ([]string, clientv3.WatchChan, error) { + + var ( + children []string + err error + wc clientv3.WatchChan + ) + err = ErrNilETCDV3ClientConn + c.Lock() + if c.cs != nil { + children, _, wc, err = c.cs.getChildrenW(path) + } + c.Unlock() + if err != nil { + if errors.Cause(err) == ErrKVPairNotFound { + return nil, nil, errors.Annotatef(err,"path{%s} has none children", path) + } + logger.Errorf("etcdv3.ChildrenW(path{%s}) = error(%v)", path, err) + return nil, nil, errors.Annotatef(err, "etcdv3.ChildrenW(path:%s)", path) + } + + return children, wc, nil +} + +func (c *Client) GetChildren(path string) ([]string, error) { + var ( + err error + children []string + ) + + err = ErrNilETCDV3ClientConn + c.Lock() + if c.cs != nil { + children, _, err = c.cs.getChildren(path) + } + c.Unlock() + if err != nil { + if errors.Cause(err) == ErrKVPairNotFound { + return nil, errors.Annotatef(err,"path{%s} has none children", path) + } + logger.Errorf("clientv3.Children(path{%s}) = error(%v)", path, perrors.WithStack(err)) + return nil, errors.Annotatef(err, "clientv3.Children(path:%s)", path) + } + return children, nil +} + +func (c *Client) ExistW(path string) (clientv3.WatchChan, error) { + + var ( + err = ErrNilETCDV3ClientConn + out clientv3.WatchChan + ) + + c.Lock() + if c.cs != nil { + _, err = c.cs.watch(path) + } + c.Unlock() + if err != nil { + if errors.Cause(err) == ErrKVPairNotFound { + return nil, errors.Annotatef(err, "path{%s} not exist", path) + } + return nil, errors.Annotatef(err, "clientv3.ExistW(path:%s)", path) + } + + return out, nil +} + +func (c *Client) GetContent(path string) ([]byte, error) { + + c.Lock() + value, err := c.cs.get(path) + if err != nil { + return nil, errors.Annotatef(err, "client set get: %s", path) + } + c.Unlock() + + return []byte(value), nil +} diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dde1a00c55e3b872e7754e79936911d9c922e3b0 --- /dev/null +++ b/remoting/etcdv3/client_test.go @@ -0,0 +1 @@ +package etcdv3 diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go new file mode 100644 index 0000000000000000000000000000000000000000..7f3add21cb67f63a376ffaa955772a11d3749856 --- /dev/null +++ b/remoting/etcdv3/facade.go @@ -0,0 +1,74 @@ +package etcdv3 + +import ( + "sync" +) +import ( + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" +) + +type clientFacade interface { + Client() *Client + SetClient(*Client) + ClientLock() *sync.Mutex + WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container + GetDone() chan struct{} //for zk client control + RestartCallBack() bool + common.Node +} + +func HandleClientRestart(r clientFacade) { + + var ( + err error + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for { + select { + case <-r.GetDone(): + logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...") + break LOOP + // re-register all services + case <-r.Client().Done(): + r.ClientLock().Lock() + r.Client().Close() + clientName := r.Client().name + endpoints := r.Client().endpoints + r.SetClient(nil) + r.ClientLock().Unlock() + + // 接zk,直至成功 + failTimes = 0 + for { + select { + case <-r.GetDone(): + logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 防止疯狂重连etcd + } + err = ValidateClient(r, WithName(clientName), WithEndpoints(endpoints...)) + logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", + endpoints, perrors.WithStack(err)) + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} + diff --git a/remoting/etcdv3/facede_test.go b/remoting/etcdv3/facede_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dde1a00c55e3b872e7754e79936911d9c922e3b0 --- /dev/null +++ b/remoting/etcdv3/facede_test.go @@ -0,0 +1 @@ +package etcdv3 diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..ea71233db7c32964fca0dd884fc62c9afdf4606d --- /dev/null +++ b/remoting/etcdv3/listener.go @@ -0,0 +1,373 @@ +package etcdv3 + +import ( + "gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path" + pathlib "path" + "sync" + "time" +) + +import ( + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" + "github.com/samuel/go-zookeeper/zk" +) + +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" +) + +type EventListener struct { + client *Client + pathMapLock sync.Mutex + pathMap map[string]struct{} + wg sync.WaitGroup +} + +func NewEventListener(client *Client) *EventListener { + return &EventListener{ + client: client, + pathMap: make(map[string]struct{}), + } +} +func (l *EventListener) SetClient(client *Client) { + l.client = client +} + +// this method will return true when spec path deleted, +// this method will return false when deep layer connection lose +func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting.DataListener) bool { + l.wg.Add(1) + defer l.wg.Done() + for { + keyEventCh, err := l.client.ExistW(path) + if err != nil { + logger.Warnf("existW{key:%s} = error{%v}", path, err) + return false + } + + select { + // client watch ctx stop + // server stopped + case <-l.client.cs.ctx.Done(): + return false + + // client stopped + case <-l.client.Done(): + return false + + // etcd event stream + case e := <-keyEventCh: + + if e.Err() != nil{ + logger.Warnf("get a etcd event {err: %s}", e.Err()) + } + for _, event := range e.Events{ + logger.Warnf("get a etcd Event{type:%s, path:%s,}", + event.Type.String(), event.Kv.Key ) + switch event.Type { + case mvccpb.PUT: + if len(listener) > 0 { + if event.IsCreate(){ + logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) + listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value)}) + }else{ + logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) + listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EvnetTypeUpdate, Content: string(event.Kv.Value)}) + } + } + case mvccpb.DELETE: + logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + return true + } + } + } + } + + return false +} + + +func (l *EventListener) handleNodeEvent(path string, children []string, listener remoting.DataListener) { + contains := func(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + + return false + } + + newChildren, err := l.client.GetChildren(path) + if err != nil { + logger.Errorf("path{%s} child nodes changed, etcdV3.Children() = error{%v}", path, perrors.WithStack(err)) + return + } + + // a node was added -- listen the new node + var ( + newNode string + ) + for _, n := range newChildren { + if contains(children, n) { + continue + } + + newNode = pathlib.Join(path, n) + logger.Infof("add zkNode{%s}", newNode) + content, _, err := l.client.Conn.Get(newNode) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err)) + } + + if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) { + continue + } + // listen l service node + go func(node, childNode string) { + logger.Infof("delete zkNode{%s}", node) + if l.ListenServiceNodeEvent(node, listener) { + logger.Infof("delete content{%s}", childNode) + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) + } + logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + }(newNode, n) + } + + // old node was deleted + var oldNode string + for _, n := range children { + if contains(newChildren, n) { + continue + } + + oldNode = path.Join(zkPath, n) + logger.Warnf("delete zkPath{%s}", oldNode) + + if err != nil { + logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) + continue + } + listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) + } +} + +func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { + l.wg.Add(1) + defer l.wg.Done() + + var ( + failTimes int + event chan struct{} + zkEvent zk.Event + ) + event = make(chan struct{}, 4) + defer close(event) + for { + // get current children for a zkPath + children, childEventCh, err := l.client.GetChildrenW(zkPath) + if err != nil { + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) + // clear the event channel + CLEAR: + for { + select { + case <-event: + default: + break CLEAR + } + } + l.client.RegisterEvent(zkPath, &event) + select { + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): + l.client.UnregisterEvent(zkPath, &event) + continue + case <-l.client.Done(): + l.client.UnregisterEvent(zkPath, &event) + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + return + case <-event: + logger.Infof("get zk.EventNodeDataChange notify event") + l.client.UnregisterEvent(zkPath, &event) + l.handleZkNodeEvent(zkPath, nil, listener) + continue + } + } + failTimes = 0 + for _, c := range children { + + // listen l service node + dubboPath := path.Join(zkPath, c) + content, _, err := l.client.Conn.Get(dubboPath) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) + } + logger.Infof("Get children!{%s}", dubboPath) + if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { + continue + } + logger.Infof("listen dubbo service key{%s}", dubboPath) + go func(zkPath string) { + if l.ListenServiceNodeEvent(dubboPath) { + listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) + } + logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + }(dubboPath) + + //liten sub path recursive + go func(zkPath string, listener remoting.DataListener) { + l.listenDirEvent(zkPath, listener) + logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) + }(dubboPath, listener) + } + select { + case zkEvent = <-childEventCh: + logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) + if zkEvent.Type != zk.EventNodeChildrenChanged { + continue + } + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + return + } + } +} + +// +//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) { +// l.wg.EventTypeAdd(1) +// defer l.wg.Done() +// +// var ( +// failTimes int +// event chan struct{} +// zkEvent zk.Event +// ) +// event = make(chan struct{}, 4) +// defer close(event) +// for { +// // get current children for a zkPath +// content,_, eventCh, err := l.client.Conn.GetW(zkPath) +// if err != nil { +// failTimes++ +// if MaxFailTimes <= failTimes { +// failTimes = MaxFailTimes +// } +// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err) +// // clear the event channel +// CLEAR: +// for { +// select { +// case <-event: +// default: +// break CLEAR +// } +// } +// l.client.RegisterEvent(zkPath, &event) +// select { +// case <-time.After(timeSecondDuration(failTimes * ConnDelay)): +// l.client.UnregisterEvent(zkPath, &event) +// continue +// case <-l.client.Done(): +// l.client.UnregisterEvent(zkPath, &event) +// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) +// return +// case <-event: +// logger.Infof("get zk.EventNodeDataChange notify event") +// l.client.UnregisterEvent(zkPath, &event) +// l.handleZkNodeEvent(zkPath, nil, listener) +// continue +// } +// } +// failTimes = 0 +// +// select { +// case zkEvent = <-eventCh: +// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", +// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) +// +// l.handleZkNodeEvent(zkEvent.Path, children, listener) +// case <-l.client.Done(): +// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) +// return +// } +// } +//} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + +// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener +// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent +// | +// --------> ListenServiceNodeEvent +func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { + var ( + err error + dubboPath string + children []string + ) + + l.pathMapLock.Lock() + _, ok := l.pathMap[zkPath] + l.pathMapLock.Unlock() + if ok { + logger.Warnf("@zkPath %s has already been listened.", zkPath) + return + } + + l.pathMapLock.Lock() + l.pathMap[zkPath] = struct{}{} + l.pathMapLock.Unlock() + + logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) + children, err = l.client.GetChildren(zkPath) + if err != nil { + children = nil + logger.Warnf("fail to get children of zk path{%s}", zkPath) + } + + for _, c := range children { + + // listen l service node + dubboPath = path.Join(zkPath, c) + content, _, err := l.client.Conn.Get(dubboPath) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) + } + if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { + continue + } + logger.Infof("listen dubbo service key{%s}", dubboPath) + go func(zkPath string) { + if l.ListenServiceNodeEvent(dubboPath) { + listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) + } + logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + }(dubboPath) + } + + logger.Infof("listen dubbo path{%s}", zkPath) + go func(zkPath string, listener remoting.DataListener) { + l.listenDirEvent(zkPath, listener) + logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) + }(zkPath, listener) +} + +func (l *ZkEventListener) valid() bool { + return l.client.ZkConnValid() +} + +func (l *ZkEventListener) Close() { + l.wg.Wait() +} diff --git a/remoting/etcdv3/listener_test.go b/remoting/etcdv3/listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dde1a00c55e3b872e7754e79936911d9c922e3b0 --- /dev/null +++ b/remoting/etcdv3/listener_test.go @@ -0,0 +1 @@ +package etcdv3 diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index d1d5f9900dbd67bbd061b411eaf2e40ffa0e7561..b94483835c8bcf913de9e40ec4a6ffba3334c103 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -298,29 +298,26 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return } - z.Lock() - for { - a, ok := z.eventRegistry[zkPath] - if !ok { - break - } - for i, e := range a { - if e == event { - arr := a - a = append(arr[:i], arr[i+1:]...) - logger.Debugf("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event) - } - } - logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d", - z.name, zkPath, event, len(a)) - if len(a) == 0 { - delete(z.eventRegistry, zkPath) - } else { - z.eventRegistry[zkPath] = a + defer z.Unlock() + infoList, ok := z.eventRegistry[zkPath] + if !ok { + return + } + for i, e := range infoList { + if e == event { + arr := infoList + infoList = append(arr[:i], arr[i+1:]...) + logger.Debugf("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event) } } - z.Unlock() + logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d", + z.name, zkPath, event, len(infoList)) + if len(infoList) == 0 { + delete(z.eventRegistry, zkPath) + } else { + z.eventRegistry[zkPath] = infoList + } } func (z *ZookeeperClient) Done() <-chan struct{} { diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index 4fd800f87732288527d9387580fe70d0a9cae9d2..4eace5e9d540ac86ca01683963b19ae547c7c732 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -19,9 +19,9 @@ package zookeeper import ( "sync" - "time" ) import ( + "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) @@ -70,7 +70,7 @@ LOOP: case <-r.GetDone(): logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP - case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 防止疯狂重连zk + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 防止疯狂重连zk } err = ValidateZookeeperClient(r, WithZkName(zkName)) logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}", diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 5b9e0a8f824598fd5030bd76eec04adf3e639ed9..44fe7eabadde883db852b880eb997444f3f48050 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -24,6 +24,7 @@ import ( ) import ( + "github.com/dubbogo/getty" perrors "github.com/pkg/errors" "github.com/samuel/go-zookeeper/zk" ) @@ -188,7 +189,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi } l.client.RegisterEvent(zkPath, &event) select { - case <-time.After(timeSecondDuration(failTimes * ConnDelay)): + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): l.client.UnregisterEvent(zkPath, &event) continue case <-l.client.Done():