Skip to content
Snippets Groups Projects
Commit 7113ecd8 authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge pull request #763 from dubbo-x/read

Fmt: code clean
parent 0887d8d0
No related branches found
No related tags found
No related merge requests found
...@@ -40,7 +40,7 @@ type RemoteConfig struct { ...@@ -40,7 +40,7 @@ type RemoteConfig struct {
TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"` TimeoutStr string `default:"5s" yaml:"timeout" json:"timeout,omitempty"`
Username string `yaml:"username" json:"username,omitempty" property:"username"` Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"` Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"address,omitempty"` Params map[string]string `yaml:"params" json:"params,omitempty"`
} }
// Timeout return timeout duration. // Timeout return timeout duration.
......
...@@ -52,7 +52,6 @@ var ( ...@@ -52,7 +52,6 @@ var (
) )
func init() { func init() {
// load clientconfig from consumer_config // load clientconfig from consumer_config
// default use dubbo // default use dubbo
consumerConfig := config.GetConsumerConfig() consumerConfig := config.GetConsumerConfig()
...@@ -106,8 +105,11 @@ func GetClientConf() ClientConfig { ...@@ -106,8 +105,11 @@ func GetClientConf() ClientConfig {
func setClientGrpool() { func setClientGrpool() {
if clientConf.GrPoolSize > 1 { if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), clientGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber),
)
} }
} }
...@@ -141,7 +143,6 @@ type Client struct { ...@@ -141,7 +143,6 @@ type Client struct {
// NewClient create a new Client. // NewClient create a new Client.
func NewClient(opt Options) *Client { func NewClient(opt Options) *Client {
switch { switch {
case opt.ConnectTimeout == 0: case opt.ConnectTimeout == 0:
opt.ConnectTimeout = 3 * time.Second opt.ConnectTimeout = 3 * time.Second
...@@ -203,7 +204,6 @@ func NewResponse(reply interface{}, atta map[string]interface{}) *Response { ...@@ -203,7 +204,6 @@ func NewResponse(reply interface{}, atta map[string]interface{}) *Response {
// CallOneway call by one way // CallOneway call by one way
func (c *Client) CallOneway(request *Request) error { func (c *Client) CallOneway(request *Request) error {
return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil))
} }
...@@ -311,9 +311,7 @@ func (c *Client) heartbeat(session getty.Session) error { ...@@ -311,9 +311,7 @@ func (c *Client) heartbeat(session getty.Session) error {
return c.transfer(session, nil, NewPendingResponse()) return c.transfer(session, nil, NewPendingResponse())
} }
func (c *Client) transfer(session getty.Session, pkg *DubboPackage, func (c *Client) transfer(session getty.Session, pkg *DubboPackage, rsp *PendingResponse) error {
rsp *PendingResponse) error {
var ( var (
sequence uint64 sequence uint64
err error err error
......
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
import ( import (
"github.com/apache/dubbo-getty" "github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync" "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
...@@ -42,7 +43,6 @@ var ( ...@@ -42,7 +43,6 @@ var (
) )
func init() { func init() {
// load clientconfig from provider_config // load clientconfig from provider_config
// default use dubbo // default use dubbo
providerConfig := config.GetProviderConfig() providerConfig := config.GetProviderConfig()
...@@ -94,8 +94,11 @@ func GetServerConfig() ServerConfig { ...@@ -94,8 +94,11 @@ func GetServerConfig() ServerConfig {
func setServerGrpool() { func setServerGrpool() {
if srvConf.GrPoolSize > 1 { if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen), srvGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber)) gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber),
)
} }
} }
...@@ -108,51 +111,47 @@ type Server struct { ...@@ -108,51 +111,47 @@ type Server struct {
// NewServer create a new Server. // NewServer create a new Server.
func NewServer() *Server { func NewServer() *Server {
return &Server{
s := &Server{ conf: *srvConf,
conf: *srvConf, rpcHandler: NewRpcServerHandler(srvConf.SessionNumber, srvConf.sessionTimeout),
} }
s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout)
return s
} }
func (s *Server) newSession(session getty.Session) error { func (s *Server) newSession(session getty.Session) error {
var ( var (
ok bool ok bool
tcpConn *net.TCPConn tcpConn *net.TCPConn
err error
) )
conf := s.conf conf := s.conf
if conf.GettySessionParam.CompressEncoding { if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip) session.SetCompressType(getty.CompressZip)
} }
if _, ok = session.Conn().(*tls.Conn); ok {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcServerPkgHandler)
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) if _, ok = session.Conn().(*tls.Conn); !ok {
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
if conf.GettySessionParam.TcpKeepAlive { return perrors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) }
if err = tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive); err != nil {
return err
}
if conf.GettySessionParam.TcpKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize); err != nil {
return err
}
} }
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName) session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
...@@ -163,10 +162,8 @@ func (s *Server) newSession(session getty.Session) error { ...@@ -163,10 +162,8 @@ func (s *Server) newSession(session getty.Session) error {
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout) session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("app accepts new session:%s\n", session.Stat()) logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool) session.SetTaskPool(srvGrpool)
return nil return nil
} }
...@@ -184,7 +181,6 @@ func (s *Server) Start(url common.URL) { ...@@ -184,7 +181,6 @@ func (s *Server) Start(url common.URL) {
getty.WithServerSslEnabled(url.GetParamBool(constant.SSL_ENABLED_KEY, false)), getty.WithServerSslEnabled(url.GetParamBool(constant.SSL_ENABLED_KEY, false)),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()), getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()),
) )
} else { } else {
tcpServer = getty.NewTCPServer( tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr), getty.WithLocalAddress(addr),
...@@ -193,7 +189,6 @@ func (s *Server) Start(url common.URL) { ...@@ -193,7 +189,6 @@ func (s *Server) Start(url common.URL) {
tcpServer.RunEventLoop(s.newSession) tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", addr) logger.Debugf("s bind addr{%s} ok!", addr)
s.tcpServer = tcpServer s.tcpServer = tcpServer
} }
// Stop stop dubbo server. // Stop stop dubbo server.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment