diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index 1ca7d479d14860947427cd9d1a7c53ae7620beab..3f8813631c1e6ae183a6fd435afbc9695332d9b6 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -18,8 +18,6 @@ package chain import ( - "github.com/RoaringBitmap/roaring" - "github.com/apache/dubbo-go/common/constant" "math" "sort" "sync" @@ -34,6 +32,7 @@ import ( import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" @@ -56,44 +55,38 @@ type RouterChain struct { builtinRouters []router.PriorityRouter mutex sync.RWMutex - url common.URL + url common.URL + + // The times of address notification since last update for address cache count int64 - last time.Time - ch chan struct{} + // The timestamp of last update for address cache + last time.Time + // Channel for notify to update the address cache + ch chan struct{} + // Address cache cache atomic.Value } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - rs := c.copyRouters() - - v := c.cache.Load() - if v == nil { + cache := c.loadCache() + if cache == nil { return c.invokers } - cache := v.(*router.AddrCache) - bitmap := cache.Bitmap - for _, r := range rs { + bitmap := cache.bitmap + for _, r := range c.copyRouters() { bitmap = r.Route(bitmap, cache, url, invocation) } indexes := bitmap.ToArray() finalInvokers := make([]protocol.Invoker, len(indexes)) for i, index := range indexes { - finalInvokers[i] = cache.Invokers[index] + finalInvokers[i] = cache.invokers[index] } - return finalInvokers -} -func (c *RouterChain) copyRouters() []router.PriorityRouter { - l := len(c.routers) - rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2))) - c.mutex.RLock() - copy(rs, c.routers) - c.mutex.RUnlock() - return rs + return finalInvokers } // AddRouters Add routers to router chain @@ -110,6 +103,8 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) { c.routers = newRouters } +// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and +// time interval exceeds timeThreshold since last cache update, then notify to update the cache. func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) { c.invokers = invokers @@ -124,6 +119,8 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) { } } +// loop listens on events to update the address cache when it's necessary, either when it receives notification +// from address update, or when timeInterval exceeds. func (c *RouterChain) loop() { for { select { @@ -138,6 +135,27 @@ func (c *RouterChain) loop() { } } +// copyRouters make a snapshot copy from RouterChain's router list. +func (c *RouterChain) copyRouters() []router.PriorityRouter { + l := len(c.routers) + rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2))) + c.mutex.RLock() + copy(rs, c.routers) + c.mutex.RUnlock() + return rs +} + +// loadCache loads cache from sync.Value to guarantee the visibility +func (c *RouterChain) loadCache() *InvokerCache { + v := c.cache.Load() + if v == nil { + return nil + } + + return v.(*InvokerCache) +} + +// buildCache builds address cache with the new invokers for all poolable routers. func (c *RouterChain) buildCache() { if c.invokers == nil || len(c.invokers) == 0 { return @@ -146,18 +164,8 @@ func (c *RouterChain) buildCache() { // FIXME: should lock here, it is fine with dirty read if no panic happens I believe. invokers := make([]protocol.Invoker, len(c.invokers)) copy(invokers, c.invokers) - cache := &router.AddrCache{ - Invokers: invokers, - Bitmap: ToBitmap(invokers), - AddrPool: make(map[string]router.RouterAddrPool), - AddrMeta: make(map[string]router.AddrMetadata), - } - - var origin *router.AddrCache - v := c.cache.Load() - if v != nil { - origin = v.(*router.AddrCache) - } + cache := BuildCache(invokers) + origin := c.loadCache() var mutex sync.Mutex var wg sync.WaitGroup @@ -168,8 +176,8 @@ func (c *RouterChain) buildCache() { go func(p router.Poolable) { pool, info := poolRouter(p, origin, invokers) mutex.Lock() - cache.AddrPool[p.Name()] = pool - cache.AddrMeta[p.Name()] = info + cache.pools[p.Name()] = pool + cache.metadatas[p.Name()] = info mutex.Unlock() wg.Done() }(p) @@ -221,25 +229,32 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { return chain, nil } -func poolRouter(p router.Poolable, origin *router.AddrCache, invokers []protocol.Invoker) (router.RouterAddrPool, router.AddrMetadata) { +// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary. +// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its +// rule doesn't change), and the address list doesn't change, then the existing data will be re-used. +func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { name := p.Name() - if isCacheMiss(origin, name) || p.ShouldRePool() || IsDiff(origin.Invokers, invokers) { + if isCacheMiss(origin, name) || p.ShouldPool() || isInvokersChanged(origin.invokers, invokers) { logger.Debugf("build address cache for router %q", name) return p.Pool(invokers) } else { logger.Debugf("reuse existing address cache for router %q", name) - return origin.AddrPool[name], origin.AddrMeta[name] + return origin.pools[name], origin.metadatas[name] } } -func isCacheMiss(cache *router.AddrCache, key string) bool { - if cache == nil || cache.AddrPool == nil || cache.Invokers == nil || cache.AddrPool[key] == nil { +// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed. +// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry +// doesn't exist. +func isCacheMiss(cache *InvokerCache, key string) bool { + if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil { return true } return false } -func IsDiff(left []protocol.Invoker, right []protocol.Invoker) bool { +// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left. +func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool { if len(right) != len(left) { return true } @@ -247,7 +262,7 @@ func IsDiff(left []protocol.Invoker, right []protocol.Invoker) bool { for _, r := range right { found := false for _, l := range left { - if IsEquals(l.GetUrl(), r.GetUrl()) { + if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) { found = true break } @@ -259,38 +274,6 @@ func IsDiff(left []protocol.Invoker, right []protocol.Invoker) bool { return false } -func IsEquals(left common.URL, right common.URL) bool { - if left.Ip != right.Ip || left.Port != right.Port { - return false - } - - leftMap := left.ToMap() - delete(leftMap, constant.TIMESTAMP_KEY) - delete(leftMap, constant.REMOTE_TIMESTAMP_KEY) - rightMap := right.ToMap() - delete(rightMap, constant.TIMESTAMP_KEY) - delete(rightMap, constant.REMOTE_TIMESTAMP_KEY) - - if len(leftMap) != len(rightMap) { - return false - } - - for lk, lv := range leftMap { - if rv, ok := rightMap[lk]; !ok { - return false - } else if lv != rv { - return false - } - } - return true -} - -func ToBitmap(invokers []protocol.Invoker) *roaring.Bitmap { - bitmap := roaring.NewBitmap() - bitmap.AddRange(0, uint64(len(invokers))) - return bitmap -} - // sortRouter Sort router instance by priority with stable algorithm func sortRouter(routers []router.PriorityRouter) { sort.Stable(byPriority(routers)) diff --git a/cluster/router/chain/invoker_cache.go b/cluster/router/chain/invoker_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..9f96bddd4224645fc45f5919879f380076deb2f1 --- /dev/null +++ b/cluster/router/chain/invoker_cache.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain + +import ( + "github.com/RoaringBitmap/roaring" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/protocol" +) + +// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received +// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if +// the router implements Poolable. +type InvokerCache struct { + // The snapshot of invokers + invokers []protocol.Invoker + + // The bitmap representation for invokers snapshot + bitmap *roaring.Bitmap + + // Address pool from routers which implement Poolable + pools map[string]router.AddrPool + + // Address metadata from routers which implement Poolable + metadatas map[string]router.AddrMetadata +} + +// BuildCache builds address cache from the given invokers. +func BuildCache(invokers []protocol.Invoker) *InvokerCache { + return &InvokerCache{ + invokers: invokers, + bitmap: utils.ToBitmap(invokers), + pools: make(map[string]router.AddrPool), + metadatas: make(map[string]router.AddrMetadata), + } +} + +// GetInvokers get invokers snapshot. +func (c *InvokerCache) GetInvokers() []protocol.Invoker { + return c.invokers +} + +// FindAddrPool finds address pool for a poolable router. +func (c *InvokerCache) FindAddrPool(p router.Poolable) router.AddrPool { + return c.pools[p.Name()] +} + +// FindAddrMeta finds address metadata for a poolable router. +func (c *InvokerCache) FindAddrMeta(p router.Poolable) router.AddrMetadata { + return c.metadatas[p.Name()] +} + +// SetAddrPool sets address pool for a poolable router, for unit test only +func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) { + c.pools[name] = pool +} + +// SetAddrMeta sets address metadata for a poolable router, for unit test only +func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) { + c.metadatas[name] = meta +} diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 91aeb8d886bcbc873a73087f7fe0fbc0e0fe0267..b31f6c7a82ea102c6e0b80d68e43f7b359785544 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -33,6 +33,7 @@ import ( import ( "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -380,9 +381,6 @@ func TestNewAppRouterFactory(t *testing.T) { assert.NotNil(t, factory) } -func setUpAddrCache(addrs []protocol.Invoker) *router.AddrCache { - cache := &router.AddrCache{ - Invokers: addrs, - } - return cache +func setUpAddrCache(addrs []protocol.Invoker) router.Cache { + return chain.BuildCache(addrs) } diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 86f128bf4d8088b62a8e51f21bfd3ad4c6ba3289..fd8068e827b2711f863a4ed08fedff078cf83630 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -66,14 +66,14 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { l.priority = listenableRouterDefaultPriority routerKey := ruleKey + constant.ConditionRouterRuleSuffix - //add listener + // add listener dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration() if dynamicConfiguration == nil { return nil, perrors.Errorf("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please") } dynamicConfiguration.AddListener(routerKey, l) - //get rule + // get rule rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP)) if len(rule) == 0 || err != nil { return nil, perrors.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err) @@ -131,11 +131,11 @@ func (l *listenableRouter) generateConditions(rule *RouterRule) { } // Route Determine the target invokers list. -func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { +func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { if invokers.IsEmpty() || len(l.conditionRouters) == 0 { return invokers } - //We will check enabled status inside each router. + // We will check enabled status inside each router. for _, r := range l.conditionRouters { invokers = r.Route(invokers, cache, url, invocation) } diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go index cd544477a121ca17a1bb637331fa69fd5501fff7..9c74757025c204153ece1f3c2f6cb81a872bbada 100644 --- a/cluster/router/condition/router.go +++ b/cluster/router/condition/router.go @@ -24,21 +24,22 @@ import ( import ( "github.com/RoaringBitmap/roaring" + "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" - "github.com/dubbogo/gost/container/set" - "github.com/dubbogo/gost/net" ) const ( - //pattern route pattern regex + // pattern route pattern regex pattern = `([&!=,]*)\\s*([^&!=,\\s]+)` ) @@ -144,7 +145,7 @@ func (c *ConditionRouter) Enabled() bool { } // Route Determine the target invokers list. -func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { +func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { if !c.Enabled() { return invokers } @@ -159,13 +160,14 @@ func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCach } if len(c.ThenCondition) == 0 { - return router.EmptyAddr + return utils.EmptyAddr } result := roaring.NewBitmap() for iter := invokers.Iterator(); iter.HasNext(); { index := iter.Next() - invokerUrl := cache.Invokers[index].GetUrl() + invoker := cache.GetInvokers()[index] + invokerUrl := invoker.GetUrl() isMatchThen := c.MatchThen(&invokerUrl, url) if isMatchThen { result.Add(index) @@ -323,7 +325,7 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { return true } if !pair.Mismatches.Empty() && !pair.Matches.Empty() { - //when both mismatches and matches contain the same value, then using mismatches first + // when both mismatches and matches contain the same value, then using mismatches first for mismatch := range pair.Mismatches.Items { if isMatchGlobalPattern(mismatch.(string), value, param) { return false diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 1199ff07ed49de8fdee9234da0c39037f590a7ee..708c45c6d860ac53495f739c86546ee5fb58652f 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -19,6 +19,9 @@ package healthcheck import ( "github.com/RoaringBitmap/roaring" +) + +import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" @@ -31,6 +34,7 @@ import ( const ( HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" healthy = "healthy" + name = "health-check-router" ) // HealthCheckRouter provides a health-first routing mechanism through HealthChecker @@ -54,7 +58,7 @@ func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) { } // Route gets a list of healthy invoker -func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { +func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { if !r.enabled { return invokers } @@ -70,8 +74,9 @@ func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCa return healthyInvokers } -func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.RouterAddrPool, router.AddrMetadata) { - rb := make(router.RouterAddrPool) +// Pool separates healthy invokers from others. +func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { + rb := make(router.AddrPool) rb[healthy] = roaring.NewBitmap() for i, invoker := range invokers { if r.checker.IsHealthy(invoker) { @@ -82,12 +87,13 @@ func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.RouterAddr return rb, nil } -func (r *HealthCheckRouter) ShouldRePool() bool { +// ShouldPool will always return true to make sure healthy check constantly. +func (r *HealthCheckRouter) ShouldPool() bool { return true } func (r *HealthCheckRouter) Name() string { - return "health-check-router" + return name } // Priority diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index ddd68ec403b473d1dab57d7176b6cdc233545754..c321b56156b9ae272048f32ef50eb419add27b03 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -19,8 +19,6 @@ package healthcheck import ( "fmt" - "github.com/apache/dubbo-go/cluster/router" - "github.com/apache/dubbo-go/cluster/router/utils" "math" "testing" "time" @@ -31,6 +29,9 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" + "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" @@ -137,15 +138,10 @@ func TestNewHealthCheckRouter(t *testing.T) { assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500)) } -func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) *router.AddrCache { +func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { pool, info := r.Pool(addrs) - cache := &router.AddrCache{ - Invokers: addrs, - AddrPool: make(map[string]router.RouterAddrPool), - AddrMeta: make(map[string]router.AddrMetadata), - } - - cache.AddrMeta[r.Name()] = info - cache.AddrPool[r.Name()] = pool + cache := chain.BuildCache(addrs) + cache.SetAddrMeta(r.Name(), info) + cache.SetAddrPool(r.Name(), pool) return cache } diff --git a/cluster/router/router.go b/cluster/router/router.go index 998b7d8b02afb72b61439635d6874f6893dc62ab..ddca42a01d5d5b56a33b9145926c77a843ea9787 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -19,6 +19,9 @@ package router import ( "github.com/RoaringBitmap/roaring" +) + +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" ) @@ -39,7 +42,7 @@ type FilePriorityRouterFactory interface { // Router type router interface { // Route Determine the target invokers list. - Route(*roaring.Bitmap, *AddrCache, *common.URL, protocol.Invocation) *roaring.Bitmap + Route(*roaring.Bitmap, Cache, *common.URL, protocol.Invocation) *roaring.Bitmap // URL Return URL in router URL() common.URL @@ -53,33 +56,38 @@ type PriorityRouter interface { Priority() int64 } +// Poolable caches address pool and address metadata for a router instance which will be used later in Router's Route. type Poolable interface { - Pool([]protocol.Invoker) (RouterAddrPool, AddrMetadata) - ShouldRePool() bool + // Pool created address pool and address metadata from the invokers. + Pool([]protocol.Invoker) (AddrPool, AddrMetadata) + + // ShouldPool returns if it should pool. One typical scenario is a router rule changes, in this case, a pooling + // is necessary, even if the addresses not changed at all. + ShouldPool() bool + + // Name return the Poolable's name. Name() string } +// AddrPool is an address pool, backed by a snapshot of address list, divided into categories. +type AddrPool map[string]*roaring.Bitmap + +// AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable. type AddrMetadata interface { + // Source indicates where the metadata comes from. Source() string } -type RouterAddrPool map[string]*roaring.Bitmap +// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received +// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if +// the router implements Poolable. +type Cache interface { + // GetInvokers returns the snapshot of received invokers. + GetInvokers() []protocol.Invoker -// AddrCache caches all addresses relevant info for a snapshot of received invokers, the calculation logic is -// different from router to router. -type AddrCache struct { - Invokers []protocol.Invoker // invokers snapshot - Bitmap *roaring.Bitmap // bitmap for invokers - AddrPool map[string]RouterAddrPool // address pool from the invokers for one particular router - AddrMeta map[string]AddrMetadata // address meta info collected from the invokers for one particular router -} + // FindAddrPool returns address pool associated with the given Poolable instance. + FindAddrPool(Poolable) AddrPool -func (c *AddrCache) FindAddrPool(p Poolable) RouterAddrPool { - return c.AddrPool[p.Name()] + // FindAddrMeta returns address metadata associated with the given Poolable instance. + FindAddrMeta(Poolable) AddrMetadata } - -func (c *AddrCache) FindAddrMeta(p Poolable) AddrMetadata { - return c.AddrMeta[p.Name()] -} - -var EmptyAddr = roaring.NewBitmap() diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 4f0f8517bd98631f1a8f9886d847d84c47e10beb..5754399e5f4d6063d0c64fde2300379adefc267c 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -18,18 +18,18 @@ package tag import ( - "github.com/RoaringBitmap/roaring" - "github.com/apache/dubbo-go/cluster/router" "net/url" "strconv" "sync" ) import ( + "github.com/RoaringBitmap/roaring" perrors "github.com/pkg/errors" ) import ( + "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" @@ -76,7 +76,7 @@ func (f *FileTagRouter) Priority() int64 { return f.router.priority } -func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { +func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { if invokers.IsEmpty() { return invokers } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 6b359b869dfee8ae624574a26933f6e397309c1e..79e3eeccb5a8b2ce694bafb97fb0c2ae43e9720b 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -18,22 +18,26 @@ package tag import ( - "github.com/RoaringBitmap/roaring" - "github.com/apache/dubbo-go/cluster/router" - "github.com/apache/dubbo-go/cluster/router/utils" "strconv" ) import ( + "github.com/RoaringBitmap/roaring" perrors "github.com/pkg/errors" ) import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) +const ( + name = "tag-router" +) + type tagRouter struct { url *common.URL enabled bool @@ -55,7 +59,7 @@ func (c *tagRouter) isEnabled() bool { return c.enabled } -func (c *tagRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { +func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { if !c.isEnabled() || invokers.IsEmpty() { return invokers } @@ -65,7 +69,7 @@ func (c *tagRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url return invokers } - ret := router.EmptyAddr + ret := utils.EmptyAddr if target, ok := cache.FindAddrPool(c)[tag]; ok { ret = utils.JoinIfNotEqual(target, invokers) } @@ -85,8 +89,9 @@ func (c *tagRouter) Priority() int64 { return c.priority } -func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.RouterAddrPool, router.AddrMetadata) { - rb := make(router.RouterAddrPool) +// Pool divided invokers into different address pool by tag. +func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { + rb := make(router.AddrPool) for i, invoker := range invokers { url := invoker.GetUrl() tag := url.GetParam(constant.Tagkey, "") @@ -100,12 +105,13 @@ func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.RouterAddrPool, ro return rb, nil } -func (c *tagRouter) ShouldRePool() bool { +// ShouldPool returns false, to make sure address cache for tag router happens once and only once. +func (c *tagRouter) ShouldPool() bool { return false } func (c *tagRouter) Name() string { - return "tag-router" + return name } func findStaticTag(invocation protocol.Invocation) string { diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 465f5504926d3042609c513dbefa24837d666382..76553365fddbe99a1c92fd9d3e4020f282402962 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -19,8 +19,6 @@ package tag import ( "context" - "github.com/apache/dubbo-go/cluster/router" - "github.com/apache/dubbo-go/cluster/router/utils" "testing" ) @@ -29,6 +27,9 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" + "github.com/apache/dubbo-go/cluster/router/utils" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -163,15 +164,10 @@ func TestTagRouterRouteNoForce(t *testing.T) { assert.Equal(t, 3, len(invRst2.ToArray())) } -func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) *router.AddrCache { +func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { pool, info := r.Pool(addrs) - cache := &router.AddrCache{ - Invokers: addrs, - AddrPool: make(map[string]router.RouterAddrPool), - AddrMeta: make(map[string]router.AddrMetadata), - } - - cache.AddrMeta[r.Name()] = info - cache.AddrPool[r.Name()] = pool + cache := chain.BuildCache(addrs) + cache.SetAddrPool(r.Name(), pool) + cache.SetAddrMeta(r.Name(), info) return cache } diff --git a/cluster/router/utils/bitmap_util.go b/cluster/router/utils/bitmap_util.go index b51cff96a23e37545d4f5923f38f2e0e3798f226..b34ecf89333b8468f6203c2b35dea1d29863b48e 100644 --- a/cluster/router/utils/bitmap_util.go +++ b/cluster/router/utils/bitmap_util.go @@ -1,10 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package utils import ( "github.com/RoaringBitmap/roaring" +) + +import ( "github.com/apache/dubbo-go/protocol" ) +var EmptyAddr = roaring.NewBitmap() + func JoinIfNotEqual(left *roaring.Bitmap, right *roaring.Bitmap) *roaring.Bitmap { if !left.Equals(right) { left = left.Clone() @@ -22,14 +44,6 @@ func FallbackIfJoinToEmpty(left *roaring.Bitmap, right *roaring.Bitmap) *roaring } } -func ToIndex(invokers []protocol.Invoker) []int { - var ret []int - for i := range invokers { - ret = append(ret, i) - } - return ret -} - func ToBitmap(invokers []protocol.Invoker) *roaring.Bitmap { bitmap := roaring.NewBitmap() bitmap.AddRange(0, uint64(len(invokers))) diff --git a/common/url.go b/common/url.go index 807d0ed5eff4ecb70d3adeb8524b841d0ec92a58..7e5ac1e0be4133784e9cc0430b6b4810832556af 100644 --- a/common/url.go +++ b/common/url.go @@ -643,6 +643,34 @@ func (c *URL) CloneWithParams(reserveParams []string) *URL { ) } +// IsEquals compares if two URLs equals with each other. Excludes are all parameter keys which should ignored. +func IsEquals(left URL, right URL, excludes ...string) bool { + if left.Ip != right.Ip || left.Port != right.Port { + return false + } + + leftMap := left.ToMap() + rightMap := right.ToMap() + for _, exclude := range excludes { + delete(leftMap, exclude) + delete(rightMap, exclude) + } + + if len(leftMap) != len(rightMap) { + return false + } + + for lk, lv := range leftMap { + if rv, ok := rightMap[lk]; !ok { + return false + } else if lv != rv { + return false + } + } + + return true +} + func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) { methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys)) for _, paramKey := range paramKeys {