From f31effdd084bea00efb95a293845540fd2b312cc Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 <51114574+jiangxinmeng1@users.noreply.github.com> Date: Mon, 15 Aug 2022 12:31:52 +0800 Subject: [PATCH] fix catalog replay (#4476) Fix replay catalog. Replace baseEntry in onReplay(). Approved by: @XuPeng-SH --- pkg/vm/engine/tae/catalog/catalog.go | 40 ++++++++------------------ pkg/vm/engine/tae/catalog/replay.go | 1 + pkg/vm/engine/tae/db/replay_test.go | 43 ++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 28 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalog.go b/pkg/vm/engine/tae/catalog/catalog.go index fc984a4a3..88bcb6486 100644 --- a/pkg/vm/engine/tae/catalog/catalog.go +++ b/pkg/vm/engine/tae/catalog/catalog.go @@ -233,17 +233,10 @@ func (catalog *Catalog) onReplayDatabase(cmd *EntryCommand) { var db *DBEntry db, err = catalog.GetDatabaseByID(cmd.DB.ID) if err == nil { - cmd.DB.entries = db.entries - cmd.DB.link = db.link - cmd.DB.nameNodes = db.nameNodes - if err = db.ApplyDeleteCmd(cmd.DB.DeleteAt, cmd.DB.LogIndex); err != nil { - panic(err) - } - if err = catalog.RemoveEntry(db); err != nil { - panic(err) - } + db.BaseEntry = cmd.DB.BaseEntry + } else { + err = catalog.AddEntryLocked(cmd.DB) } - err = catalog.AddEntryLocked(cmd.DB) if err != nil { panic(err) } @@ -320,16 +313,10 @@ func (catalog *Catalog) onReplayTable(cmd *EntryCommand, dataFactory DataFactory } else { rel, _ := db.GetTableEntryByID(cmd.Table.ID) if rel != nil { - cmd.Table.entries = rel.entries - cmd.Table.link = rel.link - if err = rel.ApplyDeleteCmd(cmd.Table.DeleteAt, cmd.Table.LogIndex); err != nil { - panic(err) - } - if err = db.RemoveEntry(rel); err != nil { - panic(err) - } + rel.BaseEntry = cmd.Table.BaseEntry + } else { + err = db.AddEntryLocked(cmd.Table) } - err = db.AddEntryLocked(cmd.Table) } if err != nil { panic(err) @@ -417,12 +404,10 @@ func (catalog *Catalog) onReplaySegment(cmd *EntryCommand, dataFactory DataFacto } else { seg, _ := rel.GetSegmentByID(cmd.Segment.ID) if seg != nil { - cmd.Segment.entries = seg.entries - if err = rel.deleteEntryLocked(seg); err != nil { - panic(err) - } + seg.BaseEntry = cmd.Segment.BaseEntry + } else { + rel.AddEntryLocked(cmd.Segment) } - rel.AddEntryLocked(cmd.Segment) } } @@ -521,11 +506,10 @@ func (catalog *Catalog) onReplayBlock(cmd *EntryCommand, dataFactory DataFactory } else { blk, _ := seg.GetBlockEntryByID(cmd.Block.ID) if blk != nil { - if err = seg.deleteEntryLocked(blk); err != nil { - panic(err) - } + blk.BaseEntry = cmd.Block.BaseEntry + } else { + seg.AddEntryLocked(cmd.Block) } - seg.AddEntryLocked(cmd.Block) } } func (catalog *Catalog) ReplayTableRows() { diff --git a/pkg/vm/engine/tae/catalog/replay.go b/pkg/vm/engine/tae/catalog/replay.go index a5639f5ce..1e4edb773 100644 --- a/pkg/vm/engine/tae/catalog/replay.go +++ b/pkg/vm/engine/tae/catalog/replay.go @@ -44,6 +44,7 @@ func (replayer *Replayer) ReplayerHandle(group uint32, commitId uint64, payload if err := e.Unmarshal(payload); err != nil { panic(err) } + checkpoint := new(Checkpoint) checkpoint.CommitId = commitId checkpoint.MaxTS = e.MaxTS diff --git a/pkg/vm/engine/tae/db/replay_test.go b/pkg/vm/engine/tae/db/replay_test.go index e6ff073db..57676b569 100644 --- a/pkg/vm/engine/tae/db/replay_test.go +++ b/pkg/vm/engine/tae/db/replay_test.go @@ -1351,3 +1351,46 @@ func TestReplay10(t *testing.T) { assert.Equal(t, "hello", string(schema1.ColDefs[1].Default.Expr)) assert.Equal(t, true, schema1.ColDefs[2].Default.NullAbility) } + +// create db,tbl,seg,blk +// checkpoint +// softdelete seg +// checkpoint +// restart +func TestReplaySnapshots(t *testing.T) { + opts := config.WithLongScanAndCKPOpts(nil) + tae := newTestEngine(t, opts) + schema := catalog.MockSchemaAll(1, -1) + + txn, err := tae.StartTxn(nil) + assert.NoError(t, err) + db, err := txn.CreateDatabase("db") + assert.NoError(t, err) + rel, err := db.CreateRelation(schema) + assert.NoError(t, err) + seg, err := rel.CreateSegment() + assert.NoError(t, err) + _, err = seg.CreateBlock() + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) + + tae.Catalog.Checkpoint(tae.Scheduler.GetSafeTS()) + + txn, err = tae.StartTxn(nil) + assert.NoError(t, err) + db, err = txn.GetDatabase("db") + assert.NoError(t, err) + rel, err = db.GetRelationByName(schema.Name) + assert.NoError(t, err) + err = rel.SoftDeleteSegment(seg.GetID()) + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) + + tae.Catalog.Checkpoint(tae.Scheduler.GetSafeTS()) + t.Log(tae.Catalog.SimplePPString(3)) + + tae.restart() + t.Log(tae.Catalog.SimplePPString(3)) + + tae.Close() +} -- GitLab