Skip to content
Snippets Groups Projects
Unverified Commit 613cc6ed authored by Joe Zou's avatar Joe Zou Committed by GitHub
Browse files

Merge pull request #696 from apache/revert-665-featrue/tagRouter

Ftr: dynamic tag router
parents 0a98a209 603ffc19
No related branches found
No related tags found
No related merge requests found
......@@ -36,8 +36,5 @@ cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar
mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
......@@ -24,6 +24,7 @@ import (
)
import (
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/stretchr/testify/assert"
)
......@@ -33,7 +34,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
......
......@@ -85,7 +85,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
return l, nil
}
// Process Process config change event, generate routers and set them to the listenableRouter instance
// Process Process config change event , generate routers and set them to the listenableRouter instance
func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
......
......@@ -42,7 +42,7 @@ type FileTagRouter struct {
force bool
}
// NewFileTagRouter Create file tag router instance with content (from config file)
// NewFileTagRouter Create file tag router instance with content ( from config file)
func NewFileTagRouter(content []byte) (*FileTagRouter, error) {
fileRouter := &FileTagRouter{}
rule, err := getRule(string(content))
......
......@@ -22,27 +22,9 @@ import (
"github.com/apache/dubbo-go/common/yaml"
)
/**
* %YAML1.2
* ---
* force: true
* runtime: false
* enabled: true
* priority: 1
* key: demo-provider
* tags:
* - name: tag1
* addresses: [ip1, ip2]
* - name: tag2
* addresses: [ip3, ip4]
* ...
*/
// RouterRule RouterRule config read from config file or config center
type RouterRule struct {
router.BaseRouterRule `yaml:",inline""`
Tags []Tag
addressToTagNames map[string][]string
tagNameToAddresses map[string][]string
}
func getRule(rawRule string) (*RouterRule, error) {
......@@ -52,58 +34,5 @@ func getRule(rawRule string) (*RouterRule, error) {
return r, err
}
r.RawRule = rawRule
r.init()
return r, nil
}
func (t *RouterRule) init() {
t.addressToTagNames = make(map[string][]string, 8)
t.tagNameToAddresses = make(map[string][]string, 8)
for _, tag := range t.Tags {
for _, address := range tag.Addresses {
t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name)
}
t.tagNameToAddresses[tag.Name] = tag.Addresses
}
}
func (t *RouterRule) getAddresses() []string {
var result = make([]string, 0, 8*len(t.Tags))
for _, tag := range t.Tags {
result = append(result, tag.Addresses...)
}
return result
}
func (t *RouterRule) getTagNames() []string {
var result = make([]string, 0, len(t.Tags))
for _, tag := range t.Tags {
result = append(result, tag.Name)
}
return result
}
func (t *RouterRule) hasTag(tag string) bool {
for _, t := range t.Tags {
if tag == t.Name {
return true
}
}
return false
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
return t.addressToTagNames
}
func (t *RouterRule) getTagNameToAddresses() map[string][]string {
return t.tagNameToAddresses
}
func (t *RouterRule) getTags() []Tag {
return t.Tags
}
func (t *RouterRule) setTags(tags []Tag) {
t.Tags = tags
}
......@@ -22,56 +22,19 @@ import (
)
import (
"github.com/stretchr/testify/suite"
"github.com/stretchr/testify/assert"
)
type RuleTestSuite struct {
suite.Suite
rule *RouterRule
}
func (suite *RuleTestSuite) SetupTest() {
var err error
func TestGetRule(t *testing.T) {
yml := `
scope: application
runtime: true
force: true
runtime: false
enabled: true
priority: 1
key: demo-provider
tags:
- name: tag1
addresses: [ip1, ip2]
- name: tag2
addresses: [ip3, ip4]
`
suite.rule, err = getRule(yml)
suite.Nil(err)
}
func (suite *RuleTestSuite) TestGetRule() {
var err error
suite.Equal(true, suite.rule.Force)
suite.Equal(false, suite.rule.Runtime)
suite.Equal("application", suite.rule.Scope)
suite.Equal(1, suite.rule.Priority)
suite.Equal("demo-provider", suite.rule.Key)
suite.Nil(err)
}
func (suite *RuleTestSuite) TestGetTagNames() {
suite.Equal([]string{"tag1", "tag2"}, suite.rule.getTagNames())
}
func (suite *RuleTestSuite) TestGetAddresses() {
suite.Equal([]string{"ip1", "ip2", "ip3", "ip4"}, suite.rule.getAddresses())
}
func (suite *RuleTestSuite) TestHasTag() {
suite.Equal(true, suite.rule.hasTag("tag1"))
suite.Equal(false, suite.rule.hasTag("tag404"))
}
func TestRuleTestSuite(t *testing.T) {
suite.Run(t, new(RuleTestSuite))
rule, e := getRule(yml)
assert.Nil(t, e)
assert.NotNil(t, rule)
assert.Equal(t, true, rule.Force)
assert.Equal(t, true, rule.Runtime)
assert.Equal(t, "application", rule.Scope)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tag
type Tag struct {
Name string
Addresses []string
}
func (t *Tag) getName() string {
return t.Name
}
func (t *Tag) setName(name string) {
t.Name = name
}
func (t *Tag) getAddresses() []string {
return t.Addresses
}
func (t *Tag) setAddresses(addresses []string) {
t.Addresses = addresses
}
......@@ -18,10 +18,7 @@
package tag
import (
"errors"
"net"
"strconv"
"strings"
)
import (
......@@ -30,21 +27,15 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting"
)
// tagRouter defines url, enable and the priority
type tagRouter struct {
url *common.URL
tagRouterRule *RouterRule
enabled bool
priority int64
application string
url *common.URL
enabled bool
priority int64
}
// NewTagRouter returns a tagRouter instance if url is not nil
......@@ -64,15 +55,6 @@ func (c *tagRouter) isEnabled() bool {
return c.enabled
}
func (c *tagRouter) SetApplication(app string) {
c.application = app
}
func (c *tagRouter) tagRouterRuleCopy() RouterRule {
routerRule := *c.tagRouterRule
return routerRule
}
// Route gets a list of invoker
func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if !c.isEnabled() {
......@@ -81,152 +63,7 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati
if len(invokers) == 0 {
return invokers
}
if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
return filterUsingStaticTag(invokers, url, invocation)
}
// since the rule can be changed by config center, we should copy one to use.
tagRouterRuleCopy := c.tagRouterRuleCopy()
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = url.GetParam(constant.Tagkey, "")
}
var (
result []protocol.Invoker
addresses []string
)
// if we are requesting for a Provider with a specific tag
if len(tag) > 0 {
addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag]
// filter by dynamic tag group first
if len(addresses) > 0 {
filterAddressMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) {
return true
}
return false
}
result = filterInvoker(invokers, filterAddressMatches)
if len(result) > 0 || tagRouterRuleCopy.Force {
return result
}
} else {
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
filter := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag {
return true
}
return false
}
result = filterInvoker(invokers, filter)
}
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if len(result) > 0 || isForceUseTag(url, invocation) {
return result
} else {
// FAILOVER: return all Providers without any tags.
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) {
return true
}
return false
}
filterTagIsEmpty := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" {
return true
}
return false
}
return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
}
} else {
// return all addresses in dynamic tag group.
addresses = tagRouterRuleCopy.getAddresses()
if len(addresses) > 0 {
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) {
return true
}
return false
}
result = filterInvoker(invokers, filterAddressNotMatches)
// 1. all addresses are in dynamic tag group, return empty list.
if len(result) == 0 {
return result
}
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
// static tag group.
filter := func(invoker protocol.Invoker) bool {
localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag))
}
return filterInvoker(result, filter)
}
}
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
c.tagRouterRule = nil
return
} else {
content, ok := event.Value.(string)
if !ok {
logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value)
return
}
routerRule, err := getRule(content)
if err != nil {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.tagRouterRule = routerRule
return
}
}
func (c *tagRouter) Notify(invokers []protocol.Invoker) {
if len(invokers) == 0 {
return
}
invoker := invokers[0]
url := invoker.GetUrl()
providerApplication := url.GetParam(constant.RemoteApplicationKey, "")
if providerApplication == "" {
logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
"in this TagRouter is not specified.")
return
}
dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
return
}
if providerApplication != c.application {
dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c)
}
routerKey := providerApplication + constant.TagRouterRuleSuffix
dynamicConfiguration.AddListener(routerKey, c)
//get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
return
}
if rule != "" {
c.Process(&config_center.ConfigChangeEvent{
Key: routerKey,
Value: rule,
ConfigType: remoting.EventTypeUpdate})
}
return filterUsingStaticTag(invokers, url, invocation)
}
// URL gets the url of tagRouter
......@@ -239,7 +76,7 @@ func (c *tagRouter) Priority() int64 {
return c.priority
}
// filterUsingStaticTag gets a list of invoker using static tag, If there's no dynamic tag rule being set, use static tag in URL
// filterUsingStaticTag gets a list of invoker using static tag
func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if tag, ok := invocation.Attachments()[constant.Tagkey]; ok {
result := make([]protocol.Invoker, 0, 8)
......@@ -263,163 +100,3 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
}
return false
}
type filter func(protocol.Invoker) bool
func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker {
var res []protocol.Invoker
OUTER:
for _, invoker := range invokers {
for _, filter := range filters {
if !filter(invoker) {
continue OUTER
}
}
res = append(res, invoker)
}
return res
}
// TODO: need move to dubbogo/gost
func checkAddressMatch(addresses []string, host, port string) bool {
for _, address := range addresses {
if matchIp(address, host, port) {
return true
}
if address == net.JoinHostPort(constant.ANYHOST_VALUE, port) {
return true
}
}
return false
}
func matchIp(pattern, host, port string) bool {
// if the pattern is subnet format, it will not be allowed to config port param in pattern.
if strings.Contains(pattern, "/") {
_, subnet, _ := net.ParseCIDR(pattern)
if subnet != nil && subnet.Contains(net.ParseIP(host)) {
return true
}
return false
}
return matchIpRange(pattern, host, port)
}
func matchIpRange(pattern, host, port string) bool {
if pattern == "" || host == "" {
logger.Error("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host)
return false
}
pattern = strings.TrimSpace(pattern)
if "*.*.*.*" == pattern || "*" == pattern {
return true
}
isIpv4 := true
ip4 := net.ParseIP(host).To4()
if ip4 == nil {
isIpv4 = false
}
hostAndPort := getPatternHostAndPort(pattern, isIpv4)
if hostAndPort[1] != "" && hostAndPort[1] != port {
return false
}
pattern = hostAndPort[0]
// TODO 常量化
splitCharacter := "."
if !isIpv4 {
splitCharacter = ":"
}
mask := strings.Split(pattern, splitCharacter)
// check format of pattern
if err := checkHostPattern(pattern, mask, isIpv4); err != nil {
logger.Error(err)
return false
}
if pattern == host {
return true
}
// short name condition
if !ipPatternContains(pattern) {
return pattern == host
}
ipAddress := strings.Split(host, splitCharacter)
for i := 0; i < len(mask); i++ {
if "*" == mask[i] || mask[i] == ipAddress[i] {
continue
} else if strings.Contains(mask[i], "-") {
rangeNumStrs := strings.Split(mask[i], "-")
if len(rangeNumStrs) != 2 {
logger.Error("There is wrong format of ip Address: " + mask[i])
return false
}
min := getNumOfIpSegment(rangeNumStrs[0], isIpv4)
max := getNumOfIpSegment(rangeNumStrs[1], isIpv4)
ip := getNumOfIpSegment(ipAddress[i], isIpv4)
if ip < min || ip > max {
return false
}
} else if "0" == ipAddress[i] && "0" == mask[i] || "00" == mask[i] || "000" == mask[i] || "0000" == mask[i] {
continue
} else if mask[i] != ipAddress[i] {
return false
}
}
return true
}
func ipPatternContains(pattern string) bool {
return strings.Contains(pattern, "*") || strings.Contains(pattern, "-")
}
func checkHostPattern(pattern string, mask []string, isIpv4 bool) error {
if !isIpv4 {
if len(mask) != 8 && ipPatternContains(pattern) {
return errors.New("If you config ip expression that contains '*' or '-', please fill qualified ip pattern like 234e:0:4567:0:0:0:3d:*. ")
}
if len(mask) != 8 && !strings.Contains(pattern, "::") {
return errors.New("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern)
}
} else {
if len(mask) != 4 {
return errors.New("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern)
}
}
return nil
}
func getPatternHostAndPort(pattern string, isIpv4 bool) []string {
result := make([]string, 2)
if strings.HasPrefix(pattern, "[") && strings.Contains(pattern, "]:") {
end := strings.Index(pattern, "]:")
result[0] = pattern[1:end]
result[1] = pattern[end+2:]
} else if strings.HasPrefix(pattern, "[") && strings.HasSuffix(pattern, "]") {
result[0] = pattern[1 : len(pattern)-1]
result[1] = ""
} else if isIpv4 && strings.Contains(pattern, ":") {
end := strings.Index(pattern, ":")
result[0] = pattern[:end]
result[1] = pattern[end+1:]
} else {
result[0] = pattern
}
return result
}
func getNumOfIpSegment(ipSegment string, isIpv4 bool) int {
if isIpv4 {
ipSeg, _ := strconv.Atoi(ipSegment)
return ipSeg
}
ipSeg, _ := strconv.ParseInt(ipSegment, 0, 16)
return int(ipSeg)
}
......@@ -19,43 +19,25 @@ package tag
import (
"context"
"fmt"
"github.com/stretchr/testify/suite"
"testing"
"time"
)
import (
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
const (
tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou&remote.application=test-tag"
tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20002/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai&remote.application=test-tag"
tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing&remote.application=test-tag"
tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20004/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing&remote.application=test-tag"
tagRouterTestUserConsumer = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider1 = "dubbo://127.0.0.1:20001/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider2 = "dubbo://127.0.0.1:20002/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider3 = "dubbo://127.0.0.1:20003/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider4 = "dubbo://127.0.0.1:20004/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag4"
tagRouterTestDynamicIpv4Provider5 = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag5"
tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou"
tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai"
tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing"
tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true"
tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true"
tagRouterTestDubboTag = "dubbo.tag"
tagRouterTestDubboForceTag = "dubbo.force.tag"
......@@ -63,15 +45,6 @@ const (
tagRouterTestGuangZhou = "guangzhou"
tagRouterTestFalse = "false"
tagRouterTestTrue = "true"
routerPath = "/dubbo/config/dubbo/test-tag.tag-router"
routerLocalIP = "127.0.0.1"
routerZk = "zookeeper"
)
var (
zkFormat = "zookeeper://%s:%d"
conditionFormat = "condition://%s/com.foo.BarService"
)
// MockInvoker is only mock the Invoker to support test tagRouter
......@@ -187,211 +160,3 @@ func TestTagRouterRouteNoForce(t *testing.T) {
invRst2 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 3, len(invRst2))
}
func TestFilterInvoker(t *testing.T) {
u2, e2 := common.NewURL(tagRouterTestHangZhouUrl)
u3, e3 := common.NewURL(tagRouterTestShangHaiUrl)
u4, e4 := common.NewURL(tagRouterTestBeijingUrl)
u5, e5 := common.NewURL(tagRouterTestEnabledBeijingUrl)
assert.Nil(t, e2)
assert.Nil(t, e3)
assert.Nil(t, e4)
assert.Nil(t, e5)
inv2 := NewMockInvoker(u2)
inv3 := NewMockInvoker(u3)
inv4 := NewMockInvoker(u4)
inv5 := NewMockInvoker(u5)
var invokers []protocol.Invoker
invokers = append(invokers, inv2, inv3, inv4, inv5)
filterTag := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" {
return true
}
return false
}
res := filterInvoker(invokers, filterTag)
assert.Equal(t, []protocol.Invoker{inv4, inv5}, res)
flag := true
filterEnabled := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParamBool(constant.RouterEnabled, false) == flag {
return true
}
return false
}
res2 := filterInvoker(invokers, filterTag, filterEnabled)
assert.Equal(t, []protocol.Invoker{inv4}, res2)
}
type DynamicTagRouter struct {
suite.Suite
rule *RouterRule
route *tagRouter
zkClient *zookeeper.ZookeeperClient
testCluster *zk.TestCluster
invokers []protocol.Invoker
url *common.URL
}
func TestDynamicTagRouter(t *testing.T) {
dtg := &DynamicTagRouter{}
u1, _ := common.NewURL(tagRouterTestDynamicIpv4Provider1)
u2, _ := common.NewURL(tagRouterTestDynamicIpv4Provider2)
u3, _ := common.NewURL(tagRouterTestDynamicIpv4Provider3)
u4, _ := common.NewURL(tagRouterTestDynamicIpv4Provider4)
u5, _ := common.NewURL(tagRouterTestDynamicIpv4Provider5)
inv1 := NewMockInvoker(u1)
inv2 := NewMockInvoker(u2)
inv3 := NewMockInvoker(u3)
inv4 := NewMockInvoker(u4)
inv5 := NewMockInvoker(u5)
dtg.invokers = append(dtg.invokers, inv1, inv2, inv3, inv4, inv5)
suite.Run(t, dtg)
}
func (suite *DynamicTagRouter) SetupTest() {
var err error
testYML := `enabled: true
scope: application
force: true
runtime: false
valid: true
priority: 1
key: demo-provider
tags:
- name: tag1
addresses: ["127.0.0.1:20001"]
- name: tag2
addresses: ["127.0.0.1:20002"]
- name: tag3
addresses: ["127.0.0.1:20003", "127.0.0.1:20004"]
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
suite.NoError(err)
err = z.Create(routerPath)
suite.NoError(err)
suite.zkClient = z
suite.testCluster = ts
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
suite.NoError(err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
suite.Nil(err)
suite.NotNil(configuration)
url, e1 := common.NewURL(tagRouterTestUserConsumerTag)
suite.Nil(e1)
tagRouter, err := NewTagRouter(&url)
suite.Nil(err)
suite.NotNil(tagRouter)
suite.route = tagRouter
suite.url = &url
}
func (suite *DynamicTagRouter) TearDownTest() {
suite.zkClient.Close()
suite.testCluster.Stop()
}
func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
invokers := suite.invokers
suite.route.Notify(invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag1")
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[0])
consumer.SetAttachments(tagRouterTestDubboTag, "tag3")
targetInvokers = suite.route.Route(invokers, suite.url, consumer)
suite.Equal(2, len(targetInvokers))
suite.Equal(targetInvokers, []protocol.Invoker{suite.invokers[2], suite.invokers[3]})
}
func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() {
invokers := suite.invokers
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag4")
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[3])
}
// Teas no tag and return a address are not in dynamic tag group
func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() {
invokers := suite.invokers
suite.route.Notify(invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[4])
// test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
consumer.SetAttachments(tagRouterTestDubboTag, "tag5")
targetInvokers = suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[4])
}
func (suite *DynamicTagRouter) TestTODO() {
testYML := `enabled: true
scope: application
force: true
runtime: false
valid: true
priority: 1
key: demo-provider
tags:
- name: tag1
addresses: ["127.0.0.1:20001"]
- name: tag2
addresses: ["127.0.0.1:20002"]
- name: tag3
addresses: ["127.0.0.1:20003", "127.0.0.1:20004"]
`
_, err := suite.zkClient.Conn.Set(routerPath, []byte(testYML), 1)
suite.NoError(err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
}
func TestProcess(t *testing.T) {
u1, err := common.NewURL(tagRouterTestUserConsumerTag)
assert.Nil(t, err)
tagRouter, e := NewTagRouter(&u1)
assert.Nil(t, e)
assert.NotNil(t, tagRouter)
testYML := `
scope: application
force: true
runtime: false
enabled: true
valid: true
priority: 1
key: demo-provider
tags:
- name: beijing
addresses: [192.168.1.1, 192.168.1.2]
- name: hangzhou
addresses: [192.168.1.3, 192.168.1.4]
`
tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd})
assert.NotNil(t, tagRouter.tagRouterRule)
assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames())
assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses())
assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"])
assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"])
tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel})
assert.Nil(t, tagRouter.tagRouterRule)
}
......@@ -188,9 +188,6 @@ const (
HealthCheckRouterName = "health_check"
// TagRouterName Specify the name of TagRouter
TagRouterName = "tag"
// TagRouterRuleSuffix Specify tag router suffix
TagRouterRuleSuffix = ".tag-router"
RemoteApplicationKey = "remote.application"
// ConditionRouterRuleSuffix Specify condition router suffix
ConditionRouterRuleSuffix = ".condition-router"
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment