From 0cb2789383a28c972dd0eccb57293a6bd959d7a7 Mon Sep 17 00:00:00 2001 From: Ian Luo <ian.luo@gmail.com> Date: Tue, 18 Aug 2020 23:19:01 +0800 Subject: [PATCH] switch to use sync.Mutex instead of atomic.Value. --- cluster/router/chain/chain.go | 50 +++++++++++---------------- cluster/router/chain/invoker_cache.go | 27 +++++++++++++++ cluster/router/router.go | 4 +-- cluster/router/tag/tag_router.go | 7 ++-- 4 files changed, 53 insertions(+), 35 deletions(-) diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index a7f139b23..ee4dca5b4 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -20,7 +20,6 @@ package chain import ( "sort" "sync" - "sync/atomic" "time" ) @@ -64,17 +63,20 @@ type RouterChain struct { // Channel for notify to update the address cache notify chan struct{} // Address cache - cache atomic.Value + cache *InvokerCache } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - cache := c.loadCache() - if cache == nil { - c.mutex.RLock() + var cache *InvokerCache + c.mutex.RLock() + if c.cache == nil { defer c.mutex.RUnlock() return c.invokers } + // FIXME: this means clone happens in request scope which should be avoid + cache = c.cache.Clone() + c.mutex.RUnlock() bitmap := cache.bitmap for _, r := range c.copyRouters() { @@ -145,37 +147,23 @@ func (c *RouterChain) copyRouters() []router.PriorityRouter { return ret } -// copyInvokers copies a snapshot of the received invokers. -func (c *RouterChain) copyInvokers() []protocol.Invoker { +// buildCache builds address cache with the new invokers for all poolable routers. +func (c *RouterChain) buildCache() { c.mutex.RLock() - defer c.mutex.RUnlock() if c.invokers == nil || len(c.invokers) == 0 { - return nil - } - ret := make([]protocol.Invoker, 0, len(c.invokers)) - ret = append(ret, c.invokers...) - return ret -} - -// loadCache loads cache from sync.Value to guarantee the visibility -func (c *RouterChain) loadCache() *InvokerCache { - v := c.cache.Load() - if v == nil { - return nil + defer c.mutex.RUnlock() + return } - return v.(*InvokerCache) -} - -// buildCache builds address cache with the new invokers for all poolable routers. -func (c *RouterChain) buildCache() { - invokers := c.copyInvokers() - if invokers == nil || len(c.invokers) == 0 { - return + invokers := make([]protocol.Invoker, 0, len(c.invokers)) + invokers = append(invokers, c.invokers...) + var origin *InvokerCache + if c.cache != nil { + origin = c.cache.Clone() } + c.mutex.RUnlock() cache := BuildCache(invokers) - origin := c.loadCache() var ( mutex sync.Mutex @@ -197,7 +185,9 @@ func (c *RouterChain) buildCache() { } wg.Wait() - c.cache.Store(cache) + c.mutex.Lock() + defer c.mutex.Unlock() + c.cache = cache } // URL Return URL in RouterChain diff --git a/cluster/router/chain/invoker_cache.go b/cluster/router/chain/invoker_cache.go index 43cdfa506..b1b39d09a 100644 --- a/cluster/router/chain/invoker_cache.go +++ b/cluster/router/chain/invoker_cache.go @@ -78,3 +78,30 @@ func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) { func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) { c.metadatas[name] = meta } + +func (c *InvokerCache) Clone() *InvokerCache { + ret := &InvokerCache{ + pools: make(map[string]router.AddrPool, len(c.pools)), + metadatas: make(map[string]router.AddrMetadata, len(c.metadatas)), + } + + invokers := make([]protocol.Invoker, 0, len(c.invokers)) + invokers = append(invokers, c.invokers...) + ret.invokers = invokers + + ret.bitmap = c.bitmap.Clone() + + for k, v := range c.pools { + pool := make(router.AddrPool, len(v)) + for k1, v1 := range pool { + pool[k1] = v1.Clone() + } + ret.pools[k] = pool + } + + for k, v := range c.metadatas { + ret.metadatas[k] = v.Clone() + } + + return ret +} diff --git a/cluster/router/router.go b/cluster/router/router.go index ddca42a01..f68e4fcb5 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -74,8 +74,8 @@ type AddrPool map[string]*roaring.Bitmap // AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable. type AddrMetadata interface { - // Source indicates where the metadata comes from. - Source() string + // Copy returns a copy of address metadata. + Clone() AddrMetadata } // Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 0acdf70ce..efa6dd179 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -62,9 +62,10 @@ type addrMetadata struct { ruleEnabled bool } -// Source indicates where the metadata comes from. -func (m *addrMetadata) Source() string { - return name +// Clone returns a copy of addrMetadata +func (m *addrMetadata) Clone() router.AddrMetadata { + ret := *m + return &ret } // tagRouter defines url, enable and the priority -- GitLab