diff --git a/README.md b/README.md index fdc3062d6f086be14ad19b1f718f713c54fb48ed..694f0091533f644b58438c6487dc53348add6e60 100644 --- a/README.md +++ b/README.md @@ -98,11 +98,11 @@ You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubb ## Document -TODO +https://dubbogo.github.io/dubbo-go-website(**Improving**) ## Quick Start -[dubbogo-samples](https://github.com/dubbogo/dubbogo-samples) shows how to use dubbo-go. Please read the [dubbogo-samples/README.md](https://github.com/dubbogo/dubbogo-samples/blob/master/README.md) carefully to learn how to dispose the configuration and compile the program. +[dubbo-samples/golang](https://github.com/dubbogo/dubbo-samples) shows how to use dubbo-go. Please read the [dubbo-samples/golang/README.md](https://github.com/dubbogo/dubbo-samples/blob/master/golang/README.md) carefully to learn how to dispose the configuration and compile the program. ## Running unit tests diff --git a/README_CN.md b/README_CN.md index edabe64d376cd4d49f6ab4251db9387aca48bbf0..99b26c5357ddb0482faf5a95b5935b0d9603c40b 100644 --- a/README_CN.md +++ b/README_CN.md @@ -97,11 +97,11 @@ Apache License, Version 2.0 ## 鏂囨。 -TODO +https://dubbogo.github.io/dubbo-go-website(**瀹屽杽涓�**) ## 蹇€熷紑濮� ## -[dubbogo-samples](https://github.com/dubbogo/dubbogo-samples)杩欎釜椤圭洰鐨勪簨渚嬪睍绀轰簡濡備綍浣跨敤 dubbo-go 銆傝浠旂粏闃呰 [dubbogo-samples/README.md](https://github.com/dubbogo/dubbogo-samples/blob/master/README.md) 瀛︿範濡備綍澶勭悊閰嶇疆骞剁紪璇戠▼搴忋€� +[dubbo-samples/golang](https://github.com/dubbogo/dubbo-samples)杩欎釜椤圭洰鐨勪簨渚嬪睍绀轰簡濡備綍浣跨敤 dubbo-go 銆傝浠旂粏闃呰 [dubbo-samples/golang/README.md](https://github.com/dubbogo/dubbo-samples/blob/master/golang/README.md) 瀛︿範濡備綍澶勭悊閰嶇疆骞剁紪璇戠▼搴忋€� ## 杩愯鍗曟祴 diff --git a/common/constant/default.go b/common/constant/default.go index 6e0f8488783ebe66939436ca14670395e2719be7..992fc32748bb4fc7777cffecc9137663c681c3f7 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -46,7 +46,7 @@ const ( const ( DEFAULT_KEY = "default" PREFIX_DEFAULT_KEY = "default." - DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute,pshutdown" + DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,generic_service,execute,pshutdown" DEFAULT_REFERENCE_FILTERS = "cshutdown" GENERIC_REFERENCE_FILTERS = "generic" GENERIC = "$invoke" diff --git a/common/extension/filter.go b/common/extension/filter.go index 93f7f8cf7ccc4108fe1120b685fad36a2f9f83df..0b5c4b40aa78f7ea489a306f4a52efbb07243b41 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -19,12 +19,11 @@ package extension import ( "github.com/apache/dubbo-go/filter" - "github.com/apache/dubbo-go/filter/common" ) var ( filters = make(map[string]func() filter.Filter) - rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler) + rejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler) ) func SetFilter(name string, v func() filter.Filter) { @@ -38,11 +37,11 @@ func GetFilter(name string) filter.Filter { return filters[name]() } -func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) { +func SetRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { rejectedExecutionHandler[name] = creator } -func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler { +func GetRejectedExecutionHandler(name string) filter.RejectedExecutionHandler { creator, ok := rejectedExecutionHandler[name] if !ok { panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index 151c33ad5e64ffa4059489e2cbcfae6f2e823328..8c131fafa3159047d25b43ae0a57d674418a2170 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -18,19 +18,19 @@ package extension import ( - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) var ( - tpsLimitStrategy = make(map[string]tps.TpsLimitStrategyCreator) - tpsLimiter = make(map[string]func() tps.TpsLimiter) + tpsLimitStrategy = make(map[string]filter.TpsLimitStrategyCreator) + tpsLimiter = make(map[string]func() filter.TpsLimiter) ) -func SetTpsLimiter(name string, creator func() tps.TpsLimiter) { +func SetTpsLimiter(name string, creator func() filter.TpsLimiter) { tpsLimiter[name] = creator } -func GetTpsLimiter(name string) tps.TpsLimiter { +func GetTpsLimiter(name string) filter.TpsLimiter { creator, ok := tpsLimiter[name] if !ok { panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " + @@ -39,11 +39,11 @@ func GetTpsLimiter(name string) tps.TpsLimiter { return creator() } -func SetTpsLimitStrategy(name string, creator tps.TpsLimitStrategyCreator) { +func SetTpsLimitStrategy(name string, creator filter.TpsLimitStrategyCreator) { tpsLimitStrategy[name] = creator } -func GetTpsLimitStrategyCreator(name string) tps.TpsLimitStrategyCreator { +func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator { creator, ok := tpsLimitStrategy[name] if !ok { panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " + diff --git a/filter/common/impl/rejected_execution_handler_mock.go b/filter/common/rejected_execution_handler_mock.go similarity index 99% rename from filter/common/impl/rejected_execution_handler_mock.go rename to filter/common/rejected_execution_handler_mock.go index dace1894668d3a4a154a87bfbdbcc860a97a11ec..a5631af9f7600cae772437877b1ac9139655cc5f 100644 --- a/filter/common/impl/rejected_execution_handler_mock.go +++ b/filter/common/rejected_execution_handler_mock.go @@ -18,7 +18,7 @@ // Source: rejected_execution_handler.go // Package filter is a generated GoMock package. -package impl +package common import ( reflect "reflect" diff --git a/filter/common/impl/rejected_execution_handler_only_log.go b/filter/common/rejected_execution_handler_only_log.go similarity index 93% rename from filter/common/impl/rejected_execution_handler_only_log.go rename to filter/common/rejected_execution_handler_only_log.go index 8943433af1ba908fc834740163df78e3b2b6443a..65abe677f129fa2fbe412c7e3ea2b23de2f1ade6 100644 --- a/filter/common/impl/rejected_execution_handler_only_log.go +++ b/filter/common/rejected_execution_handler_only_log.go @@ -15,9 +15,10 @@ * limitations under the License. */ -package impl +package common import ( + "github.com/apache/dubbo-go/filter" "sync" ) @@ -26,7 +27,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - filterCommon "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) @@ -61,7 +61,7 @@ func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL return &protocol.RPCResult{} } -func GetOnlyLogRejectedExecutionHandler() filterCommon.RejectedExecutionHandler { +func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler { onlyLogHandlerOnce.Do(func() { onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} }) diff --git a/filter/common/impl/rejected_execution_handler_only_log_test.go b/filter/common/rejected_execution_handler_only_log_test.go similarity index 98% rename from filter/common/impl/rejected_execution_handler_only_log_test.go rename to filter/common/rejected_execution_handler_only_log_test.go index da54d8a106338dd4f21f9b01e66b031e3c311e01..0efc3d813771577d38fd5e7989255fc097b49a08 100644 --- a/filter/common/impl/rejected_execution_handler_only_log_test.go +++ b/filter/common/rejected_execution_handler_only_log_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package common import ( "net/url" diff --git a/filter/impl/access_log_filter.go b/filter/filter/access_log_filter.go similarity index 99% rename from filter/impl/access_log_filter.go rename to filter/filter/access_log_filter.go index 89fa34952f99057f1d8bb35794a57f9905f5f169..cce2c5050fcbc60bc45b7bc2751685a8d9677b76 100644 --- a/filter/impl/access_log_filter.go +++ b/filter/filter/access_log_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "os" diff --git a/filter/impl/access_log_filter_test.go b/filter/filter/access_log_filter_test.go similarity index 99% rename from filter/impl/access_log_filter_test.go rename to filter/filter/access_log_filter_test.go index 834d531f05f952c41abfe8e1c56c20c0285926b8..2c17021a9f17d3d99c48e5763087c0b03b490b93 100644 --- a/filter/impl/access_log_filter_test.go +++ b/filter/filter/access_log_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "context" diff --git a/filter/impl/active_filter.go b/filter/filter/active_filter.go similarity index 99% rename from filter/impl/active_filter.go rename to filter/filter/active_filter.go index 36a4e1a767ab7170ce8e5bebf2cfa4403f6ad4ff..e0f73c2b2facd53b23e491be2e5b123b5d33087d 100644 --- a/filter/impl/active_filter.go +++ b/filter/filter/active_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "github.com/apache/dubbo-go/common/extension" diff --git a/filter/impl/echo_filter.go b/filter/filter/echo_filter.go similarity index 99% rename from filter/impl/echo_filter.go rename to filter/filter/echo_filter.go index 18e42c8cb2b15acb27573c5e24f11a8b69e0d496..1515c0a99c77d7a4d9af93e141cbed9b529158d7 100644 --- a/filter/impl/echo_filter.go +++ b/filter/filter/echo_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "github.com/apache/dubbo-go/common/constant" diff --git a/filter/impl/echo_filter_test.go b/filter/filter/echo_filter_test.go similarity index 98% rename from filter/impl/echo_filter_test.go rename to filter/filter/echo_filter_test.go index e2e592974701ad18c5b01e884485c022ee2320b8..d57d54329f52955d283366f6edc1376a1a474bde 100644 --- a/filter/impl/echo_filter_test.go +++ b/filter/filter/echo_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "testing" diff --git a/filter/impl/execute_limit_filter.go b/filter/filter/execute_limit_filter.go similarity index 98% rename from filter/impl/execute_limit_filter.go rename to filter/filter/execute_limit_filter.go index 156af1b140283dd76c4867ca26e9b42ce8eb25c0..4b5ea7491c19a726f1d90b7588ac5a480cd38590 100644 --- a/filter/impl/execute_limit_filter.go +++ b/filter/filter/execute_limit_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "strconv" @@ -32,7 +32,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" - _ "github.com/apache/dubbo-go/filter/common/impl" + _ "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) diff --git a/filter/impl/execute_limit_filter_test.go b/filter/filter/execute_limit_filter_test.go similarity index 99% rename from filter/impl/execute_limit_filter_test.go rename to filter/filter/execute_limit_filter_test.go index 5d729c0e6a1205902856eccfa6aa96b0bee0e790..326b13677b157fbba2495caf5699c246d0d62879 100644 --- a/filter/impl/execute_limit_filter_test.go +++ b/filter/filter/execute_limit_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "net/url" diff --git a/filter/impl/generic_filter.go b/filter/filter/generic_filter.go similarity index 90% rename from filter/impl/generic_filter.go rename to filter/filter/generic_filter.go index 35aadb11a444bda56109e238b17267f71ec2606b..9fb26f15aec7027d46526ed61a46d088a0b6b5e8 100644 --- a/filter/impl/generic_filter.go +++ b/filter/filter/generic_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "reflect" @@ -47,22 +47,21 @@ type GenericFilter struct{} func (ef *GenericFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { oldArguments := invocation.Arguments() - var newParams []hessian.Object + if oldParams, ok := oldArguments[2].([]interface{}); ok { + newParams := make([]hessian.Object, 0, len(oldParams)) for i := range oldParams { newParams = append(newParams, hessian.Object(struct2MapAll(oldParams[i]))) } - } else { - return invoker.Invoke(invocation) - } - newArguments := []interface{}{ - oldArguments[0], - oldArguments[1], - newParams, + newArguments := []interface{}{ + oldArguments[0], + oldArguments[1], + newParams, + } + newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) + newInvocation.SetReply(invocation.Reply()) + return invoker.Invoke(newInvocation) } - newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) - newInvocation.SetReply(invocation.Reply()) - return invoker.Invoke(newInvocation) } return invoker.Invoke(invocation) } diff --git a/filter/impl/generic_filter_test.go b/filter/filter/generic_filter_test.go similarity index 99% rename from filter/impl/generic_filter_test.go rename to filter/filter/generic_filter_test.go index 9797c40df1f57017241675013620a53320e475ad..d5298adbd404d7a525bf66ef248cf54b525a6647 100644 --- a/filter/impl/generic_filter_test.go +++ b/filter/filter/generic_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "reflect" diff --git a/filter/filter/generic_service_filter.go b/filter/filter/generic_service_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..514a51f0b0f4c5d16109b97f74d1095e1842f658 --- /dev/null +++ b/filter/filter/generic_service_filter.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "reflect" + "strings" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/mitchellh/mapstructure" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" + invocation2 "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + GENERIC_SERVICE = "generic_service" + GENERIC_SERIALIZATION_DEFAULT = "true" +) + +func init() { + extension.SetFilter(GENERIC_SERVICE, GetGenericServiceFilter) +} + +type GenericServiceFilter struct{} + +func (ef *GenericServiceFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + logger.Infof("invoking generic service filter.") + logger.Debugf("generic service filter methodName:%v,args:%v", invocation.MethodName(), len(invocation.Arguments())) + + if invocation.MethodName() != constant.GENERIC || len(invocation.Arguments()) != 3 { + return invoker.Invoke(invocation) + } + + var ( + ok bool + err error + methodName string + newParams []interface{} + genericKey string + argsType []reflect.Type + oldParams []hessian.Object + ) + + url := invoker.GetUrl() + methodName = invocation.Arguments()[0].(string) + // get service + svc := common.ServiceMap.GetService(url.Protocol, strings.TrimPrefix(url.Path, "/")) + // get method + method := svc.Method()[methodName] + if method == nil { + logger.Errorf("[Generic Service Filter] Don't have this method: %s", methodName) + return &protocol.RPCResult{} + } + argsType = method.ArgsType() + genericKey = invocation.AttachmentsByKey(constant.GENERIC_KEY, GENERIC_SERIALIZATION_DEFAULT) + if genericKey == GENERIC_SERIALIZATION_DEFAULT { + oldParams, ok = invocation.Arguments()[2].([]hessian.Object) + } else { + logger.Errorf("[Generic Service Filter] Don't support this generic: %s", genericKey) + return &protocol.RPCResult{} + } + if !ok { + logger.Errorf("[Generic Service Filter] wrong serialization") + return &protocol.RPCResult{} + } + if len(oldParams) != len(argsType) { + logger.Errorf("[Generic Service Filter] method:%s invocation arguments number was wrong", methodName) + return &protocol.RPCResult{} + } + // oldParams convert to newParams + newParams = make([]interface{}, len(oldParams)) + for i := range argsType { + newParam := reflect.New(argsType[i]).Interface() + err = mapstructure.Decode(oldParams[i], newParam) + newParam = reflect.ValueOf(newParam).Elem().Interface() + if err != nil { + logger.Errorf("[Generic Service Filter] decode arguments map to struct wrong: error{%v}", perrors.WithStack(err)) + return &protocol.RPCResult{} + } + newParams[i] = newParam + } + newInvocation := invocation2.NewRPCInvocation(methodName, newParams, invocation.Attachments()) + newInvocation.SetReply(invocation.Reply()) + return invoker.Invoke(newInvocation) +} + +func (ef *GenericServiceFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 && result.Result() != nil { + v := reflect.ValueOf(result.Result()) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + result.SetResult(struct2MapAll(v.Interface())) + } + return result +} + +func GetGenericServiceFilter() filter.Filter { + return &GenericServiceFilter{} +} diff --git a/filter/filter/generic_service_filter_test.go b/filter/filter/generic_service_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..599a6a66d07ee0ed95545680ccb195f1a2fdeb68 --- /dev/null +++ b/filter/filter/generic_service_filter_test.go @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "context" + "errors" + "reflect" + "testing" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +type TestStruct struct { + AaAa string + BaBa string `m:"baBa"` + XxYy struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + } `m:"xxYy"` +} + +func (c *TestStruct) JavaClassName() string { + return "com.test.testStruct" +} + +type TestService struct { +} + +func (ts *TestService) MethodOne(ctx context.Context, test1 *TestStruct, test2 []TestStruct, + test3 interface{}, test4 []interface{}, test5 *string) (*TestStruct, error) { + if test1 == nil { + return nil, errors.New("param test1 is nil") + } + if test2 == nil { + return nil, errors.New("param test2 is nil") + } + if test3 == nil { + return nil, errors.New("param test3 is nil") + } + if test4 == nil { + return nil, errors.New("param test4 is nil") + } + if test5 == nil { + return nil, errors.New("param test5 is nil") + } + return &TestStruct{}, nil +} + +func (s *TestService) Reference() string { + return "com.test.Path" +} + +func TestGenericServiceFilter_Invoke(t *testing.T) { + hessian.RegisterPOJO(&TestStruct{}) + methodName := "$invoke" + m := make(map[string]interface{}) + m["AaAa"] = "nihao" + x := make(map[string]interface{}) + x["xxXX"] = "nihaoxxx" + m["XxYy"] = x + aurguments := []interface{}{ + "MethodOne", + nil, + []hessian.Object{ + hessian.Object(m), + hessian.Object(append(make([]map[string]interface{}, 1), m)), + hessian.Object("111"), + hessian.Object(append(make([]map[string]interface{}, 1), m)), + hessian.Object("222")}, + } + s := &TestService{} + _, _ = common.ServiceMap.Register("testprotocol", s) + rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) + filter := GetGenericServiceFilter() + url, _ := common.NewURL(context.Background(), "testprotocol://127.0.0.1:20000/com.test.Path") + result := filter.Invoke(&proxy_factory.ProxyInvoker{BaseInvoker: *protocol.NewBaseInvoker(url)}, rpcInvocation) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) +} + +func TestGenericServiceFilter_ResponseTestStruct(t *testing.T) { + ts := &TestStruct{ + AaAa: "aaa", + BaBa: "bbb", + XxYy: struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + }{}, + } + result := &protocol.RPCResult{ + Rest: ts, + } + aurguments := []interface{}{ + "MethodOne", + nil, + []hessian.Object{nil}, + } + filter := GetGenericServiceFilter() + methodName := "$invoke" + rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) + r := filter.OnResponse(result, nil, rpcInvocation) + assert.NotNil(t, r.Result()) + assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.Map) +} + +func TestGenericServiceFilter_ResponseString(t *testing.T) { + str := "111" + result := &protocol.RPCResult{ + Rest: str, + } + aurguments := []interface{}{ + "MethodOne", + nil, + []hessian.Object{nil}, + } + filter := GetGenericServiceFilter() + methodName := "$invoke" + rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) + r := filter.OnResponse(result, nil, rpcInvocation) + assert.NotNil(t, r.Result()) + assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.String) +} diff --git a/filter/impl/graceful_shutdown_filter.go b/filter/filter/graceful_shutdown_filter.go similarity index 95% rename from filter/impl/graceful_shutdown_filter.go rename to filter/filter/graceful_shutdown_filter.go index b912ea88e4ba4741b7d7fe36b8bbd3ba158abe63..c682c7ef79deef2e66178cf1c43ec87992e960dc 100644 --- a/filter/impl/graceful_shutdown_filter.go +++ b/filter/filter/graceful_shutdown_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "sync/atomic" @@ -27,7 +27,6 @@ import ( "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/filter" - "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) @@ -78,7 +77,7 @@ func (gf *gracefulShutdownFilter) rejectNewRequest() bool { return gf.shutdownConfig.RejectRequest } -func (gf *gracefulShutdownFilter) getRejectHandler() common.RejectedExecutionHandler { +func (gf *gracefulShutdownFilter) getRejectHandler() filter.RejectedExecutionHandler { handler := constant.DEFAULT_KEY if gf.shutdownConfig != nil && len(gf.shutdownConfig.RejectRequestHandler) > 0 { handler = gf.shutdownConfig.RejectRequestHandler diff --git a/filter/impl/graceful_shutdown_filter_test.go b/filter/filter/graceful_shutdown_filter_test.go similarity index 89% rename from filter/impl/graceful_shutdown_filter_test.go rename to filter/filter/graceful_shutdown_filter_test.go index 21da167ea0f201ea357c51cab0ecb4f8ebec0957..af57cd4ec8db7a32abc9cdc09724035d8f67cd36 100644 --- a/filter/impl/graceful_shutdown_filter_test.go +++ b/filter/filter/graceful_shutdown_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "net/url" @@ -31,8 +31,8 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config" - filterCommon "github.com/apache/dubbo-go/filter/common" - "github.com/apache/dubbo-go/filter/common/impl" + "github.com/apache/dubbo-go/filter" + common2 "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -66,8 +66,8 @@ func TestGenericFilter_Invoke(t *testing.T) { assert.True(t, shutdownFilter.rejectNewRequest()) result = shutdownFilter.OnResponse(nil, protocol.NewBaseInvoker(*invokeUrl), invoc) - rejectHandler := &impl.OnlyLogRejectedExecutionHandler{} - extension.SetRejectedExecutionHandler("mock", func() filterCommon.RejectedExecutionHandler { + rejectHandler := &common2.OnlyLogRejectedExecutionHandler{} + extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler { return rejectHandler }) assert.True(t, providerConfig.ShutdownConfig.RequestsFinished) diff --git a/filter/impl/hystrix_filter.go b/filter/filter/hystrix_filter.go similarity index 99% rename from filter/impl/hystrix_filter.go rename to filter/filter/hystrix_filter.go index 3fd9f87168616b69d5ec72460767890d6956c154..a7c57b4dd6c4a50f8ff90c6e22ff27cc5ef6658e 100644 --- a/filter/impl/hystrix_filter.go +++ b/filter/filter/hystrix_filter.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package impl +package filter import ( "fmt" diff --git a/filter/impl/hystrix_filter_test.go b/filter/filter/hystrix_filter_test.go similarity index 99% rename from filter/impl/hystrix_filter_test.go rename to filter/filter/hystrix_filter_test.go index d3a5183ede25d8a325bb1c73020edddd2ffbc638..37432940300e558eee971448c5829b2d6c8f2696 100644 --- a/filter/impl/hystrix_filter_test.go +++ b/filter/filter/hystrix_filter_test.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package impl +package filter import ( "regexp" diff --git a/filter/impl/token_filter.go b/filter/filter/token_filter.go similarity index 99% rename from filter/impl/token_filter.go rename to filter/filter/token_filter.go index d10dff5b761d0fbe40ff3a14a93ee8962d000e02..07b80f3402dbd63243b1c48e2d98c80a1f704362 100644 --- a/filter/impl/token_filter.go +++ b/filter/filter/token_filter.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package impl +package filter import ( "strings" diff --git a/filter/impl/token_filter_test.go b/filter/filter/token_filter_test.go similarity index 99% rename from filter/impl/token_filter_test.go rename to filter/filter/token_filter_test.go index 1473f274037699260725ff9ebb1b3d1377efb326..4434865de7918e41720fdd74eace32e9483901b6 100644 --- a/filter/impl/token_filter_test.go +++ b/filter/filter/token_filter_test.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package impl +package filter import ( "net/url" diff --git a/filter/impl/tps_limit_filter.go b/filter/filter/tps_limit_filter.go similarity index 95% rename from filter/impl/tps_limit_filter.go rename to filter/filter/tps_limit_filter.go index 3cb7381c8616abd61fe2ac306b59694a92715dda..ccccec00d4741481534185eaab290fc717864bd8 100644 --- a/filter/impl/tps_limit_filter.go +++ b/filter/filter/tps_limit_filter.go @@ -15,15 +15,15 @@ * limitations under the License. */ -package impl +package filter import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" - _ "github.com/apache/dubbo-go/filter/common/impl" - _ "github.com/apache/dubbo-go/filter/impl/tps/impl" + _ "github.com/apache/dubbo-go/filter/common" + _ "github.com/apache/dubbo-go/filter/tps" "github.com/apache/dubbo-go/protocol" ) diff --git a/filter/impl/tps_limit_filter_test.go b/filter/filter/tps_limit_filter_test.go similarity index 84% rename from filter/impl/tps_limit_filter_test.go rename to filter/filter/tps_limit_filter_test.go index debdbd00dec97ed67d789bfc45103993c014ab4a..6acaab7036307299da18aefbccf011923df8e287 100644 --- a/filter/impl/tps_limit_filter_test.go +++ b/filter/filter/tps_limit_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "net/url" @@ -23,6 +23,9 @@ import ( ) import ( + "github.com/apache/dubbo-go/filter" + common2 "github.com/apache/dubbo-go/filter/common" + "github.com/apache/dubbo-go/filter/tps" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -31,10 +34,6 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - filterCommon "github.com/apache/dubbo-go/filter/common" - filterCommonImpl "github.com/apache/dubbo-go/filter/common/impl" - "github.com/apache/dubbo-go/filter/impl/tps" - "github.com/apache/dubbo-go/filter/impl/tps/impl" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -56,9 +55,9 @@ func TestTpsLimitFilter_Invoke_With_No_TpsLimiter(t *testing.T) { func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockLimiter := impl.NewMockTpsLimiter(ctrl) + mockLimiter := tps.NewMockTpsLimiter(ctrl) mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(true).Times(1) - extension.SetTpsLimiter(constant.DEFAULT_KEY, func() tps.TpsLimiter { + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter { return mockLimiter }) @@ -77,17 +76,17 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockLimiter := impl.NewMockTpsLimiter(ctrl) + mockLimiter := tps.NewMockTpsLimiter(ctrl) mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(false).Times(1) - extension.SetTpsLimiter(constant.DEFAULT_KEY, func() tps.TpsLimiter { + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter { return mockLimiter }) mockResult := &protocol.RPCResult{} - mockRejectedHandler := filterCommonImpl.NewMockRejectedExecutionHandler(ctrl) + mockRejectedHandler := common2.NewMockRejectedExecutionHandler(ctrl) mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1) - extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() filterCommon.RejectedExecutionHandler { + extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() filter.RejectedExecutionHandler { return mockRejectedHandler }) diff --git a/filter/common/rejected_execution_handler.go b/filter/rejected_execution_handler.go similarity index 98% rename from filter/common/rejected_execution_handler.go rename to filter/rejected_execution_handler.go index b993b8444c14c13ce9a8861c113dc02ca5fd335a..ce95b54b14d01e0aec6f6089799df8378b5bcca5 100644 --- a/filter/common/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package common +package filter import ( "github.com/apache/dubbo-go/common" diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go b/filter/tps/tps_limit_fix_window_strategy.go similarity index 96% rename from filter/impl/tps/impl/tps_limit_fix_window_strategy.go rename to filter/tps/tps_limit_fix_window_strategy.go index 285ecfa658cf838cc1140ba716bd72e1976b86fe..6ea5dc10333739848a96881b6dcf7e4bb54ccbe9 100644 --- a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go +++ b/filter/tps/tps_limit_fix_window_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "sync/atomic" @@ -25,7 +25,7 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) const ( @@ -79,7 +79,7 @@ func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { type fixedWindowStrategyCreator struct{} -func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { return &FixedWindowTpsLimitStrategyImpl{ rate: int32(rate), interval: int64(interval) * int64(time.Millisecond), // convert to ns diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go b/filter/tps/tps_limit_fix_window_strategy_test.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go rename to filter/tps/tps_limit_fix_window_strategy_test.go index 7ef539ed3b2b93da5c56a05f606e75282226d1ef..5eaf2f707dcc9dd6cf325988242623dd5161c1a8 100644 --- a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go +++ b/filter/tps/tps_limit_fix_window_strategy_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "testing" diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go b/filter/tps/tps_limit_sliding_window_strategy.go similarity index 96% rename from filter/impl/tps/impl/tps_limit_sliding_window_strategy.go rename to filter/tps/tps_limit_sliding_window_strategy.go index d1a5db6e259ffa63282065f881f6cc8360c8d25b..40ea2d14be91a948752455ad8e1a7e611354017a 100644 --- a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go +++ b/filter/tps/tps_limit_sliding_window_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "container/list" @@ -25,7 +25,7 @@ import ( import ( "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) func init() { @@ -82,7 +82,7 @@ func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { type slidingWindowStrategyCreator struct{} -func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { return &SlidingWindowTpsLimitStrategyImpl{ rate: rate, interval: int64(interval) * int64(time.Millisecond), diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go b/filter/tps/tps_limit_sliding_window_strategy_test.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go rename to filter/tps/tps_limit_sliding_window_strategy_test.go index 075f1d9d2be2d18edfee7dc8691b71da65f5da45..57342d1c443993c49c6124f0ef28dae5ebb203e8 100644 --- a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go +++ b/filter/tps/tps_limit_sliding_window_strategy_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "testing" diff --git a/filter/impl/tps/impl/tps_limit_strategy_mock.go b/filter/tps/tps_limit_strategy_mock.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_strategy_mock.go rename to filter/tps/tps_limit_strategy_mock.go index a653fb287a2d89d8c6151889ca14b4b7b4832505..72c658fb9a5d48b6080900a4645d318dfd2b0c21 100644 --- a/filter/impl/tps/impl/tps_limit_strategy_mock.go +++ b/filter/tps/tps_limit_strategy_mock.go @@ -18,7 +18,7 @@ // Source: tps_limit_strategy.go // Package filter is a generated GoMock package. -package impl +package tps import ( gomock "github.com/golang/mock/gomock" diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go b/filter/tps/tps_limit_thread_safe_fix_window_strategy.go similarity index 95% rename from filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go rename to filter/tps/tps_limit_thread_safe_fix_window_strategy.go index 9a1b21a3349845e32cb0fe38b07a7f932ec4f454..faec9b6ec1466e0c2c7d5df9ca0dd82a965494ec 100644 --- a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go +++ b/filter/tps/tps_limit_thread_safe_fix_window_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "sync" @@ -23,7 +23,7 @@ import ( import ( "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) func init() { @@ -62,7 +62,7 @@ type threadSafeFixedWindowStrategyCreator struct { fixedWindowStrategyCreator *fixedWindowStrategyCreator } -func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { fixedWindowStrategy := creator.fixedWindowStrategyCreator.Create(rate, interval).(*FixedWindowTpsLimitStrategyImpl) return &ThreadSafeFixedWindowTpsLimitStrategyImpl{ fixedWindow: fixedWindowStrategy, diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go b/filter/tps/tps_limit_thread_safe_fix_window_strategy_test.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go rename to filter/tps/tps_limit_thread_safe_fix_window_strategy_test.go index 129493962403e0028b09f9646054fda236c99ff7..90cd15201cd71aafcc50a1dfb801ece7a5dee26a 100644 --- a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go +++ b/filter/tps/tps_limit_thread_safe_fix_window_strategy_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "testing" diff --git a/filter/impl/tps/impl/tps_limiter_method_service.go b/filter/tps/tps_limiter_method_service.go similarity index 96% rename from filter/impl/tps/impl/tps_limiter_method_service.go rename to filter/tps/tps_limiter_method_service.go index 426ae5994867c5a09653641870ebcef531c0d43c..ac4498a33d195128ad89828f9696b90cbd2db082 100644 --- a/filter/impl/tps/impl/tps_limiter_method_service.go +++ b/filter/tps/tps_limiter_method_service.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package impl +package tps import ( "fmt" @@ -30,7 +30,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/protocol" ) @@ -127,7 +127,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio limitState, found := limiter.tpsState.Load(limitTarget) if found { - return limitState.(tps.TpsLimitStrategy).IsAllowable() + return limitState.(filter.TpsLimitStrategy).IsAllowable() } limitRate := getLimitConfig(methodLimitRateConfig, url, invocation, @@ -149,7 +149,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval))) - return limitState.(tps.TpsLimitStrategy).IsAllowable() + return limitState.(filter.TpsLimitStrategy).IsAllowable() } func getLimitConfig(methodLevelConfig string, @@ -178,7 +178,7 @@ func getLimitConfig(methodLevelConfig string, var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl var methodServiceTpsLimiterOnce sync.Once -func GetMethodServiceTpsLimiter() tps.TpsLimiter { +func GetMethodServiceTpsLimiter() filter.TpsLimiter { methodServiceTpsLimiterOnce.Do(func() { methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ tpsState: concurrent.NewMap(), diff --git a/filter/impl/tps/impl/tps_limiter_method_service_test.go b/filter/tps/tps_limiter_method_service_test.go similarity index 97% rename from filter/impl/tps/impl/tps_limiter_method_service_test.go rename to filter/tps/tps_limiter_method_service_test.go index e747d4682d0a8bdee03da6f012fb76b7bd1e02af..441224a3e35147b85c3553871dcaa1fefd09db04 100644 --- a/filter/impl/tps/impl/tps_limiter_method_service_test.go +++ b/filter/tps/tps_limiter_method_service_test.go @@ -15,13 +15,14 @@ * limitations under the License. */ -package impl +package tps import ( "net/url" "testing" ) import ( + "github.com/apache/dubbo-go/filter" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -30,7 +31,6 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -144,10 +144,10 @@ type mockStrategyCreator struct { rate int interval int t *testing.T - strategy tps.TpsLimitStrategy + strategy filter.TpsLimitStrategy } -func (creator *mockStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *mockStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { assert.Equal(creator.t, creator.rate, rate) assert.Equal(creator.t, creator.interval, interval) return creator.strategy diff --git a/filter/impl/tps/impl/tps_limiter_mock.go b/filter/tps/tps_limiter_mock.go similarity index 99% rename from filter/impl/tps/impl/tps_limiter_mock.go rename to filter/tps/tps_limiter_mock.go index acd3a15d18baf10838faf57e141afe1711f0aebb..463b0988acbeb17a967c9803337a61c4914bce42 100644 --- a/filter/impl/tps/impl/tps_limiter_mock.go +++ b/filter/tps/tps_limiter_mock.go @@ -18,7 +18,7 @@ // Source: tps_limiter.go // Package filter is a generated GoMock package. -package impl +package tps import ( reflect "reflect" diff --git a/filter/impl/tps/tps_limit_strategy.go b/filter/tps_limit_strategy.go similarity index 98% rename from filter/impl/tps/tps_limit_strategy.go rename to filter/tps_limit_strategy.go index c55f008a09b3743f728ab0506c6b0095cbfd181c..1051c3d96d37619e0e507cc845f144a45a9bb421 100644 --- a/filter/impl/tps/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package tps +package filter /* * please register your implementation by invoking SetTpsLimitStrategy diff --git a/filter/impl/tps/tps_limiter.go b/filter/tps_limiter.go similarity index 98% rename from filter/impl/tps/tps_limiter.go rename to filter/tps_limiter.go index 0622a957a8ba14fdebb52ff44ecf72da17703163..1d2b2341ac7d9b12f75d373909b0baa58bc7295f 100644 --- a/filter/impl/tps/tps_limiter.go +++ b/filter/tps_limiter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package tps +package filter import ( "github.com/apache/dubbo-go/common" diff --git a/go.mod b/go.mod index 88085488d2ae2535f2f1ba7d00f2388d16e1b965..4d65602d9dc40fc200e140fd4a376534a8ea6a73 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 758363117f1720a7fe89eb9745b415e506315db8..6b41d5e7d76d31ea23f08b77c841d0f87986bef7 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -89,11 +89,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { return perrors.Errorf("opts[0] is not of type *Client") } - pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) - if !ok { - return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + if p.Header.Type&hessian.PackageRequest != 0x00 { + // size of this array must be '7' + // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 + p.Body = make([]interface{}, 7) + } else { + pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) + if !ok { + return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } // read body diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index df9ab28e0e4b896b11b2345a83cae14401a70759..2e4b3999dfc08262a2cfb80f29c9a9e7bc2decf8 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -85,11 +85,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { } if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) - if p.Err != nil { - logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + if p.Header.Type&hessian.PackageResponse != 0x00 { + logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + if p.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + } + h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) + } else { + logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) + p.Header.ResponseStatus = hessian.Response_OK + reply(session, p, hessian.PackageHeartbeat) } - h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) return } logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) @@ -199,7 +205,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { // heartbeat if p.Header.Type&hessian.PackageHeartbeat != 0x00 { logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - h.reply(session, p, hessian.PackageHeartbeat) + reply(session, p, hessian.PackageHeartbeat) return } @@ -226,7 +232,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if !twoway { return } - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) } }() @@ -241,7 +247,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { logger.Errorf(err.Error()) p.Header.ResponseStatus = hessian.Response_OK p.Body = err - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) return } invoker := exporter.(protocol.Exporter).GetInvoker() @@ -266,7 +272,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if !twoway { return } - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) } func (h *RpcServerHandler) OnCron(session getty.Session) { @@ -294,7 +300,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { +func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ SerialID: req.Header.SerialID, diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 930382cca8bac6955b516a88e93ce26d73e235fe..e9dff1cfc77fb34ba75e604334d9c7ab5cfa36d7 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -62,8 +62,10 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface return nil, 0, perrors.WithStack(err) } - pkg.Err = pkg.Body.(*hessian.Response).Exception - pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + if pkg.Header.Type&hessian.PackageRequest == 0x00 { + pkg.Err = pkg.Body.(*hessian.Response).Exception + pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + } return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 2124a22f1611b24d7f4370de64b117c58c4f7e7b..bddd83b5db60cc3ccaa1ab0c43aaeec28e77855d 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -19,6 +19,7 @@ package invocation import ( "reflect" + "sync" ) import ( @@ -37,6 +38,7 @@ type RPCInvocation struct { callBack interface{} attachments map[string]string invoker protocol.Invoker + lock sync.RWMutex } func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation { @@ -80,6 +82,8 @@ func (r *RPCInvocation) Attachments() map[string]string { } func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string { + r.lock.RLock() + defer r.lock.RUnlock() if r.attachments == nil { return defaultValue } @@ -91,6 +95,8 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string } func (r *RPCInvocation) SetAttachments(key string, value string) { + r.lock.Lock() + defer r.lock.Unlock() if r.attachments == nil { r.attachments = make(map[string]string) }