Skip to content
Snippets Groups Projects
Commit ed11114b authored by flycash's avatar flycash
Browse files

Pass Context from client

parent 938e7863
No related branches found
No related tags found
No related merge requests found
......@@ -137,3 +137,7 @@ const (
NACOS_PROTOCOL_KEY = "protocol"
NACOS_PATH_KEY = "path"
)
const (
TRACING_CURRENT_SPAN_CTX = "tracing.current.span"
)
......@@ -18,6 +18,7 @@
package proxy
import (
"context"
"reflect"
"sync"
)
......@@ -94,8 +95,11 @@ func (p *Proxy) Implement(v common.RPCService) {
start := 0
end := len(in)
invCtx := context.Background()
if end > 0 {
if in[0].Type().String() == "context.Context" {
// inVArr can not be nil
invCtx = in[0].Interface().(context.Context)
start += 1
}
if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr {
......@@ -124,6 +128,7 @@ func (p *Proxy) Implement(v common.RPCService) {
inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()),
invocation_impl.WithCallBack(p.callBack), invocation_impl.WithParameterValues(inVArr))
inv.SetContext(invCtx)
for k, value := range p.attachments {
inv.SetAttachments(k, value)
......
......@@ -19,6 +19,9 @@ package filter_impl
import (
"context"
"time"
"github.com/apache/dubbo-go/common/constant"
)
import (
......@@ -52,16 +55,28 @@ type tracingFilter struct {
func (tf *tracingFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
operationName := invoker.GetUrl().ServiceKey() + invocation.MethodName()
operationName := invoker.GetUrl().ServiceKey() + "#" + invocation.MethodName()
// withTimeout after we support the timeout between different ends.
invCtx, cancel := context.WithCancel(invocation.Context())
span, spanCtx := opentracing.StartSpanFromContext(invCtx, operationName)
invocation.SetContext(spanCtx)
wiredCtx := invCtx.Value(constant.TRACING_CURRENT_SPAN_CTX)
var span opentracing.Span
if wiredCtx == nil {
var spanCtx context.Context
span, spanCtx = opentracing.StartSpanFromContext(invCtx, operationName)
invocation.SetContext(spanCtx)
} else {
// it means that the client passed the SpanContext in their request
span = opentracing.StartSpan(operationName, opentracing.ChildOf(wiredCtx.(opentracing.SpanContext)))
}
defer func() {
span.Finish()
cancel()
}()
time.Sleep(100 * time.Millisecond)
result := invoker.Invoke(invocation)
span.SetTag(successKey, result.Error() != nil)
if result.Error() != nil {
......
......@@ -41,7 +41,7 @@ type RPCInvocation struct {
attachments map[string]string
invoker protocol.Invoker
lock sync.RWMutex
ctx context.Context
ctx context.Context
}
func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation {
......
......@@ -30,6 +30,10 @@ import (
"strings"
"sync/atomic"
"time"
"github.com/opentracing/opentracing-go"
"github.com/apache/dubbo-go/common/logger"
)
import (
......@@ -124,6 +128,13 @@ func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request,
}
}
if span := opentracing.SpanFromContext(ctx); span != nil {
err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(httpHeader))
if err != nil {
logger.Errorf("Could not inject the Context into http header.")
}
}
// body
codec := newJsonClientCodec()
codecData := CodecData{
......
......@@ -50,12 +50,12 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result
inv := invocation.(*invocation_impl.RPCInvocation)
url := ji.GetUrl()
req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments())
ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
ctxNew := context.WithValue(invocation.Context(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Services": url.Path,
"X-Method": inv.MethodName(),
})
result.Err = ji.client.Call(ctx, url, req, inv.Reply())
result.Err = ji.client.Call(ctxNew, url, req, inv.Reply())
if result.Err == nil {
result.Rest = inv.Reply()
}
......
......@@ -60,7 +60,8 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) {
jsonInvoker := NewJsonrpcInvoker(url, client)
user := &User{}
res := jsonInvoker.Invoke(invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}),
res := jsonInvoker.Invoke(invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"),
invocation.WithArguments([]interface{}{"1", "username"}),
invocation.WithReply(user)))
assert.NoError(t, res.Error())
......
......@@ -29,6 +29,8 @@ import (
"runtime/debug"
"sync"
"time"
"github.com/opentracing/opentracing-go"
)
import (
......@@ -149,6 +151,13 @@ func (s *Server) handlePkg(conn net.Conn) {
}
ctx := context.Background()
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(r.Header))
if err == nil {
ctx = context.WithValue(ctx, constant.TRACING_CURRENT_SPAN_CTX, spanCtx)
}
if len(reqHeader["Timeout"]) > 0 {
timeout, err := time.ParseDuration(reqHeader["Timeout"])
if err == nil {
......@@ -330,10 +339,12 @@ func serveRequest(ctx context.Context,
exporter, _ := jsonrpcProtocol.ExporterMap().Load(path)
invoker := exporter.(*JsonrpcExporter).GetInvoker()
if invoker != nil {
result := invoker.Invoke(invocation.NewRPCInvocation(methodName, args, map[string]string{
invc := invocation.NewRPCInvocation(methodName, args, map[string]string{
constant.PATH_KEY: path,
constant.VERSION_KEY: codec.req.Version,
}))
})
invc.SetContext(ctx)
result := invoker.Invoke(invc)
if err := result.Error(); err != nil {
rspStream, err := codec.Write(err.Error(), invalidRequest)
if err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment