diff --git a/examples/consul/go-server/server.go b/examples/consul/go-server/server.go index 798fe7c7b0632bf7d366d4bf500846fd6ced73e9..0e1e1f608e4f112e4e47f74682156103345f80e3 100644 --- a/examples/consul/go-server/server.go +++ b/examples/consul/go-server/server.go @@ -57,7 +57,7 @@ func initSignal() { case syscall.SIGHUP: // reload() default: - go time.AfterFunc(time.Duration(float64(survivalTimeout)*float64(time.Second)), func() { + time.AfterFunc(time.Duration(float64(survivalTimeout)*float64(time.Second)), func() { logger.Warnf("app exit now by force...") os.Exit(1) }) diff --git a/examples/generic/go-client/app/client.go b/examples/generic/go-client/app/client.go index 8d5a1bc5fa6cb186fcf456d385f8dad16b750ea3..dbd4665f5cd8e6740094c00f3cb089a932bb3e51 100644 --- a/examples/generic/go-client/app/client.go +++ b/examples/generic/go-client/app/client.go @@ -35,10 +35,6 @@ import ( _ "github.com/apache/dubbo-go/registry/zookeeper" ) -var ( - survivalTimeout int = 10e9 -) - // they are necessary: // export CONF_CONSUMER_FILE_PATH="xxx" // export APP_LOG_CONF_FILE="xxx" diff --git a/examples/hystrixfilter/dubbo/with-hystrix-go-client/app/client.go b/examples/hystrixfilter/dubbo/with-hystrix-go-client/app/client.go index 0d8d7d2ffd6b7ea5db3831b55fa4dc45ace302c5..e759465f002ed68e6f6faf217f99352b98b3d13a 100644 --- a/examples/hystrixfilter/dubbo/with-hystrix-go-client/app/client.go +++ b/examples/hystrixfilter/dubbo/with-hystrix-go-client/app/client.go @@ -20,9 +20,6 @@ package main import ( "context" "fmt" - "os" - "os/signal" - "syscall" "time" ) @@ -44,10 +41,6 @@ import ( _ "github.com/apache/dubbo-go/registry/zookeeper" ) -var ( - survivalTimeout int = 10e9 -) - // they are necessary: // export CONF_CONSUMER_FILE_PATH="xxx" // export APP_LOG_CONF_FILE="xxx" @@ -102,29 +95,4 @@ func main() { resGot := <-getUser1Chan logger.Infof("[GetUser1] %v", resGot) } - //initSignal() -} - -func initSignal() { - signals := make(chan os.Signal, 1) - // It is not possible to block SIGKILL or syscall.SIGSTOP - signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP, - syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) - for { - sig := <-signals - logger.Infof("get signal %s", sig.String()) - switch sig { - case syscall.SIGHUP: - // reload() - default: - go time.AfterFunc(time.Duration(survivalTimeout)*time.Second, func() { - logger.Warnf("app exit now by force...") - os.Exit(1) - }) - - // 瑕佷箞fastFailTimeout鏃堕棿鍐呮墽琛屽畬姣曚笅闈㈢殑閫昏緫鐒跺悗绋嬪簭閫€鍑猴紝瑕佷箞鎵ц涓婇潰鐨勮秴鏃跺嚱鏁扮▼搴忓己琛岄€€鍑� - fmt.Println("app exit now...") - return - } - } } diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index ecd57e466a01b361a429cfd7d6eef4a3345574df..cdd2a10a471dd906fc77d165991752cc7d7e266b 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -22,6 +22,7 @@ import ( "math/rand" "net" "sync" + "sync/atomic" "time" ) @@ -78,11 +79,19 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge time.Sleep(1e6) } logger.Infof("client init ok") - c.created = time.Now().Unix() + c.updateActive(time.Now().Unix()) return c, nil } +func (c *gettyRPCClient) updateActive(active int64) { + atomic.StoreInt64(&c.created, active) +} + +func (c *gettyRPCClient) getActive() int64 { + return atomic.LoadInt64(&c.created) +} + func (c *gettyRPCClient) newSession(session getty.Session) error { var ( ok bool @@ -169,9 +178,8 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { } logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) if len(c.sessions) == 0 { - c.pool.Lock() - c.close() // -> pool.remove(c) - c.pool.Unlock() + c.pool.safeRemove(c) + c.close() } } @@ -225,10 +233,8 @@ func (c *gettyRPCClient) isAvailable() bool { } func (c *gettyRPCClient) close() error { - err := perrors.Errorf("close gettyRPCClient{%#v} again", c) + closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c) c.once.Do(func() { - // delete @c from client pool - c.pool.remove(c) c.gettyClient.Close() c.gettyClient = nil for _, s := range c.sessions { @@ -238,10 +244,17 @@ func (c *gettyRPCClient) close() error { } c.sessions = c.sessions[:0] - c.created = 0 - err = nil + c.updateActive(0) + closeErr = nil }) - return err + return closeErr +} + +func (c *gettyRPCClient) safeClose() error { + c.lock.Lock() + defer c.lock.Unlock() + + return c.close() } type gettyRPCClientPool struct { @@ -268,7 +281,7 @@ func (p *gettyRPCClientPool) close() { p.conns = nil p.Unlock() for _, conn := range conns { - conn.close() + conn.safeClose() } } @@ -286,11 +299,14 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC conn := p.conns[len(p.conns)-1] p.conns = p.conns[:len(p.conns)-1] - if d := now - conn.created; d > p.ttl { - conn.close() // -> pool.remove(c) + if d := now - conn.getActive(); d > p.ttl { + if closeErr := conn.safeClose(); closeErr == nil { + p.remove(conn) + } continue } - conn.created = now //update created time + conn.updateActive(now) //update created time + return conn, nil } // create new conn @@ -298,34 +314,37 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC } func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { - if conn == nil || conn.created == 0 { + if conn == nil || conn.getActive() == 0 { return } + if err != nil { - conn.close() + conn.safeClose() return } p.Lock() defer p.Unlock() + if p.conns == nil { return } if len(p.conns) >= p.size { - conn.close() + if closeErr := conn.safeClose(); closeErr == nil { + // delete @conn from client pool + p.remove(conn) + } return } p.conns = append(p.conns, conn) } func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { - if conn == nil || conn.created == 0 { + if conn == nil || conn.getActive() == 0 { return } - //p.Lock() - //defer p.Unlock() if p.conns == nil { return } @@ -339,3 +358,10 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { } } } + +func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) { + p.Lock() + defer p.Unlock() + + p.remove(conn) +} diff --git a/protocol/RpcStatus.go b/protocol/rpc_status.go similarity index 98% rename from protocol/RpcStatus.go rename to protocol/rpc_status.go index 78796b6beaf24dac33d7e0210703a9027f9fe568..3a8bfbc87f285e0e86269d44c47d6771566d97b1 100644 --- a/protocol/RpcStatus.go +++ b/protocol/rpc_status.go @@ -15,7 +15,6 @@ * limitations under the License. */ -// @author yiji@apache.org package protocol import (