Skip to content
Snippets Groups Projects
Commit 5fb4a2ea authored by xg.gao's avatar xg.gao
Browse files

Merge branch 'develop' into read

parents 4cecf966 b5c79235
No related branches found
No related tags found
No related merge requests found
......@@ -246,6 +246,11 @@ If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it
<img width="222px" src="https://fscdn.zto.com/fs8/M02/B2/E4/wKhBD1-8o52Ae3GnAAASU3r62ME040.png">
</a>
</td>
<td align="center" valign="middle">
<a href="https://www.icsoc.net/" target="_blank">
<img width="222px" src="https://oss.icsoc.net/icsoc-ekt-test-files/icsoc.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
......
......@@ -244,6 +244,11 @@ make test
<img width="222px" src="https://fscdn.zto.com/fs8/M02/B2/E4/wKhBD1-8o52Ae3GnAAASU3r62ME040.png">
</a>
</td>
<td align="center" valign="middle">
<a href="https://www.icsoc.net/" target="_blank">
<img width="222px" src="https://oss.icsoc.net/icsoc-ekt-test-files/icsoc.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
......
......@@ -66,7 +66,7 @@ func initRouterConfig(content []byte, factories map[string]router.FilePriorityRo
r, e := factory.NewFileRouter(content)
if e == nil {
url := r.URL()
routerURLSet.Add(&url)
routerURLSet.Add(url)
return nil
}
logger.Warnf("router config type %s create fail {%v}\n", k, e)
......
......@@ -489,6 +489,10 @@ func getArgType(v interface{}) string {
}
switch t.Kind() {
case reflect.Struct:
v, ok := v.(hessian.POJO)
if ok {
return v.JavaClassName()
}
return "java.lang.Object"
case reflect.Slice, reflect.Array:
if t.Elem().Kind() == reflect.Struct {
......
......@@ -104,6 +104,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
func (l *RegistryDataListener) Close() {
l.mutex.Lock()
defer l.mutex.Unlock()
l.closed = true
for _, listener := range l.subscribed {
listener.(*RegistryConfigurationListener).Close()
}
......
......@@ -127,15 +127,15 @@ func (r *zkRegistry) InitListeners() {
oldDataListener := r.dataListener
oldDataListener.mutex.Lock()
defer oldDataListener.mutex.Unlock()
recoverd := r.dataListener.subscribed
if recoverd != nil && len(recoverd) > 0 {
r.dataListener.closed = true
recovered := r.dataListener.subscribed
if recovered != nil && len(recovered) > 0 {
// recover all subscribed url
for _, oldListener := range recoverd {
for _, oldListener := range recovered {
var (
regConfigListener *RegistryConfigurationListener
ok bool
)
if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok {
regConfigListener.Close()
}
......@@ -212,6 +212,9 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
r.cltLock.Lock()
defer r.cltLock.Unlock()
if r.client == nil{
return perrors.WithStack(perrors.New("zk client already been closed"))
}
err = r.client.Create(root)
if err != nil {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
......@@ -292,10 +295,10 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
if configurationListener != nil {
zkListener, _ := configurationListener.(*RegistryConfigurationListener)
if zkListener != nil {
if zkListener.isClosed {
r.dataListener.mutex.Unlock()
return nil, perrors.New("configListener already been closed")
}
}
......
......@@ -17,6 +17,7 @@
package remoting
import (
"sync"
"time"
)
......@@ -26,13 +27,19 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
var (
// generate request ID for global use
sequence atomic.Int64
// store requestID and response
pendingResponses = new(sync.Map)
)
type SequenceType int64
func init() {
// init request ID
sequence.Store(0)
......@@ -90,6 +97,23 @@ func (response *Response) IsHeartbeat() bool {
return response.Event && response.Result == nil
}
func (response *Response) Handle() {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}
pendingResponse.response = response
if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}
type Options struct {
// connect timeout
ConnectTimeout time.Duration
......@@ -142,3 +166,28 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse {
Reply: r.response,
}
}
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
......@@ -18,7 +18,6 @@ package remoting
import (
"errors"
"sync"
"time"
)
......@@ -28,19 +27,10 @@ import (
"github.com/apache/dubbo-go/protocol"
)
var (
// store requestID and response
pendingResponses = new(sync.Map)
)
type SequenceType int64
// It is interface of client for network communication.
// If you use getty as network communication, you should define GettyClient that implements this interface.
type Client interface {
SetExchangeClient(client *ExchangeClient)
// responseHandler is used to deal with msg
SetResponseHandler(responseHandler ResponseHandler)
// connect url
Connect(url *common.URL) error
// close
......@@ -63,11 +53,6 @@ type ExchangeClient struct {
init bool
}
// handle the message from server
type ResponseHandler interface {
Handler(response *Response)
}
// create ExchangeClient
func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient {
exchangeClient := &ExchangeClient{
......@@ -82,7 +67,6 @@ func NewExchangeClient(url *common.URL, client Client, connectTimeout time.Durat
}
}
client.SetResponseHandler(exchangeClient)
return exchangeClient
}
......@@ -190,47 +174,3 @@ func (client *ExchangeClient) Close() {
func (client *ExchangeClient) IsAvailable() bool {
return client.client.IsAvailable()
}
// handle the response from server
func (client *ExchangeClient) Handler(response *Response) {
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
}
pendingResponse.response = response
if pendingResponse.Callback == nil {
pendingResponse.Err = pendingResponse.response.Error
close(pendingResponse.Done)
} else {
pendingResponse.Callback(pendingResponse.GetCallResponse())
}
}
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
}
if presp, ok := pendingResponses.Load(seq); ok {
pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
}
return nil
}
......@@ -54,6 +54,14 @@ type (
ServerConfig struct {
SSLEnabled bool
// heartbeat
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
......@@ -76,9 +84,13 @@ type (
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
HeartbeatPeriod string `default:"60s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// heartbeat timeout
HeartbeatTimeout string `default:"5s" yaml:"heartbeat_timeout" json:"heartbeat_timeout,omitempty"`
heartbeatTimeout time.Duration
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
......@@ -188,6 +200,12 @@ func (c *ClientConfig) CheckValidity() error {
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
}
if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
......@@ -199,6 +217,23 @@ func (c *ClientConfig) CheckValidity() error {
func (c *ServerConfig) CheckValidity() error {
var err error
if len(c.HeartbeatPeriod) == 0 {
c.heartbeatPeriod = 60 * time.Second
} else if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
if c.heartbeatPeriod >= time.Duration(config.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat_period %s should be less than %s",
c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
}
if len(c.HeartbeatTimeout) == 0 {
c.heartbeatTimeout = 60 * time.Second
} else if c.heartbeatTimeout, err = time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
......
......@@ -118,13 +118,12 @@ type Options struct {
// Client : some configuration for network communication.
type Client struct {
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
responseHandler remoting.ResponseHandler
ExchangeClient *remoting.ExchangeClient
addr string
opts Options
conf ClientConfig
pool *gettyRPCClientPool
codec remoting.Codec
ExchangeClient *remoting.ExchangeClient
}
// create client
......@@ -146,9 +145,6 @@ func NewClient(opt Options) *Client {
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
c.ExchangeClient = client
}
func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
c.responseHandler = responseHandler
}
// init client and try to connection.
func (c *Client) Connect(url *common.URL) error {
......@@ -220,15 +216,6 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
return rpcClient, rpcClient.selectSession(), nil
}
func (c *Client) heartbeat(session getty.Session) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
return c.transfer(session, req, 3*time.Second)
}
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
err := session.WritePkg(request, timeout)
return perrors.WithStack(err)
......
......@@ -88,7 +88,6 @@ func getClient(url *common.URL) *Client {
exchangeClient := remoting.NewExchangeClient(url, client, 5*time.Second, false)
client.SetExchangeClient(exchangeClient)
client.Connect(url)
client.SetResponseHandler(exchangeClient)
return client
}
......
......@@ -154,7 +154,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
......@@ -195,7 +195,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)
......
......@@ -44,7 +44,8 @@ const (
)
var (
errTooManySessions = perrors.New("too many sessions")
errTooManySessions = perrors.New("too many sessions")
errHeartbeatReadTimeout = perrors.New("heartbeat read timeout")
)
type rpcSession struct {
......@@ -66,7 +67,8 @@ func (s *rpcSession) GetReqNum() int32 {
// nolint
type RpcClientHandler struct {
conn *gettyRPCClient
conn *gettyRPCClient
timeoutTimes int
}
// nolint
......@@ -109,13 +111,13 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
resp.Event = req.Event
resp.SerialID = req.SerialID
resp.Version = "2.0.2"
reply(session, resp, hessian.PackageHeartbeat)
reply(session, resp)
return
}
logger.Errorf("illegal request but not heartbeart. {%#v}", req)
return
}
h.timeoutTimes = 0
p := result.Result.(*remoting.Response)
// get heartbeart
if p.Event {
......@@ -123,11 +125,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if p.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
}
h.conn.pool.rpcClient.responseHandler.Handler(p)
return
}
if result.IsRequest {
logger.Errorf("illegal package for it is response type. {%#v}", pkg)
p.Handle()
return
}
......@@ -135,7 +133,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.updateSession(session)
h.conn.pool.rpcClient.responseHandler.Handler(p)
p.Handle()
}
// OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
......@@ -153,8 +151,21 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
return
}
if err := h.conn.pool.rpcClient.heartbeat(session); err != nil {
logger.Warnf("failed to send heartbeat, error: %v", err)
heartbeatCallBack := func(err error) {
if err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
if h.timeoutTimes >= 3 {
h.conn.removeSession(session)
return
}
h.timeoutTimes++
return
}
h.timeoutTimes = 0
}
if err := heartbeat(session, h.conn.pool.rpcClient.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
}
}
......@@ -169,6 +180,7 @@ type RpcServerHandler struct {
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
server *Server
timeoutTimes int
}
// nolint
......@@ -231,7 +243,16 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
if !decodeResult.IsRequest {
logger.Errorf("illegal package for it is response type. {%#v}", pkg)
res := decodeResult.Result.(*remoting.Response)
if res.Event {
logger.Debugf("get rpc heartbeat response{%#v}", res)
if res.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
}
res.Handle()
return
}
logger.Errorf("illegal package but not heartbeart. {%#v}", pkg)
return
}
req := decodeResult.Result.(*remoting.Request)
......@@ -245,7 +266,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
// heartbeat
if req.Event {
logger.Debugf("get rpc heartbeat request{%#v}", resp)
reply(session, resp, hessian.PackageHeartbeat)
reply(session, resp)
return
}
......@@ -266,7 +287,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if !req.TwoWay {
return
}
reply(session, resp, hessian.PackageResponse)
reply(session, resp)
}
}()
......@@ -284,7 +305,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
resp.Result = result
reply(session, resp, hessian.PackageResponse)
reply(session, resp)
}
// OnCron check the session health periodic. if the session's sessionTimeout has reached, just close the session
......@@ -311,10 +332,52 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
h.rwlock.Unlock()
session.Close()
}
heartbeatCallBack := func(err error) {
if err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
if h.timeoutTimes >= 3 {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
return
}
h.timeoutTimes++
return
}
h.timeoutTimes = 0
}
if err := heartbeat(session, h.server.conf.heartbeatTimeout, heartbeatCallBack); err != nil {
logger.Warnf("failed to send heartbeat, error{%v}", err)
}
}
func reply(session getty.Session, resp *remoting.Response, tp hessian.PackageType) {
func reply(session getty.Session, resp *remoting.Response) {
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), resp)
}
}
func heartbeat(session getty.Session, timeout time.Duration, callBack func(err error)) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
err := session.WritePkg(req, 3*time.Second)
go func() {
var err1 error
select {
case <-getty.GetTimeWheel().After(timeout):
err1 = errHeartbeatReadTimeout
case <-resp.Done:
err1 = resp.Err
}
callBack(err1)
}()
return perrors.WithStack(err)
}
......@@ -68,18 +68,27 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
// Write send the data to server
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
req, ok := pkg.(*remoting.Request)
if !ok {
logger.Errorf("illegal pkg:%+v\n", pkg)
return nil, perrors.New("invalid rpc request")
if ok {
buf, err := (p.client.codec).EncodeRequest(req)
if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
}
buf, err := (p.client.codec).EncodeRequest(req)
if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
res, ok := pkg.(*remoting.Response)
if ok {
buf, err := (p.client.codec).EncodeResponse(res)
if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
}
return buf.Bytes(), nil
logger.Errorf("illegal pkg:%+v\n", pkg)
return nil, perrors.New("invalid rpc request")
}
////////////////////////////////////////////
......@@ -120,16 +129,26 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
// Write send the data to client
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
res, ok := pkg.(*remoting.Response)
if !ok {
logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return nil, perrors.New("invalid rpc response")
if ok {
buf, err := (p.server.codec).EncodeResponse(res)
if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
}
buf, err := (p.server.codec).EncodeResponse(res)
if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
req, ok := pkg.(*remoting.Request)
if ok {
buf, err := (p.server.codec).EncodeRequest(req)
if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
return buf.Bytes(), nil
}
return buf.Bytes(), nil
logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return nil, perrors.New("invalid rpc response")
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment