diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index ecd57e466a01b361a429cfd7d6eef4a3345574df..cdd2a10a471dd906fc77d165991752cc7d7e266b 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -22,6 +22,7 @@ import ( "math/rand" "net" "sync" + "sync/atomic" "time" ) @@ -78,11 +79,19 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge time.Sleep(1e6) } logger.Infof("client init ok") - c.created = time.Now().Unix() + c.updateActive(time.Now().Unix()) return c, nil } +func (c *gettyRPCClient) updateActive(active int64) { + atomic.StoreInt64(&c.created, active) +} + +func (c *gettyRPCClient) getActive() int64 { + return atomic.LoadInt64(&c.created) +} + func (c *gettyRPCClient) newSession(session getty.Session) error { var ( ok bool @@ -169,9 +178,8 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { } logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) if len(c.sessions) == 0 { - c.pool.Lock() - c.close() // -> pool.remove(c) - c.pool.Unlock() + c.pool.safeRemove(c) + c.close() } } @@ -225,10 +233,8 @@ func (c *gettyRPCClient) isAvailable() bool { } func (c *gettyRPCClient) close() error { - err := perrors.Errorf("close gettyRPCClient{%#v} again", c) + closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c) c.once.Do(func() { - // delete @c from client pool - c.pool.remove(c) c.gettyClient.Close() c.gettyClient = nil for _, s := range c.sessions { @@ -238,10 +244,17 @@ func (c *gettyRPCClient) close() error { } c.sessions = c.sessions[:0] - c.created = 0 - err = nil + c.updateActive(0) + closeErr = nil }) - return err + return closeErr +} + +func (c *gettyRPCClient) safeClose() error { + c.lock.Lock() + defer c.lock.Unlock() + + return c.close() } type gettyRPCClientPool struct { @@ -268,7 +281,7 @@ func (p *gettyRPCClientPool) close() { p.conns = nil p.Unlock() for _, conn := range conns { - conn.close() + conn.safeClose() } } @@ -286,11 +299,14 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC conn := p.conns[len(p.conns)-1] p.conns = p.conns[:len(p.conns)-1] - if d := now - conn.created; d > p.ttl { - conn.close() // -> pool.remove(c) + if d := now - conn.getActive(); d > p.ttl { + if closeErr := conn.safeClose(); closeErr == nil { + p.remove(conn) + } continue } - conn.created = now //update created time + conn.updateActive(now) //update created time + return conn, nil } // create new conn @@ -298,34 +314,37 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC } func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { - if conn == nil || conn.created == 0 { + if conn == nil || conn.getActive() == 0 { return } + if err != nil { - conn.close() + conn.safeClose() return } p.Lock() defer p.Unlock() + if p.conns == nil { return } if len(p.conns) >= p.size { - conn.close() + if closeErr := conn.safeClose(); closeErr == nil { + // delete @conn from client pool + p.remove(conn) + } return } p.conns = append(p.conns, conn) } func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { - if conn == nil || conn.created == 0 { + if conn == nil || conn.getActive() == 0 { return } - //p.Lock() - //defer p.Unlock() if p.conns == nil { return } @@ -339,3 +358,10 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { } } } + +func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) { + p.Lock() + defer p.Unlock() + + p.remove(conn) +}