diff --git a/pkg/dnservice/cfg.go b/pkg/dnservice/cfg.go index dbe5252c3f0e5bc409e4ba2bd2ea32790625fcf2..ac8c4f88a94824d1cb72b15ca3e58ce84efd1b89 100644 --- a/pkg/dnservice/cfg.go +++ b/pkg/dnservice/cfg.go @@ -119,7 +119,7 @@ type Config struct { } } -func (c *Config) validate() error { +func (c *Config) Validate() error { if c.UUID == "" { return fmt.Errorf("Config.UUID not set") } diff --git a/pkg/dnservice/cfg_test.go b/pkg/dnservice/cfg_test.go index 4cbfd2b62a468fdcb2fb8147911e780caafcf098..ea2651fc3cd8bfdcd9c314d4766a28b387e14979 100644 --- a/pkg/dnservice/cfg_test.go +++ b/pkg/dnservice/cfg_test.go @@ -23,9 +23,9 @@ import ( func TestValidate(t *testing.T) { c := &Config{} - assert.Error(t, c.validate()) + assert.Error(t, c.Validate()) c.UUID = "dn1" - assert.NoError(t, c.validate()) + assert.NoError(t, c.Validate()) assert.Equal(t, defaultListenAddress, c.ListenAddress) assert.Equal(t, c.ListenAddress, defaultListenAddress) diff --git a/pkg/dnservice/store.go b/pkg/dnservice/store.go index a94aaa02fc580d6a7d953e93667b90c867e409b2..cb3860de3b290294f46cda710804af40acfbb8e3 100644 --- a/pkg/dnservice/store.go +++ b/pkg/dnservice/store.go @@ -52,7 +52,7 @@ func WithConfigAdjust(adjustConfigFunc func(c *Config)) Option { } // WithBackendFilter set filtering txn.TxnRequest sent to other DNShard -func WithBackendFilter(filter func(*txn.TxnRequest, string) bool) Option { +func WithBackendFilter(filter func(morpc.Message, string) bool) Option { return func(s *store) { s.options.backendFilter = filter } @@ -88,7 +88,7 @@ type store struct { options struct { logServiceClientFactory func(metadata.DNShard) (logservice.Client, error) hakeekerClientFactory func() (logservice.DNHAKeeperClient, error) - backendFilter func(req *txn.TxnRequest, backendAddr string) bool + backendFilter func(msg morpc.Message, backendAddr string) bool adjustConfigFunc func(c *Config) } @@ -102,7 +102,7 @@ type store struct { func NewService(cfg *Config, fsFactory fileservice.FileServiceFactory, opts ...Option) (Service, error) { - if err := cfg.validate(); err != nil { + if err := cfg.Validate(); err != nil { return nil, err } diff --git a/pkg/logservice/hakeeper_client.go b/pkg/logservice/hakeeper_client.go index e793d6fd6d6c13b1d0d6548fad9641648f8323f5..7facacebe7c80f0b86a7f6c965946c67398ab0b9 100644 --- a/pkg/logservice/hakeeper_client.go +++ b/pkg/logservice/hakeeper_client.go @@ -70,6 +70,8 @@ var _ DNHAKeeperClient = (*managedHAKeeperClient)(nil) var _ LogHAKeeperClient = (*managedHAKeeperClient)(nil) // NewCNHAKeeperClient creates a HAKeeper client to be used by a CN node. +// +// NB: caller could specify options for morpc.Client via ctx. func NewCNHAKeeperClient(ctx context.Context, cfg HAKeeperClientConfig) (CNHAKeeperClient, error) { if err := cfg.Validate(); err != nil { @@ -79,6 +81,8 @@ func NewCNHAKeeperClient(ctx context.Context, } // NewDNHAKeeperClient creates a HAKeeper client to be used by a DN node. +// +// NB: caller could specify options for morpc.Client via ctx. func NewDNHAKeeperClient(ctx context.Context, cfg HAKeeperClientConfig) (DNHAKeeperClient, error) { if err := cfg.Validate(); err != nil { @@ -88,6 +92,8 @@ func NewDNHAKeeperClient(ctx context.Context, } // NewLogHAKeeperClient creates a HAKeeper client to be used by a Log Service node. +// +// NB: caller could specify options for morpc.Client via ctx. 
 func NewLogHAKeeperClient(ctx context.Context,
     cfg HAKeeperClientConfig) (LogHAKeeperClient, error) {
     if err := cfg.Validate(); err != nil {
@@ -102,12 +108,23 @@ func newManagedHAKeeperClient(ctx context.Context,
     if err != nil {
         return nil, err
     }
-    return &managedHAKeeperClient{client: c, cfg: cfg}, nil
+
+    return &managedHAKeeperClient{
+        client:         c,
+        cfg:            cfg,
+        backendOptions: GetBackendOptions(ctx),
+        clientOptions:  GetClientOptions(ctx),
+    }, nil
 }
 
 type managedHAKeeperClient struct {
     cfg    HAKeeperClientConfig
     client *hakeeperClient
+
+    // Method `prepareClient` may update morpc.Client.
+    // So we need to keep the options for morpc.Client.
+    backendOptions []morpc.BackendOption
+    clientOptions  []morpc.ClientOption
 }
 
 func (c *managedHAKeeperClient) Close() error {
@@ -202,6 +219,11 @@ func (c *managedHAKeeperClient) prepareClient(ctx context.Context) error {
     if c.client != nil {
         return nil
     }
+
+    // we must use the recorded options for morpc.Client
+    ctx = SetBackendOptions(ctx, c.backendOptions...)
+    ctx = SetClientOptions(ctx, c.clientOptions...)
+
     cc, err := newHAKeeperClient(ctx, c.cfg)
     if err != nil {
         return err
diff --git a/pkg/tests/service/address_test.go b/pkg/tests/service/address_test.go
deleted file mode 100644
index 735308bf1ecf2d7263401f2692d7d22ae6fa390a..0000000000000000000000000000000000000000
--- a/pkg/tests/service/address_test.go
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2021 - 2022 Matrix Origin
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
- -package service - -import ( - "strconv" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGetAvailablePort(t *testing.T) { - defer func() { - curPort = 10000 - }() - - curPort = maxPort - _, err := getAvailablePort("127.0.0.1") - require.Error(t, err) - - curPort = maxPort - 1 - port, err := getAvailablePort("127.0.0.1") - require.NoError(t, err) - require.Equal(t, port, strconv.Itoa(maxPort)) -} - -func TestServiceAddress(t *testing.T) { - address := newServiceAddress(t, 3, 3, "127.0.0.1") - address.assertDNService() - address.assertLogService() -} - -func TestGetDnListenAddress(t *testing.T) { - dnNum := 3 - address := newServiceAddress(t, 1, dnNum, "127.0.0.1") - - addr0 := address.getDnListenAddress(0) - addr1 := address.getDnListenAddress(1) - addr2 := address.getDnListenAddress(2) - addr3 := address.getDnListenAddress(3) - - require.NotEqual(t, addr0, addr1) - require.NotEqual(t, addr0, addr2) - require.NotEqual(t, addr1, addr2) - require.Equal(t, "", addr3) -} - -func TestGetLogListenAddress(t *testing.T) { - logNum := 3 - address := newServiceAddress(t, logNum, 1, "127.0.0.1") - - addr0 := address.getLogListenAddress(0) - addr1 := address.getLogListenAddress(1) - addr2 := address.getLogListenAddress(2) - addr3 := address.getLogListenAddress(3) - - require.NotEqual(t, addr0, addr1) - require.NotEqual(t, addr0, addr2) - require.NotEqual(t, addr1, addr2) - require.Equal(t, "", addr3) -} - -func TestGetLogRaftAddress(t *testing.T) { - logNum := 3 - address := newServiceAddress(t, logNum, 1, "127.0.0.1") - - addr0 := address.getLogRaftAddress(0) - addr1 := address.getLogRaftAddress(1) - addr2 := address.getLogRaftAddress(2) - addr3 := address.getLogRaftAddress(3) - - require.NotEqual(t, addr0, addr1) - require.NotEqual(t, addr0, addr2) - require.NotEqual(t, addr1, addr2) - require.Equal(t, "", addr3) -} - -func TestGetLogGossipAddress(t *testing.T) { - logNum := 3 - address := newServiceAddress(t, logNum, 1, "127.0.0.1") - - addr0 := address.getLogGossipAddress(0) - addr1 := address.getLogGossipAddress(1) - addr2 := address.getLogGossipAddress(2) - addr3 := address.getLogGossipAddress(3) - - require.NotEqual(t, addr0, addr1) - require.NotEqual(t, addr0, addr2) - require.NotEqual(t, addr1, addr2) - require.Equal(t, "", addr3) -} - -func TestListHAKeeperListenAddresses(t *testing.T) { - logNum := 3 - address := newServiceAddress(t, logNum, 1, "127.0.0.1") - addrs := address.listHAKeeperListenAddresses() - require.Equal(t, logNum, len(addrs)) - require.NotEqual(t, addrs[0], addrs[1]) - require.NotEqual(t, addrs[0], addrs[2]) - require.NotEqual(t, addrs[1], addrs[2]) -} - -func TestGetLogGossipSeedAddresses(t *testing.T) { - logNum := 4 - address := newServiceAddress(t, logNum, 1, "127.0.0.1") - addrs := address.getLogGossipSeedAddresses() - require.Equal(t, defaultGossipSeedNum, len(addrs)) - require.NotEqual(t, addrs[0], addrs[1]) - require.NotEqual(t, addrs[0], addrs[2]) - require.NotEqual(t, addrs[1], addrs[2]) -} diff --git a/pkg/tests/service/dnservice.go b/pkg/tests/service/dnservice.go index 459912b83ad64e6938446c857e7f0dc925e8a82b..60d92cec244f0613e483dcbc03c11a1e6e5308f5 100644 --- a/pkg/tests/service/dnservice.go +++ b/pkg/tests/service/dnservice.go @@ -15,12 +15,16 @@ package service import ( + "context" + "fmt" "sync" "github.com/google/uuid" + "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/dnservice" "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/logservice" 
"github.com/matrixorigin/matrixone/pkg/pb/metadata" ) @@ -138,8 +142,8 @@ func newDNService( // buildDnConfig builds configuration for a dn service. func buildDnConfig( - index int, opt Options, address serviceAddress, -) (*dnservice.Config, []dnservice.Option) { + index int, opt Options, address serviceAddresses, +) *dnservice.Config { cfg := &dnservice.Config{ UUID: uuid.New().String(), ListenAddress: address.getDnListenAddress(index), @@ -149,5 +153,59 @@ func buildDnConfig( // FIXME: support different storage, consult @reusee cfg.Txn.Storage.Backend = opt.dn.txnStorageBackend - return cfg, nil + // We need the filled version of configuration. + // It's necessary when building dnservice.Option. + if err := cfg.Validate(); err != nil { + panic(fmt.Sprintf("fatal when building dnservice.Config: %s", err)) + } + + return cfg +} + +// buildDnOptions builds options for a dn service. +// +// NB: We need the filled version of dnservice.Config. +func buildDnOptions(cfg *dnservice.Config, filter FilterFunc) dnOptions { + // factory to construct client for hakeeper + hakeeperClientFactory := func() (logservice.DNHAKeeperClient, error) { + ctx, cancel := context.WithTimeout( + context.Background(), cfg.HAKeeper.DiscoveryTimeout.Duration, + ) + defer cancel() + + // transfer morpc.BackendOption via context + ctx = logservice.SetBackendOptions(ctx, morpc.WithBackendFilter(filter)) + + client, err := logservice.NewDNHAKeeperClient( + ctx, cfg.HAKeeper.ClientConfig, + ) + if err != nil { + return nil, err + } + return client, nil + } + + // factory to construct client for log service + logServiceClientFactory := func(shard metadata.DNShard) (logservice.Client, error) { + ctx, cancel := context.WithTimeout( + context.Background(), cfg.LogService.ConnectTimeout.Duration, + ) + defer cancel() + + // transfer morpc.BackendOption via context + ctx = logservice.SetBackendOptions(ctx, morpc.WithBackendFilter(filter)) + + return logservice.NewClient(ctx, logservice.ClientConfig{ + ReadOnly: false, + LogShardID: shard.LogShardID, + DNReplicaID: shard.ReplicaID, + ServiceAddresses: cfg.HAKeeper.ClientConfig.ServiceAddresses, + }) + } + + return []dnservice.Option{ + dnservice.WithHAKeeperClientFactory(hakeeperClientFactory), + dnservice.WithLogServiceClientFactory(logServiceClientFactory), + dnservice.WithBackendFilter(filter), + } } diff --git a/pkg/tests/service/logservice.go b/pkg/tests/service/logservice.go index 9c26049b43df397100b504225ed309d8e1d70404..a58d8190532538793a119e767668190ba93d2786 100644 --- a/pkg/tests/service/logservice.go +++ b/pkg/tests/service/logservice.go @@ -130,9 +130,14 @@ func (ls *logService) StartHAKeeperReplica( return ls.svc.StartHAKeeperReplica(replicaID, initialReplicas, join) } +// logOptions is options for a log service. +type logOptions []logservice.Option + // newLogService constructs an instance of `LogService`. -func newLogService(cfg logservice.Config) (LogService, error) { - svc, err := logservice.NewWrappedService(cfg) +func newLogService( + cfg logservice.Config, opts logOptions, +) (LogService, error) { + svc, err := logservice.NewWrappedService(cfg, opts...) if err != nil { return nil, err } @@ -140,7 +145,9 @@ func newLogService(cfg logservice.Config) (LogService, error) { } // buildLogConfig builds configuration for a log service. 
-func buildLogConfig(index int, opt Options, address serviceAddress) logservice.Config { +func buildLogConfig( + index int, opt Options, address serviceAddresses, +) logservice.Config { cfg := logservice.Config{ UUID: uuid.New().String(), FS: vfs.NewStrictMem(), @@ -155,7 +162,6 @@ func buildLogConfig(index int, opt Options, address serviceAddress) logservice.C cfg.HeartbeatInterval.Duration = opt.log.heartbeatInterval cfg.HAKeeperCheckInterval.Duration = opt.hakeeper.checkInterval cfg.HAKeeperClientConfig.ServiceAddresses = address.listHAKeeperListenAddresses() - // setting hakeeper configuration cfg.HAKeeperConfig.TickPerSecond = opt.hakeeper.tickPerSecond cfg.HAKeeperConfig.LogStoreTimeout.Duration = opt.hakeeper.logStoreTimeout @@ -163,9 +169,19 @@ func buildLogConfig(index int, opt Options, address serviceAddress) logservice.C // we must invoke Fill in order to setting default configruation value. cfg.Fill() + return cfg } +// buildLogOptions builds options for a log service. +// +// NB: We need the filled version of logservice.Config. +func buildLogOptions(cfg logservice.Config, filter FilterFunc) logOptions { + return []logservice.Option{ + logservice.WithBackendFilter(filter), + } +} + // buildLogDataDir generates data directory for a log service. func buildLogDataDir(root string, index int) string { return path.Join(root, "log", strconv.Itoa(index)) diff --git a/pkg/tests/service/address.go b/pkg/tests/service/network.go similarity index 50% rename from pkg/tests/service/address.go rename to pkg/tests/service/network.go index b70969015da11a749ff8530a7fd8465fd5e35bea..1b094e477493e6e66f760529777dac9f60cf17bd 100644 --- a/pkg/tests/service/address.go +++ b/pkg/tests/service/network.go @@ -20,6 +20,7 @@ import ( "sync" "testing" + "github.com/RoaringBitmap/roaring" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -30,8 +31,8 @@ var ( curPortMu sync.Mutex ) -// serviceAddress contains addresses of all services. -type serviceAddress struct { +// serviceAddresses contains addresses of all services. +type serviceAddresses struct { t *testing.T // Construct service addresses according to service number @@ -42,11 +43,11 @@ type serviceAddress struct { dnAddresses []dnServiceAddress } -// newServiceAddress constructs addresses for all services. -func newServiceAddress( +// newServiceAddresses constructs addresses for all services. +func newServiceAddresses( t *testing.T, logServiceNum, dnServiceNum int, hostAddr string, -) serviceAddress { - address := serviceAddress{ +) serviceAddresses { + address := serviceAddresses{ t: t, logServiceNum: logServiceNum, dnServiceNum: dnServiceNum, @@ -76,17 +77,17 @@ func newServiceAddress( } // assertDNService asserts constructed address for dn service. -func (a serviceAddress) assertDNService() { +func (a serviceAddresses) assertDNService() { assert.Equal(a.t, a.dnServiceNum, len(a.dnAddresses)) } // assertLogService asserts constructed address for log service. -func (a serviceAddress) assertLogService() { +func (a serviceAddresses) assertLogService() { assert.Equal(a.t, a.logServiceNum, len(a.logAddresses)) } // getDnListenAddress gets dn service address by its index. -func (a serviceAddress) getDnListenAddress(index int) string { +func (a serviceAddresses) getDnListenAddress(index int) string { a.assertDNService() if index >= len(a.dnAddresses) || index < 0 { @@ -96,7 +97,7 @@ func (a serviceAddress) getDnListenAddress(index int) string { } // getLogListenAddress gets log service address by its index. 
-func (a serviceAddress) getLogListenAddress(index int) string { +func (a serviceAddresses) getLogListenAddress(index int) string { a.assertLogService() if index >= len(a.logAddresses) || index < 0 { @@ -106,7 +107,7 @@ func (a serviceAddress) getLogListenAddress(index int) string { } // getLogRaftAddress gets log raft address by its index. -func (a serviceAddress) getLogRaftAddress(index int) string { +func (a serviceAddresses) getLogRaftAddress(index int) string { a.assertLogService() if index >= len(a.logAddresses) || index < 0 { @@ -116,7 +117,7 @@ func (a serviceAddress) getLogRaftAddress(index int) string { } // getLogGossipAddress gets log gossip address by its index. -func (a serviceAddress) getLogGossipAddress(index int) string { +func (a serviceAddresses) getLogGossipAddress(index int) string { a.assertLogService() if index >= len(a.logAddresses) || index < 0 { @@ -130,7 +131,7 @@ func (a serviceAddress) getLogGossipAddress(index int) string { // Select gossip addresses of the first 3 log services. // If the number of log services was less than 3, // then select all of them. -func (a serviceAddress) getLogGossipSeedAddresses() []string { +func (a serviceAddresses) getLogGossipSeedAddresses() []string { a.assertLogService() n := gossipSeedNum(len(a.logAddresses)) @@ -146,7 +147,7 @@ func (a serviceAddress) getLogGossipSeedAddresses() []string { // Select the first 3 log services to start hakeeper replica. // If the number of log services was less than 3, // then select the first of them. -func (a serviceAddress) listHAKeeperListenAddresses() []string { +func (a serviceAddresses) listHAKeeperListenAddresses() []string { a.assertLogService() n := haKeeperNum(len(a.logAddresses)) @@ -157,6 +158,49 @@ func (a serviceAddress) listHAKeeperListenAddresses() []string { return listenAddrs } +// buildPartitionAddressSets returns service addresses by every partition. +func (a serviceAddresses) buildPartitionAddressSets(partitions ...NetworkPartition) []addressSet { + sets := make([]addressSet, 0, len(partitions)) + for _, part := range partitions { + sets = append(sets, a.listPartitionAddresses(part)) + } + return sets +} + +// listPartitionAddresses returns all service addresses within the same partition. +func (a serviceAddresses) listPartitionAddresses(partition NetworkPartition) addressSet { + addrSet := newAddressSet() + for _, dnIndex := range partition.ListDNServiceIndex() { + addrs := a.listDnServiceAddresses(int(dnIndex)) + addrSet.addAddresses(addrs...) + } + for _, logIndex := range partition.ListLogServiceIndex() { + addrs := a.listLogServiceAddresses(int(logIndex)) + addrSet.addAddresses(addrs...) + } + return addrSet +} + +// listDnServiceAddresses lists all addresses of dn service by its index. +func (a serviceAddresses) listDnServiceAddresses(index int) []string { + a.assertDNService() + + if index >= len(a.dnAddresses) || index < 0 { + return nil + } + return a.dnAddresses[index].listAddresses() +} + +// listLogServiceAddresses lists all addresses of log service by its index. +func (a serviceAddresses) listLogServiceAddresses(index int) []string { + a.assertLogService() + + if index >= len(a.logAddresses) || index < 0 { + return nil + } + return a.logAddresses[index].listAddresses() +} + // logServiceAddress contains addresses for log service. type logServiceAddress struct { listenAddr string @@ -177,6 +221,11 @@ func newLogServiceAddress(host string) (logServiceAddress, error) { }, nil } +// listAddresses returns all addresses for single log service. 
+func (la logServiceAddress) listAddresses() []string { + return []string{la.listenAddr, la.raftAddr, la.gossipAddr} +} + // dnServiceAddress contains address for dn service. type dnServiceAddress struct { listenAddr string @@ -192,6 +241,11 @@ func newDNServiceAddress(host string) (dnServiceAddress, error) { }, nil } +// listAddresses returns all addresses for single dn service. +func (da dnServiceAddress) listAddresses() []string { + return []string{da.listenAddr} +} + // getAddressBatch generates service addresses by batch. func getAddressBatch(host string, batch int) ([]string, error) { addrs := make([]string, batch) @@ -229,3 +283,100 @@ func getAvailablePort(host string) (string, error) { } return strconv.Itoa(port), nil } + +// addressSet records addresses for services within the same partition. +type addressSet map[string]struct{} + +func newAddressSet() addressSet { + return make(map[string]struct{}) +} + +// addAddresses registers a list of addresses. +func (s addressSet) addAddresses(addrs ...string) { + for _, addr := range addrs { + s[addr] = struct{}{} + } +} + +// contains checks address exist or not. +func (s addressSet) contains(addr string) bool { + _, ok := s[addr] + return ok +} + +// NetworkPartition records index of services from the same network partition. +type NetworkPartition struct { + logIndexSet *roaring.Bitmap + dnIndexSet *roaring.Bitmap +} + +// newNetworkPartition returns an instance of NetworkPartition. +// +// The returned instance only contains valid index according to service number. +func newNetworkPartition( + logServiceNum int, logIndexes []uint32, + dnServiceNum int, dnIndexes []uint32, +) NetworkPartition { + logTotal := roaring.FlipInt(roaring.NewBitmap(), 0, logServiceNum) + dnTotal := roaring.FlipInt(roaring.NewBitmap(), 0, dnServiceNum) + + rawLogSet := roaring.BitmapOf(logIndexes...) + rawDnSet := roaring.BitmapOf(dnIndexes...) + + return NetworkPartition{ + logIndexSet: roaring.And(logTotal, rawLogSet), + dnIndexSet: roaring.And(dnTotal, rawDnSet), + } +} + +// remainingNetworkPartition returns partition for the remaining services. +func remainingNetworkPartition( + logServiceNum, dnServiceNum int, partitions ...NetworkPartition, +) NetworkPartition { + logTotal := roaring.FlipInt(roaring.NewBitmap(), 0, logServiceNum) + dnTotal := roaring.FlipInt(roaring.NewBitmap(), 0, dnServiceNum) + + logUsed := roaring.NewBitmap() + dnUsed := roaring.NewBitmap() + for _, p := range partitions { + dnUsed.Or(p.dnIndexSet) + logUsed.Or(p.logIndexSet) + } + + return NetworkPartition{ + logIndexSet: roaring.AndNot(logTotal, logUsed), + dnIndexSet: roaring.AndNot(dnTotal, dnUsed), + } +} + +// ListDNServiceIndex lists index of all dn services in the partition. +func (p NetworkPartition) ListDNServiceIndex() []uint32 { + set := p.dnIndexSet + + if set.GetCardinality() == 0 { + return nil + } + + indexes := make([]uint32, 0, set.GetCardinality()) + iter := set.Iterator() + for iter.HasNext() { + indexes = append(indexes, iter.Next()) + } + return indexes +} + +// ListLogServiceIndex lists index of all log services in the partition. 
+func (p NetworkPartition) ListLogServiceIndex() []uint32 {
+    set := p.logIndexSet
+
+    if set.GetCardinality() == 0 {
+        return nil
+    }
+
+    indexes := make([]uint32, 0, set.GetCardinality())
+    iter := set.Iterator()
+    for iter.HasNext() {
+        indexes = append(indexes, iter.Next())
+    }
+    return indexes
+}
diff --git a/pkg/tests/service/network_test.go b/pkg/tests/service/network_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..35479a47523c6f315d0b4a01bee1b594520c9cc8
--- /dev/null
+++ b/pkg/tests/service/network_test.go
@@ -0,0 +1,228 @@
+// Copyright 2021 - 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package service
+
+import (
+    "strconv"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+func TestGetAvailablePort(t *testing.T) {
+    defer func() {
+        curPort = 10000
+    }()
+
+    curPort = maxPort
+    _, err := getAvailablePort("127.0.0.1")
+    require.Error(t, err)
+
+    curPort = maxPort - 1
+    port, err := getAvailablePort("127.0.0.1")
+    require.NoError(t, err)
+    require.Equal(t, port, strconv.Itoa(maxPort))
+}
+
+func TestServiceAddress(t *testing.T) {
+    logServiceNum := 3
+    dnServiceNum := 2
+
+    address := newServiceAddresses(t, logServiceNum, dnServiceNum, "127.0.0.1")
+    address.assertDNService()
+    address.assertLogService()
+
+    for i := 0; i < dnServiceNum; i++ {
+        addrList := address.listDnServiceAddresses(i)
+        // 1 address for every dn service now
+        require.Equal(t, 1, len(addrList))
+    }
+    // valid dn index: 0, 1
+    // invalid dn index: 2
+    addrList := address.listDnServiceAddresses(2)
+    require.Equal(t, 0, len(addrList))
+
+    for i := 0; i < logServiceNum; i++ {
+        addrList := address.listLogServiceAddresses(i)
+        // 3 addresses for every log service now
+        require.Equal(t, 3, len(addrList))
+    }
+    // valid log index: 0, 1, 2
+    // invalid log index: 3
+    addrList = address.listLogServiceAddresses(3)
+    require.Equal(t, 0, len(addrList))
+
+    // ------------------------------
+    // integrate with NetworkPartition
+    // ------------------------------
+    dnIndex := uint32(1)
+    logIndex := uint32(2)
+    partition1 := newNetworkPartition(
+        logServiceNum, []uint32{logIndex},
+        dnServiceNum, []uint32{dnIndex},
+    )
+
+    partition2 := remainingNetworkPartition(
+        logServiceNum, dnServiceNum, partition1,
+    )
+
+    addrSets := address.buildPartitionAddressSets(partition1, partition2)
+    // there are 2 address sets corresponding to the 2 partitions
+    require.Equal(t, 2, len(addrSets))
+    // in partition 1, there is 1 dn service and 1 log service.
+    require.Equal(t, 3+1, len(addrSets[0]))
+    // in partition 2, there are 1 dn service and 2 log services.
+    require.Equal(t, 3*2+1, len(addrSets[1]))
+
+    // the first address set should contain the following addresses.
+    dnListenAddr := address.getDnListenAddress(int(dnIndex))
+    require.True(t, addrSets[0].contains(dnListenAddr))
+    logListenAddr := address.getLogListenAddress(int(logIndex))
+    require.True(t, addrSets[0].contains(logListenAddr))
+    logRaftAddr := address.getLogRaftAddress(int(logIndex))
+    require.True(t, addrSets[0].contains(logRaftAddr))
+    logGossipAddr := address.getLogGossipAddress(int(logIndex))
+    require.True(t, addrSets[0].contains(logGossipAddr))
+}
+
+func TestGetDnListenAddress(t *testing.T) {
+    dnNum := 3
+    address := newServiceAddresses(t, 1, dnNum, "127.0.0.1")
+
+    addr0 := address.getDnListenAddress(0)
+    addr1 := address.getDnListenAddress(1)
+    addr2 := address.getDnListenAddress(2)
+    addr3 := address.getDnListenAddress(3)
+
+    require.NotEqual(t, addr0, addr1)
+    require.NotEqual(t, addr0, addr2)
+    require.NotEqual(t, addr1, addr2)
+    require.Equal(t, "", addr3)
+}
+
+func TestGetLogListenAddress(t *testing.T) {
+    logNum := 3
+    address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
+
+    addr0 := address.getLogListenAddress(0)
+    addr1 := address.getLogListenAddress(1)
+    addr2 := address.getLogListenAddress(2)
+    addr3 := address.getLogListenAddress(3)
+
+    require.NotEqual(t, addr0, addr1)
+    require.NotEqual(t, addr0, addr2)
+    require.NotEqual(t, addr1, addr2)
+    require.Equal(t, "", addr3)
+}
+
+func TestGetLogRaftAddress(t *testing.T) {
+    logNum := 3
+    address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
+
+    addr0 := address.getLogRaftAddress(0)
+    addr1 := address.getLogRaftAddress(1)
+    addr2 := address.getLogRaftAddress(2)
+    addr3 := address.getLogRaftAddress(3)
+
+    require.NotEqual(t, addr0, addr1)
+    require.NotEqual(t, addr0, addr2)
+    require.NotEqual(t, addr1, addr2)
+    require.Equal(t, "", addr3)
+}
+
+func TestGetLogGossipAddress(t *testing.T) {
+    logNum := 3
+    address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
+
+    addr0 := address.getLogGossipAddress(0)
+    addr1 := address.getLogGossipAddress(1)
+    addr2 := address.getLogGossipAddress(2)
+    addr3 := address.getLogGossipAddress(3)
+
+    require.NotEqual(t, addr0, addr1)
+    require.NotEqual(t, addr0, addr2)
+    require.NotEqual(t, addr1, addr2)
+    require.Equal(t, "", addr3)
+}
+
+func TestListHAKeeperListenAddresses(t *testing.T) {
+    logNum := 3
+    address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
+    addrs := address.listHAKeeperListenAddresses()
+    require.Equal(t, logNum, len(addrs))
+    require.NotEqual(t, addrs[0], addrs[1])
+    require.NotEqual(t, addrs[0], addrs[2])
+    require.NotEqual(t, addrs[1], addrs[2])
+}
+
+func TestGetLogGossipSeedAddresses(t *testing.T) {
+    logNum := 4
+    address := newServiceAddresses(t, logNum, 1, "127.0.0.1")
+    addrs := address.getLogGossipSeedAddresses()
+    require.Equal(t, defaultGossipSeedNum, len(addrs))
+    require.NotEqual(t, addrs[0], addrs[1])
+    require.NotEqual(t, addrs[0], addrs[2])
+    require.NotEqual(t, addrs[1], addrs[2])
+}
+
+func TestPartition(t *testing.T) {
+    logServiceNum := 3
+    dnServiceNum := 2
+
+    // normal condition
+    {
+        partition := newNetworkPartition(
+            logServiceNum, []uint32{1},
+            dnServiceNum, []uint32{0, 1},
+        )
+        require.Equal(t, []uint32{0, 1}, partition.ListDNServiceIndex())
+        require.Equal(t, []uint32{1}, partition.ListLogServiceIndex())
+
+        remaining := remainingNetworkPartition(
+            logServiceNum, dnServiceNum, partition,
+        )
+        require.Nil(t, remaining.ListDNServiceIndex())
+        require.Equal(t, []uint32{0, 2}, remaining.ListLogServiceIndex())
+
+        require.Equal(t, uint64(0), remaining.dnIndexSet.GetCardinality())
+        require.Equal(t, uint64(2),
remaining.logIndexSet.GetCardinality()) + require.True(t, remaining.logIndexSet.Contains(0)) + require.True(t, remaining.logIndexSet.Contains(2)) + } + + // valid dn index should be: 0, 1 + // invoker specifies invalid dn index: 2, 3 + { + partition := newNetworkPartition( + logServiceNum, nil, + dnServiceNum, []uint32{0, 2, 3}, + ) + require.Equal(t, []uint32{0}, partition.ListDNServiceIndex()) + require.Nil(t, partition.ListLogServiceIndex()) + + remaining := remainingNetworkPartition( + logServiceNum, dnServiceNum, partition, + ) + require.Equal(t, []uint32{1}, remaining.ListDNServiceIndex()) + require.Equal(t, []uint32{0, 1, 2}, remaining.ListLogServiceIndex()) + + require.Equal(t, uint64(1), remaining.dnIndexSet.GetCardinality()) + require.Equal(t, uint64(3), remaining.logIndexSet.GetCardinality()) + require.True(t, remaining.dnIndexSet.Contains(1)) + require.True(t, remaining.logIndexSet.Contains(0)) + require.True(t, remaining.logIndexSet.Contains(1)) + require.True(t, remaining.logIndexSet.Contains(2)) + } +} diff --git a/pkg/tests/service/options.go b/pkg/tests/service/options.go index 1b7a61b27fc2d88c7a5e914508ba69ea861c638a..43803007496eeb9e258363514ec66bb4a01b0d1e 100644 --- a/pkg/tests/service/options.go +++ b/pkg/tests/service/options.go @@ -46,9 +46,9 @@ const ( // default hakeeper configuration defaultTickPerSecond = 10 - defaultLogStoreTimeout = 10 * time.Second - defaultDNStoreTimeout = 10 * time.Second - defaultCheckInterval = 3 * time.Second + defaultLogStoreTimeout = 4 * time.Second + defaultDNStoreTimeout = 3 * time.Second + defaultCheckInterval = 1 * time.Second // default heartbeat configuration defaultLogHeartbeatInterval = 1 * time.Second diff --git a/pkg/tests/service/service.go b/pkg/tests/service/service.go index 09ef43449f0beb3710d42db61661f68a92b2f558..d974cde2e622a1bdaeffbe4b5857c74ae7178775 100644 --- a/pkg/tests/service/service.go +++ b/pkg/tests/service/service.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/dnservice" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/hakeeper" @@ -75,9 +76,14 @@ type ClusterOperation interface { // StartLogServiceIndexed starts log service by its index. StartLogServiceIndexed(index int) error - // FXIME: support this in the end of development - // StartNetworkPartition(partitions [][]int) error - // CloseNetworkPartition() error + // NewNetworkPartition constructs network partition from service index. + NewNetworkPartition(dnIndexes, logIndexes []uint32) NetworkPartition + // RemainingNetworkPartition returns partition for the remaining services. + RemainingNetworkPartition(partitions ...NetworkPartition) NetworkPartition + // StartNetworkPartition enables network partition feature. + StartNetworkPartition(partitions ...NetworkPartition) + // CloseNetworkPartition disables network partition feature. + CloseNetworkPartition() } // ClusterAwareness provides cluster awareness information. @@ -102,7 +108,6 @@ type ClusterAwareness interface { GetClusterState(ctx context.Context) (*logpb.CheckerState, error) } -// FIXME: add more convenient methods // ClusterState provides cluster running state. type ClusterState interface { // ListDNShards lists all dn shards within the cluster. @@ -153,6 +158,24 @@ type ClusterWaitState interface { WaitDNReplicaReported(ctx context.Context, shardID uint64) // WaitLogReplicaReported waits log replicas reported. 
WaitLogReplicaReported(ctx context.Context, shardID uint64) + + // WaitDNStoreTimeout waits dn store timeout by uuid. + WaitDNStoreTimeout(ctx context.Context, uuid string) + // WaitDNStoreTimeoutIndexed waits dn store timeout by index. + WaitDNStoreTimeoutIndexed(ctx context.Context, index int) + // WaitDNStoreReported waits dn store reported by uuid. + WaitDNStoreReported(ctx context.Context, uuid string) + // WaitDNStoreReportedIndexed waits dn store reported by index. + WaitDNStoreReportedIndexed(ctx context.Context, index int) + + // WaitLogStoreTimeout waits log store timeout by uuid. + WaitLogStoreTimeout(ctx context.Context, uuid string) + // WaitLogStoreTimeoutIndexed waits log store timeout by index. + WaitLogStoreTimeoutIndexed(ctx context.Context, index int) + // WaitLogStoreReported waits log store reported by uuid. + WaitLogStoreReported(ctx context.Context, uuid string) + // WaitLogStoreReportedIndexed waits log store reported by index. + WaitLogStoreReportedIndexed(ctx context.Context, index int) } // ---------------------------------------------------- @@ -177,10 +200,16 @@ type testCluster struct { sync.Mutex cfgs []logservice.Config + opts []logOptions svcs []LogService } - address serviceAddress + network struct { + addresses serviceAddresses + + sync.RWMutex + addressSets []addressSet + } fileservices *fileServices @@ -203,16 +232,16 @@ func NewCluster(t *testing.T, opt Options) (Cluster, error) { ) // build addresses for all services - c.address = c.buildServiceAddress() + c.network.addresses = c.buildServiceAddresses() // build FileService instances c.fileservices = c.buildFileServices() // build log service configurations - c.log.cfgs = c.buildLogConfigs(c.address) + c.log.cfgs, c.log.opts = c.buildLogConfigs(c.network.addresses) // build dn service configurations - c.dn.cfgs, c.dn.opts = c.buildDnConfigs(c.address) + c.dn.cfgs, c.dn.opts = c.buildDnConfigs(c.network.addresses) return c, nil } @@ -352,7 +381,17 @@ func (c *testCluster) DNStoreExpired(uuid string) (bool, error) { } hkcfg := c.GetHAKeeperConfig() - return hkcfg.DnStoreExpired(dnStore.Tick, state.Tick), nil + expired := hkcfg.DnStoreExpired(dnStore.Tick, state.Tick) + + c.logger.Info( + "check dn store expired or not", + zap.Any("hakeeper config", hkcfg), + zap.Uint64("dn store tick", dnStore.Tick), + zap.Uint64("current tick", state.Tick), + zap.Bool("expired", expired), + ) + + return expired, nil } func (c *testCluster) DNStoreExpiredIndexed(index int) (bool, error) { @@ -373,7 +412,17 @@ func (c *testCluster) LogStoreExpired(uuid string) (bool, error) { } hkcfg := c.GetHAKeeperConfig() - return hkcfg.LogStoreExpired(logStore.Tick, state.Tick), nil + expired := hkcfg.LogStoreExpired(logStore.Tick, state.Tick) + + c.logger.Info( + "check log store expired or not", + zap.Any("hakeeper config", hkcfg), + zap.Uint64("log store tick", logStore.Tick), + zap.Uint64("current tick", state.Tick), + zap.Bool("expired", expired), + ) + + return expired, nil } func (c *testCluster) LogStoreExpiredIndexed(index int) (bool, error) { @@ -466,6 +515,7 @@ func (c *testCluster) WaitDNShardsReported(ctx context.Context) { reported := ParseReportedDNShardCount( state.DNState, c.GetHAKeeperConfig(), state.Tick, ) + // FIXME: what about reported larger than expected if reported >= expected { return @@ -558,6 +608,146 @@ func (c *testCluster) WaitLogReplicaReported(ctx context.Context, shardID uint64 } } +func (c *testCluster) WaitDNStoreTimeout(ctx context.Context, uuid string) { + for { + select { + case <-ctx.Done(): + 
assert.FailNow( + c.t, + "terminated when waiting dn store timeout", + "dn store %s, error: %s", uuid, ctx.Err(), + ) + default: + time.Sleep(defaultWaitInterval) + + expired, err := c.DNStoreExpired(uuid) + if err != nil { + c.logger.Error("fail to check dn store expired or not", + zap.Error(err), + zap.String("uuid", uuid), + ) + continue + } + + if expired { + return + } + } + } +} + +func (c *testCluster) WaitDNStoreTimeoutIndexed(ctx context.Context, index int) { + ds, err := c.GetDNServiceIndexed(index) + require.NoError(c.t, err) + + c.WaitDNStoreTimeout(ctx, ds.ID()) +} + +func (c *testCluster) WaitDNStoreReported(ctx context.Context, uuid string) { + for { + select { + case <-ctx.Done(): + assert.FailNow( + c.t, + "terminated when waiting dn store reported", + "dn store %s, error: %s", uuid, ctx.Err(), + ) + default: + time.Sleep(defaultWaitInterval) + + expired, err := c.DNStoreExpired(uuid) + if err != nil { + c.logger.Error("fail to check dn store expired or not", + zap.Error(err), + zap.String("uuid", uuid), + ) + continue + } + + if !expired { + return + } + } + } +} + +func (c *testCluster) WaitDNStoreReportedIndexed(ctx context.Context, index int) { + ds, err := c.GetDNServiceIndexed(index) + require.NoError(c.t, err) + + c.WaitDNStoreReported(ctx, ds.ID()) +} + +func (c *testCluster) WaitLogStoreTimeout(ctx context.Context, uuid string) { + for { + select { + case <-ctx.Done(): + assert.FailNow( + c.t, + "terminated when waiting log store timeout", + "log store %s, error: %s", uuid, ctx.Err(), + ) + default: + time.Sleep(defaultWaitInterval) + + expired, err := c.LogStoreExpired(uuid) + if err != nil { + c.logger.Error("fail to check log store expired or not", + zap.Error(err), + zap.String("uuid", uuid), + ) + continue + } + + if expired { + return + } + } + } +} + +func (c *testCluster) WaitLogStoreTimeoutIndexed(ctx context.Context, index int) { + ls, err := c.GetLogServiceIndexed(index) + require.NoError(c.t, err) + + c.WaitLogStoreTimeout(ctx, ls.ID()) +} + +func (c *testCluster) WaitLogStoreReported(ctx context.Context, uuid string) { + for { + select { + case <-ctx.Done(): + assert.FailNow( + c.t, + "terminated when waiting log store reported", + "log store %s, error: %s", uuid, ctx.Err(), + ) + default: + time.Sleep(defaultWaitInterval) + + expired, err := c.LogStoreExpired(uuid) + if err != nil { + c.logger.Error("fail to check log store expired or not", + zap.Error(err), + zap.String("uuid", uuid), + ) + continue + } + + if !expired { + return + } + } + } +} + +func (c *testCluster) WaitLogStoreReportedIndexed(ctx context.Context, index int) { + ls, err := c.GetLogServiceIndexed(index) + require.NoError(c.t, err) + + c.WaitLogStoreReported(ctx, ls.ID()) +} + // -------------------------------------------------------------- // The following are implements for interface `ClusterAwareness`. 
// -------------------------------------------------------------- @@ -705,13 +895,47 @@ func (c *testCluster) StartLogServiceIndexed(index int) error { return ls.Start() } +func (c *testCluster) NewNetworkPartition( + dnIndexes, logIndexes []uint32, +) NetworkPartition { + return newNetworkPartition( + c.opt.initial.logServiceNum, logIndexes, + c.opt.initial.dnServiceNum, dnIndexes, + ) +} + +func (c *testCluster) RemainingNetworkPartition( + partitions ...NetworkPartition, +) NetworkPartition { + return remainingNetworkPartition( + c.opt.initial.logServiceNum, + c.opt.initial.dnServiceNum, + partitions..., + ) +} + +func (c *testCluster) StartNetworkPartition(parts ...NetworkPartition) { + c.network.Lock() + defer c.network.Unlock() + + addressSets := c.network.addresses.buildPartitionAddressSets(parts...) + c.network.addressSets = addressSets +} + +func (c *testCluster) CloseNetworkPartition() { + c.network.Lock() + defer c.network.Unlock() + + c.network.addressSets = nil +} + // ------------------------------------------------------ // The following are private utilities for `testCluster`. // ------------------------------------------------------ -// buildServiceAddress builds addresses for all services. -func (c *testCluster) buildServiceAddress() serviceAddress { - return newServiceAddress( +// buildServiceAddresses builds addresses for all services. +func (c *testCluster) buildServiceAddresses() serviceAddresses { + return newServiceAddresses( c.t, c.opt.initial.logServiceNum, c.opt.initial.dnServiceNum, @@ -726,15 +950,18 @@ func (c *testCluster) buildFileServices() *fileServices { // buildDnConfigs builds configurations for all dn services. func (c *testCluster) buildDnConfigs( - address serviceAddress, + address serviceAddresses, ) ([]*dnservice.Config, []dnOptions) { batch := c.opt.initial.dnServiceNum cfgs := make([]*dnservice.Config, 0, batch) opts := make([]dnOptions, 0, batch) for i := 0; i < batch; i++ { - cfg, opt := buildDnConfig(i, c.opt, address) + cfg := buildDnConfig(i, c.opt, address) cfgs = append(cfgs, cfg) + + localAddr := cfg.ListenAddress + opt := buildDnOptions(cfg, c.backendFilterFactory(localAddr)) opts = append(opts, opt) } return cfgs, opts @@ -742,16 +969,21 @@ func (c *testCluster) buildDnConfigs( // buildLogConfigs builds configurations for all log services. func (c *testCluster) buildLogConfigs( - address serviceAddress, -) []logservice.Config { + address serviceAddresses, +) ([]logservice.Config, []logOptions) { batch := c.opt.initial.logServiceNum cfgs := make([]logservice.Config, 0, batch) + opts := make([]logOptions, 0, batch) for i := 0; i < batch; i++ { cfg := buildLogConfig(i, c.opt, address) cfgs = append(cfgs, cfg) + + localAddr := cfg.ServiceAddress + opt := buildLogOptions(cfg, c.backendFilterFactory(localAddr)) + opts = append(opts, opt) } - return cfgs + return cfgs, opts } // initDNServices builds all dn services. @@ -802,7 +1034,8 @@ func (c *testCluster) initLogServices() []LogService { svcs := make([]LogService, 0, batch) for i := 0; i < batch; i++ { cfg := c.log.cfgs[i] - ls, err := newLogService(cfg) + opt := c.log.opts[i] + ls, err := newLogService(cfg, opt) require.NoError(c.t, err) c.logger.Info( @@ -959,3 +1192,35 @@ func (c *testCluster) rangeHAKeeperService( } } } + +// FilterFunc returns true if traffic was allowed. +type FilterFunc func(morpc.Message, string) bool + +// backendFilterFactory constructs a closure with the type of FilterFunc. 
+func (c *testCluster) backendFilterFactory(localAddr string) FilterFunc { + return func(_ morpc.Message, backendAddr string) bool { + // NB: it's possible that partition takes effect once more after disabled. + c.network.RLock() + addressSets := c.network.addressSets + c.network.RUnlock() + + if len(addressSets) == 0 { + return true + } + + for _, addrSet := range addressSets { + if addrSet.contains(localAddr) && + addrSet.contains(backendAddr) { + return true + } + } + + c.logger.Info( + "traffic not allowed", + zap.String("local", localAddr), + zap.String("backend", backendAddr), + ) + + return false + } +} diff --git a/pkg/tests/service/service_test.go b/pkg/tests/service/service_test.go index 6f2e798b0f8ab46193086891dd541ca5fe4413e0..495d6a5daae4b00192bdc1086df9e84e894e5bd2 100644 --- a/pkg/tests/service/service_test.go +++ b/pkg/tests/service/service_test.go @@ -24,6 +24,10 @@ import ( logpb "github.com/matrixorigin/matrixone/pkg/pb/logservice" ) +const ( + defaultTimeout = 10 * time.Second +) + func TestClusterStart(t *testing.T) { // initialize cluster c, err := NewCluster(t, DefaultOptions()) @@ -79,17 +83,17 @@ func TestClusterAwareness(t *testing.T) { require.NoError(t, err) require.Equal(t, ServiceStarted, log.Status()) - ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel1() leader := c.WaitHAKeeperLeader(ctx1) require.NotNil(t, leader) // we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat. - ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel2() c.WaitHAKeeperState(ctx2, logpb.HAKeeperRunning) - ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second) + ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel3() state, err := c.GetClusterState(ctx3) require.NoError(t, err) @@ -275,7 +279,7 @@ func TestClusterState(t *testing.T) { // ---------------------------------------- // the following would test `ClusterState`. // ---------------------------------------- - ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel1() leader := c.WaitHAKeeperLeader(ctx1) require.NotNil(t, leader) @@ -287,7 +291,7 @@ func TestClusterState(t *testing.T) { require.Equal(t, logSvcNum, len(lsuuids)) // we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat. 
- ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel2() c.WaitHAKeeperState(ctx2, logpb.HAKeeperRunning) @@ -297,7 +301,7 @@ func TestClusterState(t *testing.T) { // cluster should be healthy require.True(t, c.IsClusterHealthy()) - ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second) + ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel3() state, err := c.GetClusterState(ctx3) require.NoError(t, err) @@ -305,13 +309,13 @@ func TestClusterState(t *testing.T) { require.Equal(t, logSvcNum, len(state.LogState.Stores)) // FIXME: validate the result list of dn shards - ctx4, cancel4 := context.WithTimeout(context.Background(), 2*time.Second) + ctx4, cancel4 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel4() _, err = c.ListDNShards(ctx4) require.NoError(t, err) // FIXME: validate the result list of log shards - ctx5, cancel5 := context.WithTimeout(context.Background(), 2*time.Second) + ctx5, cancel5 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel5() _, err = c.ListLogShards(ctx5) require.NoError(t, err) @@ -325,12 +329,12 @@ func TestClusterState(t *testing.T) { dnIndex := 0 dsuuid := dsuuids[dnIndex] - ctx6, cancel6 := context.WithTimeout(context.Background(), 2*time.Second) + ctx6, cancel6 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel6() dnStoreInfo1, err := c.GetDNStoreInfo(ctx6, dsuuid) require.NoError(t, err) - ctx7, cancel7 := context.WithTimeout(context.Background(), 2*time.Second) + ctx7, cancel7 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel7() dnStoreInfo2, err := c.GetDNStoreInfoIndexed(ctx7, dnIndex) require.NoError(t, err) @@ -354,12 +358,12 @@ func TestClusterState(t *testing.T) { logIndex := 1 lsuuid := lsuuids[logIndex] - ctx8, cancel8 := context.WithTimeout(context.Background(), 2*time.Second) + ctx8, cancel8 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel8() logStoreInfo1, err := c.GetLogStoreInfo(ctx8, lsuuid) require.NoError(t, err) - ctx9, cancel9 := context.WithTimeout(context.Background(), 2*time.Second) + ctx9, cancel9 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel9() logStoreInfo2, err := c.GetLogStoreInfoIndexed(ctx9, logIndex) require.NoError(t, err) @@ -396,54 +400,131 @@ func TestClusterWaitState(t *testing.T) { require.NoError(t, err) }() - // -------------------------------------------- - // the following would test `ClusterWaitState`. - // -------------------------------------------- - // we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat. - ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel1() c.WaitHAKeeperState(ctx1, logpb.HAKeeperRunning) + // -------------------------------------------- + // the following would test `ClusterWaitState`. 
+ // -------------------------------------------- + // test WaitDNShardsReported { - ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel2() c.WaitDNShardsReported(ctx2) } // test WaitLogShardsReported { - ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second) + ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel3() c.WaitLogShardsReported(ctx3) } // test WaitDNReplicaReported { - ctx4, cancel4 := context.WithTimeout(context.Background(), 2*time.Second) + ctx4, cancel4 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel4() dnShards, err := c.ListDNShards(ctx4) require.NoError(t, err) require.NotZero(t, len(dnShards)) dnShardID := dnShards[0].ShardID - ctx5, cancel5 := context.WithTimeout(context.Background(), 2*time.Second) + ctx5, cancel5 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel5() c.WaitDNReplicaReported(ctx5, dnShardID) } // test WaitLogReplicaReported { - ctx6, cancel6 := context.WithTimeout(context.Background(), 2*time.Second) + ctx6, cancel6 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel6() logShards, err := c.ListLogShards(ctx6) require.NotZero(t, len(logShards)) require.NoError(t, err) logShardID := logShards[0].ShardID - ctx7, cancel7 := context.WithTimeout(context.Background(), 2*time.Second) + ctx7, cancel7 := context.WithTimeout(context.Background(), defaultTimeout) defer cancel7() c.WaitLogReplicaReported(ctx7, logShardID) } } + +func TestNetworkPartition(t *testing.T) { + dnSvcNum := 2 + logSvcNum := 4 + opt := DefaultOptions(). + WithDNServiceNum(dnSvcNum). + WithLogServiceNum(logSvcNum) + + // initialize cluster + c, err := NewCluster(t, opt) + require.NoError(t, err) + + // start the cluster + err = c.Start() + require.NoError(t, err) + + // close the cluster after all + defer func() { + err := c.Close() + require.NoError(t, err) + }() + + // we must wait for hakeeper's running state, or hakeeper wouldn't receive hearbeat. 
+    ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTimeout)
+    defer cancel1()
+    c.WaitHAKeeperState(ctx1, logpb.HAKeeperRunning)
+
+    // --------------------------------------------
+    // the following would test network partition
+    // --------------------------------------------
+
+    // dn service index: 0, 1
+    // log service index: 0, 1, 2, 3
+    // separate dn service 1 from other services
+    partition1 := c.NewNetworkPartition([]uint32{1}, nil)
+    require.Equal(t, []uint32{1}, partition1.ListDNServiceIndex())
+    require.Nil(t, partition1.ListLogServiceIndex())
+
+    partition2 := c.RemainingNetworkPartition(partition1)
+    require.Equal(t, []uint32{0}, partition2.ListDNServiceIndex())
+    require.Equal(t, []uint32{0, 1, 2, 3}, partition2.ListLogServiceIndex())
+
+    // enable network partition
+    c.StartNetworkPartition(partition1, partition2)
+    ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTimeout)
+    defer cancel2()
+    c.WaitDNStoreTimeoutIndexed(ctx2, 1)
+
+    // disable network partition
+    c.CloseNetworkPartition()
+    ctx3, cancel3 := context.WithTimeout(context.Background(), defaultTimeout)
+    defer cancel3()
+    c.WaitDNStoreReportedIndexed(ctx3, 1)
+
+    // dn service index: 0, 1
+    // log service index: 0, 1, 2, 3
+    // separate log service 3 from other services
+    partition3 := c.NewNetworkPartition(nil, []uint32{3})
+    require.Nil(t, partition3.ListDNServiceIndex())
+    require.Equal(t, []uint32{3}, partition3.ListLogServiceIndex())
+
+    partition4 := c.RemainingNetworkPartition(partition3)
+    require.Equal(t, []uint32{0, 1}, partition4.ListDNServiceIndex())
+    require.Equal(t, []uint32{0, 1, 2}, partition4.ListLogServiceIndex())
+
+    // enable network partition
+    c.StartNetworkPartition(partition3, partition4)
+    ctx4, cancel4 := context.WithTimeout(context.Background(), defaultTimeout)
+    defer cancel4()
+    c.WaitLogStoreTimeoutIndexed(ctx4, 3)
+
+    // disable network partition
+    c.CloseNetworkPartition()
+    ctx5, cancel5 := context.WithTimeout(context.Background(), defaultTimeout)
+    defer cancel5()
+    c.WaitLogStoreReportedIndexed(ctx5, 3)
+}
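
The hakeeper_client.go changes above thread morpc options through context.Context instead of adding new parameters. A caller that wants to restrict traffic to particular backends (as the test cluster does to emulate network partitions) attaches a backend filter before constructing the client. A minimal sketch, reusing only identifiers that appear in this diff (logservice.SetBackendOptions, morpc.WithBackendFilter, logservice.NewDNHAKeeperClient); the allowed helper is a hypothetical stand-in:

package example

import (
    "context"
    "time"

    "github.com/matrixorigin/matrixone/pkg/common/morpc"
    "github.com/matrixorigin/matrixone/pkg/logservice"
)

// newPartitionedDNHAKeeperClient shows how a caller might pass a morpc
// backend filter to the HAKeeper client via context. Returning false from
// the filter drops traffic to the given backend address.
func newPartitionedDNHAKeeperClient(
    cfg logservice.HAKeeperClientConfig,
    allowed func(backendAddr string) bool, // hypothetical reachability check
) (logservice.DNHAKeeperClient, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Record morpc backend options in the context; newManagedHAKeeperClient
    // captures them via GetBackendOptions and replays them whenever
    // prepareClient rebuilds the underlying client.
    ctx = logservice.SetBackendOptions(ctx,
        morpc.WithBackendFilter(func(_ morpc.Message, backendAddr string) bool {
            return allowed(backendAddr)
        }),
    )

    return logservice.NewDNHAKeeperClient(ctx, cfg)
}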
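
GetBackendOptions/SetBackendOptions and GetClientOptions/SetClientOptions are referenced but not defined in this diff; they presumably stash the option slices under private context keys. A simplified sketch of that pattern, under that assumption (names and key type are illustrative, not the actual pkg/logservice implementation):

package example

import (
    "context"

    "github.com/matrixorigin/matrixone/pkg/common/morpc"
)

// backendOptionsKey is a private context key; the real package may use a
// different key type or name.
type backendOptionsKey struct{}

// setBackendOptions records morpc backend options in the context so that a
// constructor deeper in the call chain can pick them up.
func setBackendOptions(ctx context.Context, opts ...morpc.BackendOption) context.Context {
    return context.WithValue(ctx, backendOptionsKey{}, opts)
}

// getBackendOptions returns the recorded options, or nil when none were set.
func getBackendOptions(ctx context.Context) []morpc.BackendOption {
    if opts, ok := ctx.Value(backendOptionsKey{}).([]morpc.BackendOption); ok {
        return opts
    }
    return nil
}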