Skip to content
Snippets Groups Projects
Unverified Commit 299b17b9 authored by cvictory's avatar cvictory Committed by GitHub
Browse files

Optimize conn check router (#1217)

* fix sentinel cannot transport Context issue

* optimize SentinelLogging reset

* support print route info when there is no provider

* revert to original version

* optimize conn_check_router, make routing running fast
parent a3569bf0
No related branches found
No related tags found
No related merge requests found
......@@ -118,7 +118,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
if len(invokers) == 0 {
return nil
}
go protocol.TryRefreshBlackList()
protocol.TryRefreshBlackList()
if len(invokers) == 1 {
if invokers[0].IsAvailable() {
return invokers[0]
......
......@@ -34,8 +34,9 @@ import (
)
const (
connHealthy = "conn-healthy"
name = "conn-check-router"
connUnHealthy = "conn-unhealthy"
connHealthy = "conn-healthy"
name = "conn-check-router"
)
// ConnCheckRouter provides a health-first routing mechanism through ConnChecker
......@@ -60,7 +61,13 @@ func NewConnCheckRouter(url *common.URL, notify chan struct{}) (router.PriorityR
func (r *ConnCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
addrPool := cache.FindAddrPool(r)
// Add healthy invoker to the list
healthyInvokers := utils.JoinIfNotEqual(addrPool[connHealthy], invokers)
unhealthyBits := addrPool[connUnHealthy]
healthyBits := addrPool[connHealthy]
// check if empty, return
if unhealthyBits == nil || unhealthyBits.IsEmpty() || healthyBits == nil || healthyBits.IsEmpty() {
return invokers
}
healthyInvokers := utils.JoinIfNotEqual(healthyBits, invokers)
// If all invokers are considered unhealthy, downgrade to all invoker
if healthyInvokers.IsEmpty() {
logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
......@@ -76,6 +83,10 @@ func (r *ConnCheckRouter) RouteSnapshot(cache router.Cache) string {
sb := strings.Builder{}
sb.WriteString(r.Name())
sb.WriteString(" -> ")
if healthBit == nil {
sb.WriteString(" No Unhealthy Invoker")
return sb.String()
}
sb.WriteString("Count:")
sb.WriteString(strconv.FormatUint(healthBit.GetCardinality(), 10))
sb.WriteString(" ")
......@@ -86,12 +97,18 @@ func (r *ConnCheckRouter) RouteSnapshot(cache router.Cache) string {
// Pool separates healthy invokers from others.
func (r *ConnCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
rb := make(router.AddrPool, 8)
rb[connHealthy] = roaring.NewBitmap()
healthyInvokers := utils.ToBitmap(invokers)
unhealthyInvokers := roaring.NewBitmap()
for i, invoker := range invokers {
if r.checker.IsConnHealthy(invoker) {
rb[connHealthy].Add(uint32(i))
if !r.checker.IsConnHealthy(invoker) {
unhealthyInvokers.Add(uint32(i))
healthyInvokers.Remove(uint32(i))
}
}
if !unhealthyInvokers.IsEmpty() {
rb[connHealthy] = healthyInvokers
rb[connUnHealthy] = unhealthyInvokers
}
return rb, nil
}
......
......@@ -20,6 +20,7 @@ package conncheck
import (
"fmt"
"testing"
"time"
)
import (
......@@ -99,6 +100,7 @@ func TestRecovery(t *testing.T) {
protocol.SetInvokerUnhealthyStatus(invoker2)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 2)
protocol.TryRefreshBlackList()
time.Sleep(1 * time.Second)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 0)
}
......
......@@ -251,25 +251,27 @@ func GetAndRefreshState() bool {
// if target invoker is available, then remove it from black list
func TryRefreshBlackList() {
if atomic.CompareAndSwapInt32(&blackListRefreshing, 0, 1) {
wg := sync.WaitGroup{}
defer func() {
atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0)
}()
ivks := GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
logger.Debug("blackList len = ", len(ivks))
for i := 0; i < 3; i++ {
wg.Add(1)
go func(ivks []Invoker, i int) {
defer wg.Done()
for j, _ := range ivks {
if j%3-i == 0 && ivks[j].(Invoker).IsAvailable() {
RemoveInvokerUnhealthyStatus(ivks[i])
go func() {
wg := sync.WaitGroup{}
defer func() {
atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0)
}()
ivks := GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
logger.Debug("blackList len = ", len(ivks))
for i := 0; i < 3; i++ {
wg.Add(1)
go func(ivks []Invoker, i int) {
defer wg.Done()
for j, _ := range ivks {
if j%3-i == 0 && ivks[j].(Invoker).IsAvailable() {
RemoveInvokerUnhealthyStatus(ivks[i])
}
}
}
}(ivks, i)
}
wg.Wait()
}(ivks, i)
}
wg.Wait()
}()
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment