Skip to content
Snippets Groups Projects
Unverified Commit e23b8efe authored by XuPeng-SH's avatar XuPeng-SH Committed by GitHub
Browse files

Replay bug fix (#1214)

parent b277a3fa
No related branches found
No related tags found
No related merge requests found
......@@ -690,25 +690,7 @@ func (catalog *Catalog) onReplayCreateTable(entry *tableLogEntry) error {
return db.onNewTable(tbl)
}
func (catalog *Catalog) onReplayAddIndice(entry *tableLogEntry) error {
db := catalog.Databases[entry.DatabaseId]
tbl := db.TableSet[entry.Id]
return tbl.onCommit(entry.CommitInfo)
}
func (catalog *Catalog) onReplayDropIndice(entry *tableLogEntry) error {
db := catalog.Databases[entry.DatabaseId]
tbl := db.TableSet[entry.Id]
return tbl.onCommit(entry.CommitInfo)
}
func (catalog *Catalog) onReplaySoftDeleteTable(entry *tableLogEntry) error {
db := catalog.Databases[entry.DatabaseId]
tbl := db.TableSet[entry.Id]
return tbl.onCommit(entry.CommitInfo)
}
func (catalog *Catalog) onReplayHardDeleteTable(entry *tableLogEntry) error {
func (catalog *Catalog) onReplayTableOperation(entry *tableLogEntry) error {
db := catalog.Databases[entry.DatabaseId]
tbl := db.TableSet[entry.Id]
return tbl.onCommit(entry.CommitInfo)
......
......@@ -91,15 +91,15 @@ func (cache *replayCache) onReplayTxnEntry(entry LogEntry) error {
dbEntry := &databaseLogEntry{}
dbEntry.Unmarshal(entry.GetPayload())
cache.replayer.catalog.onReplaySoftDeleteDatabase(dbEntry)
case ETSoftDeleteTable:
tblEntry := &tableLogEntry{}
tblEntry.Unmarshal(entry.GetPayload())
cache.replayer.catalog.onReplayTableOperation(tblEntry)
case ETCreateTable:
tblEntry := &tableLogEntry{}
tblEntry.Unmarshal(entry.GetPayload())
cache.replayer.catalog.Sequence.TryUpdateTableId(tblEntry.Table.Id)
cache.replayer.catalog.onReplayCreateTable(tblEntry)
case ETSoftDeleteTable:
tblEntry := &tableLogEntry{}
tblEntry.Unmarshal(entry.GetPayload())
cache.replayer.catalog.onReplaySoftDeleteTable(tblEntry)
default:
panic("not supported")
}
......@@ -158,14 +158,8 @@ func (cache *replayCache) applyReplayEntry(entry *replayEntry, catalog *Catalog,
case ETCreateTable:
catalog.Sequence.TryUpdateTableId(entry.tblEntry.Table.Id)
err = catalog.onReplayCreateTable(entry.tblEntry)
case ETSoftDeleteTable:
err = catalog.onReplaySoftDeleteTable(entry.tblEntry)
case ETHardDeleteTable:
err = catalog.onReplayHardDeleteTable(entry.tblEntry)
case ETAddIndice:
err = catalog.onReplayAddIndice(entry.tblEntry)
case ETDropIndice:
err = catalog.onReplayDropIndice(entry.tblEntry)
case ETAddIndice, ETDropIndice, ETSoftDeleteTable, ETHardDeleteTable:
err = catalog.onReplayTableOperation(entry.tblEntry)
case ETCreateSegment:
catalog.Sequence.TryUpdateSegmentId(entry.segEntry.Id)
err = catalog.onReplayCreateSegment(entry.segEntry)
......@@ -423,24 +417,7 @@ func (replayer *catalogReplayer) onReplayEntry(entry LogEntry, observer logstore
tblEntry: tbl,
commitId: GetCommitIdFromLogEntry(entry),
})
case ETSoftDeleteTable:
tbl := &tableLogEntry{}
tbl.Unmarshal(entry.GetPayload())
replayer.cache.Append(&replayEntry{
typ: ETSoftDeleteTable,
tblEntry: tbl,
commitId: GetCommitIdFromLogEntry(entry),
})
case ETHardDeleteTable:
tbl := &tableLogEntry{}
tbl.Unmarshal(entry.GetPayload())
replayer.cache.Append(&replayEntry{
typ: ETHardDeleteTable,
tblEntry: tbl,
commitId: GetCommitIdFromLogEntry(entry),
})
case ETAddIndice:
case ETDropIndice:
case ETAddIndice, ETDropIndice, ETSoftDeleteTable, ETHardDeleteTable:
tbl := &tableLogEntry{}
tbl.Unmarshal(entry.GetPayload())
replayer.cache.Append(&replayEntry{
......
......@@ -448,8 +448,7 @@ func (e *Table) ToLogEntry(eType LogEntryType) LogEntry {
DatabaseId: e.Database.Id,
}
buf, _ = entry.Marshal()
case ETAddIndice:
case ETDropIndice:
case ETAddIndice, ETDropIndice:
entry := tableLogEntry{
BaseEntry: e.BaseEntry,
DatabaseId: e.Database.Id,
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment