Skip to content
Snippets Groups Projects
Commit 7a27aa96 authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge pull request #764 from georgehao/refact-seri

feat: update the comment of getty/listener
parent 2ceca6f2
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,7 @@ import ( ...@@ -39,7 +39,7 @@ import (
// todo: WritePkg_Timeout will entry *.yml // todo: WritePkg_Timeout will entry *.yml
const ( const (
// WritePkg_Timeout ... // WritePkg_Timeout the timeout of write pkg
WritePkg_Timeout = 5 * time.Second WritePkg_Timeout = 5 * time.Second
) )
...@@ -64,35 +64,35 @@ func (s *rpcSession) GetReqNum() int32 { ...@@ -64,35 +64,35 @@ func (s *rpcSession) GetReqNum() int32 {
// RpcClientHandler // RpcClientHandler
// ////////////////////////////////////////// // //////////////////////////////////////////
// RpcClientHandler ... // nolint
type RpcClientHandler struct { type RpcClientHandler struct {
conn *gettyRPCClient conn *gettyRPCClient
} }
// NewRpcClientHandler ... // nolint
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler { func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client} return &RpcClientHandler{conn: client}
} }
// OnOpen ... // OnOpen call the getty client session opened, add the session to getty client session list
func (h *RpcClientHandler) OnOpen(session getty.Session) error { func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session) h.conn.addSession(session)
return nil return nil
} }
// OnError ... // OnError the getty client session has errored, so remove the session from the getty client session list
func (h *RpcClientHandler) OnError(session getty.Session, err error) { func (h *RpcClientHandler) OnError(session getty.Session, err error) {
logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session) h.conn.removeSession(session)
} }
// OnClose ... // OnClose close the session, remove it from the getty session list
func (h *RpcClientHandler) OnClose(session getty.Session) { func (h *RpcClientHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat()) logger.Infof("session{%s} is closing......", session.Stat())
h.conn.removeSession(session) h.conn.removeSession(session)
} }
// OnMessage ... // OnMessage get response from getty server, and update the session to the getty client session list
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
result, ok := pkg.(remoting.DecodeResult) result, ok := pkg.(remoting.DecodeResult)
if !ok { if !ok {
...@@ -138,7 +138,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -138,7 +138,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.pool.rpcClient.responseHandler.Handler(p) h.conn.pool.rpcClient.responseHandler.Handler(p)
} }
// OnCron ... // OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
func (h *RpcClientHandler) OnCron(session getty.Session) { func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.conn.getClientRpcSession(session) rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil { if err != nil {
...@@ -160,7 +160,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { ...@@ -160,7 +160,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
// RpcServerHandler // RpcServerHandler
// ////////////////////////////////////////// // //////////////////////////////////////////
// RpcServerHandler implement EventListener of getty. // nolint
type RpcServerHandler struct { type RpcServerHandler struct {
maxSessionNum int maxSessionNum int
sessionTimeout time.Duration sessionTimeout time.Duration
...@@ -169,7 +169,7 @@ type RpcServerHandler struct { ...@@ -169,7 +169,7 @@ type RpcServerHandler struct {
server *Server server *Server
} }
// NewRpcServerHandler ... // nolint
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration, serverP *Server) *RpcServerHandler { func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration, serverP *Server) *RpcServerHandler {
return &RpcServerHandler{ return &RpcServerHandler{
maxSessionNum: maxSessionNum, maxSessionNum: maxSessionNum,
...@@ -179,7 +179,8 @@ func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration, server ...@@ -179,7 +179,8 @@ func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration, server
} }
} }
// OnOpen ... // OnOpen call server session opened, add the session to getty server session list. also onOpen
// will check the max getty server session number
func (h *RpcServerHandler) OnOpen(session getty.Session) error { func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error var err error
h.rwlock.RLock() h.rwlock.RLock()
...@@ -198,7 +199,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error { ...@@ -198,7 +199,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error {
return nil return nil
} }
// OnError ... // OnError the getty server session has errored, so remove the session from the getty server session list
func (h *RpcServerHandler) OnError(session getty.Session, err error) { func (h *RpcServerHandler) OnError(session getty.Session, err error) {
logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock() h.rwlock.Lock()
...@@ -206,7 +207,7 @@ func (h *RpcServerHandler) OnError(session getty.Session, err error) { ...@@ -206,7 +207,7 @@ func (h *RpcServerHandler) OnError(session getty.Session, err error) {
h.rwlock.Unlock() h.rwlock.Unlock()
} }
// OnClose ... // OnClose close the session, remove it from the getty server list
func (h *RpcServerHandler) OnClose(session getty.Session) { func (h *RpcServerHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat()) logger.Infof("session{%s} is closing......", session.Stat())
h.rwlock.Lock() h.rwlock.Lock()
...@@ -214,7 +215,7 @@ func (h *RpcServerHandler) OnClose(session getty.Session) { ...@@ -214,7 +215,7 @@ func (h *RpcServerHandler) OnClose(session getty.Session) {
h.rwlock.Unlock() h.rwlock.Unlock()
} }
// OnMessage ... // OnMessage get request from getty client, update the session reqNum and reply response to client
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock() h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok { if _, ok := h.sessionMap[session]; ok {
...@@ -285,7 +286,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -285,7 +286,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
reply(session, resp, hessian.PackageResponse) reply(session, resp, hessian.PackageResponse)
} }
// OnCron ... // OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
func (h *RpcServerHandler) OnCron(session getty.Session) { func (h *RpcServerHandler) OnCron(session getty.Session) {
var ( var (
flag bool flag bool
...@@ -312,7 +313,6 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { ...@@ -312,7 +313,6 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
} }
func reply(session getty.Session, resp *remoting.Response, tp hessian.PackageType) { func reply(session getty.Session, resp *remoting.Response, tp hessian.PackageType) {
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), resp) logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), resp)
} }
......
...@@ -104,7 +104,6 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler { ...@@ -104,7 +104,6 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
req, length, err := (p.server.codec).Decode(data) req, length, err := (p.server.codec).Decode(data)
//resp,len, err := (*p.).DecodeResponse(buf) //resp,len, err := (*p.).DecodeResponse(buf)
if err != nil { if err != nil {
if err == hessian.ErrHeaderNotEnough || err == hessian.ErrBodyNotEnough { if err == hessian.ErrHeaderNotEnough || err == hessian.ErrBodyNotEnough {
return nil, 0, nil return nil, 0, nil
......
module github.com/apache/dubbo-go/test/integrate/dubbo/go-client module github.com/apache/dubbo-go/test/integrate/dubbo/go-client
require github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44 require github.com/apache/dubbo-go-hessian2 v1.7.0
go 1.13 go 1.13
module github.com/apache/dubbo-go/test/integrate/dubbo/go-server module github.com/apache/dubbo-go/test/integrate/dubbo/go-server
require github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44 require github.com/apache/dubbo-go-hessian2 v1.7.0
go 1.13 go 1.13
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment