From b1a85b3db76d81c7b822ebb11b59fe165a2385f3 Mon Sep 17 00:00:00 2001 From: fangyincheng <fangyincheng@sina.com> Date: Tue, 11 Jun 2019 10:29:45 +0800 Subject: [PATCH] Fix:heartbeat and exception --- examples/dubbo/go-client/app/client.go | 2 +- go.mod | 2 +- go.sum | 4 +- protocol/dubbo/client.go | 2 +- protocol/dubbo/codec.go | 11 +--- protocol/dubbo/listener.go | 6 ++- protocol/dubbo/readwriter.go | 72 ++++++++++++++------------ 7 files changed, 49 insertions(+), 50 deletions(-) diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 6167fbc8e..25b3263ac 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -93,7 +93,7 @@ func main() { } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser") + println("\n\n\nstart to test dubbo - getUser2") user = &User{} err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser2(context.TODO(), []interface{}{1}, user) if err != nil { diff --git a/go.mod b/go.mod index e88b8da6c..4c2276429 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/apache/dubbo-go require ( github.com/dubbogo/getty v1.0.7 - github.com/dubbogo/hessian2 v1.0.0 + github.com/dubbogo/hessian2 v1.0.1 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index 8c83a280a..a4fd7f509 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= -github.com/dubbogo/hessian2 v1.0.0 h1:KM7BKAUZ+2sjv1nAV9JwJNPMK0T+sHYiSe7Gls1GFS4= -github.com/dubbogo/hessian2 v1.0.0/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= +github.com/dubbogo/hessian2 v1.0.1 h1:ztI7gJxR3Isxrrl2jE1IZKX61eNR93eRKGhn49vPEX8= +github.com/dubbogo/hessian2 v1.0.1/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 38aed1063..d22ac91ed 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -25,7 +25,7 @@ import ( import ( "github.com/dubbogo/getty" - hessian "github.com/dubbogo/hessian2" + "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" "go.uber.org/atomic" "gopkg.in/yaml.v2" diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index b578fb79b..9551c2b0b 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -79,10 +79,6 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { // read header err := codec.ReadHeader(&p.Header) if err != nil { - if p.Header.Type&hessian.PackageError != hessian.PackageType(hessian.Zero) { - p.Body = &hessian.Response{Exception: err} - return nil - } return perrors.WithStack(err) } @@ -95,12 +91,9 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID)) if pendingRsp == nil { return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } else { + p.Body = &hessian.Response{RspObj: pendingRsp.reply} } - p.Body = &hessian.Response{RspObj: pendingRsp.reply} - } - - if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - return nil } // read body diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 6a58de24b..ea8c38db9 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -86,9 +86,12 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { if p.Header.Type&hessian.PackageHeartbeat != 0x00 { logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + if p.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + } return } - //logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) + logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) h.conn.updateSession(session) @@ -203,7 +206,6 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { // not twoway if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 { twoway = false - h.reply(session, p, hessian.PackageResponse) } invoker := h.exporter.GetInvoker() diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index fd7c7c317..6d789ae13 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -65,6 +65,7 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface pkg.Err = pkg.Body.(*hessian.Response).Exception pkg.Body = pkg.Body.(*hessian.Response).RspObj + return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } @@ -112,40 +113,43 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface return nil, 0, perrors.WithStack(err) } - // convert params of request - req := pkg.Body.([]interface{}) // length of body should be 7 - if len(req) > 0 { - var dubboVersion, argsTypes string - var args []interface{} - var attachments map[interface{}]interface{} - if req[0] != nil { - dubboVersion = req[0].(string) - } - if req[1] != nil { - pkg.Service.Path = req[1].(string) - } - if req[2] != nil { - pkg.Service.Version = req[2].(string) - } - if req[3] != nil { - pkg.Service.Method = req[3].(string) - } - if req[4] != nil { - argsTypes = req[4].(string) - } - if req[5] != nil { - args = req[5].([]interface{}) - } - if req[6] != nil { - attachments = req[6].(map[interface{}]interface{}) - } - pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) - pkg.Body = map[string]interface{}{ - "dubboVersion": dubboVersion, - "argsTypes": argsTypes, - "args": args, - "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface), - "attachments": attachments, + + if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { + // convert params of request + req := pkg.Body.([]interface{}) // length of body should be 7 + if len(req) > 0 { + var dubboVersion, argsTypes string + var args []interface{} + var attachments map[interface{}]interface{} + if req[0] != nil { + dubboVersion = req[0].(string) + } + if req[1] != nil { + pkg.Service.Path = req[1].(string) + } + if req[2] != nil { + pkg.Service.Version = req[2].(string) + } + if req[3] != nil { + pkg.Service.Method = req[3].(string) + } + if req[4] != nil { + argsTypes = req[4].(string) + } + if req[5] != nil { + args = req[5].([]interface{}) + } + if req[6] != nil { + attachments = req[6].(map[interface{}]interface{}) + } + pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) + pkg.Body = map[string]interface{}{ + "dubboVersion": dubboVersion, + "argsTypes": argsTypes, + "args": args, + "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface), + "attachments": attachments, + } } } -- GitLab