diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index 673588ef241400323980011fcdb3afbf2afb166b..b84952a125bd694341a690f3212c1d1652904751 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -213,7 +213,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) //invocation := request.Data.(*invocation.RPCInvocation) var methodName string var args []interface{} - var attachments map[string]string = make(map[string]string) + attachments := make(map[string]interface{}) if req[impl.DubboVersionKey] != nil { //dubbo version request.Version = req[impl.DubboVersionKey].(string) @@ -225,7 +225,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) //method methodName = pkg.Service.Method args = req[impl.ArgsKey].([]interface{}) - attachments = req[impl.AttachmentsKey].(map[string]string) + attachments = req[impl.AttachmentsKey].(map[string]interface{}) invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), invocation.WithArguments(args), invocation.WithMethodName(methodName)) request.Data = invoc diff --git a/protocol/dubbo/impl/codec_test.go b/protocol/dubbo/impl/codec_test.go index 03e768dacd9743cfd0de6f65f468440698dca202..1c379282383c90ef27628af19ddaaafb2a4db9a6 100644 --- a/protocol/dubbo/impl/codec_test.go +++ b/protocol/dubbo/impl/codec_test.go @@ -78,5 +78,12 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { assert.Equal(t, "Method", pkgres.Service.Method) assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string)) assert.Equal(t, []interface{}{"a"}, reassembleBody["args"]) - assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string)) + tmpData := map[string]interface{}{ + "dubbo": "2.0.2", + "interface": "Service", + "path": "path", + "timeout": "1000", + "version": "2.6", + } + assert.Equal(t, tmpData, reassembleBody["attachments"]) } diff --git a/protocol/dubbo/impl/hessian.go b/protocol/dubbo/impl/hessian.go index 513421b95f197a0fa72c115f0e55ca2a253847ef..b686d5728d026fdb1e37180d5a9d24d32bf4b7bc 100644 --- a/protocol/dubbo/impl/hessian.go +++ b/protocol/dubbo/impl/hessian.go @@ -67,7 +67,11 @@ func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) { if p.IsHeartBeat() { encoder.Encode(nil) } else { - atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY]) + var version string + if attachmentVersion, ok := response.Attachments[DUBBO_VERSION_KEY]; ok { + version = attachmentVersion.(string) + } + atta := isSupportResponseAttachment(version) var resWithException, resValue, resNullValue int32 if atta { @@ -255,7 +259,7 @@ func unmarshalRequestBody(body []byte, p *DubboPackage) error { if v, ok := attachments.(map[interface{}]interface{}); ok { v[DUBBO_VERSION_KEY] = dubboVersion - req[6] = hessian.ToMapStringString(v) + req[6] = ToMapStringInterface(v) buildServerSidePackageBody(p) return nil } @@ -285,7 +289,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error { return perrors.WithStack(err) } if v, ok := attachments.(map[interface{}]interface{}); ok { - atta := hessian.ToMapStringString(v) + atta := ToMapStringInterface(v) response.Attachments = atta } else { return perrors.Errorf("get wrong attachments: %+v", attachments) @@ -310,7 +314,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error { return perrors.WithStack(err) } if v, ok := attachments.(map[interface{}]interface{}); ok { - atta := hessian.ToMapStringString(v) + atta := ToMapStringInterface(v) response.Attachments = atta } else { return perrors.Errorf("get wrong attachments: %+v", attachments) @@ -326,7 +330,7 @@ func unmarshalResponseBody(body []byte, p *DubboPackage) error { return perrors.WithStack(err) } if v, ok := attachments.(map[interface{}]interface{}); ok { - atta := hessian.ToMapStringString(v) + atta := ToMapStringInterface(v) response.Attachments = atta } else { return perrors.Errorf("get wrong attachments: %+v", attachments) @@ -342,7 +346,7 @@ func buildServerSidePackageBody(pkg *DubboPackage) { if len(req) > 0 { var dubboVersion, argsTypes string var args []interface{} - var attachments map[string]string + var attachments map[string]interface{} svc := Service{} if req[0] != nil { dubboVersion = req[0].(string) @@ -363,18 +367,18 @@ func buildServerSidePackageBody(pkg *DubboPackage) { args = req[5].([]interface{}) } if req[6] != nil { - attachments = req[6].(map[string]string) + attachments = req[6].(map[string]interface{}) } - if svc.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { - svc.Path = attachments[constant.PATH_KEY] + if svc.Path == "" && attachments[constant.PATH_KEY] != nil && len(attachments[constant.PATH_KEY].(string)) > 0 { + svc.Path = attachments[constant.PATH_KEY].(string) } if _, ok := attachments[constant.INTERFACE_KEY]; ok { - svc.Interface = attachments[constant.INTERFACE_KEY] + svc.Interface = attachments[constant.INTERFACE_KEY].(string) } else { svc.Interface = svc.Path } - if len(attachments[constant.GROUP_KEY]) > 0 { - svc.Group = attachments[constant.GROUP_KEY] + if _, ok := attachments[constant.GROUP_KEY]; ok { + svc.Group = attachments[constant.GROUP_KEY].(string) } pkg.SetService(svc) pkg.SetBody(map[string]interface{}{ @@ -503,6 +507,20 @@ func getArgType(v interface{}) string { // return "java.lang.RuntimeException" } +func ToMapStringInterface(origin map[interface{}]interface{}) map[string]interface{} { + dest := make(map[string]interface{}, len(origin)) + for k, v := range origin { + if kv, ok := k.(string); ok { + if v == nil { + dest[kv] = "" + continue + } + dest[kv] = v + } + } + return dest +} + func init() { SetSerializer("hessian2", HessianSerializer{}) } diff --git a/protocol/dubbo/impl/request.go b/protocol/dubbo/impl/request.go index 0e770c3afbf5c0a714b9b98eb5094695e11950b5..ef520083e6457f0ceaf3e80778f79d3e0ce686ab 100644 --- a/protocol/dubbo/impl/request.go +++ b/protocol/dubbo/impl/request.go @@ -19,12 +19,12 @@ package impl type RequestPayload struct { Params interface{} - Attachments map[string]string + Attachments map[string]interface{} } -func NewRequestPayload(args interface{}, atta map[string]string) *RequestPayload { +func NewRequestPayload(args interface{}, atta map[string]interface{}) *RequestPayload { if atta == nil { - atta = make(map[string]string) + atta = make(map[string]interface{}) } return &RequestPayload{ Params: args, diff --git a/protocol/dubbo/impl/response.go b/protocol/dubbo/impl/response.go index ea0a6efb23c74b2130b6dbf628363275aa9380e8..9fde1eb249c40e546bdabc54d15e4a3b6d1ea399 100644 --- a/protocol/dubbo/impl/response.go +++ b/protocol/dubbo/impl/response.go @@ -20,13 +20,13 @@ package impl type ResponsePayload struct { RspObj interface{} Exception error - Attachments map[string]string + Attachments map[string]interface{} } // NewResponse create a new ResponsePayload -func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]string) *ResponsePayload { +func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]interface{}) *ResponsePayload { if attachments == nil { - attachments = make(map[string]string) + attachments = make(map[string]interface{}) } return &ResponsePayload{ RspObj: rspObj, diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go index bde7d9e696fbe8c2fd2af065bad7c9105bad1cdd..b91fc9f4ccf69299870f9daf3707521d913cd4c0 100644 --- a/remoting/getty/dubbo_codec_for_test.go +++ b/remoting/getty/dubbo_codec_for_test.go @@ -205,7 +205,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err //invocation := request.Data.(*invocation.RPCInvocation) var methodName string var args []interface{} - var attachments map[string]string = make(map[string]string) + attachments := make(map[string]interface{}) if req[impl.DubboVersionKey] != nil { //dubbo version request.Version = req[impl.DubboVersionKey].(string) @@ -217,7 +217,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err //method methodName = pkg.Service.Method args = req[impl.ArgsKey].([]interface{}) - attachments = req[impl.AttachmentsKey].(map[string]string) + attachments = req[impl.AttachmentsKey].(map[string]interface{}) invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), invocation.WithArguments(args), invocation.WithMethodName(methodName)) request.Data = invoc diff --git a/remoting/getty/listener_test.go b/remoting/getty/listener_test.go index 7a54323d19f5e2b9833d01fbeb035d3e98529b58..7e7ac5fed440a02188057d520a944b48c8bf7b64 100644 --- a/remoting/getty/listener_test.go +++ b/remoting/getty/listener_test.go @@ -48,7 +48,9 @@ func TestRebuildCtx(t *testing.T) { span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client") - injectTraceCtx(span, inv) + err := injectTraceCtx(span, inv) + assert.NoError(t, err) + // rebuild the context success inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) ctx = rebuildCtx(inv) @@ -71,13 +73,3 @@ func rebuildCtx(inv *invocation.RPCInvocation) context.Context { } return ctx } - -func filterContext(attachments map[string]interface{}) map[string]string { - var traceAttchment = make(map[string]string) - for k, v := range attachments { - if r, ok := v.(string); ok { - traceAttchment[k] = r - } - } - return traceAttchment -} diff --git a/remoting/getty/opentracing.go b/remoting/getty/opentracing.go new file mode 100644 index 0000000000000000000000000000000000000000..7db733cbe919f2bef46cfc477bda836dc2da0d45 --- /dev/null +++ b/remoting/getty/opentracing.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package getty + +import ( + "github.com/opentracing/opentracing-go" +) +import ( + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" +) + +func injectTraceCtx(currentSpan opentracing.Span, inv *invocation_impl.RPCInvocation) error { + // inject opentracing ctx + traceAttachments := filterContext(inv.Attachments()) + carrier := opentracing.TextMapCarrier(traceAttachments) + err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier) + if err == nil { + fillTraceAttachments(inv.Attachments(), traceAttachments) + } + return err +} + +func extractTraceCtx(inv *invocation_impl.RPCInvocation) (opentracing.SpanContext, error) { + traceAttachments := filterContext(inv.Attachments()) + // actually, if user do not use any opentracing framework, the err will not be nil. + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, + opentracing.TextMapCarrier(traceAttachments)) + return spanCtx, err +} + +func filterContext(attachments map[string]interface{}) map[string]string { + var traceAttchment = make(map[string]string) + for k, v := range attachments { + if r, ok := v.(string); ok { + traceAttchment[k] = r + } + } + return traceAttchment +} + +func fillTraceAttachments(attachments map[string]interface{}, traceAttachment map[string]string) { + for k, v := range traceAttachment { + attachments[k] = v + } +}