diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index ee4dca5b4f105b80f9a009b6e4888656f1204966..10d812a2e9fd560ba55d91c8345869728aec911c 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -62,21 +62,19 @@ type RouterChain struct { last time.Time // Channel for notify to update the address cache notify chan struct{} - // Address cache - cache *InvokerCache + // Address caches, all caches still in used will be kept + caches []*InvokerCache } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - var cache *InvokerCache - c.mutex.RLock() - if c.cache == nil { - defer c.mutex.RUnlock() + cache, ok := c.findCache() + if !ok { return c.invokers } - // FIXME: this means clone happens in request scope which should be avoid - cache = c.cache.Clone() - c.mutex.RUnlock() + + cache.in() + defer cache.out() bitmap := cache.bitmap for _, r := range c.copyRouters() { @@ -147,6 +145,27 @@ func (c *RouterChain) copyRouters() []router.PriorityRouter { return ret } +// findCache returns the latest cache (at the end of the cache list), if there's no valid entry in the list yet, +// return false instead. +func (c *RouterChain) findCache() (*InvokerCache, bool) { + c.mutex.RLock() + defer c.mutex.RUnlock() + if c.caches == nil || len(c.caches) == 0 { + return nil, false + } + return c.caches[len(c.caches)-1], true +} + +func sweepUnusedCache(caches []*InvokerCache) []*InvokerCache { + ret := caches[:0] + for _, c := range caches { + if c.isInUse() { + ret = append(ret, c) + } + } + return ret +} + // buildCache builds address cache with the new invokers for all poolable routers. func (c *RouterChain) buildCache() { c.mutex.RLock() @@ -157,19 +176,20 @@ func (c *RouterChain) buildCache() { invokers := make([]protocol.Invoker, 0, len(c.invokers)) invokers = append(invokers, c.invokers...) + var origin *InvokerCache - if c.cache != nil { - origin = c.cache.Clone() + if c.caches != nil && len(c.caches) > 0 { + origin = c.caches[len(c.caches)-1] + c.caches = sweepUnusedCache(c.caches) } c.mutex.RUnlock() - cache := BuildCache(invokers) - var ( mutex sync.Mutex wg sync.WaitGroup ) + cache := BuildCache(invokers) for _, r := range c.copyRouters() { if p, ok := r.(router.Poolable); ok { wg.Add(1) @@ -187,7 +207,7 @@ func (c *RouterChain) buildCache() { c.mutex.Lock() defer c.mutex.Unlock() - c.cache = cache + c.caches = append(c.caches, cache) } // URL Return URL in RouterChain @@ -221,6 +241,7 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { builtinRouters: routers, routers: newRouters, last: time.Now(), + caches: []*InvokerCache{}, notify: make(chan struct{}), } if url != nil { diff --git a/cluster/router/chain/invoker_cache.go b/cluster/router/chain/invoker_cache.go index b1b39d09a7423a7185a38289c1b7301d2c47b411..da12d5af934e358c3951f1a48ea88cba6d87e3ce 100644 --- a/cluster/router/chain/invoker_cache.go +++ b/cluster/router/chain/invoker_cache.go @@ -19,6 +19,7 @@ package chain import ( "github.com/RoaringBitmap/roaring" + "go.uber.org/atomic" ) import ( @@ -42,6 +43,8 @@ type InvokerCache struct { // Address metadata from routers which implement Poolable metadatas map[string]router.AddrMetadata + + inUse atomic.Int32 } // BuildCache builds address cache from the given invokers. @@ -79,29 +82,17 @@ func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) { c.metadatas[name] = meta } -func (c *InvokerCache) Clone() *InvokerCache { - ret := &InvokerCache{ - pools: make(map[string]router.AddrPool, len(c.pools)), - metadatas: make(map[string]router.AddrMetadata, len(c.metadatas)), - } - - invokers := make([]protocol.Invoker, 0, len(c.invokers)) - invokers = append(invokers, c.invokers...) - ret.invokers = invokers - - ret.bitmap = c.bitmap.Clone() - - for k, v := range c.pools { - pool := make(router.AddrPool, len(v)) - for k1, v1 := range pool { - pool[k1] = v1.Clone() - } - ret.pools[k] = pool - } +// in increases inUse count at the beginning of every request, used only by router chain +func (c *InvokerCache) in() { + c.inUse.Inc() +} - for k, v := range c.metadatas { - ret.metadatas[k] = v.Clone() - } +// out decreases inUse count at the end of every request, used only by router chain +func (c *InvokerCache) out() { + c.inUse.Dec() +} - return ret +// isInUse returns false when inUse count equals to 0, so that it can be safely removed by router chain +func (c *InvokerCache) isInUse() bool { + return c.inUse.Load() > 0 } diff --git a/cluster/router/router.go b/cluster/router/router.go index f68e4fcb5ca20878b03009a625a2765b14ade758..ddca42a01d5d5b56a33b9145926c77a843ea9787 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -74,8 +74,8 @@ type AddrPool map[string]*roaring.Bitmap // AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable. type AddrMetadata interface { - // Copy returns a copy of address metadata. - Clone() AddrMetadata + // Source indicates where the metadata comes from. + Source() string } // Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index efa6dd179c7efceba9430dffea5032077a3440ec..0acdf70ce97b7e1984d32ae9d4fc1b50d97ba071 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -62,10 +62,9 @@ type addrMetadata struct { ruleEnabled bool } -// Clone returns a copy of addrMetadata -func (m *addrMetadata) Clone() router.AddrMetadata { - ret := *m - return &ret +// Source indicates where the metadata comes from. +func (m *addrMetadata) Source() string { + return name } // tagRouter defines url, enable and the priority