diff --git a/remoting/getty/config.go b/remoting/getty/config.go index 2fcc16541deab767f1f0c9ac7560910aba54d038..449946000a17a345c5265e1c7334a133503f6623 100644 --- a/remoting/getty/config.go +++ b/remoting/getty/config.go @@ -197,7 +197,7 @@ func (c *ClientConfig) CheckValidity() error { } if len(c.HeartbeatTimeout) == 0 { - c.heartbeatTimeout = 5 * time.Second + c.heartbeatTimeout = 60 * time.Second } else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil { return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout) } @@ -214,7 +214,7 @@ func (c *ServerConfig) CheckValidity() error { var err error if len(c.HeartbeatPeriod) == 0 { - c.heartbeatPeriod = 15 * time.Second + c.heartbeatPeriod = 60 * time.Second } else if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil { return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod) } diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index d2b01ed0d8f319a76f1c929d33bc71a05acc3ff1..3b8c0b5823ab90ef10f5ccafe913980a0daab496 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -66,7 +66,8 @@ func (s *rpcSession) GetReqNum() int32 { // nolint type RpcClientHandler struct { - conn *gettyRPCClient + conn *gettyRPCClient + timeoutTimes int } // nolint @@ -115,7 +116,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { logger.Errorf("illegal request but not heartbeart. {%#v}", req) return } - + h.timeoutTimes = 0 p := result.Result.(*remoting.Response) // get heartbeart if p.Event { @@ -126,10 +127,6 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { h.conn.pool.rpcClient.responseHandler.Handler(p) return } - if result.IsRequest { - logger.Errorf("illegal package for it is response type. {%#v}", pkg) - return - } logger.Debugf("get rpc response{%#v}", p) @@ -156,8 +153,14 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { heartbeatCallBack := func(err error) { if err != nil { logger.Warnf("failed to send heartbeat, error{%v}", err) - h.conn.removeSession(session) + if h.timeoutTimes >= 3 { + h.conn.removeSession(session) + return + } + h.timeoutTimes++ + return } + h.timeoutTimes = 0 } if err := h.conn.pool.rpcClient.heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil { @@ -243,10 +246,14 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { logger.Debugf("get rpc heartbeat response{%#v}", res) if res.Error != nil { logger.Errorf("rpc heartbeat response{error: %#v}", res.Error) + h.rwlock.Lock() + delete(h.sessionMap, session) + h.rwlock.Unlock() + session.Close() } return } - logger.Errorf("illegal package for it is response type. {%#v}", pkg) + logger.Errorf("illegal package but not heartbeart. {%#v}", pkg) return } req := decodeResult.Result.(*remoting.Request)