diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 14bafeddceb33837434096199a365cc4d142d275..fb2d20d3aa03b5899ad02a8aeefad82b3907d5d7 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -93,7 +93,7 @@ func main() { } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser") + println("\n\n\nstart to test dubbo - getUser2") user = &User{} err = userProvider.GetUser2(context.TODO(), []interface{}{1}, user) if err != nil { diff --git a/go.mod b/go.mod index 65ab4e64ea23bfffc66f44cefb6b2d2da18526c4..4c2276429ae315a44c282a8af9fb7e6d7ee1be8e 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/apache/dubbo-go require ( - github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af - github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb + github.com/dubbogo/getty v1.0.7 + github.com/dubbogo/hessian2 v1.0.1 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index 62cc820e0e07220fa1723cd0709a5092e36c0727..a4fd7f5099b5521b03e03cf5958c641991433d39 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af h1:vvXNXyq5uIlf+KlTduhRKY4hBBBjgCUNreT1yIfKftw= -github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb h1:oN6hFLXbT/iDUO8qE4NZtvh89F/7VoAQ1LDxHJdmEH4= -github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= +github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= +github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= +github.com/dubbogo/hessian2 v1.0.1 h1:ztI7gJxR3Isxrrl2jE1IZKX61eNR93eRKGhn49vPEX8= +github.com/dubbogo/hessian2 v1.0.1/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= @@ -29,6 +29,7 @@ golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fT golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 38aed1063c7ec200b6b0654dcb3a5e8ac8138b86..d22ac91ed562468a40daf693948d79962bf07e0d 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -25,7 +25,7 @@ import ( import ( "github.com/dubbogo/getty" - hessian "github.com/dubbogo/hessian2" + "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" "go.uber.org/atomic" "gopkg.in/yaml.v2" diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index 0a02739a86d9ed6bffc0b209152ac789faee87f3..14982c6f123101ea9581542790c74bfc999437b5 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -18,6 +18,7 @@ package dubbo import ( + "bytes" "context" "sync" "testing" @@ -73,7 +74,13 @@ func TestClient_Call(t *testing.T) { c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) user := &User{} - err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) + err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{}, user) + assert.NoError(t, err) + assert.NotEqual(t, "", user.Id) + assert.NotEqual(t, "", user.Name) + + user = &User{} + err = c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) @@ -137,7 +144,7 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) assert.NoError(t, err) - assert.Equal(t, "GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4", methods) + assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4", methods) // config SetClientConf(ClientConfig{ @@ -156,10 +163,10 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { TcpWBufSize: 65536, PkgRQSize: 1024, PkgWQSize: 512, - TcpReadTimeout: "1s", + TcpReadTimeout: "4s", TcpWriteTimeout: "5s", WaitTimeout: "1s", - MaxMsgLen: 1024, + MaxMsgLen: 10240000000, SessionName: "client", }, }) @@ -180,7 +187,7 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { TcpReadTimeout: "1s", TcpWriteTimeout: "5s", WaitTimeout: "1s", - MaxMsgLen: 1024, + MaxMsgLen: 10240000000, SessionName: "server", }}) assert.NoError(t, srvConf.CheckValidity()) @@ -200,6 +207,18 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { return proto, url } +// size:4801228 +func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error { + argBuf := new(bytes.Buffer) + for i := 0; i < 4000; i++ { + argBuf.WriteString("鍑婚紦鍏堕晽锛岃笂璺冪敤鍏点€傚湡鍥藉煄婕曪紝鎴戠嫭鍗楄銆備粠瀛欏瓙浠诧紝骞抽檲涓庡畫銆備笉鎴戜互褰掞紝蹇у績鏈夊俊銆傜埌灞呯埌澶勶紵鐖颁抚鍏堕┈锛熶簬浠ユ眰涔嬶紵浜庢灄涔嬩笅銆傛鐢熷闃旓紝涓庡瓙鎴愯銆傛墽瀛愪箣鎵嬶紝涓庡瓙鍋曡€併€備簬鍡熼様鍏紝涓嶆垜娲诲叜銆備簬鍡熸吹鍏紝涓嶆垜淇″叜銆�") + argBuf.WriteString("鍑婚紦鍏堕晽锛岃笂璺冪敤鍏点€傚湡鍥藉煄婕曪紝鎴戠嫭鍗楄銆備粠瀛欏瓙浠诧紝骞抽檲涓庡畫銆備笉鎴戜互褰掞紝蹇у績鏈夊俊銆傜埌灞呯埌澶勶紵鐖颁抚鍏堕┈锛熶簬浠ユ眰涔嬶紵浜庢灄涔嬩笅銆傛鐢熷闃旓紝涓庡瓙鎴愯銆傛墽瀛愪箣鎵嬶紝涓庡瓙鍋曡€併€備簬鍡熼様鍏紝涓嶆垜娲诲叜銆備簬鍡熸吹鍏紝涓嶆垜淇″叜銆�") + } + rsp.Id = argBuf.String() + rsp.Name = argBuf.String() + return nil +} + func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error { rsp.Id = req[0].(string) rsp.Name = req[1].(string) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 86d1676f1841beab661502c30a7e7c4fe07a38de..9551c2b0b7b742228ae69701a9d86975d2c85e52 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -25,7 +25,7 @@ import ( ) import ( - hessian "github.com/dubbogo/hessian2" + "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" ) @@ -88,16 +88,14 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { return perrors.Errorf("opts[0] is not of type *Client") } - p.Body = client.GetPendingResponse(SequenceType(p.Header.ID)).reply - if p.Body == nil { + pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID)) + if pendingRsp == nil { return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } else { + p.Body = &hessian.Response{RspObj: pendingRsp.reply} } } - if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - return nil - } - // read body err = codec.ReadBody(p.Body) return perrors.WithStack(err) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 9e99bcb628ea1c3075d5768bfd79bb2083f434f9..ea8c38db9817ae2df078ef9b21caaf7b13751b3e 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -86,6 +86,9 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { if p.Header.Type&hessian.PackageHeartbeat != 0x00 { logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + if p.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + } return } logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) @@ -203,7 +206,6 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { // not twoway if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 { twoway = false - h.reply(session, p, hessian.PackageResponse) } invoker := h.exporter.GetInvoker() diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index b7f2700d15d3c3c6b46371cb5eef8ee57ce9415e..6d789ae13e458249747f660c406ab2fe4c6463f0 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -24,7 +24,7 @@ import ( import ( "github.com/dubbogo/getty" - hessian "github.com/dubbogo/hessian2" + "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" ) import ( @@ -63,7 +63,10 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface return nil, 0, perrors.WithStack(err) } - return pkg, len(data), nil + pkg.Err = pkg.Body.(*hessian.Response).Exception + pkg.Body = pkg.Body.(*hessian.Response).RspObj + + return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { @@ -110,44 +113,47 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface return nil, 0, perrors.WithStack(err) } - // convert params of request - req := pkg.Body.([]interface{}) // length of body should be 7 - if len(req) > 0 { - var dubboVersion, argsTypes string - var args []interface{} - var attachments map[interface{}]interface{} - if req[0] != nil { - dubboVersion = req[0].(string) - } - if req[1] != nil { - pkg.Service.Path = req[1].(string) - } - if req[2] != nil { - pkg.Service.Version = req[2].(string) - } - if req[3] != nil { - pkg.Service.Method = req[3].(string) - } - if req[4] != nil { - argsTypes = req[4].(string) - } - if req[5] != nil { - args = req[5].([]interface{}) - } - if req[6] != nil { - attachments = req[6].(map[interface{}]interface{}) - } - pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) - pkg.Body = map[string]interface{}{ - "dubboVersion": dubboVersion, - "argsTypes": argsTypes, - "args": args, - "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface), - "attachments": attachments, + + if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { + // convert params of request + req := pkg.Body.([]interface{}) // length of body should be 7 + if len(req) > 0 { + var dubboVersion, argsTypes string + var args []interface{} + var attachments map[interface{}]interface{} + if req[0] != nil { + dubboVersion = req[0].(string) + } + if req[1] != nil { + pkg.Service.Path = req[1].(string) + } + if req[2] != nil { + pkg.Service.Version = req[2].(string) + } + if req[3] != nil { + pkg.Service.Method = req[3].(string) + } + if req[4] != nil { + argsTypes = req[4].(string) + } + if req[5] != nil { + args = req[5].([]interface{}) + } + if req[6] != nil { + attachments = req[6].(map[interface{}]interface{}) + } + pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) + pkg.Body = map[string]interface{}{ + "dubboVersion": dubboVersion, + "argsTypes": argsTypes, + "args": args, + "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface), + "attachments": attachments, + } } } - return pkg, len(data), nil + return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {