Skip to content
Snippets Groups Projects
Unverified Commit 3e490fcd authored by GuokeCui's avatar GuokeCui Committed by GitHub
Browse files

Implement network partition feature for service test framework (#4596)

- Implement network partition feature for service test framework
- Maintain `morpc.Client` options within `managedHAKeeperClient`
- Rename `pkg/tests/service/address.go` as `pkg/tests/service/network.go`, the same as its unit test file

Approved by: @zhangxu19830126, @lni
parent 9a9246c7
No related branches found
No related tags found
No related merge requests found
......@@ -119,7 +119,7 @@ type Config struct {
}
}
func (c *Config) validate() error {
func (c *Config) Validate() error {
if c.UUID == "" {
return fmt.Errorf("Config.UUID not set")
}
......
......@@ -23,9 +23,9 @@ import (
func TestValidate(t *testing.T) {
c := &Config{}
assert.Error(t, c.validate())
assert.Error(t, c.Validate())
c.UUID = "dn1"
assert.NoError(t, c.validate())
assert.NoError(t, c.Validate())
assert.Equal(t, defaultListenAddress, c.ListenAddress)
assert.Equal(t, c.ListenAddress, defaultListenAddress)
......
......@@ -52,7 +52,7 @@ func WithConfigAdjust(adjustConfigFunc func(c *Config)) Option {
}
// WithBackendFilter set filtering txn.TxnRequest sent to other DNShard
func WithBackendFilter(filter func(*txn.TxnRequest, string) bool) Option {
func WithBackendFilter(filter func(morpc.Message, string) bool) Option {
return func(s *store) {
s.options.backendFilter = filter
}
......@@ -88,7 +88,7 @@ type store struct {
options struct {
logServiceClientFactory func(metadata.DNShard) (logservice.Client, error)
hakeekerClientFactory func() (logservice.DNHAKeeperClient, error)
backendFilter func(req *txn.TxnRequest, backendAddr string) bool
backendFilter func(msg morpc.Message, backendAddr string) bool
adjustConfigFunc func(c *Config)
}
......@@ -102,7 +102,7 @@ type store struct {
func NewService(cfg *Config,
fsFactory fileservice.FileServiceFactory,
opts ...Option) (Service, error) {
if err := cfg.validate(); err != nil {
if err := cfg.Validate(); err != nil {
return nil, err
}
......
......@@ -70,6 +70,8 @@ var _ DNHAKeeperClient = (*managedHAKeeperClient)(nil)
var _ LogHAKeeperClient = (*managedHAKeeperClient)(nil)
// NewCNHAKeeperClient creates a HAKeeper client to be used by a CN node.
//
// NB: caller could specify options for morpc.Client via ctx.
func NewCNHAKeeperClient(ctx context.Context,
cfg HAKeeperClientConfig) (CNHAKeeperClient, error) {
if err := cfg.Validate(); err != nil {
......@@ -79,6 +81,8 @@ func NewCNHAKeeperClient(ctx context.Context,
}
// NewDNHAKeeperClient creates a HAKeeper client to be used by a DN node.
//
// NB: caller could specify options for morpc.Client via ctx.
func NewDNHAKeeperClient(ctx context.Context,
cfg HAKeeperClientConfig) (DNHAKeeperClient, error) {
if err := cfg.Validate(); err != nil {
......@@ -88,6 +92,8 @@ func NewDNHAKeeperClient(ctx context.Context,
}
// NewLogHAKeeperClient creates a HAKeeper client to be used by a Log Service node.
//
// NB: caller could specify options for morpc.Client via ctx.
func NewLogHAKeeperClient(ctx context.Context,
cfg HAKeeperClientConfig) (LogHAKeeperClient, error) {
if err := cfg.Validate(); err != nil {
......@@ -102,12 +108,23 @@ func newManagedHAKeeperClient(ctx context.Context,
if err != nil {
return nil, err
}
return &managedHAKeeperClient{client: c, cfg: cfg}, nil
return &managedHAKeeperClient{
client: c,
cfg: cfg,
backendOptions: GetBackendOptions(ctx),
clientOptions: GetClientOptions(ctx),
}, nil
}
type managedHAKeeperClient struct {
cfg HAKeeperClientConfig
client *hakeeperClient
// Method `prepareClient` may update moprc.Client.
// So we need to keep options for morpc.Client.
backendOptions []morpc.BackendOption
clientOptions []morpc.ClientOption
}
func (c *managedHAKeeperClient) Close() error {
......@@ -202,6 +219,11 @@ func (c *managedHAKeeperClient) prepareClient(ctx context.Context) error {
if c.client != nil {
return nil
}
// we must use the recoreded options for morpc.Client
ctx = SetBackendOptions(ctx, c.backendOptions...)
ctx = SetClientOptions(ctx, c.clientOptions...)
cc, err := newHAKeeperClient(ctx, c.cfg)
if err != nil {
return err
......
......@@ -15,12 +15,16 @@
package service
import (
"context"
"fmt"
"sync"
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/dnservice"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
)
......@@ -138,8 +142,8 @@ func newDNService(
// buildDnConfig builds configuration for a dn service.
func buildDnConfig(
index int, opt Options, address serviceAddress,
) (*dnservice.Config, []dnservice.Option) {
index int, opt Options, address serviceAddresses,
) *dnservice.Config {
cfg := &dnservice.Config{
UUID: uuid.New().String(),
ListenAddress: address.getDnListenAddress(index),
......@@ -149,5 +153,59 @@ func buildDnConfig(
// FIXME: support different storage, consult @reusee
cfg.Txn.Storage.Backend = opt.dn.txnStorageBackend
return cfg, nil
// We need the filled version of configuration.
// It's necessary when building dnservice.Option.
if err := cfg.Validate(); err != nil {
panic(fmt.Sprintf("fatal when building dnservice.Config: %s", err))
}
return cfg
}
// buildDnOptions builds options for a dn service.
//
// NB: We need the filled version of dnservice.Config.
func buildDnOptions(cfg *dnservice.Config, filter FilterFunc) dnOptions {
// factory to construct client for hakeeper
hakeeperClientFactory := func() (logservice.DNHAKeeperClient, error) {
ctx, cancel := context.WithTimeout(
context.Background(), cfg.HAKeeper.DiscoveryTimeout.Duration,
)
defer cancel()
// transfer morpc.BackendOption via context
ctx = logservice.SetBackendOptions(ctx, morpc.WithBackendFilter(filter))
client, err := logservice.NewDNHAKeeperClient(
ctx, cfg.HAKeeper.ClientConfig,
)
if err != nil {
return nil, err
}
return client, nil
}
// factory to construct client for log service
logServiceClientFactory := func(shard metadata.DNShard) (logservice.Client, error) {
ctx, cancel := context.WithTimeout(
context.Background(), cfg.LogService.ConnectTimeout.Duration,
)
defer cancel()
// transfer morpc.BackendOption via context
ctx = logservice.SetBackendOptions(ctx, morpc.WithBackendFilter(filter))
return logservice.NewClient(ctx, logservice.ClientConfig{
ReadOnly: false,
LogShardID: shard.LogShardID,
DNReplicaID: shard.ReplicaID,
ServiceAddresses: cfg.HAKeeper.ClientConfig.ServiceAddresses,
})
}
return []dnservice.Option{
dnservice.WithHAKeeperClientFactory(hakeeperClientFactory),
dnservice.WithLogServiceClientFactory(logServiceClientFactory),
dnservice.WithBackendFilter(filter),
}
}
......@@ -130,9 +130,14 @@ func (ls *logService) StartHAKeeperReplica(
return ls.svc.StartHAKeeperReplica(replicaID, initialReplicas, join)
}
// logOptions is options for a log service.
type logOptions []logservice.Option
// newLogService constructs an instance of `LogService`.
func newLogService(cfg logservice.Config) (LogService, error) {
svc, err := logservice.NewWrappedService(cfg)
func newLogService(
cfg logservice.Config, opts logOptions,
) (LogService, error) {
svc, err := logservice.NewWrappedService(cfg, opts...)
if err != nil {
return nil, err
}
......@@ -140,7 +145,9 @@ func newLogService(cfg logservice.Config) (LogService, error) {
}
// buildLogConfig builds configuration for a log service.
func buildLogConfig(index int, opt Options, address serviceAddress) logservice.Config {
func buildLogConfig(
index int, opt Options, address serviceAddresses,
) logservice.Config {
cfg := logservice.Config{
UUID: uuid.New().String(),
FS: vfs.NewStrictMem(),
......@@ -155,7 +162,6 @@ func buildLogConfig(index int, opt Options, address serviceAddress) logservice.C
cfg.HeartbeatInterval.Duration = opt.log.heartbeatInterval
cfg.HAKeeperCheckInterval.Duration = opt.hakeeper.checkInterval
cfg.HAKeeperClientConfig.ServiceAddresses = address.listHAKeeperListenAddresses()
// setting hakeeper configuration
cfg.HAKeeperConfig.TickPerSecond = opt.hakeeper.tickPerSecond
cfg.HAKeeperConfig.LogStoreTimeout.Duration = opt.hakeeper.logStoreTimeout
......@@ -163,9 +169,19 @@ func buildLogConfig(index int, opt Options, address serviceAddress) logservice.C
// we must invoke Fill in order to setting default configruation value.
cfg.Fill()
return cfg
}
// buildLogOptions builds options for a log service.
//
// NB: We need the filled version of logservice.Config.
func buildLogOptions(cfg logservice.Config, filter FilterFunc) logOptions {
return []logservice.Option{
logservice.WithBackendFilter(filter),
}
}
// buildLogDataDir generates data directory for a log service.
func buildLogDataDir(root string, index int) string {
return path.Join(root, "log", strconv.Itoa(index))
......
......@@ -20,6 +20,7 @@ import (
"sync"
"testing"
"github.com/RoaringBitmap/roaring"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
......@@ -30,8 +31,8 @@ var (
curPortMu sync.Mutex
)
// serviceAddress contains addresses of all services.
type serviceAddress struct {
// serviceAddresses contains addresses of all services.
type serviceAddresses struct {
t *testing.T
// Construct service addresses according to service number
......@@ -42,11 +43,11 @@ type serviceAddress struct {
dnAddresses []dnServiceAddress
}
// newServiceAddress constructs addresses for all services.
func newServiceAddress(
// newServiceAddresses constructs addresses for all services.
func newServiceAddresses(
t *testing.T, logServiceNum, dnServiceNum int, hostAddr string,
) serviceAddress {
address := serviceAddress{
) serviceAddresses {
address := serviceAddresses{
t: t,
logServiceNum: logServiceNum,
dnServiceNum: dnServiceNum,
......@@ -76,17 +77,17 @@ func newServiceAddress(
}
// assertDNService asserts constructed address for dn service.
func (a serviceAddress) assertDNService() {
func (a serviceAddresses) assertDNService() {
assert.Equal(a.t, a.dnServiceNum, len(a.dnAddresses))
}
// assertLogService asserts constructed address for log service.
func (a serviceAddress) assertLogService() {
func (a serviceAddresses) assertLogService() {
assert.Equal(a.t, a.logServiceNum, len(a.logAddresses))
}
// getDnListenAddress gets dn service address by its index.
func (a serviceAddress) getDnListenAddress(index int) string {
func (a serviceAddresses) getDnListenAddress(index int) string {
a.assertDNService()
if index >= len(a.dnAddresses) || index < 0 {
......@@ -96,7 +97,7 @@ func (a serviceAddress) getDnListenAddress(index int) string {
}
// getLogListenAddress gets log service address by its index.
func (a serviceAddress) getLogListenAddress(index int) string {
func (a serviceAddresses) getLogListenAddress(index int) string {
a.assertLogService()
if index >= len(a.logAddresses) || index < 0 {
......@@ -106,7 +107,7 @@ func (a serviceAddress) getLogListenAddress(index int) string {
}
// getLogRaftAddress gets log raft address by its index.
func (a serviceAddress) getLogRaftAddress(index int) string {
func (a serviceAddresses) getLogRaftAddress(index int) string {
a.assertLogService()
if index >= len(a.logAddresses) || index < 0 {
......@@ -116,7 +117,7 @@ func (a serviceAddress) getLogRaftAddress(index int) string {
}
// getLogGossipAddress gets log gossip address by its index.
func (a serviceAddress) getLogGossipAddress(index int) string {
func (a serviceAddresses) getLogGossipAddress(index int) string {
a.assertLogService()
if index >= len(a.logAddresses) || index < 0 {
......@@ -130,7 +131,7 @@ func (a serviceAddress) getLogGossipAddress(index int) string {
// Select gossip addresses of the first 3 log services.
// If the number of log services was less than 3,
// then select all of them.
func (a serviceAddress) getLogGossipSeedAddresses() []string {
func (a serviceAddresses) getLogGossipSeedAddresses() []string {
a.assertLogService()
n := gossipSeedNum(len(a.logAddresses))
......@@ -146,7 +147,7 @@ func (a serviceAddress) getLogGossipSeedAddresses() []string {
// Select the first 3 log services to start hakeeper replica.
// If the number of log services was less than 3,
// then select the first of them.
func (a serviceAddress) listHAKeeperListenAddresses() []string {
func (a serviceAddresses) listHAKeeperListenAddresses() []string {
a.assertLogService()
n := haKeeperNum(len(a.logAddresses))
......@@ -157,6 +158,49 @@ func (a serviceAddress) listHAKeeperListenAddresses() []string {
return listenAddrs
}
// buildPartitionAddressSets returns service addresses by every partition.
func (a serviceAddresses) buildPartitionAddressSets(partitions ...NetworkPartition) []addressSet {
sets := make([]addressSet, 0, len(partitions))
for _, part := range partitions {
sets = append(sets, a.listPartitionAddresses(part))
}
return sets
}
// listPartitionAddresses returns all service addresses within the same partition.
func (a serviceAddresses) listPartitionAddresses(partition NetworkPartition) addressSet {
addrSet := newAddressSet()
for _, dnIndex := range partition.ListDNServiceIndex() {
addrs := a.listDnServiceAddresses(int(dnIndex))
addrSet.addAddresses(addrs...)
}
for _, logIndex := range partition.ListLogServiceIndex() {
addrs := a.listLogServiceAddresses(int(logIndex))
addrSet.addAddresses(addrs...)
}
return addrSet
}
// listDnServiceAddresses lists all addresses of dn service by its index.
func (a serviceAddresses) listDnServiceAddresses(index int) []string {
a.assertDNService()
if index >= len(a.dnAddresses) || index < 0 {
return nil
}
return a.dnAddresses[index].listAddresses()
}
// listLogServiceAddresses lists all addresses of log service by its index.
func (a serviceAddresses) listLogServiceAddresses(index int) []string {
a.assertLogService()
if index >= len(a.logAddresses) || index < 0 {
return nil
}
return a.logAddresses[index].listAddresses()
}
// logServiceAddress contains addresses for log service.
type logServiceAddress struct {
listenAddr string
......@@ -177,6 +221,11 @@ func newLogServiceAddress(host string) (logServiceAddress, error) {
}, nil
}
// listAddresses returns all addresses for single log service.
func (la logServiceAddress) listAddresses() []string {
return []string{la.listenAddr, la.raftAddr, la.gossipAddr}
}
// dnServiceAddress contains address for dn service.
type dnServiceAddress struct {
listenAddr string
......@@ -192,6 +241,11 @@ func newDNServiceAddress(host string) (dnServiceAddress, error) {
}, nil
}
// listAddresses returns all addresses for single dn service.
func (da dnServiceAddress) listAddresses() []string {
return []string{da.listenAddr}
}
// getAddressBatch generates service addresses by batch.
func getAddressBatch(host string, batch int) ([]string, error) {
addrs := make([]string, batch)
......@@ -229,3 +283,100 @@ func getAvailablePort(host string) (string, error) {
}
return strconv.Itoa(port), nil
}
// addressSet records addresses for services within the same partition.
type addressSet map[string]struct{}
func newAddressSet() addressSet {
return make(map[string]struct{})
}
// addAddresses registers a list of addresses.
func (s addressSet) addAddresses(addrs ...string) {
for _, addr := range addrs {
s[addr] = struct{}{}
}
}
// contains checks address exist or not.
func (s addressSet) contains(addr string) bool {
_, ok := s[addr]
return ok
}
// NetworkPartition records index of services from the same network partition.
type NetworkPartition struct {
logIndexSet *roaring.Bitmap
dnIndexSet *roaring.Bitmap
}
// newNetworkPartition returns an instance of NetworkPartition.
//
// The returned instance only contains valid index according to service number.
func newNetworkPartition(
logServiceNum int, logIndexes []uint32,
dnServiceNum int, dnIndexes []uint32,
) NetworkPartition {
logTotal := roaring.FlipInt(roaring.NewBitmap(), 0, logServiceNum)
dnTotal := roaring.FlipInt(roaring.NewBitmap(), 0, dnServiceNum)
rawLogSet := roaring.BitmapOf(logIndexes...)
rawDnSet := roaring.BitmapOf(dnIndexes...)
return NetworkPartition{
logIndexSet: roaring.And(logTotal, rawLogSet),
dnIndexSet: roaring.And(dnTotal, rawDnSet),
}
}
// remainingNetworkPartition returns partition for the remaining services.
func remainingNetworkPartition(
logServiceNum, dnServiceNum int, partitions ...NetworkPartition,
) NetworkPartition {
logTotal := roaring.FlipInt(roaring.NewBitmap(), 0, logServiceNum)
dnTotal := roaring.FlipInt(roaring.NewBitmap(), 0, dnServiceNum)
logUsed := roaring.NewBitmap()
dnUsed := roaring.NewBitmap()
for _, p := range partitions {
dnUsed.Or(p.dnIndexSet)
logUsed.Or(p.logIndexSet)
}
return NetworkPartition{
logIndexSet: roaring.AndNot(logTotal, logUsed),
dnIndexSet: roaring.AndNot(dnTotal, dnUsed),
}
}
// ListDNServiceIndex lists index of all dn services in the partition.
func (p NetworkPartition) ListDNServiceIndex() []uint32 {
set := p.dnIndexSet
if set.GetCardinality() == 0 {
return nil
}
indexes := make([]uint32, 0, set.GetCardinality())
iter := set.Iterator()
for iter.HasNext() {
indexes = append(indexes, iter.Next())
}
return indexes
}
// ListLogServiceIndex lists index of all log services in the partition.
func (p NetworkPartition) ListLogServiceIndex() []uint32 {
set := p.logIndexSet
if set.GetCardinality() == 0 {
return nil
}
indexes := make([]uint32, 0, set.GetCardinality())
iter := set.Iterator()
for iter.HasNext() {
indexes = append(indexes, iter.Next())
}
return indexes
}
......@@ -37,14 +37,69 @@ func TestGetAvailablePort(t *testing.T) {
}
func TestServiceAddress(t *testing.T) {
address := newServiceAddress(t, 3, 3, "127.0.0.1")
logServiceNum := 3
dnServiceNum := 2
address := newServiceAddresses(t, logServiceNum, dnServiceNum, "127.0.0.1")
address.assertDNService()
address.assertLogService()
for i := 0; i < dnServiceNum; i++ {
addrList := address.listDnServiceAddresses(i)
// 1 address for every dn service now
require.Equal(t, 1, len(addrList))
}
// valid dn index: 0, 1
// invalid dn index: 2
addrList := address.listDnServiceAddresses(2)
require.Equal(t, 0, len(addrList))
for i := 0; i < logServiceNum; i++ {
addrList := address.listLogServiceAddresses(i)
// 3 addresses for every log service now
require.Equal(t, 3, len(addrList))
}
// valid dn index: 0, 1, 2
// invalid dn index: 3
addrList = address.listLogServiceAddresses(3)
require.Equal(t, 0, len(addrList))
// ------------------------------
// integrate with NetworkPartition
// ------------------------------
dnIndex := uint32(1)
logIndex := uint32(2)
partition1 := newNetworkPartition(
logServiceNum, []uint32{logIndex},
dnServiceNum, []uint32{dnIndex},
)
partition2 := remainingNetworkPartition(
logServiceNum, dnServiceNum, partition1,
)
addrSets := address.buildPartitionAddressSets(partition1, partition2)
// there are 2 address sets corresponding with 2 partitions
require.Equal(t, 2, len(addrSets))
// in partition 1, there are 1 dn service and 1 log service.
require.Equal(t, 3+1, len(addrSets[0]))
// in partition 1, there are 1 dn service and 2 log service.
require.Equal(t, 3*2+1, len(addrSets[1]))
// the first address set should contains the following addresses.
dnListenAddr := address.getDnListenAddress(int(dnIndex))
require.True(t, addrSets[0].contains(dnListenAddr))
logListenAddr := address.getLogListenAddress(int(logIndex))
require.True(t, addrSets[0].contains(logListenAddr))
logRaftAddr := address.getLogListenAddress(int(logIndex))
require.True(t, addrSets[0].contains(logRaftAddr))
logGossipAddr := address.getLogListenAddress(int(logIndex))
require.True(t, addrSets[0].contains(logGossipAddr))
}
func TestGetDnListenAddress(t *testing.T) {
dnNum := 3
address := newServiceAddress(t, 1, dnNum, "127.0.0.1")
address := newServiceAddresses(t, 1, dnNum, "127.0.0.1")
addr0 := address.getDnListenAddress(0)
addr1 := address.getDnListenAddress(1)
......@@ -59,7 +114,7 @@ func TestGetDnListenAddress(t *testing.T) {
func TestGetLogListenAddress(t *testing.T) {
logNum := 3
address := newServiceAddress(t, logNum, 1, "127.0.0.1")
address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
addr0 := address.getLogListenAddress(0)
addr1 := address.getLogListenAddress(1)
......@@ -74,7 +129,7 @@ func TestGetLogListenAddress(t *testing.T) {
func TestGetLogRaftAddress(t *testing.T) {
logNum := 3
address := newServiceAddress(t, logNum, 1, "127.0.0.1")
address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
addr0 := address.getLogRaftAddress(0)
addr1 := address.getLogRaftAddress(1)
......@@ -89,7 +144,7 @@ func TestGetLogRaftAddress(t *testing.T) {
func TestGetLogGossipAddress(t *testing.T) {
logNum := 3
address := newServiceAddress(t, logNum, 1, "127.0.0.1")
address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
addr0 := address.getLogGossipAddress(0)
addr1 := address.getLogGossipAddress(1)
......@@ -104,7 +159,7 @@ func TestGetLogGossipAddress(t *testing.T) {
func TestListHAKeeperListenAddresses(t *testing.T) {
logNum := 3
address := newServiceAddress(t, logNum, 1, "127.0.0.1")
address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
addrs := address.listHAKeeperListenAddresses()
require.Equal(t, logNum, len(addrs))
require.NotEqual(t, addrs[0], addrs[1])
......@@ -114,10 +169,60 @@ func TestListHAKeeperListenAddresses(t *testing.T) {
func TestGetLogGossipSeedAddresses(t *testing.T) {
logNum := 4
address := newServiceAddress(t, logNum, 1, "127.0.0.1")
address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
addrs := address.getLogGossipSeedAddresses()
require.Equal(t, defaultGossipSeedNum, len(addrs))
require.NotEqual(t, addrs[0], addrs[1])
require.NotEqual(t, addrs[0], addrs[2])
require.NotEqual(t, addrs[1], addrs[2])
}
func TestPartition(t *testing.T) {
logServiceNum := 3
dnServiceNum := 2
// normal condition
{
partition := newNetworkPartition(
logServiceNum, []uint32{1},
dnServiceNum, []uint32{0, 1},
)
require.Equal(t, []uint32{0, 1}, partition.ListDNServiceIndex())
require.Equal(t, []uint32{1}, partition.ListLogServiceIndex())
remaining := remainingNetworkPartition(
logServiceNum, dnServiceNum, partition,
)
require.Nil(t, remaining.ListDNServiceIndex())
require.Equal(t, []uint32{0, 2}, remaining.ListLogServiceIndex())
require.Equal(t, uint64(0), remaining.dnIndexSet.GetCardinality())
require.Equal(t, uint64(2), remaining.logIndexSet.GetCardinality())
require.True(t, remaining.logIndexSet.Contains(0))
require.True(t, remaining.logIndexSet.Contains(2))
}
// valid dn index should be: 0, 1
// invoker specifies invalid dn index: 2, 3
{
partition := newNetworkPartition(
logServiceNum, nil,
dnServiceNum, []uint32{0, 2, 3},
)
require.Equal(t, []uint32{0}, partition.ListDNServiceIndex())
require.Nil(t, partition.ListLogServiceIndex())
remaining := remainingNetworkPartition(
logServiceNum, dnServiceNum, partition,
)
require.Equal(t, []uint32{1}, remaining.ListDNServiceIndex())
require.Equal(t, []uint32{0, 1, 2}, remaining.ListLogServiceIndex())
require.Equal(t, uint64(1), remaining.dnIndexSet.GetCardinality())
require.Equal(t, uint64(3), remaining.logIndexSet.GetCardinality())
require.True(t, remaining.dnIndexSet.Contains(1))
require.True(t, remaining.logIndexSet.Contains(0))
require.True(t, remaining.logIndexSet.Contains(1))
require.True(t, remaining.logIndexSet.Contains(2))
}
}
......@@ -46,9 +46,9 @@ const (
// default hakeeper configuration
defaultTickPerSecond = 10
defaultLogStoreTimeout = 10 * time.Second
defaultDNStoreTimeout = 10 * time.Second
defaultCheckInterval = 3 * time.Second
defaultLogStoreTimeout = 4 * time.Second
defaultDNStoreTimeout = 3 * time.Second
defaultCheckInterval = 1 * time.Second
// default heartbeat configuration
defaultLogHeartbeatInterval = 1 * time.Second
......
......@@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/dnservice"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/hakeeper"
......@@ -75,9 +76,14 @@ type ClusterOperation interface {
// StartLogServiceIndexed starts log service by its index.
StartLogServiceIndexed(index int) error
// FXIME: support this in the end of development
// StartNetworkPartition(partitions [][]int) error
// CloseNetworkPartition() error
// NewNetworkPartition constructs network partition from service index.
NewNetworkPartition(dnIndexes, logIndexes []uint32) NetworkPartition
// RemainingNetworkPartition returns partition for the remaining services.
RemainingNetworkPartition(partitions ...NetworkPartition) NetworkPartition
// StartNetworkPartition enables network partition feature.
StartNetworkPartition(partitions ...NetworkPartition)
// CloseNetworkPartition disables network partition feature.
CloseNetworkPartition()
}
// ClusterAwareness provides cluster awareness information.
......@@ -102,7 +108,6 @@ type ClusterAwareness interface {
GetClusterState(ctx context.Context) (*logpb.CheckerState, error)
}
// FIXME: add more convenient methods
// ClusterState provides cluster running state.
type ClusterState interface {
// ListDNShards lists all dn shards within the cluster.
......@@ -153,6 +158,24 @@ type ClusterWaitState interface {
WaitDNReplicaReported(ctx context.Context, shardID uint64)
// WaitLogReplicaReported waits log replicas reported.
WaitLogReplicaReported(ctx context.Context, shardID uint64)
// WaitDNStoreTimeout waits dn store timeout by uuid.
WaitDNStoreTimeout(ctx context.Context, uuid string)
// WaitDNStoreTimeoutIndexed waits dn store timeout by index.
WaitDNStoreTimeoutIndexed(ctx context.Context, index int)
// WaitDNStoreReported waits dn store reported by uuid.
WaitDNStoreReported(ctx context.Context, uuid string)
// WaitDNStoreReportedIndexed waits dn store reported by index.
WaitDNStoreReportedIndexed(ctx context.Context, index int)
// WaitLogStoreTimeout waits log store timeout by uuid.
WaitLogStoreTimeout(ctx context.Context, uuid string)
// WaitLogStoreTimeoutIndexed waits log store timeout by index.
WaitLogStoreTimeoutIndexed(ctx context.Context, index int)
// WaitLogStoreReported waits log store reported by uuid.
WaitLogStoreReported(ctx context.Context, uuid string)
// WaitLogStoreReportedIndexed waits log store reported by index.
WaitLogStoreReportedIndexed(ctx context.Context, index int)
}
// ----------------------------------------------------
......@@ -177,10 +200,16 @@ type testCluster struct {
sync.Mutex
cfgs []logservice.Config
opts []logOptions
svcs []LogService
}
address serviceAddress
network struct {
addresses serviceAddresses
sync.RWMutex
addressSets []addressSet
}
fileservices *fileServices
......@@ -203,16 +232,16 @@ func NewCluster(t *testing.T, opt Options) (Cluster, error) {
)
// build addresses for all services
c.address = c.buildServiceAddress()
c.network.addresses = c.buildServiceAddresses()
// build FileService instances
c.fileservices = c.buildFileServices()
// build log service configurations
c.log.cfgs = c.buildLogConfigs(c.address)
c.log.cfgs, c.log.opts = c.buildLogConfigs(c.network.addresses)
// build dn service configurations
c.dn.cfgs, c.dn.opts = c.buildDnConfigs(c.address)
c.dn.cfgs, c.dn.opts = c.buildDnConfigs(c.network.addresses)
return c, nil
}
......@@ -352,7 +381,17 @@ func (c *testCluster) DNStoreExpired(uuid string) (bool, error) {
}
hkcfg := c.GetHAKeeperConfig()
return hkcfg.DnStoreExpired(dnStore.Tick, state.Tick), nil
expired := hkcfg.DnStoreExpired(dnStore.Tick, state.Tick)
c.logger.Info(
"check dn store expired or not",
zap.Any("hakeeper config", hkcfg),
zap.Uint64("dn store tick", dnStore.Tick),
zap.Uint64("current tick", state.Tick),
zap.Bool("expired", expired),
)
return expired, nil
}
func (c *testCluster) DNStoreExpiredIndexed(index int) (bool, error) {
......@@ -373,7 +412,17 @@ func (c *testCluster) LogStoreExpired(uuid string) (bool, error) {
}
hkcfg := c.GetHAKeeperConfig()
return hkcfg.LogStoreExpired(logStore.Tick, state.Tick), nil
expired := hkcfg.LogStoreExpired(logStore.Tick, state.Tick)
c.logger.Info(
"check log store expired or not",
zap.Any("hakeeper config", hkcfg),
zap.Uint64("log store tick", logStore.Tick),
zap.Uint64("current tick", state.Tick),
zap.Bool("expired", expired),
)
return expired, nil
}
func (c *testCluster) LogStoreExpiredIndexed(index int) (bool, error) {
......@@ -466,6 +515,7 @@ func (c *testCluster) WaitDNShardsReported(ctx context.Context) {
reported := ParseReportedDNShardCount(
state.DNState, c.GetHAKeeperConfig(), state.Tick,
)
// FIXME: what about reported larger than expected
if reported >= expected {
return
......@@ -558,6 +608,146 @@ func (c *testCluster) WaitLogReplicaReported(ctx context.Context, shardID uint64
}
}
func (c *testCluster) WaitDNStoreTimeout(ctx context.Context, uuid string) {
for {
select {
case <-ctx.Done():
assert.FailNow(
c.t,
"terminated when waiting dn store timeout",
"dn store %s, error: %s", uuid, ctx.Err(),
)
default:
time.Sleep(defaultWaitInterval)
expired, err := c.DNStoreExpired(uuid)
if err != nil {
c.logger.Error("fail to check dn store expired or not",
zap.Error(err),
zap.String("uuid", uuid),
)
continue
}
if expired {
return
}
}
}
}
func (c *testCluster) WaitDNStoreTimeoutIndexed(ctx context.Context, index int) {
ds, err := c.GetDNServiceIndexed(index)
require.NoError(c.t, err)
c.WaitDNStoreTimeout(ctx, ds.ID())
}
func (c *testCluster) WaitDNStoreReported(ctx context.Context, uuid string) {
for {
select {
case <-ctx.Done():
assert.FailNow(
c.t,
"terminated when waiting dn store reported",
"dn store %s, error: %s", uuid, ctx.Err(),
)
default:
time.Sleep(defaultWaitInterval)
expired, err := c.DNStoreExpired(uuid)
if err != nil {
c.logger.Error("fail to check dn store expired or not",
zap.Error(err),
zap.String("uuid", uuid),
)
continue
}
if !expired {
return
}
}
}
}
func (c *testCluster) WaitDNStoreReportedIndexed(ctx context.Context, index int) {
ds, err := c.GetDNServiceIndexed(index)
require.NoError(c.t, err)
c.WaitDNStoreReported(ctx, ds.ID())
}
func (c *testCluster) WaitLogStoreTimeout(ctx context.Context, uuid string) {
for {
select {
case <-ctx.Done():
assert.FailNow(
c.t,
"terminated when waiting log store timeout",
"log store %s, error: %s", uuid, ctx.Err(),
)
default:
time.Sleep(defaultWaitInterval)
expired, err := c.LogStoreExpired(uuid)
if err != nil {
c.logger.Error("fail to check log store expired or not",
zap.Error(err),
zap.String("uuid", uuid),
)
continue
}
if expired {
return
}
}
}
}
func (c *testCluster) WaitLogStoreTimeoutIndexed(ctx context.Context, index int) {
ls, err := c.GetLogServiceIndexed(index)
require.NoError(c.t, err)
c.WaitLogStoreTimeout(ctx, ls.ID())
}
func (c *testCluster) WaitLogStoreReported(ctx context.Context, uuid string) {
for {
select {
case <-ctx.Done():
assert.FailNow(
c.t,
"terminated when waiting log store reported",
"log store %s, error: %s", uuid, ctx.Err(),
)
default:
time.Sleep(defaultWaitInterval)
expired, err := c.LogStoreExpired(uuid)
if err != nil {
c.logger.Error("fail to check log store expired or not",
zap.Error(err),
zap.String("uuid", uuid),
)
continue
}
if !expired {
return
}
}
}
}
func (c *testCluster) WaitLogStoreReportedIndexed(ctx context.Context, index int) {
ls, err := c.GetLogServiceIndexed(index)
require.NoError(c.t, err)
c.WaitLogStoreReported(ctx, ls.ID())
}
// --------------------------------------------------------------
// The following are implements for interface `ClusterAwareness`.
// --------------------------------------------------------------
......@@ -705,13 +895,47 @@ func (c *testCluster) StartLogServiceIndexed(index int) error {
return ls.Start()
}
func (c *testCluster) NewNetworkPartition(
dnIndexes, logIndexes []uint32,
) NetworkPartition {
return newNetworkPartition(
c.opt.initial.logServiceNum, logIndexes,
c.opt.initial.dnServiceNum, dnIndexes,
)
}
func (c *testCluster) RemainingNetworkPartition(
partitions ...NetworkPartition,
) NetworkPartition {
return remainingNetworkPartition(
c.opt.initial.logServiceNum,
c.opt.initial.dnServiceNum,
partitions...,
)
}
func (c *testCluster) StartNetworkPartition(parts ...NetworkPartition) {
c.network.Lock()
defer c.network.Unlock()
addressSets := c.network.addresses.buildPartitionAddressSets(parts...)
c.network.addressSets = addressSets
}
func (c *testCluster) CloseNetworkPartition() {
c.network.Lock()
defer c.network.Unlock()
c.network.addressSets = nil
}
// ------------------------------------------------------
// The following are private utilities for `testCluster`.
// ------------------------------------------------------
// buildServiceAddress builds addresses for all services.
func (c *testCluster) buildServiceAddress() serviceAddress {
return newServiceAddress(
// buildServiceAddresses builds addresses for all services.
func (c *testCluster) buildServiceAddresses() serviceAddresses {
return newServiceAddresses(
c.t,
c.opt.initial.logServiceNum,
c.opt.initial.dnServiceNum,
......@@ -726,15 +950,18 @@ func (c *testCluster) buildFileServices() *fileServices {
// buildDnConfigs builds configurations for all dn services.
func (c *testCluster) buildDnConfigs(
address serviceAddress,
address serviceAddresses,
) ([]*dnservice.Config, []dnOptions) {
batch := c.opt.initial.dnServiceNum
cfgs := make([]*dnservice.Config, 0, batch)
opts := make([]dnOptions, 0, batch)
for i := 0; i < batch; i++ {
cfg, opt := buildDnConfig(i, c.opt, address)
cfg := buildDnConfig(i, c.opt, address)
cfgs = append(cfgs, cfg)
localAddr := cfg.ListenAddress
opt := buildDnOptions(cfg, c.backendFilterFactory(localAddr))
opts = append(opts, opt)
}
return cfgs, opts
......@@ -742,16 +969,21 @@ func (c *testCluster) buildDnConfigs(
// buildLogConfigs builds configurations for all log services.
func (c *testCluster) buildLogConfigs(
address serviceAddress,
) []logservice.Config {
address serviceAddresses,
) ([]logservice.Config, []logOptions) {
batch := c.opt.initial.logServiceNum
cfgs := make([]logservice.Config, 0, batch)
opts := make([]logOptions, 0, batch)
for i := 0; i < batch; i++ {
cfg := buildLogConfig(i, c.opt, address)
cfgs = append(cfgs, cfg)
localAddr := cfg.ServiceAddress
opt := buildLogOptions(cfg, c.backendFilterFactory(localAddr))
opts = append(opts, opt)
}
return cfgs
return cfgs, opts
}
// initDNServices builds all dn services.
......@@ -802,7 +1034,8 @@ func (c *testCluster) initLogServices() []LogService {
svcs := make([]LogService, 0, batch)
for i := 0; i < batch; i++ {
cfg := c.log.cfgs[i]
ls, err := newLogService(cfg)
opt := c.log.opts[i]
ls, err := newLogService(cfg, opt)
require.NoError(c.t, err)
c.logger.Info(
......@@ -959,3 +1192,35 @@ func (c *testCluster) rangeHAKeeperService(
}
}
}
// FilterFunc returns true if traffic was allowed.
type FilterFunc func(morpc.Message, string) bool
// backendFilterFactory constructs a closure with the type of FilterFunc.
func (c *testCluster) backendFilterFactory(localAddr string) FilterFunc {
return func(_ morpc.Message, backendAddr string) bool {
// NB: it's possible that partition takes effect once more after disabled.
c.network.RLock()
addressSets := c.network.addressSets
c.network.RUnlock()
if len(addressSets) == 0 {
return true
}
for _, addrSet := range addressSets {
if addrSet.contains(localAddr) &&
addrSet.contains(backendAddr) {
return true
}
}
c.logger.Info(
"traffic not allowed",
zap.String("local", localAddr),
zap.String("backend", backendAddr),
)
return false
}
}
......@@ -24,6 +24,10 @@ import (
logpb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
)
const (
defaultTimeout = 10 * time.Second
)
func TestClusterStart(t *testing.T) {
// initialize cluster
c, err := NewCluster(t, DefaultOptions())
......@@ -79,17 +83,17 @@ func TestClusterAwareness(t *testing.T) {
require.NoError(t, err)
require.Equal(t, ServiceStarted, log.Status())
ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second)
ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel1()
leader := c.WaitHAKeeperLeader(ctx1)
require.NotNil(t, leader)
// we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat.
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel2()
c.WaitHAKeeperState(ctx2, logpb.HAKeeperRunning)
ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second)
ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel3()
state, err := c.GetClusterState(ctx3)
require.NoError(t, err)
......@@ -275,7 +279,7 @@ func TestClusterState(t *testing.T) {
// ----------------------------------------
// the following would test `ClusterState`.
// ----------------------------------------
ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second)
ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel1()
leader := c.WaitHAKeeperLeader(ctx1)
require.NotNil(t, leader)
......@@ -287,7 +291,7 @@ func TestClusterState(t *testing.T) {
require.Equal(t, logSvcNum, len(lsuuids))
// we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat.
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel2()
c.WaitHAKeeperState(ctx2, logpb.HAKeeperRunning)
......@@ -297,7 +301,7 @@ func TestClusterState(t *testing.T) {
// cluster should be healthy
require.True(t, c.IsClusterHealthy())
ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second)
ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel3()
state, err := c.GetClusterState(ctx3)
require.NoError(t, err)
......@@ -305,13 +309,13 @@ func TestClusterState(t *testing.T) {
require.Equal(t, logSvcNum, len(state.LogState.Stores))
// FIXME: validate the result list of dn shards
ctx4, cancel4 := context.WithTimeout(context.Background(), 2*time.Second)
ctx4, cancel4 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel4()
_, err = c.ListDNShards(ctx4)
require.NoError(t, err)
// FIXME: validate the result list of log shards
ctx5, cancel5 := context.WithTimeout(context.Background(), 2*time.Second)
ctx5, cancel5 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel5()
_, err = c.ListLogShards(ctx5)
require.NoError(t, err)
......@@ -325,12 +329,12 @@ func TestClusterState(t *testing.T) {
dnIndex := 0
dsuuid := dsuuids[dnIndex]
ctx6, cancel6 := context.WithTimeout(context.Background(), 2*time.Second)
ctx6, cancel6 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel6()
dnStoreInfo1, err := c.GetDNStoreInfo(ctx6, dsuuid)
require.NoError(t, err)
ctx7, cancel7 := context.WithTimeout(context.Background(), 2*time.Second)
ctx7, cancel7 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel7()
dnStoreInfo2, err := c.GetDNStoreInfoIndexed(ctx7, dnIndex)
require.NoError(t, err)
......@@ -354,12 +358,12 @@ func TestClusterState(t *testing.T) {
logIndex := 1
lsuuid := lsuuids[logIndex]
ctx8, cancel8 := context.WithTimeout(context.Background(), 2*time.Second)
ctx8, cancel8 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel8()
logStoreInfo1, err := c.GetLogStoreInfo(ctx8, lsuuid)
require.NoError(t, err)
ctx9, cancel9 := context.WithTimeout(context.Background(), 2*time.Second)
ctx9, cancel9 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel9()
logStoreInfo2, err := c.GetLogStoreInfoIndexed(ctx9, logIndex)
require.NoError(t, err)
......@@ -396,54 +400,131 @@ func TestClusterWaitState(t *testing.T) {
require.NoError(t, err)
}()
// --------------------------------------------
// the following would test `ClusterWaitState`.
// --------------------------------------------
// we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat.
ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel1()
c.WaitHAKeeperState(ctx1, logpb.HAKeeperRunning)
// --------------------------------------------
// the following would test `ClusterWaitState`.
// --------------------------------------------
// test WaitDNShardsReported
{
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel2()
c.WaitDNShardsReported(ctx2)
}
// test WaitLogShardsReported
{
ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second)
ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel3()
c.WaitLogShardsReported(ctx3)
}
// test WaitDNReplicaReported
{
ctx4, cancel4 := context.WithTimeout(context.Background(), 2*time.Second)
ctx4, cancel4 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel4()
dnShards, err := c.ListDNShards(ctx4)
require.NoError(t, err)
require.NotZero(t, len(dnShards))
dnShardID := dnShards[0].ShardID
ctx5, cancel5 := context.WithTimeout(context.Background(), 2*time.Second)
ctx5, cancel5 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel5()
c.WaitDNReplicaReported(ctx5, dnShardID)
}
// test WaitLogReplicaReported
{
ctx6, cancel6 := context.WithTimeout(context.Background(), 2*time.Second)
ctx6, cancel6 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel6()
logShards, err := c.ListLogShards(ctx6)
require.NotZero(t, len(logShards))
require.NoError(t, err)
logShardID := logShards[0].ShardID
ctx7, cancel7 := context.WithTimeout(context.Background(), 2*time.Second)
ctx7, cancel7 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel7()
c.WaitLogReplicaReported(ctx7, logShardID)
}
}
func TestNetworkPartition(t *testing.T) {
dnSvcNum := 2
logSvcNum := 4
opt := DefaultOptions().
WithDNServiceNum(dnSvcNum).
WithLogServiceNum(logSvcNum)
// initialize cluster
c, err := NewCluster(t, opt)
require.NoError(t, err)
// start the cluster
err = c.Start()
require.NoError(t, err)
// close the cluster after all
defer func() {
err := c.Close()
require.NoError(t, err)
}()
// we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat.
ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel1()
c.WaitHAKeeperState(ctx1, logpb.HAKeeperRunning)
// --------------------------------------------
// the following would test network partition
// --------------------------------------------
// dn service index: 0, 1
// log service index: 0, 1, 2, 3
// seperate dn service 1 from other services
partition1 := c.NewNetworkPartition([]uint32{1}, nil)
require.Equal(t, []uint32{1}, partition1.ListDNServiceIndex())
require.Nil(t, partition1.ListLogServiceIndex())
partition2 := c.RemainingNetworkPartition(partition1)
require.Equal(t, []uint32{0}, partition2.ListDNServiceIndex())
require.Equal(t, []uint32{0, 1, 2, 3}, partition2.ListLogServiceIndex())
// enable network partition
c.StartNetworkPartition(partition1, partition2)
ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel2()
c.WaitDNStoreTimeoutIndexed(ctx2, 1)
// disable network partition
c.CloseNetworkPartition()
ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel3()
c.WaitDNStoreReportedIndexed(ctx3, 1)
// dn service index: 0, 1
// log service index: 0, 1, 2, 3
// seperate log service 3 from other services
partition3 := c.NewNetworkPartition(nil, []uint32{3})
require.Nil(t, partition3.ListDNServiceIndex())
require.Equal(t, []uint32{3}, partition3.ListLogServiceIndex())
partition4 := c.RemainingNetworkPartition(partition3)
require.Equal(t, []uint32{0, 1}, partition4.ListDNServiceIndex())
require.Equal(t, []uint32{0, 1, 2}, partition4.ListLogServiceIndex())
// enable network partition
c.StartNetworkPartition(partition3, partition4)
ctx4, cancel4 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel4()
c.WaitLogStoreTimeoutIndexed(ctx4, 3)
// disable network partition
c.CloseNetworkPartition()
ctx5, cancel5 := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel5()
c.WaitLogStoreReportedIndexed(ctx5, 3)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment