diff --git a/common/constant/key.go b/common/constant/key.go index 7538a2995a89b0951c29f532b9ce9e475198f54e..b696952c432ade1e412092edd49ba99ce1e88429 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -137,3 +137,7 @@ const ( NACOS_PROTOCOL_KEY = "protocol" NACOS_PATH_KEY = "path" ) + +const ( + TRACING_CURRENT_SPAN_CTX = "tracing.current.span" +) diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 0b5e2860495604207dc8e4a384225e2fca47df1a..4e4a0771191e50d8e35aacc82d990024db52ebd2 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -18,6 +18,7 @@ package proxy import ( + "context" "reflect" "sync" ) @@ -94,8 +95,11 @@ func (p *Proxy) Implement(v common.RPCService) { start := 0 end := len(in) + invCtx := context.Background() if end > 0 { if in[0].Type().String() == "context.Context" { + // inVArr can not be nil + invCtx = in[0].Interface().(context.Context) start += 1 } if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr { @@ -124,6 +128,7 @@ func (p *Proxy) Implement(v common.RPCService) { inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName), invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()), invocation_impl.WithCallBack(p.callBack), invocation_impl.WithParameterValues(inVArr)) + inv.SetContext(invCtx) for k, value := range p.attachments { inv.SetAttachments(k, value) diff --git a/filter/filter_impl/tracing_filter.go b/filter/filter_impl/tracing_filter.go index dc1e8c314a01e68e3eae0e525834607603755e14..77131f0fbd18457d948be3f6d6fb6efaaf90f14d 100644 --- a/filter/filter_impl/tracing_filter.go +++ b/filter/filter_impl/tracing_filter.go @@ -19,6 +19,9 @@ package filter_impl import ( "context" + "time" + + "github.com/apache/dubbo-go/common/constant" ) import ( @@ -52,16 +55,28 @@ type tracingFilter struct { func (tf *tracingFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - operationName := invoker.GetUrl().ServiceKey() + invocation.MethodName() + operationName := invoker.GetUrl().ServiceKey() + "#" + invocation.MethodName() + // withTimeout after we support the timeout between different ends. invCtx, cancel := context.WithCancel(invocation.Context()) - span, spanCtx := opentracing.StartSpanFromContext(invCtx, operationName) - invocation.SetContext(spanCtx) + + wiredCtx := invCtx.Value(constant.TRACING_CURRENT_SPAN_CTX) + var span opentracing.Span + if wiredCtx == nil { + var spanCtx context.Context + span, spanCtx = opentracing.StartSpanFromContext(invCtx, operationName) + invocation.SetContext(spanCtx) + } else { + // it means that the client passed the SpanContext in their request + span = opentracing.StartSpan(operationName, opentracing.ChildOf(wiredCtx.(opentracing.SpanContext))) + } + defer func() { span.Finish() cancel() }() + time.Sleep(100 * time.Millisecond) result := invoker.Invoke(invocation) span.SetTag(successKey, result.Error() != nil) if result.Error() != nil { diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index c3a508b2aa27c523499b558728fd51d945023f61..4c78ad69efcb0a331ff8bfd951b93cc0fe482887 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -41,7 +41,7 @@ type RPCInvocation struct { attachments map[string]string invoker protocol.Invoker lock sync.RWMutex - ctx context.Context + ctx context.Context } func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation { diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 3d99786624c71818cc5f787c8695d1c116c35707..368bbe5d835ccd746fbfca6e0c013884614062cc 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -30,6 +30,10 @@ import ( "strings" "sync/atomic" "time" + + "github.com/opentracing/opentracing-go" + + "github.com/apache/dubbo-go/common/logger" ) import ( @@ -124,6 +128,13 @@ func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, } } + if span := opentracing.SpanFromContext(ctx); span != nil { + err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(httpHeader)) + if err != nil { + logger.Errorf("Could not inject the Context into http header.") + } + } + // body codec := newJsonClientCodec() codecData := CodecData{ diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index 2c130e0d7617e96a1724edc5b63f8e66f251446e..6c21cadf022e7593dd34fd1374a8660678f26d7e 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -50,12 +50,12 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result inv := invocation.(*invocation_impl.RPCInvocation) url := ji.GetUrl() req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments()) - ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ + ctxNew := context.WithValue(invocation.Context(), constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", "X-Services": url.Path, "X-Method": inv.MethodName(), }) - result.Err = ji.client.Call(ctx, url, req, inv.Reply()) + result.Err = ji.client.Call(ctxNew, url, req, inv.Reply()) if result.Err == nil { result.Rest = inv.Reply() } diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go index 8c910339858f4960ad0e394ae6271863d7654adc..d687c53a60b70b2d63fae822a35ebf5416122e56 100644 --- a/protocol/jsonrpc/jsonrpc_invoker_test.go +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -60,7 +60,8 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) { jsonInvoker := NewJsonrpcInvoker(url, client) user := &User{} - res := jsonInvoker.Invoke(invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}), + res := jsonInvoker.Invoke(invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), + invocation.WithArguments([]interface{}{"1", "username"}), invocation.WithReply(user))) assert.NoError(t, res.Error()) diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index dc79e4a36bd6cce575d50588d11b003cb8e25abe..6e32b80426cdff63646a07be95fe66fb2ccc97e1 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -29,6 +29,8 @@ import ( "runtime/debug" "sync" "time" + + "github.com/opentracing/opentracing-go" ) import ( @@ -149,6 +151,13 @@ func (s *Server) handlePkg(conn net.Conn) { } ctx := context.Background() + + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(r.Header)) + if err == nil { + ctx = context.WithValue(ctx, constant.TRACING_CURRENT_SPAN_CTX, spanCtx) + } + if len(reqHeader["Timeout"]) > 0 { timeout, err := time.ParseDuration(reqHeader["Timeout"]) if err == nil { @@ -330,10 +339,12 @@ func serveRequest(ctx context.Context, exporter, _ := jsonrpcProtocol.ExporterMap().Load(path) invoker := exporter.(*JsonrpcExporter).GetInvoker() if invoker != nil { - result := invoker.Invoke(invocation.NewRPCInvocation(methodName, args, map[string]string{ + invc := invocation.NewRPCInvocation(methodName, args, map[string]string{ constant.PATH_KEY: path, constant.VERSION_KEY: codec.req.Version, - })) + }) + invc.SetContext(ctx) + result := invoker.Invoke(invc) if err := result.Error(); err != nil { rspStream, err := codec.Write(err.Error(), invalidRequest) if err != nil {