From 163e9a730dafe97d54859a519700cf1232245d8d Mon Sep 17 00:00:00 2001 From: Joe Zou <yixian.zou@gmail.com> Date: Thu, 9 Jan 2020 11:25:41 +0800 Subject: [PATCH] refactor router code --- cluster/directory/base_directory.go | 13 +++--- cluster/router/router_chain/router_chain.go | 49 +++++++++++++++++++++ cluster/router_chain.go | 48 -------------------- common/extension/router_factory.go | 2 +- registry/protocol/protocol.go | 7 +++ 5 files changed, 65 insertions(+), 54 deletions(-) create mode 100644 cluster/router/router_chain/router_chain.go delete mode 100644 cluster/router_chain.go diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index ac7b04fa3..98b392221 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -19,6 +19,8 @@ package directory import ( "sync" + + "github.com/apache/dubbo-go/cluster/router/router_chain" ) import ( "go.uber.org/atomic" @@ -34,11 +36,12 @@ import ( var RouterUrlSet = gxset.NewSet() type BaseDirectory struct { - url *common.URL - destroyed *atomic.Bool - routers []cluster.Router - mutex sync.Mutex - once sync.Once + url *common.URL + destroyed *atomic.Bool + routers []cluster.Router + mutex sync.Mutex + once sync.Once + routerChain *router_chain.RouterChain } func NewBaseDirectory(url *common.URL) BaseDirectory { diff --git a/cluster/router/router_chain/router_chain.go b/cluster/router/router_chain/router_chain.go new file mode 100644 index 000000000..dab644db6 --- /dev/null +++ b/cluster/router/router_chain/router_chain.go @@ -0,0 +1,49 @@ +package router_chain + +import ( + "sort" +) +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type RouterChain struct { + routers []cluster.Router + builtinRouters []cluster.Router + Invokers []protocol.Invoker +} + +func NewRouterChain(url *common.URL) *RouterChain { + var builtinRouters []cluster.Router + factories := extension.GetRouterFactories() + for _, factory := range factories { + router, _ := factory.Router(url) + builtinRouters = append(builtinRouters, router) + } + var routers []cluster.Router + copy(routers, builtinRouters) + sort.SliceStable(routers, func(i, j int) bool { + return routers[i].Priority() < routers[j].Priority() + }) + return &RouterChain{ + builtinRouters: builtinRouters, + routers: routers, + } +} + +func (r RouterChain) AddRouters(routers []cluster.Router) { + r.routers = append(r.routers, routers...) + sort.SliceStable(r.routers, func(i, j int) bool { + return routers[i].Priority() < routers[j].Priority() + }) +} + +func (r RouterChain) SetInvokers(invokers []protocol.Invoker) { + r.Invokers = invokers + /*for _, _ := range r.routers { + //router.Notify(r.Invokers) + }*/ +} diff --git a/cluster/router_chain.go b/cluster/router_chain.go deleted file mode 100644 index bd3cf5f45..000000000 --- a/cluster/router_chain.go +++ /dev/null @@ -1,48 +0,0 @@ -package cluster - -import ( - "sort" -) -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/protocol" -) - -type RouterChain struct { - routers []Router - builtinRouters []Router - Invokers []protocol.Invoker -} - -func NewRouterChain(url common.URL) *RouterChain { - //var builtinRouters []Router - //factories := extension.GetRouterFactories() - //for _, factory := range factories { - // router, _ := factory.Router(&url) - // builtinRouters = append(builtinRouters, router) - //} - //var routers []Router - //copy(routers, builtinRouters) - //sort.Slice(routers, func(i, j int) bool { - // return routers[i].Priority() < routers[j].Priority() - //}) - //return &RouterChain{ - // builtinRouters: routers, - // routers: routers, - //} - return nil -} - -func (r RouterChain) AddRouters(routers []Router) { - r.routers = append(r.routers, routers...) - sort.SliceStable(r.routers, func(i, j int) bool { - return routers[i].Priority() < routers[j].Priority() - }) -} - -func (r RouterChain) SetInvokers(invokers []protocol.Invoker) { - r.Invokers = invokers - /*for _, _ := range r.routers { - //router.Notify(r.Invokers) - }*/ -} diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index c07ada223..d88c26ff8 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -36,7 +36,7 @@ func GetRouterFactory(name string) cluster.RouterFactory { return routers[name]() } -func GetRouterFactorys() []cluster.RouterFactory { +func GetRouterFactories() []cluster.RouterFactory { var factorys []cluster.RouterFactory for _, value := range routers { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 534a4b945..0752f5c3c 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -20,6 +20,9 @@ package protocol import ( "strings" "sync" + + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/cluster/router/router_chain" ) import ( @@ -121,6 +124,10 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } go directory.Subscribe(serviceUrl) + chain := router_chain.NewRouterChain(serviceUrl) + if chain != nil { + //directory.SetRouters(chain.) + } //new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) -- GitLab