Skip to content
Snippets Groups Projects
Commit c1cc1138 authored by 李志信's avatar 李志信
Browse files

fix: add router change refresh cache logic

parent c822cfab
No related branches found
No related tags found
No related merge requests found
......@@ -20,12 +20,12 @@ package chain
import (
"sort"
"sync"
"sync/atomic"
"time"
)
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
......@@ -38,9 +38,7 @@ import (
)
const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
timeInterval = 5 * time.Second
)
// RouterChain Router chain
......@@ -65,6 +63,8 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// routerNeedsUpdate
routerNeedsUpdate atomic.Bool
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
......@@ -102,6 +102,7 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
c.routerNeedsUpdate.Store(true)
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
......@@ -123,8 +124,9 @@ func (c *RouterChain) loop() {
for {
select {
case <-ticker.C:
if protocol.GetAndRefreshState() {
if protocol.GetAndRefreshState() || c.routerNeedsUpdate.Load() {
c.buildCache()
c.routerNeedsUpdate.Store(false)
}
case <-c.notify:
c.buildCache()
......@@ -237,11 +239,14 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
sortRouter(newRouters)
routerNeedsUpdateInit := atomic.Bool{}
routerNeedsUpdateInit.Store(false)
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
routerNeedsUpdate: routerNeedsUpdateInit,
}
if url != nil {
chain.url = url
......
......@@ -23,6 +23,10 @@ import (
"time"
)
import (
uberAtomic "go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -30,11 +34,11 @@ import (
)
var (
methodStatistics sync.Map // url -> { methodName : RPCStatus}
serviceStatistic sync.Map // url -> RPCStatus
invokerBlackList sync.Map // store unhealthy url blackList
blackListCacheDirty atomic.Value // store if the cache in chain is not refreshed by blacklist
blackListRefreshing int32 // store if the refresing method is processing
methodStatistics sync.Map // url -> { methodName : RPCStatus}
serviceStatistic sync.Map // url -> RPCStatus
invokerBlackList sync.Map // store unhealthy url blackList
blackListCacheDirty uberAtomic.Bool // store if the cache in chain is not refreshed by blacklist
blackListRefreshing int32 // store if the refresing method is processing
)
func init() {
......@@ -237,7 +241,7 @@ func RemoveUrlKeyUnhealthyStatus(key string) {
func GetAndRefreshState() bool {
state := blackListCacheDirty.Load()
blackListCacheDirty.Store(false)
return state.(bool)
return state
}
// TryRefreshBlackList start 3 gr to check at most block=16 invokers in black list
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment