Skip to content
Snippets Groups Projects
Commit e73d072a authored by fangyincheng's avatar fangyincheng
Browse files

refact heartbeat

parent 4d9bb90a
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@
package remoting
import (
"sync"
"time"
)
......@@ -26,13 +27,19 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
var (
// generate request ID for global use
sequence atomic.Int64
// store requestID and response
pendingResponses = new(sync.Map)
)
type SequenceType int64
func init() {
// init request ID
sequence.Store(0)
......@@ -90,6 +97,23 @@ func (response *Response) IsHeartbeat() bool {
return response.Event && response.Result == nil
}
func (response *Response) Handle() {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}
pendingResponse.response = response
if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}
type Options struct {
// connect timeout
ConnectTimeout time.Duration
......@@ -142,3 +166,28 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse {
Reply: r.response,
}
}
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
......@@ -18,7 +18,6 @@ package remoting
import (
"errors"
"sync"
"time"
)
......@@ -28,19 +27,10 @@ import (
"github.com/apache/dubbo-go/protocol"
)
var (
// store requestID and response
pendingResponses = new(sync.Map)
)
type SequenceType int64
// It is interface of client for network communication.
// If you use getty as network communication, you should define GettyClient that implements this interface.
type Client interface {
SetExchangeClient(client *ExchangeClient)
// responseHandler is used to deal with msg
SetResponseHandler(responseHandler ResponseHandler)
// connect url
Connect(url *common.URL) error
// close
......@@ -63,11 +53,6 @@ type ExchangeClient struct {
init bool
}
// handle the message from server
type ResponseHandler interface {
Handler(response *Response)
}
// create ExchangeClient
func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient {
exchangeClient := &ExchangeClient{
......@@ -82,7 +67,6 @@ func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Durat
}
}
client.SetResponseHandler(exchangeClient)
return exchangeClient
}
......@@ -190,47 +174,3 @@ func (client *ExchangeClient) Close() {
func (client *ExchangeClient) IsAvailable() bool {
return client.client.IsAvailable()
}
// handle the response from server
func (client *ExchangeClient) Handler(response *Response) {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}
pendingResponse.response = response
if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
......@@ -55,9 +55,13 @@ type (
SSLEnabled bool
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
......@@ -80,7 +84,7 @@ type (
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// heartbeat timeout
......@@ -224,6 +228,12 @@ func (c *ServerConfig) CheckValidity() error {
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
}
if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
......
......@@ -38,12 +38,11 @@ import (
)
var (
errInvalidCodecType = perrors.New("illegal CodecType")
errInvalidAddress = perrors.New("remote address invalid or empty")
errSessionNotExist = perrors.New("session not exist")
errClientClosed = perrors.New("client closed")
errClientReadTimeout = perrors.New("client read timeout")
errHeartbeatReadTimeout = perrors.New("heartbeat read timeout")
errInvalidCodecType = perrors.New("illegal CodecType")
errInvalidAddress = perrors.New("remote address invalid or empty")
errSessionNotExist = perrors.New("session not exist")
errClientClosed = perrors.New("client closed")
errClientReadTimeout = perrors.New("client read timeout")
clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
......@@ -119,13 +118,12 @@ type Options struct {
// Client : some configuration for network communication.
type Client struct {
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
responseHandler remoting.ResponseHandler
ExchangeClient *remoting.ExchangeClient
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
ExchangeClient *remoting.ExchangeClient
}
// create client
......@@ -147,9 +145,6 @@ func NewClient(opt Options) *Client {
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
c.ExchangeClient = client
}
func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
c.responseHandler = responseHandler
}
// init client and try to connection.
func (c *Client) Connect(url *common.URL) error {
......@@ -221,28 +216,6 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
return rpcClient, rpcClient.selectSession(), nil
}
func (c *Client) heartbeat(session getty.Session, timeout time.Duration, callBack func(err error)) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
err := c.transfer(session, req, 3*time.Second)
go func() {
var err1 error
select {
case <-getty.GetTimeWheel().After(timeout):
err1 = errHeartbeatReadTimeout
case <-resp.Done:
err1 = resp.Err
}
callBack(err1)
}()
return perrors.WithStack(err)
}
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
err := session.WritePkg(request, timeout)
return perrors.WithStack(err)
......
......@@ -44,7 +44,8 @@ const (
)
var (
errTooManySessions = perrors.New("too many sessions")
errTooManySessions = perrors.New("too many sessions")
errHeartbeatReadTimeout = perrors.New("heartbeat read timeout")
)
type rpcSession struct {
......@@ -124,7 +125,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if p.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
}
h.conn.pool.rpcClient.responseHandler.Handler(p)
p.Handle()
return
}
......@@ -132,7 +133,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.updateSession(session)
h.conn.pool.rpcClient.responseHandler.Handler(p)
p.Handle()
}
// OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
......@@ -163,7 +164,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
h.timeoutTimes = 0
}
if err := h.conn.pool.rpcClient.heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
if err := heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
}
}
......@@ -179,6 +180,7 @@ type RpcServerHandler struct {
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
server *Server
timeoutTimes int
}
// nolint
......@@ -246,11 +248,8 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
logger.Debugf("get rpc heartbeat response{%#v}", res)
if res.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
res.Handle()
return
}
logger.Errorf("illegal package but not heartbeart. {%#v}", pkg)
......@@ -334,7 +333,25 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
session.Close()
}
heartbeat(session)
heartbeatCallBack := func(err error) {
if err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
if h.timeoutTimes >= 3 {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
return
}
h.timeoutTimes++
return
}
h.timeoutTimes = 0
}
if err := heartbeat(session, h.server.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
}
}
func reply(session getty.Session, resp *remoting.Response) {
......@@ -343,11 +360,24 @@ func reply(session getty.Session, resp *remoting.Response) {
}
}
func heartbeat(session getty.Session) {
func heartbeat(session getty.Session, timeout time.Duration, callBack func(err error)) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
if err := session.WritePkg(req, WritePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req)
}
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
err := session.WritePkg(req, 3*time.Second)
go func() {
var err1 error
select {
case <-getty.GetTimeWheel().After(timeout):
err1 = errHeartbeatReadTimeout
case <-resp.Done:
err1 = resp.Err
}
callBack(err1)
}()
return perrors.WithStack(err)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment