diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 46cd527b0bb1d3b0d60579b521cb62c9ff96607c..90b231d7c42fa75606d6967eff7b1343f842f1d3 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -18,13 +18,13 @@ package directory import ( + "github.com/apache/dubbo-go/cluster/router" "sync" ) import ( "go.uber.org/atomic" ) import ( - "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -37,7 +37,7 @@ var routerURLSet = gxset.NewSet() type BaseDirectory struct { url *common.URL destroyed *atomic.Bool - routers []cluster.Router + routers []router.Router mutex sync.Mutex once sync.Once } @@ -62,7 +62,7 @@ func (dir *BaseDirectory) GetDirectoryUrl() *common.URL { return dir.url } -func (dir *BaseDirectory) SetRouters(routers []cluster.Router) { +func (dir *BaseDirectory) SetRouters(routers []router.Router) { routerKey := dir.GetUrl().GetParam(constant.ROUTER_KEY, "") if len(routerKey) > 0 { factory := extension.GetRouterFactory(dir.GetUrl().Protocol) @@ -78,7 +78,7 @@ func (dir *BaseDirectory) SetRouters(routers []cluster.Router) { dir.routers = routers } -func (dir *BaseDirectory) Routers() []cluster.Router { +func (dir *BaseDirectory) Routers() []router.Router { dir.once.Do(func() { rs := routerURLSet.Values() for _, r := range rs { diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go new file mode 100644 index 0000000000000000000000000000000000000000..d65f6d201970d955ee97202664faf4ef1de3d45b --- /dev/null +++ b/cluster/router/chain/chain.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "sort" +) + +// RouterChain Router chain +type Chain struct { + //full list of addresses from registry, classified by method name. + invokers []protocol.Invoker + //containing all routers, reconstruct every time 'route://' urls change. + routers []router.Router + // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the + // instance will never delete or recreate. + builtinRouters []router.Router +} + +func (c Chain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + finalInvokers := invoker + for _, r := range c.routers { + finalInvokers = r.Route(invoker, url, invocation) + } + return finalInvokers +} +func (c Chain) AddRouters(routers []router.Router) { + newRouters := make([]router.Router, 0) + newRouters = append(newRouters, c.builtinRouters...) + newRouters = append(newRouters, routers...) + sortRouter(newRouters) + c.routers = newRouters +} + +func NewRouterChain(url common.URL) *Chain { + routerFactories := extension.GetRouters() + if len(routerFactories) == 0 { + return nil + } + routers := make([]router.Router, 0) + for _, routerFactory := range routerFactories { + r, err := routerFactory().Router(&url) + if err != nil { + logger.Errorf("router chain build router fail! error:%v", err) + continue + } + routers = append(routers, r) + } + + newRouters := make([]router.Router, len(routers)) + copy(newRouters, routers) + + sortRouter(newRouters) + + chain := &Chain{ + builtinRouters: routers, + routers: newRouters, + } + + return chain +} + +func sortRouter(routers []router.Router) { + sort.Stable(ByPriority(routers)) +} + +type ByPriority []router.Router + +func (a ByPriority) Len() int { return len(a) } +func (a ByPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByPriority) Less(i, j int) bool { return a[i].Priority() < a[j].Priority() } diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6825ddfd3ac22518a50453e8cc76cc50952659bf --- /dev/null +++ b/cluster/router/chain/chain_test.go @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain diff --git a/cluster/router/chain/factory.go b/cluster/router/chain/factory.go new file mode 100644 index 0000000000000000000000000000000000000000..e2166726bb9194eecacf5d7fa3109e84a8a489ec --- /dev/null +++ b/cluster/router/chain/factory.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterChainsFactory("chain", NewRouterChainFactory) +} + +type RouterChainFactory struct{} + +func (c RouterChainFactory) Router(*common.URL) (router.RouterChain, error) { + panic("implement me") +} + +func NewRouterChainFactory() router.RouterChainFactory { + return RouterChainFactory{} +} diff --git a/cluster/router/condition/condition_router_rule.go b/cluster/router/condition/condition_router_rule.go deleted file mode 100644 index 0439819bfde541976602f403b06b9586337873ec..0000000000000000000000000000000000000000 --- a/cluster/router/condition/condition_router_rule.go +++ /dev/null @@ -1,14 +0,0 @@ -package condition - -type RouterRule struct { - RawRule string - Runtime bool - Force bool - Valid bool - Enabled bool - Priority int - Dynamic bool - Scope string - Key string - Conditions []string -} diff --git a/cluster/router/router_factory.go b/cluster/router/condition/factory.go similarity index 72% rename from cluster/router/router_factory.go rename to cluster/router/condition/factory.go index badd90169f15bd6020ae3c50ba0771ee23ec186d..9222242b2968290eab51423560fe905a28b07646 100644 --- a/cluster/router/router_factory.go +++ b/cluster/router/condition/factory.go @@ -15,11 +15,10 @@ * limitations under the License. */ -package router +package condition import ( - "github.com/apache/dubbo-go/cluster" - "github.com/apache/dubbo-go/cluster/router/condition" + "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" ) @@ -31,18 +30,18 @@ func init() { type ConditionRouterFactory struct{} -func NewConditionRouterFactory() cluster.RouterFactory { +func NewConditionRouterFactory() router.RouterFactory { return ConditionRouterFactory{} } -func (c ConditionRouterFactory) Router(url *common.URL) (cluster.Router, error) { - return condition.NewConditionRouter(url) +func (c ConditionRouterFactory) Router(url *common.URL) (router.Router, error) { + return NewConditionRouter(url) } type AppRouterFactory struct{} -func NewAppRouterFactory() cluster.RouterFactory { +func NewAppRouterFactory() router.RouterFactory { return AppRouterFactory{} } -func (c AppRouterFactory) Router(url *common.URL) (cluster.Router, error) { - return condition.NewAppRouter(url) +func (c AppRouterFactory) Router(url *common.URL) (router.Router, error) { + return NewAppRouter(url) } diff --git a/cluster/router/router_factory_test.go b/cluster/router/condition/factory_test.go similarity index 78% rename from cluster/router/router_factory_test.go rename to cluster/router/condition/factory_test.go index 2a7d7f8279181c0dff0f3674fc62b70a7aba0a41..ff63a282f194ddddd7c071408e03eca43acb8521 100644 --- a/cluster/router/router_factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -15,13 +15,13 @@ * limitations under the License. */ -package router +package condition import ( "context" "encoding/base64" "fmt" - "github.com/apache/dubbo-go/cluster/router/condition" + router7 "github.com/apache/dubbo-go/cluster/router" "reflect" "testing" ) @@ -116,33 +116,33 @@ func (bi *MockInvoker) Destroy() { func TestRoute_matchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + router, _ := router7.Router(getRouteUrl(rule)) cUrl, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService") - matchWhen, _ := router.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + matchWhen, _ := router.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen) rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) - matchWhen1, _ := router1.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + router1, _ := router7.Router(getRouteUrl(rule1)) + matchWhen1, _ := router1.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen1) rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - matchWhen2, _ := router2.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + router2, _ := router7.Router(getRouteUrl(rule2)) + matchWhen2, _ := router2.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, false, matchWhen2) rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - matchWhen3, _ := router3.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + router3, _ := router7.Router(getRouteUrl(rule3)) + matchWhen3, _ := router3.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen3) rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) - matchWhen4, _ := router4.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + router4, _ := router7.Router(getRouteUrl(rule4)) + matchWhen4, _ := router4.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen4) rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) - router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) - matchWhen5, _ := router5.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + router5, _ := router7.Router(getRouteUrl(rule5)) + matchWhen5, _ := router5.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, false, matchWhen5) rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) - router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) - matchWhen6, _ := router6.(*condition.ConditionRouter).MatchWhen(&cUrl, inv) + router6, _ := router7.Router(getRouteUrl(rule6)) + matchWhen6, _ := router6.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen6) } @@ -159,19 +159,19 @@ func TestRoute_matchFilter(t *testing.T) { rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4")) rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3")) rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson")) - router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) - router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) - router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) + router1, _ := router7.Router(getRouteUrl(rule1)) + router2, _ := router7.Router(getRouteUrl(rule2)) + router3, _ := router7.Router(getRouteUrl(rule3)) + router4, _ := router7.Router(getRouteUrl(rule4)) + router5, _ := router7.Router(getRouteUrl(rule5)) + router6, _ := router7.Router(getRouteUrl(rule6)) cUrl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers4 := router4.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers5 := router5.Route(invokers, &cUrl, &invocation.RPCInvocation{}) - fileredInvokers6 := router6.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers1 := router7.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers2 := router7.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers3 := router7.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers4 := router7.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers5 := router7.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers6 := router7.Route(invokers, &cUrl, &invocation.RPCInvocation{}) assert.Equal(t, 1, len(fileredInvokers1)) assert.Equal(t, 0, len(fileredInvokers2)) assert.Equal(t, 0, len(fileredInvokers3)) @@ -184,22 +184,22 @@ func TestRoute_matchFilter(t *testing.T) { func TestRoute_methodRoute(t *testing.T) { inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + router, _ := router7.Router(getRouteUrl(rule)) url, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") - matchWhen, _ := router.(*condition.ConditionRouter).MatchWhen(&url, inv) + matchWhen, _ := router.(*ConditionRouter).MatchWhen(&url, inv) assert.Equal(t, true, matchWhen) url1, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") - matchWhen, _ = router.(*condition.ConditionRouter).MatchWhen(&url1, inv) + matchWhen, _ = router.(*ConditionRouter).MatchWhen(&url1, inv) assert.Equal(t, true, matchWhen) url2, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - matchWhen, _ = router2.(*condition.ConditionRouter).MatchWhen(&url2, inv) + router2, _ := router7.Router(getRouteUrl(rule2)) + matchWhen, _ = router2.(*ConditionRouter).MatchWhen(&url2, inv) assert.Equal(t, false, matchWhen) url3, _ := common.NewURL(context.TODO(), "consumer://1.1.1.1/com.foo.BarService?methods=getFoo") rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - matchWhen, _ = router3.(*condition.ConditionRouter).MatchWhen(&url3, inv) + router3, _ := router7.Router(getRouteUrl(rule3)) + matchWhen, _ = router3.(*ConditionRouter).MatchWhen(&url3, inv) assert.Equal(t, true, matchWhen) } @@ -211,8 +211,8 @@ func TestRoute_ReturnFalse(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } @@ -223,8 +223,8 @@ func TestRoute_ReturnEmpty(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } @@ -234,8 +234,8 @@ func TestRoute_ReturnAll(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, invokers, fileredInvokers) } @@ -251,8 +251,8 @@ func TestRoute_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -270,8 +270,8 @@ func TestRoute_Empty_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -289,8 +289,8 @@ func TestRoute_False_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -308,8 +308,8 @@ func TestRoute_Placeholder(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -327,8 +327,8 @@ func TestRoute_NoForce(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrlWithNoForce(rule)) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrlWithNoForce(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, invokers, fileredInvokers) } @@ -344,8 +344,8 @@ func TestRoute_Force(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) curl, _ := common.NewURL(context.TODO(), "consumer://"+localIP+"/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrlWithForce(rule, "true")) - fileredInvokers := router.(*condition.ConditionRouter).Route(invokers, &curl, inv) + router, _ := router7.Router(getRouteUrlWithForce(rule, "true")) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index d94bb5e096e5f063902a6c94e972a534897b5342..2beb60227e3221586ff293b03f48e18f97a3d3c1 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -17,6 +17,15 @@ package condition +import ( + "fmt" + "github.com/apache/dubbo-go/common/config" +) + +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" @@ -48,16 +57,20 @@ func (l *listenableRouter) newListenableRouter(url *common.URL, ruleKey string) } routerKey := ruleKey + RULE_SUFFIX - //TODO: add listener - - //TODO: add get rule - rule := "rule" - if rule != "" { - l.Process(&config_center.ConfigChangeEvent{ - Key: routerKey, - Value: rule, - ConfigType: remoting.EventTypeUpdate}) + //add listener + dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration() + dynamicConfiguration.AddListener(routerKey, l) + //get rule + rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP)) + if len(rule) == 0 || err != nil { + return perrors.Errorf("get rule fail, config rule{%s}, error{%v}", rule, err) } + l.Process(&config_center.ConfigChangeEvent{ + Key: routerKey, + Value: rule, + ConfigType: remoting.EventTypeUpdate}) + + return nil } func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { @@ -69,7 +82,8 @@ func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { } content, ok := event.Value.(string) if !ok { - logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) + msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value) + logger.Error(msg) return } diff --git a/cluster/router/condition/condition_router.go b/cluster/router/condition/router.go similarity index 100% rename from cluster/router/condition/condition_router.go rename to cluster/router/condition/router.go diff --git a/cluster/router/condition/condition_router_parser.go b/cluster/router/condition/router_parser.go similarity index 96% rename from cluster/router/condition/condition_router_parser.go rename to cluster/router/condition/router_parser.go index 9c5354eb1f9975801e222d8dcdfe5bae05ff3397..fb877ae96228ca0693dc35be92eb4f70ae06604a 100644 --- a/cluster/router/condition/condition_router_parser.go +++ b/cluster/router/condition/router_parser.go @@ -38,8 +38,8 @@ func Parse(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule - if len(r.Conditions) == 0 { - r.Valid = false + if len(r.Conditions) != 0 { + r.Valid = true } return r, nil diff --git a/cluster/router/condition/condition_router_parser_test.go b/cluster/router/condition/router_parser_test.go similarity index 56% rename from cluster/router/condition/condition_router_parser_test.go rename to cluster/router/condition/router_parser_test.go index 890063c043b8b108c81a9b385a6241a2c9fc17cd..042bf3af9bac86d840a92879df631e85124d8174 100644 --- a/cluster/router/condition/condition_router_parser_test.go +++ b/cluster/router/condition/router_parser_test.go @@ -24,4 +24,12 @@ conditions: assert.Nil(t, e) assert.NotNil(t, rule) assert.Equal(t, 2, len(rule.Conditions)) + assert.Equal(t, "application", rule.Scope) + assert.True(t, rule.Runtime) + assert.Equal(t, false, rule.Force) + assert.Equal(t, testyml, rule.RawRule) + assert.True(t, true, rule.Valid) + assert.Equal(t, false, rule.Enabled) + assert.Equal(t, false, rule.Dynamic) + assert.Equal(t, "", rule.Key) } diff --git a/cluster/router/condition/router_rule.go b/cluster/router/condition/router_rule.go new file mode 100644 index 0000000000000000000000000000000000000000..37a01f6ca0826f3979b2538b81b6fe3dc5a068e9 --- /dev/null +++ b/cluster/router/condition/router_rule.go @@ -0,0 +1,18 @@ +package condition + +type BaseRouterRule struct { + RawRule string + Runtime bool + Force bool + Valid bool + Enabled bool + Priority int + Dynamic bool + Scope string + Key string +} + +type RouterRule struct { + BaseRouterRule `yaml:",inline""` + Conditions []string +} diff --git a/cluster/router.go b/cluster/router/router.go similarity index 81% rename from cluster/router.go rename to cluster/router/router.go index 914f6faf6503d8c956bfa1a1535c29adf954753b..cfebdb52dd270645f658b70f4d4c64b2edbd91c0 100644 --- a/cluster/router.go +++ b/cluster/router/router.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster +package router import ( "github.com/apache/dubbo-go/common" @@ -23,13 +23,22 @@ import ( ) // Extension - Router - type RouterFactory interface { Router(*common.URL) (Router, error) } +// Extension - Router Chain +type RouterChainFactory interface { + Router(*common.URL) (RouterChain, error) +} + type Router interface { Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker Priority() int64 Url() common.URL } + +type RouterChain interface { + Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker + AddRouters([]Router) +} diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index 6f27aafaebf87147116e74272cc229657f436201..574cbba8277ae224c5ae2aea62d4a0ebb3c038b2 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -18,21 +18,36 @@ package extension import ( - "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/cluster/router" ) var ( - routers = make(map[string]func() cluster.RouterFactory) + routers = make(map[string]func() router.RouterFactory) + routerChains = make(map[string]func() router.RouterChainFactory) ) -func SetRouterFactory(name string, fun func() cluster.RouterFactory) { +func SetRouterFactory(name string, fun func() router.RouterFactory) { routers[name] = fun } -func GetRouterFactory(name string) cluster.RouterFactory { +func GetRouterFactory(name string) router.RouterFactory { if routers[name] == nil { panic("router_factory for " + name + " is not existing, make sure you have import the package.") } return routers[name]() +} + +func SetRouterChainsFactory(name string, fun func() router.RouterChainFactory) { + routerChains[name] = fun +} + +func GetRouterChainsFactory(name string) router.RouterChainFactory { + if routers[name] == nil { + panic("router_chain_factory for " + name + " is not existing, make sure you have import the package.") + } + return routerChains[name]() +} +func GetRouters() map[string]func() router.RouterFactory { + return routers } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 349c0d440c7f3a5ba9aa7586afad9a119ee95cff..807bb471bdb76bf9af743af64d9dadb80c1f622c 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -18,6 +18,7 @@ package directory import ( + "github.com/apache/dubbo-go/cluster/router" "reflect" "strings" "sync" @@ -30,7 +31,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -163,7 +163,7 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { dir.cacheInvokers = newInvokers } -func toRouters(urls []*common.URL) []cluster.Router { +func toRouters(urls []*common.URL) []router.Router { if len(urls) == 0 { return nil } @@ -186,9 +186,9 @@ func toRouters(urls []*common.URL) []cluster.Router { routerMap.Add(router) } - routers := make([]cluster.Router, 0) + routers := make([]router.Router, 0) for _, v := range routerMap.Values() { - routers = append(routers, v.(cluster.Router)) + routers = append(routers, v.(router.Router)) } return routers