Skip to content
Snippets Groups Projects
Unverified Commit 014da98f authored by reusee's avatar reusee Committed by GitHub
Browse files

cnservice: initialize hakeeper client, txn sender, txn client lazily (#4608)

cnservice: initialize hakeeper client, txn sender, txn client lazily

Approved by: @zhangxu19830126, @nnsgmsone
parent 2d9e3003
No related branches found
No related tags found
No related merge requests found
......@@ -12,6 +12,6 @@ service-addresses = [
"2"
]
[engine]
[cn.Engine]
type = "tae"
......@@ -46,16 +46,6 @@ func NewService(cfg *Config, ctx context.Context) (Service, error) {
},
}
if err := srv.initHAKeeperClient(); err != nil {
return nil, err
}
if err := srv.initTxnSender(); err != nil {
return nil, err
}
if err := srv.initTxnClient(); err != nil {
return nil, err
}
server, err := morpc.NewRPCServer("cn-server", cfg.ListenAddress,
morpc.NewMessageCodec(srv.acquireMessage, 16<<20),
morpc.WithServerGoettyOptions(goetty.WithSessionRWBUfferSize(1<<20, 1<<20)))
......@@ -143,13 +133,21 @@ func (s *service) initEngine(
//TODO
case EngineMemory:
pu.TxnClient = s.txnClient
client, err := s.getTxnClient()
if err != nil {
return err
}
pu.TxnClient = client
hakeeper, err := s.getHAKeeperClient()
if err != nil {
return err
}
pu.StorageEngine = txnengine.New(
ctx,
new(txnengine.ShardToSingleStatic), //TODO use hashing shard policy
txnengine.GetClusterDetailsFromHAKeeper(
ctx,
s.hakeeperClient,
hakeeper,
),
)
......@@ -202,31 +200,45 @@ func (s *service) serverShutdown(isgraceful bool) error {
return s.mo.Stop()
}
func (s *service) initHAKeeperClient() error {
ctx, cancel := context.WithTimeout(
context.Background(),
s.cfg.HAKeeper.DiscoveryTimeout.Duration,
)
defer cancel()
client, err := logservice.NewCNHAKeeperClient(ctx, s.cfg.HAKeeper.ClientConfig)
if err != nil {
return err
}
s.hakeeperClient = client
return nil
func (s *service) getHAKeeperClient() (client logservice.CNHAKeeperClient, err error) {
s.initHakeeperClientOnce.Do(func() {
ctx, cancel := context.WithTimeout(
context.Background(),
s.cfg.HAKeeper.DiscoveryTimeout.Duration,
)
defer cancel()
client, err = logservice.NewCNHAKeeperClient(ctx, s.cfg.HAKeeper.ClientConfig)
if err != nil {
return
}
s._hakeeperClient = client
})
client = s._hakeeperClient
return
}
func (s *service) initTxnSender() error {
sender, err := rpc.NewSender(s.logger) //TODO options
if err != nil {
return err
}
s.txnSender = sender
return nil
func (s *service) getTxnSender() (sender rpc.TxnSender, err error) {
s.initTxnSenderOnce.Do(func() {
sender, err = rpc.NewSender(s.logger) //TODO options
if err != nil {
return
}
s._txnSender = sender
})
sender = s._txnSender
return
}
func (s *service) initTxnClient() error {
txnClient := client.NewTxnClient(s.txnSender) //TODO options
s.txnClient = txnClient
return nil
func (s *service) getTxnClient() (c client.TxnClient, err error) {
s.initTxnClientOnce.Do(func() {
var sender rpc.TxnSender
sender, err = s.getTxnSender()
if err != nil {
return
}
c = client.NewTxnClient(sender) //TODO options
s._txnClient = c
})
c = s._txnClient
return
}
......@@ -91,13 +91,16 @@ type Config struct {
}
type service struct {
cfg *Config
pool *sync.Pool
logger *zap.Logger
server morpc.RPCServer
cancelMoServerFunc context.CancelFunc
mo *frontend.MOServer
hakeeperClient logservice.CNHAKeeperClient
txnSender rpc.TxnSender
txnClient client.TxnClient
cfg *Config
pool *sync.Pool
logger *zap.Logger
server morpc.RPCServer
cancelMoServerFunc context.CancelFunc
mo *frontend.MOServer
initHakeeperClientOnce sync.Once
_hakeeperClient logservice.CNHAKeeperClient
initTxnSenderOnce sync.Once
_txnSender rpc.TxnSender
initTxnClientOnce sync.Once
_txnClient client.TxnClient
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment