From e73d072acbe4a6dc43f2f0f56e03593fdabe16b6 Mon Sep 17 00:00:00 2001
From: fangyincheng <fangyincheng@sina.com>
Date: Thu, 26 Nov 2020 02:55:54 +0800
Subject: [PATCH] refact heartbeat

---
 remoting/exchange.go           | 49 +++++++++++++++++++++++++++
 remoting/exchange_client.go    | 60 ----------------------------------
 remoting/getty/config.go       | 14 ++++++--
 remoting/getty/getty_client.go | 49 +++++++--------------------
 remoting/getty/listener.go     | 56 +++++++++++++++++++++++--------
 5 files changed, 115 insertions(+), 113 deletions(-)

diff --git a/remoting/exchange.go b/remoting/exchange.go
index 848d9cbbc..5fbd8ae9b 100644
--- a/remoting/exchange.go
+++ b/remoting/exchange.go
@@ -17,6 +17,7 @@
 package remoting
 
 import (
+	"sync"
 	"time"
 )
 
@@ -26,13 +27,19 @@ import (
 
 import (
 	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
 )
 
 var (
 	// generate request ID for global use
 	sequence atomic.Int64
+
+	// store requestID and response
+	pendingResponses = new(sync.Map)
 )
 
+type SequenceType int64
+
 func init() {
 	// init request ID
 	sequence.Store(0)
@@ -90,6 +97,23 @@ func (response *Response) IsHeartbeat() bool {
 	return response.Event && response.Result == nil
 }
 
+func (response *Response) Handle() {
+	pendingResponse := removePendingResponse(SequenceType(response.ID))
+	if pendingResponse == nil {
+		logger.Errorf("failed to get pending response context for response package %s", *response)
+		return
+	}
+
+	pendingResponse.response = response
+
+	if pendingResponse.Callback == nil {
+		pendingResponse.Err = pendingResponse.response.Error
+		close(pendingResponse.Done)
+	} else {
+		pendingResponse.Callback(pendingResponse.GetCallResponse())
+	}
+}
+
 type Options struct {
 	// connect timeout
 	ConnectTimeout time.Duration
@@ -142,3 +166,28 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse {
 		Reply:     r.response,
 	}
 }
+
+// store response into map
+func AddPendingResponse(pr *PendingResponse) {
+	pendingResponses.Store(SequenceType(pr.seq), pr)
+}
+
+// get and remove response
+func removePendingResponse(seq SequenceType) *PendingResponse {
+	if pendingResponses == nil {
+		return nil
+	}
+	if presp, ok := pendingResponses.Load(seq); ok {
+		pendingResponses.Delete(seq)
+		return presp.(*PendingResponse)
+	}
+	return nil
+}
+
+// get response
+func GetPendingResponse(seq SequenceType) *PendingResponse {
+	if presp, ok := pendingResponses.Load(seq); ok {
+		return presp.(*PendingResponse)
+	}
+	return nil
+}
diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go
index b3423eb67..d65382035 100644
--- a/remoting/exchange_client.go
+++ b/remoting/exchange_client.go
@@ -18,7 +18,6 @@ package remoting
 
 import (
 	"errors"
-	"sync"
 	"time"
 )
 
@@ -28,19 +27,10 @@ import (
 	"github.com/apache/dubbo-go/protocol"
 )
 
-var (
-	// store requestID and response
-	pendingResponses = new(sync.Map)
-)
-
-type SequenceType int64
-
 // It is interface of client for network communication.
 // If you use getty as network communication, you should define GettyClient that implements this interface.
 type Client interface {
 	SetExchangeClient(client *ExchangeClient)
-	// responseHandler is used to deal with msg
-	SetResponseHandler(responseHandler ResponseHandler)
 	// connect url
 	Connect(url *common.URL) error
 	// close
@@ -63,11 +53,6 @@ type ExchangeClient struct {
 	init bool
 }
 
-// handle the message from server
-type ResponseHandler interface {
-	Handler(response *Response)
-}
-
 // create ExchangeClient
 func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient {
 	exchangeClient := &ExchangeClient{
@@ -82,7 +67,6 @@ func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Durat
 		}
 	}
 
-	client.SetResponseHandler(exchangeClient)
 	return exchangeClient
 }
 
@@ -190,47 +174,3 @@ func (client *ExchangeClient) Close() {
 func (client *ExchangeClient) IsAvailable() bool {
 	return client.client.IsAvailable()
 }
-
-// handle the response from server
-func (client *ExchangeClient) Handler(response *Response) {
-
-	pendingResponse := removePendingResponse(SequenceType(response.ID))
-	if pendingResponse == nil {
-		logger.Errorf("failed to get pending response context for response package %s", *response)
-		return
-	}
-
-	pendingResponse.response = response
-
-	if pendingResponse.Callback == nil {
-		pendingResponse.Err = pendingResponse.response.Error
-		close(pendingResponse.Done)
-	} else {
-		pendingResponse.Callback(pendingResponse.GetCallResponse())
-	}
-}
-
-// store response into map
-func AddPendingResponse(pr *PendingResponse) {
-	pendingResponses.Store(SequenceType(pr.seq), pr)
-}
-
-// get and remove response
-func removePendingResponse(seq SequenceType) *PendingResponse {
-	if pendingResponses == nil {
-		return nil
-	}
-	if presp, ok := pendingResponses.Load(seq); ok {
-		pendingResponses.Delete(seq)
-		return presp.(*PendingResponse)
-	}
-	return nil
-}
-
-// get response
-func GetPendingResponse(seq SequenceType) *PendingResponse {
-	if presp, ok := pendingResponses.Load(seq); ok {
-		return presp.(*PendingResponse)
-	}
-	return nil
-}
diff --git a/remoting/getty/config.go b/remoting/getty/config.go
index 449946000..b6aa08206 100644
--- a/remoting/getty/config.go
+++ b/remoting/getty/config.go
@@ -55,9 +55,13 @@ type (
 		SSLEnabled bool
 
 		// heartbeat
-		HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
+		HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
 		heartbeatPeriod time.Duration
 
+		// heartbeat timeout
+		HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
+		heartbeatTimeout time.Duration
+
 		// session
 		SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
 		sessionTimeout time.Duration
@@ -80,7 +84,7 @@ type (
 		ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
 
 		// heartbeat
-		HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
+		HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
 		heartbeatPeriod time.Duration
 
 		// heartbeat timeout
@@ -224,6 +228,12 @@ func (c *ServerConfig) CheckValidity() error {
 			c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
 	}
 
+	if len(c.HeartbeatTimeout) == 0 {
+		c.heartbeatTimeout = 60 * time.Second
+	} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
+		return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
+	}
+
 	if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
 		return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
 	}
diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go
index ed744c201..e432ca55a 100644
--- a/remoting/getty/getty_client.go
+++ b/remoting/getty/getty_client.go
@@ -38,12 +38,11 @@ import (
 )
 
 var (
-	errInvalidCodecType     = perrors.New("illegal CodecType")
-	errInvalidAddress       = perrors.New("remote address invalid or empty")
-	errSessionNotExist      = perrors.New("session not exist")
-	errClientClosed         = perrors.New("client closed")
-	errClientReadTimeout    = perrors.New("client read timeout")
-	errHeartbeatReadTimeout = perrors.New("heartbeat read timeout")
+	errInvalidCodecType  = perrors.New("illegal CodecType")
+	errInvalidAddress    = perrors.New("remote address invalid or empty")
+	errSessionNotExist   = perrors.New("session not exist")
+	errClientClosed      = perrors.New("client closed")
+	errClientReadTimeout = perrors.New("client read timeout")
 
 	clientConf   *ClientConfig
 	clientGrpool *gxsync.TaskPool
@@ -119,13 +118,12 @@ type Options struct {
 
 // Client : some configuration for network communication.
 type Client struct {
-	addr            string
-	opts            Options
-	conf            ClientConfig
-	pool            *gettyRPCClientPool
-	codec           remoting.Codec
-	responseHandler remoting.ResponseHandler
-	ExchangeClient  *remoting.ExchangeClient
+	addr           string
+	opts           Options
+	conf           ClientConfig
+	pool           *gettyRPCClientPool
+	codec          remoting.Codec
+	ExchangeClient *remoting.ExchangeClient
 }
 
 // create client
@@ -147,9 +145,6 @@ func NewClient(opt Options) *Client {
 func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
 	c.ExchangeClient = client
 }
-func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
-	c.responseHandler = responseHandler
-}
 
 // init client and try to connection.
 func (c *Client) Connect(url *common.URL) error {
@@ -221,28 +216,6 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
 	return rpcClient, rpcClient.selectSession(), nil
 }
 
-func (c *Client) heartbeat(session getty.Session, timeout time.Duration, callBack func(err error)) error {
-	req := remoting.NewRequest("2.0.2")
-	req.TwoWay = true
-	req.Event = true
-	resp := remoting.NewPendingResponse(req.ID)
-	remoting.AddPendingResponse(resp)
-	err := c.transfer(session, req, 3*time.Second)
-
-	go func() {
-		var err1 error
-		select {
-		case <-getty.GetTimeWheel().After(timeout):
-			err1 = errHeartbeatReadTimeout
-		case <-resp.Done:
-			err1 = resp.Err
-		}
-		callBack(err1)
-	}()
-
-	return perrors.WithStack(err)
-}
-
 func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
 	err := session.WritePkg(request, timeout)
 	return perrors.WithStack(err)
diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go
index 3b8c0b582..b2f7790f2 100644
--- a/remoting/getty/listener.go
+++ b/remoting/getty/listener.go
@@ -44,7 +44,8 @@ const (
 )
 
 var (
-	errTooManySessions = perrors.New("too many sessions")
+	errTooManySessions      = perrors.New("too many sessions")
+	errHeartbeatReadTimeout = perrors.New("heartbeat read timeout")
 )
 
 type rpcSession struct {
@@ -124,7 +125,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
 		if p.Error != nil {
 			logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
 		}
-		h.conn.pool.rpcClient.responseHandler.Handler(p)
+		p.Handle()
 		return
 	}
 
@@ -132,7 +133,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
 
 	h.conn.updateSession(session)
 
-	h.conn.pool.rpcClient.responseHandler.Handler(p)
+	p.Handle()
 }
 
 // OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
@@ -163,7 +164,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
 		h.timeoutTimes = 0
 	}
 
-	if err := h.conn.pool.rpcClient.heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
+	if err := heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
 		logger.Warnf("failed to send heartbeat, error{%v}", err)
 	}
 }
@@ -179,6 +180,7 @@ type RpcServerHandler struct {
 	sessionMap     map[getty.Session]*rpcSession
 	rwlock         sync.RWMutex
 	server         *Server
+	timeoutTimes   int
 }
 
 // nolint
@@ -246,11 +248,8 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
 			logger.Debugf("get rpc heartbeat response{%#v}", res)
 			if res.Error != nil {
 				logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
-				h.rwlock.Lock()
-				delete(h.sessionMap, session)
-				h.rwlock.Unlock()
-				session.Close()
 			}
+			res.Handle()
 			return
 		}
 		logger.Errorf("illegal package but not heartbeart. {%#v}", pkg)
@@ -334,7 +333,25 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
 		session.Close()
 	}
 
-	heartbeat(session)
+	heartbeatCallBack := func(err error) {
+		if err != nil {
+			logger.Warnf("failed to send heartbeat, error{%v}", err)
+			if h.timeoutTimes >= 3 {
+				h.rwlock.Lock()
+				delete(h.sessionMap, session)
+				h.rwlock.Unlock()
+				session.Close()
+				return
+			}
+			h.timeoutTimes++
+			return
+		}
+		h.timeoutTimes = 0
+	}
+
+	if err := heartbeat(session, h.server.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
+		logger.Warnf("failed to send heartbeat, error{%v}", err)
+	}
 }
 
 func reply(session getty.Session, resp *remoting.Response) {
@@ -343,11 +360,24 @@ func reply(session getty.Session, resp *remoting.Response) {
 	}
 }
 
-func heartbeat(session getty.Session) {
+func heartbeat(session getty.Session, timeout time.Duration, callBack func(err error)) error {
 	req := remoting.NewRequest("2.0.2")
 	req.TwoWay = true
 	req.Event = true
-	if err := session.WritePkg(req, WritePkg_Timeout); err != nil {
-		logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req)
-	}
+	resp := remoting.NewPendingResponse(req.ID)
+	remoting.AddPendingResponse(resp)
+	err := session.WritePkg(req, 3*time.Second)
+
+	go func() {
+		var err1 error
+		select {
+		case <-getty.GetTimeWheel().After(timeout):
+			err1 = errHeartbeatReadTimeout
+		case <-resp.Done:
+			err1 = resp.Err
+		}
+		callBack(err1)
+	}()
+
+	return perrors.WithStack(err)
 }
-- 
GitLab