diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index a7f265c3bb189dc72a50d1f1302a5613f7cc45db..12ab408d08737f9c7036768e77c6a3975536903e 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -177,11 +177,6 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, } } - //if response.Header.Type&hessian.PackageRequest != 0x00 { - // resp.Body = req.Body - //} else { - // resp.Body = nil - //} codec := hessian.NewHessianCodec(nil) pkg, err := codec.Write(resp.Service, resp.Header, resp.Body) @@ -261,9 +256,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) methodName = req[3].(string) } if req[4] != nil { - //argsType - //invocation.ParameterTypes(constant., req[1].(string)) - //argsTypes = req[4].(string) + //ignore argTypes } if req[5] != nil { args = req[5].([]interface{}) @@ -271,27 +264,9 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) if req[6] != nil { attachments = req[6].(map[string]string) } - //if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { - // pkg.Service.Path = attachments[constant.PATH_KEY] - //} - //if _, ok := attachments[constant.INTERFACE_KEY]; ok { - // pkg.Service.Interface = attachments[constant.INTERFACE_KEY] - //} else { - // pkg.Service.Interface = pkg.Service.Path - //} - //if len(attachments[constant.GROUP_KEY]) > 0 { - // pkg.Service.Group = attachments[constant.GROUP_KEY] - //} invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), invocation.WithArguments(args), invocation.WithMethodName(methodName)) request.Data = invoc - //pkg.Body = map[string]interface{}{ - // "dubboVersion": dubboVersion, - // "argsTypes": argsTypes, - // "args": args, - // "service": common.ServiceMap.GetService("dubbo", pkg.Service.Path), // path as a key - // "attachments": attachments, - //} } } return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index ddee734b1b637d9727464bf409e5acd99fa6800a..1b3ac6fc8085f12fa1088c263b334420fec79410 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -19,24 +19,22 @@ package dubbo import ( "context" - "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/remoting" "strconv" + "strings" "sync" "sync/atomic" "time" -) -import ( + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/remoting" "github.com/opentracing/opentracing-go" - perrors "github.com/pkg/errors" -) -import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" + perrors "github.com/pkg/errors" + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) @@ -111,18 +109,19 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } //response := NewResponse(inv.Reply(), nil) rest := &protocol.RPCResult{} + timeout := di.getTimeout(inv) if async { if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { //result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) - result.Err = di.client.AsyncRequest(&invocation, url, di.timeout, callBack, rest) + result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest) } else { - result.Err = di.client.Send(&invocation, di.timeout) + result.Err = di.client.Send(&invocation, timeout) } } else { if inv.Reply() == nil { result.Err = ErrNoReply } else { - result.Err = di.client.Request(&invocation, url, di.timeout, rest) + result.Err = di.client.Request(&invocation, url, timeout, rest) } } if result.Err == nil { @@ -134,6 +133,20 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati return &result } +// get timeout including methodConfig +func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration { + var timeout = di.GetUrl().GetParam(strings.Join([]string{constant.METHOD_KEYS, invocation.MethodName(), constant.TIMEOUT_KEY}, "."), "") + if len(timeout) != 0 { + if t, err := time.ParseDuration(timeout); err == nil { + // config timeout into attachment + invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(t.Milliseconds()))) + return t + } + } + invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(di.timeout.Milliseconds()))) + return di.timeout +} + // Destroy ... func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 4fd2492a7591a5529a83ecc6ef8ed13c34545ea2..8c2845604550309e2c4408907fea0ab145745f18 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -82,18 +82,6 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { // Refer ... func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { - //default requestTimeout - //var requestTimeout = config.GetConsumerConfig().RequestTimeout - // - //requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) - //if t, err := time.ParseDuration(requestTimeoutStr); err == nil { - // requestTimeout = t - //} - - //invoker := NewDubboInvoker(url, NewClient(Options{ - // ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, - // RequestTimeout: requestTimeout, - //})) exchangeClient := getExchangeClient(url) if exchangeClient == nil { return nil diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 932e94862dd1a5c92a1f416482e8b61e574cc85c..b2894ed7fa76135df5a7371680aacd61fc5f1b44 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -27,11 +27,50 @@ import ( "github.com/stretchr/testify/assert" ) +func init() { + getty.SetServerConfig(getty.ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + GettySessionParam: getty.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "server", + }}) + getty.SetClientConf(getty.ClientConfig{ + ConnectionNum: 1, + HeartbeatPeriod: "3s", + SessionTimeout: "20s", + PoolTTL: 600, + PoolSize: 64, + GettySessionParam: getty.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "4s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "client", + }, + }) +} func TestDubboProtocol_Export(t *testing.T) { // Export proto := GetProtocol() - getty.SetServerConfig(getty.ServerConfig{}) - url, err := common.NewURL("dubbo://127.0.0.1:20080/com.ikurento.user.UserProvider?anyhost=true&" + + url, err := common.NewURL("dubbo://127.0.0.1:20094/com.ikurento.user.UserProvider?anyhost=true&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + @@ -44,7 +83,7 @@ func TestDubboProtocol_Export(t *testing.T) { assert.True(t, eq) // second service: the same path and the different version - url2, err := common.NewURL("dubbo://127.0.0.1:20080/com.ikurento.user.UserProvider?anyhost=true&"+ + url2, err := common.NewURL("dubbo://127.0.0.1:20095/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ @@ -70,16 +109,31 @@ func TestDubboProtocol_Export(t *testing.T) { assert.False(t, ok) } +func TestDubboProtocol_Refer_No_connect(t *testing.T) { + // Refer + proto := GetProtocol() + url, err := common.NewURL("dubbo://127.0.0.1:20096/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + invoker := proto.Refer(url) + assert.Nil(t, invoker) +} + func TestDubboProtocol_Refer(t *testing.T) { // Refer proto := GetProtocol() - url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + + url, err := common.NewURL("dubbo://127.0.0.1:20091/com.ikurento.user.UserProvider?anyhost=true&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + "side=provider&timeout=3000×tamp=1556509797245") assert.NoError(t, err) - getty.SetClientConf(getty.ClientConfig{}) + proto.Export(protocol.NewBaseInvoker(url)) + invoker := proto.Refer(url) // make sure url diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go index ccf8881bfe6bbefca6980638ccf014283af635cb..9687c9cde9faf5f84178425456333d8c669192f1 100644 --- a/remoting/getty/getty_client.go +++ b/remoting/getty/getty_client.go @@ -158,7 +158,7 @@ func (c *Client) Connect(url common.URL) error { c.codec = remoting.GetCodec(url.Protocol) c.addr = url.Location _, _, err := c.selectSession(c.addr) - logger.Error("try to connect server %v failed for %v", url.Location, err) + logger.Errorf("try to connect server %v failed for : %v", url.Location, err) return err }