Skip to content
Snippets Groups Projects
Commit f9fdbebb authored by Ian Luo's avatar Ian Luo
Browse files

Merge branch 'develop', and reimplement dynamic tag rule

parents 7f478948 e3610678
No related branches found
No related tags found
No related merge requests found
Showing
with 699 additions and 93 deletions
......@@ -36,5 +36,8 @@ xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"
md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar
md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
......@@ -36,5 +36,8 @@ cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar
mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
......@@ -112,11 +112,15 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
app := url.GetParam(constant.APPLICATION_KEY, "")
dirApp := dir.GetUrl().GetParam(constant.APPLICATION_KEY, "")
if len(dirApp) == 0 && dir.GetUrl().SubURL != nil {
dirApp = dir.GetUrl().SubURL.GetParam(constant.APPLICATION_KEY, "")
}
serviceKey := dir.GetUrl().ServiceKey()
if serviceKey == "" {
if len(serviceKey) == 0 {
serviceKey = dir.GetUrl().SubURL.ServiceKey()
}
if len(app) > 0 && app == dir.GetUrl().GetParam(constant.APPLICATION_KEY, "") {
if len(app) > 0 && app == dirApp {
return true
}
if url.ServiceKey() == serviceKey {
......
......@@ -24,7 +24,6 @@ import (
)
import (
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/stretchr/testify/assert"
)
......@@ -34,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
......
......@@ -87,7 +87,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
return l, nil
}
// Process Process config change event , generate routers and set them to the listenableRouter instance
// Process Process config change event, generate routers and set them to the listenableRouter instance
func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
......
......@@ -44,7 +44,7 @@ type FileTagRouter struct {
force bool
}
// NewFileTagRouter Create file tag router instance with content ( from config file)
// NewFileTagRouter Create file tag router instance with content (from config file)
func NewFileTagRouter(content []byte) (*FileTagRouter, error) {
fileRouter := &FileTagRouter{}
rule, err := getRule(string(content))
......
......@@ -22,9 +22,27 @@ import (
"github.com/apache/dubbo-go/common/yaml"
)
/**
* %YAML1.2
* ---
* force: true
* runtime: false
* enabled: true
* priority: 1
* key: demo-provider
* tags:
* - name: tag1
* addresses: [ip1, ip2]
* - name: tag2
* addresses: [ip3, ip4]
* ...
*/
// RouterRule RouterRule config read from config file or config center
type RouterRule struct {
router.BaseRouterRule `yaml:",inline"`
Tags []Tag
addressToTagNames map[string][]string
tagNameToAddresses map[string][]string
}
func getRule(rawRule string) (*RouterRule, error) {
......@@ -34,5 +52,54 @@ func getRule(rawRule string) (*RouterRule, error) {
return r, err
}
r.RawRule = rawRule
r.parseTags()
return r, nil
}
// parseTags use for flattening tags data to @addressToTagNames and @tagNameToAddresses
func (t *RouterRule) parseTags() {
t.addressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.tagNameToAddresses = make(map[string][]string, len(t.Tags))
for _, tag := range t.Tags {
for _, address := range tag.Addresses {
t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name)
}
t.tagNameToAddresses[tag.Name] = tag.Addresses
}
}
func (t *RouterRule) getAddresses() []string {
var result = make([]string, 0, 2*len(t.Tags))
for _, tag := range t.Tags {
result = append(result, tag.Addresses...)
}
return result
}
func (t *RouterRule) getTagNames() []string {
var result = make([]string, 0, len(t.Tags))
for _, tag := range t.Tags {
result = append(result, tag.Name)
}
return result
}
func (t *RouterRule) hasTag(tag string) bool {
return len(t.tagNameToAddresses[tag]) > 0
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
return t.addressToTagNames
}
func (t *RouterRule) getTagNameToAddresses() map[string][]string {
return t.tagNameToAddresses
}
func (t *RouterRule) getTags() []Tag {
return t.Tags
}
func (t *RouterRule) setTags(tags []Tag) {
t.Tags = tags
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tag
type Tag struct {
Name string
Addresses []string
}
......@@ -19,10 +19,13 @@ package tag
import (
"strconv"
"strings"
"sync"
)
import (
"github.com/RoaringBitmap/roaring"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
......@@ -30,19 +33,49 @@ import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting"
)
const (
name = "tag-router"
name = "tag-router"
staticPrefix = "static-"
dynamicPrefix = "dynamic-"
)
// addrMetadata keeps snapshot data for app name, and some infos extracted from dynamic tag rule in order to make
// Route() method lock-free.
type addrMetadata struct {
// application name
application string
// is rule a runtime rule
ruleRuntime bool
// is rule a force rule
ruleForce bool
// is rule a valid rule
ruleValid bool
// is rule an enabled rule
ruleEnabled bool
}
// Source indicates where the metadata comes from.
func (m *addrMetadata) Source() string {
return name
}
// tagRouter defines url, enable and the priority
type tagRouter struct {
url *common.URL
enabled bool
priority int64
url *common.URL
tagRouterRule *RouterRule
enabled bool
priority int64
application string
ruleChanged bool
mutex sync.RWMutex
}
// NewTagRouter returns a tagRouter instance if url is not nil
......@@ -68,16 +101,20 @@ func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *com
return invokers
}
tag := findStaticTag(invocation)
if tag == "" {
return invokers
if shouldUseDynamicTag(cache.FindAddrMeta(c)) {
return c.routeWithDynamicTag(invokers, cache, url, invocation)
}
return c.routeWithStaticTag(invokers, cache, url, invocation)
}
ret := utils.EmptyAddr
if target, ok := cache.FindAddrPool(c)[tag]; ok {
ret = utils.JoinIfNotEqual(target, invokers)
// routeWithStaticTag routes with static tag rule
func (c *tagRouter) routeWithStaticTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
tag := findTag(invocation, url)
if tag == "" {
return invokers
}
ret, _ := c.filterWithTag(invokers, cache, staticPrefix+tag)
if ret.IsEmpty() && !isForceUseTag(url, invocation) {
return invokers
}
......@@ -85,6 +122,78 @@ func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *com
return ret
}
// routeWithDynamicTag routes with dynamic tag rule
func (c *tagRouter) routeWithDynamicTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
tag := findTag(invocation, url)
if tag == "" {
return c.filterNotInDynamicTag(invokers, cache)
}
ret, ok := c.filterWithTag(invokers, cache, dynamicPrefix+tag)
if ok && (!ret.IsEmpty() || isTagRuleForce(cache.FindAddrMeta(c))) {
return ret
}
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
if ret.IsEmpty() {
ret, _ = c.filterWithTag(invokers, cache, staticPrefix+tag)
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if !ret.IsEmpty() || isForceUseTag(url, invocation) {
return ret
}
return c.filterNotInDynamicTag(invokers, cache)
}
return ret
}
// filterWithTag filters incoming invokers with the given tag
func (c *tagRouter) filterWithTag(invokers *roaring.Bitmap, cache router.Cache, tag string) (*roaring.Bitmap, bool) {
if target, ok := cache.FindAddrPool(c)[tag]; ok {
return utils.JoinIfNotEqual(target, invokers), true
}
return utils.EmptyAddr, false
}
// filterNotInDynamicTag filters incoming invokers not applied to dynamic tag rule
func (c *tagRouter) filterNotInDynamicTag(invokers *roaring.Bitmap, cache router.Cache) *roaring.Bitmap {
// FAILOVER: return all Providers without any tags.
invokers = invokers.Clone()
for k, v := range cache.FindAddrPool(c) {
if strings.HasPrefix(k, dynamicPrefix) {
invokers.AndNot(v)
}
}
return invokers
}
// Process parses dynamic tag rule
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
c.tagRouterRule = nil
return
}
content, ok := event.Value.(string)
if !ok {
logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value)
return
}
routerRule, err := getRule(content)
if err != nil {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.tagRouterRule = routerRule
c.ruleChanged = true
return
}
// URL gets the url of tagRouter
func (c *tagRouter) URL() common.URL {
return *c.url
......@@ -97,31 +206,115 @@ func (c *tagRouter) Priority() int64 {
// Pool divided invokers into different address pool by tag.
func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
c.fetchRuleIfNecessary(invokers)
rb := make(router.AddrPool, 8)
for i, invoker := range invokers {
url := invoker.GetUrl()
tag := url.GetParam(constant.Tagkey, "")
if tag != "" {
if _, ok := rb[tag]; !ok {
rb[tag] = roaring.NewBitmap()
}
rb[tag].AddInt(i)
}
poolWithStaticTag(invokers, rb)
c.mutex.Lock()
defer c.mutex.Unlock()
poolWithDynamicTag(invokers, c.tagRouterRule, rb)
c.ruleChanged = false
// create metadata in order to avoid lock in route()
meta := addrMetadata{application: c.application}
if c.tagRouterRule != nil {
meta.ruleForce = c.tagRouterRule.Force
meta.ruleEnabled = c.tagRouterRule.Enabled
meta.ruleValid = c.tagRouterRule.Valid
}
return rb, &meta
}
// fetchRuleIfNecessary fetches, parses rule and register listener for the further change
func (c *tagRouter) fetchRuleIfNecessary(invokers []protocol.Invoker) {
if invokers == nil || len(invokers) == 0 {
return
}
url := invokers[0].GetUrl()
providerApplication := url.GetParam(constant.RemoteApplicationKey, "")
if len(providerApplication) == 0 {
logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
"in this TagRouter is not specified.")
return
}
dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
return
}
if providerApplication != c.application {
dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c)
} else {
// if app name from URL is as same as the current app name, then it is safe to jump out
return
}
c.application = providerApplication
routerKey := providerApplication + constant.TagRouterRuleSuffix
dynamicConfiguration.AddListener(routerKey, c)
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
return
}
if len(rule) > 0 {
c.Process(&config_center.ConfigChangeEvent{
Key: routerKey,
Value: rule,
ConfigType: remoting.EventTypeUpdate})
}
return rb, nil
}
// ShouldPool returns false, to make sure address cache for tag router happens once and only once.
func (c *tagRouter) ShouldPool() bool {
return false
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.ruleChanged
}
// Name returns pool's name
func (c *tagRouter) Name() string {
return name
}
func findStaticTag(invocation protocol.Invocation) string {
return invocation.Attachments()[constant.Tagkey]
// poolWithDynamicTag pools addresses with the tags defined in dynamic tag rule, all keys have prefix "dynamic-"
func poolWithDynamicTag(invokers []protocol.Invoker, rule *RouterRule, pool router.AddrPool) {
if rule == nil {
return
}
tagNameToAddresses := rule.getTagNameToAddresses()
for tag, addrs := range tagNameToAddresses {
pool[dynamicPrefix+tag] = addrsToBitmap(addrs, invokers)
}
}
// poolWithStaticTag pools addresses with tags found from incoming URLs, all keys have prefix "static-"
func poolWithStaticTag(invokers []protocol.Invoker, pool router.AddrPool) {
for i, invoker := range invokers {
url := invoker.GetUrl()
tag := url.GetParam(constant.Tagkey, "")
if tag != "" {
if _, ok := pool[staticPrefix+tag]; !ok {
pool[staticPrefix+tag] = roaring.NewBitmap()
}
pool[staticPrefix+tag].AddInt(i)
}
}
}
// shouldUseDynamicTag uses the snapshot data from the parsed rule to decide if dynamic tag rule should be used or not
func shouldUseDynamicTag(meta router.AddrMetadata) bool {
return meta.(*addrMetadata).ruleValid && meta.(*addrMetadata).ruleEnabled
}
// isTagRuleForce uses the snapshot data from the parsed rule to decide if dynamic tag rule is forced or not
func isTagRuleForce(meta router.AddrMetadata) bool {
return meta.(*addrMetadata).ruleForce
}
// isForceUseTag returns whether force use tag
......@@ -131,3 +324,45 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
}
return false
}
// addrsToBitmap finds indexes for the given IP addresses in the target URL list, if any '0.0.0.0' IP address is met,
// then returns back all indexes of the URLs list.
func addrsToBitmap(addrs []string, invokers []protocol.Invoker) *roaring.Bitmap {
ret := roaring.NewBitmap()
for _, addr := range addrs {
if isAnyHost(addr) {
ret.AddRange(0, uint64(len(invokers)))
return ret
}
index := findIndexWithIp(addr, invokers)
if index != -1 {
ret.AddInt(index)
}
}
return ret
}
// findIndexWithIp finds index for one particular IP
func findIndexWithIp(addr string, invokers []protocol.Invoker) int {
for i, invoker := range invokers {
if gxnet.MatchIP(addr, invoker.GetUrl().Ip, invoker.GetUrl().Port) {
return i
}
}
return -1
}
// isAnyHost checks if an IP is '0.0.0.0'
func isAnyHost(addr string) bool {
return strings.HasPrefix(addr, constant.ANYHOST_VALUE)
}
// findTag finds tag, first from invocation's attachment, then from URL
func findTag(invocation protocol.Invocation, consumerUrl *common.URL) string {
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = consumerUrl.GetParam(constant.Tagkey, "")
}
return tag
}
......@@ -19,11 +19,16 @@ package tag
import (
"context"
"fmt"
"testing"
"time"
)
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
import (
......@@ -31,16 +36,29 @@ import (
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
const (
tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou"
tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai"
tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing"
tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true"
tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true"
tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou&remote.application=test-tag"
tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20002/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai&remote.application=test-tag"
tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing&remote.application=test-tag"
tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20004/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing&remote.application=test-tag"
tagRouterTestUserConsumer = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider1 = "dubbo://127.0.0.1:20001/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider2 = "dubbo://127.0.0.1:20002/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider3 = "dubbo://127.0.0.1:20003/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag"
tagRouterTestDynamicIpv4Provider4 = "dubbo://127.0.0.1:20004/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag4"
tagRouterTestDynamicIpv4Provider5 = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag5"
tagRouterTestDubboTag = "dubbo.tag"
tagRouterTestDubboForceTag = "dubbo.force.tag"
......@@ -48,6 +66,15 @@ const (
tagRouterTestGuangZhou = "guangzhou"
tagRouterTestFalse = "false"
tagRouterTestTrue = "true"
routerPath = "/dubbo/config/dubbo/test-tag.tag-router"
routerLocalIP = "127.0.0.1"
routerZk = "zookeeper"
)
var (
zkFormat = "zookeeper://%s:%d"
conditionFormat = "condition://%s/com.foo.BarService"
)
// MockInvoker is only mock the Invoker to support test tagRouter
......@@ -171,3 +198,197 @@ func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
cache.SetAddrMeta(r.Name(), info)
return cache
}
func setUpAddrCacheWithRuleDisabled(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
info.(*addrMetadata).ruleEnabled = false
cache := chain.BuildCache(addrs)
cache.SetAddrPool(r.Name(), pool)
cache.SetAddrMeta(r.Name(), info)
return cache
}
func TestRouteBeijingInvoker(t *testing.T) {
u2, e2 := common.NewURL(tagRouterTestHangZhouUrl)
u3, e3 := common.NewURL(tagRouterTestShangHaiUrl)
u4, e4 := common.NewURL(tagRouterTestBeijingUrl)
u5, e5 := common.NewURL(tagRouterTestEnabledBeijingUrl)
assert.Nil(t, e2)
assert.Nil(t, e3)
assert.Nil(t, e4)
assert.Nil(t, e5)
inv2 := NewMockInvoker(u2)
inv3 := NewMockInvoker(u3)
inv4 := NewMockInvoker(u4)
inv5 := NewMockInvoker(u5)
var invokers []protocol.Invoker
invokers = append(invokers, inv2, inv3, inv4, inv5)
url, _ := common.NewURL(tagRouterTestBeijingUrl)
tagRouter, _ := NewTagRouter(&url)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(tagRouter, invokers)
inv := &invocation.RPCInvocation{}
res := tagRouter.Route(rb, cache, &url, inv)
// inv4 and inv5
assert.Equal(t, []uint32{2, 3}, res.ToArray())
}
type DynamicTagRouter struct {
suite.Suite
rule *RouterRule
route *tagRouter
zkClient *zookeeper.ZookeeperClient
testCluster *zk.TestCluster
invokers []protocol.Invoker
url *common.URL
}
func TestDynamicTagRouter(t *testing.T) {
dtg := &DynamicTagRouter{}
u1, _ := common.NewURL(tagRouterTestDynamicIpv4Provider1)
u2, _ := common.NewURL(tagRouterTestDynamicIpv4Provider2)
u3, _ := common.NewURL(tagRouterTestDynamicIpv4Provider3)
u4, _ := common.NewURL(tagRouterTestDynamicIpv4Provider4)
u5, _ := common.NewURL(tagRouterTestDynamicIpv4Provider5)
inv1 := NewMockInvoker(u1)
inv2 := NewMockInvoker(u2)
inv3 := NewMockInvoker(u3)
inv4 := NewMockInvoker(u4)
inv5 := NewMockInvoker(u5)
dtg.invokers = append(dtg.invokers, inv1, inv2, inv3, inv4, inv5)
suite.Run(t, dtg)
}
func (suite *DynamicTagRouter) SetupTest() {
var err error
testYML := `enabled: true
scope: application
force: true
runtime: false
valid: true
priority: 1
key: demo-provider
tags:
- name: tag1
addresses: ["127.0.0.1:20001"]
- name: tag2
addresses: ["127.0.0.1:20002"]
- name: tag3
addresses: ["127.0.0.1:20003", "127.0.0.1:20004"]
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
suite.NoError(err)
err = z.Create(routerPath)
suite.NoError(err)
suite.zkClient = z
suite.testCluster = ts
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
suite.NoError(err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
suite.Nil(err)
suite.NotNil(configuration)
url, e1 := common.NewURL(tagRouterTestUserConsumerTag)
suite.Nil(e1)
tagRouter, err := NewTagRouter(&url)
suite.Nil(err)
suite.NotNil(tagRouter)
suite.route = tagRouter
suite.url = &url
}
func (suite *DynamicTagRouter) TearDownTest() {
suite.zkClient.Close()
suite.testCluster.Stop()
}
func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
invokers := suite.invokers
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(suite.route, invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag1")
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(0))
consumer.SetAttachments(tagRouterTestDubboTag, "tag3")
targetInvokers = suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(2), targetInvokers.GetCardinality())
suite.True(targetInvokers.Contains(2) && targetInvokers.Contains(3))
}
func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() {
invokers := suite.invokers
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCacheWithRuleDisabled(suite.route, invokers)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag4")
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(3))
}
// Teas no tag and return a address are not in dynamic tag group
func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() {
invokers := suite.invokers
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(suite.route, invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(4))
// test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
consumer.SetAttachments(tagRouterTestDubboTag, "tag5")
targetInvokers = suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(4))
}
func TestProcess(t *testing.T) {
u1, err := common.NewURL(tagRouterTestUserConsumerTag)
assert.Nil(t, err)
tagRouter, e := NewTagRouter(&u1)
assert.Nil(t, e)
assert.NotNil(t, tagRouter)
testYML := `
scope: application
force: true
runtime: false
enabled: true
valid: true
priority: 1
key: demo-provider
tags:
- name: beijing
addresses: [192.168.1.1, 192.168.1.2]
- name: hangzhou
addresses: [192.168.1.3, 192.168.1.4]
`
tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd})
assert.NotNil(t, tagRouter.tagRouterRule)
assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames())
assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses())
assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"])
assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"])
tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel})
assert.Nil(t, tagRouter.tagRouterRule)
}
......@@ -46,6 +46,7 @@ const (
DUBBO_KEY = "dubbo"
RELEASE_KEY = "release"
ANYHOST_KEY = "anyhost"
SSL_ENABLED_KEY = "ssl-enabled"
)
const (
......@@ -192,6 +193,9 @@ const (
HealthCheckRouterName = "health_check"
// TagRouterName Specify the name of TagRouter
TagRouterName = "tag"
// TagRouterRuleSuffix Specify tag router suffix
TagRouterRuleSuffix = ".tag-router"
RemoteApplicationKey = "remote.application"
// ConditionRouterRuleSuffix Specify condition router suffix
ConditionRouterRuleSuffix = ".condition-router"
......
......@@ -25,7 +25,7 @@ import (
)
import (
"github.com/dubbogo/getty"
"github.com/apache/dubbo-getty"
perrors "github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
......
......@@ -42,6 +42,7 @@ var (
providerConfig *ProviderConfig
// baseConfig = providerConfig.BaseConfig or consumerConfig
baseConfig *BaseConfig
sslEnabled = false
// configAccessMutex is used to make sure that xxxxConfig will only be created once if needed.
// it should be used combine with double-check to avoid the race condition
......@@ -325,6 +326,12 @@ func GetBaseConfig() *BaseConfig {
return baseConfig
}
func GetSslEnabled() bool {
return sslEnabled
}
func SetSslEnabled(enabled bool) {
sslEnabled = enabled
}
func IsProvider() bool {
return providerConfig != nil
}
......@@ -24,7 +24,6 @@ import (
import (
"github.com/creasty/defaults"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
......@@ -34,6 +33,10 @@ import (
"github.com/apache/dubbo-go/common/yaml"
)
const (
MaxWheelTimeSpan = 900e9 // 900s, 15 minute
)
/////////////////////////
// consumerConfig
/////////////////////////
......@@ -107,9 +110,9 @@ func ConsumerInit(confConFile string) error {
if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout)
}
if consumerConfig.RequestTimeout >= time.Duration(getty.MaxWheelTimeSpan) {
if consumerConfig.RequestTimeout >= time.Duration(MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "request_timeout %s should be less than %s",
consumerConfig.Request_Timeout, time.Duration(getty.MaxWheelTimeSpan))
consumerConfig.Request_Timeout, time.Duration(MaxWheelTimeSpan))
}
}
if consumerConfig.Connect_Timeout != "" {
......
......@@ -188,6 +188,7 @@ func (c *ServiceConfig) Export() error {
common.WithPort(port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
common.WithParamsValue(constant.SSL_ENABLED_KEY, strconv.FormatBool(GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
common.WithToken(c.Token),
)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"github.com/apache/dubbo-getty"
)
var (
serverTlsConfigBuilder getty.TlsConfigBuilder
clientTlsConfigBuilder getty.TlsConfigBuilder
)
func GetServerTlsConfigBuilder() getty.TlsConfigBuilder {
return serverTlsConfigBuilder
}
func GetClientTlsConfigBuilder() getty.TlsConfigBuilder {
return clientTlsConfigBuilder
}
func SetServerTlsConfigBuilder(configBuilder getty.TlsConfigBuilder) {
serverTlsConfigBuilder = configBuilder
}
func SetClientTlsConfigBuilder(configBuilder getty.TlsConfigBuilder) {
clientTlsConfigBuilder = configBuilder
}
......@@ -27,7 +27,9 @@ import (
import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
"github.com/zouyx/agollo"
"github.com/zouyx/agollo/v3"
agolloConstant "github.com/zouyx/agollo/v3/constant"
"github.com/zouyx/agollo/v3/env/config"
)
import (
......@@ -35,19 +37,18 @@ import (
"github.com/apache/dubbo-go/common/constant"
cc "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
)
const (
apolloProtocolPrefix = "http://"
apolloConfigFormat = "%s.%s"
apolloConfigFormat = "%s%s"
)
type apolloConfiguration struct {
url *common.URL
listeners sync.Map
appConf *agollo.AppConfig
appConf *config.AppConfig
parser parser.ConfigurationParser
}
......@@ -60,31 +61,20 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {
appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "")
namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP))
c.appConf = &agollo.AppConfig{
AppId: appId,
c.appConf = &config.AppConfig{
AppID: appId,
Cluster: configCluster,
NamespaceName: namespaces,
Ip: configAddr,
IP: configAddr,
}
agollo.InitCustomConfig(func() (*agollo.AppConfig, error) {
agollo.InitCustomConfig(func() (*config.AppConfig, error) {
return c.appConf, nil
})
return c, agollo.Start()
}
func getChangeType(change agollo.ConfigChangeType) remoting.EventType {
switch change {
case agollo.ADDED:
return remoting.EventTypeAdd
case agollo.DELETED:
return remoting.EventTypeDel
default:
return remoting.EventTypeUpdate
}
}
func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) {
k := &cc.Options{}
for _, opt := range opts {
......@@ -92,7 +82,7 @@ func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationL
}
key = k.Group + key
l, _ := c.listeners.LoadOrStore(key, NewApolloListener())
l, _ := c.listeners.LoadOrStore(key, newApolloListener())
l.(*apolloListener).AddListener(listener)
}
......@@ -110,10 +100,10 @@ func (c *apolloConfiguration) RemoveListener(key string, listener cc.Configurati
}
func getProperties(namespace string) string {
return getNamespaceName(namespace, agollo.Properties)
return getNamespaceName(namespace, agolloConstant.Properties)
}
func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string {
func getNamespaceName(namespace string, configFileFormat agolloConstant.ConfigFileFormat) string {
return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat)
}
......@@ -148,7 +138,7 @@ func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (stri
if config == nil {
return "", perrors.New(fmt.Sprintf("nothing in namespace:%s ", key))
}
return config.GetContent(agollo.Properties), nil
return config.GetContent(), nil
}
func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string {
......
......@@ -202,9 +202,9 @@ func initMockApollo(t *testing.T) *apolloConfiguration {
return configuration
}
func TestAddListener(t *testing.T) {
func TestListener(t *testing.T) {
listener := &apolloDataListener{}
listener.wg.Add(1)
listener.wg.Add(2)
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
......@@ -215,28 +215,14 @@ func TestAddListener(t *testing.T) {
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
//test add
apollo.AddListener(mockNamespace, listener)
listener.wg.Wait()
assert.Equal(t, "registries.hangzhouzk.username", listener.event)
assert.Equal(t, "mockDubbog.properties", listener.event)
assert.Greater(t, listener.count, 0)
deleteMockJson(t)
}
func TestRemoveListener(t *testing.T) {
listener := &apolloDataListener{}
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "11111"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
apollo.AddListener(mockNamespace, listener)
//test remove
apollo.RemoveListener(mockNamespace, listener)
assert.Equal(t, "", listener.event)
listenerCount := 0
apollo.listeners.Range(func(_, value interface{}) bool {
apolloListener := value.(*apolloListener)
......@@ -247,7 +233,6 @@ func TestRemoveListener(t *testing.T) {
return true
})
assert.Equal(t, listenerCount, 0)
assert.Equal(t, listener.count, 0)
deleteMockJson(t)
}
......
......@@ -18,34 +18,48 @@
package apollo
import (
"github.com/zouyx/agollo"
"github.com/zouyx/agollo/v3"
"github.com/zouyx/agollo/v3/storage"
"gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{}
}
// NewApolloListener creates a new apolloListener
func NewApolloListener() *apolloListener {
// nolint
func newApolloListener() *apolloListener {
return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
}
}
// OnChange process each listener
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: getChangeType(change.ChangeType),
Key: key,
Value: change.NewValue,
})
}
func (a *apolloListener) OnChange(changeEvent *storage.ChangeEvent) {
}
// OnNewestChange process each listener by all changes
func (a *apolloListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {
b, err := yaml.Marshal(changeEvent.Changes)
if err != nil {
logger.Errorf("apollo onNewestChange err %+v",
err)
return
}
content := string(b)
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: remoting.EventTypeUpdate,
Key: changeEvent.Namespace,
Value: content,
})
}
}
......
......@@ -98,6 +98,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
}
nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url)
if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
......@@ -115,6 +116,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.NacosClient().SetClient(&configClient)
}
return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
......@@ -167,8 +169,9 @@ func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url commo
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(timeout / time.Millisecond)),
ListenInterval: uint64(int32(timeout / time.Millisecond)),
NotLoadCacheAtStart: true,
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, logDir),
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
Username: url.GetParam(constant.NACOS_USERNAME, ""),
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment