diff --git a/cluster/configuration.go b/cluster/configuration.go new file mode 100644 index 0000000000000000000000000000000000000000..ebe29a145d55a4a6ebd49f04af3bd04623b023dc --- /dev/null +++ b/cluster/configuration.go @@ -0,0 +1,8 @@ +package cluster + +import "github.com/apache/dubbo-go/common" + +type Configurator interface { + GetUrl() *common.URL + Configure(url *common.URL) +} diff --git a/cluster/configurator/override.go b/cluster/configurator/override.go new file mode 100644 index 0000000000000000000000000000000000000000..7328cf7831aba63e9177ea2acf0ccef1b2ab670a --- /dev/null +++ b/cluster/configurator/override.go @@ -0,0 +1,102 @@ +package configurator + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/utils" + "github.com/dubbogo/gost/container" + "strings" +) + +func init() { + extension.SetConfigurator("override", newConfigurator) +} +func newConfigurator(url *common.URL) cluster.Configurator { + return &overrideConfigurator{configuratorUrl: url} +} + +type overrideConfigurator struct { + configuratorUrl *common.URL +} + +func (c *overrideConfigurator) GetUrl() *common.URL { + return c.configuratorUrl +} + +func (c *overrideConfigurator) Configure(url *common.URL) { + //remove configuratorUrl some param that can not be configured + if c.configuratorUrl.GetParam(constant.ENABLED_KEY, "true") == "false" || len(c.configuratorUrl.Location) == 0 { + return + } + + //branch for version 2.7.x + apiVersion := c.configuratorUrl.GetParam(constant.CONFIG_VERSION_KEY, "") + if len(apiVersion) != 0 { + currentSide := url.GetParam(constant.SIDE_KEY, "") + configuratorSide := c.configuratorUrl.GetParam(constant.SIDE_KEY, "") + if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide && c.configuratorUrl.Port == "0" { + localIP, _ := utils.GetLocalIP() + c.configureIfMatch(localIP, url) + } else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide && c.configuratorUrl.Port == url.Port { + c.configureIfMatch(url.Ip, url) + } + } else { + //branch for version 2.6.x and less + c.configureDeprecated(url) + } +} + +//translate from java, compatible rules in java +func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { + if constant.ANYHOST_VALUE == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip { + providers := c.configuratorUrl.GetParam(constant.OVERRIDE_PROVIDERS_KEY, "") + if len(providers) == 0 || strings.Index(providers, url.Location) >= 0 || strings.Index(providers, constant.ANYHOST_VALUE) >= 0 { + configApp := c.configuratorUrl.GetParam(constant.APPLICATION_KEY, c.configuratorUrl.Username) + currentApp := url.GetParam(constant.APPLICATION_KEY, url.Username) + if len(configApp) == 0 || constant.ANY_VALUE == configApp || configApp == currentApp { + conditionKeys := container.NewSet() + conditionKeys.Add(constant.CATEGORY_KEY) + conditionKeys.Add(constant.CHECK_KEY) + conditionKeys.Add(constant.ENABLED_KEY) + conditionKeys.Add(constant.GROUP_KEY) + conditionKeys.Add(constant.VERSION_KEY) + conditionKeys.Add(constant.APPLICATION_KEY) + conditionKeys.Add(constant.SIDE_KEY) + conditionKeys.Add(constant.CONFIG_VERSION_KEY) + conditionKeys.Add(constant.COMPATIBLE_CONFIG_KEY) + for k, _ := range c.configuratorUrl.Params { + value := c.configuratorUrl.Params.Get(k) + if strings.HasPrefix(k, "~") || k == constant.APPLICATION_KEY || k == constant.SIDE_KEY { + conditionKeys.Add(k) + if len(value) != 0 && value != constant.ANY_VALUE && value != c.configuratorUrl.Params.Get(strings.TrimPrefix(k, "~")) { + return + } + } + } + c.configuratorUrl.RemoveParams(conditionKeys) + url.SetParams(c.configuratorUrl.Params) + } + } + } +} + +func (c *overrideConfigurator) configureDeprecated(url *common.URL) { + // If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance. + if c.configuratorUrl.Port != "0" { + if url.Port == c.configuratorUrl.Port { + c.configureIfMatch(url.Ip, url) + } + } else { + // override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0 + // 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore; + // 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider + if url.GetParam(constant.SIDE_KEY, "") == common.DubboRole[common.CONSUMER] { + localIP, _ := utils.GetLocalIP() + c.configureIfMatch(localIP, url) + } else { + c.configureIfMatch(constant.ANYHOST_VALUE, url) + } + } +} diff --git a/cluster/configurator/override_test.go b/cluster/configurator/override_test.go new file mode 100644 index 0000000000000000000000000000000000000000..0e763469c80cf8c91072de989e5d3f48f86b7b75 --- /dev/null +++ b/cluster/configurator/override_test.go @@ -0,0 +1,55 @@ +package configurator + +import ( + "context" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_configureVerison2p6(t *testing.T) { + url, err := common.NewURL(context.Background(), "override://0.0.0.0:0/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService") + assert.NoError(t, err) + configurator := extension.GetConfigurator("override", &url) + assert.Equal(t, "override", configurator.GetUrl().Protocol) + + providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + configurator.Configure(&providerUrl) + assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY)) + +} +func Test_configureVerisonOverrideAddr(t *testing.T) { + url, err := common.NewURL(context.Background(), "override://0.0.0.0:0/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService&providerAddresses=127.0.0.2:20001|127.0.0.3:20001") + assert.NoError(t, err) + configurator := extension.GetConfigurator("override", &url) + assert.Equal(t, "override", configurator.GetUrl().Protocol) + + providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + configurator.Configure(&providerUrl) + assert.Equal(t, "failover", providerUrl.Params.Get(constant.CLUSTER_KEY)) + +} +func Test_configureVerison2p6WithIp(t *testing.T) { + url, err := common.NewURL(context.Background(), "override://127.0.0.1:20001/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService") + assert.NoError(t, err) + configurator := extension.GetConfigurator("override", &url) + assert.Equal(t, "override", configurator.GetUrl().Protocol) + + providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + configurator.Configure(&providerUrl) + assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY)) + +} + +func Test_configureVerison2p7(t *testing.T) { + url, err := common.NewURL(context.Background(), "jsonrpc://0.0.0.0:20001/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService&configVersion=1.0&side=provider") + assert.NoError(t, err) + configurator := extension.GetConfigurator("override", &url) + + providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + configurator.Configure(&providerUrl) + assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY)) + +} diff --git a/common/constant/default.go b/common/constant/default.go index 05461ca6e7360e08d716d6b78d20ad4411df99e8..503ccb23b37f8d9c778f7e5e0d1d8411b88e3a09 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -18,8 +18,12 @@ package constant const ( - DUBBO = "dubbo" + DUBBO = "dubbo" + PROVIDER_PROTOCOL = "provider" + //compatible with 2.6.x + OVERRIDE_PROTOCOL = "override" ) + const ( DEFAULT_WEIGHT = 100 // DEFAULT_WARMUP = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second @@ -42,5 +46,11 @@ const ( ) const ( - ANY_VALUE = "*" + ANY_VALUE = "*" + ANYHOST_VALUE = "0.0.0.0" +) + +const ( + CONFIGURATORS_CATEGORY = "configurators" + DEFAULT_CATEGORY = "providers" ) diff --git a/common/constant/key.go b/common/constant/key.go index 82df44c3e10b6f19d2fba2c86fb7b5086904ab41..f045bc8e95dee22462f979210327a2236699559c 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -22,13 +22,18 @@ const ( ) const ( - GROUP_KEY = "group" - VERSION_KEY = "version" - INTERFACE_KEY = "interface" - PATH_KEY = "path" - SERVICE_KEY = "service" - METHODS_KEY = "methods" - TIMEOUT_KEY = "timeout" + GROUP_KEY = "group" + VERSION_KEY = "version" + INTERFACE_KEY = "interface" + PATH_KEY = "path" + SERVICE_KEY = "service" + METHODS_KEY = "methods" + TIMEOUT_KEY = "timeout" + CATEGORY_KEY = "category" + CHECK_KEY = "check" + ENABLED_KEY = "enabled" + SIDE_KEY = "side" + OVERRIDE_PROVIDERS_KEY = "providerAddresses" ) const ( @@ -73,8 +78,10 @@ const ( ) const ( - CONFIG_NAMESPACE_KEY = "config.namespace" - CONFIG_TIMEOUT_KET = "config.timeout" + CONFIG_NAMESPACE_KEY = "config.namespace" + CONFIG_TIMEOUT_KET = "config.timeout" + CONFIG_VERSION_KEY = "configVersion" + COMPATIBLE_CONFIG_KEY = "compatible_config" ) const ( RegistryConfigPrefix = "dubbo.registries." diff --git a/common/extension/configurator.go b/common/extension/configurator.go new file mode 100644 index 0000000000000000000000000000000000000000..4a2b4f8ae8eeaaa5540e086f5efe0b3d9d60ec50 --- /dev/null +++ b/common/extension/configurator.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" +) + +var ( + configurator = make(map[string]func(url *common.URL) cluster.Configurator) +) + +func SetConfigurator(name string, v func(url *common.URL) cluster.Configurator) { + configurator[name] = v +} + +func GetConfigurator(name string, url *common.URL) cluster.Configurator { + if configurator[name] == nil { + panic("config center for " + name + " is not existing, make sure you have import the package.") + } + return configurator[name](url) + +} diff --git a/common/url.go b/common/url.go index 47b44cf57feb9cc9eadb18e597a935073918fe99..f40feda3ee53f87c8899b38bcd2ed0c08586fc53 100644 --- a/common/url.go +++ b/common/url.go @@ -22,6 +22,7 @@ import ( "context" "encoding/base64" "fmt" + "math" "net" "net/url" @@ -30,6 +31,7 @@ import ( ) import ( + "github.com/dubbogo/gost/container" perrors "github.com/pkg/errors" ) @@ -231,8 +233,8 @@ func (c URL) String() string { func (c URL) Key() string { buildString := fmt.Sprintf( - "%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s", - c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, "")) + "%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s&category_key=%s", + c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""), c.GetParam(constant.CATEGORY_KEY, "")) return buildString //return c.ServiceKey() } @@ -358,6 +360,18 @@ func (c URL) GetMethodParam(method string, key string, d string) string { } return r } +func (c *URL) RemoveParams(set *container.HashSet) { + for k, _ := range set.Items { + s := k.(string) + delete(c.Params, s) + } +} +func (c *URL) SetParams(m url.Values) { + + for k, _ := range m { + c.Params.Set(k, m.Get(k)) + } +} // ToMap transfer URL to Map func (c URL) ToMap() map[string]string { diff --git a/common/url_test.go b/common/url_test.go index 143e31cb34f2ec1da7efc910a6b4133f0f4789b5..68ed5b488c0828216008f24edb12e37859738c63 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -232,3 +232,13 @@ func TestMergeUrl(t *testing.T) { assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) } + +func TestURL_SetParams(t *testing.T) { + u1, err := NewURL(context.TODO(), "dubbo://:@127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&configVersion=1.0") + assert.NoError(t, err) + params := url.Values{} + params.Set("key", "3") + u1.SetParams(params) + assert.Equal(t, "3", u1.Params.Get("key")) + assert.Equal(t, "2.6.0", u1.Params.Get("version")) +} diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index 478a88d19404be612c1caf6be4885edc01bb414b..7a47e14bda652dbb4689bfe520bc163a2b0635dc 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -59,7 +59,7 @@ func main() { println("res: %v", res) } - time.Sleep(3e9) + time.Sleep(10e9) println("\n\n\nstart to test jsonrpc") user := &JsonRPCUser{} diff --git a/examples/jsonrpc/with-configcenter-go-client/dubbo.client.properties b/examples/jsonrpc/with-configcenter-go-client/dubbo.client.properties new file mode 100644 index 0000000000000000000000000000000000000000..c7e6e0e3ddf30a7655d8ac2823a25baf81347abd --- /dev/null +++ b/examples/jsonrpc/with-configcenter-go-client/dubbo.client.properties @@ -0,0 +1,15 @@ +dubbo.consumer.check=true +dubbo.consumer.request_timeout=5s +dubbo.consumer.connect_timeout=5s +dubbo.application.organization=ikurento.com +dubbo.application.name=BDTService +dubbo.application.module=dubbogo user-info client +dubbo.application.version=0.0.1 +dubbo.application.owner=ZX1 +dubbo.application.environment=dev +dubbo.registries.hangzhouzk.protocol=zookeeper +dubbo.registries.hangzhouzk.timeout=3s +dubbo.registries.hangzhouzk.address=127.0.0.1:2181 +dubbo.registries.shanghaizk.protocol=zookeeper +dubbo.registries.shanghaizk.timeout=3s +dubbo.registries.shanghaizk.address=127.0.0.1:2182 diff --git a/examples/jsonrpc/with-configcenter-go-server/dubbo.properties b/examples/jsonrpc/with-configcenter-go-server/dubbo.properties new file mode 100644 index 0000000000000000000000000000000000000000..7477c41eb53d6ef7d02a63c3c4ca5c4167043ec4 --- /dev/null +++ b/examples/jsonrpc/with-configcenter-go-server/dubbo.properties @@ -0,0 +1,15 @@ +dubbo.application.organization=ikurento.com +dubbo.application.name=BDTService +dubbo.application.module=dubbogo user-info server +dubbo.application.version=0.0.1 +dubbo.application.owner=ZX1 +dubbo.application.environment=dev +dubbo.registries.hangzhouzk.protocol=zookeeper +dubbo.registries.hangzhouzk.timeout=3s +dubbo.registries.hangzhouzk.address=127.0.0.1:2181 +dubbo.registries.shanghaizk.protocol=zookeeper +dubbo.registries.shanghaizk.timeout=3s +dubbo.registries.shanghaizk.address=127.0.0.1:2182 +dubbo.protocols.dubbo1.name=dubbo +dubbo.protocols.dubbo1.ip=127.0.0.1 +dubbo.protocols.dubbo1.port=20001 \ No newline at end of file diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 11687f82ee5fe0dba6f2bca38b6454ee9c666d91..d9bc7921b8ac8686d51350f59b0eb8aed21e7396 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -85,6 +85,7 @@ func (dir *registryDirectory) Subscribe(url common.URL) { for { if !dir.registry.IsAvailable() { logger.Warnf("event listener game over.") + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) return } @@ -234,3 +235,12 @@ func (dir *registryDirectory) Destroy() { dir.cacheInvokers = []protocol.Invoker{} }) } + +type referenceConfigurationListener struct { + directory *registryDirectory + url *common.URL +} + +func (l *referenceConfigurationListener) Process(event *remoting.ConfigChangeEvent) { + //l.directory.refreshInvokers(event) +} diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 6b15133319975ee6cb305482b1cfd17aacbca5b6..67a8b3b67fc2bcd2c366e985ceba315f5900a1df 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -421,7 +421,7 @@ func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListene //注册到dataconfig的interested r.dataListener.AddInterestedURL(&conf) - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", conf.Service()), r.dataListener) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), conf.Service()), r.dataListener) return zkListener, nil } diff --git a/remoting/listener.go b/remoting/listener.go index da30f6989dbaae5668ad189b4fdef945d622fd9b..866c8503bb1c112db4182e31052199d49a25c7fd 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -46,7 +46,7 @@ type EventType int const ( EventTypeAdd = iota EventTypeDel - EvnetTypeUpdate + EventTypeUpdate ) var serviceEventTypeStrings = [...]string{ diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 5b9e0a8f824598fd5030bd76eec04adf3e639ed9..78c83ba3b249aae1e3f1059d64d718d0546b0d81 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -69,7 +69,7 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remo logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath) if len(listener) > 0 { content, _, _ := l.client.Conn.Get(zkEvent.Path) - listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EvnetTypeUpdate, Content: string(content)}) + listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)}) } case zk.EventNodeCreated: