diff --git a/pkg/hakeeper/bootstrap/bootstrap_test.go b/pkg/hakeeper/bootstrap/bootstrap_test.go
index 212e82cd971b10a6afac72cd61813195ad35ad37..d918893355e5f80801feadd2101a07fcd3be629d 100644
--- a/pkg/hakeeper/bootstrap/bootstrap_test.go
+++ b/pkg/hakeeper/bootstrap/bootstrap_test.go
@@ -345,7 +345,6 @@ func TestIssue3814(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "",
 				}},
 			},
 			log:      pb.LogState{},
diff --git a/pkg/hakeeper/checkers/coordinator_test.go b/pkg/hakeeper/checkers/coordinator_test.go
index 4896867acdf167850d3170f30f66eeb9582804c4..39473ff1078e432ccdd7a9ad25f755980a641611 100644
--- a/pkg/hakeeper/checkers/coordinator_test.go
+++ b/pkg/hakeeper/checkers/coordinator_test.go
@@ -43,7 +43,6 @@ func TestFixExpiredStore(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard1",
 				}},
 			},
 			log: pb.LogState{
@@ -97,7 +96,6 @@ func TestFixExpiredStore(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard1",
 				}},
 			},
 			log: pb.LogState{
@@ -163,7 +161,6 @@ func TestFixExpiredStore(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard1",
 				}},
 			},
 			log: pb.LogState{
@@ -224,7 +221,6 @@ func TestFixExpiredStore(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard1",
 				}},
 			},
 			log: pb.LogState{
@@ -306,7 +302,6 @@ func TestFixZombie(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard1",
 				}},
 			},
 			log: pb.LogState{
diff --git a/pkg/hakeeper/checkers/logservice/check_test.go b/pkg/hakeeper/checkers/logservice/check_test.go
index caf2dbb7f4d22cade76bd1760c7e984e8cf4f5ed..14d79f23c2a5a662d826fc7972ee908bf9480ecd 100644
--- a/pkg/hakeeper/checkers/logservice/check_test.go
+++ b/pkg/hakeeper/checkers/logservice/check_test.go
@@ -42,7 +42,6 @@ func TestCheck(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard 1",
 				}},
 			},
 			infos: pb.LogState{
@@ -96,7 +95,6 @@ func TestCheck(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard 1",
 				}},
 			},
 			infos: pb.LogState{
@@ -164,7 +162,6 @@ func TestCheck(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard 1",
 				}},
 			},
 			infos: pb.LogState{
@@ -220,7 +217,6 @@ func TestCheck(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard 1",
 				}},
 			},
 			infos: pb.LogState{
@@ -273,7 +269,6 @@ func TestCheck(t *testing.T) {
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
 					NumberOfReplicas: 3,
-					Name:             "shard 1",
 				}},
 			},
 			infos: pb.LogState{
diff --git a/pkg/hakeeper/checkers/logservice/parse_test.go b/pkg/hakeeper/checkers/logservice/parse_test.go
index 19288eb686eacea44650e01557537245c47fe151..e89d5a15c1ddd56908f54531ca49d5dfbee3b7d4 100644
--- a/pkg/hakeeper/checkers/logservice/parse_test.go
+++ b/pkg/hakeeper/checkers/logservice/parse_test.go
@@ -41,7 +41,6 @@ func TestFixedLogShardInfo(t *testing.T) {
 			record: metadata.LogShardRecord{
 				ShardID:          1,
 				NumberOfReplicas: 3,
-				Name:             "shard-1",
 			},
 			info: pb.LogShardInfo{
 				ShardID:  1,
@@ -58,7 +57,6 @@ func TestFixedLogShardInfo(t *testing.T) {
 			record: metadata.LogShardRecord{
 				ShardID:          1,
 				NumberOfReplicas: 3,
-				Name:             "shard-1",
 			},
 			info: pb.LogShardInfo{
 				ShardID:  1,
@@ -75,7 +73,6 @@ func TestFixedLogShardInfo(t *testing.T) {
 			record: metadata.LogShardRecord{
 				ShardID:          1,
 				NumberOfReplicas: 3,
-				Name:             "shard-1",
 			},
 			info: pb.LogShardInfo{
 				ShardID:  1,
@@ -109,8 +106,7 @@ func TestCollectStats(t *testing.T) {
 				DNShards: nil,
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
-					NumberOfReplicas: 3,
-					Name:             ""}}},
+					NumberOfReplicas: 3}}},
 			infos: pb.LogState{
 				Shards: map[uint64]pb.LogShardInfo{
 					1: {
@@ -157,8 +153,7 @@ func TestCollectStats(t *testing.T) {
 				DNShards: nil,
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
-					NumberOfReplicas: 3,
-					Name:             ""}}},
+					NumberOfReplicas: 3}}},
 			infos: pb.LogState{
 				Shards: map[uint64]pb.LogShardInfo{1: {
 					ShardID:  1,
@@ -326,8 +321,7 @@ func TestCollectStats(t *testing.T) {
 				DNShards: nil,
 				LogShards: []metadata.LogShardRecord{{
 					ShardID:          1,
-					NumberOfReplicas: 3,
-					Name:             ""}}},
+					NumberOfReplicas: 3}}},
 			infos: pb.LogState{
 				Shards: map[uint64]pb.LogShardInfo{
 					1: {
diff --git a/pkg/logservice/commands.go b/pkg/logservice/commands.go
index 7c4be1c73e4dc2ec6ea2854cdf9dd2b272d03bcb..17ed7b099d6a37b05d23ec421073cc5c098c81dd 100644
--- a/pkg/logservice/commands.go
+++ b/pkg/logservice/commands.go
@@ -32,6 +32,8 @@ func (s *Service) handleCommands(cmds []pb.ScheduleCommand) {
 			case pb.AddReplica:
 				s.handleAddReplica(cmd)
 			case pb.RemoveReplica:
+				// FIXME: when remove replica cmd is received, we need to stop the zombie
+				// replica running on the local store.
 				s.handleRemoveReplica(cmd)
 			case pb.StartReplica:
 				s.handleStartReplica(cmd)
diff --git a/pkg/logservice/service.go b/pkg/logservice/service.go
index fc7d5ac36117b5bd97e915eeddd989c868919c83..4422a64aa6954b1e53020b6be7d90470e97134af 100644
--- a/pkg/logservice/service.go
+++ b/pkg/logservice/service.go
@@ -76,6 +76,12 @@ func NewService(cfg Config) (*Service, error) {
 		plog.Errorf("failed to create log store %v", err)
 		return nil, err
 	}
+	if err := store.loadMetadata(); err != nil {
+		return nil, err
+	}
+	if err := store.startReplicas(); err != nil {
+		return nil, err
+	}
 	pool := &sync.Pool{}
 	pool.New = func() interface{} {
 		return &RPCRequest{pool: pool}
@@ -125,6 +131,10 @@ func NewService(cfg Config) (*Service, error) {
 	return service, nil
 }
 
+func (s *Service) Start() error {
+	return nil
+}
+
 func (s *Service) Close() (err error) {
 	s.stopper.Stop()
 	if s.haClient != nil {
diff --git a/pkg/logservice/store.go b/pkg/logservice/store.go
index 862747ab21fe12d1f15f6fdebbe0d409a43bcca5..3e18a9fa32dd8ccc7ddb7f06dcd28b72916e4f5f 100644
--- a/pkg/logservice/store.go
+++ b/pkg/logservice/store.go
@@ -33,6 +33,7 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/hakeeper/bootstrap"
 	"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers"
 	pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
+	"github.com/matrixorigin/matrixone/pkg/pb/metadata"
 )
 
 var (
@@ -118,6 +119,7 @@ type store struct {
 		sync.Mutex
 		truncateCh      chan struct{}
 		pendingTruncate map[uint64]struct{}
+		metadata        metadata.LogStore
 	}
 }
 
@@ -135,6 +137,7 @@ func newLogStore(cfg Config) (*store, error) {
 	}
 	ls.mu.truncateCh = make(chan struct{})
 	ls.mu.pendingTruncate = make(map[uint64]struct{})
+	ls.mu.metadata = metadata.LogStore{UUID: cfg.UUID}
 	if err := ls.stopper.RunTask(func(ctx context.Context) {
 		ls.truncationWorker(ctx)
 	}); err != nil {
@@ -155,6 +158,26 @@ func (l *store) id() string {
 	return l.nh.ID()
 }
 
+func (l *store) startReplicas() error {
+	l.mu.Lock()
+	shards := make([]metadata.LogShard, 0)
+	shards = append(shards, l.mu.metadata.Shards...)
+	l.mu.Unlock()
+
+	for _, rec := range shards {
+		if rec.ShardID == hakeeper.DefaultHAKeeperShardID {
+			if err := l.startHAKeeperReplica(rec.ReplicaID, nil, false); err != nil {
+				return err
+			}
+		} else {
+			if err := l.startReplica(rec.ShardID, rec.ReplicaID, nil, false); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 func (l *store) startHAKeeperReplica(replicaID uint64,
 	initialReplicas map[uint64]dragonboat.Target, join bool) error {
 	raftConfig := getRaftConfig(hakeeper.DefaultHAKeeperShardID, replicaID)
@@ -162,6 +185,7 @@ func (l *store) startHAKeeperReplica(replicaID uint64,
 		join, hakeeper.NewStateMachine, raftConfig); err != nil {
 		return err
 	}
+	l.addMetadata(hakeeper.DefaultHAKeeperShardID, replicaID)
 	atomic.StoreUint64(&l.haKeeperReplicaID, replicaID)
 	if !l.cfg.DisableWorkers {
 		if err := l.stopper.RunTask(func(ctx context.Context) {
@@ -179,8 +203,12 @@ func (l *store) startReplica(shardID uint64, replicaID uint64,
 	if shardID == hakeeper.DefaultHAKeeperShardID {
 		return ErrInvalidShardID
 	}
-	raftConfig := getRaftConfig(shardID, replicaID)
-	return l.nh.StartReplica(initialReplicas, join, newStateMachine, raftConfig)
+	cfg := getRaftConfig(shardID, replicaID)
+	if err := l.nh.StartReplica(initialReplicas, join, newStateMachine, cfg); err != nil {
+		return err
+	}
+	l.addMetadata(shardID, replicaID)
+	return nil
 }
 
 func (l *store) stopReplica(shardID uint64, replicaID uint64) error {
@@ -225,11 +253,13 @@ func (l *store) removeReplica(shardID uint64, replicaID uint64, cci uint64) erro
 				l.retryWait()
 				continue
 			}
+			// FIXME: internally handle dragonboat.ErrTimeoutTooSmall
 			if errors.Is(err, dragonboat.ErrTimeoutTooSmall) && count > 1 {
 				return dragonboat.ErrTimeout
 			}
 			return err
 		}
+		l.removeMetadata(shardID, replicaID)
 		return nil
 	}
 }
diff --git a/pkg/logservice/store_metadata.go b/pkg/logservice/store_metadata.go
new file mode 100644
index 0000000000000000000000000000000000000000..ee8ca1ffa5759c0a1d866cb3126b4625c3e8cb91
--- /dev/null
+++ b/pkg/logservice/store_metadata.go
@@ -0,0 +1,277 @@
+// Copyright 2021 - 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logservice
+
+import (
+	"bytes"
+	"crypto/md5"
+	"fmt"
+	"io"
+	"path/filepath"
+	"runtime"
+
+	"github.com/cockroachdb/errors/oserror"
+	"github.com/lni/vfs"
+
+	"github.com/matrixorigin/matrixone/pkg/pb/metadata"
+)
+
+const (
+	logMetadataFilename = "mo-logservice.metadata"
+	defaultDirFileMode  = 0750
+)
+
+func ws(err error) error {
+	return err
+}
+
+func dirExist(name string, fs vfs.FS) (result bool, err error) {
+	if name == "." || name == "/" {
+		return true, nil
+	}
+	f, err := fs.OpenDir(name)
+	if err != nil && oserror.IsNotExist(err) {
+		return false, nil
+	}
+	if err != nil {
+		return false, err
+	}
+	defer func() {
+		err = firstError(err, ws(f.Close()))
+	}()
+	s, err := f.Stat()
+	if err != nil {
+		return false, ws(err)
+	}
+	if !s.IsDir() {
+		panic("not a dir")
+	}
+	return true, nil
+}
+
+func mkdirAll(dir string, fs vfs.FS) error {
+	exist, err := dirExist(dir, fs)
+	if err != nil {
+		return err
+	}
+	if exist {
+		return nil
+	}
+	parent := fs.PathDir(dir)
+	exist, err = dirExist(parent, fs)
+	if err != nil {
+		return err
+	}
+	if !exist {
+		if err := mkdirAll(parent, fs); err != nil {
+			return err
+		}
+	}
+	return mkdir(dir, fs)
+}
+
+func mkdir(dir string, fs vfs.FS) error {
+	parent := fs.PathDir(dir)
+	exist, err := dirExist(parent, fs)
+	if err != nil {
+		return err
+	}
+	if !exist {
+		panic(fmt.Sprintf("%s doesn't exist when creating %s", parent, dir))
+	}
+	if err := fs.MkdirAll(dir, defaultDirFileMode); err != nil {
+		return err
+	}
+	return syncDir(parent, fs)
+}
+
+func syncDir(dir string, fs vfs.FS) (err error) {
+	if runtime.GOOS == "windows" {
+		return nil
+	}
+	if dir == "." {
+		return nil
+	}
+	f, err := fs.OpenDir(dir)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err = firstError(err, ws(f.Close()))
+	}()
+	fileInfo, err := f.Stat()
+	if err != nil {
+		return ws(err)
+	}
+	if !fileInfo.IsDir() {
+		panic("not a dir")
+	}
+	df, err := fs.OpenDir(filepath.Clean(dir))
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err = firstError(err, ws(df.Close()))
+	}()
+	return ws(df.Sync())
+}
+
+func getHash(data []byte) []byte {
+	h := md5.New()
+	if _, err := h.Write(data); err != nil {
+		panic(err)
+	}
+	s := h.Sum(nil)
+	return s[8:]
+}
+
+func exist(name string, fs vfs.FS) (bool, error) {
+	if name == "." || name == "/" {
+		return true, nil
+	}
+	_, err := fs.Stat(name)
+	if err != nil && oserror.IsNotExist(err) {
+		return false, nil
+	}
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func createMetadataFile(dir string,
+	filename string, obj Marshaler, fs vfs.FS) (err error) {
+	de, err := dirExist(dir, fs)
+	if err != nil {
+		return err
+	}
+	if !de {
+		if err := mkdirAll(dir, fs); err != nil {
+			return err
+		}
+	}
+	tmp := fs.PathJoin(dir, fmt.Sprintf("%s.tmp", filename))
+	fp := fs.PathJoin(dir, filename)
+	f, err := fs.Create(tmp)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err = firstError(err, f.Close())
+		err = firstError(err, syncDir(dir, fs))
+	}()
+	data := MustMarshal(obj)
+	h := getHash(data)
+	n, err := f.Write(h)
+	if err != nil {
+		return ws(err)
+	}
+	if n != len(h) {
+		return ws(io.ErrShortWrite)
+	}
+	n, err = f.Write(data)
+	if err != nil {
+		return ws(err)
+	}
+	if n != len(data) {
+		return ws(io.ErrShortWrite)
+	}
+	if err := ws(f.Sync()); err != nil {
+		return err
+	}
+	return fs.Rename(tmp, fp)
+}
+
+func readMetadataFile(dir string,
+	filename string, obj Unmarshaler, fs vfs.FS) (err error) {
+	fp := fs.PathJoin(dir, filename)
+	f, err := fs.Open(filepath.Clean(fp))
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err = firstError(err, ws(f.Close()))
+	}()
+	data, err := io.ReadAll(f)
+	if err != nil {
+		return ws(err)
+	}
+	if len(data) < 8 {
+		panic("corrupted flag file")
+	}
+	h := data[:8]
+	buf := data[8:]
+	expectedHash := getHash(buf)
+	if !bytes.Equal(h, expectedHash) {
+		panic("corrupted flag file content")
+	}
+	MustUnmarshal(obj, buf)
+	return nil
+}
+
+func (l *store) loadMetadata() error {
+	fs := l.cfg.FS
+	dir := l.cfg.DataDir
+	fp := fs.PathJoin(dir, logMetadataFilename)
+	found, err := exist(fp, fs)
+	if err != nil {
+		return err
+	}
+	if !found {
+		return nil
+	}
+	expectedUUID := l.mu.metadata.UUID
+	if err := readMetadataFile(dir, logMetadataFilename, &l.mu.metadata, fs); err != nil {
+		return err
+	}
+	if expectedUUID != l.mu.metadata.UUID {
+		plog.Panicf("unexpected UUID, on disk UUID %s, expect %s",
+			l.mu.metadata.UUID, expectedUUID)
+	}
+	return nil
+}
+
+func (l *store) mustSaveMetadata() {
+	fs := l.cfg.FS
+	dir := l.cfg.DataDir
+	if err := createMetadataFile(dir, logMetadataFilename, &l.mu.metadata, fs); err != nil {
+		plog.Panicf("failed to save metadata file: %v", err)
+	}
+}
+
+func (l *store) addMetadata(shardID uint64, replicaID uint64) {
+	rec := metadata.LogShard{}
+	rec.ShardID = shardID
+	rec.ReplicaID = replicaID
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	l.mu.metadata.Shards = append(l.mu.metadata.Shards, rec)
+	l.mustSaveMetadata()
+}
+
+func (l *store) removeMetadata(shardID uint64, replicaID uint64) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	shards := make([]metadata.LogShard, 0)
+	for _, rec := range l.mu.metadata.Shards {
+		if rec.ShardID != shardID || rec.ReplicaID != replicaID {
+			shards = append(shards, rec)
+		}
+	}
+	l.mu.metadata.Shards = shards
+	l.mustSaveMetadata()
+}
diff --git a/pkg/logservice/store_metadata_test.go b/pkg/logservice/store_metadata_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..8f087df5db3039e2221c033c8fc6cfc5585bfcfb
--- /dev/null
+++ b/pkg/logservice/store_metadata_test.go
@@ -0,0 +1,97 @@
+// Copyright 2021 - 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logservice
+
+import (
+	"testing"
+	"time"
+
+	"github.com/lni/dragonboat/v4"
+	"github.com/lni/vfs"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/matrixorigin/matrixone/pkg/pb/metadata"
+)
+
+func TestAddMetadata(t *testing.T) {
+	cfg := getStoreTestConfig()
+	defer vfs.ReportLeakedFD(cfg.FS, t)
+	cfg.Fill()
+	s := store{cfg: cfg}
+	require.NoError(t, mkdirAll(s.cfg.DataDir, cfg.FS))
+	s.addMetadata(10, 1)
+	ss := store{cfg: s.cfg}
+	ss.mu.metadata = metadata.LogStore{}
+	assert.NoError(t, ss.loadMetadata())
+	require.Equal(t, 1, len(ss.mu.metadata.Shards))
+	assert.Equal(t, uint64(10), ss.mu.metadata.Shards[0].ShardID)
+	assert.Equal(t, uint64(1), ss.mu.metadata.Shards[0].ReplicaID)
+}
+
+func TestRemoveMetadata(t *testing.T) {
+	cfg := getStoreTestConfig()
+	defer vfs.ReportLeakedFD(cfg.FS, t)
+	cfg.Fill()
+	s := store{cfg: cfg}
+	require.NoError(t, mkdirAll(s.cfg.DataDir, cfg.FS))
+	s.addMetadata(10, 1)
+	s.addMetadata(20, 2)
+	s.removeMetadata(10, 1)
+	ss := store{cfg: s.cfg}
+	ss.mu.metadata = metadata.LogStore{}
+	assert.NoError(t, ss.loadMetadata())
+	require.Equal(t, 1, len(ss.mu.metadata.Shards))
+	assert.Equal(t, uint64(20), ss.mu.metadata.Shards[0].ShardID)
+	assert.Equal(t, uint64(2), ss.mu.metadata.Shards[0].ReplicaID)
+}
+
+func TestStartReplicas(t *testing.T) {
+	cfg := getStoreTestConfig()
+	defer vfs.ReportLeakedFD(cfg.FS, t)
+	cfg.Fill()
+	require.NoError(t, mkdirAll(cfg.DataDir, cfg.FS))
+	func() {
+		store, err := getTestStore(cfg, false)
+		require.NoError(t, err)
+		members := make(map[uint64]dragonboat.Target)
+		members[1] = store.id()
+		defer func() {
+			require.NoError(t, store.close())
+		}()
+		require.NoError(t, store.startReplica(10, 1, members, false))
+		require.NoError(t, store.startReplica(20, 1, members, false))
+	}()
+
+	store, err := getTestStore(cfg, false)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, store.close())
+	}()
+	require.NoError(t, store.loadMetadata())
+	require.NoError(t, store.startReplicas())
+	done := false
+	for i := 0; i < 1000; i++ {
+		hb := store.getHeartbeatMessage()
+		if len(hb.Replicas) != 2 {
+			time.Sleep(10 * time.Millisecond)
+			continue
+		}
+		done = true
+	}
+	if !done {
+		t.Fatalf("failed to start all replicas")
+	}
+}
diff --git a/pkg/pb/metadata/metadata.pb.go b/pkg/pb/metadata/metadata.pb.go
index 2a72939dcebf85479b2f3bfbd8ed8295e6c07186..92d193766ad1a9803f69dc88ec3e1afacf7c4167 100644
--- a/pkg/pb/metadata/metadata.pb.go
+++ b/pkg/pb/metadata/metadata.pb.go
@@ -153,9 +153,7 @@ type LogShardRecord struct {
 	// ShardID is the id of the Log Shard.
 	ShardID uint64 `protobuf:"varint,1,opt,name=ShardID,proto3" json:"ShardID,omitempty"`
 	// NumberOfReplicas is the number of replicas in the shard.
-	NumberOfReplicas uint64 `protobuf:"varint,2,opt,name=NumberOfReplicas,proto3" json:"NumberOfReplicas,omitempty"`
-	// Name is the human readable name of the shard given by the DN.
-	Name                 string   `protobuf:"bytes,3,opt,name=Name,proto3" json:"Name,omitempty"`
+	NumberOfReplicas     uint64   `protobuf:"varint,2,opt,name=NumberOfReplicas,proto3" json:"NumberOfReplicas,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -208,11 +206,55 @@ func (m *LogShardRecord) GetNumberOfReplicas() uint64 {
 	return 0
 }
 
-func (m *LogShardRecord) GetName() string {
+// LogShard
+type LogShard struct {
+	// LogShard extends LogShardRecord
+	LogShardRecord `protobuf:"bytes,1,opt,name=LogShardRecord,proto3,embedded=LogShardRecord" json:"LogShardRecord"`
+	// ReplicaID is the replica ID of the replica running on the LogStore.
+	ReplicaID            uint64   `protobuf:"varint,2,opt,name=ReplicaID,proto3" json:"ReplicaID,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *LogShard) Reset()         { *m = LogShard{} }
+func (m *LogShard) String() string { return proto.CompactTextString(m) }
+func (*LogShard) ProtoMessage()    {}
+func (*LogShard) Descriptor() ([]byte, []int) {
+	return fileDescriptor_56d9f74966f40d04, []int{3}
+}
+func (m *LogShard) XXX_Unmarshal(b []byte) error {
+	return m.Unmarshal(b)
+}
+func (m *LogShard) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	if deterministic {
+		return xxx_messageInfo_LogShard.Marshal(b, m, deterministic)
+	} else {
+		b = b[:cap(b)]
+		n, err := m.MarshalToSizedBuffer(b)
+		if err != nil {
+			return nil, err
+		}
+		return b[:n], nil
+	}
+}
+func (m *LogShard) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_LogShard.Merge(m, src)
+}
+func (m *LogShard) XXX_Size() int {
+	return m.Size()
+}
+func (m *LogShard) XXX_DiscardUnknown() {
+	xxx_messageInfo_LogShard.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_LogShard proto.InternalMessageInfo
+
+func (m *LogShard) GetReplicaID() uint64 {
 	if m != nil {
-		return m.Name
+		return m.ReplicaID
 	}
-	return ""
+	return 0
 }
 
 // DNStore DN store metadata
@@ -230,7 +272,7 @@ func (m *DNStore) Reset()         { *m = DNStore{} }
 func (m *DNStore) String() string { return proto.CompactTextString(m) }
 func (*DNStore) ProtoMessage()    {}
 func (*DNStore) Descriptor() ([]byte, []int) {
-	return fileDescriptor_56d9f74966f40d04, []int{3}
+	return fileDescriptor_56d9f74966f40d04, []int{4}
 }
 func (m *DNStore) XXX_Unmarshal(b []byte) error {
 	return m.Unmarshal(b)
@@ -273,37 +315,99 @@ func (m *DNStore) GetShards() []DNShard {
 	return nil
 }
 
+// LogStore is for the metadata for Log store.
+type LogStore struct {
+	// UUID is the id of the Log store.
+	UUID string `protobuf:"bytes,1,opt,name=UUID,proto3" json:"UUID,omitempty"`
+	// Shards is for Log shards metadata.
+	Shards               []LogShard `protobuf:"bytes,2,rep,name=Shards,proto3" json:"Shards"`
+	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
+	XXX_unrecognized     []byte     `json:"-"`
+	XXX_sizecache        int32      `json:"-"`
+}
+
+func (m *LogStore) Reset()         { *m = LogStore{} }
+func (m *LogStore) String() string { return proto.CompactTextString(m) }
+func (*LogStore) ProtoMessage()    {}
+func (*LogStore) Descriptor() ([]byte, []int) {
+	return fileDescriptor_56d9f74966f40d04, []int{5}
+}
+func (m *LogStore) XXX_Unmarshal(b []byte) error {
+	return m.Unmarshal(b)
+}
+func (m *LogStore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	if deterministic {
+		return xxx_messageInfo_LogStore.Marshal(b, m, deterministic)
+	} else {
+		b = b[:cap(b)]
+		n, err := m.MarshalToSizedBuffer(b)
+		if err != nil {
+			return nil, err
+		}
+		return b[:n], nil
+	}
+}
+func (m *LogStore) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_LogStore.Merge(m, src)
+}
+func (m *LogStore) XXX_Size() int {
+	return m.Size()
+}
+func (m *LogStore) XXX_DiscardUnknown() {
+	xxx_messageInfo_LogStore.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_LogStore proto.InternalMessageInfo
+
+func (m *LogStore) GetUUID() string {
+	if m != nil {
+		return m.UUID
+	}
+	return ""
+}
+
+func (m *LogStore) GetShards() []LogShard {
+	if m != nil {
+		return m.Shards
+	}
+	return nil
+}
+
 func init() {
 	proto.RegisterType((*DNShardRecord)(nil), "metadata.DNShardRecord")
 	proto.RegisterType((*DNShard)(nil), "metadata.DNShard")
 	proto.RegisterType((*LogShardRecord)(nil), "metadata.LogShardRecord")
+	proto.RegisterType((*LogShard)(nil), "metadata.LogShard")
 	proto.RegisterType((*DNStore)(nil), "metadata.DNStore")
+	proto.RegisterType((*LogStore)(nil), "metadata.LogStore")
 }
 
 func init() { proto.RegisterFile("metadata.proto", fileDescriptor_56d9f74966f40d04) }
 
 var fileDescriptor_56d9f74966f40d04 = []byte{
-	// 317 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x51, 0xcd, 0x4a, 0x03, 0x31,
-	0x10, 0x36, 0xb6, 0xf4, 0x27, 0xc5, 0xa2, 0xb9, 0x58, 0x44, 0xb6, 0x65, 0x4f, 0x45, 0xb0, 0xc1,
-	0xfa, 0x00, 0x62, 0x29, 0x48, 0x41, 0x56, 0x88, 0xf4, 0xe2, 0x2d, 0xdb, 0x4d, 0xd3, 0x55, 0xb7,
-	0x59, 0xb2, 0x59, 0xf0, 0x19, 0x7c, 0xb2, 0x1e, 0xfb, 0x04, 0x45, 0xf6, 0x49, 0x64, 0xc7, 0xa4,
-	0xd6, 0xf6, 0xe0, 0x6d, 0xbe, 0xf9, 0x26, 0xdf, 0xf7, 0x4d, 0x06, 0xb7, 0x13, 0x61, 0x78, 0xc4,
-	0x0d, 0x1f, 0xa4, 0x5a, 0x19, 0x45, 0x1a, 0x0e, 0x5f, 0x5c, 0xcb, 0xd8, 0x2c, 0xf2, 0x70, 0x30,
-	0x53, 0x09, 0x95, 0x4a, 0x2a, 0x0a, 0x03, 0x61, 0x3e, 0x07, 0x04, 0x00, 0xaa, 0x9f, 0x87, 0xfe,
-	0x04, 0x9f, 0x8c, 0x83, 0xe7, 0x05, 0xd7, 0x11, 0x13, 0x33, 0xa5, 0x23, 0xd2, 0xc1, 0x75, 0x80,
-	0x93, 0x71, 0x07, 0xf5, 0x50, 0xbf, 0xca, 0x1c, 0x24, 0x1e, 0xc6, 0x8f, 0x4a, 0x3a, 0xf2, 0x18,
-	0xc8, 0x9d, 0x8e, 0xff, 0x89, 0x70, 0xdd, 0x6a, 0x91, 0x87, 0x3d, 0x59, 0xd0, 0x6a, 0x0d, 0xcf,
-	0x07, 0xdb, 0xdc, 0x7f, 0xe8, 0x51, 0x63, 0xb5, 0xe9, 0x1e, 0xad, 0x37, 0x5d, 0xc4, 0xf6, 0xe2,
-	0x5c, 0xe2, 0x26, 0x13, 0xe9, 0x7b, 0x3c, 0xe3, 0x5b, 0xcf, 0xdf, 0x46, 0x19, 0xf6, 0x3e, 0x8a,
-	0xb4, 0xc8, 0xb2, 0x4e, 0xa5, 0x87, 0xfa, 0x4d, 0xe6, 0xa0, 0xff, 0x8a, 0xdb, 0x2e, 0xda, 0xbf,
-	0x8b, 0x5d, 0xe1, 0xd3, 0x20, 0x4f, 0x42, 0xa1, 0x9f, 0xe6, 0x56, 0x3a, 0xb3, 0x56, 0x07, 0x7d,
-	0x42, 0x70, 0x35, 0xe0, 0x89, 0xb0, 0x76, 0x50, 0xfb, 0x01, 0xec, 0x6d, 0x94, 0x16, 0x25, 0x3d,
-	0x9d, 0x5a, 0x87, 0x26, 0x83, 0x9a, 0x50, 0x5c, 0x03, 0xa7, 0x52, 0xb4, 0xd2, 0x6f, 0x0d, 0xcf,
-	0x0e, 0x3e, 0x61, 0x54, 0x2d, 0xd7, 0x67, 0x76, 0x6c, 0x74, 0xb7, 0x2a, 0x3c, 0xb4, 0x2e, 0x3c,
-	0xf4, 0x55, 0x78, 0xe8, 0xe5, 0x66, 0xe7, 0xa0, 0x09, 0x37, 0x3a, 0xfe, 0x50, 0x3a, 0x96, 0xf1,
-	0xd2, 0x81, 0xa5, 0xa0, 0xe9, 0x9b, 0xa4, 0x69, 0x48, 0x9d, 0x6c, 0x58, 0x83, 0xdb, 0xde, 0x7e,
-	0x07, 0x00, 0x00, 0xff, 0xff, 0x5b, 0xd0, 0xb2, 0x72, 0x26, 0x02, 0x00, 0x00,
+	// 347 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xdf, 0x4a, 0x02, 0x41,
+	0x14, 0xc6, 0x9b, 0x14, 0xff, 0x1c, 0x49, 0x6a, 0x6e, 0x5a, 0x22, 0x56, 0xd9, 0x2b, 0x09, 0x72,
+	0xcb, 0x1e, 0x20, 0x12, 0x21, 0x8c, 0xb0, 0x98, 0xb0, 0x8b, 0xee, 0x66, 0xdd, 0x71, 0x5c, 0x6a,
+	0x9d, 0x65, 0x1c, 0xa1, 0x67, 0xe8, 0xc9, 0xbc, 0xf4, 0x09, 0x24, 0x7c, 0x92, 0x70, 0x9c, 0xf1,
+	0xcf, 0x2e, 0xd5, 0xdd, 0x7c, 0xe7, 0xcc, 0xfc, 0xce, 0xf7, 0x0d, 0x07, 0xaa, 0x31, 0x53, 0x34,
+	0xa4, 0x8a, 0x36, 0x13, 0x29, 0x94, 0xc0, 0x25, 0xab, 0xcf, 0x2e, 0x79, 0xa4, 0x46, 0xd3, 0xa0,
+	0x39, 0x10, 0xb1, 0xcf, 0x05, 0x17, 0xbe, 0xbe, 0x10, 0x4c, 0x87, 0x5a, 0x69, 0xa1, 0x4f, 0xeb,
+	0x87, 0x5e, 0x17, 0x8e, 0x3a, 0xbd, 0x97, 0x11, 0x95, 0x21, 0x61, 0x03, 0x21, 0x43, 0xec, 0x40,
+	0x51, 0xcb, 0x6e, 0xc7, 0x41, 0x75, 0xd4, 0xc8, 0x13, 0x2b, 0xb1, 0x0b, 0xf0, 0x28, 0xb8, 0x6d,
+	0x1e, 0xea, 0xe6, 0x4e, 0xc5, 0xfb, 0x42, 0x50, 0x34, 0x2c, 0x7c, 0x9f, 0xc2, 0x6a, 0x56, 0xa5,
+	0x75, 0xda, 0xdc, 0xf8, 0xde, 0x6b, 0xb7, 0x4b, 0xb3, 0x45, 0xed, 0x60, 0xbe, 0xa8, 0x21, 0x92,
+	0xb2, 0x73, 0x0e, 0x65, 0xc2, 0x92, 0x8f, 0x68, 0x40, 0x37, 0x33, 0xb7, 0x85, 0x95, 0xd9, 0xbb,
+	0x30, 0x94, 0x6c, 0x32, 0x71, 0x72, 0x75, 0xd4, 0x28, 0x13, 0x2b, 0xbd, 0x57, 0xa8, 0x5a, 0x6b,
+	0xff, 0x06, 0xbb, 0x80, 0xe3, 0xde, 0x34, 0x0e, 0x98, 0x7c, 0x1a, 0x1a, 0xf4, 0xc4, 0x8c, 0xca,
+	0xd4, 0x3d, 0x05, 0x25, 0xcb, 0xc5, 0x0f, 0xe9, 0x19, 0x26, 0xa5, 0xb3, 0x4d, 0xb9, 0xdf, 0xdf,
+	0x89, 0x99, 0x76, 0xf7, 0x67, 0x4e, 0xaf, 0xa7, 0x7f, 0x56, 0x09, 0xc9, 0x30, 0x86, 0x7c, 0xbf,
+	0x6f, 0x32, 0x94, 0x89, 0x3e, 0x63, 0x1f, 0x0a, 0x9a, 0xb5, 0xb2, 0x9d, 0x6b, 0x54, 0x5a, 0x27,
+	0x99, 0x6f, 0x6e, 0xe7, 0x57, 0x93, 0x89, 0xb9, 0xe6, 0x3d, 0xaf, 0x53, 0xfc, 0x0a, 0xbc, 0x4a,
+	0x01, 0x71, 0x36, 0xd1, 0x3e, 0xb1, 0x7d, 0x3b, 0x5b, 0xba, 0x68, 0xbe, 0x74, 0xd1, 0xf7, 0xd2,
+	0x45, 0x6f, 0xd7, 0x3b, 0x4b, 0x18, 0x53, 0x25, 0xa3, 0x4f, 0x21, 0x23, 0x1e, 0x8d, 0xad, 0x18,
+	0x33, 0x3f, 0x79, 0xe7, 0x7e, 0x12, 0xf8, 0x96, 0x1b, 0x14, 0xf4, 0x3e, 0xde, 0xfc, 0x04, 0x00,
+	0x00, 0xff, 0xff, 0x2d, 0x13, 0xf1, 0xfd, 0xda, 0x02, 0x00, 0x00,
 }
 
 func (m *DNShardRecord) Marshal() (dAtA []byte, err error) {
@@ -416,13 +520,6 @@ func (m *LogShardRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 		i -= len(m.XXX_unrecognized)
 		copy(dAtA[i:], m.XXX_unrecognized)
 	}
-	if len(m.Name) > 0 {
-		i -= len(m.Name)
-		copy(dAtA[i:], m.Name)
-		i = encodeVarintMetadata(dAtA, i, uint64(len(m.Name)))
-		i--
-		dAtA[i] = 0x1a
-	}
 	if m.NumberOfReplicas != 0 {
 		i = encodeVarintMetadata(dAtA, i, uint64(m.NumberOfReplicas))
 		i--
@@ -436,6 +533,48 @@ func (m *LogShardRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	return len(dAtA) - i, nil
 }
 
+func (m *LogShard) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalToSizedBuffer(dAtA[:size])
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *LogShard) MarshalTo(dAtA []byte) (int, error) {
+	size := m.Size()
+	return m.MarshalToSizedBuffer(dAtA[:size])
+}
+
+func (m *LogShard) MarshalToSizedBuffer(dAtA []byte) (int, error) {
+	i := len(dAtA)
+	_ = i
+	var l int
+	_ = l
+	if m.XXX_unrecognized != nil {
+		i -= len(m.XXX_unrecognized)
+		copy(dAtA[i:], m.XXX_unrecognized)
+	}
+	if m.ReplicaID != 0 {
+		i = encodeVarintMetadata(dAtA, i, uint64(m.ReplicaID))
+		i--
+		dAtA[i] = 0x10
+	}
+	{
+		size, err := m.LogShardRecord.MarshalToSizedBuffer(dAtA[:i])
+		if err != nil {
+			return 0, err
+		}
+		i -= size
+		i = encodeVarintMetadata(dAtA, i, uint64(size))
+	}
+	i--
+	dAtA[i] = 0xa
+	return len(dAtA) - i, nil
+}
+
 func (m *DNStore) Marshal() (dAtA []byte, err error) {
 	size := m.Size()
 	dAtA = make([]byte, size)
@@ -484,6 +623,54 @@ func (m *DNStore) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	return len(dAtA) - i, nil
 }
 
+func (m *LogStore) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalToSizedBuffer(dAtA[:size])
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *LogStore) MarshalTo(dAtA []byte) (int, error) {
+	size := m.Size()
+	return m.MarshalToSizedBuffer(dAtA[:size])
+}
+
+func (m *LogStore) MarshalToSizedBuffer(dAtA []byte) (int, error) {
+	i := len(dAtA)
+	_ = i
+	var l int
+	_ = l
+	if m.XXX_unrecognized != nil {
+		i -= len(m.XXX_unrecognized)
+		copy(dAtA[i:], m.XXX_unrecognized)
+	}
+	if len(m.Shards) > 0 {
+		for iNdEx := len(m.Shards) - 1; iNdEx >= 0; iNdEx-- {
+			{
+				size, err := m.Shards[iNdEx].MarshalToSizedBuffer(dAtA[:i])
+				if err != nil {
+					return 0, err
+				}
+				i -= size
+				i = encodeVarintMetadata(dAtA, i, uint64(size))
+			}
+			i--
+			dAtA[i] = 0x12
+		}
+	}
+	if len(m.UUID) > 0 {
+		i -= len(m.UUID)
+		copy(dAtA[i:], m.UUID)
+		i = encodeVarintMetadata(dAtA, i, uint64(len(m.UUID)))
+		i--
+		dAtA[i] = 0xa
+	}
+	return len(dAtA) - i, nil
+}
+
 func encodeVarintMetadata(dAtA []byte, offset int, v uint64) int {
 	offset -= sovMetadata(v)
 	base := offset
@@ -546,17 +733,52 @@ func (m *LogShardRecord) Size() (n int) {
 	if m.NumberOfReplicas != 0 {
 		n += 1 + sovMetadata(uint64(m.NumberOfReplicas))
 	}
-	l = len(m.Name)
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func (m *LogShard) Size() (n int) {
+	if m == nil {
+		return 0
+	}
+	var l int
+	_ = l
+	l = m.LogShardRecord.Size()
+	n += 1 + l + sovMetadata(uint64(l))
+	if m.ReplicaID != 0 {
+		n += 1 + sovMetadata(uint64(m.ReplicaID))
+	}
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func (m *DNStore) Size() (n int) {
+	if m == nil {
+		return 0
+	}
+	var l int
+	_ = l
+	l = len(m.UUID)
 	if l > 0 {
 		n += 1 + l + sovMetadata(uint64(l))
 	}
+	if len(m.Shards) > 0 {
+		for _, e := range m.Shards {
+			l = e.Size()
+			n += 1 + l + sovMetadata(uint64(l))
+		}
+	}
 	if m.XXX_unrecognized != nil {
 		n += len(m.XXX_unrecognized)
 	}
 	return n
 }
 
-func (m *DNStore) Size() (n int) {
+func (m *LogStore) Size() (n int) {
 	if m == nil {
 		return 0
 	}
@@ -875,9 +1097,163 @@ func (m *LogShardRecord) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 3:
+		default:
+			iNdEx = preIndex
+			skippy, err := skipMetadata(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if (skippy < 0) || (iNdEx+skippy) < 0 {
+				return ErrInvalidLengthMetadata
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *LogShard) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowMetadata
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: LogShard: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: LogShard: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
 			if wireType != 2 {
-				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
+				return fmt.Errorf("proto: wrong wireType = %d for field LogShardRecord", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowMetadata
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthMetadata
+			}
+			postIndex := iNdEx + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthMetadata
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if err := m.LogShardRecord.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field ReplicaID", wireType)
+			}
+			m.ReplicaID = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowMetadata
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				m.ReplicaID |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			iNdEx = preIndex
+			skippy, err := skipMetadata(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if (skippy < 0) || (iNdEx+skippy) < 0 {
+				return ErrInvalidLengthMetadata
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *DNStore) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowMetadata
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: DNStore: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: DNStore: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field UUID", wireType)
 			}
 			var stringLen uint64
 			for shift := uint(0); ; shift += 7 {
@@ -905,7 +1281,41 @@ func (m *LogShardRecord) Unmarshal(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			m.Name = string(dAtA[iNdEx:postIndex])
+			m.UUID = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Shards", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowMetadata
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthMetadata
+			}
+			postIndex := iNdEx + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthMetadata
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Shards = append(m.Shards, DNShard{})
+			if err := m.Shards[len(m.Shards)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
 			iNdEx = postIndex
 		default:
 			iNdEx = preIndex
@@ -929,7 +1339,7 @@ func (m *LogShardRecord) Unmarshal(dAtA []byte) error {
 	}
 	return nil
 }
-func (m *DNStore) Unmarshal(dAtA []byte) error {
+func (m *LogStore) Unmarshal(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
 	for iNdEx < l {
@@ -952,10 +1362,10 @@ func (m *DNStore) Unmarshal(dAtA []byte) error {
 		fieldNum := int32(wire >> 3)
 		wireType := int(wire & 0x7)
 		if wireType == 4 {
-			return fmt.Errorf("proto: DNStore: wiretype end group for non-group")
+			return fmt.Errorf("proto: LogStore: wiretype end group for non-group")
 		}
 		if fieldNum <= 0 {
-			return fmt.Errorf("proto: DNStore: illegal tag %d (wire type %d)", fieldNum, wire)
+			return fmt.Errorf("proto: LogStore: illegal tag %d (wire type %d)", fieldNum, wire)
 		}
 		switch fieldNum {
 		case 1:
@@ -1019,7 +1429,7 @@ func (m *DNStore) Unmarshal(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			m.Shards = append(m.Shards, DNShard{})
+			m.Shards = append(m.Shards, LogShard{})
 			if err := m.Shards[len(m.Shards)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
 				return err
 			}
diff --git a/pkg/pb/plan/plan.pb.go b/pkg/pb/plan/plan.pb.go
index cad988c399d807dbe417e7496be0de0500730e78..c20e000e5b0d8747c7b20303fa43b6e09e669ed1 100644
--- a/pkg/pb/plan/plan.pb.go
+++ b/pkg/pb/plan/plan.pb.go
@@ -896,6 +896,7 @@ func (m *Type) GetScale() int32 {
 type Const struct {
 	Isnull bool `protobuf:"varint,1,opt,name=isnull,proto3" json:"isnull,omitempty"`
 	// Types that are valid to be assigned to Value:
+	//
 	//	*Const_Ival
 	//	*Const_Dval
 	//	*Const_Sval
@@ -1684,6 +1685,7 @@ func (m *Function) GetArgs() []*Expr {
 type Expr struct {
 	Typ *Type `protobuf:"bytes,1,opt,name=typ,proto3" json:"typ,omitempty"`
 	// Types that are valid to be assigned to Expr:
+	//
 	//	*Expr_C
 	//	*Expr_P
 	//	*Expr_V
@@ -1933,6 +1935,7 @@ func (m *DefaultExpr) GetIsNull() bool {
 
 type ConstantValue struct {
 	// Types that are valid to be assigned to ConstantValue:
+	//
 	//	*ConstantValue_UnknownV
 	//	*ConstantValue_Int64V
 	//	*ConstantValue_Decimal64V
@@ -2623,6 +2626,7 @@ func (m *TableDef) GetDefs() []*TableDef_DefType {
 
 type TableDef_DefType struct {
 	// Types that are valid to be assigned to Def:
+	//
 	//	*TableDef_DefType_Pk
 	//	*TableDef_DefType_Idx
 	//	*TableDef_DefType_Properties
@@ -3647,9 +3651,10 @@ func (m *Query) GetHeadings() []string {
 }
 
 type TransationControl struct {
-	//TransationControl type
+	// TransationControl type
 	TclType TransationControl_TclType `protobuf:"varint,1,opt,name=tcl_type,json=tclType,proto3,enum=plan.TransationControl_TclType" json:"tcl_type,omitempty"`
 	// Types that are valid to be assigned to Action:
+	//
 	//	*TransationControl_Begin
 	//	*TransationControl_Commit
 	//	*TransationControl_Rollback
@@ -3899,6 +3904,7 @@ func (m *TransationRollback) GetCompletionType() TransationCompletionType {
 
 type Plan struct {
 	// Types that are valid to be assigned to Plan:
+	//
 	//	*Plan_Query
 	//	*Plan_Tcl
 	//	*Plan_Ddl
@@ -4012,9 +4018,10 @@ func (*Plan) XXX_OneofWrappers() []interface{} {
 }
 
 type DataControl struct {
-	//DataDefinition type
+	// DataDefinition type
 	DclType DataControl_DclType `protobuf:"varint,1,opt,name=dcl_type,json=dclType,proto3,enum=plan.DataControl_DclType" json:"dcl_type,omitempty"`
 	// Types that are valid to be assigned to Control:
+	//
 	//	*DataControl_SetVariables
 	//	*DataControl_Prepare
 	//	*DataControl_Execute
@@ -4135,13 +4142,14 @@ func (*DataControl) XXX_OneofWrappers() []interface{} {
 }
 
 type DataDefinition struct {
-	//DataDefinition type
+	// DataDefinition type
 	DdlType DataDefinition_DdlType `protobuf:"varint,1,opt,name=ddl_type,json=ddlType,proto3,enum=plan.DataDefinition_DdlType" json:"ddl_type,omitempty"`
-	//other show statement we will rewrite to a select statement
-	//then we will get a Query
-	//eg: 'show databases' will rewrite to 'select md.datname as `Database` from mo_database md'
+	// other show statement we will rewrite to a select statement
+	// then we will get a Query
+	// eg: 'show databases' will rewrite to 'select md.datname as `Database` from mo_database md'
 	Query *Query `protobuf:"bytes,2,opt,name=query,proto3" json:"query,omitempty"`
 	// Types that are valid to be assigned to Definition:
+	//
 	//	*DataDefinition_CreateDatabase
 	//	*DataDefinition_AlterDatabase
 	//	*DataDefinition_DropDatabase
diff --git a/pkg/pb/txn/txn.pb.go b/pkg/pb/txn/txn.pb.go
index 50830ee4e0f487badfe725eb1aeff5342387d1ff..66e4c8866ac10becde2a6150087d0108a4a54568 100644
--- a/pkg/pb/txn/txn.pb.go
+++ b/pkg/pb/txn/txn.pb.go
@@ -526,9 +526,9 @@ func (m *CNOpResponse) GetPayload() []byte {
 // to TxnMethod.
 //
 // Request flow of TxnRequest as below:
-// 1. CN -> DN (TxnMethod.Read, TxnMethod.Write, TxnMethod.Commit, TxnMethod.Rollback)
-// 2. DN -> DN (TxnMethod.Prepare, TxnMethod.GetStatus, TxnMethod.CommitDNShard, TxnMethod.RollbackDNShard,
-//             TxnMethod.RemoveMetadata)
+//  1. CN -> DN (TxnMethod.Read, TxnMethod.Write, TxnMethod.Commit, TxnMethod.Rollback)
+//  2. DN -> DN (TxnMethod.Prepare, TxnMethod.GetStatus, TxnMethod.CommitDNShard, TxnMethod.RollbackDNShard,
+//     TxnMethod.RemoveMetadata)
 type TxnRequest struct {
 	// RequestID request id
 	RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
diff --git a/proto/metadata.proto b/proto/metadata.proto
index f23abc5302b692d18646515c8ccd26098ea1b508..daa46ce7835f0a897f7435b1e4cc6bd58b6adad7 100644
--- a/proto/metadata.proto
+++ b/proto/metadata.proto
@@ -28,7 +28,7 @@ message DNShardRecord {
   uint64 ShardID    = 1;
   // LogShardID a DN corresponds to a unique Shard of LogService.
   uint64 LogShardID = 2;
-};
+}
 
 // DNShard 
 message DNShard {
@@ -40,7 +40,7 @@ message DNShard {
   uint64 ReplicaID  = 2;
   // Address is DN's external service address.
   string Address    = 3;
-};
+}
 
 // LogShardRecord is Log Shard Metadata describing what is a Log shard. It is
 // internally used by the HAKeeper to maintain how many Log shards are available
@@ -50,9 +50,15 @@ message LogShardRecord {
   uint64 ShardID = 1;
   // NumberOfReplicas is the number of replicas in the shard.
   uint64 NumberOfReplicas = 2;
-  // Name is the human readable name of the shard given by the DN.
-  string Name = 3;
-};
+}
+
+// LogShard
+message LogShard {
+  // LogShard extends LogShardRecord
+  LogShardRecord LogShardRecord = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+  // ReplicaID is the replica ID of the replica running on the LogStore. 
+  uint64 ReplicaID  = 2;
+}
 
 // DNStore DN store metadata
 message DNStore {
@@ -60,4 +66,12 @@ message DNStore {
   string UUID = 1;
   // Shards DNShards
   repeated DNShard Shards = 2 [(gogoproto.nullable) = false];
-}
\ No newline at end of file
+}
+
+// LogStore is for the metadata for Log store.
+message LogStore {
+  // UUID is the id of the Log store.
+  string UUID = 1;
+  // Shards is for Log shards metadata.
+  repeated LogShard Shards = 2 [(gogoproto.nullable) = false];
+}