diff --git a/pkg/vm/engine/tae/db/db_test.go b/pkg/vm/engine/tae/db/db_test.go index 0e1d5cde38813bac1b3891e4a06e0f37dd7ae9fd..5049deadac1dbcd6c2aec0a582d90a6e3ee9ca4f 100644 --- a/pkg/vm/engine/tae/db/db_test.go +++ b/pkg/vm/engine/tae/db/db_test.go @@ -2066,6 +2066,105 @@ func TestMergeBlocks(t *testing.T) { assert.Nil(t, txn.Commit()) } +// delete +// merge but not commit +// delete +// commit merge +func TestMergeblocks2(t *testing.T) { + testutils.EnsureNoLeak(t) + opts := config.WithLongScanAndCKPOpts(nil) + tae := newTestEngine(t, opts) + defer tae.Close() + schema := catalog.MockSchemaAll(1, 0) + schema.BlockMaxRows = 3 + schema.SegmentMaxBlocks = 2 + tae.bindSchema(schema) + bat := catalog.MockBatch(schema, 6) + bats := bat.Split(2) + defer bat.Close() + + tae.createRelAndAppend(bats[0], true) + + txn, rel := tae.getRelation() + _ = rel.Append(bats[1]) + assert.Nil(t, txn.Commit()) + + { + v := getSingleSortKeyValue(bat, schema, 1) + t.Logf("v is %v**********", v) + filter := handle.NewEQFilter(v) + txn2, rel := tae.getRelation() + t.Log("********before delete******************") + checkAllColRowsByScan(t, rel, 6, true) + _ = rel.DeleteByFilter(filter) + assert.Nil(t, txn2.Commit()) + } + + _, rel = tae.getRelation() + t.Log("**********************") + checkAllColRowsByScan(t, rel, 5, true) + + { + t.Log("************merge************") + + txn, rel = tae.getRelation() + + segIt := rel.MakeSegmentIt() + seg := segIt.GetSegment().GetMeta().(*catalog.SegmentEntry) + segHandle, err := rel.GetSegment(seg.ID) + assert.NoError(t, err) + + var metas []*catalog.BlockEntry + it := segHandle.MakeBlockIt() + for it.Valid() { + meta := it.GetBlock().GetMeta().(*catalog.BlockEntry) + metas = append(metas, meta) + it.Next() + } + segsToMerge := []*catalog.SegmentEntry{segHandle.GetMeta().(*catalog.SegmentEntry)} + task, err := jobs.NewMergeBlocksTask(nil, txn, metas, segsToMerge, nil, tae.Scheduler) + assert.NoError(t, err) + err = task.OnExec() + assert.NoError(t, err) + + { + v := getSingleSortKeyValue(bat, schema, 2) + t.Logf("v is %v**********", v) + filter := handle.NewEQFilter(v) + txn2, rel := tae.getRelation() + t.Log("********before delete******************") + checkAllColRowsByScan(t, rel, 5, true) + _ = rel.DeleteByFilter(filter) + assert.Nil(t, txn2.Commit()) + } + err = txn.Commit() + assert.NoError(t, err) + } + + t.Log("********************") + _, rel = tae.getRelation() + checkAllColRowsByScan(t, rel, 4, true) + assert.Equal(t, int64(4), rel.Rows()) + + v := getSingleSortKeyValue(bat, schema, 1) + filter := handle.NewEQFilter(v) + _, _, err := rel.GetByFilter(filter) + assert.NotNil(t, err) + + v = getSingleSortKeyValue(bat, schema, 2) + filter = handle.NewEQFilter(v) + _, _, err = rel.GetByFilter(filter) + assert.NotNil(t, err) + + // v = getSingleSortKeyValue(bat, schema, 4) + // filter = handle.NewEQFilter(v) + // _, _, err = rel.GetByFilter(filter) + // assert.NotNil(t, err) + + // tae.restart() + // assert.Equal(t, int64(2), rel.Rows()) +} + func TestDelete2(t *testing.T) { testutils.EnsureNoLeak(t) opts := config.WithLongScanAndCKPOpts(nil) diff --git a/pkg/vm/engine/tae/tables/jobs/mergeblocks.go b/pkg/vm/engine/tae/tables/jobs/mergeblocks.go index 10cd5f94328eba0a1b706104535d08cb05fe3884..02e305eb71932db05dbf0e4a18ebc8af24f1591c 100644 --- a/pkg/vm/engine/tae/tables/jobs/mergeblocks.go +++ b/pkg/vm/engine/tae/tables/jobs/mergeblocks.go @@ -182,6 +182,7 @@ func (task *mergeBlocksTask) Execute() (err error) { if view, err = block.GetColumnDataById(schema.SortKey.Defs[0].Idx, nil); err != nil { return } + task.deletes[i] = view.DeleteMask view.ApplyDeletes() vec = view.Orphan() } else { @@ -190,6 +191,7 @@ func (task *mergeBlocksTask) Execute() (err error) { if view, err = block.GetColumnDataById(schema.SortKey.Defs[idx].Idx, nil); err != nil { return } + task.deletes[i] = view.DeleteMask view.ApplyDeletes() cols[idx] = view.Orphan() defer cols[idx].Close()