Skip to content
Snippets Groups Projects
Commit 6707cdc3 authored by xg.gao's avatar xg.gao
Browse files

Merge branch 'develop' into read

parents f8434c31 4c483e0e
No related branches found
No related tags found
No related merge requests found
Showing
with 891 additions and 315 deletions
# Release Notes
---
## 1.5.1
### New Features
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/703)
- [Add TLS support](https://github.com/apache/dubbo-go/pull/685)
- [Add Nearest first for multiple registry](https://github.com/apache/dubbo-go/pull/659)
- [Add application and service level router](https://github.com/apache/dubbo-go/pull/662)
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/665)
### Enhancement
- [Avoid init the log twice](https://github.com/apache/dubbo-go/pull/719)
- [Correct words and format codes](https://github.com/apache/dubbo-go/pull/704)
- [Change log stack level from warn to error](https://github.com/apache/dubbo-go/pull/702)
- [Optimize remotes configuration](https://github.com/apache/dubbo-go/pull/687)
### Bugfixes
- [Fix register service instance after provider config load](https://github.com/apache/dubbo-go/pull/694)
- [Fix call subscribe function asynchronously](https://github.com/apache/dubbo-go/pull/721)
- [Fix tag router rule copy](https://github.com/apache/dubbo-go/pull/721)
- [Fix nacos unit test failed](https://github.com/apache/dubbo-go/pull/705)
- [Fix can not inovke nacos destroy when graceful shutdown](https://github.com/apache/dubbo-go/pull/689)
- [Fix zk lost event](https://github.com/apache/dubbo-go/pull/692)
- [Fix k8s ut bug](https://github.com/apache/dubbo-go/pull/693)
Milestone: [https://github.com/apache/dubbo-go/milestone/2?closed=1](https://github.com/apache/dubbo-go/milestone/2?closed=1)
Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apache/dubbo-go/projects/8)
## 1.5.0
### New Features
......
......@@ -16,6 +16,8 @@ Apache License, Version 2.0
## Release note ##
[v1.5.1 - Aug 23, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.1)
[v1.5.0 - July 24, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.0)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
......
......@@ -39,10 +39,13 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
if len(invokers) > 0 {
url = invokers[0].GetUrl()
}
return &staticDirectory{
dir := &staticDirectory{
BaseDirectory: NewBaseDirectory(&url),
invokers: invokers,
}
dir.routerChain.SetInvokers(invokers)
return dir
}
//for-loop invokers ,if all invokers is available ,then it means directory is available
......@@ -69,7 +72,7 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo
return invokers
}
dirUrl := dir.GetUrl()
return routerChain.Route(invokers, &dirUrl, invocation)
return routerChain.Route(&dirUrl, invocation)
}
// Destroy Destroy
......@@ -92,6 +95,7 @@ func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error
if e != nil {
return e
}
routerChain.SetInvokers(dir.invokers)
dir.SetRouterChain(routerChain)
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package router
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
// Chain
type Chain interface {
Route(*common.URL, protocol.Invocation) []protocol.Invoker
// Refresh invokers
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
}
......@@ -18,9 +18,10 @@
package chain
import (
"math"
"sort"
"sync"
"sync/atomic"
"time"
)
import (
......@@ -30,11 +31,18 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
)
// RouterChain Router chain
type RouterChain struct {
// Full list of addresses from registry, classified by method name.
......@@ -48,30 +56,38 @@ type RouterChain struct {
mutex sync.RWMutex
url common.URL
// The times of address notification since last update for address cache
count int64
// The timestamp of last update for address cache
last time.Time
// Channel for notify to update the address cache
notify chan struct{}
// Address cache
cache atomic.Value
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invokers
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
cache := c.loadCache()
if cache == nil {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.invokers
}
for _, r := range rs {
finalInvokers = r.Route(finalInvokers, url, invocation)
bitmap := cache.bitmap
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}
return finalInvokers
}
// SetInvokers notify router chain of the initial addresses from registry at the first time. Notify whenever addresses in registry change.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
for _, r := range c.routers {
if notifyRouter, ok := r.(router.NotifyRouter); ok {
notifyRouter.Notify(invokers)
}
indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.invokers[index]
}
return finalInvokers
}
// AddRouters Add routers to router chain
......@@ -88,6 +104,116 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.routers = newRouters
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
// time interval exceeds timeThreshold since last cache update, then notify to update the cache.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.mutex.Lock()
c.invokers = invokers
c.mutex.Unlock()
c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
}
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
func (c *RouterChain) loop() {
for {
ticker := time.NewTicker(timeInterval)
select {
case <-ticker.C:
c.buildCache()
case <-c.notify:
c.buildCache()
}
}
}
// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
c.mutex.RLock()
defer c.mutex.RUnlock()
ret := make([]router.PriorityRouter, 0, len(c.routers))
ret = append(ret, c.routers...)
return ret
}
// copyInvokers copies a snapshot of the received invokers.
func (c *RouterChain) copyInvokers() []protocol.Invoker {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.invokers == nil || len(c.invokers) == 0 {
return nil
}
ret := make([]protocol.Invoker, 0, len(c.invokers))
ret = append(ret, c.invokers...)
return ret
}
// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}
return v.(*InvokerCache)
}
// copyInvokerIfNecessary compares chain's invokers copy and cache's invokers copy, to avoid copy as much as possible
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
var invokers []protocol.Invoker
if cache != nil {
invokers = cache.invokers
}
c.mutex.RLock()
defer c.mutex.RUnlock()
if isInvokersChanged(invokers, c.invokers) {
invokers = c.copyInvokers()
}
return invokers
}
// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}
var (
mutex sync.Mutex
wg sync.WaitGroup
)
cache := BuildCache(invokers)
for _, r := range c.copyRouters() {
if p, ok := r.(router.Poolable); ok {
wg.Add(1)
go func(p router.Poolable) {
defer wg.Done()
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
defer mutex.Unlock()
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
}(p)
}
}
wg.Wait()
c.cache.Store(cache)
}
// URL Return URL in RouterChain
func (c *RouterChain) URL() common.URL {
return c.url
......@@ -118,14 +244,62 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
if url != nil {
chain.url = *url
}
go chain.loop()
return chain, nil
}
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}
logger.Debugf("reuse existing address cache for router %q", name)
return origin.pools[name], origin.metadatas[name]
}
// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}
// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
return true
}
for _, r := range right {
found := false
for _, l := range left {
if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
found = true
break
}
}
if !found {
return true
}
}
return false
}
// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
......
......@@ -172,12 +172,12 @@ func TestRouterChainRoute(t *testing.T) {
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 1, len(finalInvokers))
}
......@@ -213,10 +213,12 @@ conditions:
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
......@@ -236,13 +238,16 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chain
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/protocol"
)
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type InvokerCache struct {
// The snapshot of invokers
invokers []protocol.Invoker
// The bitmap representation for invokers snapshot
bitmap *roaring.Bitmap
// Address pool from routers which implement Poolable
pools map[string]router.AddrPool
// Address metadata from routers which implement Poolable
metadatas map[string]router.AddrMetadata
}
// BuildCache builds address cache from the given invokers.
func BuildCache(invokers []protocol.Invoker) *InvokerCache {
return &InvokerCache{
invokers: invokers,
bitmap: utils.ToBitmap(invokers),
pools: make(map[string]router.AddrPool, 8),
metadatas: make(map[string]router.AddrMetadata, 8),
}
}
// GetInvokers get invokers snapshot.
func (c *InvokerCache) GetInvokers() []protocol.Invoker {
return c.invokers
}
// FindAddrPool finds address pool for a poolable router.
func (c *InvokerCache) FindAddrPool(p router.Poolable) router.AddrPool {
return c.pools[p.Name()]
}
// FindAddrMeta finds address metadata for a poolable router.
func (c *InvokerCache) FindAddrMeta(p router.Poolable) router.AddrMetadata {
return c.metadatas[p.Name()]
}
// SetAddrPool sets address pool for a poolable router, for unit test only
func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) {
c.pools[name] = pool
}
// SetAddrMeta sets address metadata for a poolable router, for unit test only
func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) {
c.metadatas[name] = meta
}
......@@ -32,6 +32,9 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
......@@ -180,30 +183,30 @@ func TestRoute_matchFilter(t *testing.T) {
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers4 := router4.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers5 := router5.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers6 := router6.Route(invokers, &cUrl, &invocation.RPCInvocation{})
assert.Equal(t, 1, len(fileredInvokers1))
assert.Equal(t, 0, len(fileredInvokers2))
assert.Equal(t, 0, len(fileredInvokers3))
assert.Equal(t, 1, len(fileredInvokers4))
assert.Equal(t, 2, len(fileredInvokers5))
assert.Equal(t, 1, len(fileredInvokers6))
ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret3 := router3.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret4 := router4.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret5 := router5.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret6 := router6.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
assert.Equal(t, 1, len(ret1.ToArray()))
assert.Equal(t, 0, len(ret2.ToArray()))
assert.Equal(t, 0, len(ret3.ToArray()))
assert.Equal(t, 1, len(ret4.ToArray()))
assert.Equal(t, 2, len(ret5.ToArray()))
assert.Equal(t, 1, len(ret6.ToArray()))
}
func TestRoute_methodRoute(t *testing.T) {
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := router.(*ConditionRouter).MatchWhen(&url, inv)
matchWhen := r.(*ConditionRouter).MatchWhen(&url, inv)
assert.Equal(t, true, matchWhen)
url1, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
matchWhen = router.(*ConditionRouter).MatchWhen(&url1, inv)
matchWhen = r.(*ConditionRouter).MatchWhen(&url1, inv)
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
......@@ -225,9 +228,9 @@ func TestRoute_ReturnFalse(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnEmpty(t *testing.T) {
......@@ -237,9 +240,9 @@ func TestRoute_ReturnEmpty(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnAll(t *testing.T) {
......@@ -253,9 +256,9 @@ func TestRoute_ReturnAll(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_HostFilter(t *testing.T) {
......@@ -270,11 +273,11 @@ func TestRoute_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_Empty_HostFilter(t *testing.T) {
......@@ -289,11 +292,11 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_False_HostFilter(t *testing.T) {
......@@ -308,11 +311,11 @@ func TestRoute_False_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_Placeholder(t *testing.T) {
......@@ -327,11 +330,11 @@ func TestRoute_Placeholder(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_NoForce(t *testing.T) {
......@@ -346,9 +349,9 @@ func TestRoute_NoForce(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_Force(t *testing.T) {
......@@ -363,9 +366,9 @@ func TestRoute_Force(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 0, len(fileredInvokers.ToArray()))
}
func TestNewConditionRouterFactory(t *testing.T) {
......@@ -377,3 +380,7 @@ func TestNewAppRouterFactory(t *testing.T) {
factory := newAppRouterFactory()
assert.NotNil(t, factory)
}
func setUpAddrCache(addrs []protocol.Invoker) router.Cache {
return chain.BuildCache(addrs)
}
......@@ -22,10 +22,12 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
......@@ -64,14 +66,14 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
l.priority = listenableRouterDefaultPriority
routerKey := ruleKey + constant.ConditionRouterRuleSuffix
//add listener
// add listener
dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
return nil, perrors.Errorf("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
}
dynamicConfiguration.AddListener(routerKey, l)
//get rule
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
return nil, perrors.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
......@@ -129,13 +131,13 @@ func (l *listenableRouter) generateConditions(rule *RouterRule) {
}
// Route Determine the target invokers list.
func (l *listenableRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 || len(l.conditionRouters) == 0 {
func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() || len(l.conditionRouters) == 0 {
return invokers
}
//We will check enabled status inside each router.
// We will check enabled status inside each router.
for _, r := range l.conditionRouters {
invokers = r.Route(invokers, url, invocation)
invokers = r.Route(invokers, cache, url, invocation)
}
return invokers
}
......
......@@ -23,20 +23,23 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/net"
)
const (
//pattern route pattern regex
// pattern route pattern regex
pattern = `([&!=,]*)\\s*([^&!=,\\s]+)`
)
......@@ -148,29 +151,36 @@ func (c *ConditionRouter) Enabled() bool {
}
// Route Determine the target invokers list.
func (c *ConditionRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.Enabled() {
return invokers
}
if len(invokers) == 0 {
if invokers.IsEmpty() {
return invokers
}
isMatchWhen := c.MatchWhen(url, invocation)
if !isMatchWhen {
return invokers
}
var result []protocol.Invoker
if len(c.ThenCondition) == 0 {
return result
return utils.EmptyAddr
}
for _, invoker := range invokers {
result := roaring.NewBitmap()
for iter := invokers.Iterator(); iter.HasNext(); {
index := iter.Next()
invoker := cache.GetInvokers()[index]
invokerUrl := invoker.GetUrl()
isMatchThen := c.MatchThen(&invokerUrl, url)
if isMatchThen {
result = append(result, invoker)
result.Add(index)
}
}
if len(result) > 0 {
if !result.IsEmpty() {
return result
} else if c.Force {
rule, _ := url.GetParamAndDecoded(constant.RULE_KEY)
......@@ -178,6 +188,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url *common.URL, in
logger.Warnf("The route result is empty and force execute. consumer: %s, service: %s, router: %s", localIP, url.Service(), rule)
return result
}
return invokers
}
......
......@@ -17,8 +17,13 @@
package healthcheck
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
......@@ -28,6 +33,8 @@ import (
const (
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
healthy = "healthy"
name = "health-check-router"
)
// HealthCheckRouter provides a health-first routing mechanism through HealthChecker
......@@ -51,25 +58,48 @@ func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) {
}
// Route gets a list of healthy invoker
func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !r.enabled {
return invokers
}
healthyInvokers := make([]protocol.Invoker, 0, len(invokers))
addrPool := cache.FindAddrPool(r)
// Add healthy invoker to the list
for _, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
healthyInvokers = append(healthyInvokers, invoker)
}
}
// If all Invoke are considered unhealthy, downgrade to all inovker
if len(healthyInvokers) == 0 {
healthyInvokers := utils.JoinIfNotEqual(addrPool[healthy], invokers)
// If all invokers are considered unhealthy, downgrade to all invoker
if healthyInvokers.IsEmpty() {
logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
return invokers
}
return healthyInvokers
}
// Pool separates healthy invokers from others.
func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
if !r.enabled {
return nil, nil
}
rb := make(router.AddrPool, 8)
rb[healthy] = roaring.NewBitmap()
for i, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
rb[healthy].Add(uint32(i))
}
}
return rb, nil
}
// ShouldPool will always return true to make sure healthy check constantly.
func (r *HealthCheckRouter) ShouldPool() bool {
return r.enabled
}
func (r *HealthCheckRouter) Name() string {
return name
}
// Priority
func (r *HealthCheckRouter) Priority() int64 {
return 0
......
......@@ -29,6 +29,9 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -59,25 +62,25 @@ func TestHealthCheckRouterRoute(t *testing.T) {
invoker3 := NewMockInvoker(url3)
invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil)
res := hcr.Route(invokers, &consumerURL, inv)
res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// now all invokers are healthy
assert.True(t, len(res) == len(invokers))
assert.True(t, len(res.ToArray()) == len(invokers))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// invokers1 is unhealthy now
assert.True(t, len(res) == 2 && !contains(res, invoker1))
assert.True(t, len(res.ToArray()) == 2 && !res.Contains(0))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
request(url2, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// only invokers3 is healthy now
assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2))
assert.True(t, len(res.ToArray()) == 1 && !res.Contains(0) && !res.Contains(1))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
......@@ -85,37 +88,28 @@ func TestHealthCheckRouterRoute(t *testing.T) {
request(url3, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// now all invokers are unhealthy, so downgraded to all
assert.True(t, len(res) == 3)
assert.True(t, len(res.ToArray()) == 3)
// reset the invoker1 successive failed count, so invoker1 go to healthy
request(url1, healthCheckRouteMethodNameTest, 0, false, true)
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
assert.True(t, res.Contains(0))
for i := 0; i < 6; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
// now all invokers are unhealthy, so downgraded to all again
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, len(res) == 3)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
assert.True(t, len(res.ToArray()) == 3)
time.Sleep(time.Second * 2)
// invoker1 go to healthy again after 2s
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
assert.True(t, res.Contains(0))
}
func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool {
for _, e := range invokers {
if e == invoker {
return true
}
}
return false
}
func TestNewHealthCheckRouter(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
......@@ -143,3 +137,11 @@ func TestNewHealthCheckRouter(t *testing.T) {
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500))
}
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := chain.BuildCache(addrs)
cache.SetAddrMeta(r.Name(), info)
cache.SetAddrPool(r.Name(), pool)
return cache
}
......@@ -17,6 +17,10 @@
package router
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
......@@ -38,7 +42,8 @@ type FilePriorityRouterFactory interface {
// Router
type router interface {
// Route Determine the target invokers list.
Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker
Route(*roaring.Bitmap, Cache, *common.URL, protocol.Invocation) *roaring.Bitmap
// URL Return URL in router
URL() common.URL
}
......@@ -51,10 +56,38 @@ type PriorityRouter interface {
Priority() int64
}
// NotifyRouter notify router use the invoker list. Invoker list may change from time to time. This method gives the router a
// chance to prepare before {@link Router#route(List, URL, Invocation)} gets called.
type NotifyRouter interface {
PriorityRouter
// Notify notify whenever addresses in registry change
Notify([]protocol.Invoker)
// Poolable caches address pool and address metadata for a router instance which will be used later in Router's Route.
type Poolable interface {
// Pool created address pool and address metadata from the invokers.
Pool([]protocol.Invoker) (AddrPool, AddrMetadata)
// ShouldPool returns if it should pool. One typical scenario is a router rule changes, in this case, a pooling
// is necessary, even if the addresses not changed at all.
ShouldPool() bool
// Name return the Poolable's name.
Name() string
}
// AddrPool is an address pool, backed by a snapshot of address list, divided into categories.
type AddrPool map[string]*roaring.Bitmap
// AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable.
type AddrMetadata interface {
// Source indicates where the metadata comes from.
Source() string
}
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type Cache interface {
// GetInvokers returns the snapshot of received invokers.
GetInvokers() []protocol.Invoker
// FindAddrPool returns address pool associated with the given Poolable instance.
FindAddrPool(Poolable) AddrPool
// FindAddrMeta returns address metadata associated with the given Poolable instance.
FindAddrMeta(Poolable) AddrMetadata
}
......@@ -24,10 +24,12 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -74,9 +76,10 @@ func (f *FileTagRouter) Priority() int64 {
return f.router.priority
}
func (f *FileTagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 {
func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() {
return invokers
}
return f.Route(invokers, url, invocation)
// FIXME: I believe this is incorrect.
return f.Route(invokers, cache, url, invocation)
}
......@@ -41,8 +41,8 @@ import (
type RouterRule struct {
router.BaseRouterRule `yaml:",inline"`
Tags []Tag
addressToTagNames map[string][]string
tagNameToAddresses map[string][]string
AddressToTagNames map[string][]string
TagNameToAddresses map[string][]string
}
func getRule(rawRule string) (*RouterRule, error) {
......@@ -58,13 +58,13 @@ func getRule(rawRule string) (*RouterRule, error) {
// parseTags use for flattening tags data to @addressToTagNames and @tagNameToAddresses
func (t *RouterRule) parseTags() {
t.addressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.tagNameToAddresses = make(map[string][]string, len(t.Tags))
t.AddressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.TagNameToAddresses = make(map[string][]string, len(t.Tags))
for _, tag := range t.Tags {
for _, address := range tag.Addresses {
t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name)
t.AddressToTagNames[address] = append(t.AddressToTagNames[address], tag.Name)
}
t.tagNameToAddresses[tag.Name] = tag.Addresses
t.TagNameToAddresses[tag.Name] = tag.Addresses
}
}
......@@ -85,15 +85,15 @@ func (t *RouterRule) getTagNames() []string {
}
func (t *RouterRule) hasTag(tag string) bool {
return len(t.tagNameToAddresses[tag]) > 0
return len(t.TagNameToAddresses[tag]) > 0
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
return t.addressToTagNames
return t.AddressToTagNames
}
func (t *RouterRule) getTagNameToAddresses() map[string][]string {
return t.tagNameToAddresses
return t.TagNameToAddresses
}
func (t *RouterRule) getTags() []Tag {
......
......@@ -18,16 +18,20 @@
package tag
import (
"net"
"strconv"
"strings"
"sync"
)
import (
"github.com/RoaringBitmap/roaring"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
......@@ -37,6 +41,32 @@ import (
"github.com/apache/dubbo-go/remoting"
)
const (
name = "tag-router"
staticPrefix = "static-"
dynamicPrefix = "dynamic-"
)
// addrMetadata keeps snapshot data for app name, and some infos extracted from dynamic tag rule in order to make
// Route() method lock-free.
type addrMetadata struct {
// application name
application string
// is rule a runtime rule
ruleRuntime bool
// is rule a force rule
ruleForce bool
// is rule a valid rule
ruleValid bool
// is rule an enabled rule
ruleEnabled bool
}
// Source indicates where the metadata comes from.
func (m *addrMetadata) Source() string {
return name
}
// tagRouter defines url, enable and the priority
type tagRouter struct {
url *common.URL
......@@ -44,6 +74,8 @@ type tagRouter struct {
enabled bool
priority int64
application string
ruleChanged bool
mutex sync.RWMutex
}
// NewTagRouter returns a tagRouter instance if url is not nil
......@@ -63,61 +95,80 @@ func (c *tagRouter) isEnabled() bool {
return c.enabled
}
func (c *tagRouter) tagRouterRuleCopy() RouterRule {
routerRule := *c.tagRouterRule
return routerRule
}
// Route gets a list of invoker
func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
var (
result []protocol.Invoker
addresses []string
)
func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.isEnabled() || invokers.IsEmpty() {
return invokers
}
if shouldUseDynamicTag(cache.FindAddrMeta(c)) {
return c.routeWithDynamicTag(invokers, cache, url, invocation)
}
return c.routeWithStaticTag(invokers, cache, url, invocation)
}
if !c.isEnabled() || len(invokers) == 0 {
// routeWithStaticTag routes with static tag rule
func (c *tagRouter) routeWithStaticTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
tag := findTag(invocation, url)
if tag == "" {
return invokers
}
// Use static tags if dynamic tags are not set or invalid
if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
return filterUsingStaticTag(invokers, url, invocation)
ret, _ := c.filterWithTag(invokers, cache, staticPrefix+tag)
if ret.IsEmpty() && !isForceUseTag(url, invocation) {
return invokers
}
// since the rule can be changed by config center, we should copy one to use.
tagRouterRuleCopy := c.tagRouterRuleCopy()
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = url.GetParam(constant.Tagkey, "")
return ret
}
// routeWithDynamicTag routes with dynamic tag rule
func (c *tagRouter) routeWithDynamicTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
tag := findTag(invocation, url)
if tag == "" {
return c.filterNotInDynamicTag(invokers, cache)
}
// if we are requesting for a Provider with a specific tag
if len(tag) > 0 {
return filterInvokersWithTag(invokers, url, invocation, tagRouterRuleCopy, tag)
ret, ok := c.filterWithTag(invokers, cache, dynamicPrefix+tag)
if ok && (!ret.IsEmpty() || isTagRuleForce(cache.FindAddrMeta(c))) {
return ret
}
// return all addresses in dynamic tag group.
addresses = tagRouterRuleCopy.getAddresses()
if len(addresses) > 0 {
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port)
}
result = filterInvoker(invokers, filterAddressNotMatches)
// 1. all addresses are in dynamic tag group, return empty list.
if len(result) == 0 {
return result
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
if ret.IsEmpty() {
ret, _ = c.filterWithTag(invokers, cache, staticPrefix+tag)
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if !ret.IsEmpty() || isForceUseTag(url, invocation) {
return ret
}
return c.filterNotInDynamicTag(invokers, cache)
}
return ret
}
// filterWithTag filters incoming invokers with the given tag
func (c *tagRouter) filterWithTag(invokers *roaring.Bitmap, cache router.Cache, tag string) (*roaring.Bitmap, bool) {
if target, ok := cache.FindAddrPool(c)[tag]; ok {
return utils.JoinIfNotEqual(target, invokers), true
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
// static tag group.
filter := func(invoker protocol.Invoker) bool {
localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
return len(localTag) == 0 || !(tagRouterRuleCopy.hasTag(localTag))
return utils.EmptyAddr, false
}
// filterNotInDynamicTag filters incoming invokers not applied to dynamic tag rule
func (c *tagRouter) filterNotInDynamicTag(invokers *roaring.Bitmap, cache router.Cache) *roaring.Bitmap {
// FAILOVER: return all Providers without any tags.
invokers = invokers.Clone()
for k, v := range cache.FindAddrPool(c) {
if strings.HasPrefix(k, dynamicPrefix) {
invokers.AndNot(v)
}
}
return filterInvoker(result, filter)
return invokers
}
// Process parses dynamic tag rule
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
......@@ -135,16 +186,52 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.tagRouterRule = routerRule
return
c.ruleChanged = true
}
// URL gets the url of tagRouter
func (c *tagRouter) URL() common.URL {
return *c.url
}
func (c *tagRouter) Notify(invokers []protocol.Invoker) {
if len(invokers) == 0 {
// Priority gets the priority of tagRouter
func (c *tagRouter) Priority() int64 {
return c.priority
}
// Pool divided invokers into different address pool by tag.
func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
c.fetchRuleIfNecessary(invokers)
rb := make(router.AddrPool, 8)
poolWithStaticTag(invokers, rb)
c.mutex.Lock()
defer c.mutex.Unlock()
poolWithDynamicTag(invokers, c.tagRouterRule, rb)
c.ruleChanged = false
// create metadata in order to avoid lock in route()
meta := addrMetadata{application: c.application}
if c.tagRouterRule != nil {
meta.ruleForce = c.tagRouterRule.Force
meta.ruleEnabled = c.tagRouterRule.Enabled
meta.ruleValid = c.tagRouterRule.Valid
}
return rb, &meta
}
// fetchRuleIfNecessary fetches, parses rule and register listener for the further change
func (c *tagRouter) fetchRuleIfNecessary(invokers []protocol.Invoker) {
if invokers == nil || len(invokers) == 0 {
return
}
invoker := invokers[0]
url := invoker.GetUrl()
url := invokers[0].GetUrl()
providerApplication := url.GetParam(constant.RemoteApplicationKey, "")
if len(providerApplication) == 0 {
logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
......@@ -159,11 +246,15 @@ func (c *tagRouter) Notify(invokers []protocol.Invoker) {
if providerApplication != c.application {
dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c)
} else {
// if app name from URL is as same as the current app name, then it is safe to jump out
return
}
c.application = providerApplication
routerKey := providerApplication + constant.TagRouterRuleSuffix
dynamicConfiguration.AddListener(routerKey, c)
//get rule
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
......@@ -177,71 +268,52 @@ func (c *tagRouter) Notify(invokers []protocol.Invoker) {
}
}
// URL gets the url of tagRouter
func (c *tagRouter) URL() common.URL {
return *c.url
// ShouldPool returns false, to make sure address cache for tag router happens once and only once.
func (c *tagRouter) ShouldPool() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.ruleChanged
}
// Priority gets the priority of tagRouter
func (c *tagRouter) Priority() int64 {
return c.priority
// Name returns pool's name
func (c *tagRouter) Name() string {
return name
}
// filterUsingStaticTag gets a list of invoker using static tag
func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if tag, ok := invocation.Attachments()[constant.Tagkey]; ok {
result := make([]protocol.Invoker, 0, 8)
for _, v := range invokers {
if v.GetUrl().GetParam(constant.Tagkey, "") == tag {
result = append(result, v)
}
}
if len(result) == 0 && !isForceUseTag(url, invocation) {
return invokers
}
return result
// poolWithDynamicTag pools addresses with the tags defined in dynamic tag rule, all keys have prefix "dynamic-"
func poolWithDynamicTag(invokers []protocol.Invoker, rule *RouterRule, pool router.AddrPool) {
if rule == nil {
return
}
return invokers
}
// filterInvokersWithTag gets a list of invoker using dynamic route with tag
func filterInvokersWithTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation, tagRouterRule RouterRule, tag string) []protocol.Invoker {
var (
result []protocol.Invoker
addresses []string
)
addresses, _ = tagRouterRule.getTagNameToAddresses()[tag]
// filter by dynamic tag group first
if len(addresses) > 0 {
filterAddressMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port)
}
result = filterInvoker(invokers, filterAddressMatches)
if len(result) > 0 || tagRouterRule.Force {
return result
}
}
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
filter := func(invoker protocol.Invoker) bool {
return invoker.GetUrl().GetParam(constant.Tagkey, "") == tag
}
result = filterInvoker(invokers, filter)
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if len(result) > 0 || isForceUseTag(url, invocation) {
return result
tagNameToAddresses := rule.getTagNameToAddresses()
for tag, addrs := range tagNameToAddresses {
pool[dynamicPrefix+tag] = addrsToBitmap(addrs, invokers)
}
// FAILOVER: return all Providers without any tags.
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
}
// poolWithStaticTag pools addresses with tags found from incoming URLs, all keys have prefix "static-"
func poolWithStaticTag(invokers []protocol.Invoker, pool router.AddrPool) {
for i, invoker := range invokers {
url := invoker.GetUrl()
return len(addresses) == 0 || !checkAddressMatch(tagRouterRule.getAddresses(), url.Ip, url.Port)
}
filterTagIsEmpty := func(invoker protocol.Invoker) bool {
return invoker.GetUrl().GetParam(constant.Tagkey, "") == ""
tag := url.GetParam(constant.Tagkey, "")
if len(tag) > 0 {
if _, ok := pool[staticPrefix+tag]; !ok {
pool[staticPrefix+tag] = roaring.NewBitmap()
}
pool[staticPrefix+tag].AddInt(i)
}
}
return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
}
// shouldUseDynamicTag uses the snapshot data from the parsed rule to decide if dynamic tag rule should be used or not
func shouldUseDynamicTag(meta router.AddrMetadata) bool {
return meta.(*addrMetadata).ruleValid && meta.(*addrMetadata).ruleEnabled
}
// isTagRuleForce uses the snapshot data from the parsed rule to decide if dynamic tag rule is forced or not
func isTagRuleForce(meta router.AddrMetadata) bool {
return meta.(*addrMetadata).ruleForce
}
// isForceUseTag returns whether force use tag
......@@ -252,31 +324,44 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
return false
}
type filter func(protocol.Invoker) bool
// addrsToBitmap finds indexes for the given IP addresses in the target URL list, if any '0.0.0.0' IP address is met,
// then returns back all indexes of the URLs list.
func addrsToBitmap(addrs []string, invokers []protocol.Invoker) *roaring.Bitmap {
ret := roaring.NewBitmap()
for _, addr := range addrs {
if isAnyHost(addr) {
ret.AddRange(0, uint64(len(invokers)))
return ret
}
func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker {
var res []protocol.Invoker
OUTER:
for _, invoker := range invokers {
for _, filter := range filters {
if !filter(invoker) {
continue OUTER
}
index := findIndexWithIp(addr, invokers)
if index != -1 {
ret.AddInt(index)
}
res = append(res, invoker)
}
return res
return ret
}
func checkAddressMatch(addresses []string, host, port string) bool {
addr := net.JoinHostPort(constant.ANYHOST_VALUE, port)
for _, address := range addresses {
if gxnet.MatchIP(address, host, port) {
return true
}
if address == addr {
return true
// findIndexWithIp finds index for one particular IP
func findIndexWithIp(addr string, invokers []protocol.Invoker) int {
for i, invoker := range invokers {
if gxnet.MatchIP(addr, invoker.GetUrl().Ip, invoker.GetUrl().Port) {
return i
}
}
return false
return -1
}
// isAnyHost checks if an IP is '0.0.0.0'
func isAnyHost(addr string) bool {
return strings.HasPrefix(addr, constant.ANYHOST_VALUE)
}
// findTag finds tag, first from invocation's attachment, then from URL
func findTag(invocation protocol.Invocation, consumerUrl *common.URL) string {
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = consumerUrl.GetParam(constant.Tagkey, "")
}
return tag
}
......@@ -25,15 +25,18 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
......@@ -143,17 +146,17 @@ func TestTagRouterRouteForce(t *testing.T) {
invokers = append(invokers, inv2, inv3, inv4)
inv := &invocation.RPCInvocation{}
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestHangZhou)
invRst1 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 1, len(invRst1))
assert.Equal(t, tagRouterTestHangZhou, invRst1[0].GetUrl().GetParam(tagRouterTestDubboTag, ""))
invRst1 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 1, len(invRst1.ToArray()))
assert.Equal(t, tagRouterTestHangZhou, invokers[invRst1.ToArray()[0]].GetUrl().GetParam(tagRouterTestDubboTag, ""))
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou)
invRst2 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 0, len(invRst2))
invRst2 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 0, len(invRst2.ToArray()))
inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestFalse)
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou)
invRst3 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 3, len(invRst3))
invRst3 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 3, len(invRst3.ToArray()))
}
func TestTagRouterRouteNoForce(t *testing.T) {
......@@ -175,20 +178,37 @@ func TestTagRouterRouteNoForce(t *testing.T) {
invokers = append(invokers, inv2, inv3, inv4)
inv := &invocation.RPCInvocation{}
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestHangZhou)
invRst := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 1, len(invRst))
assert.Equal(t, tagRouterTestHangZhou, invRst[0].GetUrl().GetParam(tagRouterTestDubboTag, ""))
invRst := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 1, len(invRst.ToArray()))
assert.Equal(t, tagRouterTestHangZhou, invokers[invRst.ToArray()[0]].GetUrl().GetParam(tagRouterTestDubboTag, ""))
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou)
inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestTrue)
invRst1 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 0, len(invRst1))
invRst1 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 0, len(invRst1.ToArray()))
inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestFalse)
invRst2 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 3, len(invRst2))
invRst2 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 3, len(invRst2.ToArray()))
}
func TestFilterInvoker(t *testing.T) {
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := chain.BuildCache(addrs)
cache.SetAddrPool(r.Name(), pool)
cache.SetAddrMeta(r.Name(), info)
return cache
}
func setUpAddrCacheWithRuleDisabled(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
info.(*addrMetadata).ruleEnabled = false
cache := chain.BuildCache(addrs)
cache.SetAddrPool(r.Name(), pool)
cache.SetAddrMeta(r.Name(), info)
return cache
}
func TestRouteBeijingInvoker(t *testing.T) {
u2, e2 := common.NewURL(tagRouterTestHangZhouUrl)
u3, e3 := common.NewURL(tagRouterTestShangHaiUrl)
u4, e4 := common.NewURL(tagRouterTestBeijingUrl)
......@@ -203,23 +223,17 @@ func TestFilterInvoker(t *testing.T) {
inv5 := NewMockInvoker(u5)
var invokers []protocol.Invoker
invokers = append(invokers, inv2, inv3, inv4, inv5)
filterTag := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" {
return true
}
return false
}
res := filterInvoker(invokers, filterTag)
assert.Equal(t, []protocol.Invoker{inv4, inv5}, res)
flag := true
filterEnabled := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParamBool(constant.RouterEnabled, false) == flag {
return true
}
return false
}
res2 := filterInvoker(invokers, filterTag, filterEnabled)
assert.Equal(t, []protocol.Invoker{inv4}, res2)
url, _ := common.NewURL(tagRouterTestBeijingUrl)
tagRouter, _ := NewTagRouter(&url)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(tagRouter, invokers)
inv := &invocation.RPCInvocation{}
res := tagRouter.Route(rb, cache, &url, inv)
// inv4 and inv5
assert.Equal(t, []uint32{2, 3}, res.ToArray())
}
type DynamicTagRouter struct {
......@@ -301,44 +315,51 @@ func (suite *DynamicTagRouter) TearDownTest() {
func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
invokers := suite.invokers
suite.route.Notify(invokers)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(suite.route, invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag1")
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[0])
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(0))
consumer.SetAttachments(tagRouterTestDubboTag, "tag3")
targetInvokers = suite.route.Route(invokers, suite.url, consumer)
suite.Equal(2, len(targetInvokers))
suite.Equal(targetInvokers, []protocol.Invoker{suite.invokers[2], suite.invokers[3]})
targetInvokers = suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(2), targetInvokers.GetCardinality())
suite.True(targetInvokers.Contains(2) && targetInvokers.Contains(3))
}
func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() {
invokers := suite.invokers
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCacheWithRuleDisabled(suite.route, invokers)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag4")
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[3])
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(3))
}
// Teas no tag and return a address are not in dynamic tag group
func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() {
invokers := suite.invokers
suite.route.Notify(invokers)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(suite.route, invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[4])
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(4))
// test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
consumer.SetAttachments(tagRouterTestDubboTag, "tag5")
targetInvokers = suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[4])
targetInvokers = suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(4))
}
func TestProcess(t *testing.T) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/protocol"
)
var EmptyAddr = roaring.NewBitmap()
func JoinIfNotEqual(left *roaring.Bitmap, right *roaring.Bitmap) *roaring.Bitmap {
if !left.Equals(right) {
left = left.Clone()
left.And(right)
}
return left
}
func FallbackIfJoinToEmpty(left *roaring.Bitmap, right *roaring.Bitmap) *roaring.Bitmap {
ret := JoinIfNotEqual(left, right)
if ret == nil || ret.IsEmpty() {
return right
}
return ret
}
func ToBitmap(invokers []protocol.Invoker) *roaring.Bitmap {
bitmap := roaring.NewBitmap()
bitmap.AddRange(0, uint64(len(invokers)))
return bitmap
}
......@@ -21,6 +21,10 @@ import (
"fmt"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/metadata/service"
)
......@@ -36,12 +40,11 @@ func SetMetadataService(msType string, creator func() (service.MetadataService,
}
// GetMetadataService will create a MetadataService instance
// it will panic if msType not found
func GetMetadataService(msType string) (service.MetadataService, error) {
if creator, ok := metadataServiceInsMap[msType]; ok {
return creator()
}
panic(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+
return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", msType))
}
......@@ -60,6 +60,10 @@ type Logger interface {
}
func init() {
// forbidden to executing twice.
if logger != nil {
return
}
logConfFile := os.Getenv(constant.APP_LOG_CONF_FILE)
err := InitLog(logConfFile)
if err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment