Skip to content
Snippets Groups Projects
Commit 9e17bad7 authored by zhanghuiren's avatar zhanghuiren
Browse files

Merge remote-tracking branch 'upstream/develop' into dev

parents 9be788c9 c8f57d96
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,7 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
......
......@@ -89,11 +89,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.Errorf("opts[0] is not of type *Client")
}
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
if p.Header.Type&hessian.PackageRequest != 0x00 {
// size of this array must be '7'
// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
p.Body = make([]interface{}, 7)
} else {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
}
// read body
......
......@@ -85,11 +85,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
}
if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
if p.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
if p.Header.Type&hessian.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
if p.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
}
h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
} else {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
p.Header.ResponseStatus = hessian.Response_OK
reply(session, p, hessian.PackageHeartbeat)
}
h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
return
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body)
......@@ -199,7 +205,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
// heartbeat
if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
h.reply(session, p, hessian.PackageHeartbeat)
reply(session, p, hessian.PackageHeartbeat)
return
}
......@@ -226,7 +232,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if !twoway {
return
}
h.reply(session, p, hessian.PackageResponse)
reply(session, p, hessian.PackageResponse)
}
}()
......@@ -241,7 +247,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
logger.Errorf(err.Error())
p.Header.ResponseStatus = hessian.Response_OK
p.Body = err
h.reply(session, p, hessian.PackageResponse)
reply(session, p, hessian.PackageResponse)
return
}
invoker := exporter.(protocol.Exporter).GetInvoker()
......@@ -266,7 +272,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if !twoway {
return
}
h.reply(session, p, hessian.PackageResponse)
reply(session, p, hessian.PackageResponse)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
......@@ -294,7 +300,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
SerialID: req.Header.SerialID,
......
......@@ -62,8 +62,10 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
return nil, 0, perrors.WithStack(err)
}
pkg.Err = pkg.Body.(*hessian.Response).Exception
pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments)
if pkg.Header.Type&hessian.PackageRequest == 0x00 {
pkg.Err = pkg.Body.(*hessian.Response).Exception
pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments)
}
return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
......
......@@ -19,6 +19,7 @@ package invocation
import (
"reflect"
"sync"
)
import (
......@@ -37,6 +38,7 @@ type RPCInvocation struct {
callBack interface{}
attachments map[string]string
invoker protocol.Invoker
lock sync.RWMutex
}
func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation {
......@@ -80,6 +82,8 @@ func (r *RPCInvocation) Attachments() map[string]string {
}
func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string {
r.lock.RLock()
defer r.lock.RUnlock()
if r.attachments == nil {
return defaultValue
}
......@@ -91,6 +95,8 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string
}
func (r *RPCInvocation) SetAttachments(key string, value string) {
r.lock.Lock()
defer r.lock.Unlock()
if r.attachments == nil {
r.attachments = make(map[string]string)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment