Skip to content
Snippets Groups Projects
Commit 95b5ad37 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #106 from fangyincheng/services

Fix:Lock bug
parents 36c78543 1369c1ef
No related branches found
No related tags found
No related merge requests found
......@@ -148,14 +148,13 @@ type Client struct {
pool *gettyRPCClientPool
sequence atomic.Uint64
pendingLock sync.RWMutex
pendingResponses map[SequenceType]*PendingResponse
pendingResponses *sync.Map
}
func NewClient() *Client {
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
......@@ -201,13 +200,6 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args i
return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}
func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.RLock()
defer c.pendingLock.RUnlock()
return c.pendingResponses[SequenceType(seq)]
}
func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
......@@ -330,20 +322,16 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
}
func (c *Client) addPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
c.pendingResponses.Store(SequenceType(pr.seq), pr)
}
func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
if presp, ok := c.pendingResponses.Load(seq); ok {
c.pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
......@@ -88,11 +88,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.Errorf("opts[0] is not of type *Client")
}
pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID))
if pendingRsp == nil {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
} else {
p.Body = &hessian.Response{RspObj: pendingRsp.reply}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply}
}
}
......
......@@ -46,8 +46,6 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
}
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
p.client.pendingLock.RLock()
defer p.client.pendingLock.RUnlock()
pkg := &DubboPackage{}
buf := bytes.NewBuffer(data)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment