diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 607ef007b5302fe07ff6090a463eab96d6ae4fa9..67d1d1e7f1710aef71f63063374a848e2981b828 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -24,6 +24,7 @@ import (
)
import (
+ "github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
@@ -72,6 +73,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
inv.SetAttachments(k, v)
}
}
+
+ // put the ctx into attachment
+ di.appendCtx(ctx, inv)
+
url := di.GetUrl()
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
@@ -112,3 +117,17 @@ func (di *DubboInvoker) Destroy() {
}
})
}
+
+// Finally, I made the decision that I don't provide a general way to transfer the whole context
+// because it could be misused. If the context contains to many key-value pairs, the performance will be much lower.
+func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCInvocation) {
+ // inject opentracing ctx
+ currentSpan := opentracing.SpanFromContext(ctx)
+ if currentSpan != nil {
+ carrier := opentracing.TextMapCarrier(inv.Attachments())
+ err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier)
+ if err != nil {
+ logger.Errorf("Could not inject the span context into attachments: %v", err)
+ }
+ }
+}
diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go
index e360d57b8cdd61674d35a665e8ee85e03421cc8f..1a64301f8200a4264001284cca1af3f0f1e07814 100644
--- a/protocol/dubbo/dubbo_invoker_test.go
+++ b/protocol/dubbo/dubbo_invoker_test.go
@@ -25,6 +25,7 @@ import (
)
import (
+ "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
@@ -81,6 +82,11 @@ func TestDubboInvoker_Invoke(t *testing.T) {
res = invoker.Invoke(context.Background(), inv)
assert.EqualError(t, res.Error(), "request need @response")
+ // testing appendCtx
+ span, ctx := opentracing.StartSpanFromContext(context.Background(), "TestOperation")
+ invoker.Invoke(ctx, inv)
+ span.Finish()
+
// destroy
lock.Lock()
proto.Destroy()
diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go
index 204e8a1c5d2607d3158ff4f68334a39fd1fb7861..430c4e49d81d4d5d151a545e1a130fd4ac3fbdc5 100644
--- a/protocol/dubbo/listener.go
+++ b/protocol/dubbo/listener.go
@@ -29,6 +29,7 @@ import (
import (
"github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
+ "github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
@@ -63,9 +64,9 @@ func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}
-////////////////////////////////////////////
+// //////////////////////////////////////////
// RpcClientHandler
-////////////////////////////////////////////
+// //////////////////////////////////////////
// RpcClientHandler ...
type RpcClientHandler struct {
@@ -157,9 +158,9 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
h.conn.pool.rpcClient.heartbeat(session)
}
-////////////////////////////////////////////
+// //////////////////////////////////////////
// RpcServerHandler
-////////////////////////////////////////////
+// //////////////////////////////////////////
// RpcServerHandler ...
type RpcServerHandler struct {
@@ -284,7 +285,10 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
args := p.Body.(map[string]interface{})["args"].([]interface{})
inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
- result := invoker.Invoke(context.Background(), inv)
+
+ ctx := rebuildCtx(inv)
+
+ result := invoker.Invoke(ctx, inv)
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_OK
p.Body = hessian.NewResponse(nil, err, result.Attachments())
@@ -327,6 +331,21 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+ ctx := context.Background()
+
+ // actually, if user do not use any opentracing framework, the err will not be nil.
+ spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+ opentracing.TextMapCarrier(inv.Attachments()))
+ if err == nil {
+ ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+ }
+ return ctx
+}
+
func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
diff --git a/protocol/dubbo/listener_test.go b/protocol/dubbo/listener_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..5f809814607558650e09934019db96dbb2ceeeae
--- /dev/null
+++ b/protocol/dubbo/listener_test.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/mocktracer"
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/protocol/invocation"
+)
+
+// test rebuild the ctx
+func TestRebuildCtx(t *testing.T) {
+ opentracing.SetGlobalTracer(mocktracer.New())
+ attach := make(map[string]string, 10)
+ attach[constant.VERSION_KEY] = "1.0"
+ attach[constant.GROUP_KEY] = "MyGroup"
+ inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
+
+ // attachment doesn't contains any tracing key-value pair,
+ ctx := rebuildCtx(inv)
+ assert.NotNil(t, ctx)
+ assert.Nil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX))
+
+ span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client")
+
+ opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap,
+ opentracing.TextMapCarrier(inv.Attachments()))
+ // rebuild the context success
+ inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
+ ctx = rebuildCtx(inv)
+ span.Finish()
+ assert.NotNil(t, ctx)
+ assert.NotNil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX))
+}