Skip to content
Snippets Groups Projects
Commit 4d9bb90a authored by fangyincheng's avatar fangyincheng
Browse files

modify default heartbeat time & add timeoutTimes in client

parent 71201233
No related branches found
No related tags found
No related merge requests found
......@@ -197,7 +197,7 @@ func (c *ClientConfig) CheckValidity() error {
}
if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 5 * time.Second
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
......@@ -214,7 +214,7 @@ func (c *ServerConfig) CheckValidity() error {
var err error
if len(c.HeartbeatPeriod) == 0 {
c.heartbeatPeriod = 15 * time.Second
c.heartbeatPeriod = 60 * time.Second
} else if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
......
......@@ -66,7 +66,8 @@ func (s *rpcSession) GetReqNum() int32 {
// nolint
type RpcClientHandler struct {
conn *gettyRPCClient
conn *gettyRPCClient
timeoutTimes int
}
// nolint
......@@ -115,7 +116,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
logger.Errorf("illegal request but not heartbeart. {%#v}", req)
return
}
h.timeoutTimes = 0
p := result.Result.(*remoting.Response)
// get heartbeart
if p.Event {
......@@ -126,10 +127,6 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.pool.rpcClient.responseHandler.Handler(p)
return
}
if result.IsRequest {
logger.Errorf("illegal package for it is response type. {%#v}", pkg)
return
}
logger.Debugf("get rpc response{%#v}", p)
......@@ -156,8 +153,14 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
heartbeatCallBack := func(err error) {
if err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
h.conn.removeSession(session)
if h.timeoutTimes >= 3 {
h.conn.removeSession(session)
return
}
h.timeoutTimes++
return
}
h.timeoutTimes = 0
}
if err := h.conn.pool.rpcClient.heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
......@@ -243,10 +246,14 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
logger.Debugf("get rpc heartbeat response{%#v}", res)
if res.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
return
}
logger.Errorf("illegal package for it is response type. {%#v}", pkg)
logger.Errorf("illegal package but not heartbeart. {%#v}", pkg)
return
}
req := decodeResult.Result.(*remoting.Request)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment