diff --git a/README.md b/README.md index 1e978734879953004472c57749d7c450a55ff09c..245b693fb3dac5c7d6058a2d57dcc33c1d41e309 100644 --- a/README.md +++ b/README.md @@ -54,10 +54,16 @@ Todo List: You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap). + + ## Quick Start The subdirectory examples shows how to use dubbo-go. Please read the [examples/README.md](https://github.com/apache/dubbo-go/blob/develop/examples/README.md) carefully to learn how to dispose the configuration and compile the program. +## Contributing + +If you are willing to do some code contributions and document contributions to [Apache/dubbo-go](https://github.com/apache/dubbo-go), please visit [contribution intro](https://github.com/apache/dubbo-go/blob/master/cg.md). + ## Benchmark Benchmark project please refer to [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark) diff --git a/README_CN.md b/README_CN.md index eece857dfb391422baf64d61fd874c273cb2f9fe..a5502d1379ac4e3ba977ac1727c1c30f9ce488b2 100644 --- a/README_CN.md +++ b/README_CN.md @@ -58,6 +58,10 @@ Apache License, Version 2.0 杩欎釜瀛愮洰褰曚笅鐨勪緥瀛愬睍绀轰簡濡備綍浣跨敤 dubbo-go 銆傝浠旂粏闃呰 [examples/README.md](https://github.com/apache/dubbo-go/blob/develop/examples/README.md) 瀛︿範濡備綍澶勭悊閰嶇疆骞剁紪璇戠▼搴忋€� +## 濡備綍璐$尞 + +濡傛灉鎮ㄦ効鎰忕粰 [Apache/dubbo-go](https://github.com/apache/dubbo-go) 璐$尞浠g爜鎴栬€呮枃妗o紝鎴戜滑閮界儹鐑堟杩庛€傚叿浣撹鍙傝€� [contribution intro](https://github.com/apache/dubbo-go/blob/master/cg.md)銆� + ## 鎬ц兘娴嬭瘯 ## 鎬ц兘娴嬭瘯椤圭洰鏄� [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark) diff --git a/cluster/configurator/override.go b/cluster/configurator/override.go index 7328cf7831aba63e9177ea2acf0ccef1b2ab670a..797665e51388fbe3e8c8c46024be4a1f08a67055 100644 --- a/cluster/configurator/override.go +++ b/cluster/configurator/override.go @@ -11,7 +11,7 @@ import ( ) func init() { - extension.SetConfigurator("override", newConfigurator) + extension.SetDefaultConfigurator(newConfigurator) } func newConfigurator(url *common.URL) cluster.Configurator { return &overrideConfigurator{configuratorUrl: url} diff --git a/common/constant/default.go b/common/constant/default.go index 503ccb23b37f8d9c778f7e5e0d1d8411b88e3a09..717b93163e347d345651319440a4b0504d31c980 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -46,8 +46,9 @@ const ( ) const ( - ANY_VALUE = "*" - ANYHOST_VALUE = "0.0.0.0" + ANY_VALUE = "*" + ANYHOST_VALUE = "0.0.0.0" + REMOVE_VALUE_PREFIX = "-" ) const ( diff --git a/common/extension/configurator.go b/common/extension/configurator.go index 4a2b4f8ae8eeaaa5540e086f5efe0b3d9d60ec50..99cc8bdb985ac25e9f2f743342ab11064c7de5ca 100644 --- a/common/extension/configurator.go +++ b/common/extension/configurator.go @@ -37,3 +37,14 @@ func GetConfigurator(name string, url *common.URL) cluster.Configurator { return configurator[name](url) } +func SetDefaultConfigurator(v func(url *common.URL) cluster.Configurator) { + configurator["default"] = v +} + +func GetDefaultConfigurator(url *common.URL) cluster.Configurator { + if configurator["default"] == nil { + panic("config center for default is not existing, make sure you have import the package.") + } + return configurator["default"](url) + +} diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index ec4371da4768298fe0928ba6ef88c2be7060832e..4232c9d90bee5a48ed988fa5449c3acbaf64f100 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -85,22 +85,23 @@ func (s *TestService1) Version() string { func TestServiceMap_Register(t *testing.T) { // lowercase s0 := &testService{} - methods, err := ServiceMap.Register("testporotocol", s0) + // methods, err := ServiceMap.Register("testporotocol", s0) + _, err := ServiceMap.Register("testporotocol", s0) assert.EqualError(t, err, "type testService is not exported") // succ s := &TestService{} - methods, err = ServiceMap.Register("testporotocol", s) + methods, err := ServiceMap.Register("testporotocol", s) assert.NoError(t, err) assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods) // repeat - methods, err = ServiceMap.Register("testporotocol", s) + _, err = ServiceMap.Register("testporotocol", s) assert.EqualError(t, err, "service already defined: com.test.Path") // no method s1 := &TestService1{} - methods, err = ServiceMap.Register("testporotocol", s1) + _, err = ServiceMap.Register("testporotocol", s1) assert.EqualError(t, err, "type com.test.Path1 has no exported methods of suitable type") ServiceMap = &serviceMap{ diff --git a/common/url.go b/common/url.go index f40feda3ee53f87c8899b38bcd2ed0c08586fc53..64c4274e6adc74228f7e856e00e40bbe05dd1d2d 100644 --- a/common/url.go +++ b/common/url.go @@ -32,6 +32,7 @@ import ( import ( "github.com/dubbogo/gost/container" + "github.com/jinzhu/copier" perrors "github.com/pkg/errors" ) @@ -220,9 +221,26 @@ func (c URL) URLEqual(url URL) bool { if cKey != urlKey { return false } + if url.GetParam(constant.ENABLED_KEY, "true") != "true" && url.GetParam(constant.ENABLED_KEY, "") != constant.ANY_VALUE { + return false + } + //TODO :may need add interface key any value condition + if !isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) { + return false + } return true } - +func isMatchCategory(category1 string, category2 string) bool { + if len(category2) == 0 { + return category1 == constant.DEFAULT_CATEGORY + } else if strings.Contains(category2, constant.ANY_VALUE) { + return true + } else if strings.Contains(category2, constant.REMOVE_VALUE_PREFIX) { + return !strings.Contains(category2, constant.REMOVE_VALUE_PREFIX+category1) + } else { + strings.Contains(category2, category1) + } +} func (c URL) String() string { buildString := fmt.Sprintf( "%s://%s:%s@%s:%s%s?", @@ -460,3 +478,8 @@ func MergeUrl(serviceUrl URL, referenceUrl *URL) URL { return mergedUrl } +func (c *URL) Clone() *URL { + newUrl := &URL{} + copier.Copy(newUrl, c) + return newUrl +} diff --git a/common/url_test.go b/common/url_test.go index 68ed5b488c0828216008f24edb12e37859738c63..0f23c5a1d0f6d090368c1abccd9ab4093f1d5d25 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -242,3 +242,14 @@ func TestURL_SetParams(t *testing.T) { assert.Equal(t, "3", u1.Params.Get("key")) assert.Equal(t, "2.6.0", u1.Params.Get("version")) } + +func TestClone(t *testing.T) { + u1, err := NewURL(context.TODO(), "dubbo://:@127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&configVersion=1.0") + assert.NoError(t, err) + u2 := u1.Clone() + assert.Equal(t, u2.Protocol, "dubbo") + assert.Equal(t, "1.0", u2.Params.Get("configVersion")) + u2.Protocol = "provider" + assert.Equal(t, u1.Protocol, "dubbo") + assert.Equal(t, u2.Protocol, "provider") +} diff --git a/config/service_config.go b/config/service_config.go index 1b78c2ef3d6074a82576051f5cca83db42eef04c..66386aec4e841f06478d3ee08d87c2c5f03487a8 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -58,7 +58,6 @@ type ServiceConfig struct { unexported *atomic.Bool exported *atomic.Bool rpcService common.RPCService - exporters []protocol.Exporter cacheProtocol protocol.Protocol cacheMutex sync.Mutex } @@ -127,7 +126,6 @@ func (srvconfig *ServiceConfig) Export() error { if exporter == nil { panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, url))) } - srvconfig.exporters = append(srvconfig.exporters, exporter) } } else { invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*url) @@ -135,7 +133,6 @@ func (srvconfig *ServiceConfig) Export() error { if exporter == nil { panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", url))) } - srvconfig.exporters = append(srvconfig.exporters, exporter) } } diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 6b526b0c67df2d693214295c94dfeb35200ffec4..ef0761efcd0d2cee47425dd5e1099200d3be6a70 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -69,7 +69,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu err = c.client.Create(c.rootPath) c.listener.ListenServiceEvent(c.rootPath, c.cacheListener) - return c, nil + return c, err } @@ -95,7 +95,7 @@ func newMockZookeeperDynamicConfiguration(url *common.URL, opts ...zookeeper.Opt err = c.client.Create(c.rootPath) go c.listener.ListenServiceEvent(c.rootPath, c.cacheListener) - return tc, c, nil + return tc, c, err } diff --git a/examples/dubbo/go-server/app/user.go b/examples/dubbo/go-server/app/user.go index fcd9ea7b8677add705127b817799bcb4beb6dabb..3c261dc029022fe8a3a80a4007e5aa132643eb7c 100644 --- a/examples/dubbo/go-server/app/user.go +++ b/examples/dubbo/go-server/app/user.go @@ -26,6 +26,7 @@ import ( import ( "github.com/dubbogo/hessian2" + "github.com/dubbogo/hessian2/java_exception" perrors "github.com/pkg/errors" ) @@ -172,7 +173,7 @@ func (u *UserProvider) GetUser3() error { } func (u *UserProvider) GetErr(ctx context.Context, req []interface{}, rsp *User) error { - return hessian.NewThrowable("exception") + return java_exception.NewThrowable("exception") } func (u *UserProvider) GetUsers(req []interface{}) ([]interface{}, error) { diff --git a/examples/dubbo/with-configcenter-go-server/app/user.go b/examples/dubbo/with-configcenter-go-server/app/user.go index e4400cc270ad46c84085b52e8879bbd49969bcb0..b84e7229cc62ce5b3c8e8b0a3927c55b2aff9df8 100644 --- a/examples/dubbo/with-configcenter-go-server/app/user.go +++ b/examples/dubbo/with-configcenter-go-server/app/user.go @@ -31,6 +31,7 @@ import ( import ( "github.com/apache/dubbo-go/config" hessian "github.com/dubbogo/hessian2" + "github.com/dubbogo/hessian2/java_exception" ) type Gender hessian.JavaEnum @@ -146,7 +147,7 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User } func (u *UserProvider) GetErr(ctx context.Context, req []interface{}, rsp *User) error { - return hessian.NewThrowable("exception") + return java_exception.NewThrowable("exception") } func (u *UserProvider) GetUser0(id string, name string) (User, error) { diff --git a/go.mod b/go.mod index ef4c7b2ca8e17c4013f0916964597b43c9ae4615..e1ab3c34bfb26e8fa8a81ee0e95d03a969a84be8 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,14 @@ module github.com/apache/dubbo-go require ( github.com/dubbogo/getty v1.0.7 - github.com/dubbogo/gost v1.0.0 - github.com/dubbogo/hessian2 v1.0.2 + github.com/dubbogo/gost v1.1.1 + github.com/dubbogo/hessian2 v1.2.0 + github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8 github.com/magiconair/properties v1.8.1 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 + github.com/ulule/deepcopier v0.0.0-20171107155558-ca99b135e50f go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 3af6b3eb0d163f358e622769edbe6b856823afac..745c68f1ecc255fd4c016e653581e7434e05bdfa 100644 --- a/go.sum +++ b/go.sum @@ -3,14 +3,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/gost v1.0.0 h1:obKvpJYdrIY2BidHYwYoj2E50OtwCDqVVVTcH2nnhAY= -github.com/dubbogo/gost v1.0.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= -github.com/dubbogo/hessian2 v1.0.2 h1:Ka9Z32ZszGAdCpgrGuZQmwkT0qe1pd3o9r7ERCDnSlQ= -github.com/dubbogo/hessian2 v1.0.2/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= +github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= +github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw= +github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8 h1:mGIXW/lubQ4B+3bXTLxcTMTjUNDqoF6T/HUW9LbFx9s= +github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8/go.mod h1:yL958EeXv8Ylng6IfnvG4oflryUi3vgA3xPs9hmII1s= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -22,6 +24,8 @@ github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/ulule/deepcopier v0.0.0-20171107155558-ca99b135e50f h1:QatZ4lsJBY3x1+Imst9g95+vUl7m52dqM9Pi4aSMW8w= +github.com/ulule/deepcopier v0.0.0-20171107155558-ca99b135e50f/go.mod h1:BNLmYJ8oMJPIPpNx5968jCyUhwEU1XT3YsuOqtbo5qo= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 56f95090c2658bc2760ca72e4fd662f2b99d95e0..8ba46e7b0b321095652fb7c6d0c1c7403e1fea1e 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -217,7 +217,6 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") - p.Service.Target = svcUrl.GetParam(constant.INTERFACE_KEY, "") p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Method = method diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index f4a5f4a8474b30b13bf7598bc634ac722955d91b..a39b0aef743d18c24f41b41655abfada996503e8 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -62,13 +62,13 @@ func TestClient_Call(t *testing.T) { } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - user := &User{} + // user := &User{} //err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) //assert.NoError(t, err) //assert.NotEqual(t, "", user.Id) //assert.NotEqual(t, "", user.Name) - user = &User{} + user := &User{} err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go index f585666b3769d1384681039d25cea98a36a589fc..4f5229d67242e582a128caa40c205482e813f08e 100644 --- a/protocol/dubbo/codec_test.go +++ b/protocol/dubbo/codec_test.go @@ -49,7 +49,6 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { // request pkg.Header.Type = hessian.PackageRequest pkg.Service.Interface = "Service" - pkg.Service.Target = "Service" pkg.Service.Version = "2.6" pkg.Service.Method = "Method" pkg.Service.Timeout = time.Second @@ -64,10 +63,10 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) assert.Equal(t, int64(10086), pkgres.Header.ID) assert.Equal(t, "2.5.4", pkgres.Body.([]interface{})[0]) - assert.Equal(t, "Service", pkgres.Body.([]interface{})[1]) + assert.Equal(t, "", pkgres.Body.([]interface{})[1]) assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) - assert.Equal(t, map[interface{}]interface{}{"interface": "Service", "path": "", "timeout": "1000"}, pkgres.Body.([]interface{})[6]) + assert.Equal(t, map[interface{}]interface{}{"interface": "Service", "path": "", "group": "", "timeout": "1000"}, pkgres.Body.([]interface{})[6]) } diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index 8d3ac5dabb69acbfc3ca9846ebd1575a53b32eb8..ade74246121b5f275c8dbeaa5923228dbab2804f 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -42,7 +42,7 @@ func TestJsonClientCodec_Write(t *testing.T) { assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n", string(data)) cd.Args = 1 - data, err = codec.Write(cd) + _, err = codec.Write(cd) assert.EqualError(t, err, "unsupported param type: int") } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index d9bc7921b8ac8686d51350f59b0eb8aed21e7396..cd479ed7d64572e5823d783e6812757a4d6129d6 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -81,39 +81,17 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O } //subscibe from registry -func (dir *registryDirectory) Subscribe(url common.URL) { - for { - if !dir.registry.IsAvailable() { - logger.Warnf("event listener game over.") - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - return - } - - listener, err := dir.registry.Subscribe(url) - if err != nil { - if !dir.registry.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } - - for { - if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - go dir.update(serviceEvent) - } +func (dir *registryDirectory) Subscribe(url *common.URL) { + notifyListener := ¬ifyListener{dir} + dir.registry.Subscribe(url, notifyListener) +} - } +type notifyListener struct { + dir *registryDirectory +} - } +func (nl *notifyListener) Notify(event *registry.ServiceEvent) { + go nl.dir.update(event) } //subscribe service from registry , and update the cacheServices diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index ff33b5fe6e063257c12035f3262e0daae874363e..58d17940aef125307cd2738e77bdf74739b5c45a 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -18,6 +18,7 @@ package protocol import ( + "github.com/apache/dubbo-go/cluster" "sync" ) @@ -71,6 +72,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "") registryUrl.Protocol = protocol } + var reg registry.Registry if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { @@ -90,7 +92,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } - go directory.Subscribe(*serviceUrl) + go directory.Subscribe(serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) @@ -101,9 +103,10 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { - registryUrl := proto.getRegistryUrl(invoker) - providerUrl := proto.getProviderUrl(invoker) + registryUrl := getRegistryUrl(invoker) + providerUrl := getProviderUrl(invoker) + overriderUrl := getSubscribedOverrideUrl(&providerUrl) var reg registry.Registry if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { @@ -131,9 +134,74 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte logger.Infof("The exporter has not been cached, and will return a new exporter!") } + // Deprecated! subscribe to override rules in 2.6.x or before. + overrideSubscribeListener := &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} + reg.Subscribe(overriderUrl, overrideSubscribeListener) return cachedExporter.(protocol.Exporter) } +func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) { + key := getProviderUrl(invoker).Key() + if oldExporter, loaded := proto.bounds.Load(key); loaded { + wrappedNewInvoker := newWrappedInvoker(invoker, *newUrl) + //TODO:MAY not safe + oldExporter.(protocol.Exporter).Unexport() + proto.bounds.Delete(key) + + proto.Export(wrappedNewInvoker) + //TODO: unregister & unsubscribe + + } +} +func (proto *registryProtocol) overrideWithConfig(providerUrl *common.URL, listener *overrideSubscribeListener) { + +} + +type overrideSubscribeListener struct { + url *common.URL + originInvoker protocol.Invoker + protocol *registryProtocol + configurator cluster.Configurator +} + +func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { + if isMatched(&(event.Service), nl.url) { + nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) + nl.doOverrideIfNecessary() + } +} +func (nl *overrideSubscribeListener) doOverrideIfNecessary() { + providerUrl := getProviderUrl(nl.originInvoker) + key := providerUrl.Key() + if exporter, ok := nl.protocol.bounds.Load(key); ok { + currentUrl := exporter.(protocol.Exporter).GetInvoker().GetUrl() + nl.configurator.Configure(&providerUrl) + if currentUrl.String() == providerUrl.String() { + newRegUrl := nl.originInvoker.GetUrl() + setProviderUrl(&newRegUrl, &providerUrl) + nl.protocol.reExport(nl.originInvoker, &newRegUrl) + } + } +} + +func isMatched(url *common.URL, subscribedUrl *common.URL) bool { + // Compatible with the 2.6.x + if len(url.GetParam(constant.CATEGORY_KEY, "")) == 0 && url.Protocol == constant.OVERRIDE_PROTOCOL { + url.AddParam(constant.CATEGORY_KEY, constant.CONFIGURATORS_CATEGORY) + } + if subscribedUrl.URLEqual(*url) { + return true + } + return false + +} +func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { + newUrl := providerUrl.Clone() + newUrl.Protocol = constant.PROVIDER_PROTOCOL + newUrl.Params.Add(constant.CATEGORY_KEY, constant.CONFIGURATORS_CATEGORY) + newUrl.Params.Add(constant.CHECK_KEY, "false") + return newUrl +} func (proto *registryProtocol) Destroy() { for _, ivk := range proto.invokers { @@ -158,7 +226,7 @@ func (proto *registryProtocol) Destroy() { }) } -func (*registryProtocol) getRegistryUrl(invoker protocol.Invoker) common.URL { +func getRegistryUrl(invoker protocol.Invoker) common.URL { //here add * for return a new url url := invoker.GetUrl() //if the protocol == registry ,set protocol the registry value in url.params @@ -169,10 +237,13 @@ func (*registryProtocol) getRegistryUrl(invoker protocol.Invoker) common.URL { return url } -func (*registryProtocol) getProviderUrl(invoker protocol.Invoker) common.URL { +func getProviderUrl(invoker protocol.Invoker) common.URL { url := invoker.GetUrl() return *url.SubURL } +func setProviderUrl(regURL *common.URL, providerURL *common.URL) { + regURL.SubURL = providerURL +} func GetProtocol() protocol.Protocol { if regProtocol != nil { diff --git a/registry/registry.go b/registry/registry.go index b41ac534ba7daa588bd47d3327383047358033bc..c7279a29e1f423ca200aa2bf9390c127efcf10cb 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -28,10 +28,21 @@ type Registry interface { //And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring. Register(url common.URL) error - //used for service consumer ,start subscribe service event from registry - Subscribe(common.URL) (Listener, error) + //When creating new registry extension,pls select one of the following modes. + //Will remove in dubbogo version v1.1.0 + //mode1 : return Listener with Next function which can return subscribe service event from registry + //Deprecated! + //subscribe(common.URL) (Listener, error) + + //Will relace mode1 in dubbogo version v1.1.0 + //mode2 : callback mode, subscribe with notify(notify listener). + Subscribe(*common.URL, NotifyListener) +} +type NotifyListener interface { + Notify(*ServiceEvent) } +//Deprecated! type Listener interface { Next() (*ServiceEvent, error) Close() diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 67a8b3b67fc2bcd2c366e985ceba315f5900a1df..0f4f57917bb53c61aa6ad8c700d11e2d9582b151 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -45,7 +45,8 @@ import ( ) const ( - RegistryZkClient = "zk registry" + RegistryZkClient = "zk registry" + RegistryConnDelay = 3 ) var ( @@ -211,7 +212,6 @@ func (r *zkRegistry) Register(conf common.URL) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) switch role { case common.CONSUMER: - ok = false r.cltLock.Lock() _, ok = r.services[conf.Key()] r.cltLock.Unlock() @@ -232,7 +232,6 @@ func (r *zkRegistry) Register(conf common.URL) error { case common.PROVIDER: // 妫€楠屾湇鍔℃槸鍚﹀凡缁忔敞鍐岃繃 - ok = false r.cltLock.Lock() // 娉ㄦ剰姝ゅ涓巆onsumerZookeeperRegistry鐨勫樊寮傦紝consumer鐢ㄧ殑鏄痗onf.Path锛� // 鍥犱负consumer瑕佹彁渚泈atch鍔熻兘缁檚elector浣跨敤, provider鍏佽娉ㄥ唽鍚屼竴涓猻ervice鐨勫涓猤roup or version @@ -390,11 +389,46 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { return nil } -func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) { +func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } -func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) { +//subscibe from registry +func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { + for { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + return + } + + listener, err := r.subscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return + } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + continue + } + + for { + if serviceEvent, err := listener.Next(); err != nil { + logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) + listener.Close() + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + return + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) + } + + } + + } +} +func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) { var ( zkListener *RegistryConfigurationListener ) @@ -419,7 +453,7 @@ func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListene } //娉ㄥ唽鍒癲ataconfig鐨刬nterested - r.dataListener.AddInterestedURL(&conf) + r.dataListener.AddInterestedURL(conf) go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), conf.Service()), r.dataListener) diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 168246e1579c26d515bca836a3ad1cf66b26bfcd..5d56d4be03589b69fbc2ca258eacb5aa76353666 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -38,9 +38,9 @@ func Test_Register(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, err := newMockZkRegistry(®url) + ts, reg, _ := newMockZkRegistry(®url) defer ts.Stop() - err = reg.Register(url) + err := reg.Register(url) children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*.serviceid%3Dsoa.mock%26.*provider", children) assert.NoError(t, err) @@ -49,10 +49,10 @@ func Test_Register(t *testing.T) { func Test_Subscribe(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - ts, reg, err := newMockZkRegistry(®url) + ts, reg, _ := newMockZkRegistry(®url) //provider register - err = reg.Register(url) + err := reg.Register(url) assert.NoError(t, err) if err != nil { @@ -61,12 +61,12 @@ func Test_Subscribe(t *testing.T) { //consumer register regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) - _, reg2, err := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) - err = reg2.Register(url) - listener, err := reg2.Subscribe(url) + reg2.Register(url) + listener, _ := reg2.subscribe(url) - serviceEvent, err := listener.Next() + serviceEvent, _ := listener.Next() assert.NoError(t, err) if err != nil { return @@ -85,7 +85,7 @@ func Test_ConsumerDestory(t *testing.T) { assert.NoError(t, err) err = reg.Register(url) assert.NoError(t, err) - _, err = reg.Subscribe(url) + _, err = reg.subscribe(url) assert.NoError(t, err) //listener.Close() @@ -103,7 +103,7 @@ func Test_ProviderDestory(t *testing.T) { defer ts.Stop() assert.NoError(t, err) - err = reg.Register(url) + reg.Register(url) //listener.Close() time.Sleep(1e9) diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go index 10cc6dcdd62a5ac74cad21dc3d81a9a9a75696d1..c2fedb911b7560a13c5b98d865cb97575a7719df 100644 --- a/remoting/zookeeper/listener_test.go +++ b/remoting/zookeeper/listener_test.go @@ -18,7 +18,6 @@ package zookeeper import ( - "fmt" "sync" "testing" "time" @@ -28,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" ) import ( + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/remoting" ) @@ -113,10 +113,11 @@ type mockDataListener struct { } func (m *mockDataListener) DataChange(eventType remoting.Event) bool { - fmt.Println(eventType) + logger.Info(eventType) m.eventList = append(m.eventList, eventType) if eventType.Content == m.changedData { m.wait.Done() + m.client.Close() } return true }