diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go index cd201a42c759354ca536ea3e9e77116d89ea8b4b..7f77f33166de293836c15391f5eedd5a18084dbe 100644 --- a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go +++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go @@ -44,7 +44,7 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { //defer ctrl.Finish() mockResult := &protocol.RPCResult{ - Attrs: map[string]string{constant.PREFERRED_KEY: "true"}, + Attrs: map[string]interface{}{constant.PREFERRED_KEY: "true"}, Rest: rest{tried: 0, success: true}} var invokers []protocol.Invoker @@ -99,7 +99,7 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( func(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ - Attrs: map[string]string{constant.WEIGHT_KEY: w1}, + Attrs: map[string]interface{}{constant.WEIGHT_KEY: w1}, Rest: rest{tried: 0, success: true}} }).MaxTimes(100) } else { @@ -107,7 +107,7 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( func(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ - Attrs: map[string]string{constant.WEIGHT_KEY: w2}, + Attrs: map[string]interface{}{constant.WEIGHT_KEY: w2}, Rest: rest{tried: 0, success: true}} }).MaxTimes(100) } @@ -154,7 +154,7 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( func(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ - Attrs: map[string]string{constant.ZONE_KEY: zoneValue}, + Attrs: map[string]interface{}{constant.ZONE_KEY: zoneValue}, Rest: rest{tried: 0, success: true}} }) invokers = append(invokers, invoker) diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 12e6eda3e7fcb7ad28ded327ddfeb07288be5cd9..9f92e83b210fd2bc04c30769f115601dcb253766 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -361,7 +361,9 @@ func isAnyHost(addr string) bool { func findTag(invocation protocol.Invocation, consumerUrl *common.URL) string { tag, ok := invocation.Attachments()[constant.Tagkey] if !ok { - tag = consumerUrl.GetParam(constant.Tagkey, "") + return consumerUrl.GetParam(constant.Tagkey, "") + } else if v, t := tag.(string); t { + return v } - return tag + return "" } diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index ce0f4d1d3f4dc8b93467aaeede40ea03b53c6e66..d51ce1cc1bf40a8ad25804c797eeed3b88e7d132 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -145,12 +145,17 @@ func (p *Proxy) Implement(v common.RPCService) { inv.SetAttachments(k, value) } - // add user setAttachment + // add user setAttachment. It is compatibility with previous versions. atm := invCtx.Value(constant.AttachmentKey) if m, ok := atm.(map[string]string); ok { for k, value := range m { inv.SetAttachments(k, value) } + } else if m2, ok2 := atm.(map[string]interface{}); ok2 { + // it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7. + for k, value := range m2 { + inv.SetAttachments(k, value) + } } result := p.invoke.Invoke(invCtx, inv) diff --git a/common/proxy/proxy_test.go b/common/proxy/proxy_test.go index 14b2befbc47242d9cc9a2f88e9070b84828062c0..c6066157fd8d2147fd3a319c8d48fdd910752711 100644 --- a/common/proxy/proxy_test.go +++ b/common/proxy/proxy_test.go @@ -24,6 +24,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/protocol/invocation" perrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -32,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/dubbo/hessian2" ) type TestService struct { @@ -40,6 +42,7 @@ type TestService struct { MethodThree func(int, bool) (interface{}, error) MethodFour func(int, bool) (*interface{}, error) `dubbo:"methodFour"` MethodFive func() error + MethodSix func(context.Context, string) (interface{}, error) Echo func(interface{}, *interface{}) error } @@ -120,3 +123,34 @@ func TestProxyImplement(t *testing.T) { assert.Nil(t, s3.MethodOne) } + +func TestProxyImplementForContext(t *testing.T) { + invoker := &TestProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(common.URL{}), + } + p := NewProxy(invoker, nil, map[string]string{constant.ASYNC_KEY: "false"}) + s := &TestService{} + p.Implement(s) + attahments1 := make(map[string]interface{}, 4) + attahments1["k1"] = "v1" + attahments1["k2"] = "v2" + context := context.WithValue(context.Background(), constant.AttachmentKey, attahments1) + r, err := p.Get().(*TestService).MethodSix(context, "xxx") + v1 := r.(map[string]interface{}) + assert.NoError(t, err) + assert.Equal(t, v1["TestProxyInvoker"], "TestProxyInvokerValue") +} + +type TestProxyInvoker struct { + protocol.BaseInvoker +} + +func (bi *TestProxyInvoker) Invoke(context context.Context, inv protocol.Invocation) protocol.Result { + rpcInv := inv.(*invocation.RPCInvocation) + mapV := inv.Attachments() + mapV["TestProxyInvoker"] = "TestProxyInvokerValue" + hessian2.ReflectResponse(mapV, rpcInv.Reply()) + return &protocol.RPCResult{ + Rest: inv.Arguments(), + } +} diff --git a/filter/filter_impl/access_log_filter.go b/filter/filter_impl/access_log_filter.go index 621012c24c0ad12b3bda397148a3ed9c29d080ed..6eaf9cb00bafe8fb0d4b9f8cda50e6bc7115461b 100644 --- a/filter/filter_impl/access_log_filter.go +++ b/filter/filter_impl/access_log_filter.go @@ -105,13 +105,27 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) { func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string { dataMap := make(map[string]string, 16) attachments := invocation.Attachments() - dataMap[constant.INTERFACE_KEY] = attachments[constant.INTERFACE_KEY] - dataMap[constant.METHOD_KEY] = invocation.MethodName() - dataMap[constant.VERSION_KEY] = attachments[constant.VERSION_KEY] - dataMap[constant.GROUP_KEY] = attachments[constant.GROUP_KEY] - dataMap[constant.TIMESTAMP_KEY] = time.Now().Format(MessageDateLayout) - dataMap[constant.LOCAL_ADDR], _ = attachments[constant.LOCAL_ADDR] - dataMap[constant.REMOTE_ADDR], _ = attachments[constant.REMOTE_ADDR] + if v, ok := attachments[constant.INTERFACE_KEY]; ok && v != nil { + dataMap[constant.INTERFACE_KEY] = v.(string) + } + if v, ok := attachments[constant.METHOD_KEY]; ok && v != nil { + dataMap[constant.METHOD_KEY] = v.(string) + } + if v, ok := attachments[constant.VERSION_KEY]; ok && v != nil { + dataMap[constant.VERSION_KEY] = v.(string) + } + if v, ok := attachments[constant.GROUP_KEY]; ok && v != nil { + dataMap[constant.GROUP_KEY] = v.(string) + } + if v, ok := attachments[constant.TIMESTAMP_KEY]; ok && v != nil { + dataMap[constant.TIMESTAMP_KEY] = v.(string) + } + if v, ok := attachments[constant.LOCAL_ADDR]; ok && v != nil { + dataMap[constant.LOCAL_ADDR] = v.(string) + } + if v, ok := attachments[constant.REMOTE_ADDR]; ok && v != nil { + dataMap[constant.REMOTE_ADDR] = v.(string) + } if len(invocation.Arguments()) > 0 { builder := strings.Builder{} diff --git a/filter/filter_impl/access_log_filter_test.go b/filter/filter_impl/access_log_filter_test.go index 55c328cc30ae892c603fcc65034e48d2a52403d2..a3a6151aa1b6a933c57543248f3703125fa356d9 100644 --- a/filter/filter_impl/access_log_filter_test.go +++ b/filter/filter_impl/access_log_filter_test.go @@ -45,7 +45,7 @@ func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) { "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker := protocol.NewBaseInvoker(url) - attach := make(map[string]string, 10) + attach := make(map[string]interface{}, 10) inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) accessLogFilter := GetAccessLogFilter() @@ -64,7 +64,7 @@ func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) { "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker := protocol.NewBaseInvoker(url) - attach := make(map[string]string, 10) + attach := make(map[string]interface{}, 10) attach[constant.VERSION_KEY] = "1.0" attach[constant.GROUP_KEY] = "MyGroup" inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) diff --git a/filter/filter_impl/active_filter_test.go b/filter/filter_impl/active_filter_test.go index 6b72830e6a1a523b775b9294863ab18f8fe518a2..9837a49c72e28c7a7209f8af6059bdc30c222cc2 100644 --- a/filter/filter_impl/active_filter_test.go +++ b/filter/filter_impl/active_filter_test.go @@ -37,7 +37,7 @@ import ( ) func TestActiveFilterInvoke(t *testing.T) { - invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}, 0)) url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") filter := ActiveFilter{} ctrl := gomock.NewController(t) @@ -53,7 +53,7 @@ func TestActiveFilterInvoke(t *testing.T) { func TestActiveFilterOnResponse(t *testing.T) { c := protocol.CurrentTimeMillis() elapsed := 100 - invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]string{ + invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]interface{}{ dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10), }) url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") diff --git a/filter/filter_impl/auth/default_authenticator_test.go b/filter/filter_impl/auth/default_authenticator_test.go index 5b107b5960ff5adc383d52aa5e393d9fc6e71d14..8b0fb6b0e3de04ccb2c30d2c7e22c32af5733742 100644 --- a/filter/filter_impl/auth/default_authenticator_test.go +++ b/filter/filter_impl/auth/default_authenticator_test.go @@ -52,7 +52,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) { var authenticator = &DefaultAuthenticator{} - invcation := invocation.NewRPCInvocation("test", parmas, map[string]string{ + invcation := invocation.NewRPCInvocation("test", parmas, map[string]interface{}{ constant.REQUEST_SIGNATURE_KEY: signature, constant.CONSUMER: "test", constant.REQUEST_TIMESTAMP_KEY: requestTime, @@ -61,7 +61,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) { err := authenticator.Authenticate(invcation, &testurl) assert.Nil(t, err) // modify the params - invcation = invocation.NewRPCInvocation("test", parmas[:1], map[string]string{ + invcation = invocation.NewRPCInvocation("test", parmas[:1], map[string]interface{}{ constant.REQUEST_SIGNATURE_KEY: signature, constant.CONSUMER: "test", constant.REQUEST_TIMESTAMP_KEY: requestTime, @@ -119,7 +119,7 @@ func Test_getAccessKeyPairFailed(t *testing.T) { func Test_getSignatureWithinParams(t *testing.T) { testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0") testurl.SetParam(constant.PARAMTER_SIGNATURE_ENABLE_KEY, "true") - inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]string{ + inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]interface{}{ "": "", }) secret := "dubbo" diff --git a/filter/filter_impl/auth/provider_auth_test.go b/filter/filter_impl/auth/provider_auth_test.go index 626782ae8390f046f441c1f162700a883e6f22d0..f6ebfcd7abb83a55b4a09c1ec2d6c71b88bf7727 100644 --- a/filter/filter_impl/auth/provider_auth_test.go +++ b/filter/filter_impl/auth/provider_auth_test.go @@ -54,7 +54,7 @@ func TestProviderAuthFilter_Invoke(t *testing.T) { requestTime := strconv.Itoa(int(time.Now().Unix() * 1000)) signature, _ := getSignature(&url, inv, secret, requestTime) - inv = invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]string{ + inv = invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]interface{}{ constant.REQUEST_SIGNATURE_KEY: signature, constant.CONSUMER: "test", constant.REQUEST_TIMESTAMP_KEY: requestTime, diff --git a/filter/filter_impl/execute_limit_filter_test.go b/filter/filter_impl/execute_limit_filter_test.go index d36d6edef1ec52c24a9ccd64233b4620b4f10bc7..953f5e1cc8f6ff3299dcac21c5cd2a41de08cdc1 100644 --- a/filter/filter_impl/execute_limit_filter_test.go +++ b/filter/filter_impl/execute_limit_filter_test.go @@ -36,7 +36,7 @@ import ( func TestExecuteLimitFilterInvokeIgnored(t *testing.T) { methodName := "hello" - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), @@ -51,7 +51,7 @@ func TestExecuteLimitFilterInvokeIgnored(t *testing.T) { func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) { methodName := "hello1" - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), @@ -68,7 +68,7 @@ func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) { func TestExecuteLimitFilterInvoke(t *testing.T) { methodName := "hello1" - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), diff --git a/filter/filter_impl/graceful_shutdown_filter_test.go b/filter/filter_impl/graceful_shutdown_filter_test.go index 87ac2eac616a20617b7a5e68254a1f47ecb8ac17..220ef6f20857aa06bcc45d3ac0a9bd52b0d65af2 100644 --- a/filter/filter_impl/graceful_shutdown_filter_test.go +++ b/filter/filter_impl/graceful_shutdown_filter_test.go @@ -39,7 +39,7 @@ import ( ) func TestGenericFilterInvoke(t *testing.T) { - invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{}, 0)) invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{})) diff --git a/filter/filter_impl/metrics_filter_test.go b/filter/filter_impl/metrics_filter_test.go index 881106f4bc5a7890569be347122da5144e440c8b..ac10d52cf3c156e3580760a4409bab49bb4d0c4f 100644 --- a/filter/filter_impl/metrics_filter_test.go +++ b/filter/filter_impl/metrics_filter_test.go @@ -57,7 +57,7 @@ func TestMetricsFilterInvoke(t *testing.T) { "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker := protocol.NewBaseInvoker(url) - attach := make(map[string]string, 10) + attach := make(map[string]interface{}, 10) inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) ctx := context.Background() diff --git a/filter/filter_impl/seata_filter_test.go b/filter/filter_impl/seata_filter_test.go index 6c39897f7aa3a044490e9eece8f352a9db9fc74b..45817e95cbd2eaa7365adc8a299523af8310f797 100644 --- a/filter/filter_impl/seata_filter_test.go +++ b/filter/filter_impl/seata_filter_test.go @@ -48,8 +48,9 @@ func (iv *testMockSeataInvoker) Invoke(ctx context.Context, _ protocol.Invocatio func TestSeataFilter_Invoke(t *testing.T) { filter := getSeataFilter() - result := filter.Invoke(context.Background(), &testMockSeataInvoker{}, invocation.NewRPCInvocation("$echo", []interface{}{"OK"}, map[string]string{ - SEATA_XID: "10.30.21.227:8091:2000047792", - })) + result := filter.Invoke(context.Background(), &testMockSeataInvoker{}, invocation.NewRPCInvocation("$echo", + []interface{}{"OK"}, map[string]interface{}{ + SEATA_XID: "10.30.21.227:8091:2000047792", + })) assert.Equal(t, "10.30.21.227:8091:2000047792", result.Result()) } diff --git a/filter/filter_impl/token_filter.go b/filter/filter_impl/token_filter.go index fe4e38747ed10a40950c87747220339d0d566781..b5e05605c29905d9e66a63129f064c2f844f0e71 100644 --- a/filter/filter_impl/token_filter.go +++ b/filter/filter_impl/token_filter.go @@ -51,7 +51,7 @@ func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, inv if len(invokerTkn) > 0 { attachs := invocation.Attachments() remoteTkn, exist := attachs[constant.TOKEN_KEY] - if exist && strings.EqualFold(invokerTkn, remoteTkn) { + if exist && remoteTkn != nil && strings.EqualFold(invokerTkn, remoteTkn.(string)) { return invoker.Invoke(ctx, invocation) } return &protocol.RPCResult{Err: perrors.Errorf("Invalid token! Forbid invoke remote service %v method %s ", diff --git a/filter/filter_impl/token_filter_test.go b/filter/filter_impl/token_filter_test.go index c2f69bd03941b1404585dc5842c56eb2bf3c918f..cd1bba3d4a830822c67f1e6157653d5477264c94 100644 --- a/filter/filter_impl/token_filter_test.go +++ b/filter/filter_impl/token_filter_test.go @@ -40,7 +40,7 @@ func TestTokenFilterInvoke(t *testing.T) { url := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) attch[constant.TOKEN_KEY] = "ori_key" result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(*url), @@ -54,7 +54,7 @@ func TestTokenFilterInvokeEmptyToken(t *testing.T) { filter := GetTokenFilter() testUrl := common.URL{} - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) attch[constant.TOKEN_KEY] = "ori_key" result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) @@ -67,7 +67,7 @@ func TestTokenFilterInvokeEmptyAttach(t *testing.T) { testUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(*testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.NotNil(t, result.Error()) } @@ -78,7 +78,7 @@ func TestTokenFilterInvokeNotEqual(t *testing.T) { testUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TOKEN_KEY, "ori_key")) - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) attch[constant.TOKEN_KEY] = "err_key" result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(*testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) diff --git a/filter/filter_impl/tps/tps_limiter_method_service_test.go b/filter/filter_impl/tps/tps_limiter_method_service_test.go index edae99ec2d3157ad7f0d81c95a2fb181410475fa..61f28e442f4b76d18d7750aa58831c322f939207 100644 --- a/filter/filter_impl/tps/tps_limiter_method_service_test.go +++ b/filter/filter_impl/tps/tps_limiter_method_service_test.go @@ -36,7 +36,7 @@ import ( func TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) { methodName := "hello" - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -63,7 +63,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) { func TestMethodServiceTpsLimiterImplIsAllowableNoConfig(t *testing.T) { methodName := "hello1" - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) // ctrl := gomock.NewController(t) // defer ctrl.Finish() @@ -80,7 +80,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableNoConfig(t *testing.T) { func TestMethodServiceTpsLimiterImplIsAllowableMethodLevelOverride(t *testing.T) { methodName := "hello2" methodConfigPrefix := "methods." + methodName + "." - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -113,7 +113,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableMethodLevelOverride(t *testing.T) func TestMethodServiceTpsLimiterImplIsAllowableBothMethodAndService(t *testing.T) { methodName := "hello3" methodConfigPrefix := "methods." + methodName + "." - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]string, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/filter/filter_impl/tps_limit_filter_test.go b/filter/filter_impl/tps_limit_filter_test.go index 274e4e6de61b94079e9ad3b2f7a5bcd79a276cc6..da0fc482ce559dad52947786823f604128857c30 100644 --- a/filter/filter_impl/tps_limit_filter_test.go +++ b/filter/filter_impl/tps_limit_filter_test.go @@ -44,7 +44,7 @@ func TestTpsLimitFilterInvokeWithNoTpsLimiter(t *testing.T) { invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TPS_LIMITER_KEY, "")) - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) result := tpsFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), @@ -68,7 +68,7 @@ func TestGenericFilterInvokeWithDefaultTpsLimiter(t *testing.T) { invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) result := tpsFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), @@ -99,7 +99,7 @@ func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t *testing.T) { invokeUrl := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY)) - attch := make(map[string]string, 0) + attch := make(map[string]interface{}, 0) result := tpsFilter.Invoke(context.Background(), protocol.NewBaseInvoker(*invokeUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) diff --git a/filter/filter_impl/tracing_filter_test.go b/filter/filter_impl/tracing_filter_test.go index 57f4095d49be784d7688d2acf17c1ea0225d0000..e159b7400d46069018a00a849319423285072dc2 100644 --- a/filter/filter_impl/tracing_filter_test.go +++ b/filter/filter_impl/tracing_filter_test.go @@ -42,7 +42,7 @@ func TestTracingFilterInvoke(t *testing.T) { "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker := protocol.NewBaseInvoker(url) - attach := make(map[string]string, 10) + attach := make(map[string]interface{}, 10) inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) ctx := context.Background() tf := newTracingFilter() diff --git a/go.mod b/go.mod index c9737b9af90178cf4e689771e962fc7590d875a2..bd7796d5410119b1407b84684e28b7e66bd234cf 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/apache/dubbo-getty v1.3.10 - github.com/apache/dubbo-go-hessian2 v1.6.2 + github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44 github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/go.sum b/go.sum index c5ab85db14cc95ca332d6f81b0aca9c6a9823090..f744fc313ba5dd7932f330ccb3d3e50a054a15a3 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFm github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= github.com/apache/dubbo-getty v1.3.10 h1:ys5mwjPdxG/KwkPjS6EI0RzQtU6p6FCPoKpaFEzpAL0= github.com/apache/dubbo-getty v1.3.10/go.mod h1:x6rraK01BL5C7jUM2fPl5KMkAxLVIx54ZB8/XEOik9Y= -github.com/apache/dubbo-go-hessian2 v1.6.2 h1:i7F5GjVaUatLQz1x9vUmmSIFj49L8J6rVICdF6xw4qw= -github.com/apache/dubbo-go-hessian2 v1.6.2/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w= +github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44 h1:9biQu3Z0PjDN1m8h6poo76dFkvaIpfryUVpJ5VsYVrM= +github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -150,7 +150,6 @@ github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5Xh github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/dubbogo/go-zookeeper v1.0.1 h1:irLzvOsDOTNsN8Sv9tvYYxVu6DCQfLtziZQtUHmZgz8= github.com/dubbogo/go-zookeeper v1.0.1/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= -github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk= github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.9.1 h1:0/PPFo13zPbjt4Ia0zYWMFi3C6rAe9X7O1J2Iv+BHNM= github.com/dubbogo/gost v1.9.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= @@ -805,13 +804,11 @@ golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= -google.golang.org/api v0.7.0 h1:9sdfJOzWlkqPltHAuzT2Cp+yrBeY1KRVYgms8soxMwM= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -820,7 +817,6 @@ google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsb google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= @@ -831,7 +827,6 @@ google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s= -google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 h1:iKtrH9Y8mcbADOP0YFaEMth7OfuHY9xHOwNj4znpM1A= google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= @@ -859,7 +854,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go index 7e01439f042a2046559188ec9df6924da0236cb1..e2b29686f49aeade5c61a46f464e6bf00165e70c 100644 --- a/metadata/service/inmemory/service_proxy.go +++ b/metadata/service/inmemory/service_proxy.go @@ -55,7 +55,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName), invocation.WithArguments([]interface{}{siV.Interface(), gV.Interface(), vV.Interface(), pV.Interface()}), invocation.WithReply(reflect.ValueOf(&[]interface{}{}).Interface()), - invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}), + invocation.WithAttachments(map[string]interface{}{constant.ASYNC_KEY: "false"}), invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV})) res := m.invkr.Invoke(context.Background(), inv) diff --git a/metrics/prometheus/reporter_test.go b/metrics/prometheus/reporter_test.go index 0cb7d09a2c8e71fb88b54789c8eb3ee2cf967fbf..eaba0e324ff078bdfb2fd4b146ac9ea60d429724 100644 --- a/metrics/prometheus/reporter_test.go +++ b/metrics/prometheus/reporter_test.go @@ -43,7 +43,7 @@ func TestPrometheusReporter_Report(t *testing.T) { "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker := protocol.NewBaseInvoker(url) - attach := make(map[string]string, 10) + attach := make(map[string]interface{}, 10) inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) assert.False(t, isConsumer(url)) diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index e51cc5fccfe43a97de582521f7e7b8116600d39e..fa7b0f8cc00366ded7532ba035df57e83e71f861 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -188,8 +188,7 @@ func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCI // inject opentracing ctx currentSpan := opentracing.SpanFromContext(ctx) if currentSpan != nil { - carrier := opentracing.TextMapCarrier(inv.Attachments()) - err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier) + err := injectTraceCtx(currentSpan, inv) if err != nil { logger.Errorf("Could not inject the span context into attachments: %v", err) } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 3c68b63f7b1678075d505799044d1a9b1a7e1da4..4d32c29a222e8bb73486d664b2ed2e0038a4b3f5 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -51,7 +51,7 @@ func TestDubboInvokerInvoke(t *testing.T) { user := &User{} inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}), - invocation.WithReply(user), invocation.WithAttachments(map[string]string{"test_key": "test_value"})) + invocation.WithReply(user), invocation.WithAttachments(map[string]interface{}{"test_key": "test_value"})) // Call res := invoker.Invoke(context.Background(), inv) diff --git a/protocol/dubbo/hessian2/const.go b/protocol/dubbo/hessian2/const.go new file mode 100644 index 0000000000000000000000000000000000000000..74a00b601db22397916aab215ccd33bc918d91e7 --- /dev/null +++ b/protocol/dubbo/hessian2/const.go @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This constants are also defined in dubbo constants. But we will still used these until hessian is stable. + +package hessian2 + +import ( + "reflect" + "regexp" +) + +import ( + perrors "github.com/pkg/errors" +) + +const ( + mask = byte(127) + flag = byte(128) +) + +const ( + // Zero : byte zero + Zero = byte(0x00) +) + +// constansts +const ( + TAG_READ = int32(-1) + ASCII_GAP = 32 + CHUNK_SIZE = 4096 + BC_BINARY = byte('B') // final chunk + BC_BINARY_CHUNK = byte('A') // non-final chunk + + BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary + BINARY_DIRECT_MAX = byte(0x0f) + BC_BINARY_SHORT = byte(0x34) // 2-byte length binary + BINARY_SHORT_MAX = 0x3ff // 0-1023 binary + + BC_DATE = byte(0x4a) // 64-bit millisecond UTC date + BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date + + BC_DOUBLE = byte('D') // IEEE 64-bit double + + BC_DOUBLE_ZERO = byte(0x5b) + BC_DOUBLE_ONE = byte(0x5c) + BC_DOUBLE_BYTE = byte(0x5d) + BC_DOUBLE_SHORT = byte(0x5e) + BC_DOUBLE_MILL = byte(0x5f) + + BC_FALSE = byte('F') // boolean false + + BC_INT = byte('I') // 32-bit int + + INT_DIRECT_MIN = -0x10 + INT_DIRECT_MAX = byte(0x2f) + BC_INT_ZERO = byte(0x90) + + INT_BYTE_MIN = -0x800 + INT_BYTE_MAX = 0x7ff + BC_INT_BYTE_ZERO = byte(0xc8) + + BC_END = byte('Z') + + INT_SHORT_MIN = -0x40000 + INT_SHORT_MAX = 0x3ffff + BC_INT_SHORT_ZERO = byte(0xd4) + + BC_LIST_VARIABLE = byte(0x55) + BC_LIST_FIXED = byte('V') + BC_LIST_VARIABLE_UNTYPED = byte(0x57) + BC_LIST_FIXED_UNTYPED = byte(0x58) + _listFixedTypedLenTagMin = byte(0x70) + _listFixedTypedLenTagMax = byte(0x77) + _listFixedUntypedLenTagMin = byte(0x78) + _listFixedUntypedLenTagMax = byte(0x7f) + + BC_LIST_DIRECT = byte(0x70) + BC_LIST_DIRECT_UNTYPED = byte(0x78) + LIST_DIRECT_MAX = byte(0x7) + + BC_LONG = byte('L') // 64-bit signed integer + LONG_DIRECT_MIN = -0x08 + LONG_DIRECT_MAX = byte(0x0f) + BC_LONG_ZERO = byte(0xe0) + + LONG_BYTE_MIN = -0x800 + LONG_BYTE_MAX = 0x7ff + BC_LONG_BYTE_ZERO = byte(0xf8) + + LONG_SHORT_MIN = -0x40000 + LONG_SHORT_MAX = 0x3ffff + BC_LONG_SHORT_ZERO = byte(0x3c) + + BC_LONG_INT = byte(0x59) + + BC_MAP = byte('M') + BC_MAP_UNTYPED = byte('H') + + BC_NULL = byte('N') // x4e + + BC_OBJECT = byte('O') + BC_OBJECT_DEF = byte('C') + + BC_OBJECT_DIRECT = byte(0x60) + OBJECT_DIRECT_MAX = byte(0x0f) + + BC_REF = byte(0x51) + + BC_STRING = byte('S') // final string + BC_STRING_CHUNK = byte('R') // non-final string + + BC_STRING_DIRECT = byte(0x00) + STRING_DIRECT_MAX = byte(0x1f) + BC_STRING_SHORT = byte(0x30) + STRING_SHORT_MAX = 0x3ff + + BC_TRUE = byte('T') + + P_PACKET_CHUNK = byte(0x4f) + P_PACKET = byte('P') + + P_PACKET_DIRECT = byte(0x80) + PACKET_DIRECT_MAX = byte(0x7f) + + P_PACKET_SHORT = byte(0x70) + PACKET_SHORT_MAX = 0xfff + ARRAY_STRING = "[string" + ARRAY_INT = "[int" + ARRAY_DOUBLE = "[double" + ARRAY_FLOAT = "[float" + ARRAY_BOOL = "[boolean" + ARRAY_LONG = "[long" + + PATH_KEY = "path" + GROUP_KEY = "group" + INTERFACE_KEY = "interface" + VERSION_KEY = "version" + TIMEOUT_KEY = "timeout" + + STRING_NIL = "" + STRING_TRUE = "true" + STRING_FALSE = "false" + STRING_ZERO = "0.0" + STRING_ONE = "1.0" +) + +// DubboResponse related consts +const ( + Response_OK byte = 20 + Response_CLIENT_TIMEOUT byte = 30 + Response_SERVER_TIMEOUT byte = 31 + Response_BAD_REQUEST byte = 40 + Response_BAD_RESPONSE byte = 50 + Response_SERVICE_NOT_FOUND byte = 60 + Response_SERVICE_ERROR byte = 70 + Response_SERVER_ERROR byte = 80 + Response_CLIENT_ERROR byte = 90 + + // According to "java dubbo" There are two cases of response: + // 1. with attachments + // 2. no attachments + RESPONSE_WITH_EXCEPTION int32 = 0 + RESPONSE_VALUE int32 = 1 + RESPONSE_NULL_VALUE int32 = 2 + RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3 + RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4 + RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5 +) + +/** + * the dubbo protocol header length is 16 Bytes. + * the first 2 Bytes is magic code '0xdabb' + * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag + * the next 1 Bytes is response state. + * the next 8 Bytes is package DI. + * the next 4 Bytes is package length. + **/ +const ( + // header length. + HEADER_LENGTH = 16 + + // magic header + MAGIC = uint16(0xdabb) + MAGIC_HIGH = byte(0xda) + MAGIC_LOW = byte(0xbb) + + // message flag. + FLAG_REQUEST = byte(0x80) + FLAG_TWOWAY = byte(0x40) + FLAG_EVENT = byte(0x20) // for heartbeat + SERIAL_MASK = 0x1f + + DUBBO_VERSION = "2.5.4" + DUBBO_VERSION_KEY = "dubbo" + DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2 + LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200 + DEFAULT_LEN = 8388608 // 8 * 1024 * 1024 default body max length +) + +// regular +const ( + JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)" + CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" + JAVA_IDENT_REGEX + ")*;)" + ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))" + DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" + ARRAY_DESC + ")" +) + +// Dubbo request response related consts +var ( + DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY} + DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST} + DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, Zero, Response_OK} + DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT} + DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_EVENT} +) + +// Error part +var ( + ErrHeaderNotEnough = perrors.New("header buffer too short") + ErrBodyNotEnough = perrors.New("body buffer too short") + ErrJavaException = perrors.New("got java exception") + ErrIllegalPackage = perrors.New("illegal package!") +) + +// nolint +var DescRegex, _ = regexp.Compile(DESC_REGEX) + +var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem()) diff --git a/protocol/dubbo/hessian2/hessian_dubbo.go b/protocol/dubbo/hessian2/hessian_dubbo.go new file mode 100644 index 0000000000000000000000000000000000000000..1afa4ec96eccbb8077852dfcc020e0eb05be3257 --- /dev/null +++ b/protocol/dubbo/hessian2/hessian_dubbo.go @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hessian2 + +import ( + "bufio" + "encoding/binary" + "time" +) + +import ( + "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" +) + +// enum part +const ( + PackageError = PackageType(0x01) + PackageRequest = PackageType(0x02) + PackageResponse = PackageType(0x04) + PackageHeartbeat = PackageType(0x08) + PackageRequest_TwoWay = PackageType(0x10) + PackageResponse_Exception = PackageType(0x20) + PackageType_BitSize = 0x2f +) + +// PackageType nolint +type PackageType int + +// DubboHeader dubbo header +type DubboHeader struct { + SerialID byte + Type PackageType + ID int64 + BodyLen int + ResponseStatus byte +} + +// Service defines service instance +type Service struct { + Path string + Interface string + Group string + Version string + Method string + Timeout time.Duration // request timeout +} + +// HessianCodec defines hessian codec +type HessianCodec struct { + pkgType PackageType + reader *bufio.Reader + bodyLen int +} + +// NewHessianCodec generate a new hessian codec instance +func NewHessianCodec(reader *bufio.Reader) *HessianCodec { + return &HessianCodec{ + reader: reader, + } +} + +// NewHessianCodec generate a new hessian codec instance +func NewHessianCodecCustom(pkgType PackageType, reader *bufio.Reader, bodyLen int) *HessianCodec { + return &HessianCodec{ + pkgType: pkgType, + reader: reader, + bodyLen: bodyLen, + } +} + +func (h *HessianCodec) Write(service Service, header DubboHeader, body interface{}) ([]byte, error) { + switch header.Type { + case PackageHeartbeat: + if header.ResponseStatus == Zero { + return packRequest(service, header, body) + } + return packResponse(header, body) + + case PackageRequest, PackageRequest_TwoWay: + return packRequest(service, header, body) + + case PackageResponse: + return packResponse(header, body) + + default: + return nil, perrors.Errorf("Unrecognised message type: %v", header.Type) + } + + // unreachable return nil, nil +} + +// ReadHeader uses hessian codec to read dubbo header +func (h *HessianCodec) ReadHeader(header *DubboHeader) error { + + var err error + + if h.reader.Size() < HEADER_LENGTH { + return ErrHeaderNotEnough + } + buf, err := h.reader.Peek(HEADER_LENGTH) + if err != nil { // this is impossible + return perrors.WithStack(err) + } + _, err = h.reader.Discard(HEADER_LENGTH) + if err != nil { // this is impossible + return perrors.WithStack(err) + } + + //// read header + + if buf[0] != MAGIC_HIGH && buf[1] != MAGIC_LOW { + return ErrIllegalPackage + } + + // Header{serialization id(5 bit), event, two way, req/response} + if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero { + return perrors.Errorf("serialization ID:%v", header.SerialID) + } + + flag := buf[2] & FLAG_EVENT + if flag != Zero { + header.Type |= PackageHeartbeat + } + flag = buf[2] & FLAG_REQUEST + if flag != Zero { + header.Type |= PackageRequest + flag = buf[2] & FLAG_TWOWAY + if flag != Zero { + header.Type |= PackageRequest_TwoWay + } + } else { + header.Type |= PackageResponse + header.ResponseStatus = buf[3] + if header.ResponseStatus != Response_OK { + header.Type |= PackageResponse_Exception + } + } + + // Header{req id} + header.ID = int64(binary.BigEndian.Uint64(buf[4:])) + + // Header{body len} + header.BodyLen = int(binary.BigEndian.Uint32(buf[12:])) + if header.BodyLen < 0 { + return ErrIllegalPackage + } + + h.pkgType = header.Type + h.bodyLen = header.BodyLen + + if h.reader.Buffered() < h.bodyLen { + return ErrBodyNotEnough + } + + return perrors.WithStack(err) + +} + +// ReadBody uses hessian codec to read response body +func (h *HessianCodec) ReadBody(rspObj interface{}) error { + + if h.reader.Buffered() < h.bodyLen { + return ErrBodyNotEnough + } + buf, err := h.reader.Peek(h.bodyLen) + if err != nil { + return perrors.WithStack(err) + } + _, err = h.reader.Discard(h.bodyLen) + if err != nil { // this is impossible + return perrors.WithStack(err) + } + + switch h.pkgType & PackageType_BitSize { + case PackageResponse | PackageHeartbeat | PackageResponse_Exception, PackageResponse | PackageResponse_Exception: + decoder := hessian.NewDecoder(buf[:]) + exception, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + rsp, ok := rspObj.(*DubboResponse) + if !ok { + return perrors.Errorf("java exception:%s", exception.(string)) + } + rsp.Exception = perrors.Errorf("java exception:%s", exception.(string)) + return nil + case PackageRequest | PackageHeartbeat, PackageResponse | PackageHeartbeat: + case PackageRequest: + if rspObj != nil { + if err = unpackRequestBody(hessian.NewDecoder(buf[:]), rspObj); err != nil { + return perrors.WithStack(err) + } + } + case PackageResponse: + if rspObj != nil { + if err = unpackResponseBody(hessian.NewDecoder(buf[:]), rspObj); err != nil { + return perrors.WithStack(err) + } + } + } + + return nil +} + +// ignore body, but only read attachments +func (h *HessianCodec) ReadAttachments() (map[string]interface{}, error) { + if h.reader.Buffered() < h.bodyLen { + return nil, ErrBodyNotEnough + } + buf, err := h.reader.Peek(h.bodyLen) + if err != nil { + return nil, perrors.WithStack(err) + } + _, err = h.reader.Discard(h.bodyLen) + if err != nil { // this is impossible + return nil, perrors.WithStack(err) + } + + switch h.pkgType & PackageType_BitSize { + case PackageRequest: + rspObj := make([]interface{}, 7) + if err = unpackRequestBody(hessian.NewDecoderWithSkip(buf[:]), rspObj); err != nil { + return nil, perrors.WithStack(err) + } + return rspObj[6].(map[string]interface{}), nil + case PackageResponse: + rspObj := &DubboResponse{} + if err = unpackResponseBody(hessian.NewDecoderWithSkip(buf[:]), rspObj); err != nil { + return nil, perrors.WithStack(err) + } + return rspObj.Attachments, nil + } + + return nil, nil +} diff --git a/protocol/dubbo/hessian2/hessian_dubbo_test.go b/protocol/dubbo/hessian2/hessian_dubbo_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c3f19f04536484816e4b4f709f534dcbf4adb2b4 --- /dev/null +++ b/protocol/dubbo/hessian2/hessian_dubbo_test.go @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hessian2 + +import ( + "bufio" + "bytes" + "reflect" + "testing" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/stretchr/testify/assert" +) + +type Case struct { + A string + B int +} + +type CaseA struct { + A string + B int + C Case +} + +type CaseB struct { + A string + B CaseA +} + +func (c *CaseB) JavaClassName() string { + return "com.test.caseb" +} + +func (c CaseA) JavaClassName() string { + return "com.test.casea" +} + +//JavaClassName java fully qualified path +func (c Case) JavaClassName() string { + return "com.test.case" +} + +func doTestHessianEncodeHeader(t *testing.T, packageType PackageType, responseStatus byte, body interface{}) ([]byte, error) { + hessian.RegisterPOJO(&Case{}) + codecW := NewHessianCodec(nil) + resp, err := codecW.Write(Service{ + Path: "test", + Interface: "ITest", + Version: "v1.0", + Method: "test", + Timeout: time.Second * 10, + }, DubboHeader{ + SerialID: 2, + Type: packageType, + ID: 1, + ResponseStatus: responseStatus, + }, body) + assert.Nil(t, err) + return resp, err +} + +func doTestResponse(t *testing.T, packageType PackageType, responseStatus byte, body interface{}, decodedResponse *DubboResponse, assertFunc func()) { + resp, err := doTestHessianEncodeHeader(t, packageType, responseStatus, body) + + codecR := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp))) + + h := &DubboHeader{} + err = codecR.ReadHeader(h) + assert.Nil(t, err) + + assert.Equal(t, byte(2), h.SerialID) + assert.Equal(t, packageType, h.Type&(PackageRequest|PackageResponse|PackageHeartbeat)) + assert.Equal(t, int64(1), h.ID) + assert.Equal(t, responseStatus, h.ResponseStatus) + + err = codecR.ReadBody(decodedResponse) + assert.Nil(t, err) + t.Log(decodedResponse) + + if assertFunc != nil { + assertFunc() + return + } + + if h.ResponseStatus != Zero && h.ResponseStatus != Response_OK { + assert.Equal(t, "java exception:"+body.(string), decodedResponse.Exception.Error()) + return + } + + in, _ := hessian.EnsureInterface(hessian.UnpackPtrValue(hessian.EnsurePackValue(body)), nil) + out, _ := hessian.EnsureInterface(hessian.UnpackPtrValue(hessian.EnsurePackValue(decodedResponse.RspObj)), nil) + assert.Equal(t, in, out) +} + +func TestResponse(t *testing.T) { + caseObj := Case{A: "a", B: 1} + decodedResponse := &DubboResponse{} + + arr := []*Case{&caseObj} + var arrRes []interface{} + decodedResponse.RspObj = &arrRes + doTestResponse(t, PackageResponse, Response_OK, arr, decodedResponse, func() { + assert.Equal(t, 1, len(arrRes)) + assert.Equal(t, &caseObj, arrRes[0]) + }) + + decodedResponse.RspObj = &Case{} + doTestResponse(t, PackageResponse, Response_OK, &Case{A: "a", B: 1}, decodedResponse, nil) + + s := "ok!!!!!" + strObj := "" + decodedResponse.RspObj = &strObj + doTestResponse(t, PackageResponse, Response_OK, s, decodedResponse, nil) + + var intObj int64 + decodedResponse.RspObj = &intObj + doTestResponse(t, PackageResponse, Response_OK, int64(3), decodedResponse, nil) + + boolObj := false + decodedResponse.RspObj = &boolObj + doTestResponse(t, PackageResponse, Response_OK, true, decodedResponse, nil) + + strObj = "" + decodedResponse.RspObj = &strObj + doTestResponse(t, PackageResponse, hessian.Response_SERVER_ERROR, "error!!!!!", decodedResponse, nil) + + mapObj := map[string][]*Case{"key": {&caseObj}} + mapRes := map[interface{}]interface{}{} + decodedResponse.RspObj = &mapRes + doTestResponse(t, PackageResponse, Response_OK, mapObj, decodedResponse, func() { + c, ok := mapRes["key"] + if !ok { + assert.FailNow(t, "no key in decoded response map") + } + + mapValueArr, ok := c.([]*Case) + if !ok { + assert.FailNow(t, "invalid decoded response map value", "expect []*Case, but get %v", reflect.TypeOf(c)) + } + assert.Equal(t, 1, len(mapValueArr)) + assert.Equal(t, &caseObj, mapValueArr[0]) + }) +} + +func doTestRequest(t *testing.T, packageType PackageType, responseStatus byte, body interface{}) { + resp, err := doTestHessianEncodeHeader(t, packageType, responseStatus, body) + + codecR := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp))) + + h := &DubboHeader{} + err = codecR.ReadHeader(h) + assert.Nil(t, err) + assert.Equal(t, byte(2), h.SerialID) + assert.Equal(t, packageType, h.Type&(PackageRequest|PackageResponse|PackageHeartbeat)) + assert.Equal(t, int64(1), h.ID) + assert.Equal(t, responseStatus, h.ResponseStatus) + + c := make([]interface{}, 7) + err = codecR.ReadBody(c) + assert.Nil(t, err) + t.Log(c) + assert.True(t, len(body.([]interface{})) == len(c[5].([]interface{}))) +} + +func TestRequest(t *testing.T) { + doTestRequest(t, PackageRequest, Zero, []interface{}{"a"}) + doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3}) + doTestRequest(t, PackageRequest, Zero, []interface{}{"a", true}) + doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3, true}) + doTestRequest(t, PackageRequest, Zero, []interface{}{3.2, true}) + doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3, true, &Case{A: "a", B: 3}}) + doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3, true, []*Case{{A: "a", B: 3}}}) + doTestRequest(t, PackageRequest, Zero, []interface{}{map[string][]*Case{"key": {{A: "a", B: 3}}}}) +} + +func TestHessianCodec_ReadAttachments(t *testing.T) { + hessian.RegisterPOJO(&AttachTestObject{}) + body := &DubboResponse{ + RspObj: &CaseB{A: "A", B: CaseA{A: "a", B: 1, C: Case{A: "c", B: 2}}}, + Exception: nil, + Attachments: map[string]interface{}{DUBBO_VERSION_KEY: "2.6.4", "att": AttachTestObject{Id: 23, Name: "haha"}}, + } + resp, err := doTestHessianEncodeHeader(t, PackageResponse, Response_OK, body) + assert.NoError(t, err) + hessian.UnRegisterPOJOs(&CaseB{}, &CaseA{}) + codecR1 := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp))) + codecR2 := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp))) + h := &DubboHeader{} + assert.NoError(t, codecR1.ReadHeader(h)) + t.Log(h) + assert.NoError(t, codecR2.ReadHeader(h)) + t.Log(h) + + err = codecR1.ReadBody(body) + assert.Equal(t, "can not find go type name com.test.caseb in registry", err.Error()) + attrs, err := codecR2.ReadAttachments() + assert.NoError(t, err) + assert.Equal(t, "2.6.4", attrs[DUBBO_VERSION_KEY]) + assert.Equal(t, AttachTestObject{Id: 23, Name: "haha"}, *(attrs["att"].(*AttachTestObject))) + assert.NotEqual(t, AttachTestObject{Id: 24, Name: "nohaha"}, *(attrs["att"].(*AttachTestObject))) + + t.Log(attrs) +} + +type AttachTestObject struct { + Id int32 + Name string `dubbo:name` +} + +func (AttachTestObject) JavaClassName() string { + return "com.test.Test" +} diff --git a/protocol/dubbo/hessian2/hessian_request.go b/protocol/dubbo/hessian2/hessian_request.go new file mode 100644 index 0000000000000000000000000000000000000000..4ebb4aa1be05d4d1941661fed452dda06cf55fa0 --- /dev/null +++ b/protocol/dubbo/hessian2/hessian_request.go @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hessian2 + +import ( + "encoding/binary" + "reflect" + "strconv" + "strings" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" +) + +///////////////////////////////////////// +// dubbo +///////////////////////////////////////// + +func getArgType(v interface{}) string { + if v == nil { + return "V" + } + + switch v.(type) { + // Serialized tags for base types + case nil: + return "V" + case bool: + return "Z" + case []bool: + return "[Z" + case byte: + return "B" + case []byte: + return "[B" + case int8: + return "B" + case []int8: + return "[B" + case int16: + return "S" + case []int16: + return "[S" + case uint16: // Equivalent to Char of Java + return "C" + case []uint16: + return "[C" + // case rune: + // return "C" + case int: + return "J" + case []int: + return "[J" + case int32: + return "I" + case []int32: + return "[I" + case int64: + return "J" + case []int64: + return "[J" + case time.Time: + return "java.util.Date" + case []time.Time: + return "[Ljava.util.Date" + case float32: + return "F" + case []float32: + return "[F" + case float64: + return "D" + case []float64: + return "[D" + case string: + return "java.lang.String" + case []string: + return "[Ljava.lang.String;" + case []hessian.Object: + return "[Ljava.lang.Object;" + case map[interface{}]interface{}: + // return "java.util.HashMap" + return "java.util.Map" + case hessian.POJOEnum: + return v.(hessian.POJOEnum).JavaClassName() + // Serialized tags for complex types + default: + t := reflect.TypeOf(v) + if reflect.Ptr == t.Kind() { + t = reflect.TypeOf(reflect.ValueOf(v).Elem()) + } + switch t.Kind() { + case reflect.Struct: + return "java.lang.Object" + case reflect.Slice, reflect.Array: + if t.Elem().Kind() == reflect.Struct { + return "[Ljava.lang.Object;" + } + // return "java.util.ArrayList" + return "java.util.List" + case reflect.Map: // Enter here, map may be map[string]int + return "java.util.Map" + default: + return "" + } + } + + // unreachable + // return "java.lang.RuntimeException" +} + +func getArgsTypeList(args []interface{}) (string, error) { + var ( + typ string + types string + ) + + for i := range args { + typ = getArgType(args[i]) + if typ == "" { + return types, perrors.Errorf("cat not get arg %#v type", args[i]) + } + if !strings.Contains(typ, ".") { + types += typ + } else if strings.Index(typ, "[") == 0 { + types += strings.Replace(typ, ".", "/", -1) + } else { + // java.util.List -> Ljava/util/List; + types += "L" + strings.Replace(typ, ".", "/", -1) + ";" + } + } + + return types, nil +} + +type DubboRequest struct { + Params interface{} + Attachments map[string]interface{} +} + +// NewRequest create a new DubboRequest +func NewRequest(params interface{}, atta map[string]interface{}) *DubboRequest { + if atta == nil { + atta = make(map[string]interface{}) + } + return &DubboRequest{ + Params: params, + Attachments: atta, + } +} + +func EnsureRequest(body interface{}) *DubboRequest { + if req, ok := body.(*DubboRequest); ok { + return req + } + return NewRequest(body, nil) +} + +func packRequest(service Service, header DubboHeader, req interface{}) ([]byte, error) { + var ( + err error + types string + byteArray []byte + pkgLen int + ) + + request := EnsureRequest(req) + + args, ok := request.Params.([]interface{}) + if !ok { + return nil, perrors.Errorf("@params is not of type: []interface{}") + } + + hb := header.Type == PackageHeartbeat + + ////////////////////////////////////////// + // byteArray + ////////////////////////////////////////// + // magic + switch header.Type { + case PackageHeartbeat: + byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...) + case PackageRequest_TwoWay: + byteArray = append(byteArray, DubboRequestHeaderBytesTwoWay[:]...) + default: + byteArray = append(byteArray, DubboRequestHeaderBytes[:]...) + } + + // serialization id, two way flag, event, request/response flag + // SerialID is id of serialization approach in java dubbo + byteArray[2] |= header.SerialID & SERIAL_MASK + // request id + binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID)) + + encoder := hessian.NewEncoder() + encoder.Append(byteArray[:HEADER_LENGTH]) + + ////////////////////////////////////////// + // body + ////////////////////////////////////////// + if hb { + encoder.Encode(nil) + goto END + } + + // dubbo version + path + version + method + encoder.Encode(DEFAULT_DUBBO_PROTOCOL_VERSION) + encoder.Encode(service.Path) + encoder.Encode(service.Version) + encoder.Encode(service.Method) + + // args = args type list + args value list + if types, err = getArgsTypeList(args); err != nil { + return nil, perrors.Wrapf(err, " PackRequest(args:%+v)", args) + } + encoder.Encode(types) + for _, v := range args { + encoder.Encode(v) + } + + request.Attachments[PATH_KEY] = service.Path + request.Attachments[VERSION_KEY] = service.Version + if len(service.Group) > 0 { + request.Attachments[GROUP_KEY] = service.Group + } + if len(service.Interface) > 0 { + request.Attachments[INTERFACE_KEY] = service.Interface + } + if service.Timeout != 0 { + request.Attachments[TIMEOUT_KEY] = strconv.Itoa(int(service.Timeout / time.Millisecond)) + } + + encoder.Encode(request.Attachments) + +END: + byteArray = encoder.Buffer() + pkgLen = len(byteArray) + if pkgLen > int(DEFAULT_LEN) { // 8M + return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN) + } + // byteArray{body length} + binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen-HEADER_LENGTH)) + return byteArray, nil +} + +// hessian decode request body +func unpackRequestBody(decoder *hessian.Decoder, reqObj interface{}) error { + + if decoder == nil { + return perrors.Errorf("@decoder is nil") + } + + req, ok := reqObj.([]interface{}) + if !ok { + return perrors.Errorf("@reqObj is not of type: []interface{}") + } + if len(req) < 7 { + return perrors.New("length of @reqObj should be 7") + } + + var ( + err error + dubboVersion, target, serviceVersion, method, argsTypes interface{} + args []interface{} + ) + + dubboVersion, err = decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + req[0] = dubboVersion + + target, err = decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + req[1] = target + + serviceVersion, err = decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + req[2] = serviceVersion + + method, err = decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + req[3] = method + + argsTypes, err = decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + req[4] = argsTypes + + ats := DescRegex.FindAllString(argsTypes.(string), -1) + var arg interface{} + for i := 0; i < len(ats); i++ { + arg, err = decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + args = append(args, arg) + } + req[5] = args + + attachments, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + v[DUBBO_VERSION_KEY] = dubboVersion + req[6] = ToMapStringInterface(v) + return nil + } + + return perrors.Errorf("get wrong attachments: %+v", attachments) +} + +func ToMapStringInterface(origin map[interface{}]interface{}) map[string]interface{} { + dest := make(map[string]interface{}, len(origin)) + for k, v := range origin { + if kv, ok := k.(string); ok { + if v == nil { + dest[kv] = "" + continue + } + dest[kv] = v + } + } + return dest +} diff --git a/protocol/dubbo/hessian2/hessian_request_test.go b/protocol/dubbo/hessian2/hessian_request_test.go new file mode 100644 index 0000000000000000000000000000000000000000..98d5f2399c9fdbf99a7274e0c3e53c2c175f774f --- /dev/null +++ b/protocol/dubbo/hessian2/hessian_request_test.go @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hessian2 + +import ( + "reflect" + "strconv" + "testing" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/stretchr/testify/assert" +) + +type TestEnumGender hessian.JavaEnum + +const ( + MAN hessian.JavaEnum = iota + WOMAN +) + +var genderName = map[hessian.JavaEnum]string{ + MAN: "MAN", + WOMAN: "WOMAN", +} + +var genderValue = map[string]hessian.JavaEnum{ + "MAN": MAN, + "WOMAN": WOMAN, +} + +func (g TestEnumGender) JavaClassName() string { + return "com.ikurento.test.TestEnumGender" +} + +func (g TestEnumGender) String() string { + s, ok := genderName[hessian.JavaEnum(g)] + if ok { + return s + } + + return strconv.Itoa(int(g)) +} + +func (g TestEnumGender) EnumValue(s string) hessian.JavaEnum { + v, ok := genderValue[s] + if ok { + return v + } + + return hessian.InvalidJavaEnum +} + +func TestPackRequest(t *testing.T) { + bytes, err := packRequest(Service{ + Path: "test", + Interface: "ITest", + Version: "v1.0", + Method: "test", + Timeout: time.Second * 10, + }, DubboHeader{ + SerialID: 0, + Type: PackageRequest, + ID: 123, + }, []interface{}{1, 2}) + + assert.Nil(t, err) + + if bytes != nil { + t.Logf("pack request: %s", string(bytes)) + } +} + +func TestGetArgsTypeList(t *testing.T) { + type Test struct{} + str, err := getArgsTypeList([]interface{}{nil, 1, []int{2}, true, []bool{false}, "a", []string{"b"}, Test{}, &Test{}, []Test{}, map[string]Test{}, TestEnumGender(MAN)}) + assert.NoError(t, err) + assert.Equal(t, "VJ[JZ[ZLjava/lang/String;[Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;[Ljava/lang/Object;Ljava/util/Map;Lcom/ikurento/test/TestEnumGender;", str) +} + +func TestDescRegex(t *testing.T) { + results := DescRegex.FindAllString("Ljava/lang/String;", -1) + assert.Equal(t, 1, len(results)) + assert.Equal(t, "Ljava/lang/String;", results[0]) + + results = DescRegex.FindAllString("Ljava/lang/String;I", -1) + assert.Equal(t, 2, len(results)) + assert.Equal(t, "Ljava/lang/String;", results[0]) + assert.Equal(t, "I", results[1]) + + results = DescRegex.FindAllString("ILjava/lang/String;", -1) + assert.Equal(t, 2, len(results)) + assert.Equal(t, "I", results[0]) + assert.Equal(t, "Ljava/lang/String;", results[1]) + + results = DescRegex.FindAllString("ILjava/lang/String;IZ", -1) + assert.Equal(t, 4, len(results)) + assert.Equal(t, "I", results[0]) + assert.Equal(t, "Ljava/lang/String;", results[1]) + assert.Equal(t, "I", results[2]) + assert.Equal(t, "Z", results[3]) + + results = DescRegex.FindAllString("[Ljava/lang/String;[I", -1) + assert.Equal(t, 2, len(results)) + assert.Equal(t, "[Ljava/lang/String;", results[0]) + assert.Equal(t, "[I", results[1]) +} + +func TestIssue192(t *testing.T) { + type args struct { + origin map[interface{}]interface{} + } + tests := []struct { + name string + args args + want map[string]interface{} + }{ + { + name: "not null", + args: args{ + origin: map[interface{}]interface{}{ + "1": nil, + "2": "3", + "": "", + }, + }, + want: map[string]interface{}{ + "1": "", + "2": "3", + "": "", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ToMapStringInterface(tt.args.origin); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ToMapStringString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/protocol/dubbo/hessian2/hessian_response.go b/protocol/dubbo/hessian2/hessian_response.go new file mode 100644 index 0000000000000000000000000000000000000000..982960ed87e74b325687ac364c97a347efe6c38f --- /dev/null +++ b/protocol/dubbo/hessian2/hessian_response.go @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hessian2 + +import ( + "encoding/binary" + "math" + "reflect" + "strconv" + "strings" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go-hessian2/java_exception" + perrors "github.com/pkg/errors" +) + +// DubboResponse dubbo response +type DubboResponse struct { + RspObj interface{} + Exception error + Attachments map[string]interface{} +} + +// NewResponse create a new DubboResponse +func NewResponse(rspObj interface{}, exception error, attachments map[string]interface{}) *DubboResponse { + if attachments == nil { + attachments = make(map[string]interface{}, 8) + } + return &DubboResponse{ + RspObj: rspObj, + Exception: exception, + Attachments: attachments, + } +} + +// EnsureResponse check body type, make sure it's a DubboResponse or package it as a DubboResponse +func EnsureResponse(body interface{}) *DubboResponse { + if res, ok := body.(*DubboResponse); ok { + return res + } + if exp, ok := body.(error); ok { + return NewResponse(nil, exp, nil) + } + return NewResponse(body, nil, nil) +} + +// https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java#L256 +// hessian encode response +func packResponse(header DubboHeader, ret interface{}) ([]byte, error) { + var ( + byteArray []byte + ) + + response := EnsureResponse(ret) + + hb := header.Type == PackageHeartbeat + + // magic + if hb { + byteArray = append(byteArray, DubboResponseHeartbeatHeader[:]...) + } else { + byteArray = append(byteArray, DubboResponseHeaderBytes[:]...) + } + // set serialID, identify serialization types, eg: fastjson->6, hessian2->2 + byteArray[2] |= header.SerialID & SERIAL_MASK + // response status + if header.ResponseStatus != 0 { + byteArray[3] = header.ResponseStatus + } + + // request id + binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID)) + + // body + encoder := hessian.NewEncoder() + encoder.Append(byteArray[:HEADER_LENGTH]) + + if header.ResponseStatus == Response_OK { + if hb { + encoder.Encode(nil) + } else { + atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY]) + + var resWithException, resValue, resNullValue int32 + if atta { + resWithException = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS + resValue = RESPONSE_VALUE_WITH_ATTACHMENTS + resNullValue = RESPONSE_NULL_VALUE_WITH_ATTACHMENTS + } else { + resWithException = RESPONSE_WITH_EXCEPTION + resValue = RESPONSE_VALUE + resNullValue = RESPONSE_NULL_VALUE + } + + if response.Exception != nil { // throw error + encoder.Encode(resWithException) + if t, ok := response.Exception.(java_exception.Throwabler); ok { + encoder.Encode(t) + } else { + encoder.Encode(java_exception.NewThrowable(response.Exception.Error())) + } + } else { + if response.RspObj == nil { + encoder.Encode(resNullValue) + } else { + encoder.Encode(resValue) + encoder.Encode(response.RspObj) // result + } + } + + if atta { + encoder.Encode(response.Attachments) // attachments + } + } + } else { + if response.Exception != nil { // throw error + encoder.Encode(response.Exception.Error()) + } else { + encoder.Encode(response.RspObj) + } + } + + byteArray = encoder.Buffer() + byteArray = hessian.EncNull(byteArray) // if not, "java client" will throw exception "unexpected end of file" + pkgLen := len(byteArray) + if pkgLen > int(DEFAULT_LEN) { // 8M + return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN) + } + // byteArray{body length} + binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen-HEADER_LENGTH)) + return byteArray, nil + +} + +// hessian decode response body +func unpackResponseBody(decoder *hessian.Decoder, resp interface{}) error { + // body + if decoder == nil { + return perrors.Errorf("@decoder is nil") + } + rspType, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + + response := EnsureResponse(resp) + + switch rspType { + case RESPONSE_WITH_EXCEPTION, RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: + expt, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + if rspType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS { + attachments, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + atta := ToMapStringInterface(v) + response.Attachments = atta + } else { + return perrors.Errorf("get wrong attachments: %+v", attachments) + } + } + + if e, ok := expt.(error); ok { + response.Exception = e + } else { + response.Exception = perrors.Errorf("got exception: %+v", expt) + } + return nil + + case RESPONSE_VALUE, RESPONSE_VALUE_WITH_ATTACHMENTS: + rsp, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + if rspType == RESPONSE_VALUE_WITH_ATTACHMENTS { + attachments, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + response.Attachments = ToMapStringInterface(v) + } else { + return perrors.Errorf("get wrong attachments: %+v", attachments) + } + } + + // If the return value is nil, + // we should consider it normal + if rsp == nil { + return nil + } + + return perrors.WithStack(ReflectResponse(rsp, response.RspObj)) + + case RESPONSE_NULL_VALUE, RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: + if rspType == RESPONSE_NULL_VALUE_WITH_ATTACHMENTS { + attachments, err := decoder.Decode() + if err != nil { + return perrors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + atta := ToMapStringInterface(v) + response.Attachments = atta + } else { + return perrors.Errorf("get wrong attachments: %+v", attachments) + } + } + return nil + } + + return nil +} + +// CopySlice copy from inSlice to outSlice +func CopySlice(inSlice, outSlice reflect.Value) error { + if inSlice.IsNil() { + return perrors.New("@in is nil") + } + if inSlice.Kind() != reflect.Slice { + return perrors.Errorf("@in is not slice, but %v", inSlice.Kind()) + } + + for outSlice.Kind() == reflect.Ptr { + outSlice = outSlice.Elem() + } + + size := inSlice.Len() + outSlice.Set(reflect.MakeSlice(outSlice.Type(), size, size)) + + for i := 0; i < size; i++ { + inSliceValue := inSlice.Index(i) + if !inSliceValue.Type().AssignableTo(outSlice.Index(i).Type()) { + return perrors.Errorf("in element type [%s] can not assign to out element type [%s]", + inSliceValue.Type().String(), outSlice.Type().String()) + } + outSlice.Index(i).Set(inSliceValue) + } + + return nil +} + +// CopyMap copy from in map to out map +func CopyMap(inMapValue, outMapValue reflect.Value) error { + if inMapValue.IsNil() { + return perrors.New("@in is nil") + } + if !inMapValue.CanInterface() { + return perrors.New("@in's Interface can not be used.") + } + if inMapValue.Kind() != reflect.Map { + return perrors.Errorf("@in is not map, but %v", inMapValue.Kind()) + } + + outMapType := hessian.UnpackPtrType(outMapValue.Type()) + hessian.SetValue(outMapValue, reflect.MakeMap(outMapType)) + + outKeyType := outMapType.Key() + + outMapValue = hessian.UnpackPtrValue(outMapValue) + outValueType := outMapValue.Type().Elem() + + for _, inKey := range inMapValue.MapKeys() { + inValue := inMapValue.MapIndex(inKey) + + if !inKey.Type().AssignableTo(outKeyType) { + return perrors.Errorf("in Key:{type:%s, value:%#v} can not assign to out Key:{type:%s} ", + inKey.Type().String(), inKey, outKeyType.String()) + } + if !inValue.Type().AssignableTo(outValueType) { + return perrors.Errorf("in Value:{type:%s, value:%#v} can not assign to out value:{type:%s}", + inValue.Type().String(), inValue, outValueType.String()) + } + outMapValue.SetMapIndex(inKey, inValue) + } + + return nil +} + +// ReflectResponse reflect return value +// TODO response object should not be copied again to another object, it should be the exact type of the object +func ReflectResponse(in interface{}, out interface{}) error { + if in == nil { + return perrors.Errorf("@in is nil") + } + + if out == nil { + return perrors.Errorf("@out is nil") + } + if reflect.TypeOf(out).Kind() != reflect.Ptr { + return perrors.Errorf("@out should be a pointer") + } + + inValue := hessian.EnsurePackValue(in) + outValue := hessian.EnsurePackValue(out) + + outType := outValue.Type().String() + if outType == "interface {}" || outType == "*interface {}" { + hessian.SetValue(outValue, inValue) + return nil + } + + switch inValue.Type().Kind() { + case reflect.Slice, reflect.Array: + return CopySlice(inValue, outValue) + case reflect.Map: + return CopyMap(inValue, outValue) + default: + hessian.SetValue(outValue, inValue) + } + + return nil +} + +var versionInt = make(map[string]int) + +// https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java#L96 +// isSupportResponseAttachment is for compatibility among some dubbo version +func isSupportResponseAttachment(ver interface{}) bool { + version, ok := ver.(string) + if !ok || len(version) == 0 { + return false + } + + v, ok := versionInt[version] + if !ok { + v = version2Int(version) + if v == -1 { + return false + } + } + + if v >= 2001000 && v <= 2060200 { // 2.0.10 ~ 2.6.2 + return false + } + return v >= LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT +} + +func version2Int(ver interface{}) int { + version, ok := ver.(string) + if !ok || len(version) == 0 { + return 0 + } + var v = 0 + varr := strings.Split(version, ".") + length := len(varr) + for key, value := range varr { + v0, err := strconv.Atoi(value) + if err != nil { + return -1 + } + v += v0 * int(math.Pow10((length-key-1)*2)) + } + if length == 3 { + return v * 100 + } + return v +} diff --git a/protocol/dubbo/hessian2/hessian_response_test.go b/protocol/dubbo/hessian2/hessian_response_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f5c84baa90b3b31b271979cb1107503facac71d9 --- /dev/null +++ b/protocol/dubbo/hessian2/hessian_response_test.go @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hessian2 + +import ( + "reflect" + "testing" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/stretchr/testify/assert" +) + +func doTestReflectResponse(t *testing.T, in interface{}, out interface{}) { + err := ReflectResponse(in, out) + if err != nil { + t.Error(err) + t.FailNow() + } + + result := hessian.UnpackPtrValue(reflect.ValueOf(out)).Interface() + + equal := reflect.DeepEqual(in, result) + if !equal { + t.Errorf("expect [%v]: %v, but got [%v]: %v", reflect.TypeOf(in), in, reflect.TypeOf(result), result) + } +} + +func TestReflectResponse(t *testing.T) { + var b bool + doTestReflectResponse(t, true, &b) + doTestReflectResponse(t, false, &b) + + var i int + doTestReflectResponse(t, 123, &i) + doTestReflectResponse(t, 234, &i) + + var i16 int16 + doTestReflectResponse(t, int16(456), &i16) + + var i64 int64 + doTestReflectResponse(t, int64(789), &i64) + + var s string + doTestReflectResponse(t, "hello world", &s) + + type rr struct { + Name string + Num int + } + + var r1 rr + doTestReflectResponse(t, rr{"dubbogo", 32}, &r1) + + // ------ map test ------- + m1 := make(map[interface{}]interface{}) + var m1r map[interface{}]interface{} + m1["hello"] = "world" + m1[1] = "go" + m1["dubbo"] = 666 + doTestReflectResponse(t, m1, &m1r) + + m2 := make(map[string]string) + var m2r map[string]string + m2["hello"] = "world" + m2["dubbo"] = "666" + doTestReflectResponse(t, m2, &m2r) + + m3 := make(map[string]rr) + var m3r map[string]rr + m3["dubbo"] = rr{"hello", 123} + m3["go"] = rr{"world", 456} + doTestReflectResponse(t, m3, &m3r) + + // ------ slice test ------- + s1 := []string{"abc", "def", "hello", "world"} + var s1r []string + doTestReflectResponse(t, s1, &s1r) + + s2 := []rr{rr{"dubbo", 666}, rr{"go", 999}} + var s2r []rr + doTestReflectResponse(t, s2, &s2r) + + s3 := []interface{}{rr{"dubbo", 666}, 123, "hello"} + var s3r []interface{} + doTestReflectResponse(t, s3, &s3r) + + // ------ interface test ------- + in1 := []interface{}{rr{"dubbo", 666}, 123, "hello"} + var inr1 *interface{} + doTestReflectResponse(t, in1, reflect.New(reflect.TypeOf(inr1).Elem()).Interface()) + + in2 := make(map[string]rr) + var inr2 map[string]rr + m3["dubbo"] = rr{"hello", 123} + m3["go"] = rr{"world", 456} + doTestReflectResponse(t, in2, &inr2) +} + +// separately test copy normal map to map[interface{}]interface{} +func TestCopyMap(t *testing.T) { + type rr struct { + Name string + Num int + } + + m3 := make(map[string]rr) + var m3r map[interface{}]interface{} + r1 := rr{"hello", 123} + r2 := rr{"world", 456} + m3["dubbo"] = r1 + m3["go"] = r2 + + err := ReflectResponse(m3, &m3r) + if err != nil { + t.Error(err) + t.FailNow() + } + + assert.Equal(t, 2, len(m3r)) + + rr1, ok := m3r["dubbo"] + assert.True(t, ok) + assert.True(t, reflect.DeepEqual(r1, rr1)) + + rr2, ok := m3r["go"] + assert.True(t, ok) + assert.True(t, reflect.DeepEqual(r2, rr2)) +} + +// separately test copy normal slice to []interface{} +func TestCopySlice(t *testing.T) { + type rr struct { + Name string + Num int + } + + r1 := rr{"hello", 123} + r2 := rr{"world", 456} + + s1 := []rr{r1, r2} + var s1r []interface{} + + err := ReflectResponse(s1, &s1r) + if err != nil { + t.Error(err) + t.FailNow() + } + + assert.Equal(t, 2, len(s1r)) + assert.True(t, reflect.DeepEqual(r1, s1r[0])) + assert.True(t, reflect.DeepEqual(r2, s1r[1])) +} + +func TestIsSupportResponseAttachment(t *testing.T) { + is := isSupportResponseAttachment("2.X") + assert.False(t, is) + + is = isSupportResponseAttachment("2.0.10") + assert.False(t, is) + + is = isSupportResponseAttachment("2.5.3") + assert.False(t, is) + + is = isSupportResponseAttachment("2.6.2") + assert.False(t, is) + + is = isSupportResponseAttachment("1.5.5") + assert.False(t, is) + + is = isSupportResponseAttachment("0.0.0") + assert.False(t, is) + + is = isSupportResponseAttachment("2.0.2") + assert.True(t, is) + + is = isSupportResponseAttachment("2.7.2") + assert.True(t, is) +} + +func TestVersion2Int(t *testing.T) { + v := version2Int("2.1.3") + assert.Equal(t, 2010300, v) + + v = version2Int("22.11.33") + assert.Equal(t, 22113300, v) + + v = version2Int("222.111.333") + assert.Equal(t, 223143300, v) + + v = version2Int("220.110.333") + assert.Equal(t, 221133300, v) + + v = version2Int("229.119.333") + assert.Equal(t, 230223300, v) + + v = version2Int("2222.1111.3333") + assert.Equal(t, 2233443300, v) + + v = version2Int("2.11") + assert.Equal(t, 211, v) + + v = version2Int("2.1.3.4") + assert.Equal(t, 2010304, v) + + v = version2Int("2.1.3.4.5") + assert.Equal(t, 201030405, v) + +} diff --git a/protocol/dubbo/opentracing.go b/protocol/dubbo/opentracing.go new file mode 100644 index 0000000000000000000000000000000000000000..2dcd6a4d0d9f491ba6d51ea7a3ba96812a6f9e08 --- /dev/null +++ b/protocol/dubbo/opentracing.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +import ( + "github.com/opentracing/opentracing-go" +) +import ( + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" +) + +func injectTraceCtx(currentSpan opentracing.Span, inv *invocation_impl.RPCInvocation) error { + // inject opentracing ctx + traceAttachments := filterContext(inv.Attachments()) + carrier := opentracing.TextMapCarrier(traceAttachments) + err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier) + if err == nil { + fillTraceAttachments(inv.Attachments(), traceAttachments) + } + return err +} + +func extractTraceCtx(inv *invocation_impl.RPCInvocation) (opentracing.SpanContext, error) { + traceAttachments := filterContext(inv.Attachments()) + // actually, if user do not use any opentracing framework, the err will not be nil. + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, + opentracing.TextMapCarrier(traceAttachments)) + return spanCtx, err +} + +func filterContext(attachments map[string]interface{}) map[string]string { + var traceAttchment = make(map[string]string) + for k, v := range attachments { + if r, ok := v.(string); ok { + traceAttchment[k] = r + } + } + return traceAttchment +} + +func fillTraceAttachments(attachments map[string]interface{}, traceAttachment map[string]string) { + for k, v := range traceAttachment { + attachments[k] = v + } +} diff --git a/protocol/invocation.go b/protocol/invocation.go index 296ec0540c1eed69c30e1b1477be038a4a9cc00e..452f619e2dd9a5835141d91d7adfed37bb9f6859 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -34,15 +34,16 @@ type Invocation interface { // Reply gets response of request Reply() interface{} // Attachments gets all attachments - Attachments() map[string]string - // AttachmentsByKey gets attachment by key , if nil then return default value + Attachments() map[string]interface{} + // AttachmentsByKey gets attachment by key , if nil then return default value. (It will be deprecated in the future) AttachmentsByKey(string, string) string + Attachment(string) interface{} // Attributes refers to dubbo 2.7.6. It is different from attachment. It is used in internal process. Attributes() map[string]interface{} // AttributeByKey gets attribute by key , if nil then return default value AttributeByKey(string, interface{}) interface{} // SetAttachments sets attribute by @key and @value. - SetAttachments(key string, value string) + SetAttachments(key string, value interface{}) // Invoker gets the invoker in current context. Invoker() Invoker } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 103bf71f74ade0e81849a55a2bf81e347db161ca..4e806324bf7b236d80a932a92898ba117fb1638d 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -41,7 +41,7 @@ type RPCInvocation struct { arguments []interface{} reply interface{} callBack interface{} - attachments map[string]string + attachments map[string]interface{} // Refer to dubbo 2.7.6. It is different from attachment. It is used in internal process. attributes map[string]interface{} invoker protocol.Invoker @@ -49,7 +49,7 @@ type RPCInvocation struct { } // NewRPCInvocation creates a RPC invocation. -func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation { +func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]interface{}) *RPCInvocation { return &RPCInvocation{ methodName: methodName, arguments: arguments, @@ -101,7 +101,7 @@ func (r *RPCInvocation) SetReply(reply interface{}) { } // Attachments gets all attachments of RPC. -func (r *RPCInvocation) Attachments() map[string]string { +func (r *RPCInvocation) Attachments() map[string]interface{} { return r.attachments } @@ -114,11 +114,25 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string } value, ok := r.attachments[key] if ok { - return value + return value.(string) } return defaultValue } +// Attachment returns the corresponding value from dubbo's attachment with the given key. +func (r *RPCInvocation) Attachment(key string) interface{} { + r.lock.RLock() + defer r.lock.RUnlock() + if r.attachments == nil { + return nil + } + value, ok := r.attachments[key] + if ok { + return value + } + return nil +} + // Attributes gets all attributes of RPC. func (r *RPCInvocation) Attributes() map[string]interface{} { return r.attributes @@ -136,11 +150,11 @@ func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) int } // SetAttachments sets attribute by @key and @value. -func (r *RPCInvocation) SetAttachments(key string, value string) { +func (r *RPCInvocation) SetAttachments(key string, value interface{}) { r.lock.Lock() defer r.lock.Unlock() if r.attachments == nil { - r.attachments = make(map[string]string) + r.attachments = make(map[string]interface{}) } r.attachments[key] = value } @@ -228,7 +242,7 @@ func WithCallBack(callBack interface{}) option { } // WithAttachments creates option with @attachments. -func WithAttachments(attachments map[string]string) option { +func WithAttachments(attachments map[string]interface{}) option { return func(invo *RPCInvocation) { invo.attachments = attachments } diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 29eba0223eb49f2cdf91d20ac023b87fa2717f31..9755a481fdba325887a1d16731cbbcb8e3943d8a 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -343,7 +343,7 @@ func serveRequest(ctx context.Context, header map[string]string, body []byte, co exporter, _ := jsonrpcProtocol.ExporterMap().Load(path) invoker := exporter.(*JsonrpcExporter).GetInvoker() if invoker != nil { - result := invoker.Invoke(ctx, invocation.NewRPCInvocation(methodName, args, map[string]string{ + result := invoker.Invoke(ctx, invocation.NewRPCInvocation(methodName, args, map[string]interface{}{ constant.PATH_KEY: path, constant.VERSION_KEY: codec.req.Version})) if err := result.Error(); err != nil { diff --git a/protocol/rest/server/rest_server.go b/protocol/rest/server/rest_server.go index fbd6fb7ad9dd81f043e4d45ee94a54e12ef89cdd..d9542bb8760cd2ddb7b839ebc6761dab75ae9c30 100644 --- a/protocol/rest/server/rest_server.go +++ b/protocol/rest/server/rest_server.go @@ -111,7 +111,7 @@ func GetRouteFunc(invoker protocol.Invoker, methodConfig *rest_config.RestMethod logger.Errorf("[Go Restful] WriteErrorString error:%v", err) } } - result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodConfig.MethodName, args, make(map[string]string))) + result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodConfig.MethodName, args, make(map[string]interface{}))) if result.Error() != nil { err = resp.WriteError(http.StatusInternalServerError, result.Error()) if err != nil { diff --git a/protocol/result.go b/protocol/result.go index 2a33be612fd1f319c8c46cbd480865d5564b189d..a36b16d1cc56557c2976df5550f5d9c01b88619b 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -28,13 +28,14 @@ type Result interface { // Result gets invoker result. Result() interface{} // SetAttachments replaces the existing attachments with the specified param. - SetAttachments(map[string]string) + SetAttachments(map[string]interface{}) // Attachments gets all attachments - Attachments() map[string]string + Attachments() map[string]interface{} + // AddAttachment adds the specified map to existing attachments in this instance. - AddAttachment(string, string) + AddAttachment(string, interface{}) // Attachment gets attachment by key with default value. - Attachment(string, string) string + Attachment(string, interface{}) interface{} } ///////////////////////////// @@ -43,7 +44,7 @@ type Result interface { // RPCResult is default RPC result. type RPCResult struct { - Attrs map[string]string + Attrs map[string]interface{} Err error Rest interface{} } @@ -69,22 +70,22 @@ func (r *RPCResult) Result() interface{} { } // SetAttachments replaces the existing attachments with the specified param. -func (r *RPCResult) SetAttachments(attr map[string]string) { +func (r *RPCResult) SetAttachments(attr map[string]interface{}) { r.Attrs = attr } // Attachments gets all attachments -func (r *RPCResult) Attachments() map[string]string { +func (r *RPCResult) Attachments() map[string]interface{} { return r.Attrs } // AddAttachment adds the specified map to existing attachments in this instance. -func (r *RPCResult) AddAttachment(key, value string) { +func (r *RPCResult) AddAttachment(key string, value interface{}) { r.Attrs[key] = value } // Attachment gets attachment by key with default value. -func (r *RPCResult) Attachment(key, defaultValue string) string { +func (r *RPCResult) Attachment(key string, defaultValue interface{}) interface{} { v, ok := r.Attrs[key] if !ok { v = defaultValue diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 60becfb34135470b0e69972c25a743f44efe19d5..978534ea8b98ddd76f68619155ff90fe5e031ac1 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -98,7 +98,10 @@ func (rpc *RPCStatus) GetSuccessiveRequestFailureCount() int32 { // GetURLStatus get URL RPC status. func GetURLStatus(url common.URL) *RPCStatus { - rpcStatus, _ := serviceStatistic.LoadOrStore(url.Key(), &RPCStatus{}) + rpcStatus, found := serviceStatistic.Load(url.Key()) + if !found { + rpcStatus, _ = serviceStatistic.LoadOrStore(url.Key(), &RPCStatus{}) + } return rpcStatus.(*RPCStatus) } @@ -107,15 +110,13 @@ func GetMethodStatus(url common.URL, methodName string) *RPCStatus { identifier := url.Key() methodMap, found := methodStatistics.Load(identifier) if !found { - methodMap = &sync.Map{} - methodStatistics.Store(identifier, methodMap) + methodMap, _ = methodStatistics.LoadOrStore(identifier, &sync.Map{}) } methodActive := methodMap.(*sync.Map) rpcStatus, found := methodActive.Load(methodName) if !found { - rpcStatus = &RPCStatus{} - methodActive.Store(methodName, rpcStatus) + rpcStatus, _ = methodActive.LoadOrStore(methodName, &RPCStatus{}) } status := rpcStatus.(*RPCStatus) diff --git a/remoting/getty/listener_test.go b/remoting/getty/listener_test.go index da460aec1430f8ecb2c2cb84fc6978b92ba9be58..3d1787a334c14f04fa7446f202481dfe82261c3d 100644 --- a/remoting/getty/listener_test.go +++ b/remoting/getty/listener_test.go @@ -36,7 +36,7 @@ import ( // test rebuild the ctx func TestRebuildCtx(t *testing.T) { opentracing.SetGlobalTracer(mocktracer.New()) - attach := make(map[string]string, 10) + attach := make(map[string]interface{}, 10) attach[constant.VERSION_KEY] = "1.0" attach[constant.GROUP_KEY] = "MyGroup" inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) @@ -48,8 +48,7 @@ func TestRebuildCtx(t *testing.T) { span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client") - opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, - opentracing.TextMapCarrier(inv.Attachments())) + injectTraceCtx(span, inv) // rebuild the context success inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) ctx = rebuildCtx(inv) diff --git a/test/integrate/dubbo/go-client/go.mod b/test/integrate/dubbo/go-client/go.mod index 4708eb1f0f48c10acc254880ecb6dad3a03529f2..8428a513adedd114b96168a2028bb8920c58956e 100644 --- a/test/integrate/dubbo/go-client/go.mod +++ b/test/integrate/dubbo/go-client/go.mod @@ -1,3 +1,5 @@ module github.com/apache/dubbo-go/test/integrate/dubbo/go-client +require github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44 + go 1.13 diff --git a/test/integrate/dubbo/go-server/go.mod b/test/integrate/dubbo/go-server/go.mod index 9e1162327de374fb131c2a0b89d1be3baa578a1b..6a56b189e16fced03130c7870302a7bef9d0e1b5 100644 --- a/test/integrate/dubbo/go-server/go.mod +++ b/test/integrate/dubbo/go-server/go.mod @@ -1,3 +1,5 @@ module github.com/apache/dubbo-go/test/integrate/dubbo/go-server +require github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44 + go 1.13