diff --git a/before_ut.bat b/before_ut.bat index 5e1c877af229b2b30bffc8b802cc35b6aab6c80a..dc51008dadaad21af6fcb6021863ff4102b0afa2 100644 --- a/before_ut.bat +++ b/before_ut.bat @@ -20,7 +20,7 @@ set zkJarPath=remoting/zookeeper/zookeeper-4unittest/contrib/fatjar set zkJar=%zkJarPath%/%zkJarName% if not exist "%zkJar%" ( - md %zkJarPath% + md "%zkJarPath%" curl -L %remoteJarUrl% -o %zkJar% ) diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go index 61d1c934522008e4d9bc46bbd57eb6fed6bf00f9..063100020ad36192a051d1e736af7264cd8df42d 100644 --- a/cluster/cluster_impl/available_cluster_invoker_test.go +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "strings" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,7 +41,8 @@ import ( ) var ( - availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", + constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker { diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index cabd6c5f17cd3a3310054c0ff7b9a9877d581345..bbdfa715d7cdc461689e60a5a41171ad5c9770e1 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { var selectedInvoker protocol.Invoker + if len(invokers) <= 0 { + return selectedInvoker + } + url := invokers[0].GetUrl() sticky := url.GetParamBool(constant.STICKY_KEY, false) //Get the service method sticky config if have @@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p invoker.stickyInvoker = nil } - if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { - if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() { - return invoker.stickyInvoker - } + if sticky && invoker.availablecheck && + invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() && + (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { + return invoker.stickyInvoker } selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked) - if sticky { invoker.stickyInvoker = selectedInvoker } return selectedInvoker - } func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { diff --git a/cluster/cluster_impl/base_cluster_invoker_test.go b/cluster/cluster_impl/base_cluster_invoker_test.go index d074697b85a3cf5b770de90da4847043d98c9df1..695ffcddbbce5a1c65f806b4561670d726588aaa 100644 --- a/cluster/cluster_impl/base_cluster_invoker_test.go +++ b/cluster/cluster_impl/base_cluster_invoker_test.go @@ -33,7 +33,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_StickyNormal(t *testing.T) { +func TestStickyNormal(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) @@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) { base := &baseClusterInvoker{} base.availablecheck = true invoked := []protocol.Invoker{} - result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) - result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked) + + tmpRandomBalance := loadbalance.NewRandomLoadBalance() + tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil) + result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) + result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) assert.Equal(t, result, result1) } -func Test_StickyNormalWhenError(t *testing.T) { +func TestStickyNormalWhenError(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go index 9b5733e98b142759c3317f9cb3e3d3f08eea81e4..08d0002ee79b2f3fda5a50ce90747c0aaad91932 100644 --- a/cluster/cluster_impl/broadcast_cluster_invoker_test.go +++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go @@ -20,6 +20,7 @@ package cluster_impl import ( "context" "errors" + "fmt" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,10 +41,11 @@ import ( ) var ( - broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + broadcastUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) -func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { +func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) invokers := []protocol.Invoker{} @@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol return clusterInvoker } -func Test_BroadcastInvokeSuccess(t *testing.T) { +func TestBroadcastInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) } - clusterInvoker := registerBroadcast(t, invokers...) + clusterInvoker := registerBroadcast(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) } -func Test_BroadcastInvokeFailed(t *testing.T) { +func TestBroadcastInvokeFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) } - clusterInvoker := registerBroadcast(t, invokers...) + clusterInvoker := registerBroadcast(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockFailedResult.Err, result.Error()) diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go index 69418bc3b876f7c9375a2164d78bac2fcbb05043..0edb81d4285fa68ceefd96100b541ba334f95bda 100644 --- a/cluster/cluster_impl/failback_cluster_test.go +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "sync" "testing" "time" @@ -34,6 +35,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -41,11 +43,12 @@ import ( ) var ( - failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + failbackUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) // registerFailback register failbackCluster to cluster extension. -func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerFailback(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failbackCluster := NewFailbackCluster() @@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker } // success firstly, failback should return origin invoke result. -func Test_FailbackSuceess(t *testing.T) { +func TestFailbackSuceess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) { } // failed firstly, success later after one retry. -func Test_FailbackRetryOneSuccess(t *testing.T) { +func TestFailbackRetryOneSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { wg.Add(1) now := time.Now() mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} - invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= 5) wg.Done() @@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { } // failed firstly, and failed again after ech retry time. -func Test_FailbackRetryFailed(t *testing.T) { +func TestFailbackRetryFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) { // add retry call that eventually failed. for i := 0; i < retries; i++ { j := i + 1 - invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= int64(5*j)) wg.Done() @@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) { } // add 10 tasks but all failed firstly, and failed again with one retry. -func Test_FailbackRetryFailed10Times(t *testing.T) { +func TestFailbackRetryFailed10Times(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) clusterInvoker.maxRetries = 10 invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() @@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { var wg sync.WaitGroup wg.Add(10) now := time.Now() - invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= 5) wg.Done() @@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) } -func Test_FailbackOutOfLimit(t *testing.T) { +func TestFailbackOutOfLimit(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) clusterInvoker.failbackTasks = 1 invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go index c5ab7cd5410ea312e082f8064c13b2356c9b4bb4..77e8e9c5da73bfc8bcf08dbd90351bfd23d7e651 100644 --- a/cluster/cluster_impl/failfast_cluster_test.go +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,11 +41,12 @@ import ( ) var ( - failfastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + failfastUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) // registerFailfast register failfastCluster to cluster extension. -func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failfastCluster := NewFailFastCluster() @@ -57,12 +60,12 @@ func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker return clusterInvoker } -func Test_FailfastInvokeSuccess(t *testing.T) { +func TestFailfastInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailfast(t, invoker) + clusterInvoker := registerFailfast(invoker) invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() @@ -77,12 +80,12 @@ func Test_FailfastInvokeSuccess(t *testing.T) { assert.Equal(t, 0, res.tried) } -func Test_FailfastInvokeFail(t *testing.T) { +func TestFailfastInvokeFail(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailfast(t, invoker) + clusterInvoker := registerFailfast(invoker) invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index 6178a05a1226ba629d2456ad6886b02a26288e45..66adabd1043d6e5d770704774dda22ba9e6faebe 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -45,52 +45,35 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { } func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + var ( + result protocol.Result + invoked []protocol.Invoker + providers []string + ) invokers := invoker.directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) - - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } - loadbalance := getLoadBalance(invokers[0], invocation) - methodName := invocation.MethodName() - url := invokers[0].GetUrl() - - //get reties - retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) + retries := getRetries(invokers, methodName) + loadBalance := getLoadBalance(invokers[0], invocation) - //Get the service method loadbalance config if have - if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 { - retriesConfig = v - } - retries, err := strconv.Atoi(retriesConfig) - if err != nil || retries < 0 { - logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.") - retries = constant.DEFAULT_RETRIES_INT - } - invoked := []protocol.Invoker{} - providers := []string{} - var result protocol.Result - if retries > len(invokers) { - retries = len(invokers) - } for i := 0; i <= retries; i++ { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { - err := invoker.checkWhetherDestroyed() - if err != nil { + if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } + invokers = invoker.directory.List(invocation) - err = invoker.checkInvokers(invokers, invocation) - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } - ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } @@ -100,13 +83,40 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr if result.Error() != nil { providers = append(providers, ivk.GetUrl().Key()) continue - } else { - return result } + return result } + ip, _ := gxnet.GetLocalIP() - return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+ - "the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", - methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(), - )} + invokerSvc := invoker.GetUrl().Service() + invokerUrl := invoker.directory.GetUrl() + return &protocol.RPCResult{ + Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", + methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(), + )} +} + +func getRetries(invokers []protocol.Invoker, methodName string) int { + if len(invokers) <= 0 { + return constant.DEFAULT_RETRIES_INT + } + + url := invokers[0].GetUrl() + //get reties + retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 { + retriesConfig = v + } + + retries, err := strconv.Atoi(retriesConfig) + if err != nil || retries < 0 { + logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.") + retries = constant.DEFAULT_RETRIES_INT + } + + if retries > len(invokers) { + retries = len(invokers) + } + return retries } diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go index ee7d48f3497772db3143b1ae62a30f66f99faa58..e05b79202cd202334db1c19421e3163ee28bac26 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -101,7 +101,7 @@ func (bi *MockInvoker) Destroy() { var count int -func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { +func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failoverCluster := NewFailoverCluster() @@ -119,40 +119,40 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) } -func Test_FailoverInvokeSuccess(t *testing.T) { +func TestFailoverInvokeSuccess(t *testing.T) { urlParams := url.Values{} - result := normalInvoke(t, 3, urlParams) + result := normalInvoke(3, urlParams) assert.NoError(t, result.Error()) count = 0 } -func Test_FailoverInvokeFail(t *testing.T) { +func TestFailoverInvokeFail(t *testing.T) { urlParams := url.Values{} - result := normalInvoke(t, 4, urlParams) + result := normalInvoke(4, urlParams) assert.Errorf(t, result.Error(), "error") count = 0 } -func Test_FailoverInvoke1(t *testing.T) { +func TestFailoverInvoke1(t *testing.T) { urlParams := url.Values{} urlParams.Set(constant.RETRIES_KEY, "3") - result := normalInvoke(t, 4, urlParams) + result := normalInvoke(4, urlParams) assert.NoError(t, result.Error()) count = 0 } -func Test_FailoverInvoke2(t *testing.T) { +func TestFailoverInvoke2(t *testing.T) { urlParams := url.Values{} urlParams.Set(constant.RETRIES_KEY, "2") urlParams.Set("methods.test."+constant.RETRIES_KEY, "3") ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) - result := normalInvoke(t, 4, urlParams, ivc) + result := normalInvoke(4, urlParams, ivc) assert.NoError(t, result.Error()) count = 0 } -func Test_FailoverDestroy(t *testing.T) { +func TestFailoverDestroy(t *testing.T) { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failoverCluster := NewFailoverCluster() @@ -170,5 +170,4 @@ func Test_FailoverDestroy(t *testing.T) { count = 0 clusterInvoker.Destroy() assert.Equal(t, false, clusterInvoker.IsAvailable()) - } diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go index 0bfeb576bd095508ef122c55c1345208c50eb339..d9a716e1ae65a84b605b4b7af1872b3a85dc9369 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "testing" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/cluster/loadbalance" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -39,11 +41,12 @@ import ( ) var ( - failsafeUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + failsafeUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) // registerFailsafe register failsafeCluster to cluster extension. -func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failsafeCluster := NewFailsafeCluster() @@ -57,12 +60,12 @@ func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker return clusterInvoker } -func Test_FailSafeInvokeSuccess(t *testing.T) { +func TestFailSafeInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailsafe(t, invoker) + clusterInvoker := registerFailsafe(invoker) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() @@ -76,12 +79,12 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { assert.True(t, res.success) } -func Test_FailSafeInvokeFail(t *testing.T) { +func TestFailSafeInvokeFail(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailsafe(t, invoker) + clusterInvoker := registerFailsafe(invoker) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go index 732569416daea8f878569db143271139b791ceca..a5a3f2ec6605dfb843fab09dff0a53000bbc3298 100644 --- a/cluster/cluster_impl/forking_cluster_invoker.go +++ b/cluster/cluster_impl/forking_cluster_invoker.go @@ -46,14 +46,12 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { // Invoke ... func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - err := invoker.checkWhetherDestroyed() - if err != nil { + if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } invokers := invoker.directory.List(invocation) - err = invoker.checkInvokers(invokers, invocation) - if err != nil { + if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } @@ -63,11 +61,9 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro if forks < 0 || forks > len(invokers) { selected = invokers } else { - selected = make([]protocol.Invoker, 0) - loadbalance := getLoadBalance(invokers[0], invocation) + loadBalance := getLoadBalance(invokers[0], invocation) for i := 0; i < forks; i++ { - ivk := invoker.doSelect(loadbalance, invocation, invokers, selected) - if ivk != nil { + if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil { selected = append(selected, ivk) } } @@ -77,8 +73,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro for _, ivk := range selected { go func(k protocol.Invoker) { result := k.Invoke(ctx, invocation) - err := resultQ.Put(result) - if err != nil { + if err := resultQ.Put(result); err != nil { logger.Errorf("resultQ put failed with exception: %v.\n", err) } }(ivk) @@ -99,6 +94,5 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro if !ok { return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)} } - return result } diff --git a/cluster/cluster_impl/forking_cluster_test.go b/cluster/cluster_impl/forking_cluster_test.go index 526b137d71c46c166367ac3b3308f9ad5b941538..a2fa136d312db900f45449c92a59009c6661571c 100644 --- a/cluster/cluster_impl/forking_cluster_test.go +++ b/cluster/cluster_impl/forking_cluster_test.go @@ -19,6 +19,7 @@ package cluster_impl import ( "context" + "fmt" "strconv" "sync" "testing" @@ -42,10 +43,11 @@ import ( ) var ( - forkingUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") + forkingUrl, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) ) -func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { +func registerForking(mockInvokers ...*mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance(loadbalance.RoundRobin, loadbalance.NewRoundRobinLoadBalance) invokers := []protocol.Invoker{} @@ -62,7 +64,7 @@ func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.I return clusterInvoker } -func Test_ForkingInvokeSuccess(t *testing.T) { +func TestForkingInvokeSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -79,20 +81,20 @@ func Test_ForkingInvokeSuccess(t *testing.T) { invokers = append(invokers, invoker) invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { wg.Done() return mockResult }) } - clusterInvoker := registerForking(t, invokers...) + clusterInvoker := registerForking(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) wg.Wait() } -func Test_ForkingInvokeTimeout(t *testing.T) { +func TestForkingInvokeTimeout(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -108,14 +110,14 @@ func Test_ForkingInvokeTimeout(t *testing.T) { invokers = append(invokers, invoker) invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { time.Sleep(2 * time.Second) wg.Done() return mockResult }) } - clusterInvoker := registerForking(t, invokers...) + clusterInvoker := registerForking(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NotNil(t, result) @@ -123,7 +125,7 @@ func Test_ForkingInvokeTimeout(t *testing.T) { wg.Wait() } -func Test_ForkingInvokeHalfTimeout(t *testing.T) { +func TestForkingInvokeHalfTimeout(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -140,13 +142,13 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) { invoker.EXPECT().IsAvailable().Return(true).AnyTimes() if i == 1 { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { wg.Done() return mockResult }) } else { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( - func(invocation protocol.Invocation) protocol.Result { + func(protocol.Invocation) protocol.Result { time.Sleep(2 * time.Second) wg.Done() return mockResult @@ -154,7 +156,7 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) { } } - clusterInvoker := registerForking(t, invokers...) + clusterInvoker := registerForking(invokers...) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go index 3d0dcc0159839eb0a08aed842ee084449458c645..74584b44800fce3342956f4237a63ffbbabf5544 100644 --- a/cluster/cluster_impl/registry_aware_cluster_test.go +++ b/cluster/cluster_impl/registry_aware_cluster_test.go @@ -33,7 +33,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_RegAwareInvokeSuccess(t *testing.T) { +func TestRegAwareInvokeSuccess(t *testing.T) { regAwareCluster := NewRegistryAwareCluster() diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go index 6dc55b39407c9e88d18a65b5ec02fa866571624b..8b60163b79b7120829e51f69238474a127133fb4 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base_directory_test.go @@ -19,6 +19,7 @@ package directory import ( "encoding/base64" + "fmt" "testing" ) @@ -33,19 +34,20 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +var ( + url, _ = common.NewURL( + fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) + anyUrl, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.ANYHOST_VALUE)) +) + func TestNewBaseDirectory(t *testing.T) { - url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") directory := NewBaseDirectory(&url) - assert.NotNil(t, directory) - assert.Equal(t, url, directory.GetUrl()) assert.Equal(t, &url, directory.GetDirectoryUrl()) - } func TestBuildRouterChain(t *testing.T) { - url, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") directory := NewBaseDirectory(&url) assert.NotNil(t, directory) @@ -62,9 +64,8 @@ func TestBuildRouterChain(t *testing.T) { } func getRouteUrl(rule string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") - url.AddParam("rule", rule) - url.AddParam("force", "true") - url.AddParam(constant.ROUTER_KEY, "router") + anyUrl.AddParam("rule", rule) + anyUrl.AddParam("force", "true") + anyUrl.AddParam(constant.ROUTER_KEY, "router") return &url } diff --git a/cluster/directory/static_directory_test.go b/cluster/directory/static_directory_test.go index c50c9a4063bd1a372c27e47687cbf63850f76cef..8e75a2c2535058f605c3e9bb6d6a01f9ff91032a 100644 --- a/cluster/directory/static_directory_test.go +++ b/cluster/directory/static_directory_test.go @@ -32,7 +32,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_StaticDirList(t *testing.T) { +func TestStaticDirList(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) @@ -45,7 +45,7 @@ func Test_StaticDirList(t *testing.T) { assert.Len(t, list, 10) } -func Test_StaticDirDestroy(t *testing.T) { +func TestStaticDirDestroy(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) diff --git a/cluster/loadbalance/least_active_test.go b/cluster/loadbalance/least_active_test.go index 54e57e930f17008cf6d767ef47c0e754ac85d8f7..34be17a4f311a374eefc56ba76885eef2a23645a 100644 --- a/cluster/loadbalance/least_active_test.go +++ b/cluster/loadbalance/least_active_test.go @@ -28,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -37,7 +38,7 @@ func TestLeastActiveSelect(t *testing.T) { var invokers []protocol.Invoker - url, _ := common.NewURL("dubbo://192.168.1.0:20000/org.apache.demo.HelloService") + url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) invokers = append(invokers, protocol.NewBaseInvoker(url)) i := loadBalance.Select(invokers, &invocation.RPCInvocation{}) assert.True(t, i.GetUrl().URLEqual(url)) diff --git a/cluster/loadbalance/random_test.go b/cluster/loadbalance/random_test.go index ff876f4aef8d229e8041594aaaa096f3ad5b1834..88392de52c93579dd4def3da2d60b415b601b21e 100644 --- a/cluster/loadbalance/random_test.go +++ b/cluster/loadbalance/random_test.go @@ -36,7 +36,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) -func Test_RandomlbSelect(t *testing.T) { +func TestRandomlbSelect(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} @@ -53,7 +53,7 @@ func Test_RandomlbSelect(t *testing.T) { randomlb.Select(invokers, &invocation.RPCInvocation{}) } -func Test_RandomlbSelectWeight(t *testing.T) { +func TestRandomlbSelectWeight(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} @@ -84,7 +84,7 @@ func Test_RandomlbSelectWeight(t *testing.T) { }) } -func Test_RandomlbSelectWarmup(t *testing.T) { +func TestRandomlbSelectWarmup(t *testing.T) { randomlb := NewRandomLoadBalance() invokers := []protocol.Invoker{} diff --git a/cluster/loadbalance/round_robin_test.go b/cluster/loadbalance/round_robin_test.go index 1517f2a20b473af57cc23e61b988aa5a6a04de31..5354bae458605ff56ec8a9b35d36730ecdc0babb 100644 --- a/cluster/loadbalance/round_robin_test.go +++ b/cluster/loadbalance/round_robin_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -38,7 +39,8 @@ func TestRoundRobinSelect(t *testing.T) { var invokers []protocol.Invoker - url, _ := common.NewURL("dubbo://192.168.1.0:20000/org.apache.demo.HelloService") + url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService", + constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) invokers = append(invokers, protocol.NewBaseInvoker(url)) i := loadBalance.Select(invokers, &invocation.RPCInvocation{}) assert.True(t, i.GetUrl().URLEqual(url)) diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index 0cb47c4a185fe19b5f70ea4db2b80aab2f1aada5..c7a75f3d8608ecb7a95dcf33027e71b61d7f00f5 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -42,10 +42,16 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) +const ( + path = "/dubbo/config/dubbo/test-condition.condition-router" + zkPrefix = "zookeeper://127.0.0.1:" + anyUrl = "condition://0.0.0.0/com.foo.BarService" +) + func TestNewRouterChain(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) testyml := `enabled: true @@ -55,12 +61,12 @@ conditions: - => host != 172.22.3.91 ` - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + _, err = z.Conn.Set(path, []byte(testyml), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -92,10 +98,10 @@ func TestNewRouterChainURLNil(t *testing.T) { assert.NotNil(t, chain) } -func TestRouterChain_AddRouters(t *testing.T) { +func TestRouterChainAddRouters(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) testyml := `enabled: true @@ -105,12 +111,12 @@ conditions: - => host != 172.22.3.91 ` - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + _, err = z.Conn.Set(path, []byte(testyml), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -131,12 +137,12 @@ conditions: assert.Equal(t, 3, len(chain.routers)) } -func TestRouterChain_Route(t *testing.T) { +func TestRouterChainRoute(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -158,10 +164,10 @@ func TestRouterChain_Route(t *testing.T) { assert.Equal(t, 1, len(finalInvokers)) } -func TestRouterChain_Route_AppRouter(t *testing.T) { +func TestRouterChainRouteAppRouter(t *testing.T) { ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) testyml := `enabled: true @@ -171,12 +177,12 @@ conditions: - => host = 1.1.1.1 => host != 1.2.3.4 ` - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + _, err = z.Conn.Set(path, []byte(testyml), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -200,7 +206,7 @@ func TestRouterChain_Route_NoRoute(t *testing.T) { defer ts.Stop() defer z.Close() - zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + zkUrl, _ := common.NewURL(zkPrefix + strconv.Itoa(ts.Servers[0].Port)) configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) config.GetEnvInstance().SetDynamicConfiguration(configuration) @@ -223,7 +229,7 @@ func TestRouterChain_Route_NoRoute(t *testing.T) { } func getConditionNoRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("application", applicationKey) url.AddParam("force", "true") rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host != 1.2.3.4")) @@ -232,7 +238,7 @@ func getConditionNoRouteUrl(applicationKey string) *common.URL { } func getConditionRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("application", applicationKey) url.AddParam("force", "true") rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host = 1.2.3.4")) @@ -241,7 +247,7 @@ func getConditionRouteUrl(applicationKey string) *common.URL { } func getRouteUrl(applicationKey string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("application", applicationKey) url.AddParam("force", "true") return &url diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index e99307625baf34fa6b744f168ff4e6cb8e042502..f37a483e8468bc57d3ce1e73172ccf9a05bc29f0 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) +const ( + path = "/dubbo/config/dubbo/test-condition.condition-router" +) + func TestNewAppRouter(t *testing.T) { testYML := `enabled: true @@ -47,10 +51,10 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + _, err = z.Conn.Set(path, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() @@ -93,10 +97,10 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + _, err = z.Conn.Set(path, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() @@ -130,10 +134,10 @@ conditions: ` ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) assert.NoError(t, err) - err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + err = z.Create(path) assert.NoError(t, err) - _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + _, err = z.Conn.Set(path, []byte(testYML), 0) assert.NoError(t, err) defer ts.Stop() defer z.Close() diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 99cec34096a55d3c2a967b63afdf5f6d0a77279a..a826cafb85ee1a30ac568db34e10dd2c9c9e87d0 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -38,6 +38,8 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" ) +const anyUrl = "condition://0.0.0.0/com.foo.BarService" + type MockInvoker struct { url common.URL available bool @@ -59,21 +61,21 @@ func (bi *MockInvoker) GetUrl() common.URL { } func getRouteUrl(rule string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("rule", rule) url.AddParam("force", "true") return &url } func getRouteUrlWithForce(rule, force string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("rule", rule) url.AddParam("force", force) return &url } func getRouteUrlWithNoForce(rule string) *common.URL { - url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url, _ := common.NewURL(anyUrl) url.AddParam("rule", rule) return &url } @@ -116,7 +118,7 @@ func (bi *MockInvoker) Destroy() { bi.available = false } -func TestRoute_matchWhen(t *testing.T) { +func TestRouteMatchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) @@ -149,7 +151,7 @@ func TestRoute_matchWhen(t *testing.T) { assert.Equal(t, true, matchWhen6) } -func TestRoute_matchFilter(t *testing.T) { +func TestRouteMatchFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() t.Logf("The local ip is %s", localIP) url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") @@ -184,7 +186,7 @@ func TestRoute_matchFilter(t *testing.T) { } -func TestRoute_methodRoute(t *testing.T) { +func TestRouteMethodRoute(t *testing.T) { inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) @@ -207,7 +209,7 @@ func TestRoute_methodRoute(t *testing.T) { } -func TestRoute_ReturnFalse(t *testing.T) { +func TestRouteReturnFalse(t *testing.T) { url, _ := common.NewURL("") localIP, _ := gxnet.GetLocalIP() invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} @@ -219,7 +221,7 @@ func TestRoute_ReturnFalse(t *testing.T) { assert.Equal(t, 0, len(fileredInvokers)) } -func TestRoute_ReturnEmpty(t *testing.T) { +func TestRouteReturnEmpty(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url, _ := common.NewURL("") invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} @@ -231,7 +233,7 @@ func TestRoute_ReturnEmpty(t *testing.T) { assert.Equal(t, 0, len(fileredInvokers)) } -func TestRoute_ReturnAll(t *testing.T) { +func TestRouteReturnAll(t *testing.T) { localIP, _ := gxnet.GetLocalIP() urlString := "dubbo://" + localIP + "/com.foo.BarService" dubboURL, _ := common.NewURL(urlString) @@ -247,7 +249,7 @@ func TestRoute_ReturnAll(t *testing.T) { assert.Equal(t, invokers, fileredInvokers) } -func TestRoute_HostFilter(t *testing.T) { +func TestRouteHostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -266,7 +268,7 @@ func TestRoute_HostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_Empty_HostFilter(t *testing.T) { +func TestRouteEmptyHostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -285,7 +287,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_False_HostFilter(t *testing.T) { +func TestRouteFalseHostFilter(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -304,7 +306,7 @@ func TestRoute_False_HostFilter(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_Placeholder(t *testing.T) { +func TestRoutePlaceholder(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -323,7 +325,7 @@ func TestRoute_Placeholder(t *testing.T) { assert.Equal(t, invoker3, fileredInvokers[1]) } -func TestRoute_NoForce(t *testing.T) { +func TestRouteNoForce(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) @@ -340,7 +342,7 @@ func TestRoute_NoForce(t *testing.T) { assert.Equal(t, invokers, fileredInvokers) } -func TestRoute_Force(t *testing.T) { +func TestRouteForce(t *testing.T) { localIP, _ := gxnet.GetLocalIP() url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService") url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP)) diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 74aa3940743a012f907cfe3d8811a618f07ff800..8a95d9a7e8dffdc3f30f94c76274a729837fc133 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -37,7 +37,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) - invoker := NewMockInvoker(url, 1) + invoker := NewMockInvoker(url) healthy := hc.IsHealthy(invoker) assert.True(t, healthy) diff --git a/cluster/router/healthcheck/factory_test.go b/cluster/router/healthcheck/factory_test.go index a9d94da7c37f0e0c9640de1386998a85823e80a6..c3a26a93896e185f0dea3732ca5afcf7687ad5ea 100644 --- a/cluster/router/healthcheck/factory_test.go +++ b/cluster/router/healthcheck/factory_test.go @@ -35,7 +35,7 @@ type MockInvoker struct { url common.URL } -func NewMockInvoker(url common.URL, successCount int) *MockInvoker { +func NewMockInvoker(url common.URL) *MockInvoker { return &MockInvoker{ url: url, } diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 759ef93dbeb8d91a82eefd59060afbe8a10a4440..7bfffea705bfedade9d1d13ac7e9c380651335dd 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -44,9 +44,9 @@ func TestHealthCheckRouter_Route(t *testing.T) { hcr, _ := NewHealthCheckRouter(&consumerURL) var invokers []protocol.Invoker - invoker1 := NewMockInvoker(url1, 1) - invoker2 := NewMockInvoker(url2, 1) - invoker3 := NewMockInvoker(url3, 1) + invoker1 := NewMockInvoker(url1) + invoker2 := NewMockInvoker(url2) + invoker3 := NewMockInvoker(url3) invokers = append(invokers, invoker1, invoker2, invoker3) inv := invocation.NewRPCInvocation("test", nil, nil) res := hcr.Route(invokers, &consumerURL, inv) diff --git a/common/constant/default.go b/common/constant/default.go index 3c889158e460031f06b9401008c80f55200a46e4..c69989b4fbc3e95cb42c7f5e403989b9cff9215b 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -43,6 +43,7 @@ const ( DEFAULT_FAILBACK_TASKS = 100 DEFAULT_REST_CLIENT = "resty" DEFAULT_REST_SERVER = "go-restful" + DEFAULT_PORT = 20000 ) const ( @@ -58,6 +59,7 @@ const ( const ( ANY_VALUE = "*" ANYHOST_VALUE = "0.0.0.0" + LOCAL_HOST_VALUE = "192.168.1.1" REMOVE_VALUE_PREFIX = "-" ) diff --git a/common/constant/key.go b/common/constant/key.go index 9af387193e2ad47f5f6adb6f7c88eec6244467b2..5be63fe82ce61d41b0845e2e076e48c3aebcd0cb 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -119,6 +119,7 @@ const ( CONFIG_CLUSTER_KEY = "config.cluster" CONFIG_CHECK_KEY = "config.check" CONFIG_TIMEOUT_KET = "config.timeout" + CONFIG_LOG_DIR_KEY = "config.logDir" CONFIG_VERSION_KEY = "configVersion" COMPATIBLE_CONFIG_KEY = "compatible_config" ) diff --git a/common/extension/auth.go b/common/extension/auth.go index 7caae00e84fd80666ff79b599e12f8516e23209c..4fca0a8e8c255456720df9e4fd9852295715b160 100644 --- a/common/extension/auth.go +++ b/common/extension/auth.go @@ -32,7 +32,7 @@ func SetAuthenticator(name string, fcn func() filter.Authenticator) { } // GetAuthenticator finds the Authenticator with @name -// if not found, it will panic +// Panic if not found func GetAuthenticator(name string) filter.Authenticator { if authenticators[name] == nil { panic("authenticator for " + name + " is not existing, make sure you have import the package.") @@ -46,7 +46,7 @@ func SetAccesskeyStorages(name string, fcn func() filter.AccessKeyStorage) { } // GetAccesskeyStorages finds the storage with the @name. -// If not found, it will panic. +// Panic if not found func GetAccesskeyStorages(name string) filter.AccessKeyStorage { if accesskeyStorages[name] == nil { panic("accesskeyStorages for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_center.go b/common/extension/config_center.go index 3cbced8d3bbcdb3dc7f9af800fa36681d6dc063d..5a2c52f32d070f5ec03bdae0b3cd47f869c28171 100644 --- a/common/extension/config_center.go +++ b/common/extension/config_center.go @@ -27,7 +27,7 @@ var ( ) // SetConfigCenter sets the DynamicConfiguration with @name -func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) { +func SetConfigCenter(name string, v func(*common.URL) (config_center.DynamicConfiguration, error)) { configCenters[name] = v } diff --git a/common/url_test.go b/common/url_test.go index 2372de520e88b0949023e88cec64871736dd6aa0..4d9dff9f373f5d2250deb577621cead8c991cf4d 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -118,6 +118,33 @@ func TestURL_URLEqual(t *testing.T) { u3, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0") assert.NoError(t, err) assert.False(t, u1.URLEqual(u3)) + + // urlGroupAnyValue's group is * + urlGroupAnyValue, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlGroupAnyValue)) + + // test for enabled + urlEnabledEmpty, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlEnabledEmpty)) + + urlEnabledFalse, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=1") + assert.NoError(t, err) + assert.False(t, u3.URLEqual(urlEnabledFalse)) + + urlEnabledTrue, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=true") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlEnabledTrue)) + + urlEnabledAny, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=*") + assert.NoError(t, err) + assert.True(t, u3.URLEqual(urlEnabledAny)) + + // test for category + categoryAny, err := NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=*&version=2.6.0&enabled=*&category=*") + assert.NoError(t, err) + assert.True(t, categoryAny.URLEqual(u3)) } func TestURL_GetParam(t *testing.T) { diff --git a/config/config_center_config.go b/config/config_center_config.go index 56d575e204d77d57204f839ca59fd1559fb1e33d..c9133dc26df0b05e3bb61df0f612d0e2914e98bb 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -46,6 +46,7 @@ type ConfigCenterConfig struct { Group string `default:"dubbo" yaml:"group" json:"group,omitempty"` Username string `yaml:"username" json:"username,omitempty"` Password string `yaml:"password" json:"password,omitempty"` + LogDir string `yaml:"log_dir" json:"log_dir,omitempty"` ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"` Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty"` AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` @@ -73,5 +74,6 @@ func (c *ConfigCenterConfig) GetUrlMap() url.Values { urlMap.Set(constant.CONFIG_GROUP_KEY, c.Group) urlMap.Set(constant.CONFIG_CLUSTER_KEY, c.Cluster) urlMap.Set(constant.CONFIG_APP_ID_KEY, c.AppId) + urlMap.Set(constant.CONFIG_LOG_DIR_KEY, c.LogDir) return urlMap } diff --git a/config_center/dynamic_configuration_test.go b/config_center/dynamic_configuration_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8b5a8d7cc5afea8559d563f82f5b2dd80c51488e --- /dev/null +++ b/config_center/dynamic_configuration_test.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config_center + +import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +func TestWithTimeout(t *testing.T) { + fa := WithTimeout(12 * time.Second) + opt := &Options{} + fa(opt) + assert.Equal(t, 12*time.Second, opt.Timeout) +} + +func TestGetRuleKey(t *testing.T) { + url, err := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider?interface=test&group=groupA&version=0") + assert.NoError(t, err) + assert.Equal(t, "test:0:groupA", GetRuleKey(url)) +} diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index d3373e249bf99873dd3aa05b7488b0e7f38730ec..3b432819f43327888ade3da5303e445d6a2ef0fe 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -18,6 +18,7 @@ package nacos import ( + "path/filepath" "strconv" "strings" "sync" @@ -36,7 +37,8 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -const logDir = "logs/nacos/log" +// Nacos Log dir, it can be override when creating client by config_center.log_dir +var logDir = filepath.Join("logs", "nacos", "log") // NacosClient Nacos client type NacosClient struct { @@ -87,6 +89,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error { } url := container.GetUrl() + logDir = url.GetParam(constant.CONFIG_LOG_DIR_KEY, logDir) if container.NacosClient() == nil { //in dubbo ,every registry only connect one node ,so this is []string{r.Address} diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go index ef63eeff6ddf4e5cd6fa2ba7da7996b3dbed94ac..53ab325c955517a859dab0df358a0387179351d3 100644 --- a/config_center/nacos/client_test.go +++ b/config_center/nacos/client_test.go @@ -53,3 +53,62 @@ func Test_newNacosClient(t *testing.T) { <-c.client.Done() c.Destroy() } + +func Test_setNacosClient(t *testing.T) { + server := mockCommonNacosServer() + nacosURL := server.Listener.Addr().String() + registryUrl, _ := common.NewURL(nacosURL) + c := &nacosDynamicConfiguration{ + url: ®istryUrl, + done: make(chan struct{}), + } + var client *NacosClient + client = &NacosClient{ + name: nacosClientName, + NacosAddrs: []string{nacosURL}, + Timeout: 15, + exit: make(chan struct{}), + onceClose: func() { + close(client.exit) + }, + } + c.SetNacosClient(client) + err := ValidateNacosClient(c, WithNacosName(nacosClientName)) + assert.NoError(t, err) + c.wg.Add(1) + go HandleClientRestart(c) + go func() { + // c.client.Close() and <-c.client.Done() have order requirements. + // If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)" + // sets c.client to nil before calling c.client.Done(). + time.Sleep(time.Second) + c.client.Close() + }() + <-c.client.Done() + c.Destroy() +} + +func Test_newNacosClient_connectError(t *testing.T) { + nacosURL := "registry://127.0.0.1:8888" + registryUrl, err := common.NewURL(nacosURL) + assert.NoError(t, err) + c := &nacosDynamicConfiguration{ + url: ®istryUrl, + done: make(chan struct{}), + } + err = ValidateNacosClient(c, WithNacosName(nacosClientName)) + assert.NoError(t, err) + c.wg.Add(1) + go HandleClientRestart(c) + go func() { + // c.client.Close() and <-c.client.Done() have order requirements. + // If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)" + // sets c.client to nil before calling c.client.Done(). + time.Sleep(time.Second) + c.client.Close() + }() + <-c.client.Done() + // let client do retry + time.Sleep(5 * time.Second) + c.Destroy() +} diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index 4032c91cda512b649140db6eea3dc11eeb482f27..ebaad1c19ddda1e00bd0f22ebd8c7c3cb57615a6 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -72,12 +72,13 @@ func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { server := mockCommonNacosServer() nacosURL := strings.ReplaceAll(server.URL, "http", "registry") regurl, _ := common.NewURL(nacosURL) - nacosConfiguration, err := newNacosDynamicConfiguration(®url) + factory := &nacosDynamicConfigurationFactory{} + nacosConfiguration, err := factory.GetDynamicConfiguration(®url) assert.NoError(t, err) nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) - return nacosConfiguration, err + return nacosConfiguration.(*nacosDynamicConfiguration), err } func Test_GetConfig(t *testing.T) { diff --git a/config_center/parser/configuration_parser_test.go b/config_center/parser/configuration_parser_test.go index 7a59ea9b48b8b8d2a84735a416bcba1bb9ec8652..3ba10f73a4549181f37b89aedd4aedf4612bd7d4 100644 --- a/config_center/parser/configuration_parser_test.go +++ b/config_center/parser/configuration_parser_test.go @@ -32,3 +32,58 @@ func TestDefaultConfigurationParser_Parser(t *testing.T) { assert.Equal(t, 2, len(m)) assert.Equal(t, "172.0.0.1", m["dubbo.registry.address"]) } + +func TestDefaultConfigurationParser_appItemToUrls_ParserToUrls(t *testing.T) { + parser := &DefaultConfigurationParser{} + content := `configVersion: 2.7.1 +scope: application +key: org.apache.dubbo-go.mockService +enabled: true +configs: +- type: application + enabled: true + addresses: + - 0.0.0.0 + providerAddresses: [] + services: + - org.apache.dubbo-go.mockService + applications: [] + parameters: + cluster: mock1 + side: provider` + urls, err := parser.ParseToUrls(content) + assert.NoError(t, err) + assert.Equal(t, 1, len(urls)) + assert.Equal(t, "org.apache.dubbo-go.mockService", urls[0].GetParam("application", "")) + assert.Equal(t, "mock1", urls[0].GetParam("cluster", "")) + assert.Equal(t, "override", urls[0].Protocol) + assert.Equal(t, "0.0.0.0", urls[0].Location) +} + +func TestDefaultConfigurationParser_serviceItemToUrls_ParserToUrls(t *testing.T) { + parser := &DefaultConfigurationParser{} + content := `configVersion: 2.7.1 +scope: notApplication +key: groupA/test:1 +enabled: true +configs: +- type: application + enabled: true + addresses: + - 0.0.0.0 + providerAddresses: [] + services: + - org.apache.dubbo-go.mockService + applications: [] + parameters: + cluster: mock1 + side: provider` + urls, err := parser.ParseToUrls(content) + assert.NoError(t, err) + assert.Equal(t, 1, len(urls)) + assert.Equal(t, "groupA", urls[0].GetParam("group", "")) + assert.Equal(t, "/test", urls[0].Path) + assert.Equal(t, "mock1", urls[0].GetParam("cluster", "")) + assert.Equal(t, "override", urls[0].Protocol) + assert.Equal(t, "0.0.0.0", urls[0].Location) +} diff --git a/filter/access_key.go b/filter/access_key.go index 40d4157b31d13ed8fd8b1ba8cc9d16b53638ac6a..4801d64fe46461424c5dac5aef2eebc719ee19c4 100644 --- a/filter/access_key.go +++ b/filter/access_key.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// AccessKeyPair stores the basic attributes for authentication. type AccessKeyPair struct { AccessKey string `yaml:"accessKey" json:"accessKey,omitempty" property:"accessKey"` SecretKey string `yaml:"secretKey" json:"secretKey,omitempty" property:"secretKey"` @@ -31,8 +32,7 @@ type AccessKeyPair struct { Options string `yaml:"options" json:"options,omitempty" property:"options"` } -// AccessKeyStorage -// This SPI Extension support us to store our AccessKeyPair or load AccessKeyPair from other +// AccessKeyStorage supports us to store our AccessKeyPair or load AccessKeyPair from other // storage, such as filesystem. type AccessKeyStorage interface { GetAccessKeyPair(protocol.Invocation, *common.URL) *AccessKeyPair diff --git a/filter/authenticator.go b/filter/authenticator.go index ac2c8601d4a0d2e5ae3aed56415d9d23856cb502..71f659d4918293e2eb05b8b7a72b6db1cece42ba 100644 --- a/filter/authenticator.go +++ b/filter/authenticator.go @@ -22,14 +22,13 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// Authenticator +// Authenticator defines how an Authenticator works. +// Custom Authenticator must be set by calling auth.SetAuthenticator before use. type Authenticator interface { - // Sign - // give a sign to request + // Sign adds signature to the invocation Sign(protocol.Invocation, *common.URL) error - // Authenticate - // verify the signature of the request is valid or not + // Authenticate verifies the signature of the request Authenticate(protocol.Invocation, *common.URL) error } diff --git a/filter/filter.go b/filter/filter.go index c069510498c7ac68b2bb2169dfe7132a4ef63229..d20ca72c345c6812f4bce6df5dbaf683429a9874 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -24,9 +24,11 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// Filter +// Filter interface defines the functions of a filter // Extension - Filter type Filter interface { + // Invoke is the core function of a filter, it determins the process of the filter Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result + // OnResponse updates the results from Invoke and then returns the modified results. OnResponse(context.Context, protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result } diff --git a/filter/filter_impl/active_filter.go b/filter/filter_impl/active_filter.go index 23f2c8e25609dff89392107251715fe6f5175f09..795de968b57207830cc15fc8a0476bfdc3d2cb43 100644 --- a/filter/filter_impl/active_filter.go +++ b/filter/filter_impl/active_filter.go @@ -39,11 +39,11 @@ func init() { extension.SetFilter(active, GetActiveFilter) } -// ActiveFilter ... +// ActiveFilter tracks the requests status type ActiveFilter struct { } -// Invoke ... +// Invoke starts to record the requests status func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments())) invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10)) @@ -51,7 +51,7 @@ func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in return invoker.Invoke(ctx, invocation) } -// OnResponse ... +// OnResponse update the active count base on the request result. func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64) if err != nil { @@ -64,7 +64,7 @@ func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, return result } -// GetActiveFilter ... +// GetActiveFilter creates ActiveFilter instance func GetActiveFilter() filter.Filter { return &ActiveFilter{} } diff --git a/filter/filter_impl/auth/accesskey_storage.go b/filter/filter_impl/auth/accesskey_storage.go index 5adb9d9ee37329228d1d02dc8802deeede68d327..90d3efb5ad897b874c89745740637804808b5133 100644 --- a/filter/filter_impl/auth/accesskey_storage.go +++ b/filter/filter_impl/auth/accesskey_storage.go @@ -25,13 +25,11 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// DefaultAccesskeyStorage -// The default implementation of AccesskeyStorage +// DefaultAccesskeyStorage is the default implementation of AccesskeyStorage type DefaultAccesskeyStorage struct { } -// GetAccessKeyPair -// get AccessKeyPair from url by the key "accessKeyId" and "secretAccessKey" +// GetAccessKeyPair retrieves AccessKeyPair from url by the key "accessKeyId" and "secretAccessKey" func (storage *DefaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Invocation, url *common.URL) *filter.AccessKeyPair { return &filter.AccessKeyPair{ AccessKey: url.GetParam(constant.ACCESS_KEY_ID_KEY, ""), @@ -43,6 +41,7 @@ func init() { extension.SetAccesskeyStorages(constant.DEFAULT_ACCESS_KEY_STORAGE, GetDefaultAccesskeyStorage) } +// GetDefaultAccesskeyStorage initiates an empty DefaultAccesskeyStorage func GetDefaultAccesskeyStorage() filter.AccessKeyStorage { return &DefaultAccesskeyStorage{} } diff --git a/filter/filter_impl/auth/consumer_sign.go b/filter/filter_impl/auth/consumer_sign.go index 062744771acf8ccd505265875a103d24afeb06af..945cf3e6e7e728042b5422174162dd5aded50361 100644 --- a/filter/filter_impl/auth/consumer_sign.go +++ b/filter/filter_impl/auth/consumer_sign.go @@ -29,8 +29,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// ConsumerSignFilter -// This filter is working for signing the request on consumer side +// ConsumerSignFilter signs the request on consumer side type ConsumerSignFilter struct { } @@ -38,6 +37,7 @@ func init() { extension.SetFilter(constant.CONSUMER_SIGN_FILTER, getConsumerSignFilter) } +// Invoke retrieves the configured Authenticator to add signature to invocation func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking ConsumerSign filter.") url := invoker.GetUrl() @@ -52,6 +52,7 @@ func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invo return invoker.Invoke(ctx, invocation) } +// OnResponse dummy process, returns the result directly func (csf *ConsumerSignFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/auth/default_authenticator.go b/filter/filter_impl/auth/default_authenticator.go index 2b8d55927807407f350ecc6cfc28b6913a6d1a81..5b86fc148e42b6364bcb0752c31bfbbc3cfa2b9d 100644 --- a/filter/filter_impl/auth/default_authenticator.go +++ b/filter/filter_impl/auth/default_authenticator.go @@ -37,13 +37,11 @@ func init() { extension.SetAuthenticator(constant.DEFAULT_AUTHENTICATOR, GetDefaultAuthenticator) } -// DefaultAuthenticator -// The default implemetation of Authenticator +// DefaultAuthenticator is the default implementation of Authenticator type DefaultAuthenticator struct { } -// Sign -// add the signature for the invocation +// Sign adds the signature to the invocation func (authenticator *DefaultAuthenticator) Sign(invocation protocol.Invocation, url *common.URL) error { currentTimeMillis := strconv.Itoa(int(time.Now().Unix() * 1000)) @@ -84,8 +82,7 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st return signature, nil } -// Authenticate -// This method verifies whether the signature sent by the requester is correct +// Authenticate verifies whether the signature sent by the requester is correct func (authenticator *DefaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error { accessKeyId := invocation.AttachmentsByKey(constant.AK_KEY, "") @@ -122,6 +119,7 @@ func getAccessKeyPair(invocation protocol.Invocation, url *common.URL) (*filter. } } +// GetDefaultAuthenticator creates an empty DefaultAuthenticator instance func GetDefaultAuthenticator() filter.Authenticator { return &DefaultAuthenticator{} } diff --git a/filter/filter_impl/auth/provider_auth.go b/filter/filter_impl/auth/provider_auth.go index 0d5772e5508894111a88443bfe2d1b02ebfac54a..d5f5db300d4e7c94978d5d52e32f741f7d27bb48 100644 --- a/filter/filter_impl/auth/provider_auth.go +++ b/filter/filter_impl/auth/provider_auth.go @@ -29,8 +29,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// ProviderAuthFilter -// This filter is used to verify the correctness of the signature on provider side +// ProviderAuthFilter verifies the correctness of the signature on provider side type ProviderAuthFilter struct { } @@ -38,6 +37,7 @@ func init() { extension.SetFilter(constant.PROVIDER_AUTH_FILTER, getProviderAuthFilter) } +// Invoke retrieves the configured Authenticator to verify the signature in an invocation func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking providerAuth filter.") url := invoker.GetUrl() @@ -55,6 +55,7 @@ func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invo return invoker.Invoke(ctx, invocation) } +// OnResponse dummy process, returns the result directly func (paf *ProviderAuthFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/auth/sign_util.go b/filter/filter_impl/auth/sign_util.go index 043a549a849dde66712e1bef389dd91a024660df..45170bb8117284275a87a3a57d14ce68d6cc4e9c 100644 --- a/filter/filter_impl/auth/sign_util.go +++ b/filter/filter_impl/auth/sign_util.go @@ -26,12 +26,12 @@ import ( "strings" ) -// Sign -// get a signature string with given information, such as metadata or parameters +// Sign gets a signature string with given bytes func Sign(metadata, key string) string { return doSign([]byte(metadata), key) } +// SignWithParams returns a signature with giving params and metadata. func SignWithParams(params []interface{}, metadata, key string) (string, error) { if params == nil || len(params) == 0 { return Sign(metadata, key), nil @@ -61,6 +61,7 @@ func doSign(bytes []byte, key string) string { return base64.URLEncoding.EncodeToString(signature) } +// IsEmpty verify whether the inputted string is empty func IsEmpty(s string, allowSpace bool) bool { if len(s) == 0 { return true diff --git a/filter/filter_impl/echo_filter.go b/filter/filter_impl/echo_filter.go index a12800a21a8ebe4545b4a8b5bd0f8a30c1462105..7da5ec7029ea698b1bf1a14ad36123fbec3aacf7 100644 --- a/filter/filter_impl/echo_filter.go +++ b/filter/filter_impl/echo_filter.go @@ -38,13 +38,13 @@ func init() { extension.SetFilter(ECHO, GetFilter) } -// EchoFilter +// EchoFilter health check // RPCService need a Echo method in consumer, if you want to use EchoFilter // eg: // Echo func(ctx context.Context, arg interface{}, rsp *Xxx) error type EchoFilter struct{} -// Invoke ... +// Invoke response to the callers with its first argument. func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking echo filter.") logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) @@ -58,7 +58,7 @@ func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invo return invoker.Invoke(ctx, invocation) } -// OnResponse ... +// OnResponse dummy process, returns the result directly func (ef *EchoFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { diff --git a/filter/filter_impl/execute_limit_filter.go b/filter/filter_impl/execute_limit_filter.go index 434c378045456eb13317e0a48630ebd33f244c05..bfc5096ca089867f6e6234089e387d3f9b48a3aa 100644 --- a/filter/filter_impl/execute_limit_filter.go +++ b/filter/filter_impl/execute_limit_filter.go @@ -45,9 +45,8 @@ func init() { extension.SetFilter(name, GetExecuteLimitFilter) } +// ExecuteLimitFilter will limit the number of in-progress request and it's thread-safe. /** - * ExecuteLimitFilter - * The filter will limit the number of in-progress request and it's thread-safe. * example: * "UserProvider": * registry: "hangzhouzk" @@ -80,7 +79,7 @@ type ExecuteState struct { concurrentCount int64 } -// Invoke ... +// Invoke judges whether the current processing requests over the threshold func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { methodConfigPrefix := "methods." + invocation.MethodName() + "." ivkURL := invoker.GetUrl() @@ -122,7 +121,7 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok return invoker.Invoke(ctx, invocation) } -// OnResponse ... +// OnResponse dummy process, returns the result directly func (ef *ExecuteLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { return result } @@ -138,7 +137,7 @@ func (state *ExecuteState) decrease() { var executeLimitOnce sync.Once var executeLimitFilter *ExecuteLimitFilter -// GetExecuteLimitFilter ... +// GetExecuteLimitFilter returns the singleton ExecuteLimitFilter instance func GetExecuteLimitFilter() filter.Filter { executeLimitOnce.Do(func() { executeLimitFilter = &ExecuteLimitFilter{ diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go index 9bc131ef8903942b84df2b8fc14fd11143d1a7b6..3f4d714e6b0cbdf48f5e1afce3222a18857041f9 100644 --- a/filter/filter_impl/generic_filter.go +++ b/filter/filter_impl/generic_filter.go @@ -50,7 +50,7 @@ func init() { // GenericFilter ... type GenericFilter struct{} -// Invoke ... +// Invoke turns the parameters to map for generic method func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { oldArguments := invocation.Arguments() @@ -73,13 +73,13 @@ func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i return invoker.Invoke(ctx, invocation) } -// OnResponse ... +// OnResponse dummy process, returns the result directly func (ef *GenericFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { return result } -// GetGenericFilter ... +// GetGenericFilter returns GenericFilter instance func GetGenericFilter() filter.Filter { return &GenericFilter{} } diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/filter_impl/graceful_shutdown_filter.go index 95e625b2d56895a4d57823e4e0e2e7d1d5e90a08..4a4e8ce466edabe82815b99244404ac024d73b26 100644 --- a/filter/filter_impl/graceful_shutdown_filter.go +++ b/filter/filter_impl/graceful_shutdown_filter.go @@ -53,6 +53,7 @@ type gracefulShutdownFilter struct { shutdownConfig *config.ShutdownConfig } +// Invoke adds the requests count and block the new requests if application is closing func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if gf.rejectNewRequest() { logger.Info("The application is closing, new request will be rejected.") @@ -62,6 +63,7 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I return invoker.Invoke(ctx, invocation) } +// OnResponse reduces the number of active processes then return the process result func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { atomic.AddInt32(&gf.activeCount, -1) // although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true. diff --git a/filter/filter_impl/hystrix_filter.go b/filter/filter_impl/hystrix_filter.go index 4c872bed3e7fef8eca47f51422525a4918d6c1d8..711ef71c44192c5a1d76783a3b3d4cbd0b97632c 100644 --- a/filter/filter_impl/hystrix_filter.go +++ b/filter/filter_impl/hystrix_filter.go @@ -55,14 +55,14 @@ var ( //The filter in the server end of dubbo-go can't get the invoke result for now, //this filter ONLY works in CLIENT end (consumer side) temporarily -//Only after the callService logic is integrated into the filter chain of server end can this filter be used, +//Only after the callService logic is integrated into the filter chain of server end then the filter can be used, //which will be done soon func init() { extension.SetFilter(HYSTRIX_CONSUMER, GetHystrixFilterConsumer) extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider) } -// HystrixFilterError ... +// HystrixFilterError implements error interface type HystrixFilterError struct { err error failByHystrix bool @@ -72,12 +72,12 @@ func (hfError *HystrixFilterError) Error() string { return hfError.err.Error() } -// FailByHystrix ... +// FailByHystrix returns whether the fails causing by Hystrix func (hfError *HystrixFilterError) FailByHystrix() bool { return hfError.failByHystrix } -// NewHystrixFilterError ... +// NewHystrixFilterError return a HystrixFilterError instance func NewHystrixFilterError(err error, failByHystrix bool) error { return &HystrixFilterError{ err: err, @@ -92,7 +92,7 @@ type HystrixFilter struct { ifNewMap sync.Map } -// Invoke ... +// Invoke is an implentation of filter, provides Hystrix pattern latency and fault tolerance func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName()) @@ -154,12 +154,12 @@ func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i return result } -// OnResponse ... +// OnResponse dummy process, returns the result directly func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } -// GetHystrixFilterConsumer ... +// GetHystrixFilterConsumer returns HystrixFilter instance for consumer func GetHystrixFilterConsumer() filter.Filter { //When first called, load the config in consumerConfigOnce.Do(func() { @@ -170,7 +170,7 @@ func GetHystrixFilterConsumer() filter.Filter { return &HystrixFilter{COrP: true} } -// GetHystrixFilterProvider ... +// GetHystrixFilterProvider returns HystrixFilter instance for provider func GetHystrixFilterProvider() filter.Filter { providerConfigOnce.Do(func() { if err := initHystrixConfigProvider(); err != nil { diff --git a/filter/filter_impl/token_filter.go b/filter/filter_impl/token_filter.go index 8ec3929b6ddc8dcfa430204cd22d2f6d297c59d3..23742c66e94d9ecfc09d004441a54aad86ef049e 100644 --- a/filter/filter_impl/token_filter.go +++ b/filter/filter_impl/token_filter.go @@ -42,10 +42,10 @@ func init() { extension.SetFilter(TOKEN, GetTokenFilter) } -// TokenFilter ... +// TokenFilter will verify if the token is valid type TokenFilter struct{} -// Invoke ... +// Invoke verifies the incoming token with the service configured token func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { invokerTkn := invoker.GetUrl().GetParam(constant.TOKEN_KEY, "") if len(invokerTkn) > 0 { @@ -61,7 +61,7 @@ func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, inv return invoker.Invoke(ctx, invocation) } -// OnResponse ... +// OnResponse dummy process, returns the result directly func (tf *TokenFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go b/filter/filter_impl/tps/tps_limit_fix_window_strategy.go index a9c2ac15a417ffa6ff8f5b8d78d5c6a94877db30..7419a4576122d4db334969b0711666b5b2816e60 100644 --- a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_fix_window_strategy.go @@ -39,8 +39,8 @@ func init() { extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, creator) } +// FixedWindowTpsLimitStrategyImpl implements the TPS limit strategy base on requests count during the interval /** - * FixedWindowTpsLimitStrategyImpl * It's the same as default implementation in Java * It's not a thread-safe implementation. * It you want to use the thread-safe implementation, please use ThreadSafeFixedWindowTpsLimitStrategyImpl @@ -65,7 +65,8 @@ type FixedWindowTpsLimitStrategyImpl struct { timestamp int64 } -// IsAllowable ... +// IsAllowable determines if the requests over the TPS limit within the interval. +// It is not thread-safe. func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { current := time.Now().UnixNano() @@ -82,6 +83,7 @@ func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { type fixedWindowStrategyCreator struct{} +// Create returns a FixedWindowTpsLimitStrategyImpl instance with pre-configured limit rate and interval func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { return &FixedWindowTpsLimitStrategyImpl{ rate: int32(rate), diff --git a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go b/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go index a781cc7bfbf297d0b9cf84ca0aa9dcfbbef7e14b..cbbba19fff65be222cb895dcbe9b2e4d02082985 100644 --- a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go @@ -32,8 +32,8 @@ func init() { extension.SetTpsLimitStrategy("slidingWindow", &slidingWindowStrategyCreator{}) } +// SlidingWindowTpsLimitStrategyImpl implements a thread-safe TPS limit strategy base on requests count. /** - * SlidingWindowTpsLimitStrategyImpl * it's thread-safe. * "UserProvider": * registry: "hangzhouzk" @@ -54,7 +54,8 @@ type SlidingWindowTpsLimitStrategyImpl struct { queue *list.List } -// IsAllowable ... +// IsAllowable determins whether the number of requests within the time window overs the threshold +// It is thread-safe. func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { impl.mutex.Lock() defer impl.mutex.Unlock() @@ -84,6 +85,7 @@ func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { type slidingWindowStrategyCreator struct{} +// Create returns SlidingWindowTpsLimitStrategyImpl instance with configured limit rate and interval func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { return &SlidingWindowTpsLimitStrategyImpl{ rate: rate, diff --git a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go b/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go index 16624836e6397df5adda3f2aa5a80966721a97fb..f78cd8211cd076dcab84759e2bf784d080c72a1c 100644 --- a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go @@ -32,10 +32,9 @@ func init() { }) } +// ThreadSafeFixedWindowTpsLimitStrategyImpl is the thread-safe implementation. +// It's also a thread-safe decorator of FixedWindowTpsLimitStrategyImpl /** - * ThreadSafeFixedWindowTpsLimitStrategyImpl - * it's the thread-safe implementation. - * Also, it's a thread-safe decorator of FixedWindowTpsLimitStrategyImpl * "UserProvider": * registry: "hangzhouzk" * protocol : "dubbo" @@ -53,7 +52,7 @@ type ThreadSafeFixedWindowTpsLimitStrategyImpl struct { fixedWindow *FixedWindowTpsLimitStrategyImpl } -// IsAllowable ... +// IsAllowable implements thread-safe then run the FixedWindowTpsLimitStrategy func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool { impl.mutex.Lock() defer impl.mutex.Unlock() @@ -64,6 +63,7 @@ type threadSafeFixedWindowStrategyCreator struct { fixedWindowStrategyCreator *fixedWindowStrategyCreator } +// Create returns ThreadSafeFixedWindowTpsLimitStrategyImpl instance func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { fixedWindowStrategy := creator.fixedWindowStrategyCreator.Create(rate, interval).(*FixedWindowTpsLimitStrategyImpl) return &ThreadSafeFixedWindowTpsLimitStrategyImpl{ diff --git a/filter/filter_impl/tps/tps_limiter_method_service.go b/filter/filter_impl/tps/tps_limiter_method_service.go index 2d44c688ebd460f60a49da0f148fd7c6e2b79f50..5761579a38a22500d54193a9564170cc0215cf0f 100644 --- a/filter/filter_impl/tps/tps_limiter_method_service.go +++ b/filter/filter_impl/tps/tps_limiter_method_service.go @@ -44,9 +44,8 @@ func init() { extension.SetTpsLimiter(name, GetMethodServiceTpsLimiter) } +// MethodServiceTpsLimiterImpl allows developer to config both method-level and service-level tps limiter. /** - * MethodServiceTpsLimiterImpl - * This implementation allows developer to config both method-level and service-level tps limiter. * for example: * "UserProvider": * registry: "hangzhouzk" diff --git a/filter/filter_impl/tps_limit_filter.go b/filter/filter_impl/tps_limit_filter.go index fa78288f9678d67d0eb0d025a83b75493f7fda80..ea1e3bc15e1952799227d712db114ff790527720 100644 --- a/filter/filter_impl/tps_limit_filter.go +++ b/filter/filter_impl/tps_limit_filter.go @@ -39,8 +39,8 @@ func init() { extension.SetFilter(TpsLimitFilterKey, GetTpsLimitFilter) } +// TpsLimitFilter filters the requests by TPS /** - * TpsLimitFilter * if you wish to use the TpsLimiter, please add the configuration into your service provider configuration: * for example: * "UserProvider": @@ -56,7 +56,7 @@ func init() { type TpsLimitFilter struct { } -// Invoke ... +// Invoke gets the configured limter to impose TPS limiting func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { url := invoker.GetUrl() tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "") @@ -72,13 +72,13 @@ func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in return invoker.Invoke(ctx, invocation) } -// OnResponse ... +// OnResponse dummy process, returns the result directly func (t TpsLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { return result } -// GetTpsLimitFilter ... +// GetTpsLimitFilter returns an TpsLimitFilter instance. func GetTpsLimitFilter() filter.Filter { return &TpsLimitFilter{} } diff --git a/filter/handler/rejected_execution_handler_only_log.go b/filter/handler/rejected_execution_handler_only_log.go index fe9cf4869f16e1d7c136e7f48e4138d046fcb057..52ac1765f78172c0062de8884198e759b8d494ca 100644 --- a/filter/handler/rejected_execution_handler_only_log.go +++ b/filter/handler/rejected_execution_handler_only_log.go @@ -44,8 +44,8 @@ func init() { var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler var onlyLogHandlerOnce sync.Once +// OnlyLogRejectedExecutionHandler implements the RejectedExecutionHandler /** - * OnlyLogRejectedExecutionHandler * This implementation only logs the invocation info. * it always return en error inside the result. * "UserProvider": diff --git a/filter/rejected_execution_handler.go b/filter/rejected_execution_handler.go index d02481b98d2937ac58d277becdb1240b8a4e9b0f..3d1e1c1e641a836411ce0f71f97acf5d5a55f6d1 100644 --- a/filter/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -22,8 +22,8 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// RejectedExecutionHandler defines the handler to handle exceptions from invoking filters. /** - * RejectedExecutionHandler * If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter, * the implementation will be used. * The common case is that sometimes you want to return the default value when the request was rejected. diff --git a/filter/tps_limit_strategy.go b/filter/tps_limit_strategy.go index e194f1da06b0599464f0c66f6b7747fc78fbedce..2ee876a0b340ed1f87b94c35b149b548371e2bf9 100644 --- a/filter/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -17,8 +17,8 @@ package filter +// TpsLimitStrategy defines how to do the TPS limiting in method level. /* - * TpsLimitStrategy * please register your implementation by invoking SetTpsLimitStrategy * "UserProvider": * registry: "hangzhouzk" @@ -37,7 +37,7 @@ type TpsLimitStrategy interface { IsAllowable() bool } -// TpsLimitStrategyCreator, the creator abstraction for TpsLimitStrategy +// TpsLimitStrategyCreator is the creator abstraction for TpsLimitStrategy type TpsLimitStrategyCreator interface { // Create will create an instance of TpsLimitStrategy // It will be a little hard to understand this method. diff --git a/filter/tps_limiter.go b/filter/tps_limiter.go index 531eb098232cd34a467d71882b29858c07f88aef..8385d7b5d84a420b54df6bf51e32a35d17e1b249 100644 --- a/filter/tps_limiter.go +++ b/filter/tps_limiter.go @@ -22,8 +22,8 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// TpsLimiter defines the Limiter that judge if the TPS overs the threshold /* - * TpsLimiter * please register your implementation by invoking SetTpsLimiter * The usage, for example: * "UserProvider": diff --git a/metadata/identifier/base_metadata_identifier.go b/metadata/identifier/base_metadata_identifier.go index a314671055be523844fd7d8f9589b8b6031632bc..5f3df4c607e69d2b56e1258d081c148524cd7aca 100644 --- a/metadata/identifier/base_metadata_identifier.go +++ b/metadata/identifier/base_metadata_identifier.go @@ -25,11 +25,13 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// BaseMetadataIdentifier defined for description the Metadata base identify type BaseMetadataIdentifier interface { getFilePathKey(params ...string) string getIdentifierKey(params ...string) string } +// BaseMetadataIdentifier is the base implement of BaseMetadataIdentifier interface type BaseServiceMetadataIdentifier struct { serviceInterface string version string @@ -37,7 +39,7 @@ type BaseServiceMetadataIdentifier struct { side string } -// joinParams... +// joinParams will join the specified char in slice, and return a string func joinParams(joinChar string, params []string) string { var joinedStr string for _, param := range params { @@ -47,7 +49,7 @@ func joinParams(joinChar string, params []string) string { return joinedStr } -// getIdentifierKey... +// getIdentifierKey returns string that format is service:Version:Group:Side:param1:param2... func (mdi *BaseServiceMetadataIdentifier) getIdentifierKey(params ...string) string { return mdi.serviceInterface + constant.KEY_SEPARATOR + mdi.version + @@ -56,7 +58,7 @@ func (mdi *BaseServiceMetadataIdentifier) getIdentifierKey(params ...string) str joinParams(constant.KEY_SEPARATOR, params) } -// getFilePathKey... +// getFilePathKey returns string that format is metadata/path/Version/Group/Side/param1/param2... func (mdi *BaseServiceMetadataIdentifier) getFilePathKey(params ...string) string { path := serviceToPath(mdi.serviceInterface) @@ -69,7 +71,6 @@ func (mdi *BaseServiceMetadataIdentifier) getFilePathKey(params ...string) strin } -// serviceToPath... func serviceToPath(serviceInterface string) string { if serviceInterface == constant.ANY_VALUE { return "" @@ -83,7 +84,6 @@ func serviceToPath(serviceInterface string) string { } -//withPathSeparator... func withPathSeparator(path string) string { if len(path) != 0 { path = constant.PATH_SEPARATOR + path diff --git a/metadata/identifier/metadata_identifier.go b/metadata/identifier/metadata_identifier.go index f3df8f36546093a826279c4e9ec1546f78d444bd..7e72c10da9c088ca167fa4fbc4dcb57f44b8c06d 100644 --- a/metadata/identifier/metadata_identifier.go +++ b/metadata/identifier/metadata_identifier.go @@ -17,17 +17,18 @@ package identifier +// MetadataIdentifier is inherit baseMetaIdentifier with Application name type MetadataIdentifier struct { application string BaseMetadataIdentifier } -// getIdentifierKey... +// GetIdentifierKey returns string that format is service:Version:Group:Side:Application func (mdi *MetadataIdentifier) getIdentifierKey(params ...string) string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.application) } -// getIdentifierKey... +// GetFilePathKey returns string that format is metadata/path/Version/Group/Side/Application func (mdi *MetadataIdentifier) getFilePathKey(params ...string) string { return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.application) } diff --git a/metadata/identifier/service_metadata_identifier.go b/metadata/identifier/service_metadata_identifier.go index 373df0130dd1f87e3175918bde50060c4be89616..ccc149f7306c125b19a25373d4da660a154cc84e 100644 --- a/metadata/identifier/service_metadata_identifier.go +++ b/metadata/identifier/service_metadata_identifier.go @@ -21,18 +21,19 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ServiceMetadataIdentifier is inherit baseMetaIdentifier with service params: Revision and Protocol type ServiceMetadataIdentifier struct { revision string protocol string BaseMetadataIdentifier } -// getIdentifierKey... +// GetIdentifierKey returns string that format is service:Version:Group:Side:Protocol:"revision"+Revision func (mdi *ServiceMetadataIdentifier) getIdentifierKey(params ...string) string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.protocol + constant.KEY_REVISON_PREFIX + mdi.revision) } -// getIdentifierKey... +// GetFilePathKey returns string that format is metadata/path/Version/Group/Side/Protocol/"revision"+Revision func (mdi *ServiceMetadataIdentifier) getFilePathKey(params ...string) string { return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.protocol + constant.KEY_REVISON_PREFIX + mdi.revision) } diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go index 321a216a3e3ad3f2390ab832782924a81e226160..38f3ebbd462338b581d83cd19403a00a5064b5a4 100644 --- a/metadata/identifier/subscribe_metadata_identifier.go +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -17,17 +17,18 @@ package identifier +// SubscriberMetadataIdentifier is inherit baseMetaIdentifier with service params: Revision type SubscriberMetadataIdentifier struct { revision string BaseMetadataIdentifier } -// getIdentifierKey... +// GetIdentifierKey returns string that format is service:Version:Group:Side:Revision func (mdi *SubscriberMetadataIdentifier) getIdentifierKey(params ...string) string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.revision) } -// getIdentifierKey... +// GetFilePathKey returns string that format is metadata/path/Version/Group/Side/Revision func (mdi *SubscriberMetadataIdentifier) getFilePathKey(params ...string) string { return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.revision) } diff --git a/metadata/service.go b/metadata/service.go index d85703c95a57183d5c0a5b2445839e946dc6a59b..89df68fb313b1abe63082c0c220b0114c11fca19 100644 --- a/metadata/service.go +++ b/metadata/service.go @@ -22,6 +22,9 @@ import ( gxset "github.com/dubbogo/gost/container/set" ) +// Metadata service is a built-in service around the metadata of Dubbo services, +// whose interface is provided by Dubbo Framework and exported automatically before subscription after other services exporting, +// which may be used for Dubbo subscribers and admin. type MetadataService interface { ServiceName() string ExportURL(url *common.URL) bool diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go index 5773eb80a9b07e300fd980d580ac052bbfd45841..a0ab0be80cc905115e675c1c4dea2b1c748f6c09 100644 --- a/protocol/grpc/client.go +++ b/protocol/grpc/client.go @@ -25,14 +25,63 @@ import ( "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" + "gopkg.in/yaml.v2" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" ) +var ( + clientConf *ClientConfig +) + +func init() { + // load clientconfig from consumer_config + consumerConfig := config.GetConsumerConfig() + + clientConfig := GetClientConfig() + clientConf = &clientConfig + + // check client config and decide whether to use the default config + defer func() { + if clientConf == nil || len(clientConf.ContentSubType) == 0 { + defaultClientConfig := GetDefaultClientConfig() + clientConf = &defaultClientConfig + } + if err := clientConf.Validate(); err != nil { + panic(err) + } + }() + + if consumerConfig.ApplicationConfig == nil { + return + } + protocolConf := config.GetConsumerConfig().ProtocolConf + + if protocolConf == nil { + logger.Info("protocol_conf default use dubbo config") + } else { + grpcConf := protocolConf.(map[interface{}]interface{})[GRPC] + if grpcConf == nil { + logger.Warnf("grpcConf is nil") + return + } + grpcConfByte, err := yaml.Marshal(grpcConf) + if err != nil { + panic(err) + } + err = yaml.Unmarshal(grpcConfByte, clientConf) + if err != nil { + panic(err) + } + } + +} + // Client is gRPC client include client connection and invoker type Client struct { *grpc.ClientConn @@ -43,9 +92,11 @@ type Client struct { func NewClient(url common.URL) *Client { // if global trace instance was set , it means trace function enabled. If not , will return Nooptracer tracer := opentracing.GlobalTracer() - conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock(), - grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads()))) + dailOpts := make([]grpc.DialOption, 0, 4) + dailOpts = append(dailOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( + otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())), + grpc.WithDefaultCallOptions(grpc.CallContentSubtype(clientConf.ContentSubType))) + conn, err := grpc.Dial(url.Location, dailOpts...) if err != nil { panic(err) } diff --git a/protocol/grpc/codec.go b/protocol/grpc/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..7235a3941baefcdce5964ae21dbdbfe9d6ca9cc8 --- /dev/null +++ b/protocol/grpc/codec.go @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "bytes" + "encoding/json" +) + +import ( + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/encoding" +) + +const ( + codecJson = "json" + codecProto = "proto" +) + +func init() { + encoding.RegisterCodec(grpcJson{ + Marshaler: jsonpb.Marshaler{ + EmitDefaults: true, + OrigName: true, + }, + }) +} + +type grpcJson struct { + jsonpb.Marshaler + jsonpb.Unmarshaler +} + +// Name implements grpc encoding package Codec interface method, +// returns the name of the Codec implementation. +func (_ grpcJson) Name() string { + return codecJson +} + +// Marshal implements grpc encoding package Codec interface method,returns the wire format of v. +func (j grpcJson) Marshal(v interface{}) (out []byte, err error) { + if pm, ok := v.(proto.Message); ok { + b := new(bytes.Buffer) + err := j.Marshaler.Marshal(b, pm) + if err != nil { + return nil, err + } + return b.Bytes(), nil + } + return json.Marshal(v) +} + +// Unmarshal implements grpc encoding package Codec interface method,Unmarshal parses the wire format into v. +func (j grpcJson) Unmarshal(data []byte, v interface{}) (err error) { + if pm, ok := v.(proto.Message); ok { + b := bytes.NewBuffer(data) + return j.Unmarshaler.Unmarshal(b, pm) + } + return json.Unmarshal(data, v) +} diff --git a/protocol/grpc/config.go b/protocol/grpc/config.go new file mode 100644 index 0000000000000000000000000000000000000000..e8a9baac8999a009bab435a5bd4e70d102f77b56 --- /dev/null +++ b/protocol/grpc/config.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + perrors "github.com/pkg/errors" +) + +type ( + // ServerConfig currently is empty struct,for future expansion + ServerConfig struct { + } + + // ClientConfig wrap client call parameters + ClientConfig struct { + // content type, more information refer by https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + ContentSubType string `default:"proto" yaml:"content_sub_type" json:"content_sub_type,omitempty"` + } +) + +// GetDefaultClientConfig return grpc client default call options +func GetDefaultClientConfig() ClientConfig { + return ClientConfig{ + ContentSubType: codecProto, + } +} + +// GetDefaultServerConfig currently return empty struct,for future expansion +func GetDefaultServerConfig() ServerConfig { + return ServerConfig{} +} + +// GetClientConfig return grpc client custom call options +func GetClientConfig() ClientConfig { + return ClientConfig{} +} + +// Validate check if custom config encoding is supported in dubbo grpc +func (c *ClientConfig) Validate() error { + if c.ContentSubType != codecJson && c.ContentSubType != codecProto { + return perrors.Errorf(" dubbo-go grpc codec currently only support protoã€json, %s isn't supported,"+ + " please check protocol content_sub_type config", c.ContentSubType) + } + return nil +} + +// Validate currently return empty struct,for future expansion +func (c *ServerConfig) Validate() error { + return nil +} diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go index 83d28519f6f8e28bf471ce2ea6807603c1324911..1af4fafdc606783e937ede63f99e5a08f0b2419e 100644 --- a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go @@ -107,7 +107,6 @@ func (g *dubboGrpc) GenerateImports(file *generator.FileDescriptor) { g.P(`dgrpc "github.com/apache/dubbo-go/protocol/grpc"`) g.P(`"github.com/apache/dubbo-go/protocol/invocation"`) g.P(`"github.com/apache/dubbo-go/protocol"`) - g.P(`"github.com/apache/dubbo-go/config"`) g.P(` ) `) } @@ -266,7 +265,7 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method * g.P(`invo := invocation.NewRPCInvocation("`, methName, `", args, nil)`) g.P("if interceptor == nil {") - g.P("result := base.GetProxyImpl().Invoke(invo)") + g.P("result := base.GetProxyImpl().Invoke(context.Background(), invo)") g.P("return result.Result(), result.Error()") g.P("}") @@ -276,7 +275,7 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method * g.P("}") g.P("handler := func(ctx ", contextPkg, ".Context, req interface{}) (interface{}, error) {") - g.P("result := base.GetProxyImpl().Invoke(invo)") + g.P("result := base.GetProxyImpl().Invoke(context.Background(), invo)") g.P("return result.Result(), result.Error()") g.P("}")