diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 608f0c2e28e77c4bfc2334ff08d5db6ae2170d27..7528571d497fa03f29a45a03b5b90bb7163708b5 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -222,6 +222,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl config.URL, method string ) conn, session, err = c.selectSession(addr) if err != nil || session == nil { + log.Warn(err) return errSessionNotExist } defer c.pool.release(conn, err) @@ -273,7 +274,6 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, ) sequence = c.sequence.Add(1) - session.SetAttribute("seq", sequence) // store seq if pkg == nil { pkg = &DubboPackage{} diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index cc1a215098abfeb5e4455bff990972d06a5150df..6d274b5396f7d64546c511a11b0bab5ea9cd4a2c 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -56,7 +56,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { return bytes.NewBuffer(pkg), nil } -func (p *DubboPackage) Unmarshal(buf *bytes.Buffer) error { +func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { codec := hessian.NewHessianCodec(bufio.NewReader(buf)) // read header @@ -65,6 +65,14 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer) error { return jerrors.Trace(err) } + if len(opts) != 0 { // for client + if client, ok := opts[0].(*Client); ok { + p.Body = client.pendingResponses[SequenceType(p.Header.ID)].reply + } else { + return fmt.Errorf("pendingResponses[%v] = nil", p.Header.ID) + } + } + if p.Header.Type&hessian.Heartbeat != 0x00 { return nil } diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 9d802f6e22860240eabca0ae297ffb4e794370ef..4ff68025c38ee4c24331d59be883554f1114c11a 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -29,12 +29,10 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { p.client.pendingLock.RLock() defer p.client.pendingLock.RUnlock() - pkg := &DubboPackage{ - Body: p.client.pendingResponses[SequenceType(ss.GetAttribute("seq").(uint64))].reply, - } + pkg := &DubboPackage{} buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf) + err := pkg.Unmarshal(buf, p.client) if err != nil { pkg.Err = jerrors.Trace(err) // client will get this err return pkg, len(data), nil