Skip to content
Snippets Groups Projects
Commit bb7bf3e0 authored by xg.gao's avatar xg.gao
Browse files

Merge branch 'develop' into read

parents 0249c47e 68ff8be7
No related branches found
No related tags found
No related merge requests found
...@@ -246,6 +246,11 @@ If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it ...@@ -246,6 +246,11 @@ If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it
<img width="222px" src="https://fscdn.zto.com/fs8/M02/B2/E4/wKhBD1-8o52Ae3GnAAASU3r62ME040.png"> <img width="222px" src="https://fscdn.zto.com/fs8/M02/B2/E4/wKhBD1-8o52Ae3GnAAASU3r62ME040.png">
</a> </a>
</td> </td>
<td align="center" valign="middle">
<a href="https://www.icsoc.net/" target="_blank">
<img width="222px" src="https://oss.icsoc.net/icsoc-ekt-test-files/icsoc.png">
</a>
</td>
</tr> </tr>
<tr></tr> <tr></tr>
</tbody> </tbody>
......
...@@ -244,6 +244,11 @@ make test ...@@ -244,6 +244,11 @@ make test
<img width="222px" src="https://fscdn.zto.com/fs8/M02/B2/E4/wKhBD1-8o52Ae3GnAAASU3r62ME040.png"> <img width="222px" src="https://fscdn.zto.com/fs8/M02/B2/E4/wKhBD1-8o52Ae3GnAAASU3r62ME040.png">
</a> </a>
</td> </td>
<td align="center" valign="middle">
<a href="https://www.icsoc.net/" target="_blank">
<img width="222px" src="https://oss.icsoc.net/icsoc-ekt-test-files/icsoc.png">
</a>
</td>
</tr> </tr>
<tr></tr> <tr></tr>
</tbody> </tbody>
......
...@@ -66,7 +66,7 @@ func initRouterConfig(content []byte, factories map[string]router.FilePriorityRo ...@@ -66,7 +66,7 @@ func initRouterConfig(content []byte, factories map[string]router.FilePriorityRo
r, e := factory.NewFileRouter(content) r, e := factory.NewFileRouter(content)
if e == nil { if e == nil {
url := r.URL() url := r.URL()
routerURLSet.Add(&url) routerURLSet.Add(url)
return nil return nil
} }
logger.Warnf("router config type %s create fail {%v}\n", k, e) logger.Warnf("router config type %s create fail {%v}\n", k, e)
......
...@@ -489,6 +489,10 @@ func getArgType(v interface{}) string { ...@@ -489,6 +489,10 @@ func getArgType(v interface{}) string {
} }
switch t.Kind() { switch t.Kind() {
case reflect.Struct: case reflect.Struct:
v, ok := v.(hessian.POJO)
if ok {
return v.JavaClassName()
}
return "java.lang.Object" return "java.lang.Object"
case reflect.Slice, reflect.Array: case reflect.Slice, reflect.Array:
if t.Elem().Kind() == reflect.Struct { if t.Elem().Kind() == reflect.Struct {
......
...@@ -104,6 +104,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { ...@@ -104,6 +104,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
func (l *RegistryDataListener) Close() { func (l *RegistryDataListener) Close() {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
l.closed = true
for _, listener := range l.subscribed { for _, listener := range l.subscribed {
listener.(*RegistryConfigurationListener).Close() listener.(*RegistryConfigurationListener).Close()
} }
......
...@@ -127,15 +127,15 @@ func (r *zkRegistry) InitListeners() { ...@@ -127,15 +127,15 @@ func (r *zkRegistry) InitListeners() {
oldDataListener := r.dataListener oldDataListener := r.dataListener
oldDataListener.mutex.Lock() oldDataListener.mutex.Lock()
defer oldDataListener.mutex.Unlock() defer oldDataListener.mutex.Unlock()
recoverd := r.dataListener.subscribed r.dataListener.closed = true
if recoverd != nil && len(recoverd) > 0 { recovered := r.dataListener.subscribed
if recovered != nil && len(recovered) > 0 {
// recover all subscribed url // recover all subscribed url
for _, oldListener := range recoverd { for _, oldListener := range recovered {
var ( var (
regConfigListener *RegistryConfigurationListener regConfigListener *RegistryConfigurationListener
ok bool ok bool
) )
if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok { if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok {
regConfigListener.Close() regConfigListener.Close()
} }
...@@ -212,6 +212,9 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { ...@@ -212,6 +212,9 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
r.cltLock.Lock() r.cltLock.Lock()
defer r.cltLock.Unlock() defer r.cltLock.Unlock()
if r.client == nil{
return perrors.WithStack(perrors.New("zk client already been closed"))
}
err = r.client.Create(root) err = r.client.Create(root)
if err != nil { if err != nil {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err)) logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
...@@ -292,10 +295,10 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL ...@@ -292,10 +295,10 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
r.dataListener.mutex.Lock() r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf.ServiceKey()] configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
if configurationListener != nil { if configurationListener != nil {
zkListener, _ := configurationListener.(*RegistryConfigurationListener) zkListener, _ := configurationListener.(*RegistryConfigurationListener)
if zkListener != nil { if zkListener != nil {
if zkListener.isClosed { if zkListener.isClosed {
r.dataListener.mutex.Unlock()
return nil, perrors.New("configListener already been closed") return nil, perrors.New("configListener already been closed")
} }
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package remoting package remoting
import ( import (
"sync"
"time" "time"
) )
...@@ -26,13 +27,19 @@ import ( ...@@ -26,13 +27,19 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
) )
var ( var (
// generate request ID for global use // generate request ID for global use
sequence atomic.Int64 sequence atomic.Int64
// store requestID and response
pendingResponses = new(sync.Map)
) )
type SequenceType int64
func init() { func init() {
// init request ID // init request ID
sequence.Store(0) sequence.Store(0)
...@@ -90,6 +97,23 @@ func (response *Response) IsHeartbeat() bool { ...@@ -90,6 +97,23 @@ func (response *Response) IsHeartbeat() bool {
return response.Event && response.Result == nil return response.Event && response.Result == nil
} }
func (response *Response) Handle() {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}
pendingResponse.response = response
if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}
type Options struct { type Options struct {
// connect timeout // connect timeout
ConnectTimeout time.Duration ConnectTimeout time.Duration
...@@ -142,3 +166,28 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse { ...@@ -142,3 +166,28 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse {
Reply: r.response, Reply: r.response,
} }
} }
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
...@@ -18,7 +18,6 @@ package remoting ...@@ -18,7 +18,6 @@ package remoting
import ( import (
"errors" "errors"
"sync"
"time" "time"
) )
...@@ -28,19 +27,10 @@ import ( ...@@ -28,19 +27,10 @@ import (
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
var (
// store requestID and response
pendingResponses = new(sync.Map)
)
type SequenceType int64
// It is interface of client for network communication. // It is interface of client for network communication.
// If you use getty as network communication, you should define GettyClient that implements this interface. // If you use getty as network communication, you should define GettyClient that implements this interface.
type Client interface { type Client interface {
SetExchangeClient(client *ExchangeClient) SetExchangeClient(client *ExchangeClient)
// responseHandler is used to deal with msg
SetResponseHandler(responseHandler ResponseHandler)
// connect url // connect url
Connect(url *common.URL) error Connect(url *common.URL) error
// close // close
...@@ -63,11 +53,6 @@ type ExchangeClient struct { ...@@ -63,11 +53,6 @@ type ExchangeClient struct {
init bool init bool
} }
// handle the message from server
type ResponseHandler interface {
Handler(response *Response)
}
// create ExchangeClient // create ExchangeClient
func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient { func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient {
exchangeClient := &ExchangeClient{ exchangeClient := &ExchangeClient{
...@@ -82,7 +67,6 @@ func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Durat ...@@ -82,7 +67,6 @@ func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Durat
} }
} }
client.SetResponseHandler(exchangeClient)
return exchangeClient return exchangeClient
} }
...@@ -190,47 +174,3 @@ func (client *ExchangeClient) Close() { ...@@ -190,47 +174,3 @@ func (client *ExchangeClient) Close() {
func (client *ExchangeClient) IsAvailable() bool { func (client *ExchangeClient) IsAvailable() bool {
return client.client.IsAvailable() return client.client.IsAvailable()
} }
// handle the response from server
func (client *ExchangeClient) Handler(response *Response) {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}
pendingResponse.response = response
if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
...@@ -54,6 +54,14 @@ type ( ...@@ -54,6 +54,14 @@ type (
ServerConfig struct { ServerConfig struct {
SSLEnabled bool SSLEnabled bool
// heartbeat
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration
// session // session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration sessionTimeout time.Duration
...@@ -76,9 +84,13 @@ type ( ...@@ -76,9 +84,13 @@ type (
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"` ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat // heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"` HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration heartbeatPeriod time.Duration
// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration
// session // session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration sessionTimeout time.Duration
...@@ -188,6 +200,12 @@ func (c *ClientConfig) CheckValidity() error { ...@@ -188,6 +200,12 @@ func (c *ClientConfig) CheckValidity() error {
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan)) c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
} }
if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
} }
...@@ -199,6 +217,23 @@ func (c *ClientConfig) CheckValidity() error { ...@@ -199,6 +217,23 @@ func (c *ClientConfig) CheckValidity() error {
func (c *ServerConfig) CheckValidity() error { func (c *ServerConfig) CheckValidity() error {
var err error var err error
if len(c.HeartbeatPeriod) == 0 {
c.heartbeatPeriod = 60 * time.Second
} else if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
if c.heartbeatPeriod >= time.Duration(config.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat_period %s should be less than %s",
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
}
if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
} }
......
...@@ -118,13 +118,12 @@ type Options struct { ...@@ -118,13 +118,12 @@ type Options struct {
// Client : some configuration for network communication. // Client : some configuration for network communication.
type Client struct { type Client struct {
addr string addr string
opts Options opts Options
conf ClientConfig conf ClientConfig
pool *gettyRPCClientPool pool *gettyRPCClientPool
codec remoting.Codec codec remoting.Codec
responseHandler remoting.ResponseHandler ExchangeClient *remoting.ExchangeClient
ExchangeClient *remoting.ExchangeClient
} }
// create client // create client
...@@ -146,9 +145,6 @@ func NewClient(opt Options) *Client { ...@@ -146,9 +145,6 @@ func NewClient(opt Options) *Client {
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) { func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
c.ExchangeClient = client c.ExchangeClient = client
} }
func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
c.responseHandler = responseHandler
}
// init client and try to connection. // init client and try to connection.
func (c *Client) Connect(url *common.URL) error { func (c *Client) Connect(url *common.URL) error {
...@@ -220,15 +216,6 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err ...@@ -220,15 +216,6 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
return rpcClient, rpcClient.selectSession(), nil return rpcClient, rpcClient.selectSession(), nil
} }
func (c *Client) heartbeat(session getty.Session) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
return c.transfer(session, req, 3*time.Second)
}
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error { func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
err := session.WritePkg(request, timeout) err := session.WritePkg(request, timeout)
return perrors.WithStack(err) return perrors.WithStack(err)
......
...@@ -88,7 +88,6 @@ func getClient(url *common.URL) *Client { ...@@ -88,7 +88,6 @@ func getClient(url *common.URL) *Client {
exchangeClient := remoting.NewExchangeClient(url, client, 5*time.Second, false) exchangeClient := remoting.NewExchangeClient(url, client, 5*time.Second, false)
client.SetExchangeClient(exchangeClient) client.SetExchangeClient(exchangeClient)
client.Connect(url) client.Connect(url)
client.SetResponseHandler(exchangeClient)
return client return client
} }
......
...@@ -154,7 +154,7 @@ func (s *Server) newSession(session getty.Session) error { ...@@ -154,7 +154,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout) session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat()) logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool) session.SetTaskPool(srvGrpool)
...@@ -195,7 +195,7 @@ func (s *Server) newSession(session getty.Session) error { ...@@ -195,7 +195,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout) session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session: %s", session.Stat()) logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool) session.SetTaskPool(srvGrpool)
......
...@@ -44,7 +44,8 @@ const ( ...@@ -44,7 +44,8 @@ const (
) )
var ( var (
errTooManySessions = perrors.New("too many sessions") errTooManySessions = perrors.New("too many sessions")
errHeartbeatReadTimeout = perrors.New("heartbeat read timeout")
) )
type rpcSession struct { type rpcSession struct {
...@@ -66,7 +67,8 @@ func (s *rpcSession) GetReqNum() int32 { ...@@ -66,7 +67,8 @@ func (s *rpcSession) GetReqNum() int32 {
// nolint // nolint
type RpcClientHandler struct { type RpcClientHandler struct {
conn *gettyRPCClient conn *gettyRPCClient
timeoutTimes int
} }
// nolint // nolint
...@@ -109,13 +111,13 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -109,13 +111,13 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
resp.Event = req.Event resp.Event = req.Event
resp.SerialID = req.SerialID resp.SerialID = req.SerialID
resp.Version = "2.0.2" resp.Version = "2.0.2"
reply(session, resp, hessian.PackageHeartbeat) reply(session, resp)
return return
} }
logger.Errorf("illegal request but not heartbeart. {%#v}", req) logger.Errorf("illegal request but not heartbeart. {%#v}", req)
return return
} }
h.timeoutTimes = 0
p := result.Result.(*remoting.Response) p := result.Result.(*remoting.Response)
// get heartbeart // get heartbeart
if p.Event { if p.Event {
...@@ -123,11 +125,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -123,11 +125,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if p.Error != nil { if p.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Error) logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
} }
h.conn.pool.rpcClient.responseHandler.Handler(p) p.Handle()
return
}
if result.IsRequest {
logger.Errorf("illegal package for it is response type. {%#v}", pkg)
return return
} }
...@@ -135,7 +133,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -135,7 +133,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.updateSession(session) h.conn.updateSession(session)
h.conn.pool.rpcClient.responseHandler.Handler(p) p.Handle()
} }
// OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session // OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
...@@ -153,8 +151,21 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { ...@@ -153,8 +151,21 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
return return
} }
if err := h.conn.pool.rpcClient.heartbeat(session); err != nil { heartbeatCallBack := func(err error) {
logger.Warnf("failed to send heartbeat, error: %v", err) if err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
if h.timeoutTimes >= 3 {
h.conn.removeSession(session)
return
}
h.timeoutTimes++
return
}
h.timeoutTimes = 0
}
if err := heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
} }
} }
...@@ -169,6 +180,7 @@ type RpcServerHandler struct { ...@@ -169,6 +180,7 @@ type RpcServerHandler struct {
sessionMap map[getty.Session]*rpcSession sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex rwlock sync.RWMutex
server *Server server *Server
timeoutTimes int
} }
// nolint // nolint
...@@ -231,7 +243,16 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -231,7 +243,16 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
return return
} }
if !decodeResult.IsRequest { if !decodeResult.IsRequest {
logger.Errorf("illegal package for it is response type. {%#v}", pkg) res := decodeResult.Result.(*remoting.Response)
if res.Event {
logger.Debugf("get rpc heartbeat response{%#v}", res)
if res.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
}
res.Handle()
return
}
logger.Errorf("illegal package but not heartbeart. {%#v}", pkg)
return return
} }
req := decodeResult.Result.(*remoting.Request) req := decodeResult.Result.(*remoting.Request)
...@@ -245,7 +266,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -245,7 +266,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
// heartbeat // heartbeat
if req.Event { if req.Event {
logger.Debugf("get rpc heartbeat request{%#v}", resp) logger.Debugf("get rpc heartbeat request{%#v}", resp)
reply(session, resp, hessian.PackageHeartbeat) reply(session, resp)
return return
} }
...@@ -266,7 +287,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -266,7 +287,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if !req.TwoWay { if !req.TwoWay {
return return
} }
reply(session, resp, hessian.PackageResponse) reply(session, resp)
} }
}() }()
...@@ -284,7 +305,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -284,7 +305,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
return return
} }
resp.Result = result resp.Result = result
reply(session, resp, hessian.PackageResponse) reply(session, resp)
} }
// OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session // OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
...@@ -311,10 +332,52 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { ...@@ -311,10 +332,52 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
h.rwlock.Unlock() h.rwlock.Unlock()
session.Close() session.Close()
} }
heartbeatCallBack := func(err error) {
if err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
if h.timeoutTimes >= 3 {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
return
}
h.timeoutTimes++
return
}
h.timeoutTimes = 0
}
if err := heartbeat(session, h.server.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
}
} }
func reply(session getty.Session, resp *remoting.Response, tp hessian.PackageType) { func reply(session getty.Session, resp *remoting.Response) {
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), resp) logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), resp)
} }
} }
func heartbeat(session getty.Session, timeout time.Duration, callBack func(err error)) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
err := session.WritePkg(req, 3*time.Second)
go func() {
var err1 error
select {
case <-getty.GetTimeWheel().After(timeout):
err1 = errHeartbeatReadTimeout
case <-resp.Done:
err1 = resp.Err
}
callBack(err1)
}()
return perrors.WithStack(err)
}
...@@ -68,18 +68,27 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -68,18 +68,27 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
// Write send the data to server // Write send the data to server
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
req, ok := pkg.(*remoting.Request) req, ok := pkg.(*remoting.Request)
if !ok { if ok {
logger.Errorf("illegal pkg:%+v\n", pkg) buf, err := (p.client.codec).EncodeRequest(req)
return nil, perrors.New("invalid rpc request") if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
} }
buf, err := (p.client.codec).EncodeRequest(req) res, ok := pkg.(*remoting.Response)
if err != nil { if ok {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err)) buf, err := (p.client.codec).EncodeResponse(res)
return nil, perrors.WithStack(err) if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
} }
return buf.Bytes(), nil logger.Errorf("illegal pkg:%+v\n", pkg)
return nil, perrors.New("invalid rpc request")
} }
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -120,16 +129,26 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -120,16 +129,26 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
// Write send the data to client // Write send the data to client
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
res, ok := pkg.(*remoting.Response) res, ok := pkg.(*remoting.Response)
if !ok { if ok {
logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) buf, err := (p.server.codec).EncodeResponse(res)
return nil, perrors.New("invalid rpc response") if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
} }
buf, err := (p.server.codec).EncodeResponse(res) req, ok := pkg.(*remoting.Request)
if err != nil { if ok {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err)) buf, err := (p.server.codec).EncodeRequest(req)
return nil, perrors.WithStack(err) if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
} }
return buf.Bytes(), nil logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return nil, perrors.New("invalid rpc response")
} }
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment