Skip to content
Snippets Groups Projects
Commit a71d1a80 authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #318 from divebomb/develop

Imp: reduce protocol/dubbo/pool.go lock scope
parents 2387c15d 31286e48
No related branches found
No related tags found
No related merge requests found
......@@ -256,7 +256,13 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
if session == nil {
return errSessionNotExist
}
defer c.pool.release(conn, err)
defer func() {
if err == nil {
c.pool.put(conn)
return
}
conn.close()
}()
if err = c.transfer(session, p, rsp); err != nil {
return perrors.WithStack(err)
......
......@@ -22,6 +22,7 @@ import (
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"
)
......@@ -51,6 +52,14 @@ type rpcSession struct {
reqNum int32
}
func (s *rpcSession) AddReqNum(num int32) {
atomic.AddInt32(&s.reqNum, num)
}
func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}
////////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
......
......@@ -154,11 +154,11 @@ func (c *gettyRPCClient) addSession(session getty.Session) {
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
c.sessions = make([]*rpcSession, 0, 16)
}
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *gettyRPCClient) removeSession(session getty.Session) {
......@@ -166,21 +166,27 @@ func (c *gettyRPCClient) removeSession(session getty.Session) {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
var removeFlag bool
func() {
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
logger.Debugf("delete session{%s}, its index{%d}", session.Stat(), i)
break
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
logger.Debugf("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
}
logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
removeFlag = true
}
}()
if removeFlag {
c.pool.safeRemove(c)
c.close()
}
......@@ -190,17 +196,24 @@ func (c *gettyRPCClient) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
var rs *rpcSession
func() {
c.lock.RLock()
defer c.lock.RUnlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
rs = c.sessions[i]
break
}
}
}()
if rs != nil {
rs.AddReqNum(1)
}
}
......@@ -238,28 +251,42 @@ func (c *gettyRPCClient) isAvailable() bool {
func (c *gettyRPCClient) close() error {
closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c)
c.once.Do(func() {
c.gettyClient.Close()
c.gettyClient = nil
for _, s := range c.sessions {
logger.Infof("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.sessions = c.sessions[:0]
var (
gettyClient getty.Client
sessions []*rpcSession
)
func() {
c.lock.Lock()
defer c.lock.Unlock()
gettyClient = c.gettyClient
c.gettyClient = nil
sessions = make([]*rpcSession, 0, len(c.sessions))
for _, s := range c.sessions {
sessions = append(sessions, s)
}
c.sessions = c.sessions[:0]
}()
c.updateActive(0)
go func() {
if gettyClient != nil {
gettyClient.Close()
}
for _, s := range sessions {
logger.Infof("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.GetReqNum())
s.session.Close()
}
}()
closeErr = nil
})
return closeErr
}
func (c *gettyRPCClient) safeClose() error {
c.lock.Lock()
defer c.lock.Unlock()
return c.close()
}
type gettyRPCClientPool struct {
rpcClient *Client
size int // size of []*gettyRPCClient
......@@ -284,37 +311,35 @@ func (p *gettyRPCClientPool) close() {
p.conns = nil
p.Unlock()
for _, conn := range conns {
conn.safeClose()
conn.close()
}
}
func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
var (
conn *gettyRPCClient
err error
)
if conn, err = p.selectGettyRpcClient(protocol, addr); err == nil && conn == nil {
conn, err := p.get()
if err == nil && conn == nil {
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}
return conn, err
}
func (p *gettyRPCClientPool) selectGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) {
func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
now := time.Now().Unix()
p.Lock()
defer p.Unlock()
if p.conns == nil {
return nil, errClientPoolClosed
}
now := time.Now().Unix()
for len(p.conns) > 0 {
conn := p.conns[len(p.conns)-1]
p.conns = p.conns[:len(p.conns)-1]
if d := now - conn.getActive(); d > p.ttl {
p.remove(conn)
conn.safeClose()
go conn.close()
continue
}
conn.updateActive(now) //update active time
......@@ -322,13 +347,9 @@ func (p *gettyRPCClientPool) selectGettyRpcClient(protocol, addr string) (*getty
}
return nil, nil
}
func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if conn == nil || conn.getActive() == 0 {
return
}
if err != nil {
conn.safeClose()
func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
if conn == nil || conn.getActive() == 0 {
return
}
......@@ -341,8 +362,8 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if len(p.conns) >= p.size {
// delete @conn from client pool
p.remove(conn)
conn.safeClose()
// p.remove(conn)
conn.close()
return
}
p.conns = append(p.conns, conn)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment