Skip to content
Snippets Groups Projects
Commit a94746db authored by vito.he's avatar vito.he
Browse files

Add: define configurator

parent 34145b46
No related branches found
No related tags found
No related merge requests found
Showing with 302 additions and 17 deletions
package cluster
import "github.com/apache/dubbo-go/common"
type Configurator interface {
GetUrl() *common.URL
Configure(url *common.URL)
}
package configurator
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/dubbogo/gost/container"
"strings"
)
func init() {
extension.SetConfigurator("override", newConfigurator)
}
func newConfigurator(url *common.URL) cluster.Configurator {
return &overrideConfigurator{configuratorUrl: url}
}
type overrideConfigurator struct {
configuratorUrl *common.URL
}
func (c *overrideConfigurator) GetUrl() *common.URL {
return c.configuratorUrl
}
func (c *overrideConfigurator) Configure(url *common.URL) {
//remove configuratorUrl some param that can not be configured
if c.configuratorUrl.GetParam(constant.ENABLED_KEY, "true") == "false" || len(c.configuratorUrl.Location) == 0 {
return
}
//branch for version 2.7.x
apiVersion := c.configuratorUrl.GetParam(constant.CONFIG_VERSION_KEY, "")
if len(apiVersion) != 0 {
currentSide := url.GetParam(constant.SIDE_KEY, "")
configuratorSide := c.configuratorUrl.GetParam(constant.SIDE_KEY, "")
if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide && c.configuratorUrl.Port == "0" {
localIP, _ := utils.GetLocalIP()
c.configureIfMatch(localIP, url)
} else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide && c.configuratorUrl.Port == url.Port {
c.configureIfMatch(url.Ip, url)
}
} else {
//branch for version 2.6.x and less
c.configureDeprecated(url)
}
}
//translate from java, compatible rules in java
func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
if constant.ANYHOST_VALUE == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
providers := c.configuratorUrl.GetParam(constant.OVERRIDE_PROVIDERS_KEY, "")
if len(providers) == 0 || strings.Index(providers, url.Location) >= 0 || strings.Index(providers, constant.ANYHOST_VALUE) >= 0 {
configApp := c.configuratorUrl.GetParam(constant.APPLICATION_KEY, c.configuratorUrl.Username)
currentApp := url.GetParam(constant.APPLICATION_KEY, url.Username)
if len(configApp) == 0 || constant.ANY_VALUE == configApp || configApp == currentApp {
conditionKeys := container.NewSet()
conditionKeys.Add(constant.CATEGORY_KEY)
conditionKeys.Add(constant.CHECK_KEY)
conditionKeys.Add(constant.ENABLED_KEY)
conditionKeys.Add(constant.GROUP_KEY)
conditionKeys.Add(constant.VERSION_KEY)
conditionKeys.Add(constant.APPLICATION_KEY)
conditionKeys.Add(constant.SIDE_KEY)
conditionKeys.Add(constant.CONFIG_VERSION_KEY)
conditionKeys.Add(constant.COMPATIBLE_CONFIG_KEY)
for k, _ := range c.configuratorUrl.Params {
value := c.configuratorUrl.Params.Get(k)
if strings.HasPrefix(k, "~") || k == constant.APPLICATION_KEY || k == constant.SIDE_KEY {
conditionKeys.Add(k)
if len(value) != 0 && value != constant.ANY_VALUE && value != c.configuratorUrl.Params.Get(strings.TrimPrefix(k, "~")) {
return
}
}
}
c.configuratorUrl.RemoveParams(conditionKeys)
url.SetParams(c.configuratorUrl.Params)
}
}
}
}
func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
// If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance.
if c.configuratorUrl.Port != "0" {
if url.Port == c.configuratorUrl.Port {
c.configureIfMatch(url.Ip, url)
}
} else {
// override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0
// 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore;
// 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider
if url.GetParam(constant.SIDE_KEY, "") == common.DubboRole[common.CONSUMER] {
localIP, _ := utils.GetLocalIP()
c.configureIfMatch(localIP, url)
} else {
c.configureIfMatch(constant.ANYHOST_VALUE, url)
}
}
}
package configurator
import (
"context"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/stretchr/testify/assert"
"testing"
)
func Test_configureVerison2p6(t *testing.T) {
url, err := common.NewURL(context.Background(), "override://0.0.0.0:0/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService")
assert.NoError(t, err)
configurator := extension.GetConfigurator("override", &url)
assert.Equal(t, "override", configurator.GetUrl().Protocol)
providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider&timestamp=1562076628&version=&warmup=100")
configurator.Configure(&providerUrl)
assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY))
}
func Test_configureVerisonOverrideAddr(t *testing.T) {
url, err := common.NewURL(context.Background(), "override://0.0.0.0:0/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService&providerAddresses=127.0.0.2:20001|127.0.0.3:20001")
assert.NoError(t, err)
configurator := extension.GetConfigurator("override", &url)
assert.Equal(t, "override", configurator.GetUrl().Protocol)
providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider&timestamp=1562076628&version=&warmup=100")
configurator.Configure(&providerUrl)
assert.Equal(t, "failover", providerUrl.Params.Get(constant.CLUSTER_KEY))
}
func Test_configureVerison2p6WithIp(t *testing.T) {
url, err := common.NewURL(context.Background(), "override://127.0.0.1:20001/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService")
assert.NoError(t, err)
configurator := extension.GetConfigurator("override", &url)
assert.Equal(t, "override", configurator.GetUrl().Protocol)
providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider&timestamp=1562076628&version=&warmup=100")
configurator.Configure(&providerUrl)
assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY))
}
func Test_configureVerison2p7(t *testing.T) {
url, err := common.NewURL(context.Background(), "jsonrpc://0.0.0.0:20001/com.xxx.mock.userProvider?group=1&version=1&cluster=failfast&application=BDTService&configVersion=1.0&side=provider")
assert.NoError(t, err)
configurator := extension.GetConfigurator("override", &url)
providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider&timestamp=1562076628&version=&warmup=100")
configurator.Configure(&providerUrl)
assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY))
}
......@@ -18,8 +18,12 @@
package constant
const (
DUBBO = "dubbo"
DUBBO = "dubbo"
PROVIDER_PROTOCOL = "provider"
//compatible with 2.6.x
OVERRIDE_PROTOCOL = "override"
)
const (
DEFAULT_WEIGHT = 100 //
DEFAULT_WARMUP = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
......@@ -42,5 +46,11 @@ const (
)
const (
ANY_VALUE = "*"
ANY_VALUE = "*"
ANYHOST_VALUE = "0.0.0.0"
)
const (
CONFIGURATORS_CATEGORY = "configurators"
DEFAULT_CATEGORY = "providers"
)
......@@ -22,13 +22,18 @@ const (
)
const (
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
CATEGORY_KEY = "category"
CHECK_KEY = "check"
ENABLED_KEY = "enabled"
SIDE_KEY = "side"
OVERRIDE_PROVIDERS_KEY = "providerAddresses"
)
const (
......@@ -73,8 +78,10 @@ const (
)
const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
CONFIG_VERSION_KEY = "configVersion"
COMPATIBLE_CONFIG_KEY = "compatible_config"
)
const (
RegistryConfigPrefix = "dubbo.registries."
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
)
var (
configurator = make(map[string]func(url *common.URL) cluster.Configurator)
)
func SetConfigurator(name string, v func(url *common.URL) cluster.Configurator) {
configurator[name] = v
}
func GetConfigurator(name string, url *common.URL) cluster.Configurator {
if configurator[name] == nil {
panic("config center for " + name + " is not existing, make sure you have import the package.")
}
return configurator[name](url)
}
......@@ -22,6 +22,7 @@ import (
"context"
"encoding/base64"
"fmt"
"math"
"net"
"net/url"
......@@ -30,6 +31,7 @@ import (
)
import (
"github.com/dubbogo/gost/container"
perrors "github.com/pkg/errors"
)
......@@ -231,8 +233,8 @@ func (c URL) String() string {
func (c URL) Key() string {
buildString := fmt.Sprintf(
"%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s",
c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
"%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s&category_key=%s",
c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""), c.GetParam(constant.CATEGORY_KEY, ""))
return buildString
//return c.ServiceKey()
}
......@@ -358,6 +360,18 @@ func (c URL) GetMethodParam(method string, key string, d string) string {
}
return r
}
func (c *URL) RemoveParams(set *container.HashSet) {
for k, _ := range set.Items {
s := k.(string)
delete(c.Params, s)
}
}
func (c *URL) SetParams(m url.Values) {
for k, _ := range m {
c.Params.Set(k, m.Get(k))
}
}
// ToMap transfer URL to Map
func (c URL) ToMap() map[string]string {
......
......@@ -232,3 +232,13 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
}
func TestURL_SetParams(t *testing.T) {
u1, err := NewURL(context.TODO(), "dubbo://:@127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&configVersion=1.0")
assert.NoError(t, err)
params := url.Values{}
params.Set("key", "3")
u1.SetParams(params)
assert.Equal(t, "3", u1.Params.Get("key"))
assert.Equal(t, "2.6.0", u1.Params.Get("version"))
}
......@@ -59,7 +59,7 @@ func main() {
println("res: %v", res)
}
time.Sleep(3e9)
time.Sleep(10e9)
println("\n\n\nstart to test jsonrpc")
user := &JsonRPCUser{}
......
dubbo.consumer.check=true
dubbo.consumer.request_timeout=5s
dubbo.consumer.connect_timeout=5s
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info client
dubbo.application.version=0.0.1
dubbo.application.owner=ZX1
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info server
dubbo.application.version=0.0.1
dubbo.application.owner=ZX1
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.protocols.dubbo1.name=dubbo
dubbo.protocols.dubbo1.ip=127.0.0.1
dubbo.protocols.dubbo1.port=20001
\ No newline at end of file
......@@ -85,6 +85,7 @@ func (dir *registryDirectory) Subscribe(url common.URL) {
for {
if !dir.registry.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
return
}
......@@ -234,3 +235,12 @@ func (dir *registryDirectory) Destroy() {
dir.cacheInvokers = []protocol.Invoker{}
})
}
type referenceConfigurationListener struct {
directory *registryDirectory
url *common.URL
}
func (l *referenceConfigurationListener) Process(event *remoting.ConfigChangeEvent) {
//l.directory.refreshInvokers(event)
}
......@@ -421,7 +421,7 @@ func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListene
//注册到dataconfig的interested
r.dataListener.AddInterestedURL(&conf)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", conf.Service()), r.dataListener)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), conf.Service()), r.dataListener)
return zkListener, nil
}
......
......@@ -46,7 +46,7 @@ type EventType int
const (
EventTypeAdd = iota
EventTypeDel
EvnetTypeUpdate
EventTypeUpdate
)
var serviceEventTypeStrings = [...]string{
......
......@@ -69,7 +69,7 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remo
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EvnetTypeUpdate, Content: string(content)})
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)})
}
case zk.EventNodeCreated:
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment