diff --git a/README.md b/README.md index 245b693fb3dac5c7d6058a2d57dcc33c1d41e309..97bf03da978637d1c1198a5d6b615a8eede3f7eb 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,6 @@ Finished List: - Transport: HTTP, TCP - Codec: JsonRPC v2, Hessian v2 - Registry: ZooKeeper -- Routing: Rule(dubbo v2.6.x) - Configure Center: Zookeeper - Cluster Strategy: Failover - Load Balance: Random, RoundRobin, LeastActive diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index ec4371da4768298fe0928ba6ef88c2be7060832e..4232c9d90bee5a48ed988fa5449c3acbaf64f100 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -85,22 +85,23 @@ func (s *TestService1) Version() string { func TestServiceMap_Register(t *testing.T) { // lowercase s0 := &testService{} - methods, err := ServiceMap.Register("testporotocol", s0) + // methods, err := ServiceMap.Register("testporotocol", s0) + _, err := ServiceMap.Register("testporotocol", s0) assert.EqualError(t, err, "type testService is not exported") // succ s := &TestService{} - methods, err = ServiceMap.Register("testporotocol", s) + methods, err := ServiceMap.Register("testporotocol", s) assert.NoError(t, err) assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods) // repeat - methods, err = ServiceMap.Register("testporotocol", s) + _, err = ServiceMap.Register("testporotocol", s) assert.EqualError(t, err, "service already defined: com.test.Path") // no method s1 := &TestService1{} - methods, err = ServiceMap.Register("testporotocol", s1) + _, err = ServiceMap.Register("testporotocol", s1) assert.EqualError(t, err, "type com.test.Path1 has no exported methods of suitable type") ServiceMap = &serviceMap{ diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 6b526b0c67df2d693214295c94dfeb35200ffec4..ef0761efcd0d2cee47425dd5e1099200d3be6a70 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -69,7 +69,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu err = c.client.Create(c.rootPath) c.listener.ListenServiceEvent(c.rootPath, c.cacheListener) - return c, nil + return c, err } @@ -95,7 +95,7 @@ func newMockZookeeperDynamicConfiguration(url *common.URL, opts ...zookeeper.Opt err = c.client.Create(c.rootPath) go c.listener.ListenServiceEvent(c.rootPath, c.cacheListener) - return tc, c, nil + return tc, c, err } diff --git a/go.mod b/go.mod index ac1f1206f380294d9fca3e7d53c4197bbb2a552c..86835ec435b77cac82f20c187b8dc24ec7f6340e 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,9 @@ module github.com/apache/dubbo-go require ( github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 - github.com/dubbogo/getty v1.0.7 - github.com/dubbogo/gost v1.0.0 - github.com/dubbogo/hessian2 v1.1.2 + github.com/dubbogo/getty v1.2.0 + github.com/dubbogo/gost v1.1.1 + github.com/dubbogo/hessian2 v1.2.0 github.com/magiconair/properties v1.8.1 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec diff --git a/go.sum b/go.sum index cb5840050dfd007c15b3493f2b973cd591bc72f7..ff8b97bcadfa94b71a9c2c720762f95df1dbefcd 100644 --- a/go.sum +++ b/go.sum @@ -3,12 +3,13 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= -github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/gost v1.0.0 h1:obKvpJYdrIY2BidHYwYoj2E50OtwCDqVVVTcH2nnhAY= -github.com/dubbogo/gost v1.0.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= -github.com/dubbogo/hessian2 v1.1.2 h1:SRkPzIwVv2D+ZUOCE2XuI5kANoL01ShhAheLcc3usJE= -github.com/dubbogo/hessian2 v1.1.2/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= +github.com/dubbogo/getty v1.2.0 h1:LmP/R/yCYp5/ubUTm5Xa7QwE3EnFFSjguJH8DJruJ70= +github.com/dubbogo/getty v1.2.0/go.mod h1:p9Pyk18a+5Aa0GZ546aCzlehEfvEof0jAF0+QQcEPYQ= +github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= +github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw= +github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index f4a5f4a8474b30b13bf7598bc634ac722955d91b..a39b0aef743d18c24f41b41655abfada996503e8 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -62,13 +62,13 @@ func TestClient_Call(t *testing.T) { } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - user := &User{} + // user := &User{} //err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) //assert.NoError(t, err) //assert.NotEqual(t, "", user.Id) //assert.NotEqual(t, "", user.Name) - user = &User{} + user := &User{} err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index 8d3ac5dabb69acbfc3ca9846ebd1575a53b32eb8..ade74246121b5f275c8dbeaa5923228dbab2804f 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -42,7 +42,7 @@ func TestJsonClientCodec_Write(t *testing.T) { assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n", string(data)) cd.Args = 1 - data, err = codec.Write(cd) + _, err = codec.Write(cd) assert.EqualError(t, err, "unsupported param type: int") } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 6b15133319975ee6cb305482b1cfd17aacbca5b6..c272c5bba615c68c452f36819edef0f2a821a5dc 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -211,7 +211,6 @@ func (r *zkRegistry) Register(conf common.URL) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) switch role { case common.CONSUMER: - ok = false r.cltLock.Lock() _, ok = r.services[conf.Key()] r.cltLock.Unlock() @@ -232,7 +231,6 @@ func (r *zkRegistry) Register(conf common.URL) error { case common.PROVIDER: // 妫€楠屾湇鍔℃槸鍚﹀凡缁忔敞鍐岃繃 - ok = false r.cltLock.Lock() // 娉ㄦ剰姝ゅ涓巆onsumerZookeeperRegistry鐨勫樊寮傦紝consumer鐢ㄧ殑鏄痗onf.Path锛� // 鍥犱负consumer瑕佹彁渚泈atch鍔熻兘缁檚elector浣跨敤, provider鍏佽娉ㄥ唽鍚屼竴涓猻ervice鐨勫涓猤roup or version diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index ba2755fa9923d9e6c11a1908594a176ace458691..22658e1adcd57d6b88bdbf3949cae08f90f1cfb5 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -38,9 +38,9 @@ func Test_Register(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, err := newMockZkRegistry(®url) + ts, reg, _ := newMockZkRegistry(®url) defer ts.Stop() - err = reg.Register(url) + err := reg.Register(url) children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) assert.NoError(t, err) @@ -49,10 +49,10 @@ func Test_Register(t *testing.T) { func Test_Subscribe(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, err := newMockZkRegistry(®url) + ts, reg, _ := newMockZkRegistry(®url) //provider register - err = reg.Register(url) + err := reg.Register(url) assert.NoError(t, err) if err != nil { @@ -61,12 +61,12 @@ func Test_Subscribe(t *testing.T) { //consumer register regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) - _, reg2, err := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) - err = reg2.Register(url) - listener, err := reg2.Subscribe(url) + reg2.Register(url) + listener, _ := reg2.Subscribe(url) - serviceEvent, err := listener.Next() + serviceEvent, _ := listener.Next() assert.NoError(t, err) if err != nil { return @@ -103,7 +103,7 @@ func Test_ProviderDestory(t *testing.T) { defer ts.Stop() assert.NoError(t, err) - err = reg.Register(url) + reg.Register(url) //listener.Close() time.Sleep(1e9) diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index d1d5f9900dbd67bbd061b411eaf2e40ffa0e7561..b94483835c8bcf913de9e40ec4a6ffba3334c103 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -298,29 +298,26 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return } - z.Lock() - for { - a, ok := z.eventRegistry[zkPath] - if !ok { - break - } - for i, e := range a { - if e == event { - arr := a - a = append(arr[:i], arr[i+1:]...) - logger.Debugf("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event) - } - } - logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d", - z.name, zkPath, event, len(a)) - if len(a) == 0 { - delete(z.eventRegistry, zkPath) - } else { - z.eventRegistry[zkPath] = a + defer z.Unlock() + infoList, ok := z.eventRegistry[zkPath] + if !ok { + return + } + for i, e := range infoList { + if e == event { + arr := infoList + infoList = append(arr[:i], arr[i+1:]...) + logger.Debugf("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event) } } - z.Unlock() + logger.Debugf("after zkClient{%s} unregister event{path:%s, event:%p}, array length %d", + z.name, zkPath, event, len(infoList)) + if len(infoList) == 0 { + delete(z.eventRegistry, zkPath) + } else { + z.eventRegistry[zkPath] = infoList + } } func (z *ZookeeperClient) Done() <-chan struct{} { diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index 4fd800f87732288527d9387580fe70d0a9cae9d2..4eace5e9d540ac86ca01683963b19ae547c7c732 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -19,9 +19,9 @@ package zookeeper import ( "sync" - "time" ) import ( + "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) @@ -70,7 +70,7 @@ LOOP: case <-r.GetDone(): logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP - case <-time.After(time.Duration(1e9 * failTimes * ConnDelay)): // 闃叉鐤媯閲嶈繛zk + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // 闃叉鐤媯閲嶈繛zk } err = ValidateZookeeperClient(r, WithZkName(zkName)) logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}", diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 5b9e0a8f824598fd5030bd76eec04adf3e639ed9..44fe7eabadde883db852b880eb997444f3f48050 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -24,6 +24,7 @@ import ( ) import ( + "github.com/dubbogo/getty" perrors "github.com/pkg/errors" "github.com/samuel/go-zookeeper/zk" ) @@ -188,7 +189,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi } l.client.RegisterEvent(zkPath, &event) select { - case <-time.After(timeSecondDuration(failTimes * ConnDelay)): + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): l.client.UnregisterEvent(zkPath, &event) continue case <-l.client.Done():