Skip to content
Snippets Groups Projects
Unverified Commit 083897b4 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

dnservice: add retry txn request at dn node (#4746)

In some cases, a TxnRequest can now be retried on the DN itself to resolve DNShardNotFound errors caused by inconsistent message propagation.

Approved by: @reusee
parent 408788e3
No related branches found
No related tags found
No related merge requests found
......@@ -17,12 +17,17 @@ package dnservice
import (
"context"
"fmt"
"time"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
)
const (
// defaultRetryInterval is the pause between server-side retry attempts that
// is used when the request does not specify its own retry interval.
defaultRetryInterval = time.Millisecond * 100
)
func (s *store) registerRPCHandlers() {
// request from CN node
s.server.RegisterMethodHandler(txn.TxnMethod_Read, s.handleRead)
......@@ -47,16 +52,25 @@ func (s *store) dispatchLocalRequest(shard metadata.DNShard) rpc.TxnRequestHandl
}
// handleRead is the RPC handler registered for txn.TxnMethod_Read. It runs
// doRead through handleWithRetry so that the request can be re-executed on
// this node when the response carries one of the request's retry codes.
func (s *store) handleRead(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
return s.handleWithRetry(ctx, request, response, s.doRead)
}
// handleWrite handles a write TxnRequest. It runs doWrite through
// handleWithRetry so that the request can be re-executed on this node when
// the response carries one of the request's retry codes.
func (s *store) handleWrite(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
return s.handleWithRetry(ctx, request, response, s.doWrite)
}
// doRead performs a single read attempt against the local replica targeted by
// the request.
//
// When validDNShard yields no replica the function returns nil without
// touching the transaction service; presumably validDNShard has already
// recorded the reason on the response (verify against its implementation).
// Otherwise it waits for the replica to finish starting, copies the request
// metadata onto the response and forwards the call to the replica's service.
func (s *store) doRead(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
	replica := s.validDNShard(request, response)
	if replica != nil {
		// The replica may still be starting; block until it is ready.
		replica.waitStarted()
		prepareResponse(request, response)
		return replica.service.Read(ctx, request, response)
	}
	return nil
}
func (s *store) handleWrite(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
func (s *store) doWrite(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error {
r := s.validDNShard(request, response)
if r == nil {
return nil
......@@ -148,3 +162,45 @@ func prepareResponse(request *txn.TxnRequest, response *txn.TxnResponse) {
response.Flag = request.Flag
response.RequestID = request.RequestID
}
// handleWithRetry executes delegate repeatedly until it either fails with a
// Go error, or maybeRetry decides that no further attempt is warranted.
//
// The response is reset before every attempt so that data left over from a
// previous attempt never leaks into the final result. A non-nil error from
// delegate is returned immediately and is never retried; retries are driven
// only by the outcome recorded on the response.
func (s *store) handleWithRetry(ctx context.Context,
	request *txn.TxnRequest,
	response *txn.TxnResponse,
	delegate rpc.TxnRequestHandleFunc) error {
	again := true
	for again {
		response.Reset()
		if err := delegate(ctx, request, response); err != nil {
			return err
		}
		again = s.maybeRetry(ctx, request, response)
	}
	return nil
}
// maybeRetry reports whether the request should be executed again.
//
// A retry is requested only when all of the following hold:
//   - the request carries options with at least one retry code,
//   - the response carries a TxnError whose code is one of those retry codes,
//   - ctx is not done before the retry interval has elapsed.
//
// Before returning true the function waits for the request's RetryInterval
// (interpreted as a time.Duration), falling back to defaultRetryInterval when
// the interval is zero or negative. The wait is abandoned as soon as ctx is
// done, in which case false is returned and the response is left untouched so
// the caller can still observe the last error.
func (s *store) maybeRetry(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) bool {
	// Every operand must be checked independently: a request without options
	// has nothing to retry, and must never reach the dereferences below.
	if request.Options == nil ||
		len(request.Options.RetryCodes) == 0 ||
		response.TxnError == nil {
		return false
	}

	retryable := false
	for _, code := range request.Options.RetryCodes {
		if code == response.TxnError.Code {
			retryable = true
			break
		}
	}
	if !retryable {
		return false
	}

	wait := time.Duration(request.Options.RetryInterval)
	if wait <= 0 {
		// A non-positive interval would turn the retry loop into a busy spin.
		wait = defaultRetryInterval
	}

	// Wait with a timer instead of time.Sleep so that cancellation or a
	// deadline on ctx interrupts the pause immediately.
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}
......@@ -35,6 +35,37 @@ func TestHandleRead(t *testing.T) {
})
}
// TestHandleReadWithRetry checks that a read which asks for DNShardNotFound
// to be retried completes without error once the shard created by
// newTestDNShard(1, 2, 3) is started, one second after the read is issued.
func TestHandleReadWithRetry(t *testing.T) {
	runDNStoreTest(t, func(s *store) {
		request := service.NewTestReadRequest(1, service.NewTestTxn(1, 1, 1), 1)
		request.CNRequest.Target.ReplicaID = 2
		request.Options = &txn.TxnRequestOptions{
			RetryCodes: []txn.ErrorCode{txn.ErrorCode_DNShardNotFound},
		}

		// Start the replica late, while the read is already in flight.
		go func() {
			time.Sleep(time.Second)
			assert.NoError(t, s.StartDNReplica(newTestDNShard(1, 2, 3)))
		}()

		err := s.handleRead(context.Background(), &request, &txn.TxnResponse{})
		assert.NoError(t, err)
	})
}
// TestHandleReadWithRetryWithTimeout checks that retrying stops once the
// context deadline passes: no replica is ever started, so after the one
// second timeout handleRead must return without a Go error and leave the
// DNShardNotFound error on the response.
func TestHandleReadWithRetryWithTimeout(t *testing.T) {
	runDNStoreTest(t, func(s *store) {
		request := service.NewTestReadRequest(1, service.NewTestTxn(1, 1, 1), 1)
		request.CNRequest.Target.ReplicaID = 2
		request.Options = &txn.TxnRequestOptions{
			RetryCodes: []txn.ErrorCode{txn.ErrorCode_DNShardNotFound},
		}

		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		response := &txn.TxnResponse{}
		err := s.handleRead(ctx, &request, response)
		assert.NoError(t, err)
		assert.Equal(t, txn.ErrorCode_DNShardNotFound, response.TxnError.Code)
	})
}
func TestHandleWrite(t *testing.T) {
runDNStoreTest(t, func(s *store) {
shard := newTestDNShard(1, 2, 3)
......
This diff is collapsed.
......@@ -188,6 +188,17 @@ message TxnRequest {
TxnRollbackDNShardRequest RollbackDNShardRequest = 11;
// TxnRemoveMetadataRequest corresponds to TxnMethod.RemoveMetadata
TxnRemoveMetadataRequest RemoveMetadata = 12;
// TxnRequestOptions request options
TxnRequestOptions Options = 13;
}
// TxnRequestOptions carries optional, per-request settings for a TxnRequest.
message TxnRequestOptions {
// RetryCodes lists the error codes for which the DN retries the TxnRequest
// on the server side instead of returning the error to the caller. Only read
// and write requests can be retried.
repeated ErrorCode RetryCodes = 1;
// RetryInterval is the wait between two retries. The DN converts this value
// directly into a Go time.Duration, so it is expressed in nanoseconds. When
// it is zero the default of 100ms is used.
int64 RetryInterval = 2;
}
// TxnResponse response of TxnRequest.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment