diff --git a/pkg/vm/engine/tae/logstore/entry/types.go b/pkg/vm/engine/tae/logstore/entry/types.go index 2daa52eff0b24a72db5191d078fff3b513d32421..2aaac414441a21d93d0245cf5f52d3d70893628c 100644 --- a/pkg/vm/engine/tae/logstore/entry/types.go +++ b/pkg/vm/engine/tae/logstore/entry/types.go @@ -41,6 +41,7 @@ const ( GTNoop GTCKp GTUncommit + GTCommit GTInternal GTCustomizedStart ) diff --git a/pkg/vm/engine/tae/logstore/store/checkpoint.go b/pkg/vm/engine/tae/logstore/store/checkpoint.go index 92c1e4627b0830c719289f6965635ea0cdd6e964..8f12455ff283caea92e13cb94bc4dbeee715ddd4 100644 --- a/pkg/vm/engine/tae/logstore/store/checkpoint.go +++ b/pkg/vm/engine/tae/logstore/store/checkpoint.go @@ -141,6 +141,9 @@ func (w *StoreImpl) onCheckpoint() { func (w *StoreImpl) CkpCkp() { e := w.makeInternalCheckpointEntry() _, err := w.Append(GroupInternal, e) + if err == common.ErrClose { + return + } if err != nil { panic(err) } @@ -152,12 +155,15 @@ func (w *StoreImpl) CkpCkp() { func (w *StoreImpl) onTruncatingQueue(items ...any) { gid, driverLsn := w.getDriverCheckpointed() - if driverLsn == 0 && gid == 0 { + if gid == 0 { return } - if gid == GroupCKP { + if gid == GroupCKP || gid == GroupInternal { w.CkpCkp() - _, driverLsn = w.getDriverCheckpointed() + gid, driverLsn = w.getDriverCheckpointed() + if gid == 0 { + panic("logic error") + } } atomic.StoreUint64(&w.driverCheckpointing, driverLsn) _, err := w.truncateQueue.Enqueue(struct{}{}) diff --git a/pkg/vm/engine/tae/logstore/store/replay.go b/pkg/vm/engine/tae/logstore/store/replay.go index c02f69ff1127cf6a1c3c78a1f67756e883775259..54936ea52f0d96dfb780e145dfab09df2022d566 100644 --- a/pkg/vm/engine/tae/logstore/store/replay.go +++ b/pkg/vm/engine/tae/logstore/store/replay.go @@ -39,19 +39,39 @@ func (w *StoreImpl) Replay(h ApplyHandle) error { w.walCurrentLsn[g] = lsn w.synced[g] = lsn } + for g, ckped := range w.checkpointed { + if w.walCurrentLsn[g] == 0 { + w.walCurrentLsn[g] = ckped + w.synced[g] = ckped + } + if w.minLsn[g] == 0 { + w.minLsn[g] = ckped + 1 + } + } return nil } +func (w *StoreImpl) onReplayLsn(g uint32, lsn uint64) { + _, ok := w.minLsn[g] + if !ok { + w.minLsn[g] = lsn + } +} + func (w *StoreImpl) replayEntry(e *entry.Entry, h ApplyHandle) error { walEntry := e.Entry info := e.Info switch info.Group { case GroupInternal: + w.unmarshalPostCommitEntry(walEntry.GetPayload()) + w.checkpointed[GroupCKP] = info.TargetLsn + return nil case GroupCKP: w.logCheckpointInfo(info) case GroupC: } w.logDriverLsn(e) + w.onReplayLsn(info.Group, info.GroupLSN) h(info.Group, info.GroupLSN, walEntry.GetPayload(), walEntry.GetType(), walEntry.GetInfo()) return nil } diff --git a/pkg/vm/engine/tae/logstore/store/store_test.go b/pkg/vm/engine/tae/logstore/store/store_test.go index 6be9eeed5807bbc1ffd3b82b6977cd4530bc1f30..24dfed774c152061d2f3469ee1d4f993e7208dd7 100644 --- a/pkg/vm/engine/tae/logstore/store/store_test.go +++ b/pkg/vm/engine/tae/logstore/store/store_test.go @@ -45,12 +45,23 @@ import ( // buf = bs.Bytes() // } -func newTestDriver(t *testing.T) driver.Driver { +func newTestDriver(t *testing.T, size int) driver.Driver { dir := "/tmp/logstore/teststore/store" name := "mock" os.RemoveAll(dir) cfg := &batchstoredriver.StoreCfg{ - RotateChecker: batchstoredriver.NewMaxSizeRotateChecker(int(common.M) * 64), + RotateChecker: batchstoredriver.NewMaxSizeRotateChecker(size), + } + s, err := batchstoredriver.NewBaseStore(dir, name, cfg) + assert.NoError(t, err) + return s +} + +func restartTestDriver(t *testing.T, size int) driver.Driver { + dir := "/tmp/logstore/teststore/store" + name := "mock" + cfg := &batchstoredriver.StoreCfg{ + RotateChecker: batchstoredriver.NewMaxSizeRotateChecker(size), } s, err := batchstoredriver.NewBaseStore(dir, name, cfg) assert.NoError(t, err) @@ -66,7 +77,7 @@ func newTestDriver(t *testing.T) driver.Driver { // return driver, service // } func TestAppendRead(t *testing.T) { - driver := newTestDriver(t) + driver := newTestDriver(t, int(common.M)*64) wal := NewStore(driver) defer wal.Close() @@ -145,7 +156,7 @@ func mockEntry() entry.Entry { // } func TestWal(t *testing.T) { - driver := newTestDriver(t) + driver := newTestDriver(t, int(common.M)*64) wal := NewStore(driver) defer wal.Close() @@ -234,3 +245,50 @@ func TestWal(t *testing.T) { e.Free() } } + +func TestReplay(t *testing.T) { + driver := newTestDriver(t, int(common.K)*3) + wal := NewStore(driver) + + e := entry.GetBase() + err := e.SetPayload([]byte("payload")) + if err != nil { + panic(err) + } + _, err = wal.Append(10, e) + assert.NoError(t, err) + + err = e.WaitDone() + assert.NoError(t, err) + e.Free() + + e2 := entry.GetBase() + err = e2.SetPayload(make([]byte, int(common.K*3))) + if err != nil { + panic(err) + } + _, err = wal.Append(10, e2) + assert.NoError(t, err) + + err = e2.WaitDone() + assert.NoError(t, err) + e2.Free() + + e2 = entry.GetBase() + err = e2.SetPayload(make([]byte, int(common.K*3))) + if err != nil { + panic(err) + } + _, err = wal.Append(10, e2) + assert.NoError(t, err) + + err = e2.WaitDone() + assert.NoError(t, err) + e2.Free() + + wal.Close() + + driver = restartTestDriver(t, int(common.K)*3) + wal = NewStore(driver) + wal.Close() +} diff --git a/pkg/vm/engine/tae/logstore/store/type.go b/pkg/vm/engine/tae/logstore/store/type.go index f760316e87adac57aed059380f7a7b8992452d36..5da809af6ea212a880aa893162cd1eaff42923af 100644 --- a/pkg/vm/engine/tae/logstore/store/type.go +++ b/pkg/vm/engine/tae/logstore/store/type.go @@ -19,10 +19,10 @@ import ( ) const ( - GroupC = entry.GTCustomizedStart + iota - GroupCKP - GroupInternal - GroupUC + GroupC = entry.GTCommit + GroupCKP = entry.GTCKp + GroupInternal = entry.GTInternal + GroupUC = entry.GTUncommit ) type Store interface { diff --git a/pkg/vm/engine/tae/logstore/store/walinfo.go b/pkg/vm/engine/tae/logstore/store/walinfo.go index d9a787741e2b4eae33734177518657b1a862f6b8..0d786b788a05078c04a12f0e63cbe050367c3d25 100644 --- a/pkg/vm/engine/tae/logstore/store/walinfo.go +++ b/pkg/vm/engine/tae/logstore/store/walinfo.go @@ -30,7 +30,8 @@ import ( var ( ErrGroupNotFount = errors.New("group not found") ErrLsnNotFount = errors.New("lsn not found") - ErrTimeOut = errors.New("retry time our") + ErrTimeOut = errors.New("retry timeout") + ErrLsnTooSmall = errors.New("lsn is too small") ) type StoreInfo struct { @@ -40,11 +41,11 @@ type StoreInfo struct { lsnMu sync.RWMutex driverCheckpointing uint64 driverCheckpointed uint64 - walCurrentLsn map[uint32]uint64 + walCurrentLsn map[uint32]uint64 //todo lsnmu sync.RWMutex - syncing map[uint32]uint64 + syncing map[uint32]uint64 //todo - synced map[uint32]uint64 + synced map[uint32]uint64 //todo syncedMu sync.RWMutex commitCond sync.Cond @@ -52,6 +53,8 @@ type StoreInfo struct { checkpointedMu sync.RWMutex ckpcnt map[uint32]uint64 ckpcntMu sync.RWMutex + + minLsn map[uint32]uint64 } func newWalInfo() *StoreInfo { @@ -71,8 +74,11 @@ func newWalInfo() *StoreInfo { syncedMu: sync.RWMutex{}, ckpcnt: make(map[uint32]uint64), ckpcntMu: sync.RWMutex{}, + + minLsn: make(map[uint32]uint64), } } + func (w *StoreInfo) GetCurrSeqNum(gid uint32) (lsn uint64) { w.lsnmu.RLock() defer w.lsnmu.RUnlock() @@ -125,6 +131,7 @@ func (w *StoreInfo) logDriverLsn(driverEntry *driverEntry.Entry) { if info.Group == GroupInternal { w.checkpointedMu.Lock() w.checkpointed[GroupCKP] = info.TargetLsn + w.checkpointed[GroupInternal] = info.GroupLSN - 1 w.checkpointedMu.Unlock() } @@ -183,6 +190,10 @@ func (w *StoreInfo) retryGetDriverLsn(gid uint32, lsn uint64) (driverLsn uint64, func (w *StoreInfo) getDriverLsn(gid uint32, lsn uint64) (driverLsn uint64, err error) { w.lsnMu.RLock() defer w.lsnMu.RUnlock() + minLsn := w.minLsn[gid] + if lsn < minLsn { + return 0, ErrLsnTooSmall + } lsnMap, ok := w.walDriverLsnMap[gid] if !ok { return 0, ErrGroupNotFount @@ -232,17 +243,45 @@ func (w *StoreInfo) onCheckpoint() { } func (w *StoreInfo) getDriverCheckpointed() (gid uint32, driverLsn uint64) { + groups := make([]uint32, 0) + w.syncedMu.RLock() + for g := range w.synced { + groups = append(groups, g) + } + w.syncedMu.RUnlock() + w.checkpointedMu.RLock() if len(w.checkpointed) == 0 { w.checkpointedMu.RUnlock() return } driverLsn = math.MaxInt64 - for g, lsn := range w.checkpointed { - drLsn, err := w.retryGetDriverLsn(g, lsn) - if err != nil { - logutil.Infof("%d-%d", g, lsn) - panic(err) + for _, g := range groups { + lsn := w.checkpointed[g] + var drLsn uint64 + var err error + if lsn == 0 { + drLsn, err = w.retryGetDriverLsn(g, 1) + if err != nil { + logutil.Infof("%d-%d", g, lsn) + panic(err) + } + drLsn-- + } else { + drLsn, err = w.retryGetDriverLsn(g, lsn) + if err != nil { + if err == ErrLsnTooSmall { + logutil.Infof("%d-%d too small", g, lsn) + synced := w.GetSynced(g) + if drLsn == synced { + continue + } else { + return 0, 0 + } + } + logutil.Infof("%d-%d", g, lsn) + panic(err) + } } if drLsn < driverLsn { gid = g @@ -281,6 +320,12 @@ func (w *StoreInfo) marshalPostCommitEntry() (buf []byte, err error) { return } +func (w *StoreInfo) unmarshalPostCommitEntry(buf []byte) (err error) { + bbuf := bytes.NewBuffer(buf) + _, err = w.readPostCommitEntry(bbuf) + return +} + func (w *StoreInfo) writePostCommitEntry(writer io.Writer) (n int64, err error) { w.ckpMu.RLock() defer w.ckpMu.RUnlock() @@ -303,3 +348,34 @@ func (w *StoreInfo) writePostCommitEntry(writer io.Writer) (n int64, err error) } return } + +func (w *StoreInfo) readPostCommitEntry(reader io.Reader) (n int64, err error) { + w.ckpMu.Lock() + defer w.ckpMu.Unlock() + //checkpointing + length := uint32(0) + if err = binary.Read(reader, binary.BigEndian, &length); err != nil { + return + } + n += 4 + for i := 0; i < int(length); i++ { + groupID := uint32(0) + if err = binary.Read(reader, binary.BigEndian, &groupID); err != nil { + return + } + n += 4 + ckpInfo := newCheckpointInfo() + sn, err := ckpInfo.ReadFrom(reader) + n += sn + if err != nil { + return n, err + } + ckp, ok := w.checkpointInfo[groupID] + if ok { + ckp.MergeCheckpointInfo(ckpInfo) + } else { + w.checkpointInfo[groupID] = ckpInfo + } + } + return +}