Skip to content
Snippets Groups Projects
Unverified Commit a272bea5 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

dnservice: fix start failed if any replica exists (#4465)

If any Replica exists, a deadlock will occur when DNService starts.

Approved by: @lni
parent f0e300db
No related branches found
No related tags found
No related merge requests found
......@@ -295,7 +295,7 @@ func (s *store) createReplica(shard metadata.DNShard) error {
return err
}
s.addDNShard(shard)
s.addDNShardLocked(shard)
return nil
}
......
......@@ -62,10 +62,7 @@ func (s *store) initMetadata() error {
return nil
}
func (s *store) addDNShard(shard metadata.DNShard) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *store) addDNShardLocked(shard metadata.DNShard) {
for _, dn := range s.mu.metadata.Shards {
if dn.ShardID == shard.ShardID {
return
......
......@@ -34,7 +34,6 @@ import (
var (
testDNStoreAddr = "unix:///tmp/test-dnstore.sock"
testDataDir = "/tmp/mo/dn"
)
func TestNewAndStartAndCloseService(t *testing.T) {
......@@ -50,33 +49,27 @@ func TestNewAndStartAndCloseService(t *testing.T) {
func TestAddReplica(t *testing.T) {
runDNStoreTest(t, func(s *store) {
thc := s.hakeeperClient.(*testHAKeeperClient)
thc.setCommandBatch(logservicepb.CommandBatch{
Commands: []logservicepb.ScheduleCommand{
{
ServiceType: logservicepb.DnService,
ConfigChange: &logservicepb.ConfigChange{
ChangeType: logservicepb.AddReplica,
Replica: logservicepb.Replica{
LogShardID: 3,
ReplicaID: 2,
ShardID: 1,
},
},
},
},
})
addTestReplica(t, s, 1, 2, 3)
})
}
for {
r := s.getReplica(1)
if r != nil {
r.waitStarted()
assert.Equal(t, newTestDNShard(1, 2, 3), r.shard)
return
}
time.Sleep(time.Millisecond * 10)
func TestStartWithReplicas(t *testing.T) {
localFS, err := fileservice.NewMemoryFS()
assert.NoError(t, err)
factory := func(name string) (fileservice.FileService, error) {
if name == localFileServiceName {
return localFS, nil
}
})
return fileservice.NewMemoryFS()
}
runDNStoreTestWithFileServiceFactory(t, func(s *store) {
addTestReplica(t, s, 1, 2, 3)
}, factory)
runDNStoreTestWithFileServiceFactory(t, func(s *store) {
}, factory)
}
func TestStartReplica(t *testing.T) {
......@@ -134,12 +127,25 @@ func TestCloseReplica(t *testing.T) {
})
}
func runDNStoreTest(t *testing.T, testFn func(*store), opts ...Option) {
thc := newTestHAKeeperClient()
func runDNStoreTest(
t *testing.T,
testFn func(*store),
opts ...Option) {
runDNStoreTestWithFileServiceFactory(t, testFn, func(name string) (fileservice.FileService, error) {
return fileservice.NewMemoryFS()
}, opts...)
}
opts = append(opts, WithHAKeeperClientFactory(func() (logservice.DNHAKeeperClient, error) {
return thc, nil
}),
func runDNStoreTestWithFileServiceFactory(
t *testing.T,
testFn func(*store),
fsFactory fileservice.FileServiceFactory,
opts ...Option) {
thc := newTestHAKeeperClient()
opts = append(opts,
WithHAKeeperClientFactory(func() (logservice.DNHAKeeperClient, error) {
return thc, nil
}),
WithLogServiceClientFactory(func(d metadata.DNShard) (logservice.Client, error) {
return mem.NewMemLog(), nil
}),
......@@ -148,27 +154,60 @@ func runDNStoreTest(t *testing.T, testFn func(*store), opts ...Option) {
c.Txn.Storage.Backend = memStorageBackend
}))
s := newTestStore(t, "u1", opts...)
if fsFactory == nil {
fsFactory = func(name string) (fileservice.FileService, error) {
return fileservice.NewMemoryFS()
}
}
s := newTestStore(t, "u1", fsFactory, opts...)
defer func() {
assert.NoError(t, s.Close())
}()
assert.NoError(t, s.Start())
testFn(s)
}
func newTestStore(t *testing.T, uuid string, options ...Option) *store {
assert.NoError(t, os.RemoveAll(testDataDir))
func addTestReplica(t *testing.T, s *store, shardID, replicaID, logShardID uint64) {
thc := s.hakeeperClient.(*testHAKeeperClient)
thc.setCommandBatch(logservicepb.CommandBatch{
Commands: []logservicepb.ScheduleCommand{
{
ServiceType: logservicepb.DnService,
ConfigChange: &logservicepb.ConfigChange{
ChangeType: logservicepb.AddReplica,
Replica: logservicepb.Replica{
LogShardID: logShardID,
ReplicaID: replicaID,
ShardID: shardID,
},
},
},
},
})
for {
r := s.getReplica(1)
if r != nil {
r.waitStarted()
assert.Equal(t, newTestDNShard(shardID, replicaID, logShardID), r.shard)
return
}
time.Sleep(time.Millisecond * 10)
}
}
func newTestStore(
t *testing.T,
uuid string,
fsFactory fileservice.FileServiceFactory,
options ...Option) *store {
assert.NoError(t, os.RemoveAll(testDNStoreAddr[7:]))
assert.NoError(t, os.MkdirAll(testDataDir, 0755))
c := &Config{
UUID: uuid,
ListenAddress: testDNStoreAddr,
}
c.Txn.Clock.MaxClockOffset.Duration = time.Duration(math.MaxInt64)
s, err := NewService(c, func(name string) (fileservice.FileService, error) {
return fileservice.NewMemoryFS()
}, options...)
s, err := NewService(c, fsFactory, options...)
assert.NoError(t, err)
return s.(*store)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment