Skip to content
Snippets Groups Projects
Commit ada3eb6b authored by haohongfan's avatar haohongfan
Browse files

Merge branch 'refact-seri' of github.com:apache/dubbo-go into refact-seri

parents f86865db d2bdc6f9
No related branches found
No related tags found
No related merge requests found
Showing
with 892 additions and 315 deletions
# Release Notes
---
## 1.5.1
### New Features
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/703)
- [Add TLS support](https://github.com/apache/dubbo-go/pull/685)
- [Add Nearest first for multiple registry](https://github.com/apache/dubbo-go/pull/659)
- [Add application and service level router](https://github.com/apache/dubbo-go/pull/662)
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/665)
### Enhancement
- [Avoid init the log twice](https://github.com/apache/dubbo-go/pull/719)
- [Correct words and format codes](https://github.com/apache/dubbo-go/pull/704)
- [Change log stack level from warn to error](https://github.com/apache/dubbo-go/pull/702)
- [Optimize remotes configuration](https://github.com/apache/dubbo-go/pull/687)
### Bugfixes
- [Fix register service instance after provider config load](https://github.com/apache/dubbo-go/pull/694)
- [Fix call subscribe function asynchronously](https://github.com/apache/dubbo-go/pull/721)
- [Fix tag router rule copy](https://github.com/apache/dubbo-go/pull/721)
- [Fix nacos unit test failed](https://github.com/apache/dubbo-go/pull/705)
- [Fix can not inovke nacos destroy when graceful shutdown](https://github.com/apache/dubbo-go/pull/689)
- [Fix zk lost event](https://github.com/apache/dubbo-go/pull/692)
- [Fix k8s ut bug](https://github.com/apache/dubbo-go/pull/693)
Milestone: [https://github.com/apache/dubbo-go/milestone/2?closed=1](https://github.com/apache/dubbo-go/milestone/2?closed=1)
Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apache/dubbo-go/projects/8)
## 1.5.0
### New Features
......
......@@ -16,6 +16,8 @@ Apache License, Version 2.0
## Release note ##
[v1.5.1 - Aug 23, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.1)
[v1.5.0 - July 24, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.0)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
......
......@@ -39,10 +39,13 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
if len(invokers) > 0 {
url = invokers[0].GetUrl()
}
return &staticDirectory{
dir := &staticDirectory{
BaseDirectory: NewBaseDirectory(&url),
invokers: invokers,
}
dir.routerChain.SetInvokers(invokers)
return dir
}
//for-loop invokers ,if all invokers is available ,then it means directory is available
......@@ -69,7 +72,7 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo
return invokers
}
dirUrl := dir.GetUrl()
return routerChain.Route(invokers, &dirUrl, invocation)
return routerChain.Route(&dirUrl, invocation)
}
// Destroy Destroy
......@@ -92,6 +95,7 @@ func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error
if e != nil {
return e
}
routerChain.SetInvokers(dir.invokers)
dir.SetRouterChain(routerChain)
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package router
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
// Chain
type Chain interface {
Route(*common.URL, protocol.Invocation) []protocol.Invoker
// Refresh invokers
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
}
......@@ -18,9 +18,10 @@
package chain
import (
"math"
"sort"
"sync"
"sync/atomic"
"time"
)
import (
......@@ -30,11 +31,18 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
)
// RouterChain Router chain
type RouterChain struct {
// Full list of addresses from registry, classified by method name.
......@@ -48,30 +56,38 @@ type RouterChain struct {
mutex sync.RWMutex
url common.URL
// The times of address notification since last update for address cache
count int64
// The timestamp of last update for address cache
last time.Time
// Channel for notify to update the address cache
notify chan struct{}
// Address cache
cache atomic.Value
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invokers
l := len(c.routers)
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
cache := c.loadCache()
if cache == nil {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.invokers
}
for _, r := range rs {
finalInvokers = r.Route(finalInvokers, url, invocation)
bitmap := cache.bitmap
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}
return finalInvokers
}
// SetInvokers notify router chain of the initial addresses from registry at the first time. Notify whenever addresses in registry change.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
for _, r := range c.routers {
if notifyRouter, ok := r.(router.NotifyRouter); ok {
notifyRouter.Notify(invokers)
}
indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.invokers[index]
}
return finalInvokers
}
// AddRouters Add routers to router chain
......@@ -88,6 +104,116 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.routers = newRouters
}
// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
// time interval exceeds timeThreshold since last cache update, then notify to update the cache.
func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.mutex.Lock()
c.invokers = invokers
c.mutex.Unlock()
c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
}
// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
func (c *RouterChain) loop() {
for {
ticker := time.NewTicker(timeInterval)
select {
case <-ticker.C:
c.buildCache()
case <-c.notify:
c.buildCache()
}
}
}
// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
c.mutex.RLock()
defer c.mutex.RUnlock()
ret := make([]router.PriorityRouter, 0, len(c.routers))
ret = append(ret, c.routers...)
return ret
}
// copyInvokers copies a snapshot of the received invokers.
func (c *RouterChain) copyInvokers() []protocol.Invoker {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.invokers == nil || len(c.invokers) == 0 {
return nil
}
ret := make([]protocol.Invoker, 0, len(c.invokers))
ret = append(ret, c.invokers...)
return ret
}
// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}
return v.(*InvokerCache)
}
// copyInvokerIfNecessary compares chain's invokers copy and cache's invokers copy, to avoid copy as much as possible
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
var invokers []protocol.Invoker
if cache != nil {
invokers = cache.invokers
}
c.mutex.RLock()
defer c.mutex.RUnlock()
if isInvokersChanged(invokers, c.invokers) {
invokers = c.copyInvokers()
}
return invokers
}
// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}
var (
mutex sync.Mutex
wg sync.WaitGroup
)
cache := BuildCache(invokers)
for _, r := range c.copyRouters() {
if p, ok := r.(router.Poolable); ok {
wg.Add(1)
go func(p router.Poolable) {
defer wg.Done()
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
defer mutex.Unlock()
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
}(p)
}
}
wg.Wait()
c.cache.Store(cache)
}
// URL Return URL in RouterChain
func (c *RouterChain) URL() common.URL {
return c.url
......@@ -118,14 +244,62 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
if url != nil {
chain.url = *url
}
go chain.loop()
return chain, nil
}
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}
logger.Debugf("reuse existing address cache for router %q", name)
return origin.pools[name], origin.metadatas[name]
}
// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}
// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
return true
}
for _, r := range right {
found := false
for _, l := range left {
if common.IsEquals(l.GetUrl(), r.GetUrl(), constant.TIMESTAMP_KEY, constant.REMOTE_TIMESTAMP_KEY) {
found = true
break
}
}
if !found {
return true
}
}
return false
}
// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
......
......@@ -172,12 +172,12 @@ func TestRouterChainRoute(t *testing.T) {
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 1, len(finalInvokers))
}
......@@ -213,10 +213,12 @@ conditions:
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
......@@ -236,13 +238,16 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
finalInvokers := chain.Route(&targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chain
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/protocol"
)
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type InvokerCache struct {
// The snapshot of invokers
invokers []protocol.Invoker
// The bitmap representation for invokers snapshot
bitmap *roaring.Bitmap
// Address pool from routers which implement Poolable
pools map[string]router.AddrPool
// Address metadata from routers which implement Poolable
metadatas map[string]router.AddrMetadata
}
// BuildCache builds address cache from the given invokers.
func BuildCache(invokers []protocol.Invoker) *InvokerCache {
return &InvokerCache{
invokers: invokers,
bitmap: utils.ToBitmap(invokers),
pools: make(map[string]router.AddrPool, 8),
metadatas: make(map[string]router.AddrMetadata, 8),
}
}
// GetInvokers get invokers snapshot.
func (c *InvokerCache) GetInvokers() []protocol.Invoker {
return c.invokers
}
// FindAddrPool finds address pool for a poolable router.
func (c *InvokerCache) FindAddrPool(p router.Poolable) router.AddrPool {
return c.pools[p.Name()]
}
// FindAddrMeta finds address metadata for a poolable router.
func (c *InvokerCache) FindAddrMeta(p router.Poolable) router.AddrMetadata {
return c.metadatas[p.Name()]
}
// SetAddrPool sets address pool for a poolable router, for unit test only
func (c *InvokerCache) SetAddrPool(name string, pool router.AddrPool) {
c.pools[name] = pool
}
// SetAddrMeta sets address metadata for a poolable router, for unit test only
func (c *InvokerCache) SetAddrMeta(name string, meta router.AddrMetadata) {
c.metadatas[name] = meta
}
......@@ -32,6 +32,9 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
......@@ -180,30 +183,30 @@ func TestRoute_matchFilter(t *testing.T) {
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers4 := router4.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers5 := router5.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers6 := router6.Route(invokers, &cUrl, &invocation.RPCInvocation{})
assert.Equal(t, 1, len(fileredInvokers1))
assert.Equal(t, 0, len(fileredInvokers2))
assert.Equal(t, 0, len(fileredInvokers3))
assert.Equal(t, 1, len(fileredInvokers4))
assert.Equal(t, 2, len(fileredInvokers5))
assert.Equal(t, 1, len(fileredInvokers6))
ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret3 := router3.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret4 := router4.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret5 := router5.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
ret6 := router6.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &cUrl, &invocation.RPCInvocation{})
assert.Equal(t, 1, len(ret1.ToArray()))
assert.Equal(t, 0, len(ret2.ToArray()))
assert.Equal(t, 0, len(ret3.ToArray()))
assert.Equal(t, 1, len(ret4.ToArray()))
assert.Equal(t, 2, len(ret5.ToArray()))
assert.Equal(t, 1, len(ret6.ToArray()))
}
func TestRoute_methodRoute(t *testing.T) {
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := router.(*ConditionRouter).MatchWhen(&url, inv)
matchWhen := r.(*ConditionRouter).MatchWhen(&url, inv)
assert.Equal(t, true, matchWhen)
url1, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
matchWhen = router.(*ConditionRouter).MatchWhen(&url1, inv)
matchWhen = r.(*ConditionRouter).MatchWhen(&url1, inv)
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
......@@ -225,9 +228,9 @@ func TestRoute_ReturnFalse(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnEmpty(t *testing.T) {
......@@ -237,9 +240,9 @@ func TestRoute_ReturnEmpty(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnAll(t *testing.T) {
......@@ -253,9 +256,9 @@ func TestRoute_ReturnAll(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_HostFilter(t *testing.T) {
......@@ -270,11 +273,11 @@ func TestRoute_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_Empty_HostFilter(t *testing.T) {
......@@ -289,11 +292,11 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_False_HostFilter(t *testing.T) {
......@@ -308,11 +311,11 @@ func TestRoute_False_HostFilter(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_Placeholder(t *testing.T) {
......@@ -327,11 +330,11 @@ func TestRoute_Placeholder(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_NoForce(t *testing.T) {
......@@ -346,9 +349,9 @@ func TestRoute_NoForce(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_Force(t *testing.T) {
......@@ -363,9 +366,9 @@ func TestRoute_Force(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), &curl, inv)
assert.Equal(t, 0, len(fileredInvokers.ToArray()))
}
func TestNewConditionRouterFactory(t *testing.T) {
......@@ -377,3 +380,7 @@ func TestNewAppRouterFactory(t *testing.T) {
factory := newAppRouterFactory()
assert.NotNil(t, factory)
}
func setUpAddrCache(addrs []protocol.Invoker) router.Cache {
return chain.BuildCache(addrs)
}
......@@ -22,10 +22,12 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
......@@ -64,14 +66,14 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) {
l.priority = listenableRouterDefaultPriority
routerKey := ruleKey + constant.ConditionRouterRuleSuffix
//add listener
// add listener
dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
return nil, perrors.Errorf("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
}
dynamicConfiguration.AddListener(routerKey, l)
//get rule
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
return nil, perrors.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
......@@ -129,13 +131,13 @@ func (l *listenableRouter) generateConditions(rule *RouterRule) {
}
// Route Determine the target invokers list.
func (l *listenableRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 || len(l.conditionRouters) == 0 {
func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() || len(l.conditionRouters) == 0 {
return invokers
}
//We will check enabled status inside each router.
// We will check enabled status inside each router.
for _, r := range l.conditionRouters {
invokers = r.Route(invokers, url, invocation)
invokers = r.Route(invokers, cache, url, invocation)
}
return invokers
}
......
......@@ -23,20 +23,23 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/net"
)
const (
//pattern route pattern regex
// pattern route pattern regex
pattern = `([&!=,]*)\\s*([^&!=,\\s]+)`
)
......@@ -148,29 +151,36 @@ func (c *ConditionRouter) Enabled() bool {
}
// Route Determine the target invokers list.
func (c *ConditionRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.Enabled() {
return invokers
}
if len(invokers) == 0 {
if invokers.IsEmpty() {
return invokers
}
isMatchWhen := c.MatchWhen(url, invocation)
if !isMatchWhen {
return invokers
}
var result []protocol.Invoker
if len(c.ThenCondition) == 0 {
return result
return utils.EmptyAddr
}
for _, invoker := range invokers {
result := roaring.NewBitmap()
for iter := invokers.Iterator(); iter.HasNext(); {
index := iter.Next()
invoker := cache.GetInvokers()[index]
invokerUrl := invoker.GetUrl()
isMatchThen := c.MatchThen(&invokerUrl, url)
if isMatchThen {
result = append(result, invoker)
result.Add(index)
}
}
if len(result) > 0 {
if !result.IsEmpty() {
return result
} else if c.Force {
rule, _ := url.GetParamAndDecoded(constant.RULE_KEY)
......@@ -178,6 +188,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url *common.URL, in
logger.Warnf("The route result is empty and force execute. consumer: %s, service: %s, router: %s", localIP, url.Service(), rule)
return result
}
return invokers
}
......
......@@ -17,8 +17,13 @@
package healthcheck
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
......@@ -28,6 +33,8 @@ import (
const (
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
healthy = "healthy"
name = "health-check-router"
)
// HealthCheckRouter provides a health-first routing mechanism through HealthChecker
......@@ -51,25 +58,48 @@ func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) {
}
// Route gets a list of healthy invoker
func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
func (r *HealthCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !r.enabled {
return invokers
}
healthyInvokers := make([]protocol.Invoker, 0, len(invokers))
addrPool := cache.FindAddrPool(r)
// Add healthy invoker to the list
for _, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
healthyInvokers = append(healthyInvokers, invoker)
}
}
// If all Invoke are considered unhealthy, downgrade to all inovker
if len(healthyInvokers) == 0 {
healthyInvokers := utils.JoinIfNotEqual(addrPool[healthy], invokers)
// If all invokers are considered unhealthy, downgrade to all invoker
if healthyInvokers.IsEmpty() {
logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
return invokers
}
return healthyInvokers
}
// Pool separates healthy invokers from others.
func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
if !r.enabled {
return nil, nil
}
rb := make(router.AddrPool, 8)
rb[healthy] = roaring.NewBitmap()
for i, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
rb[healthy].Add(uint32(i))
}
}
return rb, nil
}
// ShouldPool will always return true to make sure healthy check constantly.
func (r *HealthCheckRouter) ShouldPool() bool {
return r.enabled
}
func (r *HealthCheckRouter) Name() string {
return name
}
// Priority
func (r *HealthCheckRouter) Priority() int64 {
return 0
......
......@@ -29,6 +29,9 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -59,25 +62,25 @@ func TestHealthCheckRouterRoute(t *testing.T) {
invoker3 := NewMockInvoker(url3)
invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil)
res := hcr.Route(invokers, &consumerURL, inv)
res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// now all invokers are healthy
assert.True(t, len(res) == len(invokers))
assert.True(t, len(res.ToArray()) == len(invokers))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// invokers1 is unhealthy now
assert.True(t, len(res) == 2 && !contains(res, invoker1))
assert.True(t, len(res.ToArray()) == 2 && !res.Contains(0))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
request(url2, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// only invokers3 is healthy now
assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2))
assert.True(t, len(res.ToArray()) == 1 && !res.Contains(0) && !res.Contains(1))
for i := 0; i < 10; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
......@@ -85,37 +88,28 @@ func TestHealthCheckRouterRoute(t *testing.T) {
request(url3, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
// now all invokers are unhealthy, so downgraded to all
assert.True(t, len(res) == 3)
assert.True(t, len(res.ToArray()) == 3)
// reset the invoker1 successive failed count, so invoker1 go to healthy
request(url1, healthCheckRouteMethodNameTest, 0, false, true)
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
assert.True(t, res.Contains(0))
for i := 0; i < 6; i++ {
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
// now all invokers are unhealthy, so downgraded to all again
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, len(res) == 3)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
assert.True(t, len(res.ToArray()) == 3)
time.Sleep(time.Second * 2)
// invoker1 go to healthy again after 2s
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*HealthCheckRouter), invokers), &consumerURL, inv)
assert.True(t, res.Contains(0))
}
func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool {
for _, e := range invokers {
if e == invoker {
return true
}
}
return false
}
func TestNewHealthCheckRouter(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
......@@ -143,3 +137,11 @@ func TestNewHealthCheckRouter(t *testing.T) {
assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10))
assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500))
}
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := chain.BuildCache(addrs)
cache.SetAddrMeta(r.Name(), info)
cache.SetAddrPool(r.Name(), pool)
return cache
}
......@@ -17,6 +17,10 @@
package router
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
......@@ -38,7 +42,8 @@ type FilePriorityRouterFactory interface {
// Router
type router interface {
// Route Determine the target invokers list.
Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker
Route(*roaring.Bitmap, Cache, *common.URL, protocol.Invocation) *roaring.Bitmap
// URL Return URL in router
URL() common.URL
}
......@@ -51,10 +56,38 @@ type PriorityRouter interface {
Priority() int64
}
// NotifyRouter notify router use the invoker list. Invoker list may change from time to time. This method gives the router a
// chance to prepare before {@link Router#route(List, URL, Invocation)} gets called.
type NotifyRouter interface {
PriorityRouter
// Notify notify whenever addresses in registry change
Notify([]protocol.Invoker)
// Poolable caches address pool and address metadata for a router instance which will be used later in Router's Route.
type Poolable interface {
// Pool created address pool and address metadata from the invokers.
Pool([]protocol.Invoker) (AddrPool, AddrMetadata)
// ShouldPool returns if it should pool. One typical scenario is a router rule changes, in this case, a pooling
// is necessary, even if the addresses not changed at all.
ShouldPool() bool
// Name return the Poolable's name.
Name() string
}
// AddrPool is an address pool, backed by a snapshot of address list, divided into categories.
type AddrPool map[string]*roaring.Bitmap
// AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable.
type AddrMetadata interface {
// Source indicates where the metadata comes from.
Source() string
}
// Cache caches all addresses relevant info for a snapshot of received invokers. It keeps a snapshot of the received
// address list, and also keeps address pools and address metadata from routers based on the same address snapshot, if
// the router implements Poolable.
type Cache interface {
// GetInvokers returns the snapshot of received invokers.
GetInvokers() []protocol.Invoker
// FindAddrPool returns address pool associated with the given Poolable instance.
FindAddrPool(Poolable) AddrPool
// FindAddrMeta returns address metadata associated with the given Poolable instance.
FindAddrMeta(Poolable) AddrMetadata
}
......@@ -24,10 +24,12 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -74,9 +76,10 @@ func (f *FileTagRouter) Priority() int64 {
return f.router.priority
}
func (f *FileTagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 {
func (f *FileTagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() {
return invokers
}
return f.Route(invokers, url, invocation)
// FIXME: I believe this is incorrect.
return f.Route(invokers, cache, url, invocation)
}
......@@ -41,8 +41,8 @@ import (
type RouterRule struct {
router.BaseRouterRule `yaml:",inline"`
Tags []Tag
addressToTagNames map[string][]string
tagNameToAddresses map[string][]string
AddressToTagNames map[string][]string
TagNameToAddresses map[string][]string
}
func getRule(rawRule string) (*RouterRule, error) {
......@@ -58,13 +58,13 @@ func getRule(rawRule string) (*RouterRule, error) {
// parseTags use for flattening tags data to @addressToTagNames and @tagNameToAddresses
func (t *RouterRule) parseTags() {
t.addressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.tagNameToAddresses = make(map[string][]string, len(t.Tags))
t.AddressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.TagNameToAddresses = make(map[string][]string, len(t.Tags))
for _, tag := range t.Tags {
for _, address := range tag.Addresses {
t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name)
t.AddressToTagNames[address] = append(t.AddressToTagNames[address], tag.Name)
}
t.tagNameToAddresses[tag.Name] = tag.Addresses
t.TagNameToAddresses[tag.Name] = tag.Addresses
}
}
......@@ -85,15 +85,15 @@ func (t *RouterRule) getTagNames() []string {
}
func (t *RouterRule) hasTag(tag string) bool {
return len(t.tagNameToAddresses[tag]) > 0
return len(t.TagNameToAddresses[tag]) > 0
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
return t.addressToTagNames
return t.AddressToTagNames
}
func (t *RouterRule) getTagNameToAddresses() map[string][]string {
return t.tagNameToAddresses
return t.TagNameToAddresses
}
func (t *RouterRule) getTags() []Tag {
......
......@@ -18,16 +18,20 @@
package tag
import (
"net"
"strconv"
"strings"
"sync"
)
import (
"github.com/RoaringBitmap/roaring"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
......@@ -37,6 +41,32 @@ import (
"github.com/apache/dubbo-go/remoting"
)
const (
name = "tag-router"
staticPrefix = "static-"
dynamicPrefix = "dynamic-"
)
// addrMetadata keeps snapshot data for app name, and some infos extracted from dynamic tag rule in order to make
// Route() method lock-free.
type addrMetadata struct {
// application name
application string
// is rule a runtime rule
ruleRuntime bool
// is rule a force rule
ruleForce bool
// is rule a valid rule
ruleValid bool
// is rule an enabled rule
ruleEnabled bool
}
// Source indicates where the metadata comes from.
func (m *addrMetadata) Source() string {
return name
}
// tagRouter defines url, enable and the priority
type tagRouter struct {
url *common.URL
......@@ -44,6 +74,8 @@ type tagRouter struct {
enabled bool
priority int64
application string
ruleChanged bool
mutex sync.RWMutex
}
// NewTagRouter returns a tagRouter instance if url is not nil
......@@ -63,61 +95,80 @@ func (c *tagRouter) isEnabled() bool {
return c.enabled
}
func (c *tagRouter) tagRouterRuleCopy() RouterRule {
routerRule := *c.tagRouterRule
return routerRule
}
// Route gets a list of invoker
func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
var (
result []protocol.Invoker
addresses []string
)
func (c *tagRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.isEnabled() || invokers.IsEmpty() {
return invokers
}
if shouldUseDynamicTag(cache.FindAddrMeta(c)) {
return c.routeWithDynamicTag(invokers, cache, url, invocation)
}
return c.routeWithStaticTag(invokers, cache, url, invocation)
}
if !c.isEnabled() || len(invokers) == 0 {
// routeWithStaticTag routes with static tag rule
func (c *tagRouter) routeWithStaticTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
tag := findTag(invocation, url)
if tag == "" {
return invokers
}
// Use static tags if dynamic tags are not set or invalid
if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
return filterUsingStaticTag(invokers, url, invocation)
ret, _ := c.filterWithTag(invokers, cache, staticPrefix+tag)
if ret.IsEmpty() && !isForceUseTag(url, invocation) {
return invokers
}
// since the rule can be changed by config center, we should copy one to use.
tagRouterRuleCopy := c.tagRouterRuleCopy()
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = url.GetParam(constant.Tagkey, "")
return ret
}
// routeWithDynamicTag routes with dynamic tag rule
func (c *tagRouter) routeWithDynamicTag(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
tag := findTag(invocation, url)
if tag == "" {
return c.filterNotInDynamicTag(invokers, cache)
}
// if we are requesting for a Provider with a specific tag
if len(tag) > 0 {
return filterInvokersWithTag(invokers, url, invocation, tagRouterRuleCopy, tag)
ret, ok := c.filterWithTag(invokers, cache, dynamicPrefix+tag)
if ok && (!ret.IsEmpty() || isTagRuleForce(cache.FindAddrMeta(c))) {
return ret
}
// return all addresses in dynamic tag group.
addresses = tagRouterRuleCopy.getAddresses()
if len(addresses) > 0 {
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port)
}
result = filterInvoker(invokers, filterAddressNotMatches)
// 1. all addresses are in dynamic tag group, return empty list.
if len(result) == 0 {
return result
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
if ret.IsEmpty() {
ret, _ = c.filterWithTag(invokers, cache, staticPrefix+tag)
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if !ret.IsEmpty() || isForceUseTag(url, invocation) {
return ret
}
return c.filterNotInDynamicTag(invokers, cache)
}
return ret
}
// filterWithTag filters incoming invokers with the given tag
func (c *tagRouter) filterWithTag(invokers *roaring.Bitmap, cache router.Cache, tag string) (*roaring.Bitmap, bool) {
if target, ok := cache.FindAddrPool(c)[tag]; ok {
return utils.JoinIfNotEqual(target, invokers), true
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
// static tag group.
filter := func(invoker protocol.Invoker) bool {
localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
return len(localTag) == 0 || !(tagRouterRuleCopy.hasTag(localTag))
return utils.EmptyAddr, false
}
// filterNotInDynamicTag filters incoming invokers not applied to dynamic tag rule
func (c *tagRouter) filterNotInDynamicTag(invokers *roaring.Bitmap, cache router.Cache) *roaring.Bitmap {
// FAILOVER: return all Providers without any tags.
invokers = invokers.Clone()
for k, v := range cache.FindAddrPool(c) {
if strings.HasPrefix(k, dynamicPrefix) {
invokers.AndNot(v)
}
}
return filterInvoker(result, filter)
return invokers
}
// Process parses dynamic tag rule
func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
......@@ -135,16 +186,53 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.tagRouterRule = routerRule
c.ruleChanged = true
return
}
func (c *tagRouter) Notify(invokers []protocol.Invoker) {
if len(invokers) == 0 {
// URL gets the url of tagRouter
func (c *tagRouter) URL() common.URL {
return *c.url
}
// Priority gets the priority of tagRouter
func (c *tagRouter) Priority() int64 {
return c.priority
}
// Pool divided invokers into different address pool by tag.
func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
c.fetchRuleIfNecessary(invokers)
rb := make(router.AddrPool, 8)
poolWithStaticTag(invokers, rb)
c.mutex.Lock()
defer c.mutex.Unlock()
poolWithDynamicTag(invokers, c.tagRouterRule, rb)
c.ruleChanged = false
// create metadata in order to avoid lock in route()
meta := addrMetadata{application: c.application}
if c.tagRouterRule != nil {
meta.ruleForce = c.tagRouterRule.Force
meta.ruleEnabled = c.tagRouterRule.Enabled
meta.ruleValid = c.tagRouterRule.Valid
}
return rb, &meta
}
// fetchRuleIfNecessary fetches, parses rule and register listener for the further change
func (c *tagRouter) fetchRuleIfNecessary(invokers []protocol.Invoker) {
if invokers == nil || len(invokers) == 0 {
return
}
invoker := invokers[0]
url := invoker.GetUrl()
url := invokers[0].GetUrl()
providerApplication := url.GetParam(constant.RemoteApplicationKey, "")
if len(providerApplication) == 0 {
logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
......@@ -159,11 +247,15 @@ func (c *tagRouter) Notify(invokers []protocol.Invoker) {
if providerApplication != c.application {
dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c)
} else {
// if app name from URL is as same as the current app name, then it is safe to jump out
return
}
c.application = providerApplication
routerKey := providerApplication + constant.TagRouterRuleSuffix
dynamicConfiguration.AddListener(routerKey, c)
//get rule
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
......@@ -177,71 +269,52 @@ func (c *tagRouter) Notify(invokers []protocol.Invoker) {
}
}
// URL gets the url of tagRouter
func (c *tagRouter) URL() common.URL {
return *c.url
// ShouldPool returns false, to make sure address cache for tag router happens once and only once.
func (c *tagRouter) ShouldPool() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.ruleChanged
}
// Priority gets the priority of tagRouter
func (c *tagRouter) Priority() int64 {
return c.priority
// Name returns pool's name
func (c *tagRouter) Name() string {
return name
}
// filterUsingStaticTag gets a list of invoker using static tag
func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if tag, ok := invocation.Attachments()[constant.Tagkey]; ok {
result := make([]protocol.Invoker, 0, 8)
for _, v := range invokers {
if v.GetUrl().GetParam(constant.Tagkey, "") == tag {
result = append(result, v)
}
}
if len(result) == 0 && !isForceUseTag(url, invocation) {
return invokers
}
return result
// poolWithDynamicTag pools addresses with the tags defined in dynamic tag rule, all keys have prefix "dynamic-"
func poolWithDynamicTag(invokers []protocol.Invoker, rule *RouterRule, pool router.AddrPool) {
if rule == nil {
return
}
return invokers
}
// filterInvokersWithTag gets a list of invoker using dynamic route with tag
func filterInvokersWithTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation, tagRouterRule RouterRule, tag string) []protocol.Invoker {
var (
result []protocol.Invoker
addresses []string
)
addresses, _ = tagRouterRule.getTagNameToAddresses()[tag]
// filter by dynamic tag group first
if len(addresses) > 0 {
filterAddressMatches := func(invoker protocol.Invoker) bool {
url := invoker.GetUrl()
return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port)
}
result = filterInvoker(invokers, filterAddressMatches)
if len(result) > 0 || tagRouterRule.Force {
return result
}
}
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
filter := func(invoker protocol.Invoker) bool {
return invoker.GetUrl().GetParam(constant.Tagkey, "") == tag
}
result = filterInvoker(invokers, filter)
// If there's no tagged providers that can match the current tagged request. force.tag is set by default
// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if len(result) > 0 || isForceUseTag(url, invocation) {
return result
tagNameToAddresses := rule.getTagNameToAddresses()
for tag, addrs := range tagNameToAddresses {
pool[dynamicPrefix+tag] = addrsToBitmap(addrs, invokers)
}
// FAILOVER: return all Providers without any tags.
filterAddressNotMatches := func(invoker protocol.Invoker) bool {
}
// poolWithStaticTag pools addresses with tags found from incoming URLs, all keys have prefix "static-"
func poolWithStaticTag(invokers []protocol.Invoker, pool router.AddrPool) {
for i, invoker := range invokers {
url := invoker.GetUrl()
return len(addresses) == 0 || !checkAddressMatch(tagRouterRule.getAddresses(), url.Ip, url.Port)
}
filterTagIsEmpty := func(invoker protocol.Invoker) bool {
return invoker.GetUrl().GetParam(constant.Tagkey, "") == ""
tag := url.GetParam(constant.Tagkey, "")
if len(tag) > 0 {
if _, ok := pool[staticPrefix+tag]; !ok {
pool[staticPrefix+tag] = roaring.NewBitmap()
}
pool[staticPrefix+tag].AddInt(i)
}
}
return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
}
// shouldUseDynamicTag uses the snapshot data from the parsed rule to decide if dynamic tag rule should be used or not
func shouldUseDynamicTag(meta router.AddrMetadata) bool {
return meta.(*addrMetadata).ruleValid && meta.(*addrMetadata).ruleEnabled
}
// isTagRuleForce uses the snapshot data from the parsed rule to decide if dynamic tag rule is forced or not
func isTagRuleForce(meta router.AddrMetadata) bool {
return meta.(*addrMetadata).ruleForce
}
// isForceUseTag returns whether force use tag
......@@ -252,31 +325,44 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
return false
}
type filter func(protocol.Invoker) bool
// addrsToBitmap finds indexes for the given IP addresses in the target URL list, if any '0.0.0.0' IP address is met,
// then returns back all indexes of the URLs list.
func addrsToBitmap(addrs []string, invokers []protocol.Invoker) *roaring.Bitmap {
ret := roaring.NewBitmap()
for _, addr := range addrs {
if isAnyHost(addr) {
ret.AddRange(0, uint64(len(invokers)))
return ret
}
func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker {
var res []protocol.Invoker
OUTER:
for _, invoker := range invokers {
for _, filter := range filters {
if !filter(invoker) {
continue OUTER
}
index := findIndexWithIp(addr, invokers)
if index != -1 {
ret.AddInt(index)
}
res = append(res, invoker)
}
return res
return ret
}
func checkAddressMatch(addresses []string, host, port string) bool {
addr := net.JoinHostPort(constant.ANYHOST_VALUE, port)
for _, address := range addresses {
if gxnet.MatchIP(address, host, port) {
return true
}
if address == addr {
return true
// findIndexWithIp finds index for one particular IP
func findIndexWithIp(addr string, invokers []protocol.Invoker) int {
for i, invoker := range invokers {
if gxnet.MatchIP(addr, invoker.GetUrl().Ip, invoker.GetUrl().Port) {
return i
}
}
return false
return -1
}
// isAnyHost checks if an IP is '0.0.0.0'
func isAnyHost(addr string) bool {
return strings.HasPrefix(addr, constant.ANYHOST_VALUE)
}
// findTag finds tag, first from invocation's attachment, then from URL
func findTag(invocation protocol.Invocation, consumerUrl *common.URL) string {
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
tag = consumerUrl.GetParam(constant.Tagkey, "")
}
return tag
}
......@@ -25,15 +25,18 @@ import (
)
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
......@@ -143,17 +146,17 @@ func TestTagRouterRouteForce(t *testing.T) {
invokers = append(invokers, inv2, inv3, inv4)
inv := &invocation.RPCInvocation{}
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestHangZhou)
invRst1 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 1, len(invRst1))
assert.Equal(t, tagRouterTestHangZhou, invRst1[0].GetUrl().GetParam(tagRouterTestDubboTag, ""))
invRst1 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 1, len(invRst1.ToArray()))
assert.Equal(t, tagRouterTestHangZhou, invokers[invRst1.ToArray()[0]].GetUrl().GetParam(tagRouterTestDubboTag, ""))
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou)
invRst2 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 0, len(invRst2))
invRst2 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 0, len(invRst2.ToArray()))
inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestFalse)
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou)
invRst3 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 3, len(invRst3))
invRst3 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 3, len(invRst3.ToArray()))
}
func TestTagRouterRouteNoForce(t *testing.T) {
......@@ -175,20 +178,37 @@ func TestTagRouterRouteNoForce(t *testing.T) {
invokers = append(invokers, inv2, inv3, inv4)
inv := &invocation.RPCInvocation{}
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestHangZhou)
invRst := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 1, len(invRst))
assert.Equal(t, tagRouterTestHangZhou, invRst[0].GetUrl().GetParam(tagRouterTestDubboTag, ""))
invRst := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 1, len(invRst.ToArray()))
assert.Equal(t, tagRouterTestHangZhou, invokers[invRst.ToArray()[0]].GetUrl().GetParam(tagRouterTestDubboTag, ""))
inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou)
inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestTrue)
invRst1 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 0, len(invRst1))
invRst1 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 0, len(invRst1.ToArray()))
inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestFalse)
invRst2 := tagRouter.Route(invokers, &u1, inv)
assert.Equal(t, 3, len(invRst2))
invRst2 := tagRouter.Route(utils.ToBitmap(invokers), setUpAddrCache(tagRouter, invokers), &u1, inv)
assert.Equal(t, 3, len(invRst2.ToArray()))
}
func TestFilterInvoker(t *testing.T) {
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := chain.BuildCache(addrs)
cache.SetAddrPool(r.Name(), pool)
cache.SetAddrMeta(r.Name(), info)
return cache
}
func setUpAddrCacheWithRuleDisabled(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
info.(*addrMetadata).ruleEnabled = false
cache := chain.BuildCache(addrs)
cache.SetAddrPool(r.Name(), pool)
cache.SetAddrMeta(r.Name(), info)
return cache
}
func TestRouteBeijingInvoker(t *testing.T) {
u2, e2 := common.NewURL(tagRouterTestHangZhouUrl)
u3, e3 := common.NewURL(tagRouterTestShangHaiUrl)
u4, e4 := common.NewURL(tagRouterTestBeijingUrl)
......@@ -203,23 +223,17 @@ func TestFilterInvoker(t *testing.T) {
inv5 := NewMockInvoker(u5)
var invokers []protocol.Invoker
invokers = append(invokers, inv2, inv3, inv4, inv5)
filterTag := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" {
return true
}
return false
}
res := filterInvoker(invokers, filterTag)
assert.Equal(t, []protocol.Invoker{inv4, inv5}, res)
flag := true
filterEnabled := func(invoker protocol.Invoker) bool {
if invoker.GetUrl().GetParamBool(constant.RouterEnabled, false) == flag {
return true
}
return false
}
res2 := filterInvoker(invokers, filterTag, filterEnabled)
assert.Equal(t, []protocol.Invoker{inv4}, res2)
url, _ := common.NewURL(tagRouterTestBeijingUrl)
tagRouter, _ := NewTagRouter(&url)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(tagRouter, invokers)
inv := &invocation.RPCInvocation{}
res := tagRouter.Route(rb, cache, &url, inv)
// inv4 and inv5
assert.Equal(t, []uint32{2, 3}, res.ToArray())
}
type DynamicTagRouter struct {
......@@ -301,44 +315,51 @@ func (suite *DynamicTagRouter) TearDownTest() {
func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
invokers := suite.invokers
suite.route.Notify(invokers)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(suite.route, invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag1")
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[0])
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(0))
consumer.SetAttachments(tagRouterTestDubboTag, "tag3")
targetInvokers = suite.route.Route(invokers, suite.url, consumer)
suite.Equal(2, len(targetInvokers))
suite.Equal(targetInvokers, []protocol.Invoker{suite.invokers[2], suite.invokers[3]})
targetInvokers = suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(2), targetInvokers.GetCardinality())
suite.True(targetInvokers.Contains(2) && targetInvokers.Contains(3))
}
func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() {
invokers := suite.invokers
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCacheWithRuleDisabled(suite.route, invokers)
consumer := &invocation.RPCInvocation{}
consumer.SetAttachments(tagRouterTestDubboTag, "tag4")
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[3])
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(3))
}
// Teas no tag and return a address are not in dynamic tag group
func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() {
invokers := suite.invokers
suite.route.Notify(invokers)
rb := roaring.NewBitmap()
rb.AddRange(0, uint64(len(invokers)))
cache := setUpAddrCache(suite.route, invokers)
suite.NotNil(suite.route.tagRouterRule)
consumer := &invocation.RPCInvocation{}
targetInvokers := suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[4])
targetInvokers := suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(4))
// test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
consumer.SetAttachments(tagRouterTestDubboTag, "tag5")
targetInvokers = suite.route.Route(invokers, suite.url, consumer)
suite.Equal(1, len(targetInvokers))
suite.Equal(targetInvokers[0], suite.invokers[4])
targetInvokers = suite.route.Route(rb, cache, suite.url, consumer)
suite.Equal(uint64(1), targetInvokers.GetCardinality())
suite.Equal(targetInvokers.ToArray()[0], uint32(4))
}
func TestProcess(t *testing.T) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/protocol"
)
var EmptyAddr = roaring.NewBitmap()
func JoinIfNotEqual(left *roaring.Bitmap, right *roaring.Bitmap) *roaring.Bitmap {
if !left.Equals(right) {
left = left.Clone()
left.And(right)
}
return left
}
func FallbackIfJoinToEmpty(left *roaring.Bitmap, right *roaring.Bitmap) *roaring.Bitmap {
ret := JoinIfNotEqual(left, right)
if ret == nil || ret.IsEmpty() {
return right
}
return ret
}
func ToBitmap(invokers []protocol.Invoker) *roaring.Bitmap {
bitmap := roaring.NewBitmap()
bitmap.AddRange(0, uint64(len(invokers)))
return bitmap
}
......@@ -21,6 +21,10 @@ import (
"fmt"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/metadata/service"
)
......@@ -36,12 +40,11 @@ func SetMetadataService(msType string, creator func() (service.MetadataService,
}
// GetMetadataService will create a MetadataService instance
// it will panic if msType not found
func GetMetadataService(msType string) (service.MetadataService, error) {
if creator, ok := metadataServiceInsMap[msType]; ok {
return creator()
}
panic(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+
return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", msType))
}
......@@ -25,7 +25,7 @@ import (
)
import (
"github.com/dubbogo/getty"
"github.com/apache/dubbo-getty"
perrors "github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
......@@ -60,6 +60,10 @@ type Logger interface {
}
func init() {
// forbidden to executing twice.
if logger != nil {
return
}
logConfFile := os.Getenv(constant.APP_LOG_CONF_FILE)
err := InitLog(logConfFile)
if err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment