From 4f7ef9ddef1de6dc7cb121947357234a0202c056 Mon Sep 17 00:00:00 2001 From: "Xin.Zh" <dragoncharlie@foxmail.com> Date: Sat, 8 Jun 2019 10:15:27 +0800 Subject: [PATCH] Imp: add Client.GetPendingResponse to limit the lock scope (#83) * Imp: add Client.GetPendingResponse to limit the lock scope * Fix: return pkg len as case 1 * Mod: update getty&hessian2 version * Fix: use bufio.NewReadSize instead of bufio.NewReader * Fix: return {nil, 0, nil} for tcp case 1 * Fix: return {nil, 0, nil} for tcp case 1 for server handler --- go.mod | 4 ++-- go.sum | 23 ++--------------------- protocol/dubbo/client.go | 9 ++++++++- protocol/dubbo/codec.go | 17 ++++++++--------- protocol/dubbo/readwriter.go | 18 ++++++++++++++++-- 5 files changed, 36 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index df9ce48d1..0527279e4 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/apache/dubbo-go require ( - github.com/dubbogo/getty v0.0.0-20190523180329-bdf5e640ea53 - github.com/dubbogo/hessian2 v0.0.0-20190606185624-13bbc9786e3f + github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af // indirect + github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb // indirect github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index f3cd52835..8617076c1 100644 --- a/go.sum +++ b/go.sum @@ -1,33 +1,19 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dubbogo/getty v0.0.0-20190523180329-bdf5e640ea53 h1:bniSNoC4xnAbrx4estwc9F0qkWnh6ZDsAS0y9d7mPos= -github.com/dubbogo/getty v0.0.0-20190523180329-bdf5e640ea53/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/hessian2 v0.0.0-20190606185624-13bbc9786e3f h1:oN6hLbasIJuqlT+/R0F1dVIDaXG+Sun9PE6yFogTtSs= -github.com/dubbogo/hessian2 v0.0.0-20190606185624-13bbc9786e3f/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= +github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -35,14 +21,9 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index d055ddac9..38aed1063 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -201,6 +201,13 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args i return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } +func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse { + c.pendingLock.RLock() + defer c.pendingLock.RUnlock() + + return c.pendingResponses[SequenceType(seq)] +} + func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { @@ -257,7 +264,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string } select { - case <-getty.GetTimeWheel().After(opts.ResponseTimeout): + case <-time.After(opts.ResponseTimeout): err = errClientReadTimeout c.removePendingResponse(SequenceType(rsp.seq)) case <-rsp.done: diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index e26db06a7..86d1676f1 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -83,15 +83,14 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { } if len(opts) != 0 { // for client - if client, ok := opts[0].(*Client); ok { - - r := client.pendingResponses[SequenceType(p.Header.ID)] - if r == nil { - return perrors.Errorf("pendingResponses[%v] = nil", p.Header.ID) - } - p.Body = client.pendingResponses[SequenceType(p.Header.ID)].reply - } else { - return perrors.Errorf("opts[0] is not *Client") + client, ok := opts[0].(*Client) + if !ok { + return perrors.Errorf("opts[0] is not of type *Client") + } + + p.Body = client.GetPendingResponse(SequenceType(p.Header.ID)).reply + if p.Body == nil { + return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) } } diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 5de96acaa..b7f2700d1 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -24,6 +24,7 @@ import ( import ( "github.com/dubbogo/getty" + hessian "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" ) import ( @@ -52,8 +53,14 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface buf := bytes.NewBuffer(data) err := pkg.Unmarshal(buf, p.client) if err != nil { - pkg.Err = perrors.WithStack(err) // client will get this err - return pkg, len(data), nil + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + return nil, 0, nil + } + + logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) + + return nil, 0, perrors.WithStack(err) } return pkg, len(data), nil @@ -94,6 +101,13 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface buf := bytes.NewBuffer(data) err := pkg.Unmarshal(buf) if err != nil { + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + return nil, 0, nil + } + + logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) + return nil, 0, perrors.WithStack(err) } // convert params of request -- GitLab