Skip to content
Snippets Groups Projects
Commit 6589a7dc authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #1083 from LaurenceLiZhixin/Ftr/uniform-router

Ftr: uniform router support
parents 4eae5f28 e9928aa2
No related branches found
No related tags found
No related merge requests found
Showing
with 15 additions and 2574 deletions
......@@ -30,8 +30,6 @@ import (
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
)
// BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
......@@ -74,42 +72,6 @@ func (dir *BaseDirectory) GetDirectoryUrl() *common.URL {
return dir.url
}
// SetRouters Convert url to routers and add them into dir.routerChain
func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
if len(urls) == 0 {
return
}
routers := make([]router.PriorityRouter, 0, len(urls))
rc := dir.routerChain
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
if len(routerKey) == 0 {
continue
}
if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL {
if !dir.isProperRouter(url) {
continue
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
}
routers = append(routers, r)
}
logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc.AddRouters(routers)
dir.mutex.Unlock()
}
func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
app := url.GetParam(constant.APPLICATION_KEY, "")
dirApp := dir.GetUrl().GetParam(constant.APPLICATION_KEY, "")
......
......@@ -29,7 +29,6 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router/chain"
_ "github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
......@@ -52,17 +51,7 @@ func TestBuildRouterChain(t *testing.T) {
directory := NewBaseDirectory(regURL)
var err error
directory.routerChain, err = chain.NewRouterChain(regURL)
assert.Nil(t, err)
localIP := common.GetLocalIp()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
routeURL.AddParam(constant.INTERFACE_KEY, "mock-app")
routerURLs := make([]*common.URL, 0)
routerURLs = append(routerURLs, routeURL)
directory.SetRouters(routerURLs)
chain := directory.RouterChain()
assert.NotNil(t, chain)
assert.Error(t, err)
}
func getRouteURL(rule string, u *common.URL) *common.URL {
......
......@@ -37,8 +37,9 @@ import (
"github.com/apache/dubbo-go/protocol"
)
const (
timeInterval = 5 * time.Second
var (
virtualServiceConfigByte []byte
destinationRuleConfigByte []byte
)
// RouterChain Router chain
......@@ -71,24 +72,10 @@ func (c *RouterChain) GetNotifyChan() chan struct{} {
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
cache := c.loadCache()
if cache == nil {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.invokers
}
bitmap := cache.bitmap
finalInvokers := c.invokers
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}
indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.invokers[index]
finalInvokers = r.Route(c.invokers, url, invocation)
}
return finalInvokers
}
......@@ -121,22 +108,6 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
}()
}
// loop listens on events to update the address cache when it receives notification
// from address update,
func (c *RouterChain) loop() {
ticker := time.NewTicker(timeInterval)
for {
select {
case <-ticker.C:
if protocol.GetAndRefreshState() {
c.buildCache()
}
case <-c.notify:
c.buildCache()
}
}
}
// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
c.mutex.RLock()
......@@ -158,66 +129,9 @@ func (c *RouterChain) copyInvokers() []protocol.Invoker {
return ret
}
// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}
return v.(*InvokerCache)
}
// copyInvokerIfNecessary compares chain's invokers copy and cache's invokers copy, to avoid copy as much as possible
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
var invokers []protocol.Invoker
if cache != nil {
invokers = cache.invokers
}
c.mutex.RLock()
defer c.mutex.RUnlock()
if isInvokersChanged(invokers, c.invokers) {
invokers = c.copyInvokers()
}
return invokers
}
// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if len(invokers) == 0 {
return
}
var (
mutex sync.Mutex
wg sync.WaitGroup
)
cache := BuildCache(invokers)
for _, r := range c.copyRouters() {
if p, ok := r.(router.Poolable); ok {
wg.Add(1)
go func(p router.Poolable) {
defer wg.Done()
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
defer mutex.Unlock()
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
}(p)
}
}
wg.Wait()
c.cache.Store(cache)
}
// URL Return URL in RouterChain
func (c *RouterChain) URL() *common.URL {
return c.url
func SetVSAndDRConfigByte(vs, dr []byte) {
virtualServiceConfigByte = vs
destinationRuleConfigByte = dr
}
// NewRouterChain Use url to init router chain
......@@ -234,10 +148,14 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
}
routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewPriorityRouter(url, chain.notify)
if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil {
logger.Warnf("virtual Service Config or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n")
}
r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte, chain.notify)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err)
continue
}
routers = append(routers, r)
......@@ -256,34 +174,9 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
chain.url = url
}
go chain.loop()
return chain, nil
}
// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}
logger.Debugf("reuse existing address cache for router %q", name)
return origin.pools[name], origin.metadatas[name]
}
// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}
// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chain
import (
"encoding/base64"
"fmt"
"testing"
"time"
)
import (
zk "github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
const (
localIP = "127.0.0.1"
test1234IP = "1.2.3.4"
test1111IP = "1.1.1.1"
test0000IP = "0.0.0.0"
port20000 = 20000
path = "/dubbo/config/dubbo/test-condition.condition-router"
zkFormat = "zookeeper://%s:%d"
consumerFormat = "consumer://%s/com.foo.BarService"
dubboForamt = "dubbo://%s:%d/com.foo.BarService"
anyUrlFormat = "condition://%s/com.foo.BarService"
zkName = "zookeeper"
applicationKey = "test-condition"
applicationField = "application"
forceField = "force"
forceValue = "true"
)
var zkCluster *zk.TestCluster
func TestNewRouterChain(t *testing.T) {
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
zkCluster = ts
err = z.Create(path)
assert.NoError(t, err)
testyml := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host != 172.22.3.91
`
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
z.Delete(path)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
chain, err := NewRouterChain(getRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 1, len(chain.routers))
appRouter := chain.routers[0].(*condition.AppRouter)
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter.RouterRule())
rule := appRouter.RouterRule()
assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Force)
assert.True(t, rule.Enabled)
assert.True(t, rule.Valid)
assert.Equal(t, testyml, rule.RawRule)
assert.Equal(t, false, rule.Runtime)
assert.Equal(t, false, rule.Dynamic)
assert.Equal(t, "mock-app", rule.Key)
}
func TestNewRouterChainURLNil(t *testing.T) {
chain, err := NewRouterChain(nil)
assert.NoError(t, err)
assert.NotNil(t, chain)
}
func TestRouterChainAddRouters(t *testing.T) {
_, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
testyml := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host != 172.22.3.91
`
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
z.Delete(path)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, zkCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 2, len(chain.routers))
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
r, err := factory.NewPriorityRouter(url, notify)
assert.Nil(t, err)
assert.NotNil(t, r)
routers := make([]router.PriorityRouter, 0)
routers = append(routers, r)
chain.AddRouters(routers)
assert.Equal(t, 3, len(chain.routers))
}
func TestRouterChainRoute(t *testing.T) {
ts, _, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 1, len(chain.routers))
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
var invokers []protocol.Invoker
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(targetURL, inv)
assert.Equal(t, 1, len(finalInvokers))
}
func TestRouterChainRouteAppRouter(t *testing.T) {
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
testyml := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host = 1.1.1.1 => host != 1.2.3.4
`
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
z.Delete(path)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 2, len(chain.routers))
var invokers []protocol.Invoker
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
func TestRouterChainRouteNoRoute(t *testing.T) {
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second, gxzookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
chain, err := NewRouterChain(getConditionNoRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 1, len(chain.routers))
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
var invokers []protocol.Invoker
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
chain.SetInvokers(invokers)
chain.buildCache()
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
func getConditionNoRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host != 1.2.3.4"))
url.AddParam(constant.RULE_KEY, rule)
return url
}
func getConditionRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host = 1.2.3.4"))
url.AddParam(constant.RULE_KEY, rule)
return url
}
func getRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
return url
}
......@@ -23,7 +23,6 @@ import (
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -48,7 +47,6 @@ type InvokerCache struct {
func BuildCache(invokers []protocol.Invoker) *InvokerCache {
return &InvokerCache{
invokers: invokers,
bitmap: utils.ToBitmap(invokers),
pools: make(map[string]router.AddrPool, 8),
metadatas: make(map[string]router.AddrMetadata, 8),
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
const (
// Default priority for application router
appRouterDefaultPriority = int64(150)
)
// AppRouter For listen application router with config center
type AppRouter struct {
listenableRouter
notify interface{}
}
// NewAppRouter Init AppRouter by url
func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) {
if url == nil {
return nil, perrors.Errorf("No route URL for create app router!")
}
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify)
if err != nil {
return nil, err
}
appRouter.priority = appRouterDefaultPriority
return appRouter, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"fmt"
"testing"
"time"
)
import (
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/zookeeper"
"github.com/apache/dubbo-go/remoting"
)
const (
routerPath = "/dubbo/config/dubbo/test-condition.condition-router"
routerLocalIP = "127.0.0.1"
routerZk = "zookeeper"
routerKey = "test-condition"
)
var (
zkFormat = "zookeeper://%s:%d"
conditionFormat = "condition://%s/com.foo.BarService"
)
func TestNewAppRouter(t *testing.T) {
testYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host != 172.22.3.91
`
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
assert.NoError(t, err)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter)
assert.NotNil(t, appRouter.RouterRule())
rule := appRouter.RouterRule()
assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Force)
assert.True(t, rule.Enabled)
assert.True(t, rule.Valid)
assert.Equal(t, testYML, rule.RawRule)
assert.Equal(t, false, rule.Runtime)
assert.Equal(t, false, rule.Dynamic)
assert.Equal(t, "mock-app", rule.Key)
assert.Equal(t, 0, rule.Priority)
}
func TestGenerateConditions(t *testing.T) {
testYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host != 172.22.3.91
- host = 192.168.199.208 => host = 192.168.199.208
`
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
assert.NoError(t, err)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
rule, err := getRule(testYML)
assert.Nil(t, err)
appRouter.generateConditions(rule)
assert.Equal(t, 2, len(appRouter.conditionRouters))
}
func TestProcess(t *testing.T) {
testYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host != 172.22.3.91
`
ts, z, _, err := gxzookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(routerPath)
assert.NoError(t, err)
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL(routerKey)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
appRouter, err := NewAppRouter(appRouteURL, notify)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
assert.Equal(t, 1, len(appRouter.conditionRouters))
testNewYML := `scope: application
key: mock-app
enabled: true
force: true
runtime: false
conditions:
- => host != 172.22.3.91
- host = 192.168.199.208 => host = 192.168.199.208
`
appRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel})
assert.Equal(t, 0, len(appRouter.conditionRouters))
appRouter.Process(&config_center.ConfigChangeEvent{Value: testNewYML, ConfigType: remoting.EventTypeAdd})
assert.Equal(t, 2, len(appRouter.conditionRouters))
}
func getAppRouteURL(applicationKey string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(conditionFormat, constant.ANYHOST_VALUE))
url.AddParam("application", applicationKey)
url.AddParam("force", "true")
return url
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
)
func init() {
extension.SetRouterFactory(constant.ConditionRouterName, newConditionRouterFactory)
extension.SetRouterFactory(constant.ConditionAppRouterName, newAppRouterFactory)
}
// ConditionRouterFactory Condition router factory
type ConditionRouterFactory struct{}
func newConditionRouterFactory() router.PriorityRouterFactory {
return &ConditionRouterFactory{}
}
// NewPriorityRouter creates ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
return NewConditionRouter(url, notify)
}
// NewRouter Create FileRouterFactory by Content
func (c *ConditionRouterFactory) NewFileRouter(content []byte) (router.PriorityRouter, error) {
return NewFileConditionRouter(content)
}
// AppRouterFactory Application router factory
type AppRouterFactory struct{}
func newAppRouterFactory() router.PriorityRouterFactory {
return &AppRouterFactory{}
}
// NewPriorityRouter creates AppRouterFactory by URL
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
return NewAppRouter(url, notify)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"context"
"encoding/base64"
"fmt"
"reflect"
"testing"
)
import (
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
const (
factory1111Ip = "1.1.1.1"
factoryUrlFormat = "condition://%s/com.foo.BarService"
factoryDubboFormat = "dubbo://%s:20880/com.foo.BarService"
factoryConsumerMethodFormat = "consumer://%s/com.foo.BarService?methods=getFoo"
factory333URL = "dubbo://10.20.3.3:20880/com.foo.BarService"
factoryConsumerFormat = "consumer://%s/com.foo.BarService"
factoryHostIp1234Format = "host = %s => host = 1.2.3.4"
)
type MockInvoker struct {
url *common.URL
available bool
destroyed bool
successCount int
}
func NewMockInvoker(url *common.URL, successCount int) *MockInvoker {
return &MockInvoker{
url: url,
available: true,
destroyed: false,
successCount: successCount,
}
}
func (bi *MockInvoker) GetUrl() *common.URL {
return bi.url
}
func getRouteUrl(rule string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE))
url.AddParam("rule", rule)
url.AddParam("force", "true")
return url
}
func getRouteUrlWithForce(rule, force string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE))
url.AddParam("rule", rule)
url.AddParam("force", force)
return url
}
func getRouteUrlWithNoForce(rule string) *common.URL {
url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE))
url.AddParam("rule", rule)
return url
}
func (bi *MockInvoker) IsAvailable() bool {
return bi.available
}
func (bi *MockInvoker) IsDestroyed() bool {
return bi.destroyed
}
type rest struct {
tried int
success bool
}
var count int
func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result {
count++
var (
success bool
err error
)
if count >= bi.successCount {
success = true
} else {
err = perrors.New("error")
}
result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}}
return result
}
func (bi *MockInvoker) Destroy() {
logger.Infof("Destroy invoker: %v", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
}
func TestRoute_matchWhen(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4"))
notify := make(chan struct{})
go func() {
for range notify {
}
}()
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip))
matchWhen := router.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen)
rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify)
matchWhen1 := router1.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen1)
rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
matchWhen2 := router2.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, false, matchWhen2)
rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
matchWhen3 := router3.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen3)
rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify)
matchWhen4 := router4.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen4)
rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4"))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify)
matchWhen5 := router5.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, false, matchWhen5)
rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4"))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify)
matchWhen6 := router6.(*ConditionRouter).MatchWhen(cUrl, inv)
assert.Equal(t, true, matchWhen6)
}
func TestRoute_matchFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
t.Logf("The local ip is %s", localIP)
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invokers := []protocol.Invoker{NewMockInvoker(url1, 1), NewMockInvoker(url2, 2), NewMockInvoker(url3, 3)}
rule1 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3"))
rule2 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.* & host != 10.20.3.3"))
rule3 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3 & host != 10.20.3.3"))
rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4"))
rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3"))
rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson"))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify)
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify)
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify)
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify)
cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret3 := router3.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret4 := router4.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret5 := router5.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
ret6 := router6.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{})
assert.Equal(t, 1, len(ret1.ToArray()))
assert.Equal(t, 0, len(ret2.ToArray()))
assert.Equal(t, 0, len(ret3.ToArray()))
assert.Equal(t, 1, len(ret4.ToArray()))
assert.Equal(t, 2, len(ret5.ToArray()))
assert.Equal(t, 1, len(ret6.ToArray()))
}
func TestRoute_methodRoute(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := r.(*ConditionRouter).MatchWhen(url, inv)
assert.Equal(t, true, matchWhen)
url1, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
matchWhen = r.(*ConditionRouter).MatchWhen(url1, inv)
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify)
matchWhen = router2.(*ConditionRouter).MatchWhen(url2, inv)
assert.Equal(t, false, matchWhen)
url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify)
matchWhen = router3.(*ConditionRouter).MatchWhen(url3, inv)
assert.Equal(t, true, matchWhen)
}
func TestRoute_ReturnFalse(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
url, _ := common.NewURL("")
localIP := common.GetLocalIp()
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnEmpty(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url, _ := common.NewURL("")
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(ret.ToArray()))
}
func TestRoute_ReturnAll(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
urlString := "dubbo://" + localIP + "/com.foo.BarService"
dubboURL, _ := common.NewURL(urlString)
mockInvoker1 := NewMockInvoker(dubboURL, 1)
mockInvoker2 := NewMockInvoker(dubboURL, 1)
mockInvoker3 := NewMockInvoker(dubboURL, 1)
invokers := []protocol.Invoker{mockInvoker1, mockInvoker2, mockInvoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_HostFilter(t *testing.T) {
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
notify := make(chan struct{})
go func() {
for range notify {
}
}()
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_Empty_HostFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_False_HostFilter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_Placeholder(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 2, len(ret.ToArray()))
assert.Equal(t, invoker2, invokers[ret.ToArray()[0]])
assert.Equal(t, invoker3, invokers[ret.ToArray()[1]])
}
func TestRoute_NoForce(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule), notify)
ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, len(invokers), len(ret.ToArray()))
}
func TestRoute_Force(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
localIP := common.GetLocalIp()
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"), notify)
fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv)
assert.Equal(t, 0, len(fileredInvokers.ToArray()))
}
func TestNewConditionRouterFactory(t *testing.T) {
factory := newConditionRouterFactory()
assert.NotNil(t, factory)
}
func TestNewAppRouterFactory(t *testing.T) {
factory := newAppRouterFactory()
assert.NotNil(t, factory)
}
func setUpAddrCache(addrs []protocol.Invoker) router.Cache {
return chain.BuildCache(addrs)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"encoding/base64"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
// FileConditionRouter Use for parse config file of condition router
type FileConditionRouter struct {
listenableRouter
parseOnce sync.Once
url *common.URL
}
// NewFileConditionRouter Create file condition router instance with content ( from config file)
func NewFileConditionRouter(content []byte) (*FileConditionRouter, error) {
fileRouter := &FileConditionRouter{}
rule, err := getRule(string(content))
if err != nil {
return nil, perrors.Errorf("yaml.Unmarshal() failed , error:%v", perrors.WithStack(err))
}
if !rule.Valid {
return nil, perrors.Errorf("rule content is not verify for condition router , error:%v", perrors.WithStack(err))
}
fileRouter.generateConditions(rule)
return fileRouter, nil
}
// URL Return URL in file condition router n
func (f *FileConditionRouter) URL() *common.URL {
f.parseOnce.Do(func() {
routerRule := f.routerRule
rule := parseCondition(routerRule.Conditions)
f.url = common.NewURLWithOptions(
common.WithProtocol(constant.CONDITION_ROUTE_PROTOCOL),
common.WithIp(constant.ANYHOST_VALUE),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.RouterForce, strconv.FormatBool(routerRule.Force)),
common.WithParamsValue(constant.RouterPriority, strconv.Itoa(routerRule.Priority)),
common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString([]byte(rule))),
common.WithParamsValue(constant.ROUTER_KEY, constant.CONDITION_ROUTE_PROTOCOL),
common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY),
)
if routerRule.Scope == constant.RouterApplicationScope {
f.url.AddParam(constant.APPLICATION_KEY, routerRule.Key)
return
}
grp, srv, ver, e := parseServiceRouterKey(routerRule.Key)
if e != nil {
return
}
if len(grp) > 0 {
f.url.AddParam(constant.GROUP_KEY, grp)
}
if len(ver) > 0 {
f.url.AddParam(constant.VERSION_KEY, ver)
}
if len(srv) > 0 {
f.url.AddParam(constant.INTERFACE_KEY, srv)
}
})
return f.url
}
// The input value must follow [{group}/]{service}[:{version}] pattern
// the returning strings are representing group, service, version respectively.
// input: mock-group/mock-service:1.0.0 ==> "mock-group", "mock-service", "1.0.0"
// input: mock-group/mock-service ==> "mock-group", "mock-service", ""
// input: mock-service:1.0.0 ==> "", "mock-service", "1.0.0"
// For more samples, please refer to unit test.
func parseServiceRouterKey(key string) (string, string, string, error) {
if len(strings.TrimSpace(key)) == 0 {
return "", "", "", nil
}
reg := regexp.MustCompile(`(.*/{1})?([^:/]+)(:{1}[^:]*)?`)
strs := reg.FindAllStringSubmatch(key, -1)
if strs == nil || len(strs) > 1 {
return "", "", "", perrors.Errorf("Invalid key, service key must follow [{group}/]{service}[:{version}] pattern")
}
if len(strs[0]) != 4 {
return "", "", "", perrors.Errorf("Parse service router key failed")
}
grp := strings.TrimSpace(strings.TrimRight(strs[0][1], "/"))
srv := strings.TrimSpace(strs[0][2])
ver := strings.TrimSpace(strings.TrimLeft(strs[0][3], ":"))
return grp, srv, ver, nil
}
func parseCondition(conditions []string) string {
var when, then string
for _, condition := range conditions {
condition = strings.Trim(condition, " ")
if strings.Contains(condition, "=>") {
array := strings.SplitN(condition, "=>", 2)
consumer := strings.Trim(array[0], " ")
provider := strings.Trim(array[1], " ")
if len(consumer) != 0 {
if len(when) != 0 {
when = strings.Join([]string{when, consumer}, " & ")
} else {
when = consumer
}
}
if len(provider) != 0 {
if len(then) != 0 {
then = strings.Join([]string{then, provider}, " & ")
} else {
then = provider
}
}
}
}
return strings.Join([]string{when, then}, " => ")
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestLoadYmlConfig(t *testing.T) {
router, e := NewFileConditionRouter([]byte(`scope: application
key: mock-app
priority: 1
force: true
conditions :
- "a => b"
- "c => d"`))
assert.Nil(t, e)
assert.NotNil(t, router)
assert.Equal(t, router.routerRule.Priority, 1)
assert.Equal(t, router.routerRule.Force, true)
assert.Equal(t, len(router.routerRule.Conditions), 2)
}
func TestParseCondition(t *testing.T) {
s := make([]string, 2)
s = append(s, "a => b")
s = append(s, "c => d")
condition := parseCondition(s)
assert.Equal(t, "a & c => b & d", condition)
}
func TestFileRouterURL(t *testing.T) {
router, e := NewFileConditionRouter([]byte(`scope: application
key: mock-app
priority: 1
force: true
conditions :
- "a => b"
- "c => d"`))
assert.Nil(t, e)
assert.NotNil(t, router)
assert.Equal(t, "condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String())
router, e = NewFileConditionRouter([]byte(`scope: service
key: mock-service
priority: 1
force: true
conditions :
- "a => b"
- "c => d"`))
assert.Nil(t, e)
assert.NotNil(t, router)
assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&interface=mock-service&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String())
router, e = NewFileConditionRouter([]byte(`scope: service
key: grp1/mock-service:v1
priority: 1
force: true
conditions :
- "a => b"
- "c => d"`))
assert.Nil(t, e)
assert.NotNil(t, router)
assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&group=grp1&interface=mock-service&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D&version=v1", router.URL().String())
}
func TestParseServiceRouterKey(t *testing.T) {
testString := " mock-group / mock-service:1.0.0"
grp, srv, ver, err := parseServiceRouterKey(testString)
assert.Equal(t, "mock-group", grp)
assert.Equal(t, "mock-service", srv)
assert.Equal(t, "1.0.0", ver)
assert.Nil(t, err)
testString = "mock-group/mock-service"
grp, srv, ver, err = parseServiceRouterKey(testString)
assert.Equal(t, "mock-group", grp)
assert.Equal(t, "mock-service", srv)
assert.Equal(t, "", ver)
assert.Nil(t, err)
testString = "mock-service:1.0.0"
grp, srv, ver, err = parseServiceRouterKey(testString)
assert.Equal(t, "", grp)
assert.Equal(t, "mock-service", srv)
assert.Equal(t, "1.0.0", ver)
assert.Nil(t, err)
testString = "mock-service"
grp, srv, ver, err = parseServiceRouterKey(testString)
assert.Equal(t, "", grp)
assert.Equal(t, "mock-service", srv)
assert.Equal(t, "", ver)
assert.NoError(t, err)
testString = "/mock-service:"
grp, srv, ver, err = parseServiceRouterKey(testString)
assert.Equal(t, "", grp)
assert.Equal(t, "mock-service", srv)
assert.Equal(t, "", ver)
assert.NoError(t, err)
testString = "grp:mock-service:123"
grp, srv, ver, err = parseServiceRouterKey(testString)
assert.Equal(t, "", grp)
assert.Equal(t, "", srv)
assert.Equal(t, "", ver)
assert.Error(t, err)
testString = ""
grp, srv, ver, err = parseServiceRouterKey(testString)
assert.Equal(t, "", grp)
assert.Equal(t, "", srv)
assert.Equal(t, "", ver)
assert.NoError(t, err)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"fmt"
)
import (
"github.com/RoaringBitmap/roaring"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting"
)
const (
// Default priority for listenable router, use the maximum int64 value
listenableRouterDefaultPriority = ^int64(0)
)
// listenableRouter Abstract router which listens to dynamic configuration
type listenableRouter struct {
conditionRouters []*ConditionRouter
routerRule *RouterRule
url *common.URL
force bool
priority int64
notify chan struct{}
}
// RouterRule Get RouterRule instance from listenableRouter
func (l *listenableRouter) RouterRule() *RouterRule {
return l.routerRule
}
func newListenableRouter(url *common.URL, ruleKey string, notify chan struct{}) (*AppRouter, error) {
if ruleKey == "" {
return nil, perrors.Errorf("NewListenableRouter ruleKey is nil, can't create Listenable router")
}
l := &AppRouter{}
l.url = url
l.priority = listenableRouterDefaultPriority
l.notify = notify
routerKey := ruleKey + constant.ConditionRouterRuleSuffix
// add listener
dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
return nil, perrors.Errorf("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
}
dynamicConfiguration.AddListener(routerKey, l)
// get rule
rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
if len(rule) == 0 || err != nil {
return nil, perrors.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err)
}
l.Process(&config_center.ConfigChangeEvent{
Key: routerKey,
Value: rule,
ConfigType: remoting.EventTypeUpdate,
})
logger.Info("Init app router success")
return l, nil
}
// Process Process config change event, generate routers and set them to the listenableRouter instance
func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
if remoting.EventTypeDel == event.ConfigType {
l.routerRule = nil
if l.conditionRouters != nil {
l.conditionRouters = l.conditionRouters[:0]
}
return
}
content, ok := event.Value.(string)
if !ok {
msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value)
logger.Error(msg)
return
}
routerRule, err := getRule(content)
if err != nil {
logger.Errorf("Parse condition router rule fail,error:[%s] ", err)
return
}
l.generateConditions(routerRule)
go func() {
l.notify <- struct{}{}
}()
}
func (l *listenableRouter) generateConditions(rule *RouterRule) {
if rule == nil || !rule.Valid {
return
}
l.conditionRouters = make([]*ConditionRouter, 0, len(rule.Conditions))
l.routerRule = rule
for _, c := range rule.Conditions {
router, e := NewConditionRouterWithRule(c)
if e != nil {
logger.Errorf("Create condition router with rule fail,raw rule:[%s] ", c)
continue
}
router.Force = rule.Force
router.enabled = rule.Enabled
l.conditionRouters = append(l.conditionRouters, router)
}
}
// Route Determine the target invokers list.
func (l *listenableRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if invokers.IsEmpty() || len(l.conditionRouters) == 0 {
return invokers
}
// We will check enabled status inside each router.
for _, r := range l.conditionRouters {
invokers = r.Route(invokers, cache, url, invocation)
}
return invokers
}
// Priority Return Priority in listenable router
func (l *listenableRouter) Priority() int64 {
return l.priority
}
// URL Return URL in listenable router
func (l *listenableRouter) URL() *common.URL {
return l.url
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"regexp"
"strings"
)
import (
"github.com/RoaringBitmap/roaring"
"github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
// pattern route pattern regex
pattern = `([&!=,]*)\\s*([^&!=,\\s]+)`
)
var routerPatternReg = regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`)
var emptyMatchPair = MatchPair{
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
// ConditionRouter Condition router struct
type ConditionRouter struct {
Pattern string
url *common.URL
priority int64
Force bool
enabled bool
WhenCondition map[string]MatchPair
ThenCondition map[string]MatchPair
notify chan struct{}
}
// NewConditionRouterWithRule Init condition router by raw rule
func NewConditionRouterWithRule(rule string) (*ConditionRouter, error) {
var (
whenRule string
thenRule string
when map[string]MatchPair
then map[string]MatchPair
)
rule = strings.Replace(rule, "consumer.", "", -1)
rule = strings.Replace(rule, "provider.", "", -1)
i := strings.Index(rule, "=>")
if i > 0 {
whenRule = rule[0:i]
}
if i < 0 {
thenRule = rule
} else {
thenRule = rule[i+2:]
}
whenRule = strings.Trim(whenRule, " ")
thenRule = strings.Trim(thenRule, " ")
w, err := parseRule(whenRule)
if err != nil {
return nil, perrors.Errorf("%s", "")
}
t, err := parseRule(thenRule)
if err != nil {
return nil, perrors.Errorf("%s", "")
}
if len(whenRule) == 0 || "true" == whenRule {
when = make(map[string]MatchPair, 16)
} else {
when = w
}
if len(thenRule) == 0 || "false" == thenRule {
when = make(map[string]MatchPair, 16)
} else {
then = t
}
return &ConditionRouter{
Pattern: pattern,
WhenCondition: when,
ThenCondition: then,
}, nil
}
// NewConditionRouter Init condition router by URL
func NewConditionRouter(url *common.URL, notify chan struct{}) (*ConditionRouter, error) {
if url == nil {
return nil, perrors.Errorf("Illegal route URL!")
}
rule, err := url.GetParamAndDecoded(constant.RULE_KEY)
if err != nil || len(rule) == 0 {
return nil, perrors.Errorf("Illegal route rule!")
}
router, err := NewConditionRouterWithRule(rule)
if err != nil {
return nil, err
}
router.url = url
var defaultPriority int64 = 0
if url.GetParam(constant.APPLICATION_KEY, "") != "" {
defaultPriority = 150
} else if url.GetParam(constant.INTERFACE_KEY, "") != "" {
defaultPriority = 140
}
router.priority = url.GetParamInt(constant.RouterPriority, defaultPriority)
router.Force = url.GetParamBool(constant.RouterForce, false)
router.enabled = url.GetParamBool(constant.RouterEnabled, true)
router.notify = notify
return router, nil
}
// Priority Return Priority in condition router
func (c *ConditionRouter) Priority() int64 {
return c.priority
}
// URL Return URL in condition router
func (c *ConditionRouter) URL() *common.URL {
return c.url
}
// Enabled Return is condition router is enabled
// true: enabled
// false: disabled
func (c *ConditionRouter) Enabled() bool {
return c.enabled
}
// Route Determine the target invokers list.
func (c *ConditionRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
if !c.Enabled() {
return invokers
}
if invokers.IsEmpty() {
return invokers
}
isMatchWhen := c.MatchWhen(url, invocation)
if !isMatchWhen {
return invokers
}
if len(c.ThenCondition) == 0 {
return utils.EmptyAddr
}
result := roaring.NewBitmap()
for iter := invokers.Iterator(); iter.HasNext(); {
index := iter.Next()
invoker := cache.GetInvokers()[index]
invokerUrl := invoker.GetUrl()
isMatchThen := c.MatchThen(invokerUrl, url)
if isMatchThen {
result.Add(index)
}
}
if !result.IsEmpty() {
return result
} else if c.Force {
rule, _ := url.GetParamAndDecoded(constant.RULE_KEY)
localIP := common.GetLocalIp()
logger.Warnf("The route result is empty and force execute. consumer: %s, service: %s, router: %s", localIP, url.Service(), rule)
return result
}
return invokers
}
func parseRule(rule string) (map[string]MatchPair, error) {
condition := make(map[string]MatchPair, 16)
if len(rule) == 0 {
return condition, nil
}
var pair MatchPair
values := gxset.NewSet()
matches := routerPatternReg.FindAllSubmatch([]byte(rule), -1)
for _, groups := range matches {
separator := string(groups[1])
content := string(groups[2])
switch separator {
case "":
pair = MatchPair{
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
condition[content] = pair
case "&":
if r, ok := condition[content]; ok {
pair = r
} else {
pair = MatchPair{
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
condition[content] = pair
}
case "=":
if pair == emptyMatchPair {
startIndex := getStartIndex(rule)
return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex)
}
values = pair.Matches
values.Add(content)
case "!=":
if pair == emptyMatchPair {
startIndex := getStartIndex(rule)
return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex)
}
values = pair.Mismatches
values.Add(content)
case ",":
if values.Empty() {
startIndex := getStartIndex(rule)
return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex)
}
values.Add(content)
default:
startIndex := getStartIndex(rule)
return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex)
}
}
return condition, nil
}
func getStartIndex(rule string) int {
if indexTuple := routerPatternReg.FindIndex([]byte(rule)); len(indexTuple) > 0 {
return indexTuple[0]
}
return -1
}
// MatchWhen MatchWhen
func (c *ConditionRouter) MatchWhen(url *common.URL, invocation protocol.Invocation) bool {
condition := matchCondition(c.WhenCondition, url, nil, invocation)
return len(c.WhenCondition) == 0 || condition
}
// MatchThen MatchThen
func (c *ConditionRouter) MatchThen(url *common.URL, param *common.URL) bool {
condition := matchCondition(c.ThenCondition, url, param, nil)
return len(c.ThenCondition) > 0 && condition
}
// MatchCondition MatchCondition
func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.URL, invocation protocol.Invocation) bool {
sample := url.ToMap()
if sample == nil {
// because url.ToMap() may return nil, but it should continue to process make condition
sample = make(map[string]string)
}
var result bool
for key, matchPair := range pairs {
var sampleValue string
if invocation != nil && ((constant.METHOD_KEY == key) || (constant.METHOD_KEYS == key)) {
sampleValue = invocation.MethodName()
} else {
sampleValue = sample[key]
if len(sampleValue) == 0 {
sampleValue = sample[constant.PREFIX_DEFAULT_KEY+key]
}
}
if len(sampleValue) > 0 {
if !matchPair.isMatch(sampleValue, param) {
return false
}
result = true
} else {
if !(matchPair.Matches.Empty()) {
return false
}
result = true
}
}
return result
}
// MatchPair Match key pair, condition process
type MatchPair struct {
Matches *gxset.HashSet
Mismatches *gxset.HashSet
}
func (pair MatchPair) isMatch(value string, param *common.URL) bool {
if !pair.Matches.Empty() && pair.Mismatches.Empty() {
for match := range pair.Matches.Items {
if isMatchGlobalPattern(match.(string), value, param) {
return true
}
}
return false
}
if !pair.Mismatches.Empty() && pair.Matches.Empty() {
for mismatch := range pair.Mismatches.Items {
if isMatchGlobalPattern(mismatch.(string), value, param) {
return false
}
}
return true
}
if !pair.Mismatches.Empty() && !pair.Matches.Empty() {
// when both mismatches and matches contain the same value, then using mismatches first
for mismatch := range pair.Mismatches.Items {
if isMatchGlobalPattern(mismatch.(string), value, param) {
return false
}
}
for match := range pair.Matches.Items {
if isMatchGlobalPattern(match.(string), value, param) {
return true
}
}
return false
}
return false
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"strings"
)
import (
gxstrings "github.com/dubbogo/gost/strings"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/yaml"
)
// RouterRule RouterRule config read from config file or config center
type RouterRule struct {
router.BaseRouterRule `yaml:",inline"`
Conditions []string
}
/* Parse Router raw rule parser
* example :
* scope: application
* runtime: true
* force: false
* conditions:
* - >
* method!=sayHello =>
* - >
* ip=127.0.0.1
* =>
* 1.1.1.1
*/
func getRule(rawRule string) (*RouterRule, error) {
r := &RouterRule{}
err := yaml.UnmarshalYML([]byte(rawRule), r)
if err != nil {
return r, err
}
r.RawRule = rawRule
if len(r.Conditions) > 0 && len(r.Key) > 0 && (r.Scope == constant.RouterApplicationScope || r.Scope == constant.RouterServiceScope) {
r.Valid = true
}
return r, nil
}
// isMatchGlobalPattern Match value to param content by pattern
func isMatchGlobalPattern(pattern string, value string, param *common.URL) bool {
if param != nil && strings.HasPrefix(pattern, "$") {
pattern = param.GetRawParam(pattern[1:])
}
return gxstrings.IsMatchPattern(pattern, value)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
)
func TestGetRule(t *testing.T) {
testyml := `
scope: application
key: test-provider
runtime: true
force: false
conditions:
- >
method!=sayHello =>
- >
ip=127.0.0.1
=>
1.1.1.1`
rule, e := getRule(testyml)
assert.Nil(t, e)
assert.NotNil(t, rule)
assert.Equal(t, 2, len(rule.Conditions))
assert.Equal(t, "application", rule.Scope)
assert.True(t, rule.Runtime)
assert.Equal(t, false, rule.Force)
assert.Equal(t, testyml, rule.RawRule)
assert.True(t, rule.Valid)
assert.Equal(t, false, rule.Enabled)
assert.Equal(t, false, rule.Dynamic)
assert.Equal(t, "test-provider", rule.Key)
testyml = `
key: test-provider
runtime: true
force: false
conditions:
- >
method!=sayHello =>`
rule, e = getRule(testyml)
assert.Nil(t, e)
assert.False(t, rule.Valid)
testyml = `
scope: noApplication
key: test-provider
conditions:
- >
method!=sayHello =>`
rule, e = getRule(testyml)
assert.Nil(t, e)
assert.False(t, rule.Valid)
}
func TestIsMatchGlobPattern(t *testing.T) {
url, _ := common.NewURL("dubbo://localhost:8080/Foo?key=v*e")
assert.Equal(t, true, isMatchGlobalPattern("$key", "value", url))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package condition
import (
"testing"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/dubbogo/gost/container/set"
"github.com/stretchr/testify/assert"
)
func TestParseRule(t *testing.T) {
testString := ``
matchPair, err := parseRule(testString)
assert.Nil(t, err)
assert.EqualValues(t, matchPair, make(map[string]MatchPair, 16))
testString = `method!=sayHello&application=sayGoodBye`
matchPair, err = parseRule(testString)
assert.Nil(t, err)
assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet("sayHello"))
assert.EqualValues(t, matchPair["application"].Matches, gxset.NewSet("sayGoodBye"))
testString = `noRule`
matchPair, err = parseRule(testString)
assert.Nil(t, err)
assert.EqualValues(t, matchPair["noRule"].Mismatches, gxset.NewSet())
assert.EqualValues(t, matchPair["noRule"].Matches, gxset.NewSet())
testString = `method!=sayHello,sayGoodBye`
matchPair, err = parseRule(testString)
assert.Nil(t, err)
assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet(`sayHello`, `sayGoodBye`))
testString = `method!=sayHello,sayGoodDay=sayGoodBye`
matchPair, err = parseRule(testString)
assert.Nil(t, err)
assert.EqualValues(t, matchPair["method"].Mismatches, gxset.NewSet(`sayHello`, `sayGoodDay`))
assert.EqualValues(t, matchPair["method"].Matches, gxset.NewSet(`sayGoodBye`))
}
func TestNewConditionRouter(t *testing.T) {
notify := make(chan struct{})
go func() {
for range notify {
}
}()
url, _ := common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`)
router, err := NewConditionRouter(url, notify)
assert.Nil(t, err)
assert.Equal(t, true, router.Enabled())
assert.Equal(t, true, router.Force)
assert.Equal(t, int64(1), router.Priority())
whenRule, _ := parseRule("a & c")
thenRule, _ := parseRule("b & d")
assert.EqualValues(t, router.WhenCondition, whenRule)
assert.EqualValues(t, router.ThenCondition, thenRule)
router, err = NewConditionRouter(nil, notify)
assert.Nil(t, router)
assert.Error(t, err)
url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmT4gYiAmIGQ%3D`)
router, err = NewConditionRouter(url, notify)
assert.Nil(t, router)
assert.Error(t, err)
url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`)
router, err = NewConditionRouter(url, notify)
assert.Nil(t, err)
assert.Equal(t, int64(150), router.Priority())
url, _ = common.NewURL(`condition://0.0.0.0:?category=routers&force=true&interface=mock-service&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`)
router, err = NewConditionRouter(url, notify)
assert.Nil(t, err)
assert.Equal(t, int64(140), router.Priority())
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package router
import (
"github.com/apache/dubbo-go/protocol"
)
// ConnChecker is used to determine whether the invoker is healthy or not
type ConnChecker interface {
// IsConnHealthy evaluates the healthy state on the given Invoker
IsConnHealthy(invoker protocol.Invoker) bool
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conncheck
import (
"github.com/RoaringBitmap/roaring"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
const (
connHealthy = "conn-healthy"
name = "conn-check-router"
)
// ConnCheckRouter provides a health-first routing mechanism through ConnChecker
type ConnCheckRouter struct {
url *common.URL
checker router.ConnChecker
notify chan struct{}
}
// NewConnCheckRouter construct an NewConnCheckRouter via url
func NewConnCheckRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) {
r := &ConnCheckRouter{
url: url,
notify: notify,
}
checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER)
r.checker = extension.GetConnChecker(checkerName, url)
return r, nil
}
// Route gets a list of healthy invoker
func (r *ConnCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap {
addrPool := cache.FindAddrPool(r)
// Add healthy invoker to the list
healthyInvokers := utils.JoinIfNotEqual(addrPool[connHealthy], invokers)
// If all invokers are considered unhealthy, downgrade to all invoker
if healthyInvokers.IsEmpty() {
logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
return invokers
}
return healthyInvokers
}
// Pool separates healthy invokers from others.
func (r *ConnCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
rb := make(router.AddrPool, 8)
rb[connHealthy] = roaring.NewBitmap()
for i, invoker := range invokers {
if r.checker.IsConnHealthy(invoker) {
rb[connHealthy].Add(uint32(i))
}
}
return rb, nil
}
// ShouldPool will always return true to make sure healthy check constantly.
func (r *ConnCheckRouter) ShouldPool() bool {
return true
}
// Name get name of ConnCheckerRouter
func (r *ConnCheckRouter) Name() string {
return name
}
// Priority get Router priority level
func (r *ConnCheckRouter) Priority() int64 {
return 0
}
// URL Return URL in router
func (r *ConnCheckRouter) URL() *common.URL {
return r.url
}
// ConnChecker returns the HealthChecker bound to this HealthCheckRouter
func (r *ConnCheckRouter) ConnChecker() router.ConnChecker {
return r.checker
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conncheck
import (
"fmt"
"testing"
)
import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/cluster/router/utils"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)
const (
connCheckRoute1010IP = "192.168.10.10"
connCheckRoute1011IP = "192.168.10.11"
connCheckRoute1012IP = "192.168.10.12"
connCheckRouteMethodNameTest = "test"
connCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider"
connCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider"
)
func TestConnCheckRouterRoute(t *testing.T) {
defer protocol.CleanAllStatus()
notify := make(chan struct{})
go func() {
for range notify {
}
}()
consumerURL, _ := common.NewURL(connCheck1001URL)
url1, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1010IP))
url2, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1011IP))
url3, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1012IP))
hcr, _ := NewConnCheckRouter(consumerURL, notify)
var invokers []protocol.Invoker
invoker1 := NewMockInvoker(url1)
invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3)
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil)
res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now invoker3 is healthy
assert.True(t, len(res.ToArray()) == 1)
// check blacklist remove
protocol.RemoveInvokerUnhealthyStatus(invoker1)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now invoker3 invoker1 is healthy
assert.True(t, len(res.ToArray()) == 2)
}
func TestRecovery(t *testing.T) {
// check recover
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker1 := mock.NewMockInvoker(ctrl)
invoker2 := mock.NewMockInvoker(ctrl)
invoker1.EXPECT().GetUrl().Return(&common.URL{Path: "path1"}).AnyTimes()
invoker2.EXPECT().GetUrl().Return(&common.URL{Path: "path2"}).AnyTimes()
invoker1.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker2.EXPECT().IsAvailable().Return(true).AnyTimes()
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 2)
protocol.TryRefreshBlackList()
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 0)
}
func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache {
pool, info := r.Pool(addrs)
cache := chain.BuildCache(addrs)
cache.SetAddrMeta(r.Name(), info)
cache.SetAddrPool(r.Name(), pool)
return cache
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conncheck
import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
func init() {
extension.SetConnChecker(constant.DEFAULT_CONN_CHECKER, NewDefaultConnChecker)
}
// DefaultConnChecker is the default implementation of ConnChecker, which determines the health status of invoker conn
type DefaultConnChecker struct{}
// IsConnHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request
// and the current active request
func (c *DefaultConnChecker) IsConnHealthy(invoker protocol.Invoker) bool {
return protocol.GetInvokerHealthyStatus(invoker)
}
// NewDefaultConnChecker constructs a new DefaultConnChecker based on the url
func NewDefaultConnChecker(url *common.URL) router.ConnChecker {
return &DefaultConnChecker{}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment