Skip to content
Snippets Groups Projects
Unverified Commit 972b86fa authored by jiangxinmeng1's avatar jiangxinmeng1 Committed by GitHub
Browse files

fix replay logstore (#4551)

Fix replay logstore. Replay currlsn by checkpointInfo.  Replay internal entry when replay. When truncate, check all groups instead of checkpointed groups.

Approved by: @XuPeng-SH
parent 9977631b
No related branches found
No related tags found
No related merge requests found
......@@ -41,6 +41,7 @@ const (
GTNoop
GTCKp
GTUncommit
GTCommit
GTInternal
GTCustomizedStart
)
......
......@@ -141,6 +141,9 @@ func (w *StoreImpl) onCheckpoint() {
func (w *StoreImpl) CkpCkp() {
e := w.makeInternalCheckpointEntry()
_, err := w.Append(GroupInternal, e)
if err == common.ErrClose {
return
}
if err != nil {
panic(err)
}
......@@ -152,12 +155,15 @@ func (w *StoreImpl) CkpCkp() {
func (w *StoreImpl) onTruncatingQueue(items ...any) {
gid, driverLsn := w.getDriverCheckpointed()
if driverLsn == 0 && gid == 0 {
if gid == 0 {
return
}
if gid == GroupCKP {
if gid == GroupCKP || gid == GroupInternal {
w.CkpCkp()
_, driverLsn = w.getDriverCheckpointed()
gid, driverLsn = w.getDriverCheckpointed()
if gid == 0 {
panic("logic error")
}
}
atomic.StoreUint64(&w.driverCheckpointing, driverLsn)
_, err := w.truncateQueue.Enqueue(struct{}{})
......
......@@ -39,19 +39,39 @@ func (w *StoreImpl) Replay(h ApplyHandle) error {
w.walCurrentLsn[g] = lsn
w.synced[g] = lsn
}
for g, ckped := range w.checkpointed {
if w.walCurrentLsn[g] == 0 {
w.walCurrentLsn[g] = ckped
w.synced[g] = ckped
}
if w.minLsn[g] == 0 {
w.minLsn[g] = ckped + 1
}
}
return nil
}
func (w *StoreImpl) onReplayLsn(g uint32, lsn uint64) {
_, ok := w.minLsn[g]
if !ok {
w.minLsn[g] = lsn
}
}
func (w *StoreImpl) replayEntry(e *entry.Entry, h ApplyHandle) error {
walEntry := e.Entry
info := e.Info
switch info.Group {
case GroupInternal:
w.unmarshalPostCommitEntry(walEntry.GetPayload())
w.checkpointed[GroupCKP] = info.TargetLsn
return nil
case GroupCKP:
w.logCheckpointInfo(info)
case GroupC:
}
w.logDriverLsn(e)
w.onReplayLsn(info.Group, info.GroupLSN)
h(info.Group, info.GroupLSN, walEntry.GetPayload(), walEntry.GetType(), walEntry.GetInfo())
return nil
}
......@@ -45,12 +45,23 @@ import (
// buf = bs.Bytes()
// }
func newTestDriver(t *testing.T) driver.Driver {
func newTestDriver(t *testing.T, size int) driver.Driver {
dir := "/tmp/logstore/teststore/store"
name := "mock"
os.RemoveAll(dir)
cfg := &batchstoredriver.StoreCfg{
RotateChecker: batchstoredriver.NewMaxSizeRotateChecker(int(common.M) * 64),
RotateChecker: batchstoredriver.NewMaxSizeRotateChecker(size),
}
s, err := batchstoredriver.NewBaseStore(dir, name, cfg)
assert.NoError(t, err)
return s
}
func restartTestDriver(t *testing.T, size int) driver.Driver {
dir := "/tmp/logstore/teststore/store"
name := "mock"
cfg := &batchstoredriver.StoreCfg{
RotateChecker: batchstoredriver.NewMaxSizeRotateChecker(size),
}
s, err := batchstoredriver.NewBaseStore(dir, name, cfg)
assert.NoError(t, err)
......@@ -66,7 +77,7 @@ func newTestDriver(t *testing.T) driver.Driver {
// return driver, service
// }
func TestAppendRead(t *testing.T) {
driver := newTestDriver(t)
driver := newTestDriver(t, int(common.M)*64)
wal := NewStore(driver)
defer wal.Close()
......@@ -145,7 +156,7 @@ func mockEntry() entry.Entry {
// }
func TestWal(t *testing.T) {
driver := newTestDriver(t)
driver := newTestDriver(t, int(common.M)*64)
wal := NewStore(driver)
defer wal.Close()
......@@ -234,3 +245,50 @@ func TestWal(t *testing.T) {
e.Free()
}
}
func TestReplay(t *testing.T) {
driver := newTestDriver(t, int(common.K)*3)
wal := NewStore(driver)
e := entry.GetBase()
err := e.SetPayload([]byte("payload"))
if err != nil {
panic(err)
}
_, err = wal.Append(10, e)
assert.NoError(t, err)
err = e.WaitDone()
assert.NoError(t, err)
e.Free()
e2 := entry.GetBase()
err = e2.SetPayload(make([]byte, int(common.K*3)))
if err != nil {
panic(err)
}
_, err = wal.Append(10, e2)
assert.NoError(t, err)
err = e2.WaitDone()
assert.NoError(t, err)
e2.Free()
e2 = entry.GetBase()
err = e2.SetPayload(make([]byte, int(common.K*3)))
if err != nil {
panic(err)
}
_, err = wal.Append(10, e2)
assert.NoError(t, err)
err = e2.WaitDone()
assert.NoError(t, err)
e2.Free()
wal.Close()
driver = restartTestDriver(t, int(common.K)*3)
wal = NewStore(driver)
wal.Close()
}
......@@ -19,10 +19,10 @@ import (
)
const (
GroupC = entry.GTCustomizedStart + iota
GroupCKP
GroupInternal
GroupUC
GroupC = entry.GTCommit
GroupCKP = entry.GTCKp
GroupInternal = entry.GTInternal
GroupUC = entry.GTUncommit
)
type Store interface {
......
......@@ -30,7 +30,8 @@ import (
var (
ErrGroupNotFount = errors.New("group not found")
ErrLsnNotFount = errors.New("lsn not found")
ErrTimeOut = errors.New("retry time our")
ErrTimeOut = errors.New("retry timeout")
ErrLsnTooSmall = errors.New("lsn is too small")
)
type StoreInfo struct {
......@@ -40,11 +41,11 @@ type StoreInfo struct {
lsnMu sync.RWMutex
driverCheckpointing uint64
driverCheckpointed uint64
walCurrentLsn map[uint32]uint64
walCurrentLsn map[uint32]uint64 //todo
lsnmu sync.RWMutex
syncing map[uint32]uint64
syncing map[uint32]uint64 //todo
synced map[uint32]uint64
synced map[uint32]uint64 //todo
syncedMu sync.RWMutex
commitCond sync.Cond
......@@ -52,6 +53,8 @@ type StoreInfo struct {
checkpointedMu sync.RWMutex
ckpcnt map[uint32]uint64
ckpcntMu sync.RWMutex
minLsn map[uint32]uint64
}
func newWalInfo() *StoreInfo {
......@@ -71,8 +74,11 @@ func newWalInfo() *StoreInfo {
syncedMu: sync.RWMutex{},
ckpcnt: make(map[uint32]uint64),
ckpcntMu: sync.RWMutex{},
minLsn: make(map[uint32]uint64),
}
}
func (w *StoreInfo) GetCurrSeqNum(gid uint32) (lsn uint64) {
w.lsnmu.RLock()
defer w.lsnmu.RUnlock()
......@@ -125,6 +131,7 @@ func (w *StoreInfo) logDriverLsn(driverEntry *driverEntry.Entry) {
if info.Group == GroupInternal {
w.checkpointedMu.Lock()
w.checkpointed[GroupCKP] = info.TargetLsn
w.checkpointed[GroupInternal] = info.GroupLSN - 1
w.checkpointedMu.Unlock()
}
......@@ -183,6 +190,10 @@ func (w *StoreInfo) retryGetDriverLsn(gid uint32, lsn uint64) (driverLsn uint64,
func (w *StoreInfo) getDriverLsn(gid uint32, lsn uint64) (driverLsn uint64, err error) {
w.lsnMu.RLock()
defer w.lsnMu.RUnlock()
minLsn := w.minLsn[gid]
if lsn < minLsn {
return 0, ErrLsnTooSmall
}
lsnMap, ok := w.walDriverLsnMap[gid]
if !ok {
return 0, ErrGroupNotFount
......@@ -232,17 +243,45 @@ func (w *StoreInfo) onCheckpoint() {
}
func (w *StoreInfo) getDriverCheckpointed() (gid uint32, driverLsn uint64) {
groups := make([]uint32, 0)
w.syncedMu.RLock()
for g := range w.synced {
groups = append(groups, g)
}
w.syncedMu.RUnlock()
w.checkpointedMu.RLock()
if len(w.checkpointed) == 0 {
w.checkpointedMu.RUnlock()
return
}
driverLsn = math.MaxInt64
for g, lsn := range w.checkpointed {
drLsn, err := w.retryGetDriverLsn(g, lsn)
if err != nil {
logutil.Infof("%d-%d", g, lsn)
panic(err)
for _, g := range groups {
lsn := w.checkpointed[g]
var drLsn uint64
var err error
if lsn == 0 {
drLsn, err = w.retryGetDriverLsn(g, 1)
if err != nil {
logutil.Infof("%d-%d", g, lsn)
panic(err)
}
drLsn--
} else {
drLsn, err = w.retryGetDriverLsn(g, lsn)
if err != nil {
if err == ErrLsnTooSmall {
logutil.Infof("%d-%d too small", g, lsn)
synced := w.GetSynced(g)
if drLsn == synced {
continue
} else {
return 0, 0
}
}
logutil.Infof("%d-%d", g, lsn)
panic(err)
}
}
if drLsn < driverLsn {
gid = g
......@@ -281,6 +320,12 @@ func (w *StoreInfo) marshalPostCommitEntry() (buf []byte, err error) {
return
}
func (w *StoreInfo) unmarshalPostCommitEntry(buf []byte) (err error) {
bbuf := bytes.NewBuffer(buf)
_, err = w.readPostCommitEntry(bbuf)
return
}
func (w *StoreInfo) writePostCommitEntry(writer io.Writer) (n int64, err error) {
w.ckpMu.RLock()
defer w.ckpMu.RUnlock()
......@@ -303,3 +348,34 @@ func (w *StoreInfo) writePostCommitEntry(writer io.Writer) (n int64, err error)
}
return
}
func (w *StoreInfo) readPostCommitEntry(reader io.Reader) (n int64, err error) {
w.ckpMu.Lock()
defer w.ckpMu.Unlock()
//checkpointing
length := uint32(0)
if err = binary.Read(reader, binary.BigEndian, &length); err != nil {
return
}
n += 4
for i := 0; i < int(length); i++ {
groupID := uint32(0)
if err = binary.Read(reader, binary.BigEndian, &groupID); err != nil {
return
}
n += 4
ckpInfo := newCheckpointInfo()
sn, err := ckpInfo.ReadFrom(reader)
n += sn
if err != nil {
return n, err
}
ckp, ok := w.checkpointInfo[groupID]
if ok {
ckp.MergeCheckpointInfo(ckpInfo)
} else {
w.checkpointInfo[groupID] = ckpInfo
}
}
return
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment