From 7b07c345d1d983a8a8dc23e7a0485a2a5a7597ad Mon Sep 17 00:00:00 2001 From: aliiohs <rzy1107@163.com> Date: Fri, 2 Aug 2019 20:10:14 +0800 Subject: [PATCH] use router in dubbo-go --- .../cluster_impl/failover_cluster_invoker.go | 4 +- .../registry_aware_cluster_invoker.go | 2 +- cluster/directory.go | 2 +- cluster/directory/base_directory.go | 22 ++++++--- cluster/directory/static_directory.go | 21 +++++++-- cluster/router.go | 10 +--- cluster/router/condition_router.go | 16 +++++-- cluster/router_chain.go | 46 +++++++++++++++++++ common/constant/key.go | 1 + common/extension/router_factory.go | 9 ++++ common/url.go | 4 +- registry/directory/directory.go | 31 +++++++++++-- registry/directory/directory_test.go | 3 +- 13 files changed, 140 insertions(+), 31 deletions(-) create mode 100644 cluster/router_chain.go diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index 3dd8c41df..50519f5d3 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -40,7 +40,7 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) + invokers, _ := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { @@ -70,7 +70,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr if err != nil { return &protocol.RPCResult{Err: err} } - invokers = invoker.directory.List(invocation) + invokers, _ = invoker.directory.List(invocation) err = invoker.checkInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{Err: err} diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go index 5785c0248..028918568 100644 --- a/cluster/cluster_impl/registry_aware_cluster_invoker.go +++ b/cluster/cluster_impl/registry_aware_cluster_invoker.go @@ -34,7 +34,7 @@ func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoke } func (invoker *registryAwareClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) + invokers, _ := invoker.directory.List(invocation) //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. for _, invoker := range invokers { if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { diff --git a/cluster/directory.go b/cluster/directory.go index 045296ce5..6eb055667 100644 --- a/cluster/directory.go +++ b/cluster/directory.go @@ -25,5 +25,5 @@ import ( // Extension - Directory type Directory interface { common.Node - List(invocation protocol.Invocation) []protocol.Invoker + List(invocation protocol.Invocation) ([]protocol.Invoker, error) } diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index b20119771..8237495f5 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -18,19 +18,20 @@ package directory import ( - "sync" -) -import ( + "github.com/apache/dubbo-go/cluster" "go.uber.org/atomic" + "sync" ) import ( "github.com/apache/dubbo-go/common" ) type BaseDirectory struct { - url *common.URL - destroyed *atomic.Bool - mutex sync.Mutex + url *common.URL + ConsumerUrl *common.URL + destroyed *atomic.Bool + routers []cluster.Router + mutex sync.Mutex } func NewBaseDirectory(url *common.URL) BaseDirectory { @@ -39,9 +40,18 @@ func NewBaseDirectory(url *common.URL) BaseDirectory { destroyed: atomic.NewBool(false), } } +func (dir *BaseDirectory) Destroyed() bool { + return dir.destroyed.Load() +} func (dir *BaseDirectory) GetUrl() common.URL { return *dir.url } +func (dir *BaseDirectory) SetRouters(routers []cluster.Router) { + dir.routers = routers +} +func (dir *BaseDirectory) Routers() []cluster.Router { + return dir.routers +} func (dir *BaseDirectory) Destroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index e7a0e6e56..ec299a9b6 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -18,8 +18,11 @@ package directory import ( + "fmt" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" + "reflect" ) type staticDirectory struct { @@ -52,9 +55,21 @@ func (dir *staticDirectory) IsAvailable() bool { return true } -func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker { - //TODO:Here should add router - return dir.invokers +func (dir *staticDirectory) List(invocation protocol.Invocation) ([]protocol.Invoker, error) { + if dir.destroyed.Load() { + //todo err full + return nil, fmt.Errorf("directory already destroyed .url: %s", dir.GetUrl().String()) + } + invokers := dir.invokers + localRouters := dir.routers + if len(localRouters) > 0 { + for _, router := range localRouters { + if reflect.ValueOf(router.Url()).IsNil() || router.Url().GetParamBool(constant.RUNTIME_KEY, false) { + invokers = router.Route(invokers, *dir.ConsumerUrl, invocation) + } + } + } + return invokers, nil } func (dir *staticDirectory) Destroy() { diff --git a/cluster/router.go b/cluster/router.go index 54a196955..5d958ac4b 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -30,12 +30,6 @@ type RouterFactory interface { type Router interface { Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker -} - -type RouterChain struct { - routers []Router -} - -func NewRouterChain(url common.URL) { - + Priority() int64 + Url() common.URL } diff --git a/cluster/router/condition_router.go b/cluster/router/condition_router.go index b9632e291..0d358d511 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition_router.go @@ -45,8 +45,8 @@ const ( //ConditionRouter condition router struct type ConditionRouter struct { Pattern string - Url *common.URL - Priority int64 + url *common.URL + priority int64 Force bool WhenCondition map[string]MatchPair ThenCondition map[string]MatchPair @@ -104,6 +104,14 @@ func newConditionRouter(url *common.URL) (*ConditionRouter, error) { }, nil } +func (c ConditionRouter) Priority() int64 { + return c.priority +} + +func (c ConditionRouter) Url() common.URL { + return *c.url +} + //Router determine the target server list. func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) []protocol.Invoker { if len(invokers) == 0 { @@ -116,7 +124,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv for _, invo := range invokers { urls = append(urls, reflect.TypeOf(invo).String()) } - logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err) + logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.url.String(), strings.Join(urls, ","), err) return invokers } if !isMatchWhen { @@ -134,7 +142,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv for _, invo := range invokers { urls = append(urls, reflect.TypeOf(invo).String()) } - logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err) + logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.url.String(), strings.Join(urls, ","), err) return invokers } if isMatchThen { diff --git a/cluster/router_chain.go b/cluster/router_chain.go new file mode 100644 index 000000000..8d7cab21a --- /dev/null +++ b/cluster/router_chain.go @@ -0,0 +1,46 @@ +package cluster + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "sort" +) + +type RouterChain struct { + routers []Router + builtinRouters []Router + Invokers []protocol.Invoker +} + +func NewRouterChain(url common.URL) *RouterChain { + var builtinRouters []Router + factories := extension.GetRouterFactorys() + for _, factory := range factories { + router, _ := factory.Router(&url) + builtinRouters = append(builtinRouters, router) + } + var routers []Router + copy(routers, builtinRouters) + sort.Slice(routers, func(i, j int) bool { + return routers[i].Priority() < routers[j].Priority() + }) + return &RouterChain{ + builtinRouters: routers, + routers: routers, + } +} + +func (r RouterChain) AddRouters(routers []Router) { + r.routers = append(r.routers, routers...) + sort.Slice(r.routers, func(i, j int) bool { + return routers[i].Priority() < routers[j].Priority() + }) +} + +func (r RouterChain) SetInvokers(invokers []protocol.Invoker) { + r.Invokers = invokers + /*for _, _ := range r.routers { + //router.Notify(r.Invokers) + }*/ +} diff --git a/common/constant/key.go b/common/constant/key.go index 3441b341d..af7700af2 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -73,6 +73,7 @@ const ( METHOD_KEY = "method" METHOD_KEYS = "methods" RULE_KEY = "rule" + RUNTIME_KEY = "runtime" ) const ( diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index f364292b0..9a21d35e3 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -19,3 +19,12 @@ func GetRouterFactory(name string) cluster.RouterFactory { return routers[name]() } +func GetRouterFactorys() []cluster.RouterFactory { + var factorys []cluster.RouterFactory + + for _, value := range routers { + factorys = append(factorys, value()) + } + return factorys + +} diff --git a/common/url.go b/common/url.go index c594df235..2c82082b9 100644 --- a/common/url.go +++ b/common/url.go @@ -196,7 +196,7 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error) if strings.Contains(s.Location, ":") { s.Ip, s.Port, err = net.SplitHostPort(s.Location) if err != nil { - return s, perrors.Errorf("net.SplitHostPort(Url.Host{%s}), error{%v}", s.Location, err) + return s, perrors.Errorf("net.SplitHostPort(url.Host{%s}), error{%v}", s.Location, err) } } for _, opt := range opts { @@ -275,7 +275,7 @@ func (c URL) Service() string { return service } else if c.SubURL != nil { service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) - if service != "" { //if url.path is "" then return suburl's path, special for registry Url + if service != "" { //if url.path is "" then return suburl's path, special for registry url return service } } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 11687f82e..fd4f6bec5 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -18,12 +18,17 @@ package directory import ( + "fmt" + "github.com/apache/dubbo-go/common/utils" + "github.com/apache/dubbo-go/version" + "reflect" "sync" "time" ) import ( perrors "github.com/pkg/errors" + "go.uber.org/atomic" ) import ( @@ -53,6 +58,8 @@ type registryDirectory struct { cacheInvokers []protocol.Invoker listenerLock sync.Mutex serviceType string + serviceKey string + forbidden atomic.Bool registry registry.Registry cacheInvokersMap *sync.Map //use sync.map //cacheInvokersMap map[string]protocol.Invoker @@ -207,9 +214,27 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) { } //select the protocol invokers from the directory -func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { - //TODO:router - return dir.cacheInvokers +func (dir *registryDirectory) List(invocation protocol.Invocation) ([]protocol.Invoker, error) { + if dir.Destroyed() { + return nil, fmt.Errorf("directory already destroyed .url: %s", dir.GetUrl().String()) + } + if dir.forbidden.Load() { + //todo error + localIP, _ := utils.GetLocalIP() + return nil, fmt.Errorf("no provider available from registry %s for service %s on consumer %s use dubbo version %s, please check status of providers(disabled, not registered or in blacklist)", dir.GetUrl().Location, dir.ConsumerUrl.ServiceKey(), localIP, version.Version) + } + + invokers := dir.cacheInvokers + localRouters := dir.Routers() + if len(localRouters) > 0 { + for _, router := range localRouters { + if reflect.ValueOf(router.Url()).IsNil() || router.Url().GetParamBool(constant.RUNTIME_KEY, false) { + invokers = router.Route(invokers, *dir.ConsumerUrl, invocation) + } + } + } + return invokers, nil + } func (dir *registryDirectory) IsAvailable() bool { diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index f31165d0a..5f931ad0c 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -113,7 +113,8 @@ func Test_List(t *testing.T) { registryDirectory, _ := normalRegistryDir() time.Sleep(1e9) - assert.Len(t, registryDirectory.List(&invocation.RPCInvocation{}), 3) + invokers, _ := registryDirectory.List(&invocation.RPCInvocation{}) + assert.Len(t, invokers, 3) assert.Equal(t, true, registryDirectory.IsAvailable()) } -- GitLab