diff --git a/cluster/router.go b/cluster/router.go index 2e6097046029ace7b422e2aed7332297967790c9..7e3fa99bcc825115e17fc157eaeb98f88723a779 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -29,7 +29,7 @@ type RouterFactory interface { } type Router interface { - Route([]protocol.Invoker, common.URL, protocol.Invocation) ([]protocol.Invoker, error) + Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker CompareTo(router Router) int } diff --git a/cluster/router/condition_router.go b/cluster/router/condition_router.go index a175e5c5dfbbefa30edd5553e944765d6bbe8066..4243080253aa47009d47f797ace0a15d899ef827 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition_router.go @@ -1,24 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package router import ( - "encoding/base64" + "fmt" "github.com/apache/dubbo-go/cluster" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/utils" + "github.com/apache/dubbo-go/common/logger" + "net" + "reflect" "regexp" "strings" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" perrors "github.com/pkg/errors" ) const ( RoutePattern = `([&!=,]*)\\s*([^&!=,\\s]+)` + FORCE = "force" + PRIORITY = "priority" ) -var itemExists = struct{}{} - type ConditionRouter struct { Pattern string Url common.URL @@ -28,32 +48,6 @@ type ConditionRouter struct { ThenCondition map[string]MatchPair } -func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) ([]protocol.Invoker, error) { - if len(invokers) == 0 { - return invokers, nil - } - if !c.MatchWhen(url, invocation) { - return invokers, nil - } - var result []protocol.Invoker - if len(c.ThenCondition) == 0 { - return result, nil - } - for _, invoker := range invokers { - - if c.MatchThen(invoker.GetUrl(), url) { - result = append(result, invoker) - } - } - if len(result) > 0 { - return result, nil - } else if c.Force { - //todo 鏃ュ織 - return result, nil - } - return invokers, nil -} - func (c ConditionRouter) CompareTo(r cluster.Router) int { var result int router, ok := r.(*ConditionRouter) @@ -75,9 +69,7 @@ func (c ConditionRouter) CompareTo(r cluster.Router) int { func newConditionRouter(url common.URL) (*ConditionRouter, error) { var whenRule string var thenRule string - - ruleDec, err := base64.URLEncoding.DecodeString(url.GetParam("rule", "")) - rule := string(ruleDec) + rule, err := url.GetParameterAndDecoded(constant.RULE_KEY) if err != nil || rule == "" { return nil, perrors.Errorf("Illegal route rule!") } @@ -86,7 +78,6 @@ func newConditionRouter(url common.URL) (*ConditionRouter, error) { i := strings.Index(rule, "=>") whenRule = strings.Trim(If(i < 0, "", rule[0:i]).(string), " ") thenRule = strings.Trim(If(i < 0, rule, rule[i+2:]).(string), " ") - w, err := parseRule(whenRule) if err != nil { return nil, perrors.Errorf("%s", "") @@ -102,13 +93,73 @@ func newConditionRouter(url common.URL) (*ConditionRouter, error) { return &ConditionRouter{ RoutePattern, url, - url.GetParamInt("priority", 0), - url.GetParamBool("force", false), + url.GetParamInt(PRIORITY, 0), + url.GetParamBool(FORCE, false), when, then, }, nil } +func LocalIp() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + fmt.Println(err) + } + var ip = "localhost" + for _, address := range addrs { + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + ip = ipnet.IP.String() + } + } + } + return ip +} + +func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) []protocol.Invoker { + if len(invokers) == 0 { + return invokers + } + isMatchWhen, err := c.MatchWhen(url, invocation) + if err != nil { + + var urls []string + for _, invo := range invokers { + urls = append(urls, reflect.TypeOf(invo).String()) + } + logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err) + return invokers + } + if !isMatchWhen { + return invokers + } + var result []protocol.Invoker + if len(c.ThenCondition) == 0 { + return result + } + for _, invoker := range invokers { + isMatchThen, err := c.MatchThen(invoker.GetUrl(), url) + if err != nil { + var urls []string + for _, invo := range invokers { + urls = append(urls, reflect.TypeOf(invo).String()) + } + logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err) + return invokers + } + if isMatchThen { + result = append(result, invoker) + } + } + if len(result) > 0 { + return result + } else if c.Force { + logger.Warnf("The route result is empty and force execute. consumer: %s, service: %s, router: %s", LocalIp(), url.Service()) + return result + } + return invokers +} + func parseRule(rule string) (map[string]MatchPair, error) { condition := make(map[string]MatchPair) @@ -173,19 +224,23 @@ func parseRule(rule string) (map[string]MatchPair, error) { } -func (c *ConditionRouter) MatchWhen(url common.URL, invocation protocol.Invocation) bool { - - return len(c.WhenCondition) == 0 || MatchCondition(c.WhenCondition, &url, nil, invocation) +// +func (c *ConditionRouter) MatchWhen(url common.URL, invocation protocol.Invocation) (bool, error) { + condition, err := MatchCondition(c.WhenCondition, &url, nil, invocation) + return len(c.WhenCondition) == 0 || condition, err } -func (c *ConditionRouter) MatchThen(url common.URL, param common.URL) bool { - return len(c.ThenCondition) > 0 && MatchCondition(c.ThenCondition, &url, ¶m, nil) +//MatchThen MatchThen +func (c *ConditionRouter) MatchThen(url common.URL, param common.URL) (bool, error) { + condition, err := MatchCondition(c.ThenCondition, &url, ¶m, nil) + return len(c.ThenCondition) > 0 && condition, err } -func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.URL, invocation protocol.Invocation) bool { +//MatchCondition MatchCondition +func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.URL, invocation protocol.Invocation) (bool, error) { sample := url.ToMap() if len(sample) == 0 { - return true + return true, perrors.Errorf("") } result := false for key, matchPair := range pairs { @@ -201,20 +256,20 @@ func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U } if sampleValue != "" { if !matchPair.isMatch(sampleValue, param) { - return false + return false, nil } else { result = true } } else { if !(matchPair.Matches.Empty()) { - return false + return false, nil } else { result = true } } } - return result + return result, nil } func If(b bool, t, f interface{}) interface{} { diff --git a/cluster/router/condition_router_test.go b/cluster/router/condition_router_test.go index b5a32be0de4316cddf1c909f9646d701a0b0b8f9..69d842a2b17931f735658cb27ecdfa00b78c7dd1 100644 --- a/cluster/router/condition_router_test.go +++ b/cluster/router/condition_router_test.go @@ -1,18 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package router import ( "context" "encoding/base64" - perrors "errors" "fmt" + "reflect" + "testing" + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + perrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "net" - "reflect" - "testing" ) type MockInvoker struct { @@ -89,58 +106,43 @@ func (bi *MockInvoker) Destroy() { bi.available = false } -func LocalIp() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - fmt.Println(err) - } - var ip string = "localhost" - for _, address := range addrs { - if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - ip = ipnet.IP.String() - } - } - } - return ip -} func TestRoute_matchWhen(t *testing.T) { - rpcInvacation := &invocation.RPCInvocation{} + inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) cUrl, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService") - matchWhen := router.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen, _ := router.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen) rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) router1, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule1)) - matchWhen1 := router1.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen1, _ := router1.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen1) rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) router2, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule2)) - matchWhen2 := router2.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen2, _ := router2.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, false, matchWhen2) rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) router3, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule3)) - matchWhen3 := router3.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen3, _ := router3.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen3) rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) router4, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule4)) - matchWhen4 := router4.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen4, _ := router4.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen4) rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) router5, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule5)) - matchWhen5 := router5.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen5, _ := router5.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, false, matchWhen5) rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) router6, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule6)) - matchWhen6 := router6.(*ConditionRouter).MatchWhen(cUrl, rpcInvacation) + matchWhen6, _ := router6.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen6) } func TestRoute_matchFilter(t *testing.T) { @@ -162,12 +164,12 @@ func TestRoute_matchFilter(t *testing.T) { router6, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule6)) cUrl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") - fileredInvokers1, _ := router1.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers2, _ := router2.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers3, _ := router3.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers4, _ := router4.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers5, _ := router5.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers6, _ := router6.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers1 := router1.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers2 := router2.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers3 := router3.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers4 := router4.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers5 := router5.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers6 := router6.Route(invokers, cUrl, &invocation.RPCInvocation{}) assert.Equal(t, 1, len(fileredInvokers1)) assert.Equal(t, 0, len(fileredInvokers2)) assert.Equal(t, 0, len(fileredInvokers3)) @@ -184,23 +186,23 @@ func TestRoute_methodRoute(t *testing.T) { router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) url, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") - matchWhen := router.(*ConditionRouter).MatchWhen(url, inv) + matchWhen, _ := router.(*ConditionRouter).MatchWhen(url, inv) assert.Equal(t, true, matchWhen) url1, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") - matchWhen = router.(*ConditionRouter).MatchWhen(url1, inv) + matchWhen, _ = router.(*ConditionRouter).MatchWhen(url1, inv) assert.Equal(t, true, matchWhen) url2, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) router2, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule2)) - matchWhen = router2.(*ConditionRouter).MatchWhen(url2, inv) + matchWhen, _ = router2.(*ConditionRouter).MatchWhen(url2, inv) assert.Equal(t, false, matchWhen) url3, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) router3, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule3)) - matchWhen = router3.(*ConditionRouter).MatchWhen(url3, inv) + matchWhen, _ = router3.(*ConditionRouter).MatchWhen(url3, inv) assert.Equal(t, true, matchWhen) } @@ -213,7 +215,7 @@ func TestRoute_ReturnFalse(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } func TestRoute_ReturnEmpty(t *testing.T) { @@ -224,7 +226,7 @@ func TestRoute_ReturnEmpty(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } func TestRoute_ReturnAll(t *testing.T) { @@ -234,7 +236,7 @@ func TestRoute_ReturnAll(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, invokers, fileredInvokers) } @@ -251,7 +253,7 @@ func TestRoute_HostFilter(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -269,7 +271,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -287,7 +289,7 @@ func TestRoute_False_HostFilter(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -305,7 +307,7 @@ func TestRoute_Placeholder(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrl(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -323,7 +325,7 @@ func TestRoute_NoForce(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrlWithNoForce(rule)) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, invokers, fileredInvokers) } func TestRoute_Force(t *testing.T) { @@ -339,6 +341,6 @@ func TestRoute_Force(t *testing.T) { curl, _ := common.NewURL(context.TODO(), "consumer://"+LocalIp()+"/com.foo.BarService") router, _ := NewConditionRouterFactory().GetRouter(getRouteUrlWithForce(rule, "true")) - fileredInvokers, _ := router.(*ConditionRouter).Route(invokers, curl, inv) + fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } diff --git a/common/constant/key.go b/common/constant/key.go index 4fccf05f548c2dfca338cf171e9b487b19b9f7ae..54702db02d969a33f1589731bf4563c2cffcbd7e 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -66,8 +66,7 @@ const ( APP_VERSION_KEY = "app.version" OWNER_KEY = "owner" ENVIRONMENT_KEY = "environment" -) -const ( - METHOD_KEY = "method" - METHOD_KEYS = "methods" + METHOD_KEY = "method" + METHOD_KEYS = "methods" + RULE_KEY = "rule" ) diff --git a/common/url.go b/common/url.go index e6fdf49e9f8e0b6eab28e6efa8abf31c364a5c40..2429653002967fdf2cb917d9e52c2f8323c6bba3 100644 --- a/common/url.go +++ b/common/url.go @@ -19,6 +19,7 @@ package common import ( "context" + "encoding/base64" "fmt" "net" "net/url" @@ -274,6 +275,11 @@ func (c URL) GetParam(s string, d string) string { } return r } +func (c URL) GetParameterAndDecoded(key string) (string, error) { + ruleDec, err := base64.URLEncoding.DecodeString(c.GetParam(key, "")) + value := string(ruleDec) + return value, err +} func (c URL) GetRawParameter(key string) string { if "protocol" == key {