diff --git a/.travis.yml b/.travis.yml index ea6b2035584ba07ae7caa709f83be59cf20c730e..707e64481416b2c090bad05cddce2b3ccebf4535 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,3 +18,6 @@ script: after_success: - bash <(curl -s https://codecov.io/bash) + +notifications: + webhooks: https://oapi.dingtalk.com/robot/send?access_token=f5d6237f2c79db584e75604f7f88db1ce1673c8c0e98451217b28fde791e1d4f \ No newline at end of file diff --git a/LICENSE b/LICENSE index e76f9d9dd705daad997776153b1060f5bf8c2a1d..75b52484ea471f882c29e02693b4f02dba175b5e 100644 --- a/LICENSE +++ b/LICENSE @@ -176,6 +176,19 @@ END OF TERMS AND CONDITIONS + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/NOTICE b/NOTICE index a9bd809c5c43a9d88a773b5f0c421b252abf38de..d7aa899d1cef0fba67826bebd0d587e9cc17ba5d 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache Dubbo Go -Copyright 2018-2019 The Apache Software Foundation +Copyright 2018-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/cluster/cluster_impl/available_cluster_invoker.go b/cluster/cluster_impl/available_cluster_invoker.go index 57ad7fd7f436d81105bebb98792fd24274397a4e..6f6d2dffbbbf2f6c758097b11713ae0c1b6bd387 100644 --- a/cluster/cluster_impl/available_cluster_invoker.go +++ b/cluster/cluster_impl/available_cluster_invoker.go @@ -18,6 +18,7 @@ limitations under the License. package cluster_impl import ( + "context" "fmt" ) @@ -41,7 +42,7 @@ func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { } } -func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { @@ -55,7 +56,7 @@ func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) p for _, ivk := range invokers { if ivk.IsAvailable() { - return ivk.Invoke(invocation) + return ivk.Invoke(ctx, invocation) } } return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))} diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go index 04032a7f24dec0e73acb15921f753921391f1515..de04db1da4e8e6df12960b1a2ee81b0044379d6f 100644 --- a/cluster/cluster_impl/available_cluster_invoker_test.go +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -66,7 +66,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) { invoker.EXPECT().IsAvailable().Return(true) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) } @@ -80,7 +80,7 @@ func TestAvailableClusterInvokerNoAvail(t *testing.T) { invoker.EXPECT().IsAvailable().Return(false) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{}) assert.NotNil(t, result.Error()) assert.True(t, strings.Contains(result.Error().Error(), "no provider available")) diff --git a/cluster/cluster_impl/broadcast_cluster_invoker.go b/cluster/cluster_impl/broadcast_cluster_invoker.go index 238df0acfa7fb946e38bfbfd490bce7c0bb34e60..1b49e9a115252d4eca94bedd557ebcc21fee4cc7 100644 --- a/cluster/cluster_impl/broadcast_cluster_invoker.go +++ b/cluster/cluster_impl/broadcast_cluster_invoker.go @@ -17,6 +17,9 @@ limitations under the License. package cluster_impl +import ( + "context" +) import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/logger" @@ -33,7 +36,7 @@ func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker { } } -func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *broadcastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { @@ -46,7 +49,7 @@ func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) p var result protocol.Result for _, ivk := range invokers { - result = ivk.Invoke(invocation) + result = ivk.Invoke(ctx, invocation) if result.Error() != nil { logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk) err = result.Error() diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go index 565684a8ae25c648ff77aef71d2ced0665202fe7..b20d962e2cffb34d0a151488a1bdf63499e4de86 100644 --- a/cluster/cluster_impl/broadcast_cluster_invoker_test.go +++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go @@ -74,7 +74,7 @@ func Test_BroadcastInvokeSuccess(t *testing.T) { clusterInvoker := registerBroadcast(t, invokers...) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) } @@ -104,6 +104,6 @@ func Test_BroadcastInvokeFailed(t *testing.T) { clusterInvoker := registerBroadcast(t, invokers...) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockFailedResult.Err, result.Error()) } diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go index c8dbeda09f62e88b51dd4ad2b6b09d5715f0b224..46b0ff634e56c45223a5aeb5566b9b1401518960 100644 --- a/cluster/cluster_impl/failback_cluster_invoker.go +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -18,6 +18,7 @@ package cluster_impl import ( + "context" "strconv" "sync" "time" @@ -71,7 +72,7 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { return invoker } -func (invoker *failbackClusterInvoker) process() { +func (invoker *failbackClusterInvoker) process(ctx context.Context) { invoker.ticker = time.NewTicker(time.Second * 1) for range invoker.ticker.C { // check each timeout task and re-run @@ -102,7 +103,7 @@ func (invoker *failbackClusterInvoker) process() { retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) var result protocol.Result - result = retryInvoker.Invoke(retryTask.invocation) + result = retryInvoker.Invoke(ctx, retryTask.invocation) if result.Error() != nil { retryTask.lastInvoker = retryInvoker invoker.checkRetry(retryTask, result.Error()) @@ -126,7 +127,7 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err } } -func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { @@ -150,11 +151,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) //DO INVOKE - result = ivk.Invoke(invocation) + result = ivk.Invoke(ctx, invocation) if result.Error() != nil { invoker.once.Do(func() { invoker.taskList = queue.New(invoker.failbackTasks) - go invoker.process() + go invoker.process(ctx) }) taskLen := invoker.taskList.Len() diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go index 1d2266cabebf591b09188fb723f02126a3f1e0ec..895077922a88abc05416e58459205b449831ac56 100644 --- a/cluster/cluster_impl/failback_cluster_test.go +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -72,7 +72,7 @@ func Test_FailbackSuceess(t *testing.T) { mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) } @@ -102,7 +102,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) { return mockSuccResult }) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) assert.Equal(t, 0, len(result.Attachments())) @@ -150,7 +150,7 @@ func Test_FailbackRetryFailed(t *testing.T) { } // first call should failed. - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) assert.Equal(t, 0, len(result.Attachments())) @@ -192,7 +192,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) { }).Times(10) for i := 0; i < 10; i++ { - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) assert.Equal(t, 0, len(result.Attachments())) @@ -222,14 +222,14 @@ func Test_FailbackOutOfLimit(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11) // reached limit - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) assert.Equal(t, 0, len(result.Attachments())) // all will be out of limit for i := 0; i < 10; i++ { - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) assert.Equal(t, 0, len(result.Attachments())) diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster_impl/failfast_cluster_invoker.go index 734ea2c6cb19bf54a338a76a10c9cfcc59d3954b..49e7c7689f5a19a36154e092a6a83cc39da604ba 100644 --- a/cluster/cluster_impl/failfast_cluster_invoker.go +++ b/cluster/cluster_impl/failfast_cluster_invoker.go @@ -17,6 +17,9 @@ limitations under the License. package cluster_impl +import ( + "context" +) import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/protocol" @@ -32,7 +35,7 @@ func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker { } } -func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { @@ -47,5 +50,5 @@ func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) pr } ivk := invoker.doSelect(loadbalance, invocation, invokers, nil) - return ivk.Invoke(invocation) + return ivk.Invoke(ctx, invocation) } diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go index 1a4342e6c2b74fd6b1359646eeb463bb6dc17d0a..9585f03b7fa8f45a19c7c47e04dcd57cc1e4bb11 100644 --- a/cluster/cluster_impl/failfast_cluster_test.go +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -69,7 +69,7 @@ func Test_FailfastInvokeSuccess(t *testing.T) { mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) res := result.Result().(rest) @@ -89,7 +89,7 @@ func Test_FailfastInvokeFail(t *testing.T) { mockResult := &protocol.RPCResult{Err: perrors.New("error")} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NotNil(t, result.Error()) assert.Equal(t, "error", result.Error().Error()) diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index dcce7369931a11f31fb6b9e4e1a6c0aa0ec7cdf6..6178a05a1226ba629d2456ad6886b02a26288e45 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -18,6 +18,7 @@ package cluster_impl import ( + "context" "strconv" ) @@ -43,7 +44,7 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { } } -func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) @@ -95,7 +96,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr } invoked = append(invoked, ivk) //DO INVOKE - result = ivk.Invoke(invocation) + result = ivk.Invoke(ctx, invocation) if result.Error() != nil { providers = append(providers, ivk.GetUrl().Key()) continue diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go index 78b799320dfa58d55e531c658ec5eb0e69306cff..7bde83ea66a49f9317732ec46da0f11800f846eb 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -77,7 +77,7 @@ type rest struct { success bool } -func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result { count++ var success bool var err error = nil @@ -112,9 +112,9 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio staticDir := directory.NewStaticDirectory(invokers) clusterInvoker := failoverCluster.Join(staticDir) if len(invocations) > 0 { - return clusterInvoker.Invoke(invocations[0]) + return clusterInvoker.Invoke(context.Background(), invocations[0]) } - return clusterInvoker.Invoke(&invocation.RPCInvocation{}) + return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) } func Test_FailoverInvokeSuccess(t *testing.T) { urlParams := url.Values{} @@ -155,14 +155,14 @@ func Test_FailoverDestroy(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(context.Background(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) invokers = append(invokers, NewMockInvoker(url, 1)) } staticDir := directory.NewStaticDirectory(invokers) clusterInvoker := failoverCluster.Join(staticDir) assert.Equal(t, true, clusterInvoker.IsAvailable()) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) count = 0 clusterInvoker.Destroy() diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go index b95f997fef87cf466f07c4e506e41758e7998e52..4d8fe27719eb71fa287fe4142d8e92ca17acfba4 100644 --- a/cluster/cluster_impl/failsafe_cluster_invoker.go +++ b/cluster/cluster_impl/failsafe_cluster_invoker.go @@ -17,6 +17,9 @@ package cluster_impl +import ( + "context" +) import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" @@ -42,7 +45,7 @@ func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker { } } -func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) @@ -65,7 +68,7 @@ func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) pr ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) //DO INVOKE - result = ivk.Invoke(invocation) + result = ivk.Invoke(ctx, invocation) if result.Error() != nil { // ignore logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error()) diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go index 7888b97c3a02bd4679f8ec5267637b8d2a7c12e4..930b4bb16628e2b363659a65fc174543b7f2cf6e 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -69,7 +69,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) res := result.Result().(rest) @@ -88,7 +88,7 @@ func Test_FailSafeInvokeFail(t *testing.T) { mockResult := &protocol.RPCResult{Err: perrors.New("error")} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) assert.Nil(t, result.Result()) diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go index d6cf2f4b89ab4f322fa758deecae90c60742ef49..c830079ff6d3c29c3385eda289782f5e52877be2 100644 --- a/cluster/cluster_impl/forking_cluster_invoker.go +++ b/cluster/cluster_impl/forking_cluster_invoker.go @@ -18,6 +18,7 @@ limitations under the License. package cluster_impl import ( + "context" "errors" "fmt" "time" @@ -44,7 +45,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { } } -func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { err := invoker.checkWhetherDestroyed() if err != nil { return &protocol.RPCResult{Err: err} @@ -75,7 +76,7 @@ func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) pro resultQ := queue.New(1) for _, ivk := range selected { go func(k protocol.Invoker) { - result := k.Invoke(invocation) + result := k.Invoke(ctx, invocation) err := resultQ.Put(result) if err != nil { logger.Errorf("resultQ put failed with exception: %v.\n", err) diff --git a/cluster/cluster_impl/forking_cluster_test.go b/cluster/cluster_impl/forking_cluster_test.go index 8603f8aedc4e28a3a4ca2f115355debc1a5ecc62..d819781eb23631e6b8eef76e5bdf7d7837f43d53 100644 --- a/cluster/cluster_impl/forking_cluster_test.go +++ b/cluster/cluster_impl/forking_cluster_test.go @@ -87,7 +87,7 @@ func Test_ForkingInvokeSuccess(t *testing.T) { clusterInvoker := registerForking(t, invokers...) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) wg.Wait() } @@ -117,7 +117,7 @@ func Test_ForkingInvokeTimeout(t *testing.T) { clusterInvoker := registerForking(t, invokers...) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NotNil(t, result) assert.NotNil(t, result.Error()) wg.Wait() @@ -156,7 +156,7 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) { clusterInvoker := registerForking(t, invokers...) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.Equal(t, mockResult, result) wg.Wait() } diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go index 5785c02489f95168d5419f0087f38b07c851a4a3..cded5bf16432e6b0c590e15b81c28369889a5f88 100644 --- a/cluster/cluster_impl/registry_aware_cluster_invoker.go +++ b/cluster/cluster_impl/registry_aware_cluster_invoker.go @@ -17,6 +17,9 @@ package cluster_impl +import ( + "context" +) import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" @@ -33,19 +36,19 @@ func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoke } } -func (invoker *registryAwareClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. for _, invoker := range invokers { if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } } //If none of the invokers has a local signal, pick the first one available. for _, invoker := range invokers { if invoker.IsAvailable() { - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } } return nil diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go index 4ae15cc5066c70646dee66cf4ef601202653cb07..7f916c1aaa5609beb3d818e08f5b0950c3273e6d 100644 --- a/cluster/cluster_impl/registry_aware_cluster_test.go +++ b/cluster/cluster_impl/registry_aware_cluster_test.go @@ -39,13 +39,13 @@ func Test_RegAwareInvokeSuccess(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(context.Background(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) invokers = append(invokers, NewMockInvoker(url, 1)) } staticDir := directory.NewStaticDirectory(invokers) clusterInvoker := regAwareCluster.Join(staticDir) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) count = 0 } @@ -55,14 +55,14 @@ func TestDestroy(t *testing.T) { invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url, _ := common.NewURL(context.Background(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) invokers = append(invokers, NewMockInvoker(url, 1)) } staticDir := directory.NewStaticDirectory(invokers) clusterInvoker := regAwareCluster.Join(staticDir) assert.Equal(t, true, clusterInvoker.IsAvailable()) - result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) count = 0 clusterInvoker.Destroy() diff --git a/cluster/router/condition_router_test.go b/cluster/router/condition_router_test.go index 7d8b0d88cab688e6ea10d1562a27de4609d51f58..7acbdabc9b6c1976664fce7596ce22c187f48068 100644 --- a/cluster/router/condition_router_test.go +++ b/cluster/router/condition_router_test.go @@ -93,7 +93,7 @@ type rest struct { var count int -func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (bi *MockInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { count++ var success bool var err error = nil diff --git a/common/constant/key.go b/common/constant/key.go index 7538a2995a89b0951c29f532b9ce9e475198f54e..d201570b9ad5415694af5598fba7983289b2b295 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -137,3 +137,7 @@ const ( NACOS_PROTOCOL_KEY = "protocol" NACOS_PATH_KEY = "path" ) + +const ( + TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx" +) diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index c8158b72970388806abfcff4c89099bba3b93d37..d0be491d406170ea4c52e65f70f0dfbe7b1b3cb6 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -18,6 +18,7 @@ package proxy import ( + "context" "reflect" "sync" ) @@ -95,8 +96,13 @@ func (p *Proxy) Implement(v common.RPCService) { start := 0 end := len(in) + invCtx := context.Background() if end > 0 { if in[0].Type().String() == "context.Context" { + if !in[0].IsNil() { + // the user declared context as method's parameter + invCtx = in[0].Interface().(context.Context) + } start += 1 } if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr { @@ -130,7 +136,7 @@ func (p *Proxy) Implement(v common.RPCService) { inv.SetAttachments(k, value) } - result := p.invoke.Invoke(inv) + result := p.invoke.Invoke(invCtx, inv) err = result.Error() logger.Infof("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 0c323aaa8c0541d8149654a2759480c559a615e0..114cfee2363022da5f7957a825a16fc42b8c928f 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -18,6 +18,7 @@ package proxy_factory import ( + "context" "reflect" "strings" ) @@ -83,7 +84,7 @@ type ProxyInvoker struct { } // Invoke ... -func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { result := &protocol.RPCResult{} result.SetAttachments(invocation.Attachments()) @@ -112,7 +113,7 @@ func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result { in := []reflect.Value{svc.Rcvr()} if method.CtxType() != nil { - in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later. + in = append(in, method.SuiteContext(ctx)) } // prepare argv diff --git a/filter/filter.go b/filter/filter.go index 5bd78998a76a1b0e8af99b0b3f0d7e6c103bb794..6c9e4455476b42d97718b5364d9687ac9671f687 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -17,12 +17,15 @@ package filter +import ( + "context" +) import ( "github.com/apache/dubbo-go/protocol" ) // Extension - Filter type Filter interface { - Invoke(protocol.Invoker, protocol.Invocation) protocol.Result - OnResponse(protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result + Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result + OnResponse(context.Context, protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result } diff --git a/filter/filter_impl/access_log_filter.go b/filter/filter_impl/access_log_filter.go index 20ddd7d1670f987fbdfe6c7ff92689f0bc94a3e9..a07f479742a578038f1beeeb12c4950fe850ba32 100644 --- a/filter/filter_impl/access_log_filter.go +++ b/filter/filter_impl/access_log_filter.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "os" "reflect" "strings" @@ -67,13 +68,13 @@ type AccessLogFilter struct { } // Invoke ... -func (ef *AccessLogFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { accessLog := invoker.GetUrl().GetParam(constant.ACCESS_LOG_KEY, "") if len(accessLog) > 0 { accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog} ef.logIntoChannel(accessLogData) } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // it won't block the invocation @@ -121,7 +122,7 @@ func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocati } // OnResponse ... -func (ef *AccessLogFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *AccessLogFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/access_log_filter_test.go b/filter/filter_impl/access_log_filter_test.go index 5076962486da1ca40c4bdf6d7ba4b75a05bb0f92..14b9166b0fc486638c77388c76b49423a8d4a83e 100644 --- a/filter/filter_impl/access_log_filter_test.go +++ b/filter/filter_impl/access_log_filter_test.go @@ -49,7 +49,7 @@ func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) { inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) accessLogFilter := GetAccessLogFilter() - result := accessLogFilter.Invoke(invoker, inv) + result := accessLogFilter.Invoke(context.Background(), invoker, inv) assert.Nil(t, result.Error()) } @@ -70,13 +70,13 @@ func TestAccessLogFilter_Invoke_Default_Config(t *testing.T) { inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) accessLogFilter := GetAccessLogFilter() - result := accessLogFilter.Invoke(invoker, inv) + result := accessLogFilter.Invoke(context.Background(), invoker, inv) assert.Nil(t, result.Error()) } func TestAccessLogFilter_OnResponse(t *testing.T) { result := &protocol.RPCResult{} accessLogFilter := GetAccessLogFilter() - response := accessLogFilter.OnResponse(result, nil, nil) + response := accessLogFilter.OnResponse(nil, result, nil, nil) assert.Equal(t, result, response) } diff --git a/filter/filter_impl/active_filter.go b/filter/filter_impl/active_filter.go index 0c3f03045523b1bec5451fe45428928c751ad5b1..23f2c8e25609dff89392107251715fe6f5175f09 100644 --- a/filter/filter_impl/active_filter.go +++ b/filter/filter_impl/active_filter.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "strconv" ) @@ -43,17 +44,15 @@ type ActiveFilter struct { } // Invoke ... -func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments())) - invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10)) protocol.BeginCount(invoker.GetUrl(), invocation.MethodName()) - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - +func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64) if err != nil { result.SetError(err) diff --git a/filter/filter_impl/active_filter_test.go b/filter/filter_impl/active_filter_test.go index acc4f9121641bfbbc484a711c0ea04dffeab55e3..7b355086f9d48b3fb864ed40d1cb5db999543d77 100644 --- a/filter/filter_impl/active_filter_test.go +++ b/filter/filter_impl/active_filter_test.go @@ -28,7 +28,7 @@ func TestActiveFilter_Invoke(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) invoker.EXPECT().Invoke(gomock.Any()).Return(nil) invoker.EXPECT().GetUrl().Return(url).Times(1) - filter.Invoke(invoker, invoc) + filter.Invoke(context.Background(), invoker, invoc) assert.True(t, invoc.AttachmentsByKey(dubboInvokeStartTime, "") != "") } @@ -48,7 +48,7 @@ func TestActiveFilter_OnResponse(t *testing.T) { result := &protocol.RPCResult{ Err: errors.New("test"), } - filter.OnResponse(result, invoker, invoc) + filter.OnResponse(nil, result, invoker, invoc) methodStatus := protocol.GetMethodStatus(url, "test") urlStatus := protocol.GetURLStatus(url) diff --git a/filter/filter_impl/echo_filter.go b/filter/filter_impl/echo_filter.go index 981318d2a5d290b00686b36b65f483f241c57cf5..4ccecc2dbc68383071b789692babadac2c80c7fd 100644 --- a/filter/filter_impl/echo_filter.go +++ b/filter/filter_impl/echo_filter.go @@ -17,6 +17,9 @@ package filter_impl +import ( + "context" +) import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -39,7 +42,7 @@ func init() { type EchoFilter struct{} // Invoke ... -func (ef *EchoFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking echo filter.") logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 { @@ -49,11 +52,11 @@ func (ef *EchoFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invoc } } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (ef *EchoFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *EchoFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/echo_filter_test.go b/filter/filter_impl/echo_filter_test.go index b75b9c19a1f073cc23dfccfa97a51e456e59d9cc..fc09bdce696c6be3c9e11d0ac864b187d1d85cde 100644 --- a/filter/filter_impl/echo_filter_test.go +++ b/filter/filter_impl/echo_filter_test.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "testing" ) @@ -33,12 +34,10 @@ import ( func TestEchoFilter_Invoke(t *testing.T) { filter := GetFilter() - result := filter.Invoke(protocol.NewBaseInvoker(common.URL{}), - invocation.NewRPCInvocation("$echo", []interface{}{"OK"}, nil)) + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(common.URL{}), invocation.NewRPCInvocation("$echo", []interface{}{"OK"}, nil)) assert.Equal(t, "OK", result.Result()) - result = filter.Invoke(protocol.NewBaseInvoker(common.URL{}), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, nil)) + result = filter.Invoke(context.Background(), protocol.NewBaseInvoker(common.URL{}), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, nil)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } diff --git a/filter/filter_impl/execute_limit_filter.go b/filter/filter_impl/execute_limit_filter.go index 270364bd011e6a94afb9a55c861b2440ebf27ae0..f9dab06ebe7d7e02be5b6ae23587495d2df7d95b 100644 --- a/filter/filter_impl/execute_limit_filter.go +++ b/filter/filter_impl/execute_limit_filter.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "strconv" "sync" "sync/atomic" @@ -77,7 +78,7 @@ type ExecuteState struct { } // Invoke ... -func (ef *ExecuteLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { methodConfigPrefix := "methods." + invocation.MethodName() + "." url := invoker.GetUrl() limitTarget := url.ServiceKey() @@ -99,7 +100,7 @@ func (ef *ExecuteLimitFilter) Invoke(invoker protocol.Invoker, invocation protoc } if limitRate < 0 { - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } state, _ := ef.executeState.LoadOrStore(limitTarget, &ExecuteState{ @@ -115,11 +116,11 @@ func (ef *ExecuteLimitFilter) Invoke(invoker protocol.Invoker, invocation protoc return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(url, invocation) } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (ef *ExecuteLimitFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *ExecuteLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/execute_limit_filter_test.go b/filter/filter_impl/execute_limit_filter_test.go index e3836251df4ba78befcbb5720affb5dbc3cbdf1f..ae8641f2db0b98b59f9939cfc85f3ad096b1bc7f 100644 --- a/filter/filter_impl/execute_limit_filter_test.go +++ b/filter/filter_impl/execute_limit_filter_test.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "net/url" "testing" ) @@ -43,7 +44,7 @@ func TestExecuteLimitFilter_Invoke_Ignored(t *testing.T) { limitFilter := GetExecuteLimitFilter() - result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + result := limitFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), invoc) assert.NotNil(t, result) assert.Nil(t, result.Error()) } @@ -60,7 +61,7 @@ func TestExecuteLimitFilter_Invoke_Configure_Error(t *testing.T) { limitFilter := GetExecuteLimitFilter() - result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + result := limitFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), invoc) assert.NotNil(t, result) assert.Nil(t, result.Error()) } @@ -77,7 +78,7 @@ func TestExecuteLimitFilter_Invoke(t *testing.T) { limitFilter := GetExecuteLimitFilter() - result := limitFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + result := limitFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), invoc) assert.NotNil(t, result) assert.Nil(t, result.Error()) } diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go index 819474a78da2ed915e7ec635a76cbeb53bef930c..fec1c3aa51451d5cb18e037b14ec778393072a93 100644 --- a/filter/filter_impl/generic_filter.go +++ b/filter/filter_impl/generic_filter.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "reflect" "strings" ) @@ -46,7 +47,7 @@ func init() { type GenericFilter struct{} // Invoke ... -func (ef *GenericFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { oldArguments := invocation.Arguments() @@ -62,14 +63,14 @@ func (ef *GenericFilter) Invoke(invoker protocol.Invoker, invocation protocol.In } newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) newInvocation.SetReply(invocation.Reply()) - return invoker.Invoke(newInvocation) + return invoker.Invoke(ctx, newInvocation) } } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (ef *GenericFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *GenericFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/generic_service_filter.go b/filter/filter_impl/generic_service_filter.go index 1562268fcbbc84d5b747cf99619b0ab501f75f24..c577ae2077fbd042def0a7209459ec59c62b684f 100644 --- a/filter/filter_impl/generic_service_filter.go +++ b/filter/filter_impl/generic_service_filter.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "reflect" "strings" ) @@ -51,12 +52,12 @@ func init() { type GenericServiceFilter struct{} // Invoke ... -func (ef *GenericServiceFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *GenericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking generic service filter.") logger.Debugf("generic service filter methodName:%v,args:%v", invocation.MethodName(), len(invocation.Arguments())) if invocation.MethodName() != constant.GENERIC || len(invocation.Arguments()) != 3 { - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } var ( @@ -109,11 +110,11 @@ func (ef *GenericServiceFilter) Invoke(invoker protocol.Invoker, invocation prot } newInvocation := invocation2.NewRPCInvocation(methodName, newParams, invocation.Attachments()) newInvocation.SetReply(invocation.Reply()) - return invoker.Invoke(newInvocation) + return invoker.Invoke(ctx, newInvocation) } // OnResponse ... -func (ef *GenericServiceFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *GenericServiceFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 && result.Result() != nil { v := reflect.ValueOf(result.Result()) if v.Kind() == reflect.Ptr { diff --git a/filter/filter_impl/generic_service_filter_test.go b/filter/filter_impl/generic_service_filter_test.go index e36ec5086ecacffbf56a0da6dd9249ffd6fec649..8211e717564465bba3009772715a3ab1cd3322dd 100644 --- a/filter/filter_impl/generic_service_filter_test.go +++ b/filter/filter_impl/generic_service_filter_test.go @@ -99,7 +99,7 @@ func TestGenericServiceFilter_Invoke(t *testing.T) { rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) filter := GetGenericServiceFilter() url, _ := common.NewURL(context.Background(), "testprotocol://127.0.0.1:20000/com.test.Path") - result := filter.Invoke(&proxy_factory.ProxyInvoker{BaseInvoker: *protocol.NewBaseInvoker(url)}, rpcInvocation) + result := filter.Invoke(context.Background(), &proxy_factory.ProxyInvoker{BaseInvoker: *protocol.NewBaseInvoker(url)}, rpcInvocation) assert.NotNil(t, result) assert.Nil(t, result.Error()) } @@ -124,7 +124,7 @@ func TestGenericServiceFilter_ResponseTestStruct(t *testing.T) { filter := GetGenericServiceFilter() methodName := "$invoke" rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) - r := filter.OnResponse(result, nil, rpcInvocation) + r := filter.OnResponse(nil, result, nil, rpcInvocation) assert.NotNil(t, r.Result()) assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.Map) } @@ -142,7 +142,7 @@ func TestGenericServiceFilter_ResponseString(t *testing.T) { filter := GetGenericServiceFilter() methodName := "$invoke" rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) - r := filter.OnResponse(result, nil, rpcInvocation) + r := filter.OnResponse(nil, result, nil, rpcInvocation) assert.NotNil(t, r.Result()) assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.String) } diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/filter_impl/graceful_shutdown_filter.go index 1af7e1f8c32ea3924550399a7ff5e76c68368636..95e625b2d56895a4d57823e4e0e2e7d1d5e90a08 100644 --- a/filter/filter_impl/graceful_shutdown_filter.go +++ b/filter/filter_impl/graceful_shutdown_filter.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "sync/atomic" ) @@ -52,16 +53,16 @@ type gracefulShutdownFilter struct { shutdownConfig *config.ShutdownConfig } -func (gf *gracefulShutdownFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if gf.rejectNewRequest() { logger.Info("The application is closing, new request will be rejected.") return gf.getRejectHandler().RejectedExecution(invoker.GetUrl(), invocation) } atomic.AddInt32(&gf.activeCount, 1) - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } -func (gf *gracefulShutdownFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { atomic.AddInt32(&gf.activeCount, -1) // although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true. if gf.shutdownConfig != nil && gf.activeCount <= 0 { diff --git a/filter/filter_impl/graceful_shutdown_filter_test.go b/filter/filter_impl/graceful_shutdown_filter_test.go index fc437c3557fa452273e770d3d50678401ba3b33b..4c670933e3dcec29ad9ae7bfef250b4236ae7c54 100644 --- a/filter/filter_impl/graceful_shutdown_filter_test.go +++ b/filter/filter_impl/graceful_shutdown_filter_test.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "net/url" "testing" ) @@ -53,7 +54,7 @@ func TestGenericFilter_Invoke(t *testing.T) { assert.Equal(t, extension.GetRejectedExecutionHandler(constant.DEFAULT_KEY), shutdownFilter.getRejectHandler()) - result := shutdownFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), invoc) + result := shutdownFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), invoc) assert.NotNil(t, result) assert.Nil(t, result.Error()) @@ -64,7 +65,7 @@ func TestGenericFilter_Invoke(t *testing.T) { shutdownFilter.shutdownConfig = providerConfig.ShutdownConfig assert.True(t, shutdownFilter.rejectNewRequest()) - result = shutdownFilter.OnResponse(nil, protocol.NewBaseInvoker(*invokeUrl), invoc) + result = shutdownFilter.OnResponse(nil, nil, protocol.NewBaseInvoker(*invokeUrl), invoc) rejectHandler := &common2.OnlyLogRejectedExecutionHandler{} extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler { diff --git a/filter/filter_impl/hystrix_filter.go b/filter/filter_impl/hystrix_filter.go index f3f3bf150f3151d9c63a0c84d627ec21958b77de..c2834480e72b81d1c8d5d8973db06e9487692118 100644 --- a/filter/filter_impl/hystrix_filter.go +++ b/filter/filter_impl/hystrix_filter.go @@ -17,6 +17,7 @@ package filter_impl import ( + "context" "fmt" "regexp" "sync" @@ -88,8 +89,7 @@ type HystrixFilter struct { } // Invoke ... -func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - +func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName()) // Do the configuration if the circuit breaker is created for the first time @@ -121,12 +121,12 @@ func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.In configLoadMutex.RUnlock() if err != nil { logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: ", cmdName, err) - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName) var result protocol.Result _ = hystrix.Do(cmdName, func() error { - result = invoker.Invoke(invocation) + result = invoker.Invoke(ctx, invocation) err := result.Error() if err != nil { result.SetError(NewHystrixFilterError(err, false)) @@ -151,7 +151,7 @@ func (hf *HystrixFilter) Invoke(invoker protocol.Invoker, invocation protocol.In } // OnResponse ... -func (hf *HystrixFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } @@ -225,6 +225,7 @@ func initHystrixConfigConsumer() error { } return nil } + func initHystrixConfigProvider() error { if config.GetProviderConfig().FilterConf == nil { return perrors.Errorf("no config for hystrix") diff --git a/filter/filter_impl/hystrix_filter_test.go b/filter/filter_impl/hystrix_filter_test.go index 2bbc3e079e7ae563db1efa18f82423931fd5919d..894573036ae6dd9edca88e8e4cdd92e7643abcb5 100644 --- a/filter/filter_impl/hystrix_filter_test.go +++ b/filter/filter_impl/hystrix_filter_test.go @@ -17,6 +17,7 @@ package filter_impl import ( + "context" "regexp" "testing" ) @@ -125,7 +126,7 @@ type testMockSuccessInvoker struct { protocol.BaseInvoker } -func (iv *testMockSuccessInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (iv *testMockSuccessInvoker) Invoke(context context.Context, invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ Rest: "Sucess", Err: nil, @@ -136,7 +137,7 @@ type testMockFailInvoker struct { protocol.BaseInvoker } -func (iv *testMockFailInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (iv *testMockFailInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ Err: errors.Errorf("exception"), } @@ -144,7 +145,7 @@ func (iv *testMockFailInvoker) Invoke(invocation protocol.Invocation) protocol.R func TestHystrixFilter_Invoke_Success(t *testing.T) { hf := &HystrixFilter{} - result := hf.Invoke(&testMockSuccessInvoker{}, &invocation.RPCInvocation{}) + result := hf.Invoke(context.Background(), &testMockSuccessInvoker{}, &invocation.RPCInvocation{}) assert.NotNil(t, result) assert.NoError(t, result.Error()) assert.NotNil(t, result.Result()) @@ -152,7 +153,7 @@ func TestHystrixFilter_Invoke_Success(t *testing.T) { func TestHystrixFilter_Invoke_Fail(t *testing.T) { hf := &HystrixFilter{} - result := hf.Invoke(&testMockFailInvoker{}, &invocation.RPCInvocation{}) + result := hf.Invoke(context.Background(), &testMockFailInvoker{}, &invocation.RPCInvocation{}) assert.NotNil(t, result) assert.Error(t, result.Error()) } @@ -164,7 +165,7 @@ func TestHystricFilter_Invoke_CircuitBreak(t *testing.T) { resChan := make(chan protocol.Result, 50) for i := 0; i < 50; i++ { go func() { - result := hf.Invoke(&testMockFailInvoker{}, &invocation.RPCInvocation{}) + result := hf.Invoke(context.Background(), &testMockFailInvoker{}, &invocation.RPCInvocation{}) resChan <- result }() } @@ -189,7 +190,7 @@ func TestHystricFilter_Invoke_CircuitBreak_Omit_Exception(t *testing.T) { resChan := make(chan protocol.Result, 50) for i := 0; i < 50; i++ { go func() { - result := hf.Invoke(&testMockFailInvoker{}, &invocation.RPCInvocation{}) + result := hf.Invoke(context.Background(), &testMockFailInvoker{}, &invocation.RPCInvocation{}) resChan <- result }() } diff --git a/filter/filter_impl/token_filter.go b/filter/filter_impl/token_filter.go index 8eab7adf40eac1f5097660970740c0d3a367a407..2340e7271fb6ae0dac582341c3537931b7fbc4aa 100644 --- a/filter/filter_impl/token_filter.go +++ b/filter/filter_impl/token_filter.go @@ -18,6 +18,7 @@ limitations under the License. package filter_impl import ( + "context" "strings" ) @@ -44,23 +45,23 @@ func init() { type TokenFilter struct{} // Invoke ... -func (tf *TokenFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { invokerTkn := invoker.GetUrl().GetParam(constant.TOKEN_KEY, "") if len(invokerTkn) > 0 { attachs := invocation.Attachments() remoteTkn, exist := attachs[constant.TOKEN_KEY] if exist && strings.EqualFold(invokerTkn, remoteTkn) { - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } return &protocol.RPCResult{Err: perrors.Errorf("Invalid token! Forbid invoke remote service %v method %s ", invoker, invocation.MethodName())} } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (tf *TokenFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (tf *TokenFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/token_filter_test.go b/filter/filter_impl/token_filter_test.go index 675d33dc7d401b04f59037c1ec2eb44c8d6ecbe4..672082c729bc371a40573a66d13bc57a7024186b 100644 --- a/filter/filter_impl/token_filter_test.go +++ b/filter/filter_impl/token_filter_test.go @@ -18,6 +18,7 @@ limitations under the License. package filter_impl import ( + "context" "net/url" "testing" ) @@ -41,8 +42,10 @@ func TestTokenFilter_Invoke(t *testing.T) { common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) attch := make(map[string]string, 0) attch[constant.TOKEN_KEY] = "ori_key" - result := filter.Invoke(protocol.NewBaseInvoker(*url), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := filter.Invoke(context.Background(), + protocol.NewBaseInvoker(*url), + invocation.NewRPCInvocation("MethodName", + []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } @@ -53,8 +56,7 @@ func TestTokenFilter_InvokeEmptyToken(t *testing.T) { url := common.URL{} attch := make(map[string]string, 0) attch[constant.TOKEN_KEY] = "ori_key" - result := filter.Invoke(protocol.NewBaseInvoker(url), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } @@ -66,8 +68,7 @@ func TestTokenFilter_InvokeEmptyAttach(t *testing.T) { common.WithParams(url.Values{}), common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) attch := make(map[string]string, 0) - result := filter.Invoke(protocol.NewBaseInvoker(*url), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(*url), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.NotNil(t, result.Error()) } @@ -79,7 +80,7 @@ func TestTokenFilter_InvokeNotEqual(t *testing.T) { common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) attch := make(map[string]string, 0) attch[constant.TOKEN_KEY] = "err_key" - result := filter.Invoke(protocol.NewBaseInvoker(*url), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := filter.Invoke(context.Background(), + protocol.NewBaseInvoker(*url), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.NotNil(t, result.Error()) } diff --git a/filter/filter_impl/tps_limit_filter.go b/filter/filter_impl/tps_limit_filter.go index 0742e7e7108f0a625ef01aa563d0d10d0d424cf0..52ac5d147904011bd5286c90bc565584f1869f33 100644 --- a/filter/filter_impl/tps_limit_filter.go +++ b/filter/filter_impl/tps_limit_filter.go @@ -17,6 +17,9 @@ package filter_impl +import ( + "context" +) import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -52,23 +55,23 @@ type TpsLimitFilter struct { } // Invoke ... -func (t TpsLimitFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { url := invoker.GetUrl() tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "") rejectedExeHandler := url.GetParam(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY) if len(tpsLimiter) > 0 { allow := extension.GetTpsLimiter(tpsLimiter).IsAllowable(invoker.GetUrl(), invocation) if allow { - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } logger.Errorf("The invocation was rejected due to over the tps limitation, url: %s ", url.String()) return extension.GetRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation) } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (t TpsLimitFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (t TpsLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/tps_limit_filter_test.go b/filter/filter_impl/tps_limit_filter_test.go index 5e04804aa23c4e6e417f6bb9975a3269a2118739..cc423ae1e5f3589dd60b0c8655f1123c290f0ffc 100644 --- a/filter/filter_impl/tps_limit_filter_test.go +++ b/filter/filter_impl/tps_limit_filter_test.go @@ -18,6 +18,7 @@ package filter_impl import ( + "context" "net/url" "testing" ) @@ -45,8 +46,10 @@ func TestTpsLimitFilter_Invoke_With_No_TpsLimiter(t *testing.T) { common.WithParamsValue(constant.TPS_LIMITER_KEY, "")) attch := make(map[string]string, 0) - result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := tpsFilter.Invoke(context.Background(), + protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", + []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) @@ -67,8 +70,10 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) attch := make(map[string]string, 0) - result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := tpsFilter.Invoke(context.Background(), + protocol.NewBaseInvoker(*invokeUrl), + invocation.NewRPCInvocation("MethodName", + []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } @@ -96,8 +101,8 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) attch := make(map[string]string, 0) - result := tpsFilter.Invoke(protocol.NewBaseInvoker(*invokeUrl), - invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + result := tpsFilter.Invoke(context.Background(), + protocol.NewBaseInvoker(*invokeUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } diff --git a/filter/filter_impl/tracing_filter.go b/filter/filter_impl/tracing_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..b8058aa601af98b5416da882321546675459c413 --- /dev/null +++ b/filter/filter_impl/tracing_filter.go @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter_impl + +import ( + "context" +) + +import ( + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" +) + +const ( + tracingFilterName = "tracing" +) + +// this should be executed before users set their own Tracer +func init() { + extension.SetFilter(tracingFilterName, newTracingFilter) + opentracing.SetGlobalTracer(opentracing.NoopTracer{}) +} + +var ( + errorKey = "ErrorMsg" + successKey = "Success" +) + +// if you wish to using opentracing, please add the this filter into your filter attribute in your configure file. +// notice that this could be used in both client-side and server-side. +type tracingFilter struct { +} + +func (tf *tracingFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + var ( + spanCtx context.Context + span opentracing.Span + ) + operationName := invoker.GetUrl().ServiceKey() + "#" + invocation.MethodName() + + wiredCtx := ctx.Value(constant.TRACING_REMOTE_SPAN_CTX) + preSpan := opentracing.SpanFromContext(ctx) + + if preSpan != nil { + // it means that someone already create a span to trace, so we use the span to be the parent span + span = opentracing.StartSpan(operationName, opentracing.ChildOf(preSpan.Context())) + spanCtx = opentracing.ContextWithSpan(ctx, span) + + } else if wiredCtx != nil { + + // it means that there has a remote span, usually from client side. so we use this as the parent + span = opentracing.StartSpan(operationName, opentracing.ChildOf(wiredCtx.(opentracing.SpanContext))) + spanCtx = opentracing.ContextWithSpan(ctx, span) + } else { + // it means that there is not any span, so we create a span as the root span. + span, spanCtx = opentracing.StartSpanFromContext(ctx, operationName) + } + + defer func() { + span.Finish() + }() + + result := invoker.Invoke(spanCtx, invocation) + span.SetTag(successKey, result.Error() != nil) + if result.Error() != nil { + span.LogFields(log.String(errorKey, result.Error().Error())) + } + return result +} + +func (tf *tracingFilter) OnResponse(ctx context.Context, result protocol.Result, + invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result +} + +var ( + tracingFilterInstance *tracingFilter +) + +func newTracingFilter() filter.Filter { + if tracingFilterInstance == nil { + tracingFilterInstance = &tracingFilter{} + } + return tracingFilterInstance +} diff --git a/filter/filter_impl/tracing_filter_test.go b/filter/filter_impl/tracing_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..30d9d2b7dde318e2ca86127ba480c9a53d7e6f66 --- /dev/null +++ b/filter/filter_impl/tracing_filter_test.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/opentracing/opentracing-go" +) + +import ( + "github.com/apache/dubbo-go/common/constant" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestTracingFilter_Invoke(t *testing.T) { + url, _ := common.NewURL(context.Background(), + "dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider"+ + "&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser."+ + "loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name="+ + "BDTService&organization=ikurento.com&owner=ZX®istry.role=3&retries=&"+ + "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") + invoker := protocol.NewBaseInvoker(url) + + attach := make(map[string]string, 10) + inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) + ctx := context.Background() + tf := newTracingFilter() + + // do not has any span + tf.Invoke(ctx, invoker, inv) + + span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Operation") + defer span.Finish() + + // has previous span + tf.Invoke(ctx, invoker, inv) + + ctx = context.Background() + // has remote ctx + ctx = context.WithValue(context.Background(), constant.TRACING_REMOTE_SPAN_CTX, span.Context()) + tf.Invoke(ctx, invoker, inv) +} diff --git a/go.mod b/go.mod index c89b3978f799ca0d68302cc73840df74a67e55dc..db6dc92c63176334f6dfa0436889ffab6f3c9c53 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb + github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 // indirect github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec diff --git a/go.sum b/go.sum index b23cb24ea1ce5af0b7b75ac7569e81c1e2bac259..f215a81b209579c0bb5de0123153b10b7a36767b 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/SAP/go-hdb v0.12.0 h1:5hBQZ2jjyZ268qjDmoDZJuCyLzR6oRLI60eYzmTW9m4= github.com/SAP/go-hdb v0.12.0/go.mod h1:etBT+FAi1t5k3K3tf5vQTnosgYmhDkRi8jEnQqCnxF0= github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc h1:LkkwnbY+S8WmwkWq1SVyRWMH9nYWO1P5XN3OD1tts/w= github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc/go.mod h1:ARgCUhI1MHQH+ONky/PAtmVHQrP5JlGY0F3poXOp/fA= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= @@ -110,8 +112,12 @@ github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8= github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.8.0 h1:uE6Fp4fOcAJdc1wTQXLJ+SYistkbG1dNoi6Zs1+Ybvk= github.com/envoyproxy/go-control-plane v0.8.0/go.mod h1:GSSbY9P1neVhdY7G4wu+IK1rk/dqhiCC/4ExuWJZVuk= github.com/envoyproxy/protoc-gen-validate v0.0.14 h1:YBW6/cKy9prEGRYLnaGa4IDhzxZhRCtKsax8srGKDnM= @@ -146,6 +152,7 @@ github.com/gocql/gocql v0.0.0-20180617115710-e06f8c1bcd78/go.mod h1:4Fw1eo5iaEhD github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= @@ -184,6 +191,9 @@ github.com/gophercloud/gophercloud v0.0.0-20180828235145-f29afc2cceca/go.mod h1: github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:AQwinXlbQR2HvPjQZOmDhRqsv5mZf+Jb1RnSLxcqZcI= @@ -319,6 +329,7 @@ github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5j github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5WuCYnc6RtbfLVAB7nmC5M= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= @@ -361,14 +372,22 @@ github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0= github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y= github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= +github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA= +github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= +github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/ory/dockertest v3.3.4+incompatible h1:VrpM6Gqg7CrPm3bL4Wm1skO+zFWLbh7/Xb5kGEbJRh8= github.com/ory/dockertest v3.3.4+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs= github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c h1:vwpFWvAO8DeIZfFeqASzZfsxuWPno9ncAebBEP0N3uE= @@ -380,9 +399,11 @@ github.com/patrickmn/go-cache v0.0.0-20180527043350-9f6ff22cfff8 h1:BR6MM54q4W9p github.com/patrickmn/go-cache v0.0.0-20180527043350-9f6ff22cfff8/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1 h1:ccV59UEOTzVDnDUEFdT95ZzHVZ+5+158q8+SJb2QV5w= @@ -404,6 +425,7 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 h1:Wdi9nwnhFNAlseAOekn6B5G/+GMtks9UKbvRU/CMM/o= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -438,6 +460,7 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -457,6 +480,7 @@ github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 h1:G3dpKMzFDjgEh2q1Z7zUUtKa8ViPtH+ocF0bE0g00O8= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/uber/jaeger-client-go v2.17.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo= github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= @@ -535,6 +559,7 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index f99c342e088644f94583d7b38fd2ac07c2fd6acd..4131c4533742858db1827f0e6256d3080f38f118 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -18,6 +18,7 @@ package dubbo import ( + "context" "strconv" "sync" ) @@ -57,8 +58,7 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { } // Invoke ... -func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { - +func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( err error result protocol.RPCResult diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 8a032d0ca9536e53dc18c43f8e699212f7c30ec4..e360d57b8cdd61674d35a665e8ee85e03421cc8f 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -18,6 +18,7 @@ package dubbo import ( + "context" "sync" "testing" "time" @@ -53,14 +54,14 @@ func TestDubboInvoker_Invoke(t *testing.T) { invocation.WithReply(user), invocation.WithAttachments(map[string]string{"test_key": "test_value"})) // Call - res := invoker.Invoke(inv) + res := invoker.Invoke(context.Background(), inv) assert.NoError(t, res.Error()) assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) assert.Equal(t, "test_value", res.Attachments()["test_key"]) // test attachments for request/response // CallOneway inv.SetAttachments(constant.ASYNC_KEY, "true") - res = invoker.Invoke(inv) + res = invoker.Invoke(context.Background(), inv) assert.NoError(t, res.Error()) // AsyncCall @@ -71,13 +72,13 @@ func TestDubboInvoker_Invoke(t *testing.T) { assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }) - res = invoker.Invoke(inv) + res = invoker.Invoke(context.Background(), inv) assert.NoError(t, res.Error()) // Err_No_Reply inv.SetAttachments(constant.ASYNC_KEY, "false") inv.SetReply(nil) - res = invoker.Invoke(inv) + res = invoker.Invoke(context.Background(), inv) assert.EqualError(t, res.Error(), "request need @response") // destroy diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 5f493950e1981087b47404d8c0a05f82eb380fe7..926afa9a103201b04639cf283f5c87c53eb12541 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -18,6 +18,7 @@ package dubbo import ( + "context" "fmt" "net/url" "sync" @@ -271,7 +272,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { args := p.Body.(map[string]interface{})["args"].([]interface{}) inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments) - result := invoker.Invoke(inv) + result := invoker.Invoke(context.Background(), inv) if err := result.Error(); err != nil { p.Header.ResponseStatus = hessian.Response_OK p.Body = hessian.NewResponse(nil, err, result.Attachments()) diff --git a/protocol/grpc/common_test.go b/protocol/grpc/common_test.go index 7f78bdc40d07a9089c1cf40f55803f04b39cb949..165b82fabc5703a720766b04659b158d2b3fdbdf 100644 --- a/protocol/grpc/common_test.go +++ b/protocol/grpc/common_test.go @@ -97,7 +97,7 @@ func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec f invo := invocation.NewRPCInvocation("SayHello", args, nil) if interceptor == nil { - result := base.GetProxyImpl().Invoke(invo) + result := base.GetProxyImpl().Invoke(context.Background(), invo) return result.Result(), result.Error() } info := &native_grpc.UnaryServerInfo{ @@ -105,7 +105,7 @@ func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec f FullMethod: "/helloworld.Greeter/SayHello", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - result := base.GetProxyImpl().Invoke(invo) + result := base.GetProxyImpl().Invoke(context.Background(), invo) return result.Result(), result.Error() } return interceptor(ctx, in, info, handler) diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go index 411fe16eddf434dc678f546088deaa2410fe0be5..26bc86f3aa46c8048b16284bd61cb5d9fb4664f9 100644 --- a/protocol/grpc/grpc_invoker.go +++ b/protocol/grpc/grpc_invoker.go @@ -54,7 +54,7 @@ func NewGrpcInvoker(url common.URL, client *Client) *GrpcInvoker { } // Invoke ... -func (gi *GrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.RPCResult ) diff --git a/protocol/grpc/grpc_invoker_test.go b/protocol/grpc/grpc_invoker_test.go index 4f97e1063191692ce5f47e0d4f8242d95cc8a6fc..5d4b97051438f8404cd8fd89bcf73d24e0121868 100644 --- a/protocol/grpc/grpc_invoker_test.go +++ b/protocol/grpc/grpc_invoker_test.go @@ -49,7 +49,7 @@ func TestInvoke(t *testing.T) { bizReply := &internal.HelloReply{} invo := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("SayHello"), invocation.WithParameterValues(args), invocation.WithReply(bizReply)) - res := invoker.Invoke(invo) + res := invoker.Invoke(context.Background(), invo) assert.Nil(t, res.Error()) assert.NotNil(t, res.Result()) assert.Equal(t, "Hello request name", bizReply.Message) diff --git a/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go b/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go index 4ed55ab7612200d28816508e4c4fcb7de0a803c0..f5d3a49b0916050fc6b2e6373fde0b70df0a1c31 100644 --- a/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go +++ b/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go @@ -271,7 +271,7 @@ func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec f args = append(args, in) invo := invocation.NewRPCInvocation("SayHello", args, nil) if interceptor == nil { - result := base.GetProxyImpl().Invoke(invo) + result := base.GetProxyImpl().Invoke(context.Background(), invo) return result.Result(), result.Error() } info := &grpc.UnaryServerInfo{ @@ -279,7 +279,7 @@ func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec f FullMethod: "/main.Greeter/SayHello", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - result := base.GetProxyImpl().Invoke(invo) + result := base.GetProxyImpl().Invoke(context.Background(), invo) return result.Result(), result.Error() } return interceptor(ctx, in, info, handler) diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 5937b7e225fee582d52ec63d3318671ba3374820..0f42e96d5453229591a47fbc0a3c8f794312fa4a 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -26,9 +26,9 @@ import ( "github.com/apache/dubbo-go/protocol" ) -///////////////////////////// +// /////////////////////////// // Invocation Impletment of RPC -///////////////////////////// +// /////////////////////////// // todo: is it necessary to separate fields of consumer(provider) from RPCInvocation type RPCInvocation struct { methodName string @@ -139,9 +139,9 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { r.callBack = c } -/////////////////////////// +// ///////////////////////// // option -/////////////////////////// +// ///////////////////////// type option func(invo *RPCInvocation) diff --git a/protocol/invoker.go b/protocol/invoker.go index 11793e7e430e35b64bed03b965134a81eedfd0d4..6805f3fd034ac553e701b7dcbc4e23d93adb1c63 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -17,6 +17,9 @@ package protocol +import ( + "context" +) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" @@ -26,7 +29,7 @@ import ( // Extension - Invoker type Invoker interface { common.Node - Invoke(Invocation) Result + Invoke(context.Context, Invocation) Result } ///////////////////////////// @@ -65,7 +68,7 @@ func (bi *BaseInvoker) IsDestroyed() bool { } // Invoke ... -func (bi *BaseInvoker) Invoke(invocation Invocation) Result { +func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result { return &RPCResult{} } diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 5712be97501b42d3f0c56d94bfe515f803f91608..7ae825e1eb0f4846982bad3237bc0197024b073d 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -33,17 +33,19 @@ import ( ) import ( + "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" ) -////////////////////////////////////////////// +// //////////////////////////////////////////// // Request -////////////////////////////////////////////// +// //////////////////////////////////////////// // Request ... type Request struct { @@ -57,9 +59,9 @@ type Request struct { contentType string } -////////////////////////////////////////////// +// //////////////////////////////////////////// // HTTP Client -////////////////////////////////////////////// +// //////////////////////////////////////////// // HTTPOptions ... type HTTPOptions struct { @@ -130,6 +132,13 @@ func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, } } + if span := opentracing.SpanFromContext(ctx); span != nil { + err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(httpHeader)) + if err != nil { + logger.Error("Could not inject the Context into http header.") + } + } + // body codec := newJsonClientCodec() codecData := CodecData{ diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 9be55e247a730460a3adee5622fa978ef2defbfb..12ddb2dbc858aac26fb4243ecc6578b548e3af01 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -25,6 +25,7 @@ import ( ) import ( + "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -74,6 +75,7 @@ func TestHTTPClient_Call(t *testing.T) { "X-Services": url.Path, "X-Method": "GetUser", }) + req := client.NewRequest(url, "GetUser", []interface{}{"1", "username"}) reply := &User{} err = client.Call(ctx, url, req, reply) @@ -147,6 +149,10 @@ func TestHTTPClient_Call(t *testing.T) { "X-Services": url.Path, "X-Method": "GetUser4", }) + + span := opentracing.StartSpan("Test-Inject-Tracing-ID") + ctx = opentracing.ContextWithSpan(ctx, span) + req = client.NewRequest(url, "GetUser4", []interface{}{1}) reply = &User{} err = client.Call(ctx, url, req, reply) diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index e7a9c408df0ed06d685ba81654dd14a9c656125d..b6e194ce0e93e84c164eccf8574e5eb20430f6e8 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -44,7 +44,7 @@ func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker { } // Invoke ... -func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.RPCResult @@ -53,12 +53,12 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result inv := invocation.(*invocation_impl.RPCInvocation) url := ji.GetUrl() req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments()) - ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ + ctxNew := context.WithValue(ctx, constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", "X-Services": url.Path, "X-Method": inv.MethodName(), }) - result.Err = ji.client.Call(ctx, url, req, inv.Reply()) + result.Err = ji.client.Call(ctxNew, url, req, inv.Reply()) if result.Err == nil { result.Rest = inv.Reply() } diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go index 8c910339858f4960ad0e394ae6271863d7654adc..9eed22e67155f1b0915cbb398bcef55962258407 100644 --- a/protocol/jsonrpc/jsonrpc_invoker_test.go +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -60,7 +60,7 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) { jsonInvoker := NewJsonrpcInvoker(url, client) user := &User{} - res := jsonInvoker.Invoke(invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}), + res := jsonInvoker.Invoke(context.Background(), invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}), invocation.WithReply(user))) assert.NoError(t, res.Error()) diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index ef36dc3dec2ac77de2143c985221e9088b9670ed..2e1bc16986abe1933088dd76ddf31c39e87a9f06 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -32,6 +32,7 @@ import ( ) import ( + "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" ) @@ -151,6 +152,13 @@ func (s *Server) handlePkg(conn net.Conn) { } ctx := context.Background() + + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(r.Header)) + if err == nil { + ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx) + } + if len(reqHeader["Timeout"]) > 0 { timeout, err := time.ParseDuration(reqHeader["Timeout"]) if err == nil { @@ -334,10 +342,9 @@ func serveRequest(ctx context.Context, exporter, _ := jsonrpcProtocol.ExporterMap().Load(path) invoker := exporter.(*JsonrpcExporter).GetInvoker() if invoker != nil { - result := invoker.Invoke(invocation.NewRPCInvocation(methodName, args, map[string]string{ + result := invoker.Invoke(ctx, invocation.NewRPCInvocation(methodName, args, map[string]string{ constant.PATH_KEY: path, - constant.VERSION_KEY: codec.req.Version, - })) + constant.VERSION_KEY: codec.req.Version})) if err := result.Error(); err != nil { rspStream, err := codec.Write(err.Error(), invalidRequest) if err != nil { diff --git a/protocol/mock/mock_invoker.go b/protocol/mock/mock_invoker.go index c509cef054f5a23fe504486e01d7cc0e8772711d..5c5b476b7b07f6c41a74a7ec8f51648aff84b1a3 100644 --- a/protocol/mock/mock_invoker.go +++ b/protocol/mock/mock_invoker.go @@ -21,6 +21,7 @@ package mock import ( + "context" "reflect" ) @@ -91,7 +92,7 @@ func (mr *MockInvokerMockRecorder) Destroy() *gomock.Call { } // Invoke mocks base method -func (m *MockInvoker) Invoke(arg0 protocol.Invocation) protocol.Result { +func (m *MockInvoker) Invoke(ctx context.Context, arg0 protocol.Invocation) protocol.Result { ret := m.ctrl.Call(m, "Invoke", arg0) ret0, _ := ret[0].(protocol.Result) return ret0 diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 1174ada4dfaeeb644d614696496dcacb78b9a9d6..08479fe9d139e31821a0a41a851ded52687f87b3 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -18,6 +18,7 @@ package protocolwrapper import ( + "context" "strings" ) @@ -110,9 +111,9 @@ func (fi *FilterInvoker) IsAvailable() bool { } // Invoke ... -func (fi *FilterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { - result := fi.filter.Invoke(fi.next, invocation) - return fi.filter.OnResponse(result, fi.invoker, invocation) +func (fi *FilterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + result := fi.filter.Invoke(ctx, fi.next, invocation) + return fi.filter.OnResponse(ctx, result, fi.invoker, invocation) } // Destroy ... diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index dc376313549c24da1cc6cb64a42e8445ef4fe346..8491d57462d47d6af72040d41b78dcb30e6da697 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -18,6 +18,7 @@ package protocolwrapper import ( + "context" "net/url" "testing" ) @@ -66,7 +67,7 @@ func init() { type EchoFilterForTest struct{} -func (ef *EchoFilterForTest) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *EchoFilterForTest) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking echo filter.") logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 { @@ -75,10 +76,10 @@ func (ef *EchoFilterForTest) Invoke(invoker protocol.Invoker, invocation protoco } } - return invoker.Invoke(invocation) + return invoker.Invoke(ctx, invocation) } -func (ef *EchoFilterForTest) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *EchoFilterForTest) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index a03b9c8c5a0ac15e164c5a30cb7d919a41feafce..748b8204d97e60c9803821290184fc5717c41025 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -18,6 +18,7 @@ package protocol import ( + "context" "strings" "sync" ) @@ -357,10 +358,10 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke } } -func (ivk *wrappedInvoker) Invoke(invocation protocol.Invocation) protocol.Result { +func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { // get right url ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) - return ivk.invoker.Invoke(invocation) + return ivk.invoker.Invoke(ctx, invocation) } type providerConfigurationListener struct {