diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go index 61d1c934522008e4d9bc46bbd57eb6fed6bf00f9..063100020ad36192a051d1e736af7264cd8df42d 100644 --- a/cluster/cluster_impl/available_cluster_invoker_test.go +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "strings" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,7 +41,8 @@ import ( ) var ( - availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", + constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker { diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index cabd6c5f17cd3a3310054c0ff7b9a9877d581345..bbdfa715d7cdc461689e60a5a41171ad5c9770e1 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { var selectedInvoker protocol.Invoker + if len(invokers) <= 0 { + return selectedInvoker + } + url := invokers[0].GetUrl() sticky := url.GetParamBool(constant.STICKY_KEY, false) //Get the service method sticky config if have @@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p invoker.stickyInvoker = nil } - if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { - if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() { - return invoker.stickyInvoker - } + if sticky && invoker.availablecheck && + invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() && + (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { + return invoker.stickyInvoker } selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked) - if sticky { invoker.stickyInvoker = selectedInvoker } return selectedInvoker - } func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { diff --git a/cluster/cluster_impl/base_cluster_invoker_test.go b/cluster/cluster_impl/base_cluster_invoker_test.go index d074697b85a3cf5b770de90da4847043d98c9df1..695ffcddbbce5a1c65f806b4561670d726588aaa 100644 --- a/cluster/cluster_impl/base_cluster_invoker_test.go +++ b/cluster/cluster_impl/base_cluster_invoker_test.go @@ -33,7 +33,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_StickyNormal(t *testing.T) { +func TestStickyNormal(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) @@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) { base := &baseClusterInvoker{} base.availablecheck = true invoked := []protocol.Invoker{} - result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) - result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) + + tmpRandomBalance := loadbalance.NewRandomLoadBalance() + tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil) + result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) + result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) assert.Equal(t, result, result1) } -func Test_StickyNormalWhenError(t *testing.T) { +func TestStickyNormalWhenError(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go index 9b5733e98b142759c3317f9cb3e3d3f08eea81e4..08d0002ee79b2f3fda5a50ce90747c0aaad91932 100644 --- a/cluster/cluster_impl/broadcast_cluster_invoker_test.go +++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go @@ -20,6 +20,7 @@ package cluster_impl import ( "context" "errors" + "fmt" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,10 +41,11 @@ import ( ) var ( - broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + broadcastUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) -func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { +func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) invokers := []protocol.Invoker{} @@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol return clusterInvoker } -func Test_BroadcastInvokeSuccess(t *testing.T) { +func TestBroadcastInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) } - clusterInvoker := registerBroadcast(t, invokers...) + clusterInvoker := registerBroadcast(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) } -func Test_BroadcastInvokeFailed(t *testing.T) { +func TestBroadcastInvokeFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) } - clusterInvoker := registerBroadcast(t, invokers...) + clusterInvoker := registerBroadcast(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockFailedResult.Err, result.Error()) diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go index 69418bc3b876f7c9375a2164d78bac2fcbb05043..0edb81d4285fa68ceefd96100b541ba334f95bda 100644 --- a/cluster/cluster_impl/failback_cluster_test.go +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "sync" "testing" "time" @@ -34,6 +35,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -41,11 +43,12 @@ import ( ) var ( - failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + failbackUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) // registerFailback register failbackCluster to cluster extension. -func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerFailback(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failbackCluster := NewFailbackCluster() @@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker } // success firstly, failback should return origin invoke result. -func Test_FailbackSuceess(t *testing.T) { +func TestFailbackSuceess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) { } // failed firstly, success later after one retry. -func Test_FailbackRetryOneSuccess(t *testing.T) { +func TestFailbackRetryOneSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { wg.Add(1) now := time.Now() mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} - invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= 5) wg.Done() @@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { } // failed firstly, and failed again after ech retry time. -func Test_FailbackRetryFailed(t *testing.T) { +func TestFailbackRetryFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) { // add retry call that eventually failed. for i := 0; i < retries; i++ { j := i + 1 - invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= int64(5*j)) wg.Done() @@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) { } // add 10 tasks but all failed firstly, and failed again with one retry. -func Test_FailbackRetryFailed10Times(t *testing.T) { +func TestFailbackRetryFailed10Times(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) clusterInvoker.maxRetries = 10 invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { var wg sync.WaitGroup wg.Add(10) now := time.Now() - invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= 5) wg.Done() @@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) } -func Test_FailbackOutOfLimit(t *testing.T) { +func TestFailbackOutOfLimit(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) clusterInvoker.failbackTasks = 1 invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go index c5ab7cd5410ea312e082f8064c13b2356c9b4bb4..77e8e9c5da73bfc8bcf08dbd90351bfd23d7e651 100644 --- a/cluster/cluster_impl/failfast_cluster_test.go +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,11 +41,12 @@ import ( ) var ( - failfastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + failfastUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) // registerFailfast register failfastCluster to cluster extension. -func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failfastCluster := NewFailFastCluster() @@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker return clusterInvoker } -func Test_FailfastInvokeSuccess(t *testing.T) { +func TestFailfastInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailfast(t, invoker) + clusterInvoker := registerFailfast(invoker) invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() @@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) { assert.Equal(t, 0, res.tried) } -func Test_FailfastInvokeFail(t *testing.T) { +func TestFailfastInvokeFail(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailfast(t, invoker) + clusterInvoker := registerFailfast(invoker) invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index 6178a05a1226ba629d2456ad6886b02a26288e45..66adabd1043d6e5d770704774dda22ba9e6faebe 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { } func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + var ( + result protocol.Result + invoked []protocol.Invoker + providers []string + ) invokers := invoker.directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) - - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } - loadbalance := getLoadBalance(invokers[0], invocation) - methodName := invocation.MethodName() - url := invokers[0].GetUrl() - - //get reties - retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) + retries := getRetries(invokers, methodName) + loadBalance := getLoadBalance(invokers[0], invocation) - //Get the service method loadbalance config if have - if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 { - retriesConfig = v - } - retries, err := strconv.Atoi(retriesConfig) - if err != nil || retries < 0 { - logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.") - retries = constant.DEFAULT_RETRIES_INT - } - invoked := []protocol.Invoker{} - providers := []string{} - var result protocol.Result - if retries > len(invokers) { - retries = len(invokers) - } for i := 0; i <= retries; i++ { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { - err := invoker.checkWhetherDestroyed() - if err != nil { + if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } + invokers = invoker.directory.List(invocation) - err = invoker.checkInvokers(invokers, invocation) - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } - ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } @@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr if result.Error() != nil { providers = append(providers, ivk.GetUrl().Key()) continue - } else { - return result } + return result } + ip, _ := gxnet.GetLocalIP() - return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+ - "the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", - methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(), - )} + invokerSvc := invoker.GetUrl().Service() + invokerUrl := invoker.directory.GetUrl() + return &protocol.RPCResult{ + Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", + methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(), + )} +} + +func getRetries(invokers []protocol.Invoker, methodName string) int { + if len(invokers) <= 0 { + return constant.DEFAULT_RETRIES_INT + } + + url := invokers[0].GetUrl() + //get reties + retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 { + retriesConfig = v + } + + retries, err := strconv.Atoi(retriesConfig) + if err != nil || retries < 0 { + logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.") + retries = constant.DEFAULT_RETRIES_INT + } + + if retries > len(invokers) { + retries = len(invokers) + } + return retries } diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go index ee7d48f3497772db3143b1ae62a30f66f99faa58..e05b79202cd202334db1c19421e3163ee28bac26 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() { var count int -func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { +func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failoverCluster := NewFailoverCluster() @@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) } -func Test_FailoverInvokeSuccess(t *testing.T) { +func TestFailoverInvokeSuccess(t *testing.T) { urlParams := url.Values{} - result := normalInvoke(t, 3, urlParams) + result := normalInvoke(3, urlParams) assert.NoError(t, result.Error()) count = 0 } -func Test_FailoverInvokeFail(t *testing.T) { +func TestFailoverInvokeFail(t *testing.T) { urlParams := url.Values{} - result := normalInvoke(t, 4, urlParams) + result := normalInvoke(4, urlParams) assert.Errorf(t, result.Error(), "error") count = 0 } -func Test_FailoverInvoke1(t *testing.T) { +func TestFailoverInvoke1(t *testing.T) { urlParams := url.Values{} urlParams.Set(constant.RETRIES_KEY, "3") - result := normalInvoke(t, 4, urlParams) + result := normalInvoke(4, urlParams) assert.NoError(t, result.Error()) count = 0 } -func Test_FailoverInvoke2(t *testing.T) { +func TestFailoverInvoke2(t *testing.T) { urlParams := url.Values{} urlParams.Set(constant.RETRIES_KEY, "2") urlParams.Set("methods.test."+constant.RETRIES_KEY, "3") ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) - result := normalInvoke(t, 4, urlParams, ivc) + result := normalInvoke(4, urlParams, ivc) assert.NoError(t, result.Error()) count = 0 } -func Test_FailoverDestroy(t *testing.T) { +func TestFailoverDestroy(t *testing.T) { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failoverCluster := NewFailoverCluster() @@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) { count = 0 clusterInvoker.Destroy() assert.Equal(t, false, clusterInvoker.IsAvailable()) - } diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go index 0bfeb576bd095508ef122c55c1345208c50eb339..d9a716e1ae65a84b605b4b7af1872b3a85dc9369 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,11 +41,12 @@ import ( ) var ( - failsafeUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + failsafeUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) // registerFailsafe register failsafeCluster to cluster extension. -func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failsafeCluster := NewFailsafeCluster() @@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker return clusterInvoker } -func Test_FailSafeInvokeSuccess(t *testing.T) { +func TestFailSafeInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailsafe(t, invoker) + clusterInvoker := registerFailsafe(invoker) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() @@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { assert.True(t, res.success) } -func Test_FailSafeInvokeFail(t *testing.T) { +func TestFailSafeInvokeFail(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailsafe(t, invoker) + clusterInvoker := registerFailsafe(invoker) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go index 732569416daea8f878569db143271139b791ceca..a5a3f2ec6605dfb843fab09dff0a53000bbc3298 100644 --- a/cluster/cluster_impl/forking_cluster_invoker.go +++ b/cluster/cluster_impl/forking_cluster_invoker.go @@ -46,14 +46,12 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { // Invoke ... func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - err := invoker.checkWhetherDestroyed() - if err != nil { + if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } invokers := invoker.directory.List(invocation) - err = invoker.checkInvokers(invokers, invocation) - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } @@ -63,11 +61,9 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro if forks < 0 || forks > len(invokers) { selected = invokers } else { - selected = make([]protocol.Invoker, 0) - loadbalance := getLoadBalance(invokers[0], invocation) + loadBalance := getLoadBalance(invokers[0], invocation) for i := 0; i < forks; i++ { - ivk := invoker.doSelect(loadbalance, invocation, invokers, selected) - if ivk != nil { + if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil { selected = append(selected, ivk) } } @@ -77,8 +73,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro for _, ivk := range selected { go func(k protocol.Invoker) { result := k.Invoke(ctx, invocation) - err := resultQ.Put(result) - if err != nil { + if err := resultQ.Put(result); err != nil { logger.Errorf("resultQ put failed with exception: %v.\n", err) } }(ivk) @@ -99,6 +94,5 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro if !ok { return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)} } - return result } diff --git a/cluster/cluster_impl/forking_cluster_test.go b/cluster/cluster_impl/forking_cluster_test.go index 526b137d71c46c166367ac3b3308f9ad5b941538..a2fa136d312db900f45449c92a59009c6661571c 100644 --- a/cluster/cluster_impl/forking_cluster_test.go +++ b/cluster/cluster_impl/forking_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "strconv" "sync" "testing" @@ -42,10 +43,11 @@ import ( ) var ( - forkingUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + forkingUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) -func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { +func registerForking(mockInvokers ...*mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance(loadbalance.RoundRobin, loadbalance.NewRoundRobinLoadBalance) invokers := []protocol.Invoker{} @@ -62,7 +64,7 @@ func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.I return clusterInvoker } -func Test_ForkingInvokeSuccess(t *testing.T) { +func TestForkingInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -79,20 +81,20 @@ func Test_ForkingInvokeSuccess(t *testing.T) { invokers = append(invokers, invoker) invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { wg.Done() return mockResult }) } - clusterInvoker := registerForking(t, invokers...) + clusterInvoker := registerForking(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) wg.Wait() } -func Test_ForkingInvokeTimeout(t *testing.T) { +func TestForkingInvokeTimeout(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -108,14 +110,14 @@ func Test_ForkingInvokeTimeout(t *testing.T) { invokers = append(invokers, invoker) invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { time.Sleep(2 * time.Second) wg.Done() return mockResult }) } - clusterInvoker := registerForking(t, invokers...) + clusterInvoker := registerForking(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NotNil(t, result) @@ -123,7 +125,7 @@ func Test_ForkingInvokeTimeout(t *testing.T) { wg.Wait() } -func Test_ForkingInvokeHalfTimeout(t *testing.T) { +func TestForkingInvokeHalfTimeout(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -140,13 +142,13 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) { invoker.EXPECT().IsAvailable().Return(true).AnyTimes() if i == 1 { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { wg.Done() return mockResult }) } else { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { time.Sleep(2 * time.Second) wg.Done() return mockResult @@ -154,7 +156,7 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) { } } - clusterInvoker := registerForking(t, invokers...) + clusterInvoker := registerForking(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go index 3d0dcc0159839eb0a08aed842ee084449458c645..74584b44800fce3342956f4237a63ffbbabf5544 100644 --- a/cluster/cluster_impl/registry_aware_cluster_test.go +++ b/cluster/cluster_impl/registry_aware_cluster_test.go @@ -33,7 +33,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_RegAwareInvokeSuccess(t *testing.T) { +func TestRegAwareInvokeSuccess(t *testing.T) { regAwareCluster := NewRegistryAwareCluster() diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go index 6dc55b39407c9e88d18a65b5ec02fa866571624b..8b60163b79b7120829e51f69238474a127133fb4 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base_directory_test.go @@ -19,6 +19,7 @@ package directory import ( "encoding/base64" + "fmt" "testing" ) @@ -33,19 +34,20 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +var ( + url, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) + anyUrl, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE)) +) + func TestNewBaseDirectory(t *testing.T) { - url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") directory := NewBaseDirectory(&url) - assert.NotNil(t, directory) - assert.Equal(t, url, directory.GetUrl()) assert.Equal(t, &url, directory.GetDirectoryUrl()) - } func TestBuildRouterChain(t *testing.T) { - url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") directory := NewBaseDirectory(&url) assert.NotNil(t, directory) @@ -62,9 +64,8 @@ func TestBuildRouterChain(t *testing.T) { } func getRouteUrl(rule string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") - url.AddParam("rule", rule) - url.AddParam("force", "true") - url.AddParam(constant.ROUTER_KEY, "router") + anyUrl.AddParam("rule", rule) + anyUrl.AddParam("force", "true") + anyUrl.AddParam(constant.ROUTER_KEY, "router") return &url } diff --git a/cluster/directory/static_directory_test.go b/cluster/directory/static_directory_test.go index c50c9a4063bd1a372c27e47687cbf63850f76cef..8e75a2c2535058f605c3e9bb6d6a01f9ff91032a 100644 --- a/cluster/directory/static_directory_test.go +++ b/cluster/directory/static_directory_test.go @@ -32,7 +32,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_StaticDirList(t *testing.T) { +func TestStaticDirList(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) @@ -45,7 +45,7 @@ func Test_StaticDirList(t *testing.T) { assert.Len(t, list, 10) } -func Test_StaticDirDestroy(t *testing.T) { +func TestStaticDirDestroy(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) diff --git a/cluster/loadbalance/least_active_test.go b/cluster/loadbalance/least_active_test.go index 54e57e930f17008cf6d767ef47c0e754ac85d8f7..34be17a4f311a374eefc56ba76885eef2a23645a 100644 --- a/cluster/loadbalance/least_active_test.go +++ b/cluster/loadbalance/least_active_test.go @@ -28,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -37,7 +38,7 @@ func TestLeastActiveSelect(t *testing.T) { var invokers []protocol.Invoker - url, _ := common.NewURL("dubbo://192.168.1.0:20000/org.apache.demo.HelloService") + url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) invokers = append(invokers, protocol.NewBaseInvoker(url)) i := loadBalance.Select(invokers, &invocation.RPCInvocation{}) assert.True(t, i.GetUrl().URLEqual(url)) diff --git a/cluster/loadbalance/random_test.go b/cluster/loadbalance/random_test.go index ff876f4aef8d229e8041594aaaa096f3ad5b1834..88392de52c93579dd4def3da2d60b415b601b21e 100644 --- a/cluster/loadbalance/random_test.go +++ b/cluster/loadbalance/random_test.go @@ -36,7 +36,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_RandomlbSelect(t *testing.T) { +func TestRandomlbSelect(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} @@ -53,7 +53,7 @@ func Test_RandomlbSelect(t *testing.T) { randomlb.Select(invokers, &invocation.RPCInvocation{}) } -func Test_RandomlbSelectWeight(t *testing.T) { +func TestRandomlbSelectWeight(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} @@ -84,7 +84,7 @@ func Test_RandomlbSelectWeight(t *testing.T) { }) } -func Test_RandomlbSelectWarmup(t *testing.T) { +func TestRandomlbSelectWarmup(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} diff --git a/cluster/loadbalance/round_robin_test.go b/cluster/loadbalance/round_robin_test.go index 1517f2a20b473af57cc23e61b988aa5a6a04de31..5354bae458605ff56ec8a9b35d36730ecdc0babb 100644 --- a/cluster/loadbalance/round_robin_test.go +++ b/cluster/loadbalance/round_robin_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -38,7 +39,8 @@ func TestRoundRobinSelect(t *testing.T) { var invokers []protocol.Invoker - url, _ := common.NewURL("dubbo://192.168.1.0:20000/org.apache.demo.HelloService") + url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService", + constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) invokers = append(invokers, protocol.NewBaseInvoker(url)) i := loadBalance.Select(invokers, &invocation.RPCInvocation{}) assert.True(t, i.GetUrl().URLEqual(url)) diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index 0cb47c4a185fe19b5f70ea4db2b80aab2f1aada5..c7a75f3d8608ecb7a95dcf33027e71b61d7f00f5 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -42,10 +42,16 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) +const ( + path = "/dubbo/config/dubbo/test-condition.condition-router" + zkPrefix = "zookeeper://127.0.0.1:" + anyUrl = "condition://0.0.0.0/com.foo.BarService" +) + func TestNewRouterChain(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) testyml := `enabled: true @@ -55,12 +61,12 @@ conditions: - => host != 172.22.3.91 ` - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + _, err = z.Conn.Set(path, []byte(testyml), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -92,10 +98,10 @@ func TestNewRouterChainURLNil(t *testing.T) { assert.NotNil(t, chain) } -func TestRouterChain_AddRouters(t *testing.T) { +func TestRouterChainAddRouters(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) testyml := `enabled: true @@ -105,12 +111,12 @@ conditions: - => host != 172.22.3.91 ` - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + _, err = z.Conn.Set(path, []byte(testyml), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -131,12 +137,12 @@ conditions: assert.Equal(t, 3, len(chain.routers)) } -func TestRouterChain_Route(t *testing.T) { +func TestRouterChainRoute(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -158,10 +164,10 @@ func TestRouterChain_Route(t *testing.T) { assert.Equal(t, 1, len(finalInvokers)) } -func TestRouterChain_Route_AppRouter(t *testing.T) { +func TestRouterChainRouteAppRouter(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) testyml := `enabled: true @@ -171,12 +177,12 @@ conditions: - => host = 1.1.1.1 => host != 1.2.3.4 ` - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + _, err = z.Conn.Set(path, []byte(testyml), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -200,7 +206,7 @@ func TestRouterChain_Route_NoRoute(t *testing.T) { defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -223,7 +229,7 @@ func TestRouterChain_Route_NoRoute(t *testing.T) { } func getConditionNoRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("application", applicationKey) url.AddParam("force", "true") rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host != 1.2.3.4")) @@ -232,7 +238,7 @@ func getConditionNoRouteUrl(applicationKey string) *common.URL { } func getConditionRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("application", applicationKey) url.AddParam("force", "true") rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host = 1.2.3.4")) @@ -241,7 +247,7 @@ func getConditionRouteUrl(applicationKey string) *common.URL { } func getRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("application", applicationKey) url.AddParam("force", "true") return &url diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index e99307625baf34fa6b744f168ff4e6cb8e042502..f37a483e8468bc57d3ce1e73172ccf9a05bc29f0 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) +const ( + path = "/dubbo/config/dubbo/test-condition.condition-router" +) + func TestNewAppRouter(t *testing.T) { testYML := `enabled: true @@ -47,10 +51,10 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + _, err = z.Conn.Set(path, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() @@ -93,10 +97,10 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + _, err = z.Conn.Set(path, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() @@ -130,10 +134,10 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + _, err = z.Conn.Set(path, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 99cec34096a55d3c2a967b63afdf5f6d0a77279a..a826cafb85ee1a30ac568db34e10dd2c9c9e87d0 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -38,6 +38,8 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) +const anyUrl = "condition://0.0.0.0/com.foo.BarService" + type MockInvoker struct { url common.URL available bool @@ -59,21 +61,21 @@ func (bi *MockInvoker) GetUrl() common.URL { } func getRouteUrl(rule string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("rule", rule) url.AddParam("force", "true") return &url } func getRouteUrlWithForce(rule, force string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("rule", rule) url.AddParam("force", force) return &url } func getRouteUrlWithNoForce(rule string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("rule", rule) return &url } @@ -116,7 +118,7 @@ func (bi *MockInvoker) Destroy() { bi.available = false } -func TestRoute_matchWhen(t *testing.T) { +func TestRouteMatchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) @@ -149,7 +151,7 @@ func TestRoute_matchWhen(t *testing.T) { assert.Equal(t, true, matchWhen6) } -func TestRoute_matchFilter(t *testing.T) { +func TestRouteMatchFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() t.Logf("The local ip is %s", localIP) url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") @@ -184,7 +186,7 @@ func TestRoute_matchFilter(t *testing.T) { } -func TestRoute_methodRoute(t *testing.T) { +func TestRouteMethodRoute(t *testing.T) { inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) @@ -207,7 +209,7 @@ func TestRoute_methodRoute(t *testing.T) { } -func TestRoute_ReturnFalse(t *testing.T) { +func TestRouteReturnFalse(t *testing.T) { url, _ := common.NewURL("") localIP, _ := gxnet.GetLocalIP() invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} @@ -219,7 +221,7 @@ func TestRoute_ReturnFalse(t *testing.T) { assert.Equal(t, 0, len(fileredInvokers)) } -func TestRoute_ReturnEmpty(t *testing.T) { +func TestRouteReturnEmpty(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url, _ := common.NewURL("") invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} @@ -231,7 +233,7 @@ func TestRoute_ReturnEmpty(t *testing.T) { assert.Equal(t, 0, len(fileredInvokers)) } -func TestRoute_ReturnAll(t *testing.T) { +func TestRouteReturnAll(t *testing.T) { localIP, _ := gxnet.GetLocalIP() urlString := "dubbo://" + localIP + "/com.foo.BarService" dubboURL, _ := common.NewURL(urlString) @@ -247,7 +249,7 @@ func TestRoute_ReturnAll(t *testing.T) { assert.Equal(t, invokers, fileredInvokers) } -func TestRoute_HostFilter(t *testing.T) { +func TestRouteHostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -266,7 +268,7 @@ func TestRoute_HostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_Empty_HostFilter(t *testing.T) { +func TestRouteEmptyHostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -285,7 +287,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_False_HostFilter(t *testing.T) { +func TestRouteFalseHostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -304,7 +306,7 @@ func TestRoute_False_HostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_Placeholder(t *testing.T) { +func TestRoutePlaceholder(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -323,7 +325,7 @@ func TestRoute_Placeholder(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_NoForce(t *testing.T) { +func TestRouteNoForce(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -340,7 +342,7 @@ func TestRoute_NoForce(t *testing.T) { assert.Equal(t, invokers, fileredInvokers) } -func TestRoute_Force(t *testing.T) { +func TestRouteForce(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 74aa3940743a012f907cfe3d8811a618f07ff800..8a95d9a7e8dffdc3f30f94c76274a729837fc133 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -37,7 +37,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) - invoker := NewMockInvoker(url, 1) + invoker := NewMockInvoker(url) healthy := hc.IsHealthy(invoker) assert.True(t, healthy) diff --git a/cluster/router/healthcheck/factory_test.go b/cluster/router/healthcheck/factory_test.go index a9d94da7c37f0e0c9640de1386998a85823e80a6..c3a26a93896e185f0dea3732ca5afcf7687ad5ea 100644 --- a/cluster/router/healthcheck/factory_test.go +++ b/cluster/router/healthcheck/factory_test.go @@ -35,7 +35,7 @@ type MockInvoker struct { url common.URL } -func NewMockInvoker(url common.URL, successCount int) *MockInvoker { +func NewMockInvoker(url common.URL) *MockInvoker { return &MockInvoker{ url: url, } diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 759ef93dbeb8d91a82eefd59060afbe8a10a4440..7bfffea705bfedade9d1d13ac7e9c380651335dd 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -44,9 +44,9 @@ func TestHealthCheckRouter_Route(t *testing.T) { hcr, _ := NewHealthCheckRouter(&consumerURL) var invokers []protocol.Invoker - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 1) - invoker3 := NewMockInvoker(url3, 1) + invoker1 := NewMockInvoker(url1) + invoker2 := NewMockInvoker(url2) + invoker3 := NewMockInvoker(url3) invokers = append(invokers, invoker1, invoker2, invoker3) inv := invocation.NewRPCInvocation("test", nil, nil) res := hcr.Route(invokers, &consumerURL, inv) diff --git a/common/constant/default.go b/common/constant/default.go index 3c889158e460031f06b9401008c80f55200a46e4..c69989b4fbc3e95cb42c7f5e403989b9cff9215b 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -43,6 +43,7 @@ const ( DEFAULT_FAILBACK_TASKS = 100 DEFAULT_REST_CLIENT = "resty" DEFAULT_REST_SERVER = "go-restful" + DEFAULT_PORT = 20000 ) const ( @@ -58,6 +59,7 @@ const ( const ( ANY_VALUE = "*" ANYHOST_VALUE = "0.0.0.0" + LOCAL_HOST_VALUE = "192.168.1.1" REMOVE_VALUE_PREFIX = "-" ) diff --git a/common/extension/config_center.go b/common/extension/config_center.go index 3cbced8d3bbcdb3dc7f9af800fa36681d6dc063d..5a2c52f32d070f5ec03bdae0b3cd47f869c28171 100644 --- a/common/extension/config_center.go +++ b/common/extension/config_center.go @@ -27,7 +27,7 @@ var ( ) // SetConfigCenter sets the DynamicConfiguration with @name -func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) { +func SetConfigCenter(name string, v func(*common.URL) (config_center.DynamicConfiguration, error)) { configCenters[name] = v } diff --git a/common/url_test.go b/common/url_test.go index 2372de520e88b0949023e88cec64871736dd6aa0..4d9dff9f373f5d2250deb577621cead8c991cf4d 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -118,6 +118,33 @@ func TestURL_URLEqual(t *testing.T) { u3, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0") assert.NoError(t, err) assert.False(t, u1.URLEqual(u3)) + + // urlGroupAnyValue's group is * + urlGroupAnyValue, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlGroupAnyValue)) + + // test for enabled + urlEnabledEmpty, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlEnabledEmpty)) + + urlEnabledFalse, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=1") + assert.NoError(t, err) + assert.False(t, u3.URLEqual(urlEnabledFalse)) + + urlEnabledTrue, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=true") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlEnabledTrue)) + + urlEnabledAny, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=*") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlEnabledAny)) + + // test for category + categoryAny, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=*&category=*") + assert.NoError(t, err) + assert.True(t, categoryAny.URLEqual(u3)) } func TestURL_GetParam(t *testing.T) {