Skip to content
Snippets Groups Projects
Commit ddb25041 authored by watermelo's avatar watermelo
Browse files

Add: add setInvoker function for router chain

parent a31a4e21
No related branches found
No related tags found
No related merge requests found
......@@ -51,8 +51,8 @@ type RouterChain struct {
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invoker
func (c *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invokers
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
......@@ -65,6 +65,15 @@ func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocat
return finalInvokers
}
// Notify router chain of the initial addresses from registry at the first time. Notify whenever addresses in registry change.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
for _, r := range c.routers {
if notifyRouter, ok := r.(router.NotifyRouter); ok {
notifyRouter.Notify(invokers)
}
}
}
// AddRouters Add routers to router chain
// New a array add builtinRouters which is not sorted in RouterChain and routers
// Sort the array
......
......@@ -169,6 +169,8 @@ func TestRouterChainRoute(t *testing.T) {
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
......@@ -228,7 +230,6 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
......
......@@ -50,3 +50,14 @@ type PriorityRouter interface {
// 0 to ^int(0) is better
Priority() int64
}
// NotifyRouter notify router use the invoker list. Invoker list may change from time to time. This method gives the router a
// chance to prepare before {@link Router#route(List, URL, Invocation)} gets called.
type NotifyRouter interface {
router
// Notify notify whenever addresses in registry change
Notify([]protocol.Invoker)
// Priority Return Priority in router
// 0 to ^int(0) is better
Priority() int64
}
......@@ -56,9 +56,10 @@ func getRule(rawRule string) (*RouterRule, error) {
return r, nil
}
// init use for flatten yaml tags data to @addressToTagNames and @tagNameToAddresses
func (t *RouterRule) init() {
t.addressToTagNames = make(map[string][]string, 8)
t.tagNameToAddresses = make(map[string][]string, 8)
t.addressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.tagNameToAddresses = make(map[string][]string, len(t.Tags))
for _, tag := range t.Tags {
for _, address := range tag.Addresses {
t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name)
......@@ -67,14 +68,16 @@ func (t *RouterRule) init() {
}
}
// getAddresses gets all tag addresses
func (t *RouterRule) getAddresses() []string {
var result = make([]string, 0, 8*len(t.Tags))
var result = make([]string, 0, 2*len(t.Tags))
for _, tag := range t.Tags {
result = append(result, tag.Addresses...)
}
return result
}
// getTagNames gets all tag names
func (t *RouterRule) getTagNames() []string {
var result = make([]string, 0, len(t.Tags))
for _, tag := range t.Tags {
......@@ -84,12 +87,7 @@ func (t *RouterRule) getTagNames() []string {
}
func (t *RouterRule) hasTag(tag string) bool {
for _, t := range t.Tags {
if tag == t.Name {
return true
}
}
return false
return len(t.tagNameToAddresses[tag]) > 0
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
......
......@@ -75,98 +75,51 @@ func (c *tagRouter) tagRouterRuleCopy() RouterRule {
// Route gets a list of invoker
func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if !c.isEnabled() {
return invokers
}
if len(invokers) == 0 {
var (
result []protocol.Invoker
addresses []string
)
if !c.isEnabled() || len(invokers) == 0 {
return invokers
}
// Use static tags if dynamic tags are not set or invalid
if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
return filterUsingStaticTag(invokers, url, invocation)
}
// since the rule can be changed by config center, we should copy one to use.
tagRouterRuleCopy := c.tagRouterRuleCopy()
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = url.GetParam(constant.Tagkey, "")
}
var (
result []protocol.Invoker
addresses []string
)
// if we are requesting for a Provider with a specific tag
if len(tag) > 0 {
addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag]
// filter by dynamic tag group first
if len(addresses) > 0 {
filterAddressMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) {
return true
}
return false
}
result = filterInvoker(invokers, filterAddressMatches)
if len(result) > 0 || tagRouterRuleCopy.Force {
return result
}
} else {
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
filter := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag {
return true
}
return false
}
result = filterInvoker(invokers, filter)
return filterInvokersWithTag(invokers, url, invocation, tagRouterRuleCopy, tag)
}
// return all addresses in dynamic tag group.
addresses = tagRouterRuleCopy.getAddresses()
if len(addresses) > 0 {
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port)
}
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if len(result) > 0 || isForceUseTag(url, invocation) {
result = filterInvoker(invokers, filterAddressNotMatches)
// 1. all addresses are in dynamic tag group, return empty list.
if len(result) == 0 {
return result
} else {
// FAILOVER: return all Providers without any tags.
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) {
return true
}
return false
}
filterTagIsEmpty := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" {
return true
}
return false
}
return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
}
} else {
// return all addresses in dynamic tag group.
addresses = tagRouterRuleCopy.getAddresses()
if len(addresses) > 0 {
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) {
return true
}
return false
}
result = filterInvoker(invokers, filterAddressNotMatches)
// 1. all addresses are in dynamic tag group, return empty list.
if len(result) == 0 {
return result
}
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
// static tag group.
filter := func(invoker protocol.Invoker) bool {
localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag))
}
return filterInvoker(result, filter)
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
// static tag group.
filter := func(invoker protocol.Invoker) bool {
localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag))
}
return filterInvoker(result, filter)
}
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
......@@ -174,21 +127,20 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
if remoting.EventTypeDel == event.ConfigType {
c.tagRouterRule = nil
return
} else {
content, ok := event.Value.(string)
if !ok {
logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value)
return
}
}
content, ok := event.Value.(string)
if !ok {
logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value)
return
}
routerRule, err := getRule(content)
if err != nil {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.tagRouterRule = routerRule
routerRule, err := getRule(content)
if err != nil {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.tagRouterRule = routerRule
return
}
func (c *tagRouter) Notify(invokers []protocol.Invoker) {
......@@ -221,7 +173,7 @@ func (c *tagRouter) Notify(invokers []protocol.Invoker) {
logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
return
}
if rule != "" {
if len(rule) > 0 {
c.Process(&config_center.ConfigChangeEvent{
Key: routerKey,
Value: rule,
......@@ -256,6 +208,48 @@ func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocati
return invokers
}
// filterInvokersWithTag gets a list of invoker using dynamic route with tag
func filterInvokersWithTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation, tagRouterRule RouterRule, tag string) []protocol.Invoker {
var (
result []protocol.Invoker
addresses []string
)
addresses, _ = tagRouterRule.getTagNameToAddresses()[tag]
// filter by dynamic tag group first
if len(addresses) > 0 {
filterAddressMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port)
}
result = filterInvoker(invokers, filterAddressMatches)
if len(result) > 0 || tagRouterRule.Force {
return result
}
} else {
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
filter := func(invoker protocol.Invoker) bool {
return invoker.GetUrl().GetParam(constant.Tagkey, "") == tag
}
result = filterInvoker(invokers, filter)
}
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if len(result) > 0 || isForceUseTag(url, invocation) {
return result
} else {
// FAILOVER: return all Providers without any tags.
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) == 0 || !checkAddressMatch(tagRouterRule.getAddresses(), url.Ip, url.Port)
}
filterTagIsEmpty := func(invoker protocol.Invoker) bool {
return invoker.GetUrl().GetParam(constant.Tagkey, "") == ""
}
return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
}
}
// isForceUseTag returns whether force use tag
func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
if b, e := strconv.ParseBool(invocation.AttachmentsByKey(constant.ForceUseTag, url.GetParam(constant.ForceUseTag, "false"))); e == nil {
......@@ -280,7 +274,6 @@ OUTER:
return res
}
// TODO: need move to dubbogo/gost
func checkAddressMatch(addresses []string, host, port string) bool {
for _, address := range addresses {
if matchIp(address, host, port) {
......@@ -293,6 +286,7 @@ func checkAddressMatch(addresses []string, host, port string) bool {
return false
}
// TODO: Already moved to dubbogo/gost, after gost by merged the follows codes will be deleted.
func matchIp(pattern, host, port string) bool {
// if the pattern is subnet format, it will not be allowed to config port param in pattern.
if strings.Contains(pattern, "/") {
......@@ -329,7 +323,6 @@ func matchIpRange(pattern, host, port string) bool {
}
pattern = hostAndPort[0]
// TODO 常量化
splitCharacter := "."
if !isIpv4 {
splitCharacter = ":"
......
......@@ -20,7 +20,6 @@ package tag
import (
"context"
"fmt"
"github.com/stretchr/testify/suite"
"testing"
"time"
)
......@@ -28,6 +27,7 @@ import (
import (
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
import (
......@@ -70,8 +70,7 @@ const (
)
var (
zkFormat = "zookeeper://%s:%d"
conditionFormat = "condition://%s/com.foo.BarService"
zkFormat = "zookeeper://%s:%d"
)
// MockInvoker is only mock the Invoker to support test tagRouter
......@@ -341,30 +340,6 @@ func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() {
suite.Equal(targetInvokers[0], suite.invokers[4])
}
func (suite *DynamicTagRouter) TestTODO() {
testYML := `enabled: true
scope: application
force: true
runtime: false
valid: true
priority: 1
key: demo-provider
tags:
- name: tag1
addresses: ["127.0.0.1:20001"]
- name: tag2
addresses: ["127.0.0.1:20002"]
- name: tag3
addresses: ["127.0.0.1:20003", "127.0.0.1:20004"]
`
_, err := suite.zkClient.Conn.Set(routerPath, []byte(testYML), 1)
suite.NoError(err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
}
func TestProcess(t *testing.T) {
u1, err := common.NewURL(tagRouterTestUserConsumerTag)
assert.Nil(t, err)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment