diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index d723f68387d55c2219e2b69d78ab147d8c51fd3c..a39c271a4ad77eda39dba5cf5bd05f57e1c0c5b0 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -256,7 +256,13 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac if session == nil { return errSessionNotExist } - defer c.pool.release(conn, err) + defer func() { + if err == nil { + c.pool.put(conn) + return + } + conn.close() + }() if err = c.transfer(session, p, rsp); err != nil { return perrors.WithStack(err) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 926afa9a103201b04639cf283f5c87c53eb12541..c8c792df957cb85381fc07d1cd7ac0888dc4fb49 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -22,6 +22,7 @@ import ( "fmt" "net/url" "sync" + "sync/atomic" "time" ) @@ -51,6 +52,14 @@ type rpcSession struct { reqNum int32 } +func (s *rpcSession) AddReqNum(num int32) { + atomic.AddInt32(&s.reqNum, num) +} + +func (s *rpcSession) GetReqNum() int32 { + return atomic.LoadInt32(&s.reqNum) +} + //////////////////////////////////////////// // RpcClientHandler //////////////////////////////////////////// diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index b5bf040c67c2e0071222466e59db4de67d9e1ca2..2df1c6935305e0d70635613f509021e5b9203833 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -154,11 +154,11 @@ func (c *gettyRPCClient) addSession(session getty.Session) { } c.lock.Lock() + defer c.lock.Unlock() if c.sessions == nil { c.sessions = make([]*rpcSession, 0, 16) } c.sessions = append(c.sessions, &rpcSession{session: session}) - c.lock.Unlock() } func (c *gettyRPCClient) removeSession(session getty.Session) { @@ -166,21 +166,27 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { return } - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return - } + var removeFlag bool + func() { + c.lock.Lock() + defer c.lock.Unlock() + if c.sessions == nil { + return + } - for i, s := range c.sessions { - if s.session == session { - c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) - logger.Debugf("delete session{%s}, its index{%d}", session.Stat(), i) - break + for i, s := range c.sessions { + if s.session == session { + c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) + logger.Debugf("delete session{%s}, its index{%d}", session.Stat(), i) + break + } } - } - logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) - if len(c.sessions) == 0 { + logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) + if len(c.sessions) == 0 { + removeFlag = true + } + }() + if removeFlag { c.pool.safeRemove(c) c.close() } @@ -190,17 +196,24 @@ func (c *gettyRPCClient) updateSession(session getty.Session) { if session == nil { return } - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return - } - for i, s := range c.sessions { - if s.session == session { - c.sessions[i].reqNum++ - break + var rs *rpcSession + func() { + c.lock.RLock() + defer c.lock.RUnlock() + if c.sessions == nil { + return + } + + for i, s := range c.sessions { + if s.session == session { + rs = c.sessions[i] + break + } } + }() + if rs != nil { + rs.AddReqNum(1) } } @@ -238,28 +251,42 @@ func (c *gettyRPCClient) isAvailable() bool { func (c *gettyRPCClient) close() error { closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c) c.once.Do(func() { - c.gettyClient.Close() - c.gettyClient = nil - for _, s := range c.sessions { - logger.Infof("close client session{%s, last active:%s, request number:%d}", - s.session.Stat(), s.session.GetActive().String(), s.reqNum) - s.session.Close() - } - c.sessions = c.sessions[:0] + var ( + gettyClient getty.Client + sessions []*rpcSession + ) + func() { + c.lock.Lock() + defer c.lock.Unlock() + + gettyClient = c.gettyClient + c.gettyClient = nil + + sessions = make([]*rpcSession, 0, len(c.sessions)) + for _, s := range c.sessions { + sessions = append(sessions, s) + } + c.sessions = c.sessions[:0] + }() c.updateActive(0) + + go func() { + if gettyClient != nil { + gettyClient.Close() + } + for _, s := range sessions { + logger.Infof("close client session{%s, last active:%s, request number:%d}", + s.session.Stat(), s.session.GetActive().String(), s.GetReqNum()) + s.session.Close() + } + }() + closeErr = nil }) return closeErr } -func (c *gettyRPCClient) safeClose() error { - c.lock.Lock() - defer c.lock.Unlock() - - return c.close() -} - type gettyRPCClientPool struct { rpcClient *Client size int // size of []*gettyRPCClient @@ -284,37 +311,35 @@ func (p *gettyRPCClientPool) close() { p.conns = nil p.Unlock() for _, conn := range conns { - conn.safeClose() + conn.close() } } func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { - var ( - conn *gettyRPCClient - err error - ) - if conn, err = p.selectGettyRpcClient(protocol, addr); err == nil && conn == nil { + conn, err := p.get() + if err == nil && conn == nil { // create new conn return newGettyRPCClientConn(p, protocol, addr) } return conn, err } -func (p *gettyRPCClientPool) selectGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { + +func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) { + now := time.Now().Unix() + p.Lock() defer p.Unlock() if p.conns == nil { return nil, errClientPoolClosed } - now := time.Now().Unix() - for len(p.conns) > 0 { conn := p.conns[len(p.conns)-1] p.conns = p.conns[:len(p.conns)-1] if d := now - conn.getActive(); d > p.ttl { p.remove(conn) - conn.safeClose() + go conn.close() continue } conn.updateActive(now) //update active time @@ -322,13 +347,9 @@ func (p *gettyRPCClientPool) selectGettyRpcClient(protocol, addr string) (*getty } return nil, nil } -func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { - if conn == nil || conn.getActive() == 0 { - return - } - if err != nil { - conn.safeClose() +func (p *gettyRPCClientPool) put(conn *gettyRPCClient) { + if conn == nil || conn.getActive() == 0 { return } @@ -341,8 +362,8 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { if len(p.conns) >= p.size { // delete @conn from client pool - p.remove(conn) - conn.safeClose() + // p.remove(conn) + conn.close() return } p.conns = append(p.conns, conn)