diff --git a/README.md b/README.md index 906c08fc05d4398bcee8edfc1e4afa67cc3f01e4..7460d8ebe20187e6c686bc4dd13ea92c30b2cff5 100644 --- a/README.md +++ b/README.md @@ -182,7 +182,7 @@ If you are willing to do some code contributions and document contributions to [ ## Community -If u want to communicate with our community, pls scan the following dubbobo Ding-Ding QR code or search our commnity DingDing group code 31363295. +If u want to communicate with our community, pls scan the following dubbobo DingDing QR code or search our commnity DingDing group code 31363295. <div> <table> diff --git a/common/constant/key.go b/common/constant/key.go index 54b7be58b63f6a71c27f361cec138fbb9e57c1fa..1cdd258834eb8190c67bdf41f13c9c9a226f51db 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -51,6 +51,8 @@ const ( PATH_SEPARATOR = "/" // DUBBO_KEY = "dubbo" SSL_ENABLED_KEY = "ssl-enabled" + // PARAMS_TYPE_Key key used in pass through invoker factory, to define param type + PARAMS_TYPE_Key = "parameter-type-names" ) const ( @@ -200,6 +202,8 @@ const ( // default deregister critical server after DEFAULT_DEREGISTER_TIME = "20s" DEREGISTER_AFTER = "consul-deregister-critical-service-after" + // PassThroughProxyFactoryKey is key of proxy factory with raw data input service + PassThroughProxyFactoryKey = "dubbo-raw" ) const ( diff --git a/common/proxy/proxy_factory/pass_through.go b/common/proxy/proxy_factory/pass_through.go new file mode 100644 index 0000000000000000000000000000000000000000..02fb265a0553c37a3c9fb065e4808f1e985c8b01 --- /dev/null +++ b/common/proxy/proxy_factory/pass_through.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package proxy_factory + +import ( + "context" + "reflect" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/proxy" + "github.com/apache/dubbo-go/protocol" +) + +func init() { + extension.SetProxyFactory(constant.PassThroughProxyFactoryKey, NewPassThroughProxyFactory) +} + +// PassThroughProxyFactory is the factory of PassThroughProxyInvoker +type PassThroughProxyFactory struct { +} + +// NewPassThroughProxyFactory returns a proxy factory instance +func NewPassThroughProxyFactory(_ ...proxy.Option) proxy.ProxyFactory { + return &PassThroughProxyFactory{} +} + +// GetProxy gets a proxy +func (factory *PassThroughProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { + return factory.GetAsyncProxy(invoker, nil, url) +} + +// GetAsyncProxy gets a async proxy +func (factory *PassThroughProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { + //create proxy + attachments := map[string]string{} + attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") + return proxy.NewProxy(invoker, callBack, attachments) +} + +// GetInvoker gets a invoker +func (factory *PassThroughProxyFactory) GetInvoker(url *common.URL) protocol.Invoker { + return &PassThroughProxyInvoker{ + ProxyInvoker: &ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }, + } +} + +// PassThroughProxyInvoker is a invoker struct, it calls service with specific method 'Serivce' and params: +// Service(method string, argsTypes []string, args [][]byte, attachment map[string]interface{}) +// PassThroughProxyInvoker pass through raw invocation data and method name to service, which will deal with them. +type PassThroughProxyInvoker struct { + *ProxyInvoker +} + +// Invoke is used to call service method by invocation +func (pi *PassThroughProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + result := &protocol.RPCResult{} + result.SetAttachments(invocation.Attachments()) + url := getProviderURL(pi.GetUrl()) + + arguments := invocation.Arguments() + srv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey()) + + var args [][]byte + if len(arguments) > 0 { + args = make([][]byte, 0, len(arguments)) + for _, arg := range arguments { + if v, ok := arg.([]byte); ok { + args = append(args, v) + } else { + result.Err = perrors.New("the param type is not []byte") + return result + } + } + } + method := srv.Method()["Service"] + + in := make([]reflect.Value, 5) + in = append(in, srv.Rcvr()) + in = append(in, reflect.ValueOf(invocation.MethodName())) + in = append(in, reflect.ValueOf(invocation.Attachment(constant.PARAMS_TYPE_Key))) + in = append(in, reflect.ValueOf(args)) + in = append(in, reflect.ValueOf(invocation.Attachments())) + + returnValues := method.Method().Func.Call(in) + + var retErr interface{} + replyv := returnValues[0] + retErr = returnValues[1].Interface() + + if retErr != nil { + result.SetError(retErr.(error)) + return result + } + if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { + result.SetResult(replyv.Interface()) + } + return result +} diff --git a/common/proxy/proxy_factory/pass_through_test.go b/common/proxy/proxy_factory/pass_through_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1607503046eaddf13017fc9235574b2c73b9b601 --- /dev/null +++ b/common/proxy/proxy_factory/pass_through_test.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package proxy_factory + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +func TestPassThroughProxyFactoryGetProxy(t *testing.T) { + proxyFactory := NewPassThroughProxyFactory() + url := common.NewURLWithOptions() + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(url), url) + assert.NotNil(t, proxy) +} + +type TestPassThroughProxyFactoryAsync struct { +} + +func (u *TestPassThroughProxyFactoryAsync) CallBack(res common.CallbackResponse) { + fmt.Println("CallBack res:", res) +} + +func TestPassThroughProxyFactoryGetAsyncProxy(t *testing.T) { + proxyFactory := NewPassThroughProxyFactory() + url := common.NewURLWithOptions() + async := &TestPassThroughProxyFactoryAsync{} + proxy := proxyFactory.GetAsyncProxy(protocol.NewBaseInvoker(url), async.CallBack, url) + assert.NotNil(t, proxy) +} + +func TestPassThroughProxyFactoryGetInvoker(t *testing.T) { + proxyFactory := NewPassThroughProxyFactory() + url := common.NewURLWithOptions() + invoker := proxyFactory.GetInvoker(url) + assert.True(t, invoker.IsAvailable()) +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 7bf8f46afb02cc51f8af3f67d838cbbee5e40e2e..98990bde73be85463065fae9313dcfea7ad52ce2 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -187,7 +187,7 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent // loop the updateEvents for _, event := range addEvents { logger.Debugf("registry update, result{%s}", event) - if event.Service != nil { + if event != nil && event.Service != nil { logger.Infof("selector add service url{%s}", event.Service.String()) } if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 3e6e62967d1cc8fd4e4e7eccd0b19d39504c65dc..0282e4293a8ed1b8aec390bdf95820c52f5b4586 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -48,7 +48,7 @@ var ( reserveParams = []string{ "application", "codec", "exchanger", "serialization", "cluster", "connections", "deprecated", "group", "loadbalance", "mock", "path", "timeout", "token", "version", "warmup", "weight", "timestamp", "dubbo", - "release", "interface", + "release", "interface", "registry.role", } ) diff --git a/test/integrate/dubbo/go-client/go.mod b/test/integrate/dubbo/go-client/go.mod index 73ba3d4ca46991789823fe5c2b1faa9f786c2bc0..38cda5b423c5e97067655e55f0ff34992ab94899 100644 --- a/test/integrate/dubbo/go-client/go.mod +++ b/test/integrate/dubbo/go-client/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/apache/dubbo-go v1.5.6-rc2 - github.com/apache/dubbo-go-hessian2 v1.9.1 + github.com/apache/dubbo-go-hecssian2 v1.9.1 ) replace github.com/apache/dubbo-go => ../../../../../dubbo-go