diff --git a/cluster/cluster_impl/base_cluster_invoker_test.go b/cluster/cluster_impl/base_cluster_invoker_test.go index 695ffcddbbce5a1c65f806b4561670d726588aaa..8121e5c0eab16b92b323fbc0e6e944231d1ed1b9 100644 --- a/cluster/cluster_impl/base_cluster_invoker_test.go +++ b/cluster/cluster_impl/base_cluster_invoker_test.go @@ -33,10 +33,15 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) +const ( + baseClusterInvokerMethodName = "getUser" + baseClusterInvokerFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider" +) + func TestStickyNormal(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i)) url.SetParam("sticky", "true") invokers = append(invokers, NewMockInvoker(url, 1)) } @@ -45,7 +50,7 @@ func TestStickyNormal(t *testing.T) { invoked := []protocol.Invoker{} tmpRandomBalance := loadbalance.NewRandomLoadBalance() - tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil) + tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil) result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) assert.Equal(t, result, result1) @@ -54,7 +59,7 @@ func TestStickyNormal(t *testing.T) { func TestStickyNormalWhenError(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i)) url.SetParam("sticky", "true") invokers = append(invokers, NewMockInvoker(url, 1)) } @@ -62,8 +67,8 @@ func TestStickyNormalWhenError(t *testing.T) { base.availablecheck = true invoked := []protocol.Invoker{} - result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) + result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked) invoked = append(invoked, result) - result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) + result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked) assert.NotEqual(t, result, result1) } diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go index 46b0ff634e56c45223a5aeb5566b9b1401518960..af17a93756a6f558c7da063eec9d8052b83cbe69 100644 --- a/cluster/cluster_impl/failback_cluster_invoker.go +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -72,6 +72,19 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { return invoker } +func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) { + invoked := make([]protocol.Invoker, 0) + invoked = append(invoked, retryTask.lastInvoker) + + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + var result protocol.Result + result = retryInvoker.Invoke(ctx, retryTask.invocation) + if result.Error() != nil { + retryTask.lastInvoker = retryInvoker + invoker.checkRetry(retryTask, result.Error()) + } +} + func (invoker *failbackClusterInvoker) process(ctx context.Context) { invoker.ticker = time.NewTicker(time.Second * 1) for range invoker.ticker.C { @@ -91,25 +104,11 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) { } // ignore return. the get must success. - _, err = invoker.taskList.Get(1) - if err != nil { + if _, err = invoker.taskList.Get(1); err != nil { logger.Warnf("get task found err: %v\n", err) break } - - go func(retryTask *retryTimerTask) { - invoked := make([]protocol.Invoker, 0) - invoked = append(invoked, retryTask.lastInvoker) - - retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) - var result protocol.Result - result = retryInvoker.Invoke(ctx, retryTask.invocation) - if result.Error() != nil { - retryTask.lastInvoker = retryInvoker - invoker.checkRetry(retryTask, result.Error()) - } - }(retryTask) - + go invoker.tryTimerTaskProc(ctx, retryTask) } } } @@ -129,29 +128,26 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", invocation.MethodName(), invoker.GetUrl().Service(), err) return &protocol.RPCResult{} } - url := invokers[0].GetUrl() - methodName := invocation.MethodName() + //Get the service loadbalance config + url := invokers[0].GetUrl() lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) - //Get the service method loadbalance config if have + methodName := invocation.MethodName() if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { lb = v } - loadbalance := extension.GetLoadbalance(lb) + loadBalance := extension.GetLoadbalance(lb) invoked := make([]protocol.Invoker, 0, len(invokers)) - var result protocol.Result - - ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked) //DO INVOKE - result = ivk.Invoke(ctx, invocation) + result := ivk.Invoke(ctx, invocation) if result.Error() != nil { invoker.once.Do(func() { invoker.taskList = queue.New(invoker.failbackTasks) @@ -164,7 +160,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr return &protocol.RPCResult{} } - timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk) + timerTask := newRetryTimerTask(loadBalance, invocation, invokers, ivk) invoker.taskList.Put(timerTask) logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", @@ -172,7 +168,6 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr // ignore return &protocol.RPCResult{} } - return result } diff --git a/cluster/loadbalance/consistent_hash_test.go b/cluster/loadbalance/consistent_hash_test.go index a44293172c6e2c96bd098a19306f69260b713689..9f22d39dc46243dddda89151e07dbea39ab933fb 100644 --- a/cluster/loadbalance/consistent_hash_test.go +++ b/cluster/loadbalance/consistent_hash_test.go @@ -18,6 +18,7 @@ package loadbalance import ( + "fmt" "testing" ) @@ -32,6 +33,19 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) +const ( + ip = "192.168.1.0" + port8080 = 8080 + port8082 = 8082 + + url8080Short = "dubbo://192.168.1.0:8080" + url8081Short = "dubbo://192.168.1.0:8081" + url20000 = "dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1" + url8080 = "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1" + url8081 = "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1" + url8082 = "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1" +) + func TestConsistentHashSelectorSuite(t *testing.T) { suite.Run(t, new(consistentHashSelectorSuite)) } @@ -43,8 +57,7 @@ type consistentHashSelectorSuite struct { func (s *consistentHashSelectorSuite) SetupTest() { var invokers []protocol.Invoker - url, _ := common.NewURL( - "dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + url, _ := common.NewURL(url20000) invokers = append(invokers, protocol.NewBaseInvoker(url)) s.selector = newConsistentHashSelector(invokers, "echo", 999944) } @@ -55,14 +68,14 @@ func (s *consistentHashSelectorSuite) TestToKey() { } func (s *consistentHashSelectorSuite) TestSelectForKey() { - url1, _ := common.NewURL("dubbo://192.168.1.0:8080") - url2, _ := common.NewURL("dubbo://192.168.1.0:8081") + url1, _ := common.NewURL(url8080Short) + url2, _ := common.NewURL(url8081Short) s.selector.virtualInvokers = make(map[uint32]protocol.Invoker) s.selector.virtualInvokers[99874] = protocol.NewBaseInvoker(url1) s.selector.virtualInvokers[9999945] = protocol.NewBaseInvoker(url2) s.selector.keys = []uint32{99874, 9999945} result := s.selector.selectForKey(9999944) - s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?") + s.Equal(result.GetUrl().String(), url8081Short+"?") } func TestConsistentHashLoadBalanceSuite(t *testing.T) { @@ -83,11 +96,11 @@ type consistentHashLoadBalanceSuite struct { func (s *consistentHashLoadBalanceSuite) SetupTest() { var err error - s.url1, err = common.NewURL("dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + s.url1, err = common.NewURL(url8080) s.NoError(err) - s.url2, err = common.NewURL("dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + s.url2, err = common.NewURL(url8081) s.NoError(err) - s.url3, err = common.NewURL("dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1") + s.url3, err = common.NewURL(url8082) s.NoError(err) s.invoker1 = protocol.NewBaseInvoker(s.url1) @@ -101,9 +114,9 @@ func (s *consistentHashLoadBalanceSuite) SetupTest() { func (s *consistentHashLoadBalanceSuite) TestSelect() { args := []interface{}{"name", "password", "age"} invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil)) - s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080") + s.Equal(invoker.GetUrl().Location, fmt.Sprintf("%s:%d", ip, port8080)) args = []interface{}{"ok", "abc"} invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil)) - s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082") + s.Equal(invoker.GetUrl().Location, fmt.Sprintf("%s:%d", ip, port8082)) } diff --git a/cluster/loadbalance/random_test.go b/cluster/loadbalance/random_test.go index 88392de52c93579dd4def3da2d60b415b601b21e..b94d7da43d5bd42e6798fca750c8616830a8df8f 100644 --- a/cluster/loadbalance/random_test.go +++ b/cluster/loadbalance/random_test.go @@ -36,18 +36,24 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) +const ( + tmpUrl = "dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider" + tmpUrlFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider" + tmpIp = "192.168.1.100" +) + func TestRandomlbSelect(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", 0)) + url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, 0)) invokers = append(invokers, protocol.NewBaseInvoker(url)) i := randomlb.Select(invokers, &invocation.RPCInvocation{}) assert.True(t, i.GetUrl().URLEqual(url)) for i := 1; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) invokers = append(invokers, protocol.NewBaseInvoker(url)) } randomlb.Select(invokers, &invocation.RPCInvocation{}) @@ -58,13 +64,13 @@ func TestRandomlbSelectWeight(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) invokers = append(invokers, protocol.NewBaseInvoker(url)) } urlParams := url.Values{} urlParams.Set("methods.test."+constant.WEIGHT_KEY, "10000000000000") - urll, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams)) + urll, _ := common.NewURL(tmpUrl, common.WithParams(urlParams)) invokers = append(invokers, protocol.NewBaseInvoker(urll)) ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) @@ -72,7 +78,7 @@ func TestRandomlbSelectWeight(t *testing.T) { var selected float64 for i := 0; i < 10000; i++ { s := randomlb.Select(invokers, ivc) - if s.GetUrl().Ip == "192.168.1.100" { + if s.GetUrl().Ip == tmpIp { selected++ } selectedInvoker = append(selectedInvoker, s) @@ -89,13 +95,13 @@ func TestRandomlbSelectWarmup(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) invokers = append(invokers, protocol.NewBaseInvoker(url)) } urlParams := url.Values{} urlParams.Set(constant.REMOTE_TIMESTAMP_KEY, strconv.FormatInt(time.Now().Add(time.Minute*(-9)).Unix(), 10)) - urll, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams)) + urll, _ := common.NewURL(tmpUrl, common.WithParams(urlParams)) invokers = append(invokers, protocol.NewBaseInvoker(urll)) ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) @@ -103,7 +109,7 @@ func TestRandomlbSelectWarmup(t *testing.T) { var selected float64 for i := 0; i < 10000; i++ { s := randomlb.Select(invokers, ivc) - if s.GetUrl().Ip == "192.168.1.100" { + if s.GetUrl().Ip == tmpIp { selected++ } selectedInvoker = append(selectedInvoker, s) diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index b4e4e1c0bd6ec02fdc19ab0fa2de0f4de80457fd..c1f723525f5307e7732f0ea1ecc27eca7ba09c8d 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -20,7 +20,6 @@ package chain import ( "encoding/base64" "fmt" - "strconv" "testing" "time" ) @@ -43,9 +42,22 @@ import ( ) const ( - path = "/dubbo/config/dubbo/test-condition.condition-router" - zkPrefix = "zookeeper://127.0.0.1:" - anyUrl = "condition://0.0.0.0/com.foo.BarService" + localIP = "127.0.0.1" + test1234IP = "1.2.3.4" + test1111IP = "1.1.1.1" + test0000IP = "0.0.0.0" + port20000 = 20000 + + path = "/dubbo/config/dubbo/test-condition.condition-router" + zkFormat = "zookeeper://%s:%d" + consumerFormat = "consumer://%s/com.foo.BarService" + dubboForamt = "dubbo://%s:%d/com.foo.BarService" + anyUrlFormat = "condition://%s/com.foo.BarService" + zk = "zookeeper" + applicationKey = "test-condition" + applicationField = "application" + forceField = "force" + forceValue = "true" ) func TestNewRouterChain(t *testing.T) { @@ -66,14 +78,14 @@ conditions: defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) assert.Nil(t, err) assert.NotNil(t, configuration) - chain, err := NewRouterChain(getRouteUrl("test-condition")) + chain, err := NewRouterChain(getRouteUrl(applicationKey)) assert.Nil(t, err) assert.Equal(t, 1, len(chain.routers)) appRouter := chain.routers[0].(*condition.AppRouter) @@ -116,15 +128,15 @@ conditions: defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) - chain, err := NewRouterChain(getConditionRouteUrl("test-condition")) + chain, err := NewRouterChain(getConditionRouteUrl(applicationKey)) assert.Nil(t, err) assert.Equal(t, 2, len(chain.routers)) - url := getConditionRouteUrl("test-condition") + url := getConditionRouteUrl(applicationKey) assert.NotNil(t, url) factory := extension.GetRouterFactory(url.Protocol) r, err := factory.NewPriorityRouter(url) @@ -142,22 +154,22 @@ func TestRouterChainRoute(t *testing.T) { defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) - chain, err := NewRouterChain(getConditionRouteUrl("test-condition")) + chain, err := NewRouterChain(getConditionRouteUrl(applicationKey)) assert.Nil(t, err) assert.Equal(t, 1, len(chain.routers)) - url := getConditionRouteUrl("test-condition") + url := getConditionRouteUrl(applicationKey) assert.NotNil(t, url) invokers := []protocol.Invoker{} - dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService")) + dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000)) invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) - targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService")) + targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP)) inv := &invocation.RPCInvocation{} finalInvokers := chain.Route(invokers, &targetURL, inv) @@ -182,46 +194,46 @@ conditions: defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) - chain, err := NewRouterChain(getConditionRouteUrl("test-condition")) + chain, err := NewRouterChain(getConditionRouteUrl(applicationKey)) assert.Nil(t, err) assert.Equal(t, 2, len(chain.routers)) invokers := []protocol.Invoker{} - dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService")) + dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000)) invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) - targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService")) + targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP)) inv := &invocation.RPCInvocation{} finalInvokers := chain.Route(invokers, &targetURL, inv) assert.Equal(t, 0, len(finalInvokers)) } -func TestRouterChain_Route_NoRoute(t *testing.T) { +func TestRouterChainRouteNoRoute(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) - chain, err := NewRouterChain(getConditionNoRouteUrl("test-condition")) + chain, err := NewRouterChain(getConditionNoRouteUrl(applicationKey)) assert.Nil(t, err) assert.Equal(t, 1, len(chain.routers)) - url := getConditionRouteUrl("test-condition") + url := getConditionRouteUrl(applicationKey) assert.NotNil(t, url) invokers := []protocol.Invoker{} - dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService")) + dubboURL, _ := common.NewURL(fmt.Sprintf(dubboForamt, test1234IP, port20000)) invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) - targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService")) + targetURL, _ := common.NewURL(fmt.Sprintf(consumerFormat, test1111IP)) inv := &invocation.RPCInvocation{} finalInvokers := chain.Route(invokers, &targetURL, inv) @@ -229,26 +241,26 @@ func TestRouterChain_Route_NoRoute(t *testing.T) { } func getConditionNoRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL(anyUrl) - url.AddParam("application", applicationKey) - url.AddParam("force", "true") + url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP)) + url.AddParam(applicationField, applicationKey) + url.AddParam(forceField, forceValue) rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host != 1.2.3.4")) url.AddParam(constant.RULE_KEY, rule) return &url } func getConditionRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL(anyUrl) - url.AddParam("application", applicationKey) - url.AddParam("force", "true") + url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP)) + url.AddParam(applicationField, applicationKey) + url.AddParam(forceField, forceValue) rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host = 1.2.3.4")) url.AddParam(constant.RULE_KEY, rule) return &url } func getRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL(anyUrl) - url.AddParam("application", applicationKey) - url.AddParam("force", "true") + url, _ := common.NewURL(fmt.Sprintf(anyUrlFormat, test0000IP)) + url.AddParam(applicationField, applicationKey) + url.AddParam(forceField, forceValue) return &url } diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index f37a483e8468bc57d3ce1e73172ccf9a05bc29f0..8b38f2dd6136b4d31f46e7214c0ad1359537b198 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -18,7 +18,7 @@ package condition import ( - "strconv" + "fmt" "testing" "time" ) @@ -31,6 +31,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/remoting" @@ -38,7 +39,15 @@ import ( ) const ( - path = "/dubbo/config/dubbo/test-condition.condition-router" + routerPath = "/dubbo/config/dubbo/test-condition.condition-router" + routerLocalIP = "127.0.0.1" + routerZk = "zookeeper" + routerKey = "test-condition" +) + +var ( + zkFormat = "zookeeper://%s:%d" + conditionFormat = "condition://%s/com.foo.BarService" ) func TestNewAppRouter(t *testing.T) { @@ -51,22 +60,22 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create(path) + err = z.Create(routerPath) assert.NoError(t, err) - _, err = z.Conn.Set(path, []byte(testYML), 0) + _, err = z.Conn.Set(routerPath, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) assert.Nil(t, err) assert.NotNil(t, configuration) - appRouteURL := getAppRouteURL("test-condition") + appRouteURL := getAppRouteURL(routerKey) appRouter, err := NewAppRouter(appRouteURL) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -97,22 +106,22 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create(path) + err = z.Create(routerPath) assert.NoError(t, err) - _, err = z.Conn.Set(path, []byte(testYML), 0) + _, err = z.Conn.Set(routerPath, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) assert.Nil(t, err) assert.NotNil(t, configuration) - appRouteURL := getAppRouteURL("test-condition") + appRouteURL := getAppRouteURL(routerKey) appRouter, err := NewAppRouter(appRouteURL) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -134,22 +143,22 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create(path) + err = z.Create(routerPath) assert.NoError(t, err) - _, err = z.Conn.Set(path, []byte(testYML), 0) + _, err = z.Conn.Set(routerPath, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) - configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) assert.Nil(t, err) assert.NotNil(t, configuration) - appRouteURL := getAppRouteURL("test-condition") + appRouteURL := getAppRouteURL(routerKey) appRouter, err := NewAppRouter(appRouteURL) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -175,7 +184,7 @@ conditions: } func getAppRouteURL(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(fmt.Sprintf(conditionFormat, constant.ANYHOST_VALUE)) url.AddParam("application", applicationKey) url.AddParam("force", "true") return &url diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index d080a46cc736c71f22e9e3d7f8b236b4b5307362..0f61b39fc71af3aaeffc731974a0fa997503693e 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -33,12 +33,21 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) -const anyUrl = "condition://0.0.0.0/com.foo.BarService" +const ( + factory1111Ip = "1.1.1.1" + factoryUrlFormat = "condition://%s/com.foo.BarService" + factoryDubboFormat = "dubbo://%s:20880/com.foo.BarService" + factoryConsumerMethodFormat = "consumer://%s/com.foo.BarService?methods=getFoo" + factory333URL = "dubbo://10.20.3.3:20880/com.foo.BarService" + factoryConsumerFormat = "consumer://%s/com.foo.BarService" + factoryHostIp1234Format = "host = %s => host = 1.2.3.4" +) type MockInvoker struct { url common.URL @@ -61,21 +70,21 @@ func (bi *MockInvoker) GetUrl() common.URL { } func getRouteUrl(rule string) *common.URL { - url, _ := common.NewURL(anyUrl) + url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE)) url.AddParam("rule", rule) url.AddParam("force", "true") return &url } func getRouteUrlWithForce(rule, force string) *common.URL { - url, _ := common.NewURL(anyUrl) + url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE)) url.AddParam("rule", rule) url.AddParam("force", force) return &url } func getRouteUrlWithNoForce(rule string) *common.URL { - url, _ := common.NewURL(anyUrl) + url, _ := common.NewURL(fmt.Sprintf(factoryUrlFormat, constant.ANYHOST_VALUE)) url.AddParam("rule", rule) return &url } @@ -118,11 +127,11 @@ func (bi *MockInvoker) Destroy() { bi.available = false } -func TestRouteMatchWhen(t *testing.T) { +func TestRoute_matchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) - cUrl, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService") + cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip)) matchWhen := router.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen) rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) @@ -151,12 +160,12 @@ func TestRouteMatchWhen(t *testing.T) { assert.Equal(t, true, matchWhen6) } -func TestRouteMatchFilter(t *testing.T) { +func TestRoute_matchFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() t.Logf("The local ip is %s", localIP) url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invokers := []protocol.Invoker{NewMockInvoker(url1, 1), NewMockInvoker(url2, 2), NewMockInvoker(url3, 3)} rule1 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.3")) rule2 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.* & host != 10.20.3.3")) @@ -170,7 +179,7 @@ func TestRouteMatchFilter(t *testing.T) { router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4)) router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5)) router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6)) - cUrl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{}) fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{}) fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{}) @@ -186,22 +195,22 @@ func TestRouteMatchFilter(t *testing.T) { } -func TestRouteMethodRoute(t *testing.T) { +func TestRoute_methodRoute(t *testing.T) { inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") matchWhen := router.(*ConditionRouter).MatchWhen(&url, inv) assert.Equal(t, true, matchWhen) - url1, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo") + url1, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) matchWhen = router.(*ConditionRouter).MatchWhen(&url1, inv) assert.Equal(t, true, matchWhen) - url2, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo") + url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) matchWhen = router2.(*ConditionRouter).MatchWhen(&url2, inv) assert.Equal(t, false, matchWhen) - url3, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo") + url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) matchWhen = router3.(*ConditionRouter).MatchWhen(&url3, inv) @@ -209,31 +218,31 @@ func TestRouteMethodRoute(t *testing.T) { } -func TestRouteReturnFalse(t *testing.T) { +func TestRoute_ReturnFalse(t *testing.T) { url, _ := common.NewURL("") localIP, _ := gxnet.GetLocalIP() invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } -func TestRouteReturnEmpty(t *testing.T) { +func TestRoute_ReturnEmpty(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url, _ := common.NewURL("") invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } -func TestRouteReturnAll(t *testing.T) { +func TestRoute_ReturnAll(t *testing.T) { localIP, _ := gxnet.GetLocalIP() urlString := "dubbo://" + localIP + "/com.foo.BarService" dubboURL, _ := common.NewURL(urlString) @@ -243,24 +252,24 @@ func TestRouteReturnAll(t *testing.T) { invokers := []protocol.Invoker{mockInvoker1, mockInvoker2, mockInvoker3} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, invokers, fileredInvokers) } -func TestRouteHostFilter(t *testing.T) { +func TestRoute_HostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url1, _ := common.NewURL(factory333URL) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) invokers := []protocol.Invoker{invoker1, invoker2, invoker3} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) @@ -268,18 +277,18 @@ func TestRouteHostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRouteEmptyHostFilter(t *testing.T) { +func TestRoute_Empty_HostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url1, _ := common.NewURL(factory333URL) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) invokers := []protocol.Invoker{invoker1, invoker2, invoker3} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) @@ -287,18 +296,18 @@ func TestRouteEmptyHostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRouteFalseHostFilter(t *testing.T) { +func TestRoute_False_HostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url1, _ := common.NewURL(factory333URL) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) invokers := []protocol.Invoker{invoker1, invoker2, invoker3} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) @@ -306,18 +315,18 @@ func TestRouteFalseHostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoutePlaceholder(t *testing.T) { +func TestRoute_Placeholder(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url1, _ := common.NewURL(factory333URL) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) invokers := []protocol.Invoker{invoker1, invoker2, invoker3} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) @@ -325,35 +334,35 @@ func TestRoutePlaceholder(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRouteNoForce(t *testing.T) { +func TestRoute_NoForce(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url1, _ := common.NewURL(factory333URL) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) invokers := []protocol.Invoker{invoker1, invoker2, invoker3} inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP))) + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule)) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, invokers, fileredInvokers) } -func TestRouteForce(t *testing.T) { +func TestRoute_Force(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") - url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) - url3, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) + url1, _ := common.NewURL(factory333URL) + url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) invokers := []protocol.Invoker{invoker1, invoker2, invoker3} inv := &invocation.RPCInvocation{} - rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) - curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") + rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP))) + curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true")) fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) diff --git a/cluster/router/condition/file.go b/cluster/router/condition/file.go index b2c876690043d18a1a9e746fee13f06c77a0de03..eabdf1c263446140b359b3e791238b020cecb50c 100644 --- a/cluster/router/condition/file.go +++ b/cluster/router/condition/file.go @@ -77,10 +77,7 @@ func (f *FileConditionRouter) URL() common.URL { } func parseCondition(conditions []string) string { - var ( - when string - then string - ) + var when, then string for _, condition := range conditions { condition = strings.Trim(condition, " ") if strings.Contains(condition, "=>") { @@ -101,10 +98,7 @@ func parseCondition(conditions []string) string { then = provider } } - } - } - return strings.Join([]string{when, then}, " => ") } diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go index 0267a3c7a462acb43f84ccb4701247147699804a..40a251573f5e73d40032972313565d98b288b1b1 100644 --- a/cluster/router/condition/router.go +++ b/cluster/router/condition/router.go @@ -181,9 +181,7 @@ func parseRule(rule string) (map[string]MatchPair, error) { return condition, nil } - var ( - pair MatchPair - ) + var pair MatchPair values := gxset.NewSet() matches := routerPatternReg.FindAllSubmatch([]byte(rule), -1) for _, groups := range matches { diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 8a95d9a7e8dffdc3f30f94c76274a729837fc133..5d35ae8e486e3f7b29b2a68a3864ef806a1053c7 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -18,6 +18,7 @@ package healthcheck import ( + "fmt" "math" "testing" ) @@ -32,10 +33,16 @@ import ( "github.com/apache/dubbo-go/protocol" ) -func TestDefaultHealthChecker_IsHealthy(t *testing.T) { +const ( + healthCheckDubbo1010IP = "192.168.10.10" + healthCheckDubbo1011IP = "192.168.10.11" + healthCheckMethodTest = "test" + healthCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) +func TestDefaultHealthCheckerIsHealthy(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) invoker := NewMockInvoker(url) healthy := hc.IsHealthy(invoker) @@ -45,7 +52,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") // fake the outgoing request for i := 0; i < 11; i++ { - request(url, "test", 0, true, false) + request(url, healthCheckMethodTest, 0, true, false) } hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) healthy = hc.IsHealthy(invoker) @@ -54,7 +61,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { // successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy for i := 0; i < 11; i++ { - request(url, "test", 0, false, false) + request(url, healthCheckMethodTest, 0, false, false) } url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") @@ -63,18 +70,18 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { assert.False(t, hc.IsHealthy(invoker)) // reset successive failed count and go to healthy - request(url, "test", 0, false, true) + request(url, healthCheckMethodTest, 0, false, true) healthy = hc.IsHealthy(invoker) assert.True(t, hc.IsHealthy(invoker)) } -func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { +func TestDefaultHealthCheckerGetCircuitBreakerSleepWindowTime(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) // Increase the number of failed requests for i := 0; i < 100; i++ { - request(url, "test", 1, false, false) + request(url, healthCheckMethodTest, 1, false, false) } sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) @@ -84,48 +91,48 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) assert.True(t, sleepWindowTime == 0) - url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP)) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) assert.True(t, sleepWindowTime == 0) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) } -func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { +func TestDefaultHealthCheckerGetCircuitBreakerTimeout(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) assert.True(t, timeout == 0) - url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) - request(url1, "test", 1, false, false) + url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1011IP)) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) + request(url1, healthCheckMethodTest, 1, false, false) timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1)) // timeout must after the current time assert.True(t, timeout > protocol.CurrentTimeMillis()) } -func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { +func TestDefaultHealthCheckerIsCircuitBreakerTripped(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) status := protocol.GetURLStatus(url) tripped := defaultHc.isCircuitBreakerTripped(status) assert.False(t, tripped) // Increase the number of failed requests for i := 0; i < 100; i++ { - request(url, "test", 1, false, false) + request(url, healthCheckMethodTest, 1, false, false) } tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)) assert.True(t, tripped) @@ -134,13 +141,13 @@ func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { func TestNewDefaultHealthChecker(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) assert.NotNil(t, defaultHc) assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) - url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url1, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 7bfffea705bfedade9d1d13ac7e9c380651335dd..d5862fb884114bac0ea2ec9ee8926baac57d5ba6 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -18,6 +18,7 @@ package healthcheck import ( + "fmt" "math" "testing" "time" @@ -34,13 +35,22 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func TestHealthCheckRouter_Route(t *testing.T) { +const ( + healthCheckRoute1010IP = "192.168.10.10" + healthCheckRoute1011IP = "192.168.10.11" + healthCheckRoute1012IP = "192.168.10.12" + healthCheckRouteMethodNameTest = "test" + healthCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" + healthCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestHealthCheckRouterRoute(t *testing.T) { defer protocol.CleanAllStatus() - consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider") + consumerURL, _ := common.NewURL(healthCheck1001URL) consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") - url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") - url2, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") - url3, _ := common.NewURL("dubbo://192.168.10.12:20000/com.ikurento.user.UserProvider") + url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) + url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) + url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) hcr, _ := NewHealthCheckRouter(&consumerURL) var invokers []protocol.Invoker @@ -48,21 +58,21 @@ func TestHealthCheckRouter_Route(t *testing.T) { invoker2 := NewMockInvoker(url2) invoker3 := NewMockInvoker(url3) invokers = append(invokers, invoker1, invoker2, invoker3) - inv := invocation.NewRPCInvocation("test", nil, nil) + inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil) res := hcr.Route(invokers, &consumerURL, inv) // now all invokers are healthy assert.True(t, len(res) == len(invokers)) for i := 0; i < 10; i++ { - request(url1, "test", 0, false, false) + request(url1, healthCheckRouteMethodNameTest, 0, false, false) } res = hcr.Route(invokers, &consumerURL, inv) // invokers1 is unhealthy now assert.True(t, len(res) == 2 && !contains(res, invoker1)) for i := 0; i < 10; i++ { - request(url1, "test", 0, false, false) - request(url2, "test", 0, false, false) + request(url1, healthCheckRouteMethodNameTest, 0, false, false) + request(url2, healthCheckRouteMethodNameTest, 0, false, false) } res = hcr.Route(invokers, &consumerURL, inv) @@ -70,9 +80,9 @@ func TestHealthCheckRouter_Route(t *testing.T) { assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2)) for i := 0; i < 10; i++ { - request(url1, "test", 0, false, false) - request(url2, "test", 0, false, false) - request(url3, "test", 0, false, false) + request(url1, healthCheckRouteMethodNameTest, 0, false, false) + request(url2, healthCheckRouteMethodNameTest, 0, false, false) + request(url3, healthCheckRouteMethodNameTest, 0, false, false) } res = hcr.Route(invokers, &consumerURL, inv) @@ -80,12 +90,12 @@ func TestHealthCheckRouter_Route(t *testing.T) { assert.True(t, len(res) == 3) // reset the invoker1 successive failed count, so invoker1 go to healthy - request(url1, "test", 0, false, true) + request(url1, healthCheckRouteMethodNameTest, 0, false, true) res = hcr.Route(invokers, &consumerURL, inv) assert.True(t, contains(res, invoker1)) for i := 0; i < 6; i++ { - request(url1, "test", 0, false, false) + request(url1, healthCheckRouteMethodNameTest, 0, false, false) } // now all invokers are unhealthy, so downgraded to all again res = hcr.Route(invokers, &consumerURL, inv) @@ -108,7 +118,7 @@ func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool { func TestNewHealthCheckRouter(t *testing.T) { defer protocol.CleanAllStatus() - url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) hcr, _ := NewHealthCheckRouter(&url) h := hcr.(*HealthCheckRouter) assert.Nil(t, h.checker) diff --git a/cluster/router/tag/factory_test.go b/cluster/router/tag/factory_test.go index f11f2944115e0c158b353c7256a850df469b71c5..ee195820c123e1fc67a2c27cd12aaa544650b615 100644 --- a/cluster/router/tag/factory_test.go +++ b/cluster/router/tag/factory_test.go @@ -18,6 +18,7 @@ package tag import ( + "fmt" "testing" ) @@ -29,8 +30,13 @@ import ( "github.com/apache/dubbo-go/common" ) -func TestTagRouterFactory_NewRouter(t *testing.T) { - u1, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true") +const ( + factoryLocalIP = "127.0.0.1" + factoryFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true" +) + +func TestTagRouterFactoryNewRouter(t *testing.T) { + u1, err := common.NewURL(fmt.Sprintf(factoryFormat, factoryLocalIP)) assert.Nil(t, err) factory := NewTagRouterFactory() tagRouter, e := factory.NewPriorityRouter(&u1) diff --git a/cluster/router/tag/file_test.go b/cluster/router/tag/file_test.go index 94fcf9e0e0fabed2445417d14b711f91b65b9e5e..513ba0c0b6c622d6a52fad35a24824121eb71b76 100644 --- a/cluster/router/tag/file_test.go +++ b/cluster/router/tag/file_test.go @@ -29,18 +29,21 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +const ( + fileTestTag = `priority: 100 +force: true` +) + func TestNewFileTagRouter(t *testing.T) { - router, e := NewFileTagRouter([]byte(`priority: 100 -force: true`)) + router, e := NewFileTagRouter([]byte(fileTestTag)) assert.Nil(t, e) assert.NotNil(t, router) assert.Equal(t, 100, router.routerRule.Priority) assert.Equal(t, true, router.routerRule.Force) } -func TestFileTagRouter_URL(t *testing.T) { - router, e := NewFileTagRouter([]byte(`priority: 100 -force: true`)) +func TestFileTagRouterURL(t *testing.T) { + router, e := NewFileTagRouter([]byte(fileTestTag)) assert.Nil(t, e) assert.NotNil(t, router) url := router.URL() @@ -52,9 +55,8 @@ force: true`)) } -func TestFileTagRouter_Priority(t *testing.T) { - router, e := NewFileTagRouter([]byte(`priority: 100 -force: true`)) +func TestFileTagRouterPriority(t *testing.T) { + router, e := NewFileTagRouter([]byte(fileTestTag)) assert.Nil(t, e) assert.NotNil(t, router) priority := router.Priority() diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 280b56c8ccb69eb5d32dae2369bdc862adb8e6fd..000b3ec6724d85590c86456a009d5194c4e71e03 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -32,6 +32,21 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) +const ( + tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou" + tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai" + tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing" + tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true" + tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true" + + tagRouterTestDubboTag = "dubbo.tag" + tagRouterTestDubboForceTag = "dubbo.force.tag" + tagRouterTestHangZhou = "hangzhou" + tagRouterTestGuangZhou = "guangzhou" + tagRouterTestFalse = "false" + tagRouterTestTrue = "true" +) + // MockInvoker is only mock the Invoker to support test tagRouter type MockInvoker struct { url common.URL @@ -73,8 +88,8 @@ func (bi *MockInvoker) Destroy() { bi.available = false } -func TestTagRouter_Priority(t *testing.T) { - u1, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true") +func TestTagRouterPriority(t *testing.T) { + u1, err := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, err) tagRouter, e := NewTagRouter(&u1) assert.Nil(t, e) @@ -82,15 +97,15 @@ func TestTagRouter_Priority(t *testing.T) { assert.Equal(t, int64(0), p) } -func TestTagRouter_Route_force(t *testing.T) { - u1, e1 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true") +func TestTagRouterRouteForce(t *testing.T) { + u1, e1 := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, e1) tagRouter, e := NewTagRouter(&u1) assert.Nil(t, e) - u2, e2 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou") - u3, e3 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai") - u4, e4 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing") + u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) + u3, e3 := common.NewURL(tagRouterTestShangHaiUrl) + u4, e4 := common.NewURL(tagRouterTestBeijingUrl) assert.Nil(t, e2) assert.Nil(t, e3) assert.Nil(t, e4) @@ -100,29 +115,29 @@ func TestTagRouter_Route_force(t *testing.T) { var invokers []protocol.Invoker invokers = append(invokers, inv2, inv3, inv4) inv := &invocation.RPCInvocation{} - inv.SetAttachments("dubbo.tag", "hangzhou") + inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestHangZhou) invRst1 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 1, len(invRst1)) - assert.Equal(t, "hangzhou", invRst1[0].GetUrl().GetParam("dubbo.tag", "")) + assert.Equal(t, tagRouterTestHangZhou, invRst1[0].GetUrl().GetParam(tagRouterTestDubboTag, "")) - inv.SetAttachments("dubbo.tag", "guangzhou") + inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou) invRst2 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 0, len(invRst2)) - inv.SetAttachments("dubbo.force.tag", "false") - inv.SetAttachments("dubbo.tag", "guangzhou") + inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestFalse) + inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou) invRst3 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 3, len(invRst3)) } -func TestTagRouter_Route_noForce(t *testing.T) { - u1, e1 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true") +func TestTagRouterRouteNoForce(t *testing.T) { + u1, e1 := common.NewURL(tagRouterTestUserConsumer) assert.Nil(t, e1) tagRouter, e := NewTagRouter(&u1) assert.Nil(t, e) - u2, e2 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou") - u3, e3 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai") - u4, e4 := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing") + u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) + u3, e3 := common.NewURL(tagRouterTestShangHaiUrl) + u4, e4 := common.NewURL(tagRouterTestShangHaiUrl) assert.Nil(t, e2) assert.Nil(t, e3) assert.Nil(t, e4) @@ -132,16 +147,16 @@ func TestTagRouter_Route_noForce(t *testing.T) { var invokers []protocol.Invoker invokers = append(invokers, inv2, inv3, inv4) inv := &invocation.RPCInvocation{} - inv.SetAttachments("dubbo.tag", "hangzhou") + inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestHangZhou) invRst := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 1, len(invRst)) - assert.Equal(t, "hangzhou", invRst[0].GetUrl().GetParam("dubbo.tag", "")) + assert.Equal(t, tagRouterTestHangZhou, invRst[0].GetUrl().GetParam(tagRouterTestDubboTag, "")) - inv.SetAttachments("dubbo.tag", "guangzhou") - inv.SetAttachments("dubbo.force.tag", "true") + inv.SetAttachments(tagRouterTestDubboTag, tagRouterTestGuangZhou) + inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestTrue) invRst1 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 0, len(invRst1)) - inv.SetAttachments("dubbo.force.tag", "false") + inv.SetAttachments(tagRouterTestDubboForceTag, tagRouterTestFalse) invRst2 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 3, len(invRst2)) }