Skip to content
Snippets Groups Projects
Commit 002d5ca5 authored by flycash's avatar flycash
Browse files

Merge develop

parents 6bd4b4cc 3f94f66a
No related branches found
No related tags found
No related merge requests found
Showing
with 383 additions and 288 deletions
......@@ -33,10 +33,15 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
const (
baseClusterInvokerMethodName = "getUser"
baseClusterInvokerFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider"
)
func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
......@@ -45,7 +50,7 @@ func TestStickyNormal(t *testing.T) {
invoked := []protocol.Invoker{}
tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil)
tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
......@@ -54,7 +59,7 @@ func TestStickyNormal(t *testing.T) {
func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
......@@ -62,8 +67,8 @@ func TestStickyNormalWhenError(t *testing.T) {
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
invoked = append(invoked, result)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
assert.NotEqual(t, result, result1)
}
......@@ -72,6 +72,19 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
return invoker
}
func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
......@@ -91,25 +104,11 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) {
}
// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
if _, err = invoker.taskList.Get(1); err != nil {
logger.Warnf("get task found err: %v\n", err)
break
}
go func(retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)
go invoker.tryTimerTaskProc(ctx, retryTask)
}
}
}
......@@ -129,29 +128,26 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
url := invokers[0].GetUrl()
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
loadBalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
var result protocol.Result
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
result := ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
......@@ -164,7 +160,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{}
}
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
timerTask := newRetryTimerTask(loadBalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)
logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
......@@ -172,7 +168,6 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
// ignore
return &protocol.RPCResult{}
}
return result
}
......
......@@ -80,14 +80,14 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
return
}
routers := make([]router.Router, 0, len(urls))
routers := make([]router.PriorityRouter, 0, len(urls))
for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")
if len(routerKey) > 0 {
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewRouter(url)
r, err := factory.NewPriorityRouter(url)
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
......
......@@ -18,6 +18,7 @@
package loadbalance
import (
"fmt"
"testing"
)
......@@ -32,6 +33,19 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
const (
ip = "192.168.1.0"
port8080 = 8080
port8082 = 8082
url8080Short = "dubbo://192.168.1.0:8080"
url8081Short = "dubbo://192.168.1.0:8081"
url20000 = "dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
url8080 = "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
url8081 = "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
url8082 = "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
)
func TestConsistentHashSelectorSuite(t *testing.T) {
suite.Run(t, new(consistentHashSelectorSuite))
}
......@@ -43,8 +57,7 @@ type consistentHashSelectorSuite struct {
func (s *consistentHashSelectorSuite) SetupTest() {
var invokers []protocol.Invoker
url, _ := common.NewURL(
"dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
url, _ := common.NewURL(url20000)
invokers = append(invokers, protocol.NewBaseInvoker(url))
s.selector = newConsistentHashSelector(invokers, "echo", 999944)
}
......@@ -55,14 +68,14 @@ func (s *consistentHashSelectorSuite) TestToKey() {
}
func (s *consistentHashSelectorSuite) TestSelectForKey() {
url1, _ := common.NewURL("dubbo://192.168.1.0:8080")
url2, _ := common.NewURL("dubbo://192.168.1.0:8081")
url1, _ := common.NewURL(url8080Short)
url2, _ := common.NewURL(url8081Short)
s.selector.virtualInvokers = make(map[uint32]protocol.Invoker)
s.selector.virtualInvokers[99874] = protocol.NewBaseInvoker(url1)
s.selector.virtualInvokers[9999945] = protocol.NewBaseInvoker(url2)
s.selector.keys = []uint32{99874, 9999945}
result := s.selector.selectForKey(9999944)
s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?")
s.Equal(result.GetUrl().String(), url8081Short+"?")
}
func TestConsistentHashLoadBalanceSuite(t *testing.T) {
......@@ -83,11 +96,11 @@ type consistentHashLoadBalanceSuite struct {
func (s *consistentHashLoadBalanceSuite) SetupTest() {
var err error
s.url1, err = common.NewURL("dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.url1, err = common.NewURL(url8080)
s.NoError(err)
s.url2, err = common.NewURL("dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.url2, err = common.NewURL(url8081)
s.NoError(err)
s.url3, err = common.NewURL("dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.url3, err = common.NewURL(url8082)
s.NoError(err)
s.invoker1 = protocol.NewBaseInvoker(s.url1)
......@@ -101,9 +114,9 @@ func (s *consistentHashLoadBalanceSuite) SetupTest() {
func (s *consistentHashLoadBalanceSuite) TestSelect() {
args := []interface{}{"name", "password", "age"}
invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080")
s.Equal(invoker.GetUrl().Location, fmt.Sprintf("%s:%d", ip, port8080))
args = []interface{}{"ok", "abc"}
invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082")
s.Equal(invoker.GetUrl().Location, fmt.Sprintf("%s:%d", ip, port8082))
}
......@@ -36,18 +36,24 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
const (
tmpUrl = "dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"
tmpUrlFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider"
tmpIp = "192.168.1.100"
)
func TestRandomlbSelect(t *testing.T) {
randomlb := NewRandomLoadBalance()
invokers := []protocol.Invoker{}
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", 0))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, 0))
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := randomlb.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))
for i := 1; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
randomlb.Select(invokers, &invocation.RPCInvocation{})
......@@ -58,13 +64,13 @@ func TestRandomlbSelectWeight(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
urlParams := url.Values{}
urlParams.Set("methods.test."+constant.WEIGHT_KEY, "10000000000000")
urll, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams))
urll, _ := common.NewURL(tmpUrl, common.WithParams(urlParams))
invokers = append(invokers, protocol.NewBaseInvoker(urll))
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
......@@ -72,7 +78,7 @@ func TestRandomlbSelectWeight(t *testing.T) {
var selected float64
for i := 0; i < 10000; i++ {
s := randomlb.Select(invokers, ivc)
if s.GetUrl().Ip == "192.168.1.100" {
if s.GetUrl().Ip == tmpIp {
selected++
}
selectedInvoker = append(selectedInvoker, s)
......@@ -89,13 +95,13 @@ func TestRandomlbSelectWarmup(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
urlParams := url.Values{}
urlParams.Set(constant.REMOTE_TIMESTAMP_KEY, strconv.FormatInt(time.Now().Add(time.Minute*(-9)).Unix(), 10))
urll, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams))
urll, _ := common.NewURL(tmpUrl, common.WithParams(urlParams))
invokers = append(invokers, protocol.NewBaseInvoker(urll))
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
......@@ -103,7 +109,7 @@ func TestRandomlbSelectWarmup(t *testing.T) {
var selected float64
for i := 0; i < 10000; i++ {
s := randomlb.Select(invokers, ivc)
if s.GetUrl().Ip == "192.168.1.100" {
if s.GetUrl().Ip == tmpIp {
selected++
}
selectedInvoker = append(selectedInvoker, s)
......
......@@ -40,19 +40,21 @@ type RouterChain struct {
// Full list of addresses from registry, classified by method name.
invokers []protocol.Invoker
// Containing all routers, reconstruct every time 'route://' urls change.
routers []router.Router
routers []router.PriorityRouter
// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the
// instance will never delete or recreate.
builtinRouters []router.Router
builtinRouters []router.PriorityRouter
mutex sync.RWMutex
url common.URL
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := invoker
l := len(c.routers)
rs := make([]router.Router, l, int(math.Ceil(float64(l)*1.2)))
rs := make([]router.PriorityRouter, l, int(math.Ceil(float64(l)*1.2)))
c.mutex.RLock()
copy(rs, c.routers)
c.mutex.RUnlock()
......@@ -67,8 +69,8 @@ func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocat
// New a array add builtinRouters which is not sorted in RouterChain and routers
// Sort the array
// Replace router array in RouterChain
func (c *RouterChain) AddRouters(routers []router.Router) {
newRouters := make([]router.Router, 0, len(c.builtinRouters)+len(routers))
func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
newRouters := make([]router.PriorityRouter, 0, len(c.builtinRouters)+len(routers))
newRouters = append(newRouters, c.builtinRouters...)
newRouters = append(newRouters, routers...)
sortRouter(newRouters)
......@@ -77,6 +79,11 @@ func (c *RouterChain) AddRouters(routers []router.Router) {
c.routers = newRouters
}
// URL Return URL in RouterChain
func (c *RouterChain) URL() common.URL {
return c.url
}
// NewRouterChain Use url to init router chain
// Loop routerFactories and call NewRouter method
func NewRouterChain(url *common.URL) (*RouterChain, error) {
......@@ -84,9 +91,9 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}
routers := make([]router.Router, 0, len(routerFactories))
routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewRouter(url)
r, err := routerFactory().NewPriorityRouter(url)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
continue
......@@ -94,7 +101,7 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
routers = append(routers, r)
}
newRouters := make([]router.Router, len(routers))
newRouters := make([]router.PriorityRouter, len(routers))
copy(newRouters, routers)
sortRouter(newRouters)
......@@ -103,17 +110,20 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
builtinRouters: routers,
routers: newRouters,
}
if url != nil {
chain.url = *url
}
return chain, nil
}
// sortRouter Sort router instance by priority with stable algorithm
func sortRouter(routers []router.Router) {
func sortRouter(routers []router.PriorityRouter) {
sort.Stable(byPriority(routers))
}
// byPriority Sort by priority
type byPriority []router.Router
type byPriority []router.PriorityRouter
func (a byPriority) Len() int { return len(a) }
func (a byPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
......
......@@ -20,7 +20,6 @@ package chain
import (
"encoding/base64"
"fmt"
"strconv"
"testing"
"time"
)
......@@ -43,9 +42,22 @@ import (
)
const (
path = "/dubbo/config/dubbo/test-condition.condition-router"
zkPrefix = "zookeeper://127.0.0.1:"
anyUrl = "condition://0.0.0.0/com.foo.BarService"
localIP = "127.0.0.1"
test1234IP = "1.2.3.4"
test1111IP = "1.1.1.1"
test0000IP = "0.0.0.0"
port20000 = 20000
path = "/dubbo/config/dubbo/test-condition.condition-router"
zkFormat = "zookeeper://%s:%d"
consumerFormat = "consumer://%s/com.foo.BarService"
dubboForamt = "dubbo://%s:%d/com.foo.BarService"
anyUrlFormat = "condition://%s/com.foo.BarService"
zk = "zookeeper"
applicationKey = "test-condition"
applicationField = "application"
forceField = "force"
forceValue = "true"
)
func TestNewRouterChain(t *testing.T) {
......@@ -66,14 +78,14 @@ conditions:
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
chain, err := NewRouterChain(getRouteUrl("test-condition"))
chain, err := NewRouterChain(getRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 1, len(chain.routers))
appRouter := chain.routers[0].(*condition.AppRouter)
......@@ -116,22 +128,22 @@ conditions:
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionRouteUrl("test-condition"))
chain, err := NewRouterChain(getConditionRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 2, len(chain.routers))
url := getConditionRouteUrl("test-condition")
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewRouter(url)
r, err := factory.NewPriorityRouter(url)
assert.Nil(t, err)
assert.NotNil(t, r)
routers := make([]router.Router, 0)
routers := make([]router.PriorityRouter, 0)
routers = append(routers, r)
chain.AddRouters(routers)
assert.Equal(t, 3, len(chain.routers))
......@@ -142,22 +154,22 @@ func TestRouterChainRoute(t *testing.T) {
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionRouteUrl("test-condition"))
chain, err := NewRouterChain(getConditionRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 1, len(chain.routers))
url := getConditionRouteUrl("test-condition")
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService"))
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService"))
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
......@@ -182,46 +194,46 @@ conditions:
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionRouteUrl("test-condition"))
chain, err := NewRouterChain(getConditionRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 2, len(chain.routers))
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService"))
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService"))
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
assert.Equal(t, 0, len(finalInvokers))
}
func TestRouterChain_Route_NoRoute(t *testing.T) {
func TestRouterChainRouteNoRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
chain, err := NewRouterChain(getConditionNoRouteUrl("test-condition"))
chain, err := NewRouterChain(getConditionNoRouteUrl(applicationKey))
assert.Nil(t, err)
assert.Equal(t, 1, len(chain.routers))
url := getConditionRouteUrl("test-condition")
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
invokers := []protocol.Invoker{}
dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService"))
dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000))
invokers = append(invokers, protocol.NewBaseInvoker(dubboURL))
targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService"))
targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP))
inv := &invocation.RPCInvocation{}
finalInvokers := chain.Route(invokers, &targetURL, inv)
......@@ -229,26 +241,26 @@ func TestRouterChain_Route_NoRoute(t *testing.T) {
}
func getConditionNoRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(anyUrl)
url.AddParam("application", applicationKey)
url.AddParam("force", "true")
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host != 1.2.3.4"))
url.AddParam(constant.RULE_KEY, rule)
return &url
}
func getConditionRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(anyUrl)
url.AddParam("application", applicationKey)
url.AddParam("force", "true")
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host = 1.2.3.4"))
url.AddParam(constant.RULE_KEY, rule)
return &url
}
func getRouteUrl(applicationKey string) *common.URL {
url, _ := common.NewURL(anyUrl)
url.AddParam("application", applicationKey)
url.AddParam("force", "true")
url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP))
url.AddParam(applicationField, applicationKey)
url.AddParam(forceField, forceValue)
return &url
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package router
// Chain
type Chain interface {
router
// AddRouters Add routers
AddRouters([]PriorityRouter)
}
......@@ -18,7 +18,7 @@
package condition
import (
"strconv"
"fmt"
"testing"
"time"
)
......@@ -31,6 +31,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
......@@ -38,7 +39,15 @@ import (
)
const (
path = "/dubbo/config/dubbo/test-condition.condition-router"
routerPath = "/dubbo/config/dubbo/test-condition.condition-router"
routerLocalIP = "127.0.0.1"
routerZk = "zookeeper"
routerKey = "test-condition"
)
var (
zkFormat = "zookeeper://%s:%d"
conditionFormat = "condition://%s/com.foo.BarService"
)
func TestNewAppRouter(t *testing.T) {
......@@ -51,22 +60,22 @@ conditions:
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(path)
err = z.Create(routerPath)
assert.NoError(t, err)
_, err = z.Conn.Set(path, []byte(testYML), 0)
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL("test-condition")
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -97,22 +106,22 @@ conditions:
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(path)
err = z.Create(routerPath)
assert.NoError(t, err)
_, err = z.Conn.Set(path, []byte(testYML), 0)
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL("test-condition")
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -134,22 +143,22 @@ conditions:
`
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
err = z.Create(path)
err = z.Create(routerPath)
assert.NoError(t, err)
_, err = z.Conn.Set(path, []byte(testYML), 0)
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer ts.Stop()
defer z.Close()
zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)
assert.NotNil(t, configuration)
appRouteURL := getAppRouteURL("test-condition")
appRouteURL := getAppRouteURL(routerKey)
appRouter, err := NewAppRouter(appRouteURL)
assert.Nil(t, err)
assert.NotNil(t, appRouter)
......@@ -175,7 +184,7 @@ conditions:
}
func getAppRouteURL(applicationKey string) *common.URL {
url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService")
url, _ := common.NewURL(fmt.Sprintf(conditionFormat, constant.ANYHOST_VALUE))
url.AddParam("application", applicationKey)
url.AddParam("force", "true")
return &url
......
......@@ -32,28 +32,28 @@ func init() {
// ConditionRouterFactory Condition router factory
type ConditionRouterFactory struct{}
func newConditionRouterFactory() router.RouterFactory {
func newConditionRouterFactory() router.PriorityRouterFactory {
return &ConditionRouterFactory{}
}
// NewRouter Create ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
// NewPriorityRouter creates ConditionRouterFactory by URL
func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewConditionRouter(url)
}
// NewRouter Create FileRouterFactory by Content
func (c *ConditionRouterFactory) NewFileRouter(content []byte) (router.Router, error) {
func (c *ConditionRouterFactory) NewFileRouter(content []byte) (router.PriorityRouter, error) {
return NewFileConditionRouter(content)
}
// AppRouterFactory Application router factory
type AppRouterFactory struct{}
func newAppRouterFactory() router.RouterFactory {
func newAppRouterFactory() router.PriorityRouterFactory {
return &AppRouterFactory{}
}
// NewRouter Create AppRouterFactory by URL
func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
// NewPriorityRouter creates AppRouterFactory by URL
func (c *AppRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewAppRouter(url)
}
......@@ -33,12 +33,21 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
const anyUrl = "condition://0.0.0.0/com.foo.BarService"
const (
factory1111Ip = "1.1.1.1"
factoryUrlFormat = "condition://%s/com.foo.BarService"
factoryDubboFormat = "dubbo://%s:20880/com.foo.BarService"
factoryConsumerMethodFormat = "consumer://%s/com.foo.BarService?methods=getFoo"
factory333URL = "dubbo://10.20.3.3:20880/com.foo.BarService"
factoryConsumerFormat = "consumer://%s/com.foo.BarService"
factoryHostIp1234Format = "host = %s => host = 1.2.3.4"
)
type MockInvoker struct {
url common.URL
......@@ -61,21 +70,21 @@ func (bi *MockInvoker) GetUrl() common.URL {
}
func getRouteUrl(rule string) *common.URL {
url, _ := common.NewURL(anyUrl)
url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE))
url.AddParam("rule", rule)
url.AddParam("force", "true")
return &url
}
func getRouteUrlWithForce(rule, force string) *common.URL {
url, _ := common.NewURL(anyUrl)
url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE))
url.AddParam("rule", rule)
url.AddParam("force", force)
return &url
}
func getRouteUrlWithNoForce(rule string) *common.URL {
url, _ := common.NewURL(anyUrl)
url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE))
url.AddParam("rule", rule)
return &url
}
......@@ -118,45 +127,45 @@ func (bi *MockInvoker) Destroy() {
bi.available = false
}
func TestRouteMatchWhen(t *testing.T) {
func TestRoute_matchWhen(t *testing.T) {
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
cUrl, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService")
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip))
matchWhen := router.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen)
rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router1, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule1))
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
matchWhen1 := router1.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen1)
rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
matchWhen2 := router2.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, false, matchWhen2)
rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
matchWhen3 := router3.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen3)
rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router4, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule4))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
matchWhen4 := router4.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen4)
rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4"))
router5, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule5))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
matchWhen5 := router5.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, false, matchWhen5)
rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4"))
router6, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule6))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
matchWhen6 := router6.(*ConditionRouter).MatchWhen(&cUrl, inv)
assert.Equal(t, true, matchWhen6)
}
func TestRouteMatchFilter(t *testing.T) {
func TestRoute_matchFilter(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
t.Logf("The local ip is %s", localIP)
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invokers := []protocol.Invoker{NewMockInvoker(url1, 1), NewMockInvoker(url2, 2), NewMockInvoker(url3, 3)}
rule1 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3"))
rule2 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.* & host != 10.20.3.3"))
......@@ -164,13 +173,13 @@ func TestRouteMatchFilter(t *testing.T) {
rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4"))
rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3"))
rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson"))
router1, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule1))
router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2))
router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3))
router4, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule4))
router5, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule6))
cUrl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4))
router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5))
router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6))
cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{})
fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{})
......@@ -186,54 +195,54 @@ func TestRouteMatchFilter(t *testing.T) {
}
func TestRouteMethodRoute(t *testing.T) {
func TestRoute_methodRoute(t *testing.T) {
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{}))
rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4"))
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo")
matchWhen := router.(*ConditionRouter).MatchWhen(&url, inv)
assert.Equal(t, true, matchWhen)
url1, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo")
url1, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
matchWhen = router.(*ConditionRouter).MatchWhen(&url1, inv)
assert.Equal(t, true, matchWhen)
url2, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo")
url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4"))
router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2))
router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2))
matchWhen = router2.(*ConditionRouter).MatchWhen(&url2, inv)
assert.Equal(t, false, matchWhen)
url3, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo")
url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip))
rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4"))
router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3))
router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3))
matchWhen = router3.(*ConditionRouter).MatchWhen(&url3, inv)
assert.Equal(t, true, matchWhen)
}
func TestRouteReturnFalse(t *testing.T) {
func TestRoute_ReturnFalse(t *testing.T) {
url, _ := common.NewURL("")
localIP, _ := gxnet.GetLocalIP()
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
}
func TestRouteReturnEmpty(t *testing.T) {
func TestRoute_ReturnEmpty(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url, _ := common.NewURL("")
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => "))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
}
func TestRouteReturnAll(t *testing.T) {
func TestRoute_ReturnAll(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
urlString := "dubbo://" + localIP + "/com.foo.BarService"
dubboURL, _ := common.NewURL(urlString)
......@@ -243,118 +252,118 @@ func TestRouteReturnAll(t *testing.T) {
invokers := []protocol.Invoker{mockInvoker1, mockInvoker2, mockInvoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
}
func TestRouteHostFilter(t *testing.T) {
func TestRoute_HostFilter(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
}
func TestRouteEmptyHostFilter(t *testing.T) {
func TestRoute_Empty_HostFilter(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
}
func TestRouteFalseHostFilter(t *testing.T) {
func TestRoute_False_HostFilter(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
}
func TestRoutePlaceholder(t *testing.T) {
func TestRoute_Placeholder(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 2, len(fileredInvokers))
assert.Equal(t, invoker2, fileredInvokers[0])
assert.Equal(t, invoker3, fileredInvokers[1])
}
func TestRouteNoForce(t *testing.T) {
func TestRoute_NoForce(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrlWithNoForce(rule))
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, invokers, fileredInvokers)
}
func TestRouteForce(t *testing.T) {
func TestRoute_Force(t *testing.T) {
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url1, _ := common.NewURL(factory333URL)
url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP))
invoker1 := NewMockInvoker(url1, 1)
invoker2 := NewMockInvoker(url2, 2)
invoker3 := NewMockInvoker(url3, 3)
invokers := []protocol.Invoker{invoker1, invoker2, invoker3}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4"))
curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService")
router, _ := newConditionRouterFactory().NewRouter(getRouteUrlWithForce(rule, "true"))
rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP)))
curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP))
router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"))
fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv)
assert.Equal(t, 0, len(fileredInvokers))
}
......
......@@ -77,10 +77,7 @@ func (f *FileConditionRouter) URL() common.URL {
}
func parseCondition(conditions []string) string {
var (
when string
then string
)
var when, then string
for _, condition := range conditions {
condition = strings.Trim(condition, " ")
if strings.Contains(condition, "=>") {
......@@ -101,10 +98,7 @@ func parseCondition(conditions []string) string {
then = provider
}
}
}
}
return strings.Join([]string{when, then}, " => ")
}
......@@ -181,9 +181,7 @@ func parseRule(rule string) (map[string]MatchPair, error) {
return condition, nil
}
var (
pair MatchPair
)
var pair MatchPair
values := gxset.NewSet()
matches := routerPatternReg.FindAllSubmatch([]byte(rule), -1)
for _, groups := range matches {
......
......@@ -18,6 +18,7 @@
package healthcheck
import (
"fmt"
"math"
"testing"
)
......@@ -32,10 +33,16 @@ import (
"github.com/apache/dubbo-go/protocol"
)
func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
const (
healthCheckDubbo1010IP = "192.168.10.10"
healthCheckDubbo1011IP = "192.168.10.11"
healthCheckMethodTest = "test"
healthCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider"
)
func TestDefaultHealthCheckerIsHealthy(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
invoker := NewMockInvoker(url)
healthy := hc.IsHealthy(invoker)
......@@ -45,7 +52,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
// fake the outgoing request
for i := 0; i < 11; i++ {
request(url, "test", 0, true, false)
request(url, healthCheckMethodTest, 0, true, false)
}
hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
......@@ -54,7 +61,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
// successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
for i := 0; i < 11; i++ {
request(url, "test", 0, false, false)
request(url, healthCheckMethodTest, 0, false, false)
}
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
......@@ -63,18 +70,18 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
assert.False(t, hc.IsHealthy(invoker))
// reset successive failed count and go to healthy
request(url, "test", 0, false, true)
request(url, healthCheckMethodTest, 0, false, true)
healthy = hc.IsHealthy(invoker)
assert.True(t, hc.IsHealthy(invoker))
}
func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
func TestDefaultHealthCheckerGetCircuitBreakerSleepWindowTime(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, "test", 1, false, false)
request(url, healthCheckMethodTest, 1, false, false)
}
sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
......@@ -84,48 +91,48 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == 0)
url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP))
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime == 0)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
}
func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) {
func TestDefaultHealthCheckerGetCircuitBreakerTimeout(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url))
assert.True(t, timeout == 0)
url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP))
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
request(url1, healthCheckMethodTest, 1, false, false)
timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1))
// timeout must after the current time
assert.True(t, timeout > protocol.CurrentTimeMillis())
}
func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) {
func TestDefaultHealthCheckerIsCircuitBreakerTripped(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
status := protocol.GetURLStatus(url)
tripped := defaultHc.isCircuitBreakerTripped(status)
assert.False(t, tripped)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, "test", 1, false, false)
request(url, healthCheckMethodTest, 1, false, false)
}
tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url))
assert.True(t, tripped)
......@@ -134,13 +141,13 @@ func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) {
func TestNewDefaultHealthChecker(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
assert.NotNil(t, defaultHc)
assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF))
url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker)
......
......@@ -33,11 +33,11 @@ type HealthCheckRouteFactory struct {
}
// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory
func newHealthCheckRouteFactory() router.RouterFactory {
func newHealthCheckRouteFactory() router.PriorityRouterFactory {
return &HealthCheckRouteFactory{}
}
// NewRouter construct a new NewHealthCheckRouter via url
func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) {
// NewPriorityRouter construct a new NewHealthCheckRouter via url
func (f *HealthCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewHealthCheckRouter(url)
}
......@@ -38,7 +38,7 @@ type HealthCheckRouter struct {
}
// NewHealthCheckRouter construct an HealthCheckRouter via url
func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) {
r := &HealthCheckRouter{
url: url,
enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false),
......
......@@ -18,6 +18,7 @@
package healthcheck
import (
"fmt"
"math"
"testing"
"time"
......@@ -34,13 +35,22 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestHealthCheckRouter_Route(t *testing.T) {
const (
healthCheckRoute1010IP = "192.168.10.10"
healthCheckRoute1011IP = "192.168.10.11"
healthCheckRoute1012IP = "192.168.10.12"
healthCheckRouteMethodNameTest = "test"
healthCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider"
healthCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider"
)
func TestHealthCheckRouterRoute(t *testing.T) {
defer protocol.CleanAllStatus()
consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider")
consumerURL, _ := common.NewURL(healthCheck1001URL)
consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true")
url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url2, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
url3, _ := common.NewURL("dubbo://192.168.10.12:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP))
url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP))
url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP))
hcr, _ := NewHealthCheckRouter(&consumerURL)
var invokers []protocol.Invoker
......@@ -48,21 +58,21 @@ func TestHealthCheckRouter_Route(t *testing.T) {
invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3)
invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation("test", nil, nil)
inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil)
res := hcr.Route(invokers, &consumerURL, inv)
// now all invokers are healthy
assert.True(t, len(res) == len(invokers))
for i := 0; i < 10; i++ {
request(url1, "test", 0, false, false)
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
// invokers1 is unhealthy now
assert.True(t, len(res) == 2 && !contains(res, invoker1))
for i := 0; i < 10; i++ {
request(url1, "test", 0, false, false)
request(url2, "test", 0, false, false)
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
request(url2, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
......@@ -70,9 +80,9 @@ func TestHealthCheckRouter_Route(t *testing.T) {
assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2))
for i := 0; i < 10; i++ {
request(url1, "test", 0, false, false)
request(url2, "test", 0, false, false)
request(url3, "test", 0, false, false)
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
request(url2, healthCheckRouteMethodNameTest, 0, false, false)
request(url3, healthCheckRouteMethodNameTest, 0, false, false)
}
res = hcr.Route(invokers, &consumerURL, inv)
......@@ -80,12 +90,12 @@ func TestHealthCheckRouter_Route(t *testing.T) {
assert.True(t, len(res) == 3)
// reset the invoker1 successive failed count, so invoker1 go to healthy
request(url1, "test", 0, false, true)
request(url1, healthCheckRouteMethodNameTest, 0, false, true)
res = hcr.Route(invokers, &consumerURL, inv)
assert.True(t, contains(res, invoker1))
for i := 0; i < 6; i++ {
request(url1, "test", 0, false, false)
request(url1, healthCheckRouteMethodNameTest, 0, false, false)
}
// now all invokers are unhealthy, so downgraded to all again
res = hcr.Route(invokers, &consumerURL, inv)
......@@ -108,7 +118,7 @@ func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool {
func TestNewHealthCheckRouter(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP))
hcr, _ := NewHealthCheckRouter(&url)
h := hcr.(*HealthCheckRouter)
assert.Nil(t, h.checker)
......
......@@ -23,34 +23,30 @@ import (
)
// Extension - Router
// RouterFactory Router create factory
type RouterFactory interface {
// NewRouter Create router instance with URL
NewRouter(*common.URL) (Router, error)
// PriorityRouterFactory creates creates priority router with url
type PriorityRouterFactory interface {
// NewPriorityRouter creates router instance with URL
NewPriorityRouter(*common.URL) (PriorityRouter, error)
}
// RouterFactory Router create factory use for parse config file
type FileRouterFactory interface {
// FilePriorityRouterFactory creates priority router with parse config file
type FilePriorityRouterFactory interface {
// NewFileRouters Create file router with config file
NewFileRouter([]byte) (Router, error)
NewFileRouter([]byte) (PriorityRouter, error)
}
// Router
type Router interface {
type router interface {
// Route Determine the target invokers list.
Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker
// Priority Return Priority in router
// 0 to ^int(0) is better
Priority() int64
// URL Return URL in router
URL() common.URL
}
// Chain
type Chain interface {
// Route Determine the target invokers list with chain.
Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker
// AddRouters Add routers
AddRouters([]Router)
// Router
type PriorityRouter interface {
router
// Priority Return Priority in router
// 0 to ^int(0) is better
Priority() int64
}
......@@ -31,17 +31,17 @@ func init() {
type tagRouterFactory struct{}
// NewTagRouterFactory create a tagRouterFactory
func NewTagRouterFactory() router.RouterFactory {
func NewTagRouterFactory() router.PriorityRouterFactory {
return &tagRouterFactory{}
}
// NewRouter create a tagRouter by tagRouterFactory with a url
// NewPriorityRouter create a tagRouter by tagRouterFactory with a url
// The url contains router configuration information
func (c *tagRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
func (c *tagRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
return NewTagRouter(url)
}
// NewFileRouter create a tagRouter by profile content
func (c *tagRouterFactory) NewFileRouter(content []byte) (router.Router, error) {
func (c *tagRouterFactory) NewFileRouter(content []byte) (router.PriorityRouter, error) {
return NewFileTagRouter(content)
}
......@@ -18,6 +18,7 @@
package tag
import (
"fmt"
"testing"
)
......@@ -29,11 +30,16 @@ import (
"github.com/apache/dubbo-go/common"
)
func TestTagRouterFactory_NewRouter(t *testing.T) {
u1, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true")
const (
factoryLocalIP = "127.0.0.1"
factoryFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true"
)
func TestTagRouterFactoryNewRouter(t *testing.T) {
u1, err := common.NewURL(fmt.Sprintf(factoryFormat, factoryLocalIP))
assert.Nil(t, err)
factory := NewTagRouterFactory()
tagRouter, e := factory.NewRouter(&u1)
tagRouter, e := factory.NewPriorityRouter(&u1)
assert.Nil(t, e)
assert.NotNil(t, tagRouter)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment