Skip to content
Snippets Groups Projects
Unverified Commit 01f84dc5 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #302 from fangyincheng/heartbeat_for_server

Imp: receive heartbeat from server
parents c6f6377e 53bb7fb4
No related branches found
No related tags found
No related merge requests found
......@@ -89,11 +89,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.Errorf("opts[0] is not of type *Client")
}
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
if p.Header.Type&hessian.PackageRequest != 0x00 {
// size of this array must be '7'
// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
p.Body = make([]interface{}, 7)
} else {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
}
// read body
......
......@@ -85,11 +85,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
}
if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
if p.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
if p.Header.Type&hessian.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
if p.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
}
h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
} else {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
p.Header.ResponseStatus = hessian.Response_OK
reply(session, p, hessian.PackageHeartbeat)
}
h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
return
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body)
......@@ -199,7 +205,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
// heartbeat
if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
h.reply(session, p, hessian.PackageHeartbeat)
reply(session, p, hessian.PackageHeartbeat)
return
}
......@@ -226,7 +232,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if !twoway {
return
}
h.reply(session, p, hessian.PackageResponse)
reply(session, p, hessian.PackageResponse)
}
}()
......@@ -241,7 +247,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
logger.Errorf(err.Error())
p.Header.ResponseStatus = hessian.Response_OK
p.Body = err
h.reply(session, p, hessian.PackageResponse)
reply(session, p, hessian.PackageResponse)
return
}
invoker := exporter.(protocol.Exporter).GetInvoker()
......@@ -266,7 +272,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if !twoway {
return
}
h.reply(session, p, hessian.PackageResponse)
reply(session, p, hessian.PackageResponse)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
......@@ -294,7 +300,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
SerialID: req.Header.SerialID,
......
......@@ -62,8 +62,10 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
return nil, 0, perrors.WithStack(err)
}
pkg.Err = pkg.Body.(*hessian.Response).Exception
pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments)
if pkg.Header.Type&hessian.PackageRequest == 0x00 {
pkg.Err = pkg.Body.(*hessian.Response).Exception
pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments)
}
return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment