Skip to content
Snippets Groups Projects
Unverified Commit 8394520d authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #291 from Patrick0308/generic-service

Ftr: Generic Implement
parents 01f84dc5 bd7989ad
No related branches found
No related tags found
No related merge requests found
Showing
with 308 additions and 36 deletions
...@@ -46,7 +46,7 @@ const ( ...@@ -46,7 +46,7 @@ const (
const ( const (
DEFAULT_KEY = "default" DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default." PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute,pshutdown" DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,generic_service,execute,pshutdown"
DEFAULT_REFERENCE_FILTERS = "cshutdown" DEFAULT_REFERENCE_FILTERS = "cshutdown"
GENERIC_REFERENCE_FILTERS = "generic" GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke" GENERIC = "$invoke"
......
...@@ -19,12 +19,11 @@ package extension ...@@ -19,12 +19,11 @@ package extension
import ( import (
"github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common"
) )
var ( var (
filters = make(map[string]func() filter.Filter) filters = make(map[string]func() filter.Filter)
rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler) rejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler)
) )
func SetFilter(name string, v func() filter.Filter) { func SetFilter(name string, v func() filter.Filter) {
...@@ -38,11 +37,11 @@ func GetFilter(name string) filter.Filter { ...@@ -38,11 +37,11 @@ func GetFilter(name string) filter.Filter {
return filters[name]() return filters[name]()
} }
func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) { func SetRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) {
rejectedExecutionHandler[name] = creator rejectedExecutionHandler[name] = creator
} }
func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler { func GetRejectedExecutionHandler(name string) filter.RejectedExecutionHandler {
creator, ok := rejectedExecutionHandler[name] creator, ok := rejectedExecutionHandler[name]
if !ok { if !ok {
panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
......
...@@ -18,19 +18,19 @@ ...@@ -18,19 +18,19 @@
package extension package extension
import ( import (
"github.com/apache/dubbo-go/filter/impl/tps" "github.com/apache/dubbo-go/filter"
) )
var ( var (
tpsLimitStrategy = make(map[string]tps.TpsLimitStrategyCreator) tpsLimitStrategy = make(map[string]filter.TpsLimitStrategyCreator)
tpsLimiter = make(map[string]func() tps.TpsLimiter) tpsLimiter = make(map[string]func() filter.TpsLimiter)
) )
func SetTpsLimiter(name string, creator func() tps.TpsLimiter) { func SetTpsLimiter(name string, creator func() filter.TpsLimiter) {
tpsLimiter[name] = creator tpsLimiter[name] = creator
} }
func GetTpsLimiter(name string) tps.TpsLimiter { func GetTpsLimiter(name string) filter.TpsLimiter {
creator, ok := tpsLimiter[name] creator, ok := tpsLimiter[name]
if !ok { if !ok {
panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " + panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " +
...@@ -39,11 +39,11 @@ func GetTpsLimiter(name string) tps.TpsLimiter { ...@@ -39,11 +39,11 @@ func GetTpsLimiter(name string) tps.TpsLimiter {
return creator() return creator()
} }
func SetTpsLimitStrategy(name string, creator tps.TpsLimitStrategyCreator) { func SetTpsLimitStrategy(name string, creator filter.TpsLimitStrategyCreator) {
tpsLimitStrategy[name] = creator tpsLimitStrategy[name] = creator
} }
func GetTpsLimitStrategyCreator(name string) tps.TpsLimitStrategyCreator { func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator {
creator, ok := tpsLimitStrategy[name] creator, ok := tpsLimitStrategy[name]
if !ok { if !ok {
panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " + panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " +
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
// Source: rejected_execution_handler.go // Source: rejected_execution_handler.go
// Package filter is a generated GoMock package. // Package filter is a generated GoMock package.
package impl package common
import ( import (
reflect "reflect" reflect "reflect"
......
...@@ -15,9 +15,10 @@ ...@@ -15,9 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package common
import ( import (
"github.com/apache/dubbo-go/filter"
"sync" "sync"
) )
...@@ -26,7 +27,6 @@ import ( ...@@ -26,7 +27,6 @@ import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
filterCommon "github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
...@@ -61,7 +61,7 @@ func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL ...@@ -61,7 +61,7 @@ func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL
return &protocol.RPCResult{} return &protocol.RPCResult{}
} }
func GetOnlyLogRejectedExecutionHandler() filterCommon.RejectedExecutionHandler { func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() { onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
}) })
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package common
import ( import (
"net/url" "net/url"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"os" "os"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"context" "context"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"testing" "testing"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"strconv" "strconv"
...@@ -32,7 +32,7 @@ import ( ...@@ -32,7 +32,7 @@ import (
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/filter"
_ "github.com/apache/dubbo-go/filter/common/impl" _ "github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"net/url" "net/url"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"reflect" "reflect"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"reflect" "reflect"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter
import (
"reflect"
"strings"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/mitchellh/mapstructure"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
invocation2 "github.com/apache/dubbo-go/protocol/invocation"
)
const (
GENERIC_SERVICE = "generic_service"
GENERIC_SERIALIZATION_DEFAULT = "true"
)
func init() {
extension.SetFilter(GENERIC_SERVICE, GetGenericServiceFilter)
}
type GenericServiceFilter struct{}
func (ef *GenericServiceFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking generic service filter.")
logger.Debugf("generic service filter methodName:%v,args:%v", invocation.MethodName(), len(invocation.Arguments()))
if invocation.MethodName() != constant.GENERIC || len(invocation.Arguments()) != 3 {
return invoker.Invoke(invocation)
}
var (
ok bool
err error
methodName string
newParams []interface{}
genericKey string
argsType []reflect.Type
oldParams []hessian.Object
)
url := invoker.GetUrl()
methodName = invocation.Arguments()[0].(string)
// get service
svc := common.ServiceMap.GetService(url.Protocol, strings.TrimPrefix(url.Path, "/"))
// get method
method := svc.Method()[methodName]
if method == nil {
logger.Errorf("[Generic Service Filter] Don't have this method: %s", methodName)
return &protocol.RPCResult{}
}
argsType = method.ArgsType()
genericKey = invocation.AttachmentsByKey(constant.GENERIC_KEY, GENERIC_SERIALIZATION_DEFAULT)
if genericKey == GENERIC_SERIALIZATION_DEFAULT {
oldParams, ok = invocation.Arguments()[2].([]hessian.Object)
} else {
logger.Errorf("[Generic Service Filter] Don't support this generic: %s", genericKey)
return &protocol.RPCResult{}
}
if !ok {
logger.Errorf("[Generic Service Filter] wrong serialization")
return &protocol.RPCResult{}
}
if len(oldParams) != len(argsType) {
logger.Errorf("[Generic Service Filter] method:%s invocation arguments number was wrong", methodName)
return &protocol.RPCResult{}
}
// oldParams convert to newParams
newParams = make([]interface{}, len(oldParams))
for i := range argsType {
newParam := reflect.New(argsType[i]).Interface()
err = mapstructure.Decode(oldParams[i], newParam)
newParam = reflect.ValueOf(newParam).Elem().Interface()
if err != nil {
logger.Errorf("[Generic Service Filter] decode arguments map to struct wrong: error{%v}", perrors.WithStack(err))
return &protocol.RPCResult{}
}
newParams[i] = newParam
}
newInvocation := invocation2.NewRPCInvocation(methodName, newParams, invocation.Attachments())
newInvocation.SetReply(invocation.Reply())
return invoker.Invoke(newInvocation)
}
func (ef *GenericServiceFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 && result.Result() != nil {
v := reflect.ValueOf(result.Result())
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
result.SetResult(struct2MapAll(v.Interface()))
}
return result
}
func GetGenericServiceFilter() filter.Filter {
return &GenericServiceFilter{}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter
import (
"context"
"errors"
"reflect"
"testing"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
type TestStruct struct {
AaAa string
BaBa string `m:"baBa"`
XxYy struct {
xxXx string `m:"xxXx"`
Xx string `m:"xx"`
} `m:"xxYy"`
}
func (c *TestStruct) JavaClassName() string {
return "com.test.testStruct"
}
type TestService struct {
}
func (ts *TestService) MethodOne(ctx context.Context, test1 *TestStruct, test2 []TestStruct,
test3 interface{}, test4 []interface{}, test5 *string) (*TestStruct, error) {
if test1 == nil {
return nil, errors.New("param test1 is nil")
}
if test2 == nil {
return nil, errors.New("param test2 is nil")
}
if test3 == nil {
return nil, errors.New("param test3 is nil")
}
if test4 == nil {
return nil, errors.New("param test4 is nil")
}
if test5 == nil {
return nil, errors.New("param test5 is nil")
}
return &TestStruct{}, nil
}
func (s *TestService) Reference() string {
return "com.test.Path"
}
func TestGenericServiceFilter_Invoke(t *testing.T) {
hessian.RegisterPOJO(&TestStruct{})
methodName := "$invoke"
m := make(map[string]interface{})
m["AaAa"] = "nihao"
x := make(map[string]interface{})
x["xxXX"] = "nihaoxxx"
m["XxYy"] = x
aurguments := []interface{}{
"MethodOne",
nil,
[]hessian.Object{
hessian.Object(m),
hessian.Object(append(make([]map[string]interface{}, 1), m)),
hessian.Object("111"),
hessian.Object(append(make([]map[string]interface{}, 1), m)),
hessian.Object("222")},
}
s := &TestService{}
_, _ = common.ServiceMap.Register("testprotocol", s)
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
filter := GetGenericServiceFilter()
url, _ := common.NewURL(context.Background(), "testprotocol://127.0.0.1:20000/com.test.Path")
result := filter.Invoke(&proxy_factory.ProxyInvoker{BaseInvoker: *protocol.NewBaseInvoker(url)}, rpcInvocation)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
}
func TestGenericServiceFilter_ResponseTestStruct(t *testing.T) {
ts := &TestStruct{
AaAa: "aaa",
BaBa: "bbb",
XxYy: struct {
xxXx string `m:"xxXx"`
Xx string `m:"xx"`
}{},
}
result := &protocol.RPCResult{
Rest: ts,
}
aurguments := []interface{}{
"MethodOne",
nil,
[]hessian.Object{nil},
}
filter := GetGenericServiceFilter()
methodName := "$invoke"
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
r := filter.OnResponse(result, nil, rpcInvocation)
assert.NotNil(t, r.Result())
assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.Map)
}
func TestGenericServiceFilter_ResponseString(t *testing.T) {
str := "111"
result := &protocol.RPCResult{
Rest: str,
}
aurguments := []interface{}{
"MethodOne",
nil,
[]hessian.Object{nil},
}
filter := GetGenericServiceFilter()
methodName := "$invoke"
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
r := filter.OnResponse(result, nil, rpcInvocation)
assert.NotNil(t, r.Result())
assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.String)
}
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"sync/atomic" "sync/atomic"
...@@ -27,7 +27,6 @@ import ( ...@@ -27,7 +27,6 @@ import (
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
...@@ -78,7 +77,7 @@ func (gf *gracefulShutdownFilter) rejectNewRequest() bool { ...@@ -78,7 +77,7 @@ func (gf *gracefulShutdownFilter) rejectNewRequest() bool {
return gf.shutdownConfig.RejectRequest return gf.shutdownConfig.RejectRequest
} }
func (gf *gracefulShutdownFilter) getRejectHandler() common.RejectedExecutionHandler { func (gf *gracefulShutdownFilter) getRejectHandler() filter.RejectedExecutionHandler {
handler := constant.DEFAULT_KEY handler := constant.DEFAULT_KEY
if gf.shutdownConfig != nil && len(gf.shutdownConfig.RejectRequestHandler) > 0 { if gf.shutdownConfig != nil && len(gf.shutdownConfig.RejectRequestHandler) > 0 {
handler = gf.shutdownConfig.RejectRequestHandler handler = gf.shutdownConfig.RejectRequestHandler
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"net/url" "net/url"
...@@ -31,8 +31,8 @@ import ( ...@@ -31,8 +31,8 @@ import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config"
filterCommon "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common/impl" common2 "github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/invocation"
) )
...@@ -66,8 +66,8 @@ func TestGenericFilter_Invoke(t *testing.T) { ...@@ -66,8 +66,8 @@ func TestGenericFilter_Invoke(t *testing.T) {
assert.True(t, shutdownFilter.rejectNewRequest()) assert.True(t, shutdownFilter.rejectNewRequest())
result = shutdownFilter.OnResponse(nil, protocol.NewBaseInvoker(*invokeUrl), invoc) result = shutdownFilter.OnResponse(nil, protocol.NewBaseInvoker(*invokeUrl), invoc)
rejectHandler := &impl.OnlyLogRejectedExecutionHandler{} rejectHandler := &common2.OnlyLogRejectedExecutionHandler{}
extension.SetRejectedExecutionHandler("mock", func() filterCommon.RejectedExecutionHandler { extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler {
return rejectHandler return rejectHandler
}) })
assert.True(t, providerConfig.ShutdownConfig.RequestsFinished) assert.True(t, providerConfig.ShutdownConfig.RequestsFinished)
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package impl package filter
import ( import (
"fmt" "fmt"
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment