diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index 97d20ac5fca0688d3f1f48c6ffc12314f5dc3904..1fd6fb5e24644eaa237ebbf483e2ee21096a0760 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -51,8 +51,8 @@ type RouterChain struct { } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. -func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - finalInvokers := invoker +func (c *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + finalInvokers := invokers l := len(c.routers) rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2))) c.mutex.RLock() @@ -65,6 +65,15 @@ func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocat return finalInvokers } +// Notify router chain of the initial addresses from registry at the first time. Notify whenever addresses in registry change. +func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) { + for _, r := range c.routers { + if notifyRouter, ok := r.(router.NotifyRouter); ok { + notifyRouter.Notify(invokers) + } + } +} + // AddRouters Add routers to router chain // New a array add builtinRouters which is not sorted in RouterChain and routers // Sort the array diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index c1f723525f5307e7732f0ea1ecc27eca7ba09c8d..17a7fa1ae7b2b6f6be44d5c8ca1732f5c64a526b 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -169,6 +169,8 @@ func TestRouterChainRoute(t *testing.T) { dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000)) invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) + chain.SetInvokers(invokers) + targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP)) inv := &invocation.RPCInvocation{} finalInvokers := chain.Route(invokers, &targetURL, inv) @@ -228,7 +230,6 @@ func TestRouterChainRouteNoRoute(t *testing.T) { url := getConditionRouteUrl(applicationKey) assert.NotNil(t, url) - invokers := []protocol.Invoker{} dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000)) invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) diff --git a/cluster/router/router.go b/cluster/router/router.go index 1d1f79d277860abf34bebb9deab8a0f0a67c7b5d..7291224edb2eb9a7d92871eaeb86b07283cb39c1 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -50,3 +50,14 @@ type PriorityRouter interface { // 0 to ^int(0) is better Priority() int64 } + +// NotifyRouter notify router use the invoker list. Invoker list may change from time to time. This method gives the router a +// chance to prepare before {@link Router#route(List, URL, Invocation)} gets called. +type NotifyRouter interface { + router + // Notify notify whenever addresses in registry change + Notify([]protocol.Invoker) + // Priority Return Priority in router + // 0 to ^int(0) is better + Priority() int64 +} diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 5fb7ab151c1e89960152b6c8562ef08f173127df..5ca9281a2d677d7c34be73a088dafe17a1b30e9e 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -56,9 +56,10 @@ func getRule(rawRule string) (*RouterRule, error) { return r, nil } +// init use for flatten yaml tags data to @addressToTagNames and @tagNameToAddresses func (t *RouterRule) init() { - t.addressToTagNames = make(map[string][]string, 8) - t.tagNameToAddresses = make(map[string][]string, 8) + t.addressToTagNames = make(map[string][]string, 2*len(t.Tags)) + t.tagNameToAddresses = make(map[string][]string, len(t.Tags)) for _, tag := range t.Tags { for _, address := range tag.Addresses { t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name) @@ -67,14 +68,16 @@ func (t *RouterRule) init() { } } +// getAddresses gets all tag addresses func (t *RouterRule) getAddresses() []string { - var result = make([]string, 0, 8*len(t.Tags)) + var result = make([]string, 0, 2*len(t.Tags)) for _, tag := range t.Tags { result = append(result, tag.Addresses...) } return result } +// getTagNames gets all tag names func (t *RouterRule) getTagNames() []string { var result = make([]string, 0, len(t.Tags)) for _, tag := range t.Tags { @@ -84,12 +87,7 @@ func (t *RouterRule) getTagNames() []string { } func (t *RouterRule) hasTag(tag string) bool { - for _, t := range t.Tags { - if tag == t.Name { - return true - } - } - return false + return len(t.tagNameToAddresses[tag]) > 0 } func (t *RouterRule) getAddressToTagNames() map[string][]string { diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index ece950ebc0cf6973431d80c2c98e33e3c690cfc9..07ca8f1136883779ea4a1247a9be5a2a46a6bd87 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -75,98 +75,51 @@ func (c *tagRouter) tagRouterRuleCopy() RouterRule { // Route gets a list of invoker func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - if !c.isEnabled() { - return invokers - } - if len(invokers) == 0 { + var ( + result []protocol.Invoker + addresses []string + ) + if !c.isEnabled() || len(invokers) == 0 { return invokers } + + // Use static tags if dynamic tags are not set or invalid if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled { return filterUsingStaticTag(invokers, url, invocation) } + // since the rule can be changed by config center, we should copy one to use. tagRouterRuleCopy := c.tagRouterRuleCopy() tag, ok := invocation.Attachments()[constant.Tagkey] if !ok { tag = url.GetParam(constant.Tagkey, "") } - var ( - result []protocol.Invoker - addresses []string - ) + // if we are requesting for a Provider with a specific tag if len(tag) > 0 { - addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] - // filter by dynamic tag group first - if len(addresses) > 0 { - filterAddressMatches := func(invoker protocol.Invoker) bool { - url := invoker.GetUrl() - if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) { - return true - } - return false - } - result = filterInvoker(invokers, filterAddressMatches) - if len(result) > 0 || tagRouterRuleCopy.Force { - return result - } - } else { - // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by - // dynamic tag group but force=false. check static tag - filter := func(invoker protocol.Invoker) bool { - if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag { - return true - } - return false - } - result = filterInvoker(invokers, filter) + return filterInvokersWithTag(invokers, url, invocation, tagRouterRuleCopy, tag) + } + + // return all addresses in dynamic tag group. + addresses = tagRouterRuleCopy.getAddresses() + if len(addresses) > 0 { + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) } - // If there's no tagged providers that can match the current tagged request. force.tag is set by default - // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. - if len(result) > 0 || isForceUseTag(url, invocation) { + result = filterInvoker(invokers, filterAddressNotMatches) + // 1. all addresses are in dynamic tag group, return empty list. + if len(result) == 0 { return result - } else { - // FAILOVER: return all Providers without any tags. - filterAddressNotMatches := func(invoker protocol.Invoker) bool { - url := invoker.GetUrl() - if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) { - return true - } - return false - } - filterTagIsEmpty := func(invoker protocol.Invoker) bool { - if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" { - return true - } - return false - } - return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty) - } - } else { - // return all addresses in dynamic tag group. - addresses = tagRouterRuleCopy.getAddresses() - if len(addresses) > 0 { - filterAddressNotMatches := func(invoker protocol.Invoker) bool { - url := invoker.GetUrl() - if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) { - return true - } - return false - } - result = filterInvoker(invokers, filterAddressNotMatches) - // 1. all addresses are in dynamic tag group, return empty list. - if len(result) == 0 { - return result - } - } - // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the - // static tag group. - filter := func(invoker protocol.Invoker) bool { - localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") - return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) } - return filterInvoker(result, filter) } + // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the + // static tag group. + filter := func(invoker protocol.Invoker) bool { + localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") + return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) + } + return filterInvoker(result, filter) } func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { @@ -174,21 +127,20 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { if remoting.EventTypeDel == event.ConfigType { c.tagRouterRule = nil return - } else { - content, ok := event.Value.(string) - if !ok { - logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) - return - } + } + content, ok := event.Value.(string) + if !ok { + logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) + return + } - routerRule, err := getRule(content) - if err != nil { - logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err) - return - } - c.tagRouterRule = routerRule + routerRule, err := getRule(content) + if err != nil { + logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err) return } + c.tagRouterRule = routerRule + return } func (c *tagRouter) Notify(invokers []protocol.Invoker) { @@ -221,7 +173,7 @@ func (c *tagRouter) Notify(invokers []protocol.Invoker) { logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err) return } - if rule != "" { + if len(rule) > 0 { c.Process(&config_center.ConfigChangeEvent{ Key: routerKey, Value: rule, @@ -256,6 +208,48 @@ func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocati return invokers } +// filterInvokersWithTag gets a list of invoker using dynamic route with tag +func filterInvokersWithTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation, tagRouterRule RouterRule, tag string) []protocol.Invoker { + var ( + result []protocol.Invoker + addresses []string + ) + addresses, _ = tagRouterRule.getTagNameToAddresses()[tag] + // filter by dynamic tag group first + if len(addresses) > 0 { + filterAddressMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) + } + result = filterInvoker(invokers, filterAddressMatches) + if len(result) > 0 || tagRouterRule.Force { + return result + } + } else { + // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by + // dynamic tag group but force=false. check static tag + filter := func(invoker protocol.Invoker) bool { + return invoker.GetUrl().GetParam(constant.Tagkey, "") == tag + } + result = filterInvoker(invokers, filter) + } + // If there's no tagged providers that can match the current tagged request. force.tag is set by default + // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. + if len(result) > 0 || isForceUseTag(url, invocation) { + return result + } else { + // FAILOVER: return all Providers without any tags. + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + return len(addresses) == 0 || !checkAddressMatch(tagRouterRule.getAddresses(), url.Ip, url.Port) + } + filterTagIsEmpty := func(invoker protocol.Invoker) bool { + return invoker.GetUrl().GetParam(constant.Tagkey, "") == "" + } + return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty) + } +} + // isForceUseTag returns whether force use tag func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { if b, e := strconv.ParseBool(invocation.AttachmentsByKey(constant.ForceUseTag, url.GetParam(constant.ForceUseTag, "false"))); e == nil { @@ -280,7 +274,6 @@ OUTER: return res } -// TODO: need move to dubbogo/gost func checkAddressMatch(addresses []string, host, port string) bool { for _, address := range addresses { if matchIp(address, host, port) { @@ -293,6 +286,7 @@ func checkAddressMatch(addresses []string, host, port string) bool { return false } +// TODO: Already moved to dubbogo/gost, after gost by merged the follows codes will be deleted. func matchIp(pattern, host, port string) bool { // if the pattern is subnet format, it will not be allowed to config port param in pattern. if strings.Contains(pattern, "/") { @@ -329,7 +323,6 @@ func matchIpRange(pattern, host, port string) bool { } pattern = hostAndPort[0] - // TODO 甯搁噺鍖� splitCharacter := "." if !isIpv4 { splitCharacter = ":" diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index e5ddc2890c1512baa84d6a2195ecfda243f1d66b..5ea28c3799d17aaac5da4c994e9a614941e412b6 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -20,7 +20,6 @@ package tag import ( "context" "fmt" - "github.com/stretchr/testify/suite" "testing" "time" ) @@ -28,6 +27,7 @@ import ( import ( "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) import ( @@ -70,8 +70,7 @@ const ( ) var ( - zkFormat = "zookeeper://%s:%d" - conditionFormat = "condition://%s/com.foo.BarService" + zkFormat = "zookeeper://%s:%d" ) // MockInvoker is only mock the Invoker to support test tagRouter @@ -341,30 +340,6 @@ func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() { suite.Equal(targetInvokers[0], suite.invokers[4]) } -func (suite *DynamicTagRouter) TestTODO() { - testYML := `enabled: true -scope: application -force: true -runtime: false -valid: true -priority: 1 -key: demo-provider -tags: - - name: tag1 - addresses: ["127.0.0.1:20001"] - - name: tag2 - addresses: ["127.0.0.1:20002"] - - name: tag3 - addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] -` - _, err := suite.zkClient.Conn.Set(routerPath, []byte(testYML), 1) - suite.NoError(err) - - zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) - config.GetEnvInstance().SetDynamicConfiguration(configuration) -} - func TestProcess(t *testing.T) { u1, err := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, err)