Skip to content
Snippets Groups Projects
Unverified Commit 83e744b8 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

dnservice: retry start dnshard (#4637)

Retry starting the DNShard if creating its transaction storage fails.

Approved by: @reusee
parent c987dd7a
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/txn/storage"
"github.com/matrixorigin/matrixone/pkg/txn/storage/mem"
taestorage "github.com/matrixorigin/matrixone/pkg/txn/storage/tae"
"go.uber.org/zap"
)
const (
......@@ -63,15 +64,28 @@ func (s *store) createTxnStorage(shard metadata.DNShard) (storage.TxnStorage, er
if err != nil {
return nil, err
}
closeLogClient := func() {
if err := logClient.Close(); err != nil {
s.logger.Error("close log client failed",
zap.Error(err))
}
}
switch s.cfg.Txn.Storage.Backend {
case memStorageBackend:
return s.newMemTxnStorage(shard, logClient)
ts, err := s.newMemTxnStorage(shard, logClient)
if err != nil {
closeLogClient()
return nil, err
}
return ts, nil
case taeStorageBackend:
return s.newTAEStorage(shard, logClient)
ts, err := s.newTAEStorage(shard, logClient)
if err != nil {
closeLogClient()
return nil, err
}
return ts, nil
default:
return nil, fmt.Errorf("not implment for %s", s.cfg.Txn.Storage.Backend)
}
......
......@@ -35,6 +35,10 @@ import (
"go.uber.org/zap"
)
// retryCreateStorageInterval is how long the DNShard start task sleeps after
// a failed createTxnStorage call before trying again.
var retryCreateStorageInterval = 5 * time.Second
// WithLogger set logger
func WithLogger(logger *zap.Logger) Option {
return func(s *store) {
......@@ -268,24 +272,31 @@ func (s *store) createReplica(shard metadata.DNShard) error {
return nil
}
storage, err := s.createTxnStorage(shard)
if err != nil {
return err
}
err = s.stopper.RunTask(func(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
err := r.start(service.NewTxnService(r.logger,
shard,
storage,
s.sender,
s.clock,
s.cfg.Txn.ZombieTimeout.Duration))
if err != nil {
r.logger.Fatal("start DNShard failed",
zap.Error(err))
err := s.stopper.RunTask(func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
storage, err := s.createTxnStorage(shard)
if err != nil {
r.logger.Error("start DNShard failed",
zap.Error(err))
time.Sleep(retryCreateStorageInterval)
continue
}
err = r.start(service.NewTxnService(r.logger,
shard,
storage,
s.sender,
s.clock,
s.cfg.Txn.ZombieTimeout.Duration))
if err != nil {
r.logger.Fatal("start DNShard failed",
zap.Error(err))
}
return
}
}
})
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment