Skip to content
Snippets Groups Projects
Commit cf7a6135 authored by Ian Luo's avatar Ian Luo
Browse files

refactor the code style

parent eef3685d
No related branches found
No related tags found
No related merge requests found
......@@ -18,8 +18,6 @@
package chain
import (
"github.com/RoaringBitmap/roaring"
"github.com/apache/dubbo-go/common/constant"
"math"
"sort"
"sync"
......@@ -34,6 +32,7 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
......@@ -56,44 +55,38 @@ type RouterChain struct {
builtinRouters []router.PriorityRouter
mutex sync.RWMutex
url common.URL
url common.URL
// The times of address notification since last update for address cache
count int64
last time.Time
ch chan struct{}
// The timestamp of last update for address cache
last time.Time
// Channel for notify to update the address cache
ch chan struct{}
// Address cache
cache atomic.Value
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
rs := c.copyRouters()
v := c.cache.Load()
if v == nil {
cache := c.loadCache()
if cache == nil {
return c.invokers
}
cache := v.(*router.AddrCache)
bitmap := cache.Bitmap
for _, r := range rs {
bitmap := cache.bitmap
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}
indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.Invokers[index]
finalInvokers[i] = cache.invokers[index]
}
return finalInvokers
}
func (c *RouterChain) copyRouters() []router.PriorityRouter {
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
return rs
return finalInvokers
}
// AddRouters Add routers to router chain
......@@ -110,6 +103,8 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.routers = newRouters
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
// time interval exceeds timeThreshold since last cache update, then notify to update the cache.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
......@@ -124,6 +119,8 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
}
}
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
func (c *RouterChain) loop() {
for {
select {
......@@ -138,6 +135,27 @@ func (c *RouterChain) loop() {
}
}
// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
return rs
}
// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}
return v.(*InvokerCache)
}
// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
if c.invokers == nil || len(c.invokers) == 0 {
return
......@@ -146,18 +164,8 @@ func (c *RouterChain) buildCache() {
// FIXME: should lock here, it is fine with dirty read if no panic happens I believe.
invokers := make([]protocol.Invoker, len(c.invokers))
copy(invokers, c.invokers)
cache := &router.AddrCache{
Invokers: invokers,
Bitmap: ToBitmap(invokers),
AddrPool: make(map[string]router.RouterAddrPool),
AddrMeta: make(map[string]router.AddrMetadata),
}
var origin *router.AddrCache
v := c.cache.Load()
if v != nil {
origin = v.(*router.AddrCache)
}
cache := BuildCache(invokers)
origin := c.loadCache()
var mutex sync.Mutex
var wg sync.WaitGroup
......@@ -168,8 +176,8 @@ func (c *RouterChain) buildCache() {
go func(p router.Poolable) {
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
cache.AddrPool[p.Name()] = pool
cache.AddrMeta[p.Name()] = info
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
mutex.Unlock()
wg.Done()
}(p)
......@@ -221,25 +229,32 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
return chain, nil
}
func poolRouter(p router.Poolable, origin *router.AddrCache, invokers []protocol.Invoker) (router.RouterAddrPool, router.AddrMetadata) {
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldRePool() || IsDiff(origin.Invokers, invokers) {
if isCacheMiss(origin, name) || p.ShouldPool() || isInvokersChanged(origin.invokers, invokers) {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
} else {
logger.Debugf("reuse existing address cache for router %q", name)
return origin.AddrPool[name], origin.AddrMeta[name]
return origin.pools[name], origin.metadatas[name]
}
}
func isCacheMiss(cache *router.AddrCache, key string) bool {
if cache == nil || cache.AddrPool == nil || cache.Invokers == nil || cache.AddrPool[key] == nil {
// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}
func IsDiff(left []protocol.Invoker, right []protocol.Invoker) bool {
// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
return true
}
......@@ -247,7 +262,7 @@ func IsDiff(left []protocol.Invoker, right []protocol.Invoker) bool {
for _, r := range right {
found := false
for _, l := range left {
if IsEquals(l.GetUrl(), r.GetUrl()) {
if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
found = true
break
}
......@@ -259,38 +274,6 @@ func IsDiff(left []protocol.Invoker, right []protocol.Invoker) bool {
return false
}
func IsEquals(left common.URL, right common.URL) bool {
if left.Ip != right.Ip || left.Port != right.Port {
return false
}
leftMap := left.ToMap()
delete(leftMap, constant.TIMESTAMP_KEY)
delete(leftMap, constant.REMOTE_TIMESTAMP_KEY)
rightMap := right.ToMap()
delete(rightMap, constant.TIMESTAMP_KEY)
delete(rightMap, constant.REMOTE_TIMESTAMP_KEY)
if len(leftMap) != len(rightMap) {
return false
}
for lk, lv := range leftMap {
if rv, ok := rightMap[lk]; !ok {
return false
} else if lv != rv {
return false
}
}
return true
}
func ToBitmap(invokers []protocol.Invoker) *roaring.Bitmap {
bitmap := roaring.NewBitmap()
bitmap.AddRange(0, uint64(len(invokers)))
return bitmap
}
// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chain
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/protocol"
)
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type InvokerCache struct {
// The snapshot of invokers
invokers []protocol.Invoker
// The bitmap representation for invokers snapshot
bitmap *roaring.Bitmap
// Address pool from routers which implement Poolable
pools map[string]router.AddrPool
// Address metadata from routers which implement Poolable
metadatas map[string]router.AddrMetadata
}
// BuildCache builds address cache from the given invokers.
func BuildCache(invokers []protocol.Invoker) *InvokerCache {
return &InvokerCache{
invokers: invokers,
bitmap: utils.ToBitmap(invokers),
pools: make(map[string]router.AddrPool),
metadatas: make(map[string]router.AddrMetadata),
}
}
// GetInvokers get invokers snapshot.
func (c *InvokerCache) GetInvokers() []protocol.Invoker {
return c.invokers
}
// FindAddrPool finds address pool for a poolable router.
func (c *InvokerCache) FindAddrPool(p router.Poolable) router.AddrPool {
return c.pools[p.Name()]
}
// FindAddrMeta finds address metadata for a poolable router.
func (c *InvokerCache) FindAddrMeta(p router.Poolable) router.AddrMetadata {
return c.metadatas[p.Name()]
}
// SetAddrPool sets address pool for a poolable router, for unit test only
func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) {
c.pools[name] = pool
}
// SetAddrMeta sets address metadata for a poolable router, for unit test only
func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) {
c.metadatas[name] = meta
}
......@@ -33,6 +33,7 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -380,9 +381,6 @@ func TestNewAppRouterFactory(t *testing.T) {
assert.NotNil(t, factory)
}
func setUpAddrCache(addrs []protocol.Invoker) *router.AddrCache {
cache := &router.AddrCache{
Invokers: addrs,
}
return cache
func setUpAddrCache(addrs []protocol.Invoker) router.Cache {
return chain.BuildCache(addrs)
}
......@@ -66,14 +66,14 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
l.priority = listenableRouterDefaultPriority
routerKey := ruleKey + constant.ConditionRouterRuleSuffix
//add listener
// add listener
dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
return nil, perrors.Errorf("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
}
dynamicConfiguration.AddListener(routerKey, l)
//get rule
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
return nil, perrors.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
......@@ -131,11 +131,11 @@ func (l *listenableRouter) generateConditions(rule *RouterRule) {
}
// Route Determine the target invokers list.
func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() || len(l.conditionRouters) == 0 {
return invokers
}
//We will check enabled status inside each router.
// We will check enabled status inside each router.
for _, r := range l.conditionRouters {
invokers = r.Route(invokers, cache, url, invocation)
}
......
......@@ -24,21 +24,22 @@ import (
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/net"
)
const (
//pattern route pattern regex
// pattern route pattern regex
pattern = `([&!=,]*)\\s*([^&!=,\\s]+)`
)
......@@ -144,7 +145,7 @@ func (c *ConditionRouter) Enabled() bool {
}
// Route Determine the target invokers list.
func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.Enabled() {
return invokers
}
......@@ -159,13 +160,14 @@ func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCach
}
if len(c.ThenCondition) == 0 {
return router.EmptyAddr
return utils.EmptyAddr
}
result := roaring.NewBitmap()
for iter := invokers.Iterator(); iter.HasNext(); {
index := iter.Next()
invokerUrl := cache.Invokers[index].GetUrl()
invoker := cache.GetInvokers()[index]
invokerUrl := invoker.GetUrl()
isMatchThen := c.MatchThen(&invokerUrl, url)
if isMatchThen {
result.Add(index)
......@@ -323,7 +325,7 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool {
return true
}
if !pair.Mismatches.Empty() && !pair.Matches.Empty() {
//when both mismatches and matches contain the same value, then using mismatches first
// when both mismatches and matches contain the same value, then using mismatches first
for mismatch := range pair.Mismatches.Items {
if isMatchGlobalPattern(mismatch.(string), value, param) {
return false
......
......@@ -19,6 +19,9 @@ package healthcheck
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
......@@ -31,6 +34,7 @@ import (
const (
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
healthy = "healthy"
name = "health-check-router"
)
// HealthCheckRouter provides a health-first routing mechanism through HealthChecker
......@@ -54,7 +58,7 @@ func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) {
}
// Route gets a list of healthy invoker
func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !r.enabled {
return invokers
}
......@@ -70,8 +74,9 @@ func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCa
return healthyInvokers
}
func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.RouterAddrPool, router.AddrMetadata) {
rb := make(router.RouterAddrPool)
// Pool separates healthy invokers from others.
func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
rb := make(router.AddrPool)
rb[healthy] = roaring.NewBitmap()
for i, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
......@@ -82,12 +87,13 @@ func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.RouterAddr
return rb, nil
}
func (r *HealthCheckRouter) ShouldRePool() bool {
// ShouldPool will always return true to make sure healthy check constantly.
func (r *HealthCheckRouter) ShouldPool() bool {
return true
}
func (r *HealthCheckRouter) Name() string {
return "health-check-router"
return name
}
// Priority
......
......@@ -19,8 +19,6 @@ package healthcheck
import (
"fmt"
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"math"
"testing"
"time"
......@@ -31,6 +29,9 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -137,15 +138,10 @@ func TestNewHealthCheckRouter(t *testing.T) {
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500))
}
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) *router.AddrCache {
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := &router.AddrCache{
Invokers: addrs,
AddrPool: make(map[string]router.RouterAddrPool),
AddrMeta: make(map[string]router.AddrMetadata),
}
cache.AddrMeta[r.Name()] = info
cache.AddrPool[r.Name()] = pool
cache := chain.BuildCache(addrs)
cache.SetAddrMeta(r.Name(), info)
cache.SetAddrPool(r.Name(), pool)
return cache
}
......@@ -19,6 +19,9 @@ package router
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
......@@ -39,7 +42,7 @@ type FilePriorityRouterFactory interface {
// Router
type router interface {
// Route Determine the target invokers list.
Route(*roaring.Bitmap, *AddrCache, *common.URL, protocol.Invocation) *roaring.Bitmap
Route(*roaring.Bitmap, Cache, *common.URL, protocol.Invocation) *roaring.Bitmap
// URL Return URL in router
URL() common.URL
......@@ -53,33 +56,38 @@ type PriorityRouter interface {
Priority() int64
}
// Poolable caches address pool and address metadata for a router instance which will be used later in Router's Route.
type Poolable interface {
Pool([]protocol.Invoker) (RouterAddrPool, AddrMetadata)
ShouldRePool() bool
// Pool created address pool and address metadata from the invokers.
Pool([]protocol.Invoker) (AddrPool, AddrMetadata)
// ShouldPool returns if it should pool. One typical scenario is a router rule changes, in this case, a pooling
// is necessary, even if the addresses not changed at all.
ShouldPool() bool
// Name return the Poolable's name.
Name() string
}
// AddrPool is an address pool, backed by a snapshot of address list, divided into categories.
type AddrPool map[string]*roaring.Bitmap
// AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable.
type AddrMetadata interface {
// Source indicates where the metadata comes from.
Source() string
}
type RouterAddrPool map[string]*roaring.Bitmap
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type Cache interface {
// GetInvokers returns the snapshot of received invokers.
GetInvokers() []protocol.Invoker
// AddrCache caches all addresses relevant info for a snapshot of received invokers, the calculation logic is
// different from router to router.
type AddrCache struct {
Invokers []protocol.Invoker // invokers snapshot
Bitmap *roaring.Bitmap // bitmap for invokers
AddrPool map[string]RouterAddrPool // address pool from the invokers for one particular router
AddrMeta map[string]AddrMetadata // address meta info collected from the invokers for one particular router
}
// FindAddrPool returns address pool associated with the given Poolable instance.
FindAddrPool(Poolable) AddrPool
func (c *AddrCache) FindAddrPool(p Poolable) RouterAddrPool {
return c.AddrPool[p.Name()]
// FindAddrMeta returns address metadata associated with the given Poolable instance.
FindAddrMeta(Poolable) AddrMetadata
}
func (c *AddrCache) FindAddrMeta(p Poolable) AddrMetadata {
return c.AddrMeta[p.Name()]
}
var EmptyAddr = roaring.NewBitmap()
......@@ -18,18 +18,18 @@
package tag
import (
"github.com/RoaringBitmap/roaring"
"github.com/apache/dubbo-go/cluster/router"
"net/url"
"strconv"
"sync"
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -76,7 +76,7 @@ func (f *FileTagRouter) Priority() int64 {
return f.router.priority
}
func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() {
return invokers
}
......
......@@ -18,22 +18,26 @@
package tag
import (
"github.com/RoaringBitmap/roaring"
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"strconv"
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
const (
name = "tag-router"
)
type tagRouter struct {
url *common.URL
enabled bool
......@@ -55,7 +59,7 @@ func (c *tagRouter) isEnabled() bool {
return c.enabled
}
func (c *tagRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.isEnabled() || invokers.IsEmpty() {
return invokers
}
......@@ -65,7 +69,7 @@ func (c *tagRouter) Route(invokers *roaring.Bitmap, cache *router.AddrCache, url
return invokers
}
ret := router.EmptyAddr
ret := utils.EmptyAddr
if target, ok := cache.FindAddrPool(c)[tag]; ok {
ret = utils.JoinIfNotEqual(target, invokers)
}
......@@ -85,8 +89,9 @@ func (c *tagRouter) Priority() int64 {
return c.priority
}
func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.RouterAddrPool, router.AddrMetadata) {
rb := make(router.RouterAddrPool)
// Pool divided invokers into different address pool by tag.
func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
rb := make(router.AddrPool)
for i, invoker := range invokers {
url := invoker.GetUrl()
tag := url.GetParam(constant.Tagkey, "")
......@@ -100,12 +105,13 @@ func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.RouterAddrPool, ro
return rb, nil
}
func (c *tagRouter) ShouldRePool() bool {
// ShouldPool returns false, to make sure address cache for tag router happens once and only once.
func (c *tagRouter) ShouldPool() bool {
return false
}
func (c *tagRouter) Name() string {
return "tag-router"
return name
}
func findStaticTag(invocation protocol.Invocation) string {
......
......@@ -19,8 +19,6 @@ package tag
import (
"context"
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"testing"
)
......@@ -29,6 +27,9 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
......@@ -163,15 +164,10 @@ func TestTagRouterRouteNoForce(t *testing.T) {
assert.Equal(t, 3, len(invRst2.ToArray()))
}
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) *router.AddrCache {
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := &router.AddrCache{
Invokers: addrs,
AddrPool: make(map[string]router.RouterAddrPool),
AddrMeta: make(map[string]router.AddrMetadata),
}
cache.AddrMeta[r.Name()] = info
cache.AddrPool[r.Name()] = pool
cache := chain.BuildCache(addrs)
cache.SetAddrPool(r.Name(), pool)
cache.SetAddrMeta(r.Name(), info)
return cache
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/protocol"
)
var EmptyAddr = roaring.NewBitmap()
func JoinIfNotEqual(left *roaring.Bitmap, right *roaring.Bitmap) *roaring.Bitmap {
if !left.Equals(right) {
left = left.Clone()
......@@ -22,14 +44,6 @@ func FallbackIfJoinToEmpty(left *roaring.Bitmap, right *roaring.Bitmap) *roaring
}
}
func ToIndex(invokers []protocol.Invoker) []int {
var ret []int
for i := range invokers {
ret = append(ret, i)
}
return ret
}
func ToBitmap(invokers []protocol.Invoker) *roaring.Bitmap {
bitmap := roaring.NewBitmap()
bitmap.AddRange(0, uint64(len(invokers)))
......
......@@ -643,6 +643,34 @@ func (c *URL) CloneWithParams(reserveParams []string) *URL {
)
}
// IsEquals compares if two URLs equals with each other. Excludes are all parameter keys which should ignored.
func IsEquals(left URL, right URL, excludes ...string) bool {
if left.Ip != right.Ip || left.Port != right.Port {
return false
}
leftMap := left.ToMap()
rightMap := right.ToMap()
for _, exclude := range excludes {
delete(leftMap, exclude)
delete(rightMap, exclude)
}
if len(leftMap) != len(rightMap) {
return false
}
for lk, lv := range leftMap {
if rv, ok := rightMap[lk]; !ok {
return false
} else if lv != rv {
return false
}
}
return true
}
func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) {
methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
for _, paramKey := range paramKeys {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment