Skip to content
Snippets Groups Projects
Commit 2e01ed5b authored by watermelo's avatar watermelo
Browse files

Ftr: add dynamic tag router

parent c6c39e2d
No related branches found
No related tags found
No related merge requests found
......@@ -85,7 +85,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
return l, nil
}
// Process Process config change event , generate routers and set them to the listenableRouter instance
// Process Process config change event, generate routers and set them to the listenableRouter instance
func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
......
......@@ -42,7 +42,7 @@ type FileTagRouter struct {
force bool
}
// NewFileTagRouter Create file tag router instance with content ( from config file)
// NewFileTagRouter Create file tag router instance with content (from config file)
func NewFileTagRouter(content []byte) (*FileTagRouter, error) {
fileRouter := &FileTagRouter{}
rule, err := getRule(string(content))
......
......@@ -22,9 +22,27 @@ import (
"github.com/apache/dubbo-go/common/yaml"
)
/**
* %YAML1.2
* ---
* force: true
* runtime: false
* enabled: true
* priority: 1
* key: demo-provider
* tags:
* - name: tag1
* addresses: [ip1, ip2]
* - name: tag2
* addresses: [ip3, ip4]
* ...
*/
// RouterRule RouterRule config read from config file or config center
type RouterRule struct {
router.BaseRouterRule `yaml:",inline""`
tags []tag
addressToTagNames map[string][]string
tagNameToAddresses map[string][]string
}
func getRule(rawRule string) (*RouterRule, error) {
......@@ -34,5 +52,32 @@ func getRule(rawRule string) (*RouterRule, error) {
return r, err
}
r.RawRule = rawRule
// TODO init tags
return r, nil
}
func (t *RouterRule) getAddresses() []string {
// TODO get all tag addresses
return nil
}
func (t *RouterRule) getTagNames() []string {
// TODO get all tag names
return nil
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
return t.addressToTagNames
}
func (t *RouterRule) getTagNameToAddresses() map[string][]string {
return t.tagNameToAddresses
}
func (t *RouterRule) getTags() []tag {
return t.tags
}
func (t *RouterRule) setTags(tags []tag) {
t.tags = tags
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tag
type tag struct {
name string
addresses []string
}
func (t *tag) getName() string {
return t.name
}
func (t *tag) setName(name string) {
t.name = name
}
func (t *tag) getAddresses() []string {
return t.addresses
}
func (t *tag) setAddresses(addresses []string) {
t.addresses = addresses
}
......@@ -18,6 +18,7 @@
package tag
import (
"fmt"
"strconv"
)
......@@ -28,13 +29,17 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting"
)
type tagRouter struct {
url *common.URL
enabled bool
priority int64
url *common.URL
tagRouterRule *RouterRule
enabled bool
priority int64
}
func NewTagRouter(url *common.URL) (*tagRouter, error) {
......@@ -59,7 +64,81 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati
if len(invokers) == 0 {
return invokers
}
return filterUsingStaticTag(invokers, url, invocation)
// since the rule can be changed by config center, we should copy one to use.
tagRouterRuleCopy := c.tagRouterRule
if tagRouterRuleCopy == nil || !tagRouterRuleCopy.Valid || !tagRouterRuleCopy.Enabled {
return filterUsingStaticTag(invokers, url, invocation)
}
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = url.GetParam(constant.Tagkey, "")
}
var (
result []protocol.Invoker
addresses []string
)
if tag != "" {
addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag]
// filter by dynamic tag group first
if len(addresses) > 0 {
// TODO filter invokers
result = nil
if len(result) > 0 || tagRouterRuleCopy.Force {
return result
}
} else {
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
// TODO filter invokers
return result
}
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if len(result) > 0 || isForceUseTag(url, invocation) {
return result
} else {
// FAILOVER: return all Providers without any tags.
// TODO filter invokers
return result
}
} else {
// return all addresses in dynamic tag group.
addresses = tagRouterRuleCopy.getAddresses()
if len(addresses) > 0 {
// TODO filter invokers
// 1. all addresses are in dynamic tag group, return empty list.
if len(result) == 0 {
return result
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
// static tag group.
}
// TODO filter invokers
return result
}
}
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of dynamic tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
c.tagRouterRule = nil
return
} else {
content, ok := event.Value.(string)
if !ok {
msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value)
logger.Error(msg)
return
}
routerRule, err := getRule(content)
if err != nil {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.tagRouterRule = routerRule
return
}
}
func (c *tagRouter) URL() common.URL {
......@@ -92,3 +171,21 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
}
return false
}
func addressMatches(url *common.URL, addresses []string) bool {
return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port)
}
func addressNotMatches(url *common.URL, addresses []string) bool {
return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port)
}
func checkAddressMatch(addresses []string, host, port string) bool {
for _, address := range addresses {
// TODO address parse
if address == (host + port) {
return true
}
}
return false
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment