diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 4ccc19e95521d03ae1f663ec276646cf30926533..7f4f14a8e47173253e2e5b7f4eed5db2bed64958 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -85,7 +85,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { return l, nil } -// Process Process config change event , generate routers and set them to the listenableRouter instance +// Process Process config change event, generate routers and set them to the listenableRouter instance func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) if remoting.EventTypeDel == event.ConfigType { diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 8144c83203dbe98778dd6bb8dcdb9888be664b3b..433abcb72eb6e201e64790af932e847d8faba8af 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -42,7 +42,7 @@ type FileTagRouter struct { force bool } -// NewFileTagRouter Create file tag router instance with content ( from config file) +// NewFileTagRouter Create file tag router instance with content (from config file) func NewFileTagRouter(content []byte) (*FileTagRouter, error) { fileRouter := &FileTagRouter{} rule, err := getRule(string(content)) diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 926446dcb2f18fa2fd4639a9246a85f435d75d45..b2b6659054e277652f7394377e034969d5cd9bce 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -22,9 +22,27 @@ import ( "github.com/apache/dubbo-go/common/yaml" ) +/** + * %YAML1.2 + * --- + * force: true + * runtime: false + * enabled: true + * priority: 1 + * key: demo-provider + * tags: + * - name: tag1 + * addresses: [ip1, ip2] + * - name: tag2 + * addresses: [ip3, ip4] + * ... + */ // RouterRule RouterRule config read from config file or config center type RouterRule struct { router.BaseRouterRule `yaml:",inline""` + tags []tag + addressToTagNames map[string][]string + tagNameToAddresses map[string][]string } func getRule(rawRule string) (*RouterRule, error) { @@ -34,5 +52,32 @@ func getRule(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule + // TODO init tags return r, nil } + +func (t *RouterRule) getAddresses() []string { + // TODO get all tag addresses + return nil +} + +func (t *RouterRule) getTagNames() []string { + // TODO get all tag names + return nil +} + +func (t *RouterRule) getAddressToTagNames() map[string][]string { + return t.addressToTagNames +} + +func (t *RouterRule) getTagNameToAddresses() map[string][]string { + return t.tagNameToAddresses +} + +func (t *RouterRule) getTags() []tag { + return t.tags +} + +func (t *RouterRule) setTags(tags []tag) { + t.tags = tags +} diff --git a/cluster/router/tag/tag.go b/cluster/router/tag/tag.go new file mode 100644 index 0000000000000000000000000000000000000000..07d719af86874b8012a4e9c5a6d490e3516df178 --- /dev/null +++ b/cluster/router/tag/tag.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tag + +type tag struct { + name string + addresses []string +} + +func (t *tag) getName() string { + return t.name +} + +func (t *tag) setName(name string) { + t.name = name +} + +func (t *tag) getAddresses() []string { + return t.addresses +} + +func (t *tag) setAddresses(addresses []string) { + t.addresses = addresses +} diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 87da418943e90c63a905f35260ada7880d6f51b9..ff1209dd45445747614b5c43cee3afab425957d8 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -18,6 +18,7 @@ package tag import ( + "fmt" "strconv" ) @@ -28,13 +29,17 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting" ) type tagRouter struct { - url *common.URL - enabled bool - priority int64 + url *common.URL + tagRouterRule *RouterRule + enabled bool + priority int64 } func NewTagRouter(url *common.URL) (*tagRouter, error) { @@ -59,7 +64,81 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati if len(invokers) == 0 { return invokers } - return filterUsingStaticTag(invokers, url, invocation) + // since the rule can be changed by config center, we should copy one to use. + tagRouterRuleCopy := c.tagRouterRule + if tagRouterRuleCopy == nil || !tagRouterRuleCopy.Valid || !tagRouterRuleCopy.Enabled { + return filterUsingStaticTag(invokers, url, invocation) + } + tag, ok := invocation.Attachments()[constant.Tagkey] + if !ok { + tag = url.GetParam(constant.Tagkey, "") + } + var ( + result []protocol.Invoker + addresses []string + ) + if tag != "" { + addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] + // filter by dynamic tag group first + if len(addresses) > 0 { + // TODO filter invokers + result = nil + if len(result) > 0 || tagRouterRuleCopy.Force { + return result + } + } else { + // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by + // dynamic tag group but force=false. check static tag + // TODO filter invokers + return result + } + // If there's no tagged providers that can match the current tagged request. force.tag is set by default + // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. + if len(result) > 0 || isForceUseTag(url, invocation) { + return result + } else { + // FAILOVER: return all Providers without any tags. + // TODO filter invokers + return result + } + } else { + // return all addresses in dynamic tag group. + addresses = tagRouterRuleCopy.getAddresses() + if len(addresses) > 0 { + // TODO filter invokers + // 1. all addresses are in dynamic tag group, return empty list. + if len(result) == 0 { + return result + } + // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the + // static tag group. + } + // TODO filter invokers + return result + } +} + +func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of dynamic tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + if remoting.EventTypeDel == event.ConfigType { + c.tagRouterRule = nil + return + } else { + content, ok := event.Value.(string) + if !ok { + msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value) + logger.Error(msg) + return + } + + routerRule, err := getRule(content) + if err != nil { + logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err) + return + } + c.tagRouterRule = routerRule + return + } } func (c *tagRouter) URL() common.URL { @@ -92,3 +171,21 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { } return false } + +func addressMatches(url *common.URL, addresses []string) bool { + return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) +} + +func addressNotMatches(url *common.URL, addresses []string) bool { + return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) +} + +func checkAddressMatch(addresses []string, host, port string) bool { + for _, address := range addresses { + // TODO address parse + if address == (host + port) { + return true + } + } + return false +}