diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index 10d812a2e9fd560ba55d91c8345869728aec911c..a7f139b235d461ef8eaaa212c4cbf04b54a6b130 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -20,6 +20,7 @@ package chain import ( "sort" "sync" + "sync/atomic" "time" ) @@ -62,20 +63,19 @@ type RouterChain struct { last time.Time // Channel for notify to update the address cache notify chan struct{} - // Address caches, all caches still in used will be kept - caches []*InvokerCache + // Address cache + cache atomic.Value } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - cache, ok := c.findCache() - if !ok { + cache := c.loadCache() + if cache == nil { + c.mutex.RLock() + defer c.mutex.RUnlock() return c.invokers } - cache.in() - defer cache.out() - bitmap := cache.bitmap for _, r := range c.copyRouters() { bitmap = r.Route(bitmap, cache, url, invocation) @@ -145,51 +145,43 @@ func (c *RouterChain) copyRouters() []router.PriorityRouter { return ret } -// findCache returns the latest cache (at the end of the cache list), if there's no valid entry in the list yet, -// return false instead. -func (c *RouterChain) findCache() (*InvokerCache, bool) { +// copyInvokers copies a snapshot of the received invokers. +func (c *RouterChain) copyInvokers() []protocol.Invoker { c.mutex.RLock() defer c.mutex.RUnlock() - if c.caches == nil || len(c.caches) == 0 { - return nil, false + if c.invokers == nil || len(c.invokers) == 0 { + return nil } - return c.caches[len(c.caches)-1], true + ret := make([]protocol.Invoker, 0, len(c.invokers)) + ret = append(ret, c.invokers...) + return ret } -func sweepUnusedCache(caches []*InvokerCache) []*InvokerCache { - ret := caches[:0] - for _, c := range caches { - if c.isInUse() { - ret = append(ret, c) - } +// loadCache loads cache from sync.Value to guarantee the visibility +func (c *RouterChain) loadCache() *InvokerCache { + v := c.cache.Load() + if v == nil { + return nil } - return ret + + return v.(*InvokerCache) } // buildCache builds address cache with the new invokers for all poolable routers. func (c *RouterChain) buildCache() { - c.mutex.RLock() - if c.invokers == nil || len(c.invokers) == 0 { - defer c.mutex.RUnlock() + invokers := c.copyInvokers() + if invokers == nil || len(c.invokers) == 0 { return } - invokers := make([]protocol.Invoker, 0, len(c.invokers)) - invokers = append(invokers, c.invokers...) - - var origin *InvokerCache - if c.caches != nil && len(c.caches) > 0 { - origin = c.caches[len(c.caches)-1] - c.caches = sweepUnusedCache(c.caches) - } - c.mutex.RUnlock() + cache := BuildCache(invokers) + origin := c.loadCache() var ( mutex sync.Mutex wg sync.WaitGroup ) - cache := BuildCache(invokers) for _, r := range c.copyRouters() { if p, ok := r.(router.Poolable); ok { wg.Add(1) @@ -205,9 +197,7 @@ func (c *RouterChain) buildCache() { } wg.Wait() - c.mutex.Lock() - defer c.mutex.Unlock() - c.caches = append(c.caches, cache) + c.cache.Store(cache) } // URL Return URL in RouterChain @@ -241,7 +231,6 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { builtinRouters: routers, routers: newRouters, last: time.Now(), - caches: []*InvokerCache{}, notify: make(chan struct{}), } if url != nil { diff --git a/cluster/router/chain/invoker_cache.go b/cluster/router/chain/invoker_cache.go index da12d5af934e358c3951f1a48ea88cba6d87e3ce..43cdfa50670f5f09666dbea10a2837d58cafd1b0 100644 --- a/cluster/router/chain/invoker_cache.go +++ b/cluster/router/chain/invoker_cache.go @@ -19,7 +19,6 @@ package chain import ( "github.com/RoaringBitmap/roaring" - "go.uber.org/atomic" ) import ( @@ -43,8 +42,6 @@ type InvokerCache struct { // Address metadata from routers which implement Poolable metadatas map[string]router.AddrMetadata - - inUse atomic.Int32 } // BuildCache builds address cache from the given invokers. @@ -81,18 +78,3 @@ func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) { func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) { c.metadatas[name] = meta } - -// in increases inUse count at the beginning of every request, used only by router chain -func (c *InvokerCache) in() { - c.inUse.Inc() -} - -// out decreases inUse count at the end of every request, used only by router chain -func (c *InvokerCache) out() { - c.inUse.Dec() -} - -// isInUse returns false when inUse count equals to 0, so that it can be safely removed by router chain -func (c *InvokerCache) isInUse() bool { - return c.inUse.Load() > 0 -}