diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index d6155b6021a07e51e4d4a779e56aff5e62bd40ce..72abcdac2327f877f6f141d7598a04477ac2bd40 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -148,14 +148,13 @@ type Client struct { pool *gettyRPCClientPool sequence atomic.Uint64 - pendingLock sync.RWMutex - pendingResponses map[SequenceType]*PendingResponse + pendingResponses *sync.Map } func NewClient() *Client { c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), + pendingResponses: new(sync.Map), conf: *clientConf, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -201,13 +200,6 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args i return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse { - c.pendingLock.RLock() - defer c.pendingLock.RUnlock() - - return c.pendingResponses[SequenceType(seq)] -} - func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { @@ -330,20 +322,16 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, } func (c *Client) addPendingResponse(pr *PendingResponse) { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() - c.pendingResponses[SequenceType(pr.seq)] = pr + c.pendingResponses.Store(SequenceType(pr.seq), pr) } func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() if c.pendingResponses == nil { return nil } - if presp, ok := c.pendingResponses[seq]; ok { - delete(c.pendingResponses, seq) - return presp + if presp, ok := c.pendingResponses.Load(seq); ok { + c.pendingResponses.Delete(seq) + return presp.(*PendingResponse) } return nil } diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 9551c2b0b7b742228ae69701a9d86975d2c85e52..e9ced8449404dd31e7d2c7694d15a3a4756e36a1 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -88,11 +88,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { return perrors.Errorf("opts[0] is not of type *Client") } - pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID)) - if pendingRsp == nil { + pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) + if !ok { return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) } else { - p.Body = &hessian.Response{RspObj: pendingRsp.reply} + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply} } } diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 529aa759a5c39a467a1f72560d56c1b48738a9a6..042b8789104d1e671807405a81045dc30adcf789 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -46,8 +46,6 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { } func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - p.client.pendingLock.RLock() - defer p.client.pendingLock.RUnlock() pkg := &DubboPackage{} buf := bytes.NewBuffer(data)