diff --git a/pkg/logservice/client.go b/pkg/logservice/client.go index a3e7b3c50fb34003ad20cebf32ebc38f1dbad1b0..b045b4d8d7eb96b6d2e72105817273daab38090b 100644 --- a/pkg/logservice/client.go +++ b/pkg/logservice/client.go @@ -19,6 +19,9 @@ import ( "sync" "time" + "github.com/cockroachdb/errors" + "github.com/lni/dragonboat/v4" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/morpc" pb "github.com/matrixorigin/matrixone/pkg/pb/logservice" @@ -101,6 +104,153 @@ type Client interface { GetTSOTimestamp(ctx context.Context, count uint64) (uint64, error) } +type managedClient struct { + cfg ClientConfig + client *client +} + +var _ Client = (*managedClient)(nil) + +// NewClient creates a Log Service client. Each returned client can be used +// to synchronously issue requests to the Log Service. To send multiple requests +// to the Log Service in parallel, multiple clients should be created and used +// to do so. +func NewClient(ctx context.Context, cfg ClientConfig) (Client, error) { + client, err := newClient(ctx, cfg) + if err != nil { + return nil, err + } + return &managedClient{cfg: cfg, client: client}, nil +} + +func (c *managedClient) Close() error { + if c.client != nil { + return c.client.close() + } + return nil +} + +func (c *managedClient) Config() ClientConfig { + return c.cfg +} + +func (c *managedClient) GetLogRecord(payloadLength int) pb.LogRecord { + data := make([]byte, headerSize+8+payloadLength) + binaryEnc.PutUint32(data, uint32(pb.UserEntryUpdate)) + binaryEnc.PutUint64(data[headerSize:], c.cfg.DNReplicaID) + return pb.LogRecord{Data: data} +} + +func (c *managedClient) Append(ctx context.Context, rec pb.LogRecord) (Lsn, error) { + for { + if err := c.prepareClient(ctx); err != nil { + return 0, err + } + v, err := c.client.append(ctx, rec) + if err != nil { + c.resetClient() + } + if c.isRetryableError(err) { + continue + } + return v, err + } +} + +func (c *managedClient) Read(ctx context.Context, + firstLsn Lsn, maxSize uint64) ([]pb.LogRecord, Lsn, error) { + for { + if err := c.prepareClient(ctx); err != nil { + return nil, 0, err + } + recs, v, err := c.client.read(ctx, firstLsn, maxSize) + if err != nil { + c.resetClient() + } + if c.isRetryableError(err) { + continue + } + return recs, v, err + } +} + +func (c *managedClient) Truncate(ctx context.Context, lsn Lsn) error { + for { + if err := c.prepareClient(ctx); err != nil { + return err + } + err := c.client.truncate(ctx, lsn) + if err != nil { + c.resetClient() + } + if c.isRetryableError(err) { + continue + } + return err + } +} + +func (c *managedClient) GetTruncatedLsn(ctx context.Context) (Lsn, error) { + for { + if err := c.prepareClient(ctx); err != nil { + return 0, err + } + v, err := c.client.getTruncatedLsn(ctx) + if err != nil { + c.resetClient() + } + if c.isRetryableError(err) { + continue + } + return v, err + } +} + +func (c *managedClient) GetTSOTimestamp(ctx context.Context, count uint64) (uint64, error) { + for { + if err := c.prepareClient(ctx); err != nil { + return 0, err + } + v, err := c.client.getTSOTimestamp(ctx, count) + if err != nil { + c.resetClient() + } + if c.isRetryableError(err) { + continue + } + return v, err + } +} + +func (c *managedClient) isRetryableError(err error) bool { + if errors.Is(err, dragonboat.ErrTimeout) { + return false + } + return errors.Is(err, dragonboat.ErrShardNotFound) +} + +func (c *managedClient) resetClient() { + if c.client != nil { + cc := c.client + c.client = nil + if err := cc.close(); err != nil { + plog.Errorf("failed to close client %v", err) + } + } +} + +func (c *managedClient) prepareClient(ctx context.Context) error { + if c.client != nil { + return nil + } + cc, err := newClient(ctx, c.cfg) + if err != nil { + return err + } + c.client = cc + return nil +} + type client struct { cfg ClientConfig client morpc.RPCClient @@ -109,14 +259,8 @@ type client struct { respPool *sync.Pool } -var _ Client = (*client)(nil) - -// NewClient creates a Log Service client. Each returned client can be used -// to synchronously issue requests to the Log Service. To send multiple requests -// to the Log Service in parallel, multiple clients should be created and used -// to do so. -func NewClient(ctx context.Context, - cfg ClientConfig) (Client, error) { +func newClient(ctx context.Context, + cfg ClientConfig) (*client, error) { pool := &sync.Pool{} pool.New = func() interface{} { return &RPCRequest{pool: pool} @@ -143,6 +287,9 @@ func NewClient(ctx context.Context, if err := c.connectReadOnly(ctx); err == nil { return c, nil } else { + if err := c.close(); err != nil { + plog.Errorf("failed to close the client %v", err) + } e = err } } else { @@ -151,6 +298,9 @@ func NewClient(ctx context.Context, if err := c.connectReadWrite(ctx); err == nil { return c, nil } else { + if err := c.close(); err != nil { + plog.Errorf("failed to close the client %v", err) + } e = err } } @@ -158,46 +308,35 @@ func NewClient(ctx context.Context, return nil, e } -func (c *client) Close() error { +func (c *client) close() error { return c.client.Close() } -func (c *client) Config() ClientConfig { - return c.cfg -} - -func (c *client) GetLogRecord(payloadLength int) pb.LogRecord { - data := make([]byte, headerSize+8+payloadLength) - binaryEnc.PutUint32(data, uint32(pb.UserEntryUpdate)) - binaryEnc.PutUint64(data[headerSize:], c.cfg.DNReplicaID) - return pb.LogRecord{Data: data} -} - -func (c *client) Append(ctx context.Context, rec pb.LogRecord) (Lsn, error) { +func (c *client) append(ctx context.Context, rec pb.LogRecord) (Lsn, error) { if c.readOnly() { return 0, ErrIncompatibleClient } // TODO: check piggybacked hint on whether we are connected to the leader node - return c.append(ctx, rec) + return c.doAppend(ctx, rec) } -func (c *client) Read(ctx context.Context, +func (c *client) read(ctx context.Context, firstLsn Lsn, maxSize uint64) ([]pb.LogRecord, Lsn, error) { - return c.read(ctx, firstLsn, maxSize) + return c.doRead(ctx, firstLsn, maxSize) } -func (c *client) Truncate(ctx context.Context, lsn Lsn) error { +func (c *client) truncate(ctx context.Context, lsn Lsn) error { if c.readOnly() { return ErrIncompatibleClient } - return c.truncate(ctx, lsn) + return c.doTruncate(ctx, lsn) } -func (c *client) GetTruncatedLsn(ctx context.Context) (Lsn, error) { - return c.getTruncatedLsn(ctx) +func (c *client) getTruncatedLsn(ctx context.Context) (Lsn, error) { + return c.doGetTruncatedLsn(ctx) } -func (c *client) GetTSOTimestamp(ctx context.Context, count uint64) (uint64, error) { +func (c *client) getTSOTimestamp(ctx context.Context, count uint64) (uint64, error) { return c.tsoRequest(ctx, count) } @@ -305,7 +444,7 @@ func (c *client) connect(ctx context.Context, mt pb.MethodType) error { return err } -func (c *client) append(ctx context.Context, rec pb.LogRecord) (Lsn, error) { +func (c *client) doAppend(ctx context.Context, rec pb.LogRecord) (Lsn, error) { resp, _, err := c.request(ctx, pb.APPEND, rec.Data, 0, 0) if err != nil { return 0, err @@ -313,7 +452,7 @@ func (c *client) append(ctx context.Context, rec pb.LogRecord) (Lsn, error) { return resp.LogResponse.Lsn, nil } -func (c *client) read(ctx context.Context, +func (c *client) doRead(ctx context.Context, firstLsn Lsn, maxSize uint64) ([]pb.LogRecord, Lsn, error) { resp, recs, err := c.request(ctx, pb.READ, nil, firstLsn, maxSize) if err != nil { @@ -322,12 +461,12 @@ func (c *client) read(ctx context.Context, return recs, resp.LogResponse.LastLsn, nil } -func (c *client) truncate(ctx context.Context, lsn Lsn) error { +func (c *client) doTruncate(ctx context.Context, lsn Lsn) error { _, _, err := c.request(ctx, pb.TRUNCATE, nil, lsn, 0) return err } -func (c *client) getTruncatedLsn(ctx context.Context) (Lsn, error) { +func (c *client) doGetTruncatedLsn(ctx context.Context) (Lsn, error) { resp, _, err := c.request(ctx, pb.GET_TRUNCATE, nil, 0, 0) if err != nil { return 0, err diff --git a/pkg/logservice/client_test.go b/pkg/logservice/client_test.go index afb3b906680df038eca38f6dfb4bceba3c796f81..25aab7449fc6e51336e3b7aa0a2588d3e8003236 100644 --- a/pkg/logservice/client_test.go +++ b/pkg/logservice/client_test.go @@ -63,6 +63,29 @@ func runClientTest(t *testing.T, fn(t, service, scfg, c) } +func TestClientCanBeReset(t *testing.T) { + fn := func(t *testing.T, s *Service, cfg ClientConfig, c Client) { + client := c.(*managedClient) + client.resetClient() + assert.Nil(t, client.client) + } + runClientTest(t, false, fn) +} + +func TestPrepareClient(t *testing.T) { + fn := func(t *testing.T, s *Service, cfg ClientConfig, c Client) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + client := c.(*managedClient) + assert.NoError(t, client.prepareClient(ctx)) + client.resetClient() + assert.Nil(t, client.client) + assert.NoError(t, client.prepareClient(ctx)) + assert.NotNil(t, client.client) + } + runClientTest(t, false, fn) +} + func TestLogShardNotFoundErrorIsConsideredAsTempError(t *testing.T) { fn := func(t *testing.T, s *Service, cfg ClientConfig, c Client) { require.NoError(t, s.store.stopReplica(1, 2)) @@ -71,6 +94,8 @@ func TestLogShardNotFoundErrorIsConsideredAsTempError(t *testing.T) { _, err := c.GetTSOTimestamp(ctx, 100) require.Equal(t, dragonboat.ErrShardNotFound, err) assert.True(t, isTempError(err)) + client := c.(*managedClient) + assert.True(t, client.isRetryableError(err)) } runClientTest(t, false, fn) } diff --git a/pkg/logservice/errors.go b/pkg/logservice/errors.go index 7fbc2ca9fcc1ef03d106847739d445ed8083ae10..d298f408cced03f179df89568e80e09c13d81636 100644 --- a/pkg/logservice/errors.go +++ b/pkg/logservice/errors.go @@ -39,7 +39,6 @@ var errorToCodeMappings = getErrorToCodeMapping() func getErrorToCodeMapping() []errorToCode { return []errorToCode{ {dragonboat.ErrTimeout, pb.Timeout, true}, - {dragonboat.ErrShardNotFound, pb.InvalidShard, true}, // TODO: why ErrTimeoutTooSmall is possible {dragonboat.ErrTimeoutTooSmall, pb.Timeout, false}, {dragonboat.ErrPayloadTooBig, pb.InvalidPayloadSize, true}, diff --git a/pkg/logservice/hakeeper_client.go b/pkg/logservice/hakeeper_client.go index 4ac5043b9ffbdbae03ac8e1452fcf7a4a2aeb7b1..ab05cbfa2e1d694f6557b602f76fb0cd8cb38438 100644 --- a/pkg/logservice/hakeeper_client.go +++ b/pkg/logservice/hakeeper_client.go @@ -116,7 +116,7 @@ func (c *managedHAKeeperClient) GetClusterDetails(ctx context.Context) (pb.Clust if err != nil { c.resetClient() } - if c.isInternalError(err) { + if c.isRetryableError(err) { continue } return cd, err @@ -133,7 +133,7 @@ func (c *managedHAKeeperClient) SendCNHeartbeat(ctx context.Context, if err != nil { c.resetClient() } - if c.isInternalError(err) { + if c.isRetryableError(err) { continue } return err @@ -150,7 +150,7 @@ func (c *managedHAKeeperClient) SendDNHeartbeat(ctx context.Context, if err != nil { c.resetClient() } - if c.isInternalError(err) { + if c.isRetryableError(err) { continue } return cb, err @@ -167,14 +167,14 @@ func (c *managedHAKeeperClient) SendLogHeartbeat(ctx context.Context, if err != nil { c.resetClient() } - if c.isInternalError(err) { + if c.isRetryableError(err) { continue } return cb, err } } -func (c *managedHAKeeperClient) isInternalError(err error) bool { +func (c *managedHAKeeperClient) isRetryableError(err error) bool { return errors.Is(err, ErrNotHAKeeper) }