From bc732b54b84d41a8f86207b67aa559194c597982 Mon Sep 17 00:00:00 2001 From: Joe Zou <yixian.zou@gmail.com> Date: Mon, 20 Jan 2020 18:42:49 +0800 Subject: [PATCH] add router event listener --- cluster/router/condition/app_router.go | 41 ++ .../{ => condition}/condition_router.go | 63 +++- .../condition/condition_router_parser.go | 46 +++ .../condition/condition_router_parser_test.go | 27 ++ .../router/condition/condition_router_rule.go | 14 + cluster/router/condition/listenable_router.go | 118 ++++++ cluster/router/condition_router_test.go | 349 ------------------ .../{url_utils.go => match/match_utils.go} | 8 +- .../match_utils_test.go} | 20 +- cluster/router/router_factory.go | 13 +- cluster/router/router_factory_test.go | 324 ++++++++++++++++ common/url.go | 20 + config_center/dynamic_configuration.go | 6 + 13 files changed, 666 insertions(+), 383 deletions(-) create mode 100644 cluster/router/condition/app_router.go rename cluster/router/{ => condition}/condition_router.go (86%) create mode 100644 cluster/router/condition/condition_router_parser.go create mode 100644 cluster/router/condition/condition_router_parser_test.go create mode 100644 cluster/router/condition/condition_router_rule.go create mode 100644 cluster/router/condition/listenable_router.go delete mode 100644 cluster/router/condition_router_test.go rename cluster/router/{url_utils.go => match/match_utils.go} (88%) rename cluster/router/{url_utils_test.go => match/match_utils_test.go} (66%) diff --git a/cluster/router/condition/app_router.go b/cluster/router/condition/app_router.go new file mode 100644 index 000000000..d5af0c36c --- /dev/null +++ b/cluster/router/condition/app_router.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package condition + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +const ( + NAME = "APP_ROUTER" + APP_ROUTER_DEFAULT_PRIORITY = int64(150) +) + +type AppRouter struct { + listenableRouter +} + +func NewAppRouter(url *common.URL) (*AppRouter, error) { + appRouter := &AppRouter{} + err := appRouter.newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, "")) + if err != nil { + return nil, err + } + appRouter.priority = APP_ROUTER_DEFAULT_PRIORITY + return appRouter, nil +} diff --git a/cluster/router/condition_router.go b/cluster/router/condition/condition_router.go similarity index 86% rename from cluster/router/condition_router.go rename to cluster/router/condition/condition_router.go index 8178a1e37..c3b64ad21 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition/condition_router.go @@ -15,9 +15,10 @@ * limitations under the License. */ -package router +package condition import ( + matcher "github.com/apache/dubbo-go/cluster/router/match" "reflect" "regexp" "strings" @@ -39,11 +40,12 @@ import ( const ( ROUTE_PATTERN = `([&!=,]*)\\s*([^&!=,\\s]+)` FORCE = "force" + ENABLED = "enabled" PRIORITY = "priority" ) var ( - router_pattern = regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`) + routerPatternReg = regexp.MustCompile(ROUTE_PATTERN) ) //ConditionRouter condition router struct @@ -52,21 +54,19 @@ type ConditionRouter struct { url *common.URL priority int64 Force bool + enabled bool WhenCondition map[string]MatchPair ThenCondition map[string]MatchPair } -func newConditionRouter(url *common.URL) (*ConditionRouter, error) { +//NewConditionRouterWithRule +func NewConditionRouterWithRule(rule string) (*ConditionRouter, error) { var ( whenRule string thenRule string when map[string]MatchPair then map[string]MatchPair ) - rule, err := url.GetParamAndDecoded(constant.RULE_KEY) - if err != nil || len(rule) == 0 { - return nil, perrors.Errorf("Illegal route rule!") - } rule = strings.Replace(rule, "consumer.", "", -1) rule = strings.Replace(rule, "provider.", "", -1) i := strings.Index(rule, "=>") @@ -99,15 +99,33 @@ func newConditionRouter(url *common.URL) (*ConditionRouter, error) { then = t } return &ConditionRouter{ - ROUTE_PATTERN, - url, - url.GetParamInt(PRIORITY, 0), - url.GetParamBool(FORCE, false), - when, - then, + Pattern: ROUTE_PATTERN, + WhenCondition: when, + ThenCondition: then, }, nil } +//NewConditionRouter +func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { + + rule, err := url.GetParamAndDecoded(constant.RULE_KEY) + if err != nil || len(rule) == 0 { + return nil, perrors.Errorf("Illegal route rule!") + } + + router, err := NewConditionRouterWithRule(rule) + if err != nil { + return nil, err + } + + router.url = url + router.priority = url.GetParamInt(PRIORITY, 0) + router.Force = url.GetParamBool(FORCE, false) + router.enabled = url.GetParamBool(ENABLED, true) + + return router, nil +} + func (c *ConditionRouter) Priority() int64 { return c.priority } @@ -116,8 +134,15 @@ func (c *ConditionRouter) Url() common.URL { return *c.url } +func (c *ConditionRouter) Enabled() bool { + return c.enabled +} + //Router determine the target server list. func (c *ConditionRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if !c.Enabled() { + return invokers + } if len(invokers) == 0 { return invokers } @@ -174,7 +199,7 @@ func parseRule(rule string) (map[string]MatchPair, error) { pair MatchPair ) values := gxset.NewSet() - matches := router_pattern.FindAllSubmatch([]byte(rule), -1) + matches := routerPatternReg.FindAllSubmatch([]byte(rule), -1) for _, groups := range matches { separator := string(groups[1]) content := string(groups[2]) @@ -225,7 +250,7 @@ func parseRule(rule string) (map[string]MatchPair, error) { } func getStartIndex(rule string) int { - if indexTuple := router_pattern.FindIndex([]byte(rule)); len(indexTuple) > 0 { + if indexTuple := routerPatternReg.FindIndex([]byte(rule)); len(indexTuple) > 0 { return indexTuple[0] } return -1 @@ -287,7 +312,7 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { if !pair.Matches.Empty() && pair.Mismatches.Empty() { for match := range pair.Matches.Items { - if isMatchGlobPattern(match.(string), value, param) { + if matcher.IsMatchGlobalPattern(match.(string), value, param) { return true } } @@ -296,7 +321,7 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { if !pair.Mismatches.Empty() && pair.Matches.Empty() { for mismatch := range pair.Mismatches.Items { - if isMatchGlobPattern(mismatch.(string), value, param) { + if matcher.IsMatchGlobalPattern(mismatch.(string), value, param) { return false } } @@ -305,12 +330,12 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { if !pair.Mismatches.Empty() && !pair.Matches.Empty() { //when both mismatches and matches contain the same value, then using mismatches first for mismatch := range pair.Mismatches.Items { - if isMatchGlobPattern(mismatch.(string), value, param) { + if matcher.IsMatchGlobalPattern(mismatch.(string), value, param) { return false } } for match := range pair.Matches.Items { - if isMatchGlobPattern(match.(string), value, param) { + if matcher.IsMatchGlobalPattern(match.(string), value, param) { return true } } diff --git a/cluster/router/condition/condition_router_parser.go b/cluster/router/condition/condition_router_parser.go new file mode 100644 index 000000000..9c5354eb1 --- /dev/null +++ b/cluster/router/condition/condition_router_parser.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import "gopkg.in/yaml.v2" + +/* Parse + * scope: application + * runtime: true + * force: false + * conditions: + * - > + * method!=sayHello => + * - > + * ip=127.0.0.1 + * => + * 1.1.1.1 + */ +func Parse(rawRule string) (*RouterRule, error) { + r := &RouterRule{} + err := yaml.Unmarshal([]byte(rawRule), r) + if err != nil { + return r, err + } + r.RawRule = rawRule + if len(r.Conditions) == 0 { + r.Valid = false + } + + return r, nil +} diff --git a/cluster/router/condition/condition_router_parser_test.go b/cluster/router/condition/condition_router_parser_test.go new file mode 100644 index 000000000..890063c04 --- /dev/null +++ b/cluster/router/condition/condition_router_parser_test.go @@ -0,0 +1,27 @@ +package condition + +import ( + "testing" +) +import ( + "github.com/stretchr/testify/assert" +) + +func TestParse(t *testing.T) { + testyml := ` +scope: application +runtime: true +force: false +conditions: + - > + method!=sayHello => + - > + ip=127.0.0.1 + => + 1.1.1.1` + rule, e := Parse(testyml) + + assert.Nil(t, e) + assert.NotNil(t, rule) + assert.Equal(t, 2, len(rule.Conditions)) +} diff --git a/cluster/router/condition/condition_router_rule.go b/cluster/router/condition/condition_router_rule.go new file mode 100644 index 000000000..0439819bf --- /dev/null +++ b/cluster/router/condition/condition_router_rule.go @@ -0,0 +1,14 @@ +package condition + +type RouterRule struct { + RawRule string + Runtime bool + Force bool + Valid bool + Enabled bool + Priority int + Dynamic bool + Scope string + Key string + Conditions []string +} diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go new file mode 100644 index 000000000..d94bb5e09 --- /dev/null +++ b/cluster/router/condition/listenable_router.go @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting" +) + +const ( + ROUTER_NAME = "LISTENABLE_ROUTER" + RULE_SUFFIX = ".condition-router" + DEFAULT_PRIORITY = ^int64(0) +) + +//ListenableRouter Abstract router which listens to dynamic configuration +type listenableRouter struct { + conditionRouters []*ConditionRouter + routerRule *RouterRule + url *common.URL + force bool + priority int64 +} + +func (l *listenableRouter) newListenableRouter(url *common.URL, ruleKey string) error { + l.url = url + l.priority = DEFAULT_PRIORITY + if ruleKey == "" { + return nil + } + + routerKey := ruleKey + RULE_SUFFIX + //TODO: add listener + + //TODO: add get rule + rule := "rule" + if rule != "" { + l.Process(&config_center.ConfigChangeEvent{ + Key: routerKey, + Value: rule, + ConfigType: remoting.EventTypeUpdate}) + } +} + +func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + if remoting.EventTypeDel == event.ConfigType { + l.routerRule = nil + l.conditionRouters = make([]*ConditionRouter, 0) + return + } + content, ok := event.Value.(string) + if !ok { + logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) + return + } + + routerRule, err := Parse(content) + if err != nil { + logger.Errorf("Parse condition router rule fail,error:[%s] ", err) + return + } + l.generateConditions(routerRule) +} + +func (l *listenableRouter) generateConditions(rule *RouterRule) { + if rule == nil || !rule.Valid { + return + } + l.conditionRouters = make([]*ConditionRouter, 0) + for _, c := range rule.Conditions { + router, e := NewConditionRouterWithRule(c) + if e != nil { + logger.Errorf("Create condition router with rule fail,raw rule:[%s] ", c) + continue + } + router.Force = rule.Force + router.enabled = rule.Enabled + l.conditionRouters = append(l.conditionRouters, router) + } +} + +func (l *listenableRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if len(invokers) == 0 || len(l.conditionRouters) == 0 { + return invokers + } + //We will check enabled status inside each router. + for _, r := range l.conditionRouters { + invokers = r.Route(invokers, url, invocation) + } + return invokers +} + +func (l *listenableRouter) Priority() int64 { + return l.priority +} + +func (l *listenableRouter) Url() common.URL { + return *l.url +} diff --git a/cluster/router/condition_router_test.go b/cluster/router/condition_router_test.go deleted file mode 100644 index 637151a74..000000000 --- a/cluster/router/condition_router_test.go +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package router - -import ( - "context" - "encoding/base64" - "fmt" - "reflect" - "testing" -) - -import ( - "github.com/dubbogo/gost/net" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" -) - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/invocation" -) - -type MockInvoker struct { - url common.URL - available bool - destroyed bool - successCount int -} - -func NewMockInvoker(url common.URL, successCount int) *MockInvoker { - return &MockInvoker{ - url: url, - available: true, - destroyed: false, - successCount: successCount, - } -} - -func (bi *MockInvoker) GetUrl() common.URL { - return bi.url -} - -func getRouteUrl(rule string) *common.URL { - url, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService") - url.AddParam("rule", rule) - url.AddParam("force", "true") - return &url -} - -func getRouteUrlWithForce(rule, force string) *common.URL { - url, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService") - url.AddParam("rule", rule) - url.AddParam("force", force) - return &url -} - -func getRouteUrlWithNoForce(rule string) *common.URL { - url, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService") - url.AddParam("rule", rule) - return &url -} - -func (bi *MockInvoker) IsAvailable() bool { - return bi.available -} - -func (bi *MockInvoker) IsDestroyed() bool { - return bi.destroyed -} - -type rest struct { - tried int - success bool -} - -var count int - -func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { - count++ - var success bool - var err error = nil - if count >= bi.successCount { - success = true - } else { - err = perrors.New("error") - } - result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} - return result -} - -func (bi *MockInvoker) Destroy() { - logger.Infof("Destroy invoker: %v", bi.GetUrl().String()) - bi.destroyed = true - bi.available = false -} - -func TestRoute_matchWhen(t *testing.T) { - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - cUrl, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService") - matchWhen, _ := router.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, true, matchWhen) - rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) - matchWhen1, _ := router1.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, true, matchWhen1) - rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - matchWhen2, _ := router2.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, false, matchWhen2) - rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - matchWhen3, _ := router3.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, true, matchWhen3) - rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) - matchWhen4, _ := router4.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, true, matchWhen4) - rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) - router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) - matchWhen5, _ := router5.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, false, matchWhen5) - rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) - router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) - matchWhen6, _ := router6.(*ConditionRouter).MatchWhen(&cUrl, inv) - assert.Equal(t, true, matchWhen6) -} - -func TestRoute_matchFilter(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - t.Logf("The local ip is %s", localIP) - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invokers := []protocol.Invoker{NewMockInvoker(url1, 1), NewMockInvoker(url2, 2), NewMockInvoker(url3, 3)} - rule1 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3")) - rule2 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.* & host != 10.20.3.3")) - rule3 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3 & host != 10.20.3.3")) - rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4")) - rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3")) - rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson")) - router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) - router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) - router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) - cUrl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers4 := router4.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers5 := router5.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers6 := router6.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - assert.Equal(t, 1, len(fileredInvokers1)) - assert.Equal(t, 0, len(fileredInvokers2)) - assert.Equal(t, 0, len(fileredInvokers3)) - assert.Equal(t, 1, len(fileredInvokers4)) - assert.Equal(t, 2, len(fileredInvokers5)) - assert.Equal(t, 1, len(fileredInvokers6)) - -} - -func TestRoute_methodRoute(t *testing.T) { - inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) - rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - url, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") - matchWhen, _ := router.(*ConditionRouter).MatchWhen(&url, inv) - assert.Equal(t, true, matchWhen) - url1, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") - matchWhen, _ = router.(*ConditionRouter).MatchWhen(&url1, inv) - assert.Equal(t, true, matchWhen) - url2, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") - rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - matchWhen, _ = router2.(*ConditionRouter).MatchWhen(&url2, inv) - assert.Equal(t, false, matchWhen) - url3, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") - rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - matchWhen, _ = router3.(*ConditionRouter).MatchWhen(&url3, inv) - assert.Equal(t, true, matchWhen) - -} - -func TestRoute_ReturnFalse(t *testing.T) { - url, _ := common.NewURL(context.TODO(), "") - localIP, _ := gxnet.GetLocalIP() - invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 0, len(fileredInvokers)) -} - -func TestRoute_ReturnEmpty(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url, _ := common.NewURL(context.TODO(), "") - invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 0, len(fileredInvokers)) -} - -func TestRoute_ReturnAll(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - invokers := []protocol.Invoker{&MockInvoker{}, &MockInvoker{}, &MockInvoker{}} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, invokers, fileredInvokers) -} - -func TestRoute_HostFilter(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 2) - invoker3 := NewMockInvoker(url3, 3) - invokers := []protocol.Invoker{invoker1, invoker2, invoker3} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 2, len(fileredInvokers)) - assert.Equal(t, invoker2, fileredInvokers[0]) - assert.Equal(t, invoker3, fileredInvokers[1]) -} - -func TestRoute_Empty_HostFilter(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 2) - invoker3 := NewMockInvoker(url3, 3) - invokers := []protocol.Invoker{invoker1, invoker2, invoker3} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 2, len(fileredInvokers)) - assert.Equal(t, invoker2, fileredInvokers[0]) - assert.Equal(t, invoker3, fileredInvokers[1]) -} - -func TestRoute_False_HostFilter(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 2) - invoker3 := NewMockInvoker(url3, 3) - invokers := []protocol.Invoker{invoker1, invoker2, invoker3} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 2, len(fileredInvokers)) - assert.Equal(t, invoker2, fileredInvokers[0]) - assert.Equal(t, invoker3, fileredInvokers[1]) -} - -func TestRoute_Placeholder(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 2) - invoker3 := NewMockInvoker(url3, 3) - invokers := []protocol.Invoker{invoker1, invoker2, invoker3} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 2, len(fileredInvokers)) - assert.Equal(t, invoker2, fileredInvokers[0]) - assert.Equal(t, invoker3, fileredInvokers[1]) -} - -func TestRoute_NoForce(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 2) - invoker3 := NewMockInvoker(url3, 3) - invokers := []protocol.Invoker{invoker1, invoker2, invoker3} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrlWithNoForce(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, invokers, fileredInvokers) -} - -func TestRoute_Force(t *testing.T) { - localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 2) - invoker3 := NewMockInvoker(url3, 3) - invokers := []protocol.Invoker{invoker1, invoker2, invoker3} - inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) - curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrlWithForce(rule, "true")) - fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) - assert.Equal(t, 0, len(fileredInvokers)) -} diff --git a/cluster/router/url_utils.go b/cluster/router/match/match_utils.go similarity index 88% rename from cluster/router/url_utils.go rename to cluster/router/match/match_utils.go index 0adff7503..cc2d3cafe 100644 --- a/cluster/router/url_utils.go +++ b/cluster/router/match/match_utils.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package router +package match import ( "strings" @@ -25,14 +25,14 @@ import ( "github.com/apache/dubbo-go/common" ) -func isMatchGlobPattern(pattern string, value string, param *common.URL) bool { +func IsMatchGlobalPattern(pattern string, value string, param *common.URL) bool { if param != nil && strings.HasPrefix(pattern, "$") { pattern = param.GetRawParam(pattern[1:]) } - return isMatchInternalPattern(pattern, value) + return IsMatchInternalPattern(pattern, value) } -func isMatchInternalPattern(pattern string, value string) bool { +func IsMatchInternalPattern(pattern string, value string) bool { if "*" == pattern { return true } diff --git a/cluster/router/url_utils_test.go b/cluster/router/match/match_utils_test.go similarity index 66% rename from cluster/router/url_utils_test.go rename to cluster/router/match/match_utils_test.go index 39925889a..7ce550fb8 100644 --- a/cluster/router/url_utils_test.go +++ b/cluster/router/match/match_utils_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package router +package match import ( "context" @@ -31,17 +31,17 @@ import ( ) func TestIsMatchInternalPattern(t *testing.T) { - assert.Equal(t, true, isMatchInternalPattern("*", "value")) - assert.Equal(t, true, isMatchInternalPattern("", "")) - assert.Equal(t, false, isMatchInternalPattern("", "value")) - assert.Equal(t, true, isMatchInternalPattern("value", "value")) - assert.Equal(t, true, isMatchInternalPattern("v*", "value")) - assert.Equal(t, true, isMatchInternalPattern("*ue", "value")) - assert.Equal(t, true, isMatchInternalPattern("*e", "value")) - assert.Equal(t, true, isMatchInternalPattern("v*e", "value")) + assert.Equal(t, true, IsMatchInternalPattern("*", "value")) + assert.Equal(t, true, IsMatchInternalPattern("", "")) + assert.Equal(t, false, IsMatchInternalPattern("", "value")) + assert.Equal(t, true, IsMatchInternalPattern("value", "value")) + assert.Equal(t, true, IsMatchInternalPattern("v*", "value")) + assert.Equal(t, true, IsMatchInternalPattern("*ue", "value")) + assert.Equal(t, true, IsMatchInternalPattern("*e", "value")) + assert.Equal(t, true, IsMatchInternalPattern("v*e", "value")) } func TestIsMatchGlobPattern(t *testing.T) { url, _ := common.NewURL(context.TODO(), "dubbo://localhost:8080/Foo?key=v*e") - assert.Equal(t, true, isMatchGlobPattern("$key", "value", &url)) + assert.Equal(t, true, IsMatchGlobalPattern("$key", "value", &url)) } diff --git a/cluster/router/router_factory.go b/cluster/router/router_factory.go index a9794cb88..badd90169 100644 --- a/cluster/router/router_factory.go +++ b/cluster/router/router_factory.go @@ -19,12 +19,14 @@ package router import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/cluster/router/condition" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" ) func init() { extension.SetRouterFactory("condition", NewConditionRouterFactory) + extension.SetRouterFactory("app", NewAppRouterFactory) } type ConditionRouterFactory struct{} @@ -33,5 +35,14 @@ func NewConditionRouterFactory() cluster.RouterFactory { return ConditionRouterFactory{} } func (c ConditionRouterFactory) Router(url *common.URL) (cluster.Router, error) { - return newConditionRouter(url) + return condition.NewConditionRouter(url) +} + +type AppRouterFactory struct{} + +func NewAppRouterFactory() cluster.RouterFactory { + return AppRouterFactory{} +} +func (c AppRouterFactory) Router(url *common.URL) (cluster.Router, error) { + return condition.NewAppRouter(url) } diff --git a/cluster/router/router_factory_test.go b/cluster/router/router_factory_test.go index 7b14e3d3d..2a7d7f827 100644 --- a/cluster/router/router_factory_test.go +++ b/cluster/router/router_factory_test.go @@ -18,13 +18,337 @@ package router import ( + "context" + "encoding/base64" + "fmt" + "github.com/apache/dubbo-go/cluster/router/condition" + "reflect" "testing" ) import ( + "github.com/dubbogo/gost/net" + perrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +type MockInvoker struct { + url common.URL + available bool + destroyed bool + successCount int +} + +func NewMockInvoker(url common.URL, successCount int) *MockInvoker { + return &MockInvoker{ + url: url, + available: true, + destroyed: false, + successCount: successCount, + } +} + +func (bi *MockInvoker) GetUrl() common.URL { + return bi.url +} + +func getRouteUrl(rule string) *common.URL { + url, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService") + url.AddParam("rule", rule) + url.AddParam("force", "true") + return &url +} + +func getRouteUrlWithForce(rule, force string) *common.URL { + url, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService") + url.AddParam("rule", rule) + url.AddParam("force", force) + return &url +} + +func getRouteUrlWithNoForce(rule string) *common.URL { + url, _ := common.NewURL(context.TODO(), "condition://0.0.0.0/com.foo.BarService") + url.AddParam("rule", rule) + return &url +} + +func (bi *MockInvoker) IsAvailable() bool { + return bi.available +} + +func (bi *MockInvoker) IsDestroyed() bool { + return bi.destroyed +} + +type rest struct { + tried int + success bool +} + +var count int + +func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + count++ + var success bool + var err error = nil + if count >= bi.successCount { + success = true + } else { + err = perrors.New("error") + } + result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} + return result +} + +func (bi *MockInvoker) Destroy() { + logger.Infof("Destroy invoker: %v", bi.GetUrl().String()) + bi.destroyed = true + bi.available = false +} + +func TestRoute_matchWhen(t *testing.T) { + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + cUrl, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService") + matchWhen, _ := router.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, true, matchWhen) + rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) + router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) + matchWhen1, _ := router1.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, true, matchWhen1) + rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) + router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) + matchWhen2, _ := router2.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, false, matchWhen2) + rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) + router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) + matchWhen3, _ := router3.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, true, matchWhen3) + rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) + router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) + matchWhen4, _ := router4.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, true, matchWhen4) + rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) + router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) + matchWhen5, _ := router5.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, false, matchWhen5) + rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) + router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) + matchWhen6, _ := router6.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + assert.Equal(t, true, matchWhen6) +} + +func TestRoute_matchFilter(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + t.Logf("The local ip is %s", localIP) + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invokers := []protocol.Invoker{NewMockInvoker(url1, 1), NewMockInvoker(url2, 2), NewMockInvoker(url3, 3)} + rule1 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3")) + rule2 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.* & host != 10.20.3.3")) + rule3 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3 & host != 10.20.3.3")) + rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4")) + rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3")) + rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson")) + router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) + router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) + router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) + router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) + router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) + router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) + cUrl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers4 := router4.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers5 := router5.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers6 := router6.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + assert.Equal(t, 1, len(fileredInvokers1)) + assert.Equal(t, 0, len(fileredInvokers2)) + assert.Equal(t, 0, len(fileredInvokers3)) + assert.Equal(t, 1, len(fileredInvokers4)) + assert.Equal(t, 2, len(fileredInvokers5)) + assert.Equal(t, 1, len(fileredInvokers6)) + +} + +func TestRoute_methodRoute(t *testing.T) { + inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) + rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + url, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") + matchWhen, _ := router.(*condition.ConditionRouter).MatchWhen(&url, inv) + assert.Equal(t, true, matchWhen) + url1, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") + matchWhen, _ = router.(*condition.ConditionRouter).MatchWhen(&url1, inv) + assert.Equal(t, true, matchWhen) + url2, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") + rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) + router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) + matchWhen, _ = router2.(*condition.ConditionRouter).MatchWhen(&url2, inv) + assert.Equal(t, false, matchWhen) + url3, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") + rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) + router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) + matchWhen, _ = router3.(*condition.ConditionRouter).MatchWhen(&url3, inv) + assert.Equal(t, true, matchWhen) + +} + +func TestRoute_ReturnFalse(t *testing.T) { + url, _ := common.NewURL(context.TODO(), "") + localIP, _ := gxnet.GetLocalIP() + invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 0, len(fileredInvokers)) +} + +func TestRoute_ReturnEmpty(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url, _ := common.NewURL(context.TODO(), "") + invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 0, len(fileredInvokers)) +} + +func TestRoute_ReturnAll(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + invokers := []protocol.Invoker{&MockInvoker{}, &MockInvoker{}, &MockInvoker{}} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, invokers, fileredInvokers) +} + +func TestRoute_HostFilter(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 2) + invoker3 := NewMockInvoker(url3, 3) + invokers := []protocol.Invoker{invoker1, invoker2, invoker3} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 2, len(fileredInvokers)) + assert.Equal(t, invoker2, fileredInvokers[0]) + assert.Equal(t, invoker3, fileredInvokers[1]) +} + +func TestRoute_Empty_HostFilter(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 2) + invoker3 := NewMockInvoker(url3, 3) + invokers := []protocol.Invoker{invoker1, invoker2, invoker3} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 2, len(fileredInvokers)) + assert.Equal(t, invoker2, fileredInvokers[0]) + assert.Equal(t, invoker3, fileredInvokers[1]) +} + +func TestRoute_False_HostFilter(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 2) + invoker3 := NewMockInvoker(url3, 3) + invokers := []protocol.Invoker{invoker1, invoker2, invoker3} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 2, len(fileredInvokers)) + assert.Equal(t, invoker2, fileredInvokers[0]) + assert.Equal(t, invoker3, fileredInvokers[1]) +} + +func TestRoute_Placeholder(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 2) + invoker3 := NewMockInvoker(url3, 3) + invokers := []protocol.Invoker{invoker1, invoker2, invoker3} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 2, len(fileredInvokers)) + assert.Equal(t, invoker2, fileredInvokers[0]) + assert.Equal(t, invoker3, fileredInvokers[1]) +} + +func TestRoute_NoForce(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 2) + invoker3 := NewMockInvoker(url3, 3) + invokers := []protocol.Invoker{invoker1, invoker2, invoker3} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrlWithNoForce(rule)) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, invokers, fileredInvokers) +} + +func TestRoute_Force(t *testing.T) { + localIP, _ := gxnet.GetLocalIP() + url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService") + url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 2) + invoker3 := NewMockInvoker(url3, 3) + invokers := []protocol.Invoker{invoker1, invoker2, invoker3} + inv := &invocation.RPCInvocation{} + rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) + curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") + router, _ := NewConditionRouterFactory().Router(getRouteUrlWithForce(rule, "true")) + fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + assert.Equal(t, 0, len(fileredInvokers)) +} + func TestNewConditionRouterFactory(t *testing.T) { factory := NewConditionRouterFactory() assert.NotNil(t, factory) diff --git a/common/url.go b/common/url.go index 6f171c7fe..0180ccb8c 100644 --- a/common/url.go +++ b/common/url.go @@ -290,6 +290,26 @@ func (c URL) Key() string { //return c.ServiceKey() } +func (c *URL) ColonSeparatedKey() string { + intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) + if intf == "" { + return "" + } + buf := &bytes.Buffer{} + buf.WriteString(intf) + buf.WriteString(":") + version := c.GetParam(constant.VERSION_KEY, "") + if version != "" && version != "0.0.0" { + buf.WriteString(version) + } + group := c.GetParam(constant.GROUP_KEY, "") + buf.WriteString(":") + if group != "" { + buf.WriteString(group) + } + return buf.String() +} + func (c *URL) GetBackupUrls() []*URL { var ( urls []*URL diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 0546d3973..6665c7681 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -18,6 +18,7 @@ package config_center import ( + "github.com/apache/dubbo-go/common" "time" ) @@ -64,3 +65,8 @@ func WithTimeout(time time.Duration) Option { opt.Timeout = time } } + +//GetRuleKey The format is '{interfaceName}:[version]:[group]' +func GetRuleKey(url common.URL) string { + return url.ColonSeparatedKey() +} -- GitLab