diff --git a/pkg/vm/engine/tae/catalog/block.go b/pkg/vm/engine/tae/catalog/block.go
index 94fbeebff84bf2b54b941ea6f63bb73ddb119cbe..a594e85272f38ec95032ed152550390c27170e53 100644
--- a/pkg/vm/engine/tae/catalog/block.go
+++ b/pkg/vm/engine/tae/catalog/block.go
@@ -215,3 +215,49 @@ func (entry *BlockEntry) DestroyData() (err error) {
 func (entry *BlockEntry) MakeKey() []byte {
 	return model.EncodeBlockKeyPrefix(entry.segment.ID, entry.ID)
 }
+
+// Coarse API: no consistency check
+func (entry *BlockEntry) IsActive() bool {
+	segment := entry.GetSegment()
+	if !segment.IsActive() {
+		return false
+	}
+	entry.RLock()
+	dropped := entry.IsDroppedCommitted()
+	entry.RUnlock()
+	return !dropped
+}
+
+// Coarse API: no consistency check
+func (entry *BlockEntry) GetTerminationTS() (ts uint64, terminated bool) {
+	segmentEntry := entry.GetSegment()
+	tableEntry := segmentEntry.GetTable()
+	dbEntry := tableEntry.GetDB()
+
+	dbEntry.RLock()
+	terminated = dbEntry.IsDroppedCommitted()
+	if terminated {
+		ts = dbEntry.DeleteAt
+	}
+	dbEntry.RUnlock()
+	if terminated {
+		return
+	}
+
+	tableEntry.RLock()
+	terminated = tableEntry.IsDroppedCommitted()
+	if terminated {
+		ts = tableEntry.DeleteAt
+	}
+	tableEntry.RUnlock()
+	return
+	// segmentEntry.RLock()
+	// terminated = segmentEntry.IsDroppedCommitted()
+	// if terminated {
+	// 	ts = segmentEntry.DeleteAt
+	// }
+	// segmentEntry.RUnlock()
+	// if terminated {
+	// 	return
+	// }
+}
diff --git a/pkg/vm/engine/tae/catalog/catalog.go b/pkg/vm/engine/tae/catalog/catalog.go
index 1ea24a20fbc15031d2bcb42f39d7911877a951c6..a6a2f57a79ead7f5c35695bdf0bc4358c81cabed 100644
--- a/pkg/vm/engine/tae/catalog/catalog.go
+++ b/pkg/vm/engine/tae/catalog/catalog.go
@@ -606,6 +606,7 @@ func (catalog *Catalog) RemoveEntry(database *DBEntry) error {
 		if nn.Length() == 0 {
 			delete(catalog.nameNodes, database.name)
 		}
+		delete(catalog.entries, database.GetID())
 	}
 	return nil
 }
diff --git a/pkg/vm/engine/tae/catalog/database.go b/pkg/vm/engine/tae/catalog/database.go
index 50a2a64c75422405d9e9bcd757b07101d319dd20..f157062cc24369811a6c1719cfcd5758d2814c33 100644
--- a/pkg/vm/engine/tae/catalog/database.go
+++ b/pkg/vm/engine/tae/catalog/database.go
@@ -212,6 +212,7 @@ func (e *DBEntry) RemoveEntry(table *TableEntry) (err error) {
 		if nn.Length() == 0 {
 			delete(e.nameNodes, table.GetSchema().Name)
 		}
+		delete(e.entries, table.GetID())
 	}
 	return
 }
@@ -350,3 +351,11 @@ func (entry *DBEntry) CloneCreate() CheckpointItem {
 	}
 	return cloned
 }
+
+// Coarse API: no consistency check
+func (entry *DBEntry) IsActive() bool {
+	entry.RLock()
+	dropped := entry.IsDroppedCommitted()
+	entry.RUnlock()
+	return !dropped
+}
diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go
index 94015f1538c33c3d6168d60c47af57ab7854cec6..a684ed80d3b34acd84b7c47512ec6f7db8a5bdd3 100644
--- a/pkg/vm/engine/tae/catalog/schema.go
+++ b/pkg/vm/engine/tae/catalog/schema.go
@@ -475,7 +475,7 @@ func MockCompoundSchema(colCnt int, pkIdx ...int) *Schema {
 
 func MockSchema(colCnt int, pkIdx int) *Schema {
 	rand.Seed(time.Now().UnixNano())
-	schema := NewEmptySchema(fmt.Sprintf("%d", rand.Intn(1000000)))
+	schema := NewEmptySchema(fmt.Sprintf("%d", rand.Intn(1000000)+rand.Intn(10000)))
 	prefix := "mock_"
 	for i := 0; i < colCnt; i++ {
 		if pkIdx == i {
@@ -489,6 +489,7 @@
 }
 
 // MockSchemaAll if char/varchar is needed, colCnt = 14, otherwise colCnt = 12
+// pkIdx == -1 means no pk defined
 func MockSchemaAll(colCnt int, pkIdx int) *Schema {
 	schema := 
NewEmptySchema(fmt.Sprintf("%d", rand.Intn(1000000))) prefix := "mock_" diff --git a/pkg/vm/engine/tae/catalog/segment.go b/pkg/vm/engine/tae/catalog/segment.go index a67d38be13e3125db691cac638cf3cd563ec96a6..83d722ee45fb3b1688040baea0a3a9992a33dfc4 100644 --- a/pkg/vm/engine/tae/catalog/segment.go +++ b/pkg/vm/engine/tae/catalog/segment.go @@ -21,6 +21,7 @@ import ( "io" "sync" + "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" @@ -269,6 +270,7 @@ func (entry *SegmentEntry) deleteEntryLocked(block *BlockEntry) error { return ErrNotFound } else { entry.link.Delete(n) + delete(entry.entries, block.GetID()) } return nil } @@ -282,11 +284,18 @@ func (entry *SegmentEntry) RemoveEntry(block *BlockEntry) (err error) { func (entry *SegmentEntry) PrepareRollback() (err error) { entry.RLock() currOp := entry.CurrOp + logutil.Infof("PrepareRollback %s", entry.StringLocked()) entry.RUnlock() if currOp == OpCreate { if err = entry.GetTable().RemoveEntry(entry); err != nil { return } + //TODO: maybe scheduled? + // entry.GetCatalog().GetScheduler().ScheduleScopedFn(nil, tasks.IOTask, entry.AsCommonID(), entry.DestroyData) + if err = entry.DestroyData(); err != nil { + logutil.Fatalf("Cannot destroy uncommitted segment [%s] data: %v", entry.Repr(), err) + return + } } if err = entry.BaseEntry.PrepareRollback(); err != nil { return @@ -378,5 +387,20 @@ func (entry *SegmentEntry) CollectBlockEntries(commitFilter func(be *BaseEntry) } func (entry *SegmentEntry) DestroyData() (err error) { - return entry.segData.Destory() + if entry.segData != nil { + err = entry.segData.Destory() + } + return +} + +// Coarse API: no consistency check +func (entry *SegmentEntry) IsActive() bool { + table := entry.GetTable() + if !table.IsActive() { + return false + } + entry.RLock() + dropped := entry.IsDroppedCommitted() + entry.RUnlock() + return !dropped } diff --git a/pkg/vm/engine/tae/catalog/table.go b/pkg/vm/engine/tae/catalog/table.go index b2e0ac6c5e78ca1c229becc91f65afb2145bc346..e4ef7d031e22d7e740a846e57f819a8bf32a30e8 100644 --- a/pkg/vm/engine/tae/catalog/table.go +++ b/pkg/vm/engine/tae/catalog/table.go @@ -166,6 +166,7 @@ func (entry *TableEntry) deleteEntryLocked(segment *SegmentEntry) error { return ErrNotFound } else { entry.link.Delete(n) + delete(entry.entries, segment.GetID()) } return nil } @@ -245,6 +246,7 @@ func (entry *TableEntry) RecurLoop(processor Processor) (err error) { if err = processor.OnSegment(segment); err != nil { if err == ErrStopCurrRecur { err = nil + segIt.Next() continue } break @@ -253,7 +255,12 @@ func (entry *TableEntry) RecurLoop(processor Processor) (err error) { for blkIt.Valid() { block := blkIt.Get().GetPayload().(*BlockEntry) if err = processor.OnBlock(block); err != nil { - return + if err == ErrStopCurrRecur { + err = nil + blkIt.Next() + continue + } + break } blkIt.Next() } @@ -349,3 +356,15 @@ func (entry *TableEntry) CloneCreate() CheckpointItem { } return cloned } + +// Coarse API: no consistency check +func (entry *TableEntry) IsActive() bool { + db := entry.GetDB() + if !db.IsActive() { + return false + } + entry.RLock() + dropped := entry.IsDroppedCommitted() + entry.RUnlock() + return !dropped +} diff --git a/pkg/vm/engine/tae/db/base_test.go b/pkg/vm/engine/tae/db/base_test.go new file mode 100644 index 
0000000000000000000000000000000000000000..4945e10e34f6e9ce1f60292892777c7b2ed79890 --- /dev/null +++ b/pkg/vm/engine/tae/db/base_test.go @@ -0,0 +1,207 @@ +package db + +import ( + "io/ioutil" + "strconv" + "strings" + "sync" + "testing" + + gbat "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/dataio/mockio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" + "github.com/stretchr/testify/assert" +) + +const ( + ModuleName = "TAEDB" +) + +func initDB(t *testing.T, opts *options.Options) *DB { + mockio.ResetFS() + dir := testutils.InitTestEnv(ModuleName, t) + db, _ := Open(dir, opts) + return db +} + +func getSegmentFileNames(dir string) (names map[uint64]string) { + names = make(map[uint64]string) + files, err := ioutil.ReadDir(dir) + if err != nil { + panic(err) + } + for _, f := range files { + name := f.Name() + segName := strings.TrimSuffix(name, ".seg") + if segName == name { + continue + } + id, err := strconv.ParseUint(segName, 10, 64) + if err != nil { + panic(err) + } + names[id] = name + } + return +} + +func printCheckpointStats(t *testing.T, tae *DB) { + t.Logf("GetCheckpointedLSN: %d", tae.Wal.GetCheckpointed()) + t.Logf("GetPenddingLSNCnt: %d", tae.Wal.GetPenddingCnt()) +} + +func createDB(t *testing.T, e *DB, dbName string) { + txn, err := e.StartTxn(nil) + assert.NoError(t, err) + _, err = txn.CreateDatabase(dbName) + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) +} + +func dropDB(t *testing.T, e *DB, dbName string) { + txn, err := e.StartTxn(nil) + assert.NoError(t, err) + _, err = txn.DropDatabase(dbName) + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) +} + +func dropRelation(t *testing.T, e *DB, dbName, name string) { + txn, err := e.StartTxn(nil) + assert.NoError(t, err) + db, err := txn.GetDatabase(dbName) + assert.NoError(t, err) + _, err = db.DropRelationByName(name) + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) +} + +func createRelation(t *testing.T, e *DB, dbName string, schema *catalog.Schema, createDB bool) (db handle.Database, rel handle.Relation) { + txn, err := e.StartTxn(nil) + assert.NoError(t, err) + if createDB { + db, err = txn.CreateDatabase(dbName) + assert.NoError(t, err) + } else { + db, err = txn.GetDatabase(dbName) + assert.NoError(t, err) + } + rel, err = db.CreateRelation(schema) + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) + return +} + +func createRelationAndAppend( + t *testing.T, + e *DB, + dbName string, + schema *catalog.Schema, + bat *gbat.Batch, + createDB bool) (db handle.Database, rel handle.Relation) { + txn, err := e.StartTxn(nil) + assert.NoError(t, err) + if createDB { + db, err = txn.CreateDatabase(dbName) + assert.NoError(t, err) + } else { + db, err = txn.GetDatabase(dbName) + assert.NoError(t, err) + } + rel, err = db.CreateRelation(schema) + assert.NoError(t, err) + err = rel.Append(bat) + assert.NoError(t, err) + assert.Nil(t, txn.Commit()) + return +} + +func appendFailClosure(t *testing.T, data *gbat.Batch, name string, e *DB, wg *sync.WaitGroup) func() { + return func() { + if wg != nil { + defer wg.Done() + } + txn, _ := e.StartTxn(nil) + database, _ := txn.GetDatabase("db") + rel, _ := database.GetRelationByName(name) + err := 
rel.Append(data) + assert.NotNil(t, err) + assert.Nil(t, txn.Rollback()) + } +} + +func appendClosure(t *testing.T, data *gbat.Batch, name string, e *DB, wg *sync.WaitGroup) func() { + return func() { + if wg != nil { + defer wg.Done() + } + txn, _ := e.StartTxn(nil) + database, _ := txn.GetDatabase("db") + rel, _ := database.GetRelationByName(name) + err := rel.Append(data) + assert.Nil(t, err) + assert.Nil(t, txn.Commit()) + } +} + +func tryAppendClosure(t *testing.T, data *gbat.Batch, name string, e *DB, wg *sync.WaitGroup) func() { + return func() { + if wg != nil { + defer wg.Done() + } + txn, _ := e.StartTxn(nil) + database, _ := txn.GetDatabase("db") + rel, err := database.GetRelationByName(name) + if err != nil { + _ = txn.Rollback() + return + } + if err = rel.Append(data); err != nil { + _ = txn.Rollback() + return + } + _ = txn.Commit() + } +} + +func compactBlocks(t *testing.T, e *DB, dbName string, schema *catalog.Schema, skipConflict bool) { + txn, _ := e.StartTxn(nil) + db, _ := txn.GetDatabase(dbName) + rel, _ := db.GetRelationByName(schema.Name) + + var metas []*catalog.BlockEntry + it := rel.MakeBlockIt() + for it.Valid() { + blk := it.GetBlock() + if blk.Rows() < int(schema.BlockMaxRows) { + it.Next() + continue + } + meta := blk.GetMeta().(*catalog.BlockEntry) + metas = append(metas, meta) + it.Next() + } + _ = txn.Commit() + for _, meta := range metas { + txn, _ = e.StartTxn(nil) + db, _ = txn.GetDatabase(dbName) + rel, _ = db.GetRelationByName(schema.Name) + task, err := jobs.NewCompactBlockTask(nil, txn, meta, e.Scheduler) + assert.NoError(t, err) + err = task.OnExec() + if skipConflict { + if err != nil { + _ = txn.Rollback() + } else { + _ = txn.Commit() + } + } else { + assert.NoError(t, err) + assert.NoError(t, txn.Commit()) + } + } +} diff --git a/pkg/vm/engine/tae/db/compound_test.go b/pkg/vm/engine/tae/db/compound_test.go index 0133f4b7754237affce5c12d1071b7808dec8da8..398b9bd1db1b20922496e85252dd31cff739acc1 100644 --- a/pkg/vm/engine/tae/db/compound_test.go +++ b/pkg/vm/engine/tae/db/compound_test.go @@ -9,36 +9,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/stretchr/testify/assert" ) -func compactBlocks(t *testing.T, e *DB, dbName string, schema *catalog.Schema) { - txn, _ := e.StartTxn(nil) - db, _ := txn.GetDatabase(dbName) - rel, _ := db.GetRelationByName(schema.Name) - - var metas []*catalog.BlockEntry - it := rel.MakeBlockIt() - for it.Valid() { - blk := it.GetBlock() - if blk.Rows() < int(schema.BlockMaxRows) { - it.Next() - continue - } - meta := blk.GetMeta().(*catalog.BlockEntry) - metas = append(metas, meta) - it.Next() - } - for _, meta := range metas { - task, err := jobs.NewCompactBlockTask(nil, txn, meta, e.Scheduler) - assert.NoError(t, err) - err = task.OnExec() - assert.NoError(t, err) - } - assert.NoError(t, txn.Commit()) -} - func TestCompoundPK1(t *testing.T) { tae := initDB(t, nil) defer tae.Close() @@ -156,7 +129,7 @@ func TestCompoundPK1(t *testing.T) { assert.NoError(t, txn.Commit()) - compactBlocks(t, tae, "db", schema) + compactBlocks(t, tae, "db", schema, false) // TODO // txn, _ = tae.StartTxn(nil) diff --git a/pkg/vm/engine/tae/db/db_test.go b/pkg/vm/engine/tae/db/db_test.go index c40542c27757500e9906ea9ad99858dda1037614..22308b168512f919231b35e40989fd07a64a8a8d 100644 --- 
a/pkg/vm/engine/tae/db/db_test.go +++ b/pkg/vm/engine/tae/db/db_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/dataio/mockio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config" gbat "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/vector" @@ -42,35 +42,16 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - ModuleName = "TAEDB" -) - -func initDB(t *testing.T, opts *options.Options) *DB { - mockio.ResetFS() - dir := testutils.InitTestEnv(ModuleName, t) - db, _ := Open(dir, opts) - return db -} - func TestAppend(t *testing.T) { db := initDB(t, nil) defer db.Close() - txn, _ := db.StartTxn(nil) schema := catalog.MockSchemaAll(14, 3) schema.BlockMaxRows = options.DefaultBlockMaxRows schema.SegmentMaxBlocks = options.DefaultBlocksPerSegment data := catalog.MockData(schema, schema.BlockMaxRows*2) - now := time.Now() bats := compute.SplitBatch(data, 4) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := database.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bats[0]) - assert.Nil(t, err) - t.Log(vector.Length(bats[0].Vecs[0])) - assert.Nil(t, txn.Commit()) + now := time.Now() + createRelationAndAppend(t, db, "db", schema, bats[0], true) t.Log(time.Since(now)) t.Log(vector.Length(bats[0].Vecs[0])) @@ -80,16 +61,7 @@ func TestAppend(t *testing.T) { assert.Nil(t, err) rel, err := database.GetRelationByName(schema.Name) assert.Nil(t, err) - { - txn, _ := db.StartTxn(nil) - database, err := txn.GetDatabase("db") - assert.Nil(t, err) - rel, err := database.GetRelationByName(schema.Name) - assert.Nil(t, err) - err = rel.Append(bats[1]) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + appendClosure(t, bats[1], schema.Name, db, nil)() err = rel.Append(bats[2]) assert.Nil(t, err) assert.Nil(t, txn.Commit()) @@ -97,24 +69,13 @@ func TestAppend(t *testing.T) { } func TestAppend2(t *testing.T) { - opts := new(options.Options) - opts.CheckpointCfg = new(options.CheckpointCfg) - opts.CheckpointCfg.ScannerInterval = 10 - opts.CheckpointCfg.ExecutionLevels = 2 - opts.CheckpointCfg.ExecutionInterval = 10 + opts := config.WithQuickScanAndCKPOpts(nil) db := initDB(t, opts) defer db.Close() schema := catalog.MockSchemaAll(13, 3) schema.BlockMaxRows = 400 schema.SegmentMaxBlocks = 10 - { - txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - _, err = database.CreateRelation(schema) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + createRelation(t, db, "db", schema, true) totalRows := uint64(schema.BlockMaxRows * 30) bat := catalog.MockData(schema, uint32(totalRows)) @@ -123,24 +84,10 @@ func TestAppend2(t *testing.T) { var wg sync.WaitGroup pool, _ := ants.NewPool(80) - doAppend := func(data *gbat.Batch) func() { - return func() { - defer wg.Done() - txn, _ := db.StartTxn(nil) - database, err := txn.GetDatabase("db") - assert.Nil(t, err) - rel, err := database.GetRelationByName(schema.Name) - assert.Nil(t, err) - err = rel.Append(data) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } - } - start := time.Now() for _, data := range bats { wg.Add(1) - err := pool.Submit(doAppend(data)) + err := pool.Submit(appendClosure(t, data, schema.Name, db, &wg)) assert.Nil(t, err) } wg.Wait() @@ -179,13 +126,7 @@ func TestAppend2(t *testing.T) { } func TestAppend3(t *testing.T) { - opts := new(options.Options) - opts.CheckpointCfg = new(options.CheckpointCfg) - 
opts.CheckpointCfg.ScannerInterval = 10 - opts.CheckpointCfg.ExecutionLevels = 2 - opts.CheckpointCfg.ExecutionInterval = 10 - opts.CheckpointCfg.CatalogCkpInterval = 10 - opts.CheckpointCfg.CatalogUnCkpLimit = 1 + opts := config.WithQuickScanAndCKPOpts(nil) tae := initDB(t, opts) defer tae.Close() schema := catalog.MockSchema(2, 0) @@ -269,14 +210,8 @@ func TestNonAppendableBlock(t *testing.T) { bat := catalog.MockData(schema, 8) - { - txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - _, err = database.CreateRelation(schema) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + createRelation(t, db, "db", schema, true) + { txn, _ := db.StartTxn(nil) database, err := txn.GetDatabase("db") @@ -344,14 +279,8 @@ func TestCreateSegment(t *testing.T) { assert.Nil(t, txn.Commit()) bat := catalog.MockData(schema, 5) - txn, _ = tae.StartTxn(nil) - db, err = txn.GetDatabase("db") - assert.Nil(t, err) - rel, err = db.GetRelationByName(schema.Name) - assert.Nil(t, err) - err = rel.Append(bat) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) + + appendClosure(t, bat, schema.Name, tae, nil)() segCnt := 0 processor := new(catalog.LoopProcessor) @@ -366,29 +295,15 @@ func TestCreateSegment(t *testing.T) { } func TestCompactBlock1(t *testing.T) { - opts := new(options.Options) - opts.CheckpointCfg = new(options.CheckpointCfg) - opts.CheckpointCfg.ScannerInterval = 10000 - opts.CheckpointCfg.ExecutionLevels = 20 - opts.CheckpointCfg.ExecutionInterval = 20000 + opts := config.WithLongScanAndCKPOpts(nil) db := initDB(t, opts) defer db.Close() schema := catalog.MockSchemaAll(13, 2) schema.BlockMaxRows = 10 schema.SegmentMaxBlocks = 4 bat := catalog.MockData(schema, schema.BlockMaxRows) - { - txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := database.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bat) - assert.Nil(t, err) - err = txn.Commit() - assert.Nil(t, err) - t.Log(db.Opts.Catalog.SimplePPString(common.PPL1)) - } + createRelationAndAppend(t, db, "db", schema, bat, true) + t.Log(db.Opts.Catalog.SimplePPString(common.PPL1)) v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 2) filter := handle.Filter{ @@ -530,17 +445,7 @@ func TestCompactBlock2(t *testing.T) { schema.BlockMaxRows = 20 schema.SegmentMaxBlocks = 2 bat := catalog.MockData(schema, schema.BlockMaxRows) - { - txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := database.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bat) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } - ctx := &tasks.Context{Waitable: true} + createRelationAndAppend(t, db, "db", schema, bat, true) var newBlockFp *common.ID { txn, _ := db.StartTxn(nil) @@ -554,7 +459,7 @@ func TestCompactBlock2(t *testing.T) { block = it.GetBlock() break } - task, err := jobs.NewCompactBlockTask(ctx, txn, block.GetMeta().(*catalog.BlockEntry), db.Scheduler) + task, err := jobs.NewCompactBlockTask(tasks.WaitableCtx, txn, block.GetMeta().(*catalog.BlockEntry), db.Scheduler) assert.Nil(t, err) worker.SendOp(task) err = task.WaitDone() @@ -592,7 +497,7 @@ func TestCompactBlock2(t *testing.T) { assert.Nil(t, err) blk, err := seg.GetBlock(newBlockFp.BlockID) assert.Nil(t, err) - task, err := jobs.NewCompactBlockTask(ctx, txn, blk.GetMeta().(*catalog.BlockEntry), db.Scheduler) + task, err := jobs.NewCompactBlockTask(tasks.WaitableCtx, txn, blk.GetMeta().(*catalog.BlockEntry), db.Scheduler) 
assert.Nil(t, err) worker.SendOp(task) err = task.WaitDone() @@ -627,7 +532,7 @@ func TestCompactBlock2(t *testing.T) { } assert.Equal(t, 1, cnt) - task, err := jobs.NewCompactBlockTask(ctx, txn, blk.GetMeta().(*catalog.BlockEntry), db.Scheduler) + task, err := jobs.NewCompactBlockTask(tasks.WaitableCtx, txn, blk.GetMeta().(*catalog.BlockEntry), db.Scheduler) assert.Nil(t, err) worker.SendOp(task) err = task.WaitDone() @@ -683,7 +588,7 @@ func TestCompactBlock2(t *testing.T) { err = blk2.RangeDelete(7, 7) assert.Nil(t, err) - task, err := jobs.NewCompactBlockTask(ctx, txn, blk.GetMeta().(*catalog.BlockEntry), db.Scheduler) + task, err := jobs.NewCompactBlockTask(tasks.WaitableCtx, txn, blk.GetMeta().(*catalog.BlockEntry), db.Scheduler) assert.Nil(t, err) worker.SendOp(task) err = task.WaitDone() @@ -696,13 +601,7 @@ func TestCompactBlock2(t *testing.T) { } func TestAutoCompactABlk1(t *testing.T) { - opts := new(options.Options) - opts.CheckpointCfg = new(options.CheckpointCfg) - opts.CheckpointCfg.ScannerInterval = 10 - opts.CheckpointCfg.ExecutionLevels = 5 - opts.CheckpointCfg.ExecutionInterval = 1 - opts.CheckpointCfg.CatalogCkpInterval = 10 - opts.CheckpointCfg.CatalogUnCkpLimit = 1 + opts := config.WithQuickScanAndCKPOpts(nil) tae := initDB(t, opts) defer tae.Close() schema := catalog.MockSchemaAll(13, 3) @@ -711,16 +610,7 @@ func TestAutoCompactABlk1(t *testing.T) { totalRows := schema.BlockMaxRows / 5 bat := catalog.MockData(schema, totalRows) - { - txn, _ := tae.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := database.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bat) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + createRelationAndAppend(t, tae, "db", schema, bat, true) err := tae.Catalog.Checkpoint(tae.Scheduler.GetSafeTS()) assert.Nil(t, err) testutils.WaitExpect(1000, func() bool { @@ -752,12 +642,7 @@ func TestAutoCompactABlk2(t *testing.T) { opts.CacheCfg = new(options.CacheCfg) opts.CacheCfg.InsertCapacity = common.K * 5 opts.CacheCfg.TxnCapacity = common.M - opts.CheckpointCfg = new(options.CheckpointCfg) - opts.CheckpointCfg.ScannerInterval = 2 - opts.CheckpointCfg.ExecutionLevels = 2 - opts.CheckpointCfg.ExecutionInterval = 2 - opts.CheckpointCfg.CatalogCkpInterval = 1 - opts.CheckpointCfg.CatalogUnCkpLimit = 1 + opts = config.WithQuickScanAndCKPOpts(opts) db := initDB(t, opts) defer db.Close() @@ -784,19 +669,6 @@ func TestAutoCompactABlk2(t *testing.T) { pool, err := ants.NewPool(20) assert.Nil(t, err) var wg sync.WaitGroup - doFn := func(name string, data *gbat.Batch) func() { - return func() { - defer wg.Done() - txn, _ := db.StartTxn(nil) - database, err := txn.GetDatabase("db") - assert.Nil(t, err) - rel, err := database.GetRelationByName(name) - assert.Nil(t, err) - err = rel.Append(data) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } - } doSearch := func(name string) func() { return func() { defer wg.Done() @@ -823,9 +695,9 @@ func TestAutoCompactABlk2(t *testing.T) { assert.Nil(t, err) err = pool.Submit(doSearch(schema2.Name)) assert.Nil(t, err) - err = pool.Submit(doFn(schema1.Name, data)) + err = pool.Submit(appendClosure(t, data, schema1.Name, db, &wg)) assert.Nil(t, err) - err = pool.Submit(doFn(schema2.Name, data)) + err = pool.Submit(appendClosure(t, data, schema2.Name, db, &wg)) assert.Nil(t, err) } wg.Wait() @@ -848,16 +720,7 @@ func TestCompactABlk(t *testing.T) { totalRows := schema.BlockMaxRows / 5 bat := catalog.MockData(schema, totalRows) - { - txn, _ := tae.StartTxn(nil) 
- database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := database.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bat) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + createRelationAndAppend(t, tae, "db", schema, bat, true) { txn, _ := tae.StartTxn(nil) database, err := txn.GetDatabase("db") @@ -889,12 +752,7 @@ func TestRollback1(t *testing.T) { defer db.Close() schema := catalog.MockSchema(2, 0) - txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - _, err = database.CreateRelation(schema) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) + createRelation(t, db, "db", schema, true) segCnt := 0 onSegFn := func(segment *catalog.SegmentEntry) error { @@ -909,8 +767,8 @@ func TestRollback1(t *testing.T) { processor := new(catalog.LoopProcessor) processor.SegmentFn = onSegFn processor.BlockFn = onBlkFn - txn, _ = db.StartTxn(nil) - database, err = txn.GetDatabase("db") + txn, _ := db.StartTxn(nil) + database, err := txn.GetDatabase("db") assert.Nil(t, err) rel, err := database.GetRelationByName(schema.Name) assert.Nil(t, err) @@ -976,12 +834,10 @@ func TestMVCC1(t *testing.T) { bats := compute.SplitBatch(bat, 40) txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := database.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bats[0]) - assert.Nil(t, err) + database, _ := txn.CreateDatabase("db") + rel, _ := database.CreateRelation(schema) + err := rel.Append(bats[0]) + assert.NoError(t, err) row := uint32(5) expectVal := compute.GetValue(bats[0].Vecs[schema.GetSingleSortKeyIdx()], row) @@ -1154,35 +1010,13 @@ func TestUnload1(t *testing.T) { bat := catalog.MockData(schema, schema.BlockMaxRows*2) bats := compute.SplitBatch(bat, int(schema.BlockMaxRows)) - - { - txn, _ := db.StartTxn(nil) - database, err := txn.CreateDatabase("db") - assert.Nil(t, err) - _, err = database.CreateRelation(schema) - assert.Nil(t, err) - // rel.Append(bat) - assert.Nil(t, txn.Commit()) - } + createRelation(t, db, "db", schema, true) var wg sync.WaitGroup - doAppend := func(data *gbat.Batch) func() { - return func() { - defer wg.Done() - txn, _ := db.StartTxn(nil) - database, err := txn.GetDatabase("db") - assert.Nil(t, err) - rel, err := database.GetRelationByName(schema.Name) - assert.Nil(t, err) - err = rel.Append(data) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } - } pool, err := ants.NewPool(1) assert.Nil(t, err) for _, data := range bats { wg.Add(1) - err := pool.Submit(doAppend(data)) + err := pool.Submit(appendClosure(t, data, schema.Name, db, &wg)) assert.Nil(t, err) } wg.Wait() @@ -1238,27 +1072,13 @@ func TestUnload2(t *testing.T) { p, err := ants.NewPool(10) assert.Nil(t, err) var wg sync.WaitGroup - doFn := func(name string, data *gbat.Batch) func() { - return func() { - defer wg.Done() - txn, _ := db.StartTxn(nil) - database, err := txn.GetDatabase("db") - assert.Nil(t, err) - rel, err := database.GetRelationByName(name) - assert.Nil(t, err) - err = rel.Append(data) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } - } - for i, data := range bats { wg.Add(1) name := schema1.Name if i%2 == 1 { name = schema2.Name } - err := p.Submit(doFn(name, data)) + err := p.Submit(appendClosure(t, data, name, db, &wg)) assert.Nil(t, err) } wg.Wait() @@ -1299,17 +1119,7 @@ func TestDelete1(t *testing.T) { schema := catalog.MockSchemaAll(3, 2) schema.BlockMaxRows = 10 bat := catalog.MockData(schema, schema.BlockMaxRows) - - { - txn, _ := 
tae.StartTxn(nil) - db, err := txn.CreateDatabase("db") - assert.Nil(t, err) - rel, err := db.CreateRelation(schema) - assert.Nil(t, err) - err = rel.Append(bat) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + createRelationAndAppend(t, tae, "db", schema, bat, true) var id *common.ID var row uint32 { @@ -1411,16 +1221,7 @@ func TestLogIndex1(t *testing.T) { schema.BlockMaxRows = 10 bat := catalog.MockData(schema, schema.BlockMaxRows) bats := compute.SplitBatch(bat, int(schema.BlockMaxRows)) - { - txn, _ := tae.StartTxn(nil) - db, err := txn.CreateDatabase("db") - assert.Nil(t, err) - _, err = db.CreateRelation(schema) - assert.Nil(t, err) - // err := rel.Append(bat) - // assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } + createRelation(t, tae, "db", schema, true) txns := make([]txnif.AsyncTxn, 0) doAppend := func(data *gbat.Batch) func() { return func() { @@ -1928,16 +1729,11 @@ func TestUpdatePrimaryKey(t *testing.T) { defer tae.Close() schema := catalog.MockSchemaAll(13, 12) bat := catalog.MockData(schema, 100) - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - rel, _ := db.CreateRelation(schema) - err := rel.Append(bat) - assert.NoError(t, err) - assert.NoError(t, txn.Commit()) + createRelationAndAppend(t, tae, "db", schema, bat, true) - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") + rel, _ := db.GetRelationByName(schema.Name) v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 2) filter := handle.NewEQFilter(v) id, row, err := rel.GetByFilter(filter) @@ -1955,17 +1751,12 @@ func TestADA(t *testing.T) { bat := catalog.MockData(schema, 1) // Append to a block - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - rel, _ := db.CreateRelation(schema) - err := rel.Append(bat) - assert.NoError(t, err) - assert.NoError(t, txn.Commit()) + createRelationAndAppend(t, tae, "db", schema, bat, true) // Delete a row from the block - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") + rel, _ := db.GetRelationByName(schema.Name) v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 0) filter := handle.NewEQFilter(v) id, row, err := rel.GetByFilter(filter) @@ -2090,20 +1881,15 @@ func TestUpdateByFilter(t *testing.T) { schema := catalog.MockSchemaAll(13, 3) bat := catalog.MockData(schema, 100) - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - rel, _ := db.CreateRelation(schema) - err := rel.Append(bat) - assert.NoError(t, err) - assert.NoError(t, txn.Commit()) + createRelationAndAppend(t, tae, "db", schema, bat, true) - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") + rel, _ := db.GetRelationByName(schema.Name) v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 2) filter := handle.NewEQFilter(v) - err = rel.UpdateByFilter(filter, 2, int32(2222)) + err := rel.UpdateByFilter(filter, 2, int32(2222)) assert.NoError(t, err) id, row, err := rel.GetByFilter(filter) @@ -2133,12 +1919,7 @@ func TestGetByFilter(t *testing.T) { bat := catalog.MockData(schema, 10) // Step 1 - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - rel, _ := db.CreateRelation(schema) - err := rel.Append(bat) - assert.NoError(t, err) - assert.NoError(t, txn.Commit()) + 
createRelationAndAppend(t, tae, "db", schema, bat, true) // Step 2 v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 2) @@ -2146,8 +1927,8 @@ func TestGetByFilter(t *testing.T) { // Step 3 txn1, _ := tae.StartTxn(nil) - db, _ = txn1.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) + db, _ := txn1.GetDatabase("db") + rel, _ := db.GetRelationByName(schema.Name) id, row, err := rel.GetByFilter(filter) assert.NoError(t, err) @@ -2188,11 +1969,7 @@ func TestChaos1(t *testing.T) { schema.SegmentMaxBlocks = 2 bat := catalog.MockData(schema, 1) - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - _, err := db.CreateRelation(schema) - assert.NoError(t, err) - assert.NoError(t, txn.Commit()) + createRelation(t, tae, "db", schema, true) v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 0) filter := handle.NewEQFilter(v) @@ -2245,13 +2022,14 @@ func TestChaos1(t *testing.T) { t.Logf("AppendCnt: %d", appendCnt) t.Logf("DeleteCnt: %d", deleteCnt) assert.True(t, appendCnt-deleteCnt <= 1) - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") rel, _ := db.GetRelationByName(schema.Name) assert.Equal(t, int64(appendCnt-deleteCnt), rel.Rows()) it := rel.MakeBlockIt() blk := it.GetBlock() view, err := blk.GetColumnDataById(schema.GetSingleSortKeyIdx(), nil, nil) + assert.NoError(t, err) assert.Equal(t, int(appendCnt), view.Length()) view.ApplyDeletes() t.Log(view.DeleteMask.String()) @@ -2276,12 +2054,7 @@ func TestSnapshotIsolation1(t *testing.T) { filter := handle.NewEQFilter(v) // Step 1 - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - rel, _ := db.CreateRelation(schema) - err := rel.Append(bat) - assert.NoError(t, err) - assert.NoError(t, txn.Commit()) + createRelationAndAppend(t, tae, "db", schema, bat, true) // Step 2 txn1, _ := tae.StartTxn(nil) @@ -2292,7 +2065,7 @@ func TestSnapshotIsolation1(t *testing.T) { txn2, _ := tae.StartTxn(nil) db2, _ := txn2.GetDatabase("db") rel2, _ := db2.GetRelationByName(schema.Name) - err = rel2.UpdateByFilter(filter, 3, int64(2222)) + err := rel2.UpdateByFilter(filter, 3, int64(2222)) assert.NoError(t, err) assert.NoError(t, txn2.Commit()) @@ -2317,9 +2090,9 @@ func TestSnapshotIsolation1(t *testing.T) { assert.NoError(t, err) assert.NoError(t, txn3.Commit()) - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") + rel, _ := db.GetRelationByName(schema.Name) id, row, err = rel.GetByFilter(filter) assert.NoError(t, err) v, err = rel.GetValue(id, row, 3) @@ -2345,10 +2118,7 @@ func TestSnapshotIsolation2(t *testing.T) { v := compute.GetValue(bat.Vecs[schema.GetSingleSortKeyIdx()], 0) filter := handle.NewEQFilter(v) - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - _, _ = db.CreateRelation(schema) - assert.NoError(t, txn.Commit()) + createRelation(t, tae, "db", schema, true) // Step 1 txn1, _ := tae.StartTxn(nil) diff --git a/pkg/vm/engine/tae/db/gc_test.go b/pkg/vm/engine/tae/db/gc_test.go index c16972dfa59b7118d6c4c5858ee2186f58905355..121a6d91455b223c86217df2ed52622167e73cee 100644 --- a/pkg/vm/engine/tae/db/gc_test.go +++ b/pkg/vm/engine/tae/db/gc_test.go @@ -15,44 +15,21 @@ package db import ( + "math/rand" "sync" "testing" + "time" - gbat "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/compute" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config" "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/assert" ) -func appendFailClosure(t *testing.T, data *gbat.Batch, name string, e *DB, wg *sync.WaitGroup) func() { - return func() { - defer wg.Done() - txn, _ := e.StartTxn(nil) - database, _ := txn.GetDatabase("db") - rel, _ := database.GetRelationByName(name) - err := rel.Append(data) - assert.NotNil(t, err) - assert.Nil(t, txn.Rollback()) - } -} - -func appendClosure(t *testing.T, data *gbat.Batch, name string, e *DB, wg *sync.WaitGroup) func() { - return func() { - defer wg.Done() - txn, _ := e.StartTxn(nil) - database, _ := txn.GetDatabase("db") - rel, _ := database.GetRelationByName(name) - err := rel.Append(data) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } -} - func TestGCBlock1(t *testing.T) { tae := initDB(t, nil) defer tae.Close() @@ -61,16 +38,11 @@ func TestGCBlock1(t *testing.T) { schema.SegmentMaxBlocks = 2 bat := catalog.MockData(schema, schema.BlockMaxRows) - txn, _ := tae.StartTxn(nil) - db, _ := txn.CreateDatabase("db") - rel, _ := db.CreateRelation(schema) - err := rel.Append(bat) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) + createRelationAndAppend(t, tae, "db", schema, bat, true) - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") + rel, _ := db.GetRelationByName(schema.Name) it := rel.MakeBlockIt() blk := it.GetBlock() meta := blk.GetMeta().(*catalog.BlockEntry) @@ -96,13 +68,7 @@ func TestGCBlock1(t *testing.T) { } func TestAutoGC1(t *testing.T) { - opts := new(options.Options) - opts.CheckpointCfg = new(options.CheckpointCfg) - opts.CheckpointCfg.ScannerInterval = 10 - opts.CheckpointCfg.ExecutionLevels = 5 - opts.CheckpointCfg.ExecutionInterval = 1 - opts.CheckpointCfg.CatalogCkpInterval = 5 - opts.CheckpointCfg.CatalogUnCkpLimit = 1 + opts := config.WithQuickScanAndCKPOpts(nil) tae := initDB(t, opts) defer tae.Close() schema := catalog.MockSchemaAll(13, 3) @@ -112,14 +78,8 @@ func TestAutoGC1(t *testing.T) { totalRows := schema.BlockMaxRows * 21 / 2 bat := catalog.MockData(schema, totalRows) bats := compute.SplitBatch(bat, 100) + createRelation(t, tae, "db", schema, true) pool, _ := ants.NewPool(50) - { - txn, _ := tae.StartTxn(nil) - database, _ := txn.CreateDatabase("db") - _, err := database.CreateRelation(schema) - assert.Nil(t, err) - assert.Nil(t, txn.Commit()) - } var wg sync.WaitGroup for _, data := range bats { wg.Add(1) @@ -146,3 +106,216 @@ func TestAutoGC1(t *testing.T) { t.Logf("BlockCnt %d, Expect 12", cnt) assert.Equal(t, uint64(0), tae.Scheduler.GetPenddingLSNCnt()) } + +// Test Steps +// 1. Create a table w/o data and commit +// 2. Drop the table and commit +// 3. Create a table w one appendable block data and commit +// 4. Drop the table and commit +func TestGCTable(t *testing.T) { + opts := config.WithQuickScanAndCKPOpts(nil) + tae := initDB(t, opts) + defer tae.Close() + schema := catalog.MockSchemaAll(13, 12) + schema.BlockMaxRows = 10 + schema.SegmentMaxBlocks = 2 + + // 1. 
Create a table without data + db, _ := createRelation(t, tae, "db", schema, true) + + // 2. Drop the table + dropRelation(t, tae, "db", schema.Name) + + dbEntry, _ := tae.Catalog.GetDatabaseByID(db.GetID()) + now := time.Now() + testutils.WaitExpect(1000, func() bool { + return dbEntry.CoarseTableCnt() == 0 + }) + assert.Equal(t, 0, dbEntry.CoarseTableCnt()) + t.Logf("Takes: %s", time.Since(now)) + printCheckpointStats(t, tae) + + bat := catalog.MockData(schema, schema.BlockMaxRows*uint32(schema.SegmentMaxBlocks+1)-1) + bats := compute.SplitBatch(bat, 4) + + // 3. Create a table and append 7 rows + db, _ = createRelationAndAppend(t, tae, "db", schema, bats[0], false) + + names := getSegmentFileNames(tae.Dir) + assert.Equal(t, 1, len(names)) + + // 4. Drop the table + dropRelation(t, tae, "db", schema.Name) + + dbEntry, _ = tae.Catalog.GetDatabaseByID(db.GetID()) + now = time.Now() + testutils.WaitExpect(1000, func() bool { + return dbEntry.CoarseTableCnt() == 0 + }) + assert.Equal(t, 0, dbEntry.CoarseTableCnt()) + t.Logf("Takes: %s", time.Since(now)) + printCheckpointStats(t, tae) + names = getSegmentFileNames(tae.Dir) + assert.Equal(t, 0, len(names)) + + // 5. Create a table and append 3 block + db, _ = createRelationAndAppend(t, tae, "db", schema, bat, false) + names = getSegmentFileNames(tae.Dir) + t.Log(names) + assert.Equal(t, 2, len(names)) + printCheckpointStats(t, tae) + + compactBlocks(t, tae, "db", schema, true) + + // 6. Drop the table + dropRelation(t, tae, "db", schema.Name) + testutils.WaitExpect(200, func() bool { + return dbEntry.CoarseTableCnt() == 0 + }) + names = getSegmentFileNames(tae.Dir) + printCheckpointStats(t, tae) + t.Log(names) + assert.Equal(t, 0, dbEntry.CoarseTableCnt()) + names = getSegmentFileNames(tae.Dir) + assert.Equal(t, 0, len(names)) + + // 7. Create a table + db, _ = createRelation(t, tae, "db", schema, false) + + // 8. Append blocks and drop + var wg sync.WaitGroup + pool, _ := ants.NewPool(5) + bat = catalog.MockData(schema, schema.BlockMaxRows*10) + bats = compute.SplitBatch(bat, 20) + for i := range bats[:10] { + wg.Add(1) + _ = pool.Submit(tryAppendClosure(t, bats[i], schema.Name, tae, &wg)) + } + wg.Add(1) + _ = pool.Submit(func() { + defer wg.Done() + dropRelation(t, tae, "db", schema.Name) + }) + for i := range bats[10:] { + wg.Add(1) + _ = pool.Submit(tryAppendClosure(t, bats[i+10], schema.Name, tae, &wg)) + } + wg.Wait() + printCheckpointStats(t, tae) + testutils.WaitExpect(2000, func() bool { + return dbEntry.CoarseTableCnt() == 0 + }) + printCheckpointStats(t, tae) + assert.Equal(t, 0, dbEntry.CoarseTableCnt()) + names = getSegmentFileNames(tae.Dir) + assert.Equal(t, 0, len(names)) + // t.Log(common.GPool.String()) +} + +// Test Steps +// 1. Create a db with 2 tables w/o data +// 2. 
Drop the db +func TestGCDB(t *testing.T) { + opts := config.WithQuickScanAndCKPOpts(nil) + tae := initDB(t, opts) + defer tae.Close() + + schema1 := catalog.MockSchema(13, 12) + schema1.BlockMaxRows = 10 + schema1.SegmentMaxBlocks = 2 + schema2 := catalog.MockSchema(13, 12) + schema2.BlockMaxRows = 10 + schema2.SegmentMaxBlocks = 2 + + createRelation(t, tae, "db", schema1, true) + createRelation(t, tae, "db", schema2, false) + dropDB(t, tae, "db") + testutils.WaitExpect(1000, func() bool { + return tae.Catalog.CoarseDBCnt() == 1 + }) + printCheckpointStats(t, tae) + assert.Equal(t, 1, tae.Catalog.CoarseDBCnt()) + + bat1 := catalog.MockData(schema1, schema1.BlockMaxRows*3-1) + bat2 := catalog.MockData(schema2, schema2.BlockMaxRows*3-1) + + createRelation(t, tae, "db", schema1, true) + createRelation(t, tae, "db", schema2, false) + appendClosure(t, bat1, schema1.Name, tae, nil)() + appendClosure(t, bat2, schema2.Name, tae, nil)() + dropDB(t, tae, "db") + + testutils.WaitExpect(2000, func() bool { + return tae.Catalog.CoarseDBCnt() == 1 + }) + t.Log(tae.Catalog.SimplePPString(common.PPL1)) + assert.Equal(t, 1, tae.Catalog.CoarseDBCnt()) + names := getSegmentFileNames(tae.Dir) + assert.Equal(t, 0, len(names)) + + createRelation(t, tae, "db", schema1, true) + createRelation(t, tae, "db", schema2, false) + appendClosure(t, bat1, schema1.Name, tae, nil)() + appendClosure(t, bat2, schema2.Name, tae, nil)() + compactBlocks(t, tae, "db", schema1, true) + compactBlocks(t, tae, "db", schema2, true) + dropDB(t, tae, "db") + + testutils.WaitExpect(2000, func() bool { + return tae.Catalog.CoarseDBCnt() == 1 + }) + assert.Equal(t, 1, tae.Catalog.CoarseDBCnt()) + names = getSegmentFileNames(tae.Dir) + assert.Equal(t, 0, len(names)) + + createDB(t, tae, "db") + + var wg sync.WaitGroup + pool, _ := ants.NewPool(4) + routine := func() { + defer wg.Done() + schema := catalog.MockSchema(3, 2) + schema.BlockMaxRows = 10 + schema.SegmentMaxBlocks = 2 + bat := catalog.MockData(schema, schema.BlockMaxRows*uint32(rand.Intn(4)+1)-1) + txn, _ := tae.StartTxn(nil) + db, err := txn.GetDatabase("db") + if err != nil { + _ = txn.Rollback() + return + } + rel, err := db.CreateRelation(schema) + assert.NoError(t, err) + err = txn.Commit() + if err != nil { + return + } + + txn, _ = tae.StartTxn(nil) + db, err = txn.GetDatabase("db") + if err != nil { + _ = txn.Rollback() + return + } + rel, err = db.GetRelationByName(schema.Name) + assert.NoError(t, err) + err = rel.Append(bat) + assert.NoError(t, err) + _ = txn.Commit() + } + for i := 0; i < 8; i++ { + wg.Add(1) + _ = pool.Submit(routine) + } + dropDB(t, tae, "db") + wg.Wait() + + testutils.WaitExpect(5000, func() bool { + return tae.Catalog.CoarseDBCnt() == 1 + }) + assert.Equal(t, 1, tae.Catalog.CoarseDBCnt()) + names = getSegmentFileNames(tae.Dir) + assert.Equal(t, 0, len(names)) + t.Log(tae.Catalog.SimplePPString(common.PPL1)) + printCheckpointStats(t, tae) +} diff --git a/pkg/vm/engine/tae/db/gcop.go b/pkg/vm/engine/tae/db/gcop.go index 6dbde8450d3263df5b4b1b1e19dc65083366b914..2532cc0c637ede9b6d534ae8d12f58137e25c957 100644 --- a/pkg/vm/engine/tae/db/gcop.go +++ b/pkg/vm/engine/tae/db/gcop.go @@ -21,58 +21,112 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" ) +type GCType int16 + +const ( + GCType_Block GCType = iota + GCType_Segment + GCType_Table + GCType_DB +) + // Destroy is not thread-safe -func gcBlockClosure(entry *catalog.BlockEntry) tasks.FuncT { - return func() error { +func gcBlockClosure(entry *catalog.BlockEntry, gct GCType) 
tasks.FuncT { + return func() (err error) { logutil.Debugf("[GCBLK] | %s | Started", entry.Repr()) + defer func() { + if err == nil { + logutil.Infof("[GCBLK] | %s | Removed", entry.Repr()) + } else { + logutil.Warnf("Cannot remove block %s, maybe removed before", entry.String()) + } + }() segment := entry.GetSegment() - segment.RLock() - segDropped := segment.IsDroppedCommitted() - segment.RUnlock() - err := entry.DestroyData() - if err != nil { - return err + if err = entry.DestroyData(); err != nil { + return } - if !segDropped && entry.IsAppendable() { - return nil + // For appendable segment, keep all soft-deleted blocks until the segment is soft-deleted + if gct == GCType_Block && entry.IsAppendable() { + return } err = segment.RemoveEntry(entry) - logutil.Infof("[GCBLK] | %s | Removed", entry.Repr()) - if err != nil { - logutil.Warnf("Cannot remove block %s, maybe removed before", entry.String()) - return err - } - return nil + return } } // Destroy is not thread-safe -func gcSegmentClosure(entry *catalog.SegmentEntry) tasks.FuncT { - return func() error { +func gcSegmentClosure(entry *catalog.SegmentEntry, gct GCType) tasks.FuncT { + return func() (err error) { + scopes := make([]common.ID, 0) logutil.Debugf("[GCSEG] | %s | Started", entry.Repr()) + defer func() { + if err != nil { + logutil.Warnf("Cannot remove segment %s, maybe removed before: %v", entry.String(), err) + } else { + logutil.Infof("[GCSEG] | %s | BLKS=%s | Removed", entry.Repr(), common.IDArraryString(scopes)) + } + }() table := entry.GetTable() - scopes := make([]common.ID, 0) it := entry.MakeBlockIt(false) for it.Valid() { blk := it.Get().GetPayload().(*catalog.BlockEntry) scopes = append(scopes, *blk.AsCommonID()) - err := gcBlockClosure(blk)() + err = gcBlockClosure(blk, gct)() if err != nil { - return err + return } it.Next() } - err := entry.DestroyData() - if err != nil { - return err + if err = entry.DestroyData(); err != nil { + return } err = table.RemoveEntry(entry) - logutil.Infof("[GCSEG] | %s | BLKS=%s | Removed", entry.Repr(), common.IDArraryString(scopes)) - if err != nil { - logutil.Warnf("Cannot remove segment %s, maybe removed before", entry.String()) - return err + return + } +} + +// TODO +func gcTableClosure(entry *catalog.TableEntry, gct GCType) tasks.FuncT { + return func() (err error) { + scopes := make([]common.ID, 0) + logutil.Infof("[GCTABLE] | %s | Started", entry.String()) + defer func() { + logutil.Infof("[GCTABLE] | %s | Ended: %v | SEGS=%s", entry.String(), err, common.IDArraryString(scopes)) + }() + dbEntry := entry.GetDB() + it := entry.MakeSegmentIt(false) + for it.Valid() { + seg := it.Get().GetPayload().(*catalog.SegmentEntry) + scopes = append(scopes, *seg.AsCommonID()) + if err = gcSegmentClosure(seg, gct)(); err != nil { + return + } + it.Next() + } + err = dbEntry.RemoveEntry(entry) + return + } +} + +// TODO +func gcDatabaseClosure(entry *catalog.DBEntry) tasks.FuncT { + return func() (err error) { + scopes := make([]common.ID, 0) + logutil.Infof("[GCDB] | %s | Started", entry.String()) + defer func() { + logutil.Infof("[GCDB] | %s | Ended: %v | TABLES=%s", entry.String(), err, common.IDArraryString(scopes)) + }() + it := entry.MakeTableIt(false) + for it.Valid() { + table := it.Get().GetPayload().(*catalog.TableEntry) + scopes = append(scopes, *table.AsCommonID()) + if err = gcTableClosure(table, GCType_DB)(); err != nil { + return + } + it.Next() } - return nil + err = entry.GetCatalog().RemoveEntry(entry) + return } } diff --git a/pkg/vm/engine/tae/db/helper.go 
b/pkg/vm/engine/tae/db/helper.go index 6212803914648d91383b318a440005c574f8fb62..8defcc7f8c6fff899ec8836fffa4738c3afbd658 100644 --- a/pkg/vm/engine/tae/db/helper.go +++ b/pkg/vm/engine/tae/db/helper.go @@ -15,10 +15,30 @@ package db import ( + "math" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" ) +var dbScope = common.ID{ + TableID: math.MaxUint64 / 2, +} + +func MakeDBScopes(entry *catalog.DBEntry) (scopes []common.ID) { + scope := dbScope + scope.SegmentID = entry.GetID() + scopes = append(scopes, scope) + return +} + +func MakeTableScopes(entries ...*catalog.TableEntry) (scopes []common.ID) { + for _, entry := range entries { + scopes = append(scopes, *entry.AsCommonID()) + } + return +} + func MakeSegmentScopes(entries ...*catalog.SegmentEntry) (scopes []common.ID) { for _, entry := range entries { scopes = append(scopes, *entry.AsCommonID()) diff --git a/pkg/vm/engine/tae/db/hidden_test.go b/pkg/vm/engine/tae/db/hidden_test.go index 9baf653c398f59e2dfec7c7eee0ff38251158c56..944c861ea03a0abe023a90ff96262a4c28716ee1 100644 --- a/pkg/vm/engine/tae/db/hidden_test.go +++ b/pkg/vm/engine/tae/db/hidden_test.go @@ -9,7 +9,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/compute" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" "github.com/stretchr/testify/assert" ) @@ -99,31 +98,8 @@ func TestHiddenWithPK1(t *testing.T) { assert.NoError(t, err) assert.NoError(t, txn.Commit()) - txn, _ = tae.StartTxn(nil) - db, _ = txn.GetDatabase("db") - rel, _ = db.GetRelationByName(schema.Name) - { - var metas []*catalog.BlockEntry - it := rel.MakeBlockIt() - for it.Valid() { - blk := it.GetBlock() - if blk.Rows() < int(schema.BlockMaxRows) { - it.Next() - continue - } - meta := blk.GetMeta().(*catalog.BlockEntry) - metas = append(metas, meta) - it.Next() - } - for _, meta := range metas { - task, err := jobs.NewCompactBlockTask(nil, txn, meta, tae.Scheduler) - assert.NoError(t, err) - err = task.OnExec() - assert.NoError(t, err) - } - } + compactBlocks(t, tae, "db", schema, false) - assert.NoError(t, txn.Commit()) txn, _ = tae.StartTxn(nil) db, _ = txn.GetDatabase("db") rel, _ = db.GetRelationByName(schema.Name) diff --git a/pkg/vm/engine/tae/db/replay_test.go b/pkg/vm/engine/tae/db/replay_test.go index a8f5c93edc7006374b0cd25f6ddc64ece03c6dde..f5db67cffa60b22b8d46ab417d43503339703732 100644 --- a/pkg/vm/engine/tae/db/replay_test.go +++ b/pkg/vm/engine/tae/db/replay_test.go @@ -10,10 +10,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/compute" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle" - - // "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" - // "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - // ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/assert" diff --git a/pkg/vm/engine/tae/db/scanner.go b/pkg/vm/engine/tae/db/scanner.go index f0e3a00651c5b657f6bdecf4b506c6a88b789257..b737cef06461a02f2c3b4018ac6b41f2aa4d3f98 100644 --- a/pkg/vm/engine/tae/db/scanner.go +++ b/pkg/vm/engine/tae/db/scanner.go @@ -15,6 +15,7 
@@ package db import ( + "github.com/RoaringBitmap/roaring" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" @@ -44,6 +45,9 @@ type dbScanner struct { db *DB ops []ScannerOp errHandler ErrHandler + dbmask *roaring.Bitmap + tablemask *roaring.Bitmap + segmask *roaring.Bitmap } func (scanner *dbScanner) OnStopped() { @@ -51,6 +55,9 @@ func (scanner *dbScanner) OnStopped() { } func (scanner *dbScanner) OnExec() { + scanner.dbmask.Clear() + scanner.tablemask.Clear() + scanner.segmask.Clear() for _, op := range scanner.ops { err := op.PreExecute() if err != nil { @@ -77,6 +84,9 @@ func NewDBScanner(db *DB, errHandler ErrHandler) *dbScanner { db: db, ops: make([]ScannerOp, 0), errHandler: errHandler, + dbmask: roaring.New(), + tablemask: roaring.New(), + segmask: roaring.New(), } scanner.BlockFn = scanner.onBlock scanner.SegmentFn = scanner.onSegment @@ -111,22 +121,47 @@ func (scanner *dbScanner) onPostSegment(entry *catalog.SegmentEntry) (err error) } func (scanner *dbScanner) onSegment(entry *catalog.SegmentEntry) (err error) { - for _, op := range scanner.ops { + scanner.segmask.Clear() + for i, op := range scanner.ops { + if scanner.tablemask.Contains(uint32(i)) { + scanner.segmask.Add(uint32(i)) + continue + } err = op.OnSegment(entry) + if err == catalog.ErrStopCurrRecur { + scanner.segmask.Add(uint32(i)) + } if err = scanner.errHandler.OnSegmentErr(entry, err); err != nil { break } } + if scanner.segmask.GetCardinality() == uint64(len(scanner.ops)) { + err = catalog.ErrStopCurrRecur + logutil.Infof("StopRecurScanSegment: %s", entry.String()) + } return } func (scanner *dbScanner) onTable(entry *catalog.TableEntry) (err error) { - for _, op := range scanner.ops { + scanner.tablemask.Clear() + for i, op := range scanner.ops { + // If the specified op was masked OnDatabase. 
skip it + if scanner.dbmask.Contains(uint32(i)) { + scanner.tablemask.Add(uint32(i)) + continue + } err = op.OnTable(entry) + if err == catalog.ErrStopCurrRecur { + scanner.tablemask.Add(uint32(i)) + } if err = scanner.errHandler.OnTableErr(entry, err); err != nil { break } } + if scanner.tablemask.GetCardinality() == uint64(len(scanner.ops)) { + err = catalog.ErrStopCurrRecur + logutil.Infof("StopRecurScanTable: %s", entry.String()) + } return } @@ -135,11 +170,19 @@ func (scanner *dbScanner) onDatabase(entry *catalog.DBEntry) (err error) { err = catalog.ErrStopCurrRecur return } - for _, op := range scanner.ops { + scanner.dbmask.Clear() + for i, op := range scanner.ops { err = op.OnDatabase(entry) + if err == catalog.ErrStopCurrRecur { + scanner.dbmask.Add(uint32(i)) + } if err = scanner.errHandler.OnDatabaseErr(entry, err); err != nil { break } } + if scanner.dbmask.GetCardinality() == uint64(len(scanner.ops)) { + err = catalog.ErrStopCurrRecur + logutil.Infof("StopRecurScanDatabase: %s", entry.String()) + } return } diff --git a/pkg/vm/engine/tae/db/scannerop.go b/pkg/vm/engine/tae/db/scannerop.go index f8c6b35b91ff7cba485cb43b233ab7fd6724797f..8c7aec7cc59751e1622de18052f1f854c3bdf385 100644 --- a/pkg/vm/engine/tae/db/scannerop.go +++ b/pkg/vm/engine/tae/db/scannerop.go @@ -41,6 +41,7 @@ func newCalibrationOp(db *DB) *calibrationOp { db: db, LoopProcessor: new(catalog.LoopProcessor), } + processor.TableFn = processor.onTable processor.BlockFn = processor.onBlock processor.SegmentFn = processor.onSegment processor.PostSegmentFn = processor.onPostSegment @@ -50,7 +51,17 @@ func newCalibrationOp(db *DB) *calibrationOp { func (processor *calibrationOp) PreExecute() error { return nil } func (processor *calibrationOp) PostExecute() error { return nil } +func (processor *calibrationOp) onTable(tableEntry *catalog.TableEntry) (err error) { + if !tableEntry.IsActive() { + err = catalog.ErrStopCurrRecur + } + return +} + func (processor *calibrationOp) onSegment(segmentEntry *catalog.SegmentEntry) (err error) { + if !segmentEntry.IsActive() { + err = catalog.ErrStopCurrRecur + } processor.blkCntOfSegment = 0 return } @@ -71,17 +82,18 @@ func (processor *calibrationOp) onPostSegment(segmentEntry *catalog.SegmentEntry } func (processor *calibrationOp) onBlock(blockEntry *catalog.BlockEntry) (err error) { + if !blockEntry.IsActive() { + // logutil.Infof("Noop for block %s: table or db was dropped", blockEntry.Repr()) + processor.blkCntOfSegment = 0 + return + } + blockEntry.RLock() // 1. Skip uncommitted entries if !blockEntry.IsCommitted() { blockEntry.RUnlock() return nil } - // 2. 
Skip committed dropped entries - if blockEntry.IsDroppedCommitted() { - blockEntry.RUnlock() - return nil - } if blockEntry.GetSegment().IsAppendable() && catalog.ActiveWithNoTxnFilter(blockEntry.BaseEntry) && catalog.NonAppendableBlkFilter(blockEntry) { processor.blkCntOfSegment++ } @@ -172,7 +184,8 @@ func (monitor *catalogStatsMonitor) onBlock(entry *catalog.BlockEntry) (err erro entry.RUnlock() if gcNeeded { scopes := MakeBlockScopes(entry) - _, err = monitor.db.Scheduler.ScheduleMultiScopedFn(nil, tasks.GCTask, scopes, gcBlockClosure(entry)) + // _, err = monitor.db.Scheduler.ScheduleMultiScopedFn(nil, tasks.GCTask, scopes, gcBlockClosure(entry, GCType_Block)) + _, err = monitor.db.Scheduler.ScheduleFn(nil, tasks.GCTask, gcBlockClosure(entry, GCType_Block)) logutil.Infof("[GCBLK] | %s | Scheduled | Err=%v | Scopes=%s", entry.Repr(), err, common.IDArraryString(scopes)) if err != nil { // if err == tasks.ErrScheduleScopeConflict { @@ -180,6 +193,16 @@ func (monitor *catalogStatsMonitor) onBlock(entry *catalog.BlockEntry) (err erro // } err = nil } + } else { + blkData := entry.GetBlockData() + ts, terminated := entry.GetTerminationTS() + if terminated && blkData.GetMaxCheckpointTS() < ts { + _, err = monitor.db.Scheduler.ScheduleScopedFn(nil, tasks.CheckpointTask, entry.AsCommonID(), blkData.CheckpointWALClosure(ts)) + if err != nil { + logutil.Warnf("CheckpointWALClosure %s: %v", entry.Repr(), err) + err = nil + } + } } return } @@ -201,15 +224,16 @@ func (monitor *catalogStatsMonitor) onSegment(entry *catalog.SegmentEntry) (err entry.RUnlock() if gcNeeded { scopes := MakeSegmentScopes(entry) - _, err = monitor.db.Scheduler.ScheduleMultiScopedFn(nil, tasks.GCTask, scopes, gcSegmentClosure(entry)) + _, err = monitor.db.Scheduler.ScheduleFn(nil, tasks.GCTask, gcSegmentClosure(entry, GCType_Segment)) logutil.Infof("[GCSEG] | %s | Scheduled | Err=%v | Scopes=%s", entry.Repr(), err, common.IDArraryString(scopes)) if err != nil { // if err != tasks.ErrScheduleScopeConflict { // logutil.Warnf("Schedule | [GC] | %s | Err=%s", entry.String(), err) // } err = nil + } else { + err = catalog.ErrStopCurrRecur } - err = catalog.ErrStopCurrRecur } return } @@ -217,13 +241,59 @@ func (monitor *catalogStatsMonitor) onSegment(entry *catalog.SegmentEntry) (err func (monitor *catalogStatsMonitor) onTable(entry *catalog.TableEntry) (err error) { if monitor.minTs <= monitor.maxTs && catalog.CheckpointSelectOp(entry.BaseEntry, monitor.minTs, monitor.maxTs) { monitor.unCheckpointedCnt++ + return + } + checkpointed := monitor.db.Scheduler.GetCheckpointedLSN() + gcNeeded := false + entry.RLock() + if entry.IsDroppedCommitted() { + if logIndex := entry.GetLogIndex(); logIndex != nil { + gcNeeded = checkpointed >= logIndex.LSN + } + } + entry.RUnlock() + if !gcNeeded { + return + } + + scopes := MakeTableScopes(entry) + // _, err = monitor.db.Scheduler.ScheduleMultiScopedFn(nil, tasks.GCTask, scopes, gcTableClosure(entry, GCType_Table)) + _, err = monitor.db.Scheduler.ScheduleFn(nil, tasks.GCTask, gcTableClosure(entry, GCType_Table)) + logutil.Infof("[GCTABLE] | %s | Scheduled | Err=%v | Scopes=%s", entry.String(), err, common.IDArraryString(scopes)) + if err != nil { + err = nil + } else { + err = catalog.ErrStopCurrRecur } return } func (monitor *catalogStatsMonitor) onDatabase(entry *catalog.DBEntry) (err error) { - if catalog.CheckpointSelectOp(entry.BaseEntry, monitor.minTs, monitor.maxTs) { + if monitor.minTs <= monitor.maxTs && catalog.CheckpointSelectOp(entry.BaseEntry, monitor.minTs, monitor.maxTs) { 
 		monitor.unCheckpointedCnt++
+		return
+	}
+	checkpointed := monitor.db.Scheduler.GetCheckpointedLSN()
+	gcNeeded := false
+	entry.RLock()
+	if entry.IsDroppedCommitted() {
+		if logIndex := entry.GetLogIndex(); logIndex != nil {
+			gcNeeded = checkpointed >= logIndex.LSN
+		}
+	}
+	entry.RUnlock()
+	if !gcNeeded {
+		return
+	}
+
+	scopes := MakeDBScopes(entry)
+	_, err = monitor.db.Scheduler.ScheduleFn(nil, tasks.GCTask, gcDatabaseClosure(entry))
+	// _, err = monitor.db.Scheduler.ScheduleMultiScopedFn(nil, tasks.GCTask, scopes, gcDatabaseClosure(entry))
+	logutil.Infof("[GCDB] | %s | Scheduled | Err=%v | Scopes=%s", entry.String(), err, common.IDArraryString(scopes))
+	if err != nil {
+		err = nil
+	} else {
+		err = catalog.ErrStopCurrRecur
 	}
 	return
 }
diff --git a/pkg/vm/engine/tae/db/scheduler.go b/pkg/vm/engine/tae/db/scheduler.go
index 3ee0de7f63ee1c233e9685b8c59a52e9061af1f0..6fd2644d33af5e3c19930b8768bcf47eb9dbe02c 100644
--- a/pkg/vm/engine/tae/db/scheduler.go
+++ b/pkg/vm/engine/tae/db/scheduler.go
@@ -122,7 +122,10 @@ func newTaskScheduler(db *DB, asyncWorkers int, ioWorkers int) *taskScheduler {
 	jobHandler := tasks.NewPoolHandler(asyncWorkers)
 	jobHandler.Start()
 	jobDispatcher.RegisterHandler(tasks.DataCompactionTask, jobHandler)
-	jobDispatcher.RegisterHandler(tasks.GCTask, jobHandler)
+	// jobDispatcher.RegisterHandler(tasks.GCTask, jobHandler)
+	gcHandler := tasks.NewSingleWorkerHandler("gc")
+	gcHandler.Start()
+	jobDispatcher.RegisterHandler(tasks.GCTask, gcHandler)

 	ckpDispatcher := tasks.NewBaseScopedDispatcher(tasks.DefaultScopeSharder)
 	for i := 0; i < 4; i++ {
@@ -204,7 +207,8 @@ func (s *taskScheduler) ScheduleScopedFn(ctx *tasks.Context, taskType tasks.Task
 func (s *taskScheduler) Schedule(task tasks.Task) (err error) {
 	taskType := task.Type()
-	if taskType == tasks.DataCompactionTask || taskType == tasks.GCTask {
+	// if taskType == tasks.DataCompactionTask || taskType == tasks.GCTask {
+	if taskType == tasks.DataCompactionTask {
 		dispatcher := s.Dispatchers[task.Type()].(*asyncJobDispatcher)
 		return dispatcher.TryDispatch(task)
 	}
diff --git a/pkg/vm/engine/tae/db/scheduler_test.go b/pkg/vm/engine/tae/db/scheduler_test.go
index 54be6a61f149cd01b59f067c41d4b6ff26ffb57a..07aab0121a9ba817922158532530d62f0486dd44 100644
--- a/pkg/vm/engine/tae/db/scheduler_test.go
+++ b/pkg/vm/engine/tae/db/scheduler_test.go
@@ -18,7 +18,6 @@ import (
 	"testing"
 	"time"

-	gbat "github.com/matrixorigin/matrixone/pkg/container/batch"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/compute"
@@ -172,15 +171,6 @@ func TestCheckpoint2(t *testing.T) {
 		t.Log(meta2.String())
 		assert.Nil(t, txn.Commit())
 	}
-	doAppend := func(data *gbat.Batch, name string) {
-		txn, err := tae.StartTxn(nil)
-		assert.NoError(t, err)
-		db, _ := txn.GetDatabase("db")
-		rel, _ := db.GetRelationByName(name)
-		err = rel.Append(data)
-		assert.Nil(t, err)
-		assert.Nil(t, txn.Commit())
-	}
 	for i, data := range bats[0:8] {
 		var name string
 		if i%2 == 0 {
@@ -188,7 +178,7 @@ func TestCheckpoint2(t *testing.T) {
 		} else {
 			name = schema2.Name
 		}
-		doAppend(data, name)
+		appendClosure(t, data, name, tae, nil)()
 	}
 	var meta *catalog.BlockEntry
 	testutils.WaitExpect(1000, func() bool {
 	})
 	assert.Equal(t, uint64(4), tae.Wal.GetPenddingCnt())
 	t.Log(tae.Wal.GetPenddingCnt())
-	doAppend(bats[8], schema1.Name)
+	appendClosure(t, bats[8], schema1.Name, tae, nil)()
 	// t.Log(tae.MTBufMgr.String())
+
 	{
 		txn, _ := tae.StartTxn(nil)
 		db, err := txn.GetDatabase("db")
@@ -216,6 +207,7 @@ func TestCheckpoint2(t *testing.T) {
 		assert.Nil(t, err)
 		assert.Nil(t, txn.Commit())
 	}
+
 	// testutils.WaitExpect(1000, func() bool {
 	// 	return tae.Wal.GetPenddingCnt() == 1
 	// })
@@ -247,20 +239,7 @@ func TestSchedule1(t *testing.T) {
 		assert.Nil(t, err)
 		assert.Nil(t, txn.Commit())
 	}
-	{
-		txn, _ := db.StartTxn(nil)
-		database, _ := txn.GetDatabase("db")
-		rel, _ := database.GetRelationByName(schema.Name)
-		it := rel.MakeBlockIt()
-		blk := it.GetBlock()
-		blkMeta := blk.GetMeta().(*catalog.BlockEntry)
-		factory := jobs.CompactBlockTaskFactory(blkMeta, db.Scheduler)
-		ctx := tasks.Context{Waitable: true}
-		task, err := db.Scheduler.ScheduleTxnTask(&ctx, tasks.DataCompactionTask, factory)
-		assert.Nil(t, err)
-		err = task.WaitDone()
-		assert.Nil(t, err)
-	}
+	compactBlocks(t, db, "db", schema, false)
 	t.Log(db.Opts.Catalog.SimplePPString(common.PPL1))
 	db.Close()
 }
diff --git a/pkg/vm/engine/tae/db/task.go b/pkg/vm/engine/tae/db/task.go
index 0396dcf95a3f5d4e09ccf7fbfe2dff563efb069f..533f01085aefe105057bf2b3ae1082f9103240f4 100644
--- a/pkg/vm/engine/tae/db/task.go
+++ b/pkg/vm/engine/tae/db/task.go
@@ -15,6 +15,7 @@ package db
 import (
+	"github.com/matrixorigin/matrixone/pkg/logutil"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
 )
@@ -55,6 +56,7 @@ func (task *ScheduledTxnTask) Execute() (err error) {
 		if err2 != nil {
 			panic(err2)
 		}
+		logutil.Warnf("Execute ScheduleTxnTask: %v. Rolled back", err)
 		return
 	}
 	err = txnTask.OnExec()
diff --git a/pkg/vm/engine/tae/iface/txnif/types.go b/pkg/vm/engine/tae/iface/txnif/types.go
index 805755d277816258f38b213079b5aa25457820ab..4fd7918a42565dcbb6b0fc9a5b679e37edf0138c 100644
--- a/pkg/vm/engine/tae/iface/txnif/types.go
+++ b/pkg/vm/engine/tae/iface/txnif/types.go
@@ -50,6 +50,7 @@ type TxnReader interface {
 	GetStore() TxnStore
 	String() string
 	Repr() string
+	GetLSN() uint64
 }

 type TxnHandle interface {
@@ -171,6 +172,7 @@ type TxnStore interface {
 	Txn2PC
 	io.Closer
 	BindTxn(AsyncTxn)
+	GetLSN() uint64

 	BatchDedup(dbId, id uint64, pks ...*vector.Vector) error
 	LogSegmentID(dbId, tid, sid uint64)
diff --git a/pkg/vm/engine/tae/tables/blocktasks.go b/pkg/vm/engine/tae/tables/blocktasks.go
index 3793061c33d1681375210b9d008d671d64f6462f..3cce24d7fe1118ef1d0fe3a2105ce2140d7e3442 100644
--- a/pkg/vm/engine/tae/tables/blocktasks.go
+++ b/pkg/vm/engine/tae/tables/blocktasks.go
@@ -58,6 +58,7 @@ func (blk *dataBlock) CheckpointWAL(endTs uint64) (err error) {

 func (blk *dataBlock) BlkCheckpointWAL(endTs uint64) (err error) {
 	ckpTs := blk.GetMaxCheckpointTS()
+	logutil.Infof("BlkCheckpointWAL | %s | [%d/%d]", blk.meta.Repr(), ckpTs, endTs)
 	if endTs <= ckpTs {
 		return
 	}
@@ -85,6 +86,7 @@ func (blk *dataBlock) BlkCheckpointWAL(endTs uint64) (err error) {

 func (blk *dataBlock) ABlkCheckpointWAL(endTs uint64) (err error) {
 	ckpTs := blk.GetMaxCheckpointTS()
+	logutil.Infof("ABlkCheckpointWAL | %s | [%d/%d]", blk.meta.Repr(), ckpTs, endTs)
 	if endTs <= ckpTs {
 		return
 	}
@@ -191,7 +193,7 @@ func (blk *dataBlock) ABlkFlushData(ts uint64, bat batch.IBatch, masks map[uint1
 	}
 	ckpTs := blk.GetMaxCheckpointTS()
 	if ts <= ckpTs {
-		logutil.Infof("FLUSH ABLK | [%s] | CANCELLED | (State Request: Already Compacted)", blk.meta.String())
+		logutil.Infof("FLUSH ABLK | [%s] | CANCELLED | (Stale Request: Already Compacted)", blk.meta.String())
 		return data.ErrStaleRequest
 	}
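Editor's sketch (not part of the patch): the GC gating introduced in the catalogStatsMonitor callbacks above reduces to comparing the checkpointed WAL LSN against the LSN at which an entry's drop was committed. The helper below is hypothetical and only makes that rule explicit.

package db

// gcEligible is a hypothetical helper: a dropped catalog entry may only be
// garbage-collected once the WAL has been checkpointed past the LSN that
// recorded the drop; otherwise recovery would still need the entry.
func gcEligible(checkpointedLSN, dropLSN uint64) bool {
	return checkpointedLSN >= dropLSN
}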
diff --git a/pkg/vm/engine/tae/tables/mock.go b/pkg/vm/engine/tae/tables/mock.go
index c794fd1d124757651b4ebc6b2c0ac71b9a91a6b0..a5eea3456ec5a49f80ddcc6ea0c0eca3dcfbd624 100644
--- a/pkg/vm/engine/tae/tables/mock.go
+++ b/pkg/vm/engine/tae/tables/mock.go
@@ -31,6 +31,7 @@ func newMockTxn() *mockTxn {
 	}
 }

+func (txn *mockTxn) GetLSN() uint64 { return 0 }
 func (txn *mockTxn) GetError() error { return nil }
 func (txn *mockTxn) GetStore() txnif.TxnStore { return nil }
 func (txn *mockTxn) GetTxnState(bool) txnif.TxnState { return 0 }
diff --git a/pkg/vm/engine/tae/testutils/config/options.go b/pkg/vm/engine/tae/testutils/config/options.go
index 21a7fcb99e8eb2c918d1099b9d5a195836b615b2..b037f23371625fb1a7210e63c7d4a95b5b16feef 100644
--- a/pkg/vm/engine/tae/testutils/config/options.go
+++ b/pkg/vm/engine/tae/testutils/config/options.go
@@ -94,3 +94,31 @@ func NewCustomizedMetaOptions(dir string, cst CacheSizeType, blockRows uint32, b
 	opts.FillDefaults(dir)
 	return opts
 }
+
+func WithQuickScanAndCKPOpts(in *options.Options) (opts *options.Options) {
+	if in == nil {
+		opts = new(options.Options)
+	} else {
+		opts = in
+	}
+	opts.CheckpointCfg = new(options.CheckpointCfg)
+	opts.CheckpointCfg.ScannerInterval = 10
+	opts.CheckpointCfg.ExecutionLevels = 5
+	opts.CheckpointCfg.ExecutionInterval = 1
+	opts.CheckpointCfg.CatalogCkpInterval = 5
+	opts.CheckpointCfg.CatalogUnCkpLimit = 1
+	return opts
+}
+
+func WithLongScanAndCKPOpts(in *options.Options) (opts *options.Options) {
+	if in == nil {
+		opts = new(options.Options)
+	} else {
+		opts = in
+	}
+	opts.CheckpointCfg = new(options.CheckpointCfg)
+	opts.CheckpointCfg.ScannerInterval = 100000
+	opts.CheckpointCfg.ExecutionLevels = 20
+	opts.CheckpointCfg.ExecutionInterval = 200000
+	return opts
+}
diff --git a/pkg/vm/engine/tae/txn/txnbase/store.go b/pkg/vm/engine/tae/txn/txnbase/store.go
index a7f63f675fb2209ea75dc158b9310f670564533e..e8be74d1b0ca61ebd12935f0df7d8103b7f4b365 100644
--- a/pkg/vm/engine/tae/txn/txnbase/store.go
+++ b/pkg/vm/engine/tae/txn/txnbase/store.go
@@ -26,6 +26,7 @@ var NoopStoreFactory = func() txnif.TxnStore { return new(NoopTxnStore) }

 type NoopTxnStore struct{}

+func (store *NoopTxnStore) GetLSN() uint64 { return 0 }
 func (store *NoopTxnStore) BindTxn(txn txnif.AsyncTxn) {}
 func (store *NoopTxnStore) Close() error { return nil }
 func (store *NoopTxnStore) Append(dbId, id uint64, data *batch.Batch) error { return nil }
diff --git a/pkg/vm/engine/tae/txn/txnbase/txn.go b/pkg/vm/engine/tae/txn/txnbase/txn.go
index ca9752e5cdefa4bd71569b2eaf8060f49bcd5f9b..1a6a0c47cd0118b7a24b9596c767b4cc81edcb71 100644
--- a/pkg/vm/engine/tae/txn/txnbase/txn.go
+++ b/pkg/vm/engine/tae/txn/txnbase/txn.go
@@ -56,6 +56,7 @@ type Txn struct {
 	Store    txnif.TxnStore
 	Err      error
 	DoneCond sync.Cond
+	LSN      uint64

 	PrepareCommitFn   func(txnif.AsyncTxn) error
 	PrepareRollbackFn func(txnif.AsyncTxn) error
@@ -112,6 +113,8 @@ func (txn *Txn) GetStore() txnif.TxnStore {
 	return txn.Store
 }

+func (txn *Txn) GetLSN() uint64 { return txn.LSN }
+
 func (txn *Txn) Rollback() (err error) {
 	if txn.Store.IsReadonly() {
 		txn.Mgr.DeleteTxn(txn.GetID())
@@ -201,6 +204,7 @@ func (txn *Txn) PreApplyCommit() (err error) {

 func (txn *Txn) ApplyCommit() (err error) {
 	defer func() {
+		txn.LSN = txn.Store.GetLSN()
 		if err == nil {
 			err = txn.Store.Close()
 		} else {
@@ -218,6 +222,7 @@ func (txn *Txn) ApplyRollback() (err error) {
 	defer func() {
+		txn.LSN = txn.Store.GetLSN()
 		if err == nil {
 			err = txn.Store.Close()
 		} else {
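Editor's sketch (not part of the patch): how a test might consume the new option helpers added in testutils/config/options.go. db.Open as the entry point and the use of t.TempDir() are assumptions for illustration, not something this diff introduces.

package db_test

import (
	"testing"

	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config"
)

// openQuickCheckpointDB opens an instance whose scanner and checkpointer run on
// very short intervals, so the GC and checkpoint paths touched by this patch
// are exercised quickly in tests.
func openQuickCheckpointDB(t *testing.T) *db.DB {
	opts := config.WithQuickScanAndCKPOpts(nil)
	tae, err := db.Open(t.TempDir(), opts)
	if err != nil {
		t.Fatal(err)
	}
	return tae
}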
diff --git a/pkg/vm/engine/tae/txn/txnimpl/localseg.go b/pkg/vm/engine/tae/txn/txnimpl/localseg.go
index eddad5c884325576483062f79868c292a0b8cfad..667177742cf49f4878259a57d17fb147aec75a99 100644
--- a/pkg/vm/engine/tae/txn/txnimpl/localseg.go
+++ b/pkg/vm/engine/tae/txn/txnimpl/localseg.go
@@ -259,7 +259,9 @@ func (seg *localSegment) RangeDelete(start, end uint32) error {
 	}
 	node := seg.nodes[first]
-	err = node.RangeDelete(firstOffset, txnbase.MaxNodeRows-1)
+	if err = node.RangeDelete(firstOffset, txnbase.MaxNodeRows-1); err != nil {
+		return err
+	}
 	node = seg.nodes[last]
 	if err = node.RangeDelete(0, lastOffset); err != nil {
 		return err
diff --git a/pkg/vm/engine/tae/txn/txnimpl/store.go b/pkg/vm/engine/tae/txn/txnimpl/store.go
index 153222bd9fc2faeafacb5d846d4985aec2eb19bc..3ad6debf644b56584b098fe9f64b596ea9bc97eb 100644
--- a/pkg/vm/engine/tae/txn/txnimpl/store.go
+++ b/pkg/vm/engine/tae/txn/txnimpl/store.go
@@ -442,3 +442,5 @@ func (store *txnStore) PrepareRollback() error {
 	return err
 }
+
+func (store *txnStore) GetLSN() uint64 { return store.cmdMgr.lsn }
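Editor's sketch (not part of the patch): with GetLSN now captured in ApplyCommit/ApplyRollback and exposed on the txn handle, a caller can tell whether everything a committed txn logged has already been covered by a checkpoint. The helper name below is hypothetical; Scheduler.GetCheckpointedLSN is the method the scanner above already uses.

package db

import (
	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
)

// walCoveredByCheckpoint reports whether the WAL entries written by a committed
// txn (whose LSN was recorded when the commit was applied) fall at or below the
// checkpointed LSN and are therefore reclaimable.
func (d *DB) walCoveredByCheckpoint(txn txnif.TxnReader) bool {
	return d.Scheduler.GetCheckpointedLSN() >= txn.GetLSN()
}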