Skip to content
Snippets Groups Projects
Commit 549f4ae8 authored by scott.wang's avatar scott.wang
Browse files

merge && add etcdv3 remote pkg

parents 7a34fb21 7cf9e2c1
No related branches found
No related tags found
No related merge requests found
Showing
with 1208 additions and 52 deletions
......@@ -85,22 +85,23 @@ func (s *TestService1) Version() string {
func TestServiceMap_Register(t *testing.T) {
// lowercase
s0 := &testService{}
methods, err := ServiceMap.Register("testporotocol", s0)
// methods, err := ServiceMap.Register("testporotocol", s0)
_, err := ServiceMap.Register("testporotocol", s0)
assert.EqualError(t, err, "type testService is not exported")
// succ
s := &TestService{}
methods, err = ServiceMap.Register("testporotocol", s)
methods, err := ServiceMap.Register("testporotocol", s)
assert.NoError(t, err)
assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods)
// repeat
methods, err = ServiceMap.Register("testporotocol", s)
_, err = ServiceMap.Register("testporotocol", s)
assert.EqualError(t, err, "service already defined: com.test.Path")
// no method
s1 := &TestService1{}
methods, err = ServiceMap.Register("testporotocol", s1)
_, err = ServiceMap.Register("testporotocol", s1)
assert.EqualError(t, err, "type com.test.Path1 has no exported methods of suitable type")
ServiceMap = &serviceMap{
......
......@@ -69,7 +69,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
err = c.client.Create(c.rootPath)
c.listener.ListenServiceEvent(c.rootPath, c.cacheListener)
return c, nil
return c, err
}
......@@ -95,7 +95,7 @@ func newMockZookeeperDynamicConfiguration(url *common.URL, opts ...zookeeper.Opt
err = c.client.Create(c.rootPath)
go c.listener.ListenServiceEvent(c.rootPath, c.cacheListener)
return tc, c, nil
return tc, c, err
}
......
module github.com/apache/dubbo-go
require (
github.com/dubbogo/getty v1.0.7
github.com/dubbogo/gost v1.0.0
github.com/dubbogo/hessian2 v1.1.2
github.com/AlexStocks/goext v0.3.2
github.com/AlexStocks/log4go v1.0.2 // indirect
github.com/coreos/etcd v3.3.13+incompatible // indirect
github.com/dubbogo/getty v1.2.0
github.com/dubbogo/gost v1.1.1
github.com/dubbogo/hessian2 v1.2.0
github.com/gogo/protobuf v1.2.1 // indirect
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7
github.com/magiconair/properties v1.8.1
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect
github.com/mattn/go-isatty v0.0.8 // indirect
github.com/pkg/errors v0.8.1
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/stretchr/testify v1.3.0
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.0 // indirect
gopkg.in/yaml.v2 v2.2.2
)
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0=
github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkglDGSl3Hc=
github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk=
github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU=
github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo=
github.com/dubbogo/gost v1.0.0 h1:obKvpJYdrIY2BidHYwYoj2E50OtwCDqVVVTcH2nnhAY=
github.com/dubbogo/gost v1.0.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/hessian2 v1.1.2 h1:SRkPzIwVv2D+ZUOCE2XuI5kANoL01ShhAheLcc3usJE=
github.com/dubbogo/hessian2 v1.1.2/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM=
github.com/dubbogo/getty v1.2.0 h1:LmP/R/yCYp5/ubUTm5Xa7QwE3EnFFSjguJH8DJruJ70=
github.com/dubbogo/getty v1.2.0/go.mod h1:p9Pyk18a+5Aa0GZ546aCzlehEfvEof0jAF0+QQcEPYQ=
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw=
github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7 h1:aQ4ahUIm1FHeZ2XsL1lbzkn9NxhkOUeiMWITOA7OOms=
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e h1:hB2xlXdHp/pmPZq0y3QnmWAArdw9PqbmotexnWx/FU8=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
......@@ -22,6 +47,8 @@ github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw=
go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
......@@ -29,13 +56,28 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw=
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
......@@ -62,13 +62,13 @@ func TestClient_Call(t *testing.T) {
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
user := &User{}
// user := &User{}
//err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user)
//assert.NoError(t, err)
//assert.NotEqual(t, "", user.Id)
//assert.NotEqual(t, "", user.Name)
user = &User{}
user := &User{}
err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
......
......@@ -42,7 +42,7 @@ func TestJsonClientCodec_Write(t *testing.T) {
assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n", string(data))
cd.Args = 1
data, err = codec.Write(cd)
_, err = codec.Write(cd)
assert.EqualError(t, err, "unsupported param type: int")
}
......
......@@ -72,6 +72,8 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return nil, errors.Annotate(err, "block connect to etcd server")
}
rawClient.ActiveConnection()
rootCtx, cancel := context.WithCancel(context.Background())
client, err := etcd.NewClient(rawClient, etcd.WithTTL(time.Second), etcd.WithContext(rootCtx))
if err != nil {
......
......@@ -211,7 +211,6 @@ func (r *zkRegistry) Register(conf common.URL) error {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role {
case common.CONSUMER:
ok = false
r.cltLock.Lock()
_, ok = r.services[conf.Key()]
r.cltLock.Unlock()
......@@ -232,7 +231,6 @@ func (r *zkRegistry) Register(conf common.URL) error {
case common.PROVIDER:
// 检验服务是否已经注册过
ok = false
r.cltLock.Lock()
// 注意此处与consumerZookeeperRegistry的差异,consumer用的是conf.Path,
// 因为consumer要提供watch功能给selector使用, provider允许注册同一个service的多个group or version
......
......@@ -38,9 +38,9 @@ func Test_Register(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := newMockZkRegistry(&regurl)
ts, reg, _ := newMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
err := reg.Register(url)
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err)
......@@ -49,10 +49,10 @@ func Test_Register(t *testing.T) {
func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := newMockZkRegistry(&regurl)
ts, reg, _ := newMockZkRegistry(&regurl)
//provider register
err = reg.Register(url)
err := reg.Register(url)
assert.NoError(t, err)
if err != nil {
......@@ -61,12 +61,12 @@ func Test_Subscribe(t *testing.T) {
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
_, reg2, err := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
_, reg2, _ := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
err = reg2.Register(url)
listener, err := reg2.Subscribe(url)
reg2.Register(url)
listener, _ := reg2.Subscribe(url)
serviceEvent, err := listener.Next()
serviceEvent, _ := listener.Next()
assert.NoError(t, err)
if err != nil {
return
......@@ -103,7 +103,7 @@ func Test_ProviderDestory(t *testing.T) {
defer ts.Stop()
assert.NoError(t, err)
err = reg.Register(url)
reg.Register(url)
//listener.Close()
time.Sleep(1e9)
......
package etcdv3
import (
"context"
"path"
"sync"
"time"
)
import (
"github.com/AlexStocks/goext/database/etcd"
"github.com/juju/errors"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
const (
ConnDelay = 3
MaxFailTimes = 15
)
var (
ErrNilETCDV3ClientConn = errors.New("etcdv3client{conn} is nil")
ErrKVPairNotFound = errors.New("k/v pair not found")
)
// clientSet for etcdv3
type clientSet struct {
lock sync.RWMutex // protect all element in
// clientSet
gxClient *gxetcd.Client
rawClient *clientv3.Client
// client controller used to change client behave
ctx context.Context // if etcd connection lose, the ctx.Done will be sent msg
cancel context.CancelFunc
// c was filled, start maintenanceStatus
startMaintenanceChan chan struct{}
c *Client
}
func newClientSet(endpoints []string, timeout time.Duration, c *Client) error {
rootCtx, cancel := context.WithCancel(context.Background())
// connect to etcd
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: timeout,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return errors.Annotate(err, "block connect to etcd server")
}
// share context
gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx))
if err != nil {
return errors.Annotate(err, "new etcd client")
}
out := &clientSet{
gxClient: gxClient,
rawClient: client,
ctx: rootCtx,
cancel: cancel,
startMaintenanceChan: make(chan struct{}),
c: c,
}
err = out.maintenanceStatus()
if err != nil {
return errors.Annotate(err, "maintenance connection status")
}
// set clientset to client
c.cs = out
return nil
}
func (c *clientSet) maintenanceStatus() error {
c.c.Wait.Add(1)
aliveResp, err := c.gxClient.KeepAlive()
if err != nil {
return errors.Annotatef(err, "etcd keep alive")
}
// start maintenance the connection status
go c.maintenanceStatusLoop(aliveResp)
return nil
}
func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAliveResponse) {
defer func() {
c.c.Wait.Done()
logger.Infof("etcd {path:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name)
}()
// get signal, will start maintenanceStatusLoop
<-c.startMaintenanceChan
for {
select {
case <-c.c.Done():
// client done
return
case <-c.ctx.Done():
// client context exit
logger.Warn("etcd clientSet context done")
return
case msg, ok := <-aliveResp:
// etcd connection lose
// NOTICE
// if clientSet.Client is nil, it will panic
if !ok {
logger.Warnf("etcd server stop at term: %#v", msg)
c.c.Lock() // hold the c.Client lock
c.c.cs.clean()
// NOTICE
// uninstall the cs from client
c.c.cs = nil
c.c.Unlock()
return
}
}
}
}
func (c *clientSet) put(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3ClientConn
}
_, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
if err != nil {
return err
}
return nil
}
func (c *clientSet) delete(k string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3ClientConn
}
_, err := c.rawClient.Delete(c.ctx, k)
if err != nil {
return err
}
return nil
}
func (c *clientSet) get(k string) (string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return "", ErrNilETCDV3ClientConn
}
resp, err := c.rawClient.Get(c.ctx, k)
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", ErrKVPairNotFound
}
return string(resp.Kvs[0].Value), nil
}
func (c *clientSet) getChildrenW(k string) ([]string, []string, clientv3.WatchChan, error) {
kList, vList, err := c.getChildren(k)
if err != nil {
return nil, nil, nil, errors.Annotatef(err, "get children %s", k)
}
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, nil, nil, ErrNilETCDV3ClientConn
}
wc := c.rawClient.Watch(c.ctx, k, clientv3.WithPrefix())
return kList, vList, wc, nil
}
func (c *clientSet) watch(k string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3ClientConn
}
_, err := c.get(k)
if err != nil {
return nil, errors.Annotatef(err, "watch pre check key %s", k)
}
return c.rawClient.Watch(c.ctx, k), nil
}
func (c *clientSet) getChildren(k string) ([]string, []string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, nil, ErrNilETCDV3ClientConn
}
resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix())
if err != nil {
return nil, nil, err
}
if len(resp.Kvs) == 0 {
return nil, nil, ErrKVPairNotFound
}
var (
kList []string
vList []string
)
for _, kv := range resp.Kvs {
kList = append(kList, string(kv.Key))
vList = append(vList, string(kv.Value))
}
return kList, vList, nil
}
func (c *clientSet) keepAliveKV(k string, v string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3ClientConn
}
lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
if err != nil {
return errors.Annotatef(err, "grant lease")
}
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
c.rawClient.Revoke(c.ctx, lease.ID)
return errors.Annotate(err, "keep alive lease")
}
err = c.put(k, v, clientv3.WithLease(lease.ID))
if err != nil {
return errors.Annotate(err, "put k/v with lease")
}
return nil
}
// because this method will be called by more than one goroutine
// this method will hold clientset lock
func (c *clientSet) clean() {
c.lock.Lock()
if c.gxClient != nil {
// close gx client, it will close raw etcdv3 client
c.gxClient.Close()
c.gxClient = nil
c.rawClient = nil
// cancel all context
c.cancel()
c.ctx = nil
c.lock.Unlock()
return
}
c.lock.Unlock()
}
type Client struct {
name string
endpoints []string
timeout time.Duration
sync.Mutex // for control clientSet && event registry
cs *clientSet
done chan struct{}
Wait sync.WaitGroup
eventRegistry map[string][]clientv3.WatchChan
}
type Options struct {
name string
endpoints []string
client *Client
}
type Option func(*Options)
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
}
}
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
}
}
func StateToString(state connectivity.State) string {
switch state {
case connectivity.Shutdown:
return "etcdv3 disconnected"
case connectivity.TransientFailure:
return "etcdv3 transient failure"
case connectivity.Idle:
return "etcdv3 connect idle"
case connectivity.Ready:
return "etcdv3 client ready"
case connectivity.Connecting:
return "etcdv3 client connecting"
default:
return state.String()
}
return "etcdv3 unknown state"
}
func ValidateClient(container clientFacade, opts ...Option) error {
var (
err error
)
options := &Options{}
for _, opt := range opts {
opt(options)
}
err = nil
lock := container.ClientLock()
url := container.GetUrl()
lock.Lock()
defer lock.Unlock()
// bootstrap all clientset
if container.Client() == nil {
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location)
}
newClient, err := newClient(options.name, []string{url.Location}, timeout)
if err != nil {
logger.Warnf("newETCDV3Client(name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}",
options.name, url.Location, timeout.String(), err)
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location)
}
container.SetClient(newClient)
}
if container.Client().cs == nil {
err = newClientSet(container.Client().endpoints, container.Client().timeout, container.Client())
if err != nil {
return errors.Annotate(err, "new client set")
}
container.Client().cs.startMaintenanceChan <- struct{}{}
}
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.PrimitiveURL)
}
func newClient(name string, endpoints []string, timeout time.Duration) (*Client, error) {
var (
err error
out *Client
)
out = &Client{
name: name,
endpoints: endpoints,
timeout: timeout,
done: make(chan struct{}),
eventRegistry: make(map[string][]clientv3.WatchChan),
}
err = newClientSet(endpoints, timeout, out)
if err != nil {
return nil, errors.Annotate(err, "new client set")
}
// start maintenanceChan
out.cs.startMaintenanceChan <- struct{}{}
return out, nil
}
func (c *Client) stop() bool {
select {
case <-c.done:
return true
default:
close(c.done)
}
return false
}
func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) {
if key == "" || wc == nil {
return
}
c.Lock()
a := c.eventRegistry[key]
a = append(a, wc)
c.eventRegistry[key] = a
go func() {
wcc := c.cs.rawClient.Watch(c.cs.ctx, key)
for msg := range wcc {
wc <- msg
}
// when wcc close, close the wc
close(wc)
}()
logger.Debugf("etcdClient{%s} register event{path:%s, ptr:%p}", c.name, key, wc)
c.Unlock()
}
func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) {
if key == "" {
return
}
c.Lock()
defer c.Unlock()
infoList, ok := c.eventRegistry[key]
if !ok {
return
}
for i, e := range infoList {
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
logger.Debugf("etcdClient{%s} unregister event{path:%s, event:%p}", c.name, key, event)
}
}
logger.Debugf("after etcdClient{%s} unregister event{path:%s, event:%p}, array length %d",
c.name, key, event, len(infoList))
if len(infoList) == 0 {
delete(c.eventRegistry, key)
} else {
c.eventRegistry[key] = infoList
}
}
func (c *Client) Done() <-chan struct{} {
return c.done
}
func (c *Client) Valid() bool {
select {
case <-c.done:
return false
default:
}
valid := true
c.Lock()
if c.cs == nil {
valid = false
}
c.Unlock()
return valid
}
func (c *Client) Close() {
if c == nil {
return
}
c.stop()
c.Wait.Wait()
c.Lock()
if c.cs != nil {
c.cs.clean()
c.cs = nil
}
c.Unlock()
logger.Warnf("etcd client{name:%s, etcd addr:%s} exit now.", c.name, c.endpoints)
}
func (c *Client) Create(k string, v string) error {
err := ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
err = c.cs.put(k, v)
}
c.Unlock()
return errors.Annotatef(err, "etcd client put key %s value %s", k, v)
}
func (c *Client) Delete(key string) error {
err := ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
err = c.cs.delete(key)
}
c.Unlock()
return errors.Annotatef(err, "etcd client delete (basePath:%s)", key)
}
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
err := ErrNilETCDV3ClientConn
completePath := path.Join(basePath, node)
c.Lock()
if c.cs != nil {
err = c.cs.keepAliveKV(completePath, "")
}
c.Unlock()
logger.Debugf("etcdClient{%s} create a tmp node:%s\n", c.name, completePath)
if err != nil {
return "", errors.Annotatef(err, "etcd client create tmp k %s", completePath)
}
return completePath, nil
}
func (c *Client) GetChildrenW(path string) ([]string, clientv3.WatchChan, error) {
var (
children []string
err error
wc clientv3.WatchChan
)
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, wc, err = c.cs.getChildrenW(path)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, nil, errors.Annotatef(err,"path{%s} has none children", path)
}
logger.Errorf("etcdv3.ChildrenW(path{%s}) = error(%v)", path, err)
return nil, nil, errors.Annotatef(err, "etcdv3.ChildrenW(path:%s)", path)
}
return children, wc, nil
}
func (c *Client) GetChildren(path string) ([]string, error) {
var (
err error
children []string
)
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, err = c.cs.getChildren(path)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err,"path{%s} has none children", path)
}
logger.Errorf("clientv3.Children(path{%s}) = error(%v)", path, perrors.WithStack(err))
return nil, errors.Annotatef(err, "clientv3.Children(path:%s)", path)
}
return children, nil
}
func (c *Client) ExistW(path string) (clientv3.WatchChan, error) {
var (
err = ErrNilETCDV3ClientConn
out clientv3.WatchChan
)
c.Lock()
if c.cs != nil {
_, err = c.cs.watch(path)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err, "path{%s} not exist", path)
}
return nil, errors.Annotatef(err, "clientv3.ExistW(path:%s)", path)
}
return out, nil
}
func (c *Client) GetContent(path string) ([]byte, error) {
c.Lock()
value, err := c.cs.get(path)
if err != nil {
return nil, errors.Annotatef(err, "client set get: %s", path)
}
c.Unlock()
return []byte(value), nil
}
package etcdv3
package etcdv3
import (
"sync"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
GetDone() chan struct{} //for zk client control
RestartCallBack() bool
common.Node
}
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
r.Client().Close()
clientName := r.Client().name
endpoints := r.Client().endpoints
r.SetClient(nil)
r.ClientLock().Unlock()
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 防止疯狂重连etcd
}
err = ValidateClient(r, WithName(clientName), WithEndpoints(endpoints...))
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
package etcdv3
package etcdv3
import (
"gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path"
pathlib "path"
"sync"
"time"
)
import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type EventListener struct {
client *Client
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
}
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
pathMap: make(map[string]struct{}),
}
}
func (l *EventListener) SetClient(client *Client) {
l.client = client
}
// this method will return true when spec path deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
keyEventCh, err := l.client.ExistW(path)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", path, err)
return false
}
select {
// client watch ctx stop
// server stopped
case <-l.client.cs.ctx.Done():
return false
// client stopped
case <-l.client.Done():
return false
// etcd event stream
case e := <-keyEventCh:
if e.Err() != nil{
logger.Warnf("get a etcd event {err: %s}", e.Err())
}
for _, event := range e.Events{
logger.Warnf("get a etcd Event{type:%s, path:%s,}",
event.Type.String(), event.Kv.Key )
switch event.Type {
case mvccpb.PUT:
if len(listener) > 0 {
if event.IsCreate(){
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value)})
}else{
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EvnetTypeUpdate, Content: string(event.Kv.Value)})
}
}
case mvccpb.DELETE:
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
return true
}
}
}
}
return false
}
func (l *EventListener) handleNodeEvent(path string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
newChildren, err := l.client.GetChildren(path)
if err != nil {
logger.Errorf("path{%s} child nodes changed, etcdV3.Children() = error{%v}", path, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = pathlib.Join(path, n)
logger.Infof("add zkNode{%s}", newNode)
content, _, err := l.client.Conn.Get(newNode)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
go func(node, childNode string) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", childNode)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, n)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.GetChildrenW(zkPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
select {
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
continue
}
}
failTimes = 0
for _, c := range children {
// listen l service node
dubboPath := path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
logger.Infof("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
//liten sub path recursive
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
}
}
}
//
//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) {
// l.wg.EventTypeAdd(1)
// defer l.wg.Done()
//
// var (
// failTimes int
// event chan struct{}
// zkEvent zk.Event
// )
// event = make(chan struct{}, 4)
// defer close(event)
// for {
// // get current children for a zkPath
// content,_, eventCh, err := l.client.Conn.GetW(zkPath)
// if err != nil {
// failTimes++
// if MaxFailTimes <= failTimes {
// failTimes = MaxFailTimes
// }
// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err)
// // clear the event channel
// CLEAR:
// for {
// select {
// case <-event:
// default:
// break CLEAR
// }
// }
// l.client.RegisterEvent(zkPath, &event)
// select {
// case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
// l.client.UnregisterEvent(zkPath, &event)
// continue
// case <-l.client.Done():
// l.client.UnregisterEvent(zkPath, &event)
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// case <-event:
// logger.Infof("get zk.EventNodeDataChange notify event")
// l.client.UnregisterEvent(zkPath, &event)
// l.handleZkNodeEvent(zkPath, nil, listener)
// continue
// }
// }
// failTimes = 0
//
// select {
// case zkEvent = <-eventCh:
// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
//
// l.handleZkNodeEvent(zkEvent.Path, children, listener)
// case <-l.client.Done():
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// }
// }
//}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
func (l *ZkEventListener) Close() {
l.wg.Wait()
}
package etcdv3
......@@ -298,29 +298,26 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
z.Lock()
for {
a, ok := z.eventRegistry[zkPath]
if !ok {
break
}
for i, e := range a {
if e == event {
arr := a
a = append(arr[:i], arr[i+1:]...)
logger.Debugf("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
}
}
logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d",
z.name, zkPath, event, len(a))
if len(a) == 0 {
delete(z.eventRegistry, zkPath)
} else {
z.eventRegistry[zkPath] = a
defer z.Unlock()
infoList, ok := z.eventRegistry[zkPath]
if !ok {
return
}
for i, e := range infoList {
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
logger.Debugf("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
}
}
z.Unlock()
logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d",
z.name, zkPath, event, len(infoList))
if len(infoList) == 0 {
delete(z.eventRegistry, zkPath)
} else {
z.eventRegistry[zkPath] = infoList
}
}
func (z *ZookeeperClient) Done() <-chan struct{} {
......
......@@ -19,9 +19,9 @@ package zookeeper
import (
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
......@@ -70,7 +70,7 @@ LOOP:
case <-r.GetDone():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 防止疯狂重连zk
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 防止疯狂重连zk
}
err = ValidateZookeeperClient(r, WithZkName(zkName))
logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
......
......@@ -24,6 +24,7 @@ import (
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
......@@ -188,7 +189,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
}
l.client.RegisterEvent(zkPath, &event)
select {
case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.client.Done():
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment