diff --git a/pkg/vm/engine/aoe/storage/common/refs.go b/pkg/vm/engine/aoe/storage/common/refs.go new file mode 100644 index 0000000000000000000000000000000000000000..1650253439b72d48a0ef5111259426e8c49319cf --- /dev/null +++ b/pkg/vm/engine/aoe/storage/common/refs.go @@ -0,0 +1,29 @@ +package common + +import "sync/atomic" + +type OnZeroCB func() + +type RefHelper struct { + Refs int64 + OnZeroCB OnZeroCB +} + +func (helper *RefHelper) RefCount() int64 { + return atomic.LoadInt64(&helper.Refs) +} + +func (helper *RefHelper) Ref() { + atomic.AddInt64(&helper.Refs, int64(1)) +} + +func (helper *RefHelper) Unref() { + v := atomic.AddInt64(&helper.Refs, int64(-1)) + if v == 0 { + if helper.OnZeroCB != nil { + helper.OnZeroCB() + } + } else if v < 0 { + panic("logic error") + } +} diff --git a/pkg/vm/engine/aoe/storage/db/db_test.go b/pkg/vm/engine/aoe/storage/db/db_test.go index 6ad4e116368721b1b55a63f3e53d5d2e7b1154e5..3a6bac66061601473194edcc0dfa63dfe1a2b7ce 100644 --- a/pkg/vm/engine/aoe/storage/db/db_test.go +++ b/pkg/vm/engine/aoe/storage/db/db_test.go @@ -3,9 +3,10 @@ package db import ( "context" "fmt" - "github.com/panjf2000/ants/v2" "math/rand" e "matrixone/pkg/vm/engine/aoe/storage" + "matrixone/pkg/vm/engine/aoe/storage/layout/base" + "matrixone/pkg/vm/engine/aoe/storage/layout/table/index" md "matrixone/pkg/vm/engine/aoe/storage/metadata" "matrixone/pkg/vm/engine/aoe/storage/mock/type/chunk" "os" @@ -13,6 +14,8 @@ import ( "testing" "time" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/assert" ) @@ -302,8 +305,22 @@ func TestConcurrency(t *testing.T) { for blkIt.Valid() { tblkCnt++ blkHandle := blkIt.GetBlockHandle() - // indexHolder := blkHandle.GetIndexHolder() - // t.Log(indexHolder.String()) + col0 := blkHandle.GetColumn(0) + ctx := index.NewFilterCtx(index.OpEq) + ctx.Val = int32(0 + col0.GetColIdx()*100) + err = col0.EvalFilter(ctx) + assert.Nil(t, err) + if col0.GetBlockType() > base.PERSISTENT_BLK { + assert.False(t, ctx.BoolRes) + 
} + ctx.Reset() + ctx.Op = index.OpEq + ctx.Val = int32(1 + col0.GetColIdx()*100) + err = col0.EvalFilter(ctx) + assert.Nil(t, err) + if col0.GetBlockType() > base.PERSISTENT_BLK { + assert.True(t, ctx.BoolRes) + } cursors := blkHandle.InitScanCursor() for _, cursor := range cursors { cursor.Close() diff --git a/pkg/vm/engine/aoe/storage/iterator/base.go b/pkg/vm/engine/aoe/storage/iterator/base.go new file mode 100644 index 0000000000000000000000000000000000000000..ec84576c90a31ba5f05aeca3e97e7ef96d18dda9 --- /dev/null +++ b/pkg/vm/engine/aoe/storage/iterator/base.go @@ -0,0 +1,90 @@ +package it + +type BaseResources struct { + Impl IResources +} + +type BaseIterator struct { + Err error + Result interface{} + Executor ExecutorT + Resources IResources + Impl Iterator +} + +var ( + _ IResources = (*BaseResources)(nil) + _ Iterator = (*BaseIterator)(nil) +) + +func NewBaseResources(impl IResources) IResources { + res := &BaseResources{ + Impl: impl, + } + return res +} + +func NewBaseIterator(impl Iterator, executor ExecutorT, resources IResources) Iterator { + iter := &BaseIterator{ + Executor: executor, + Resources: resources, + Impl: impl, + } + return iter +} + +func (res *BaseResources) IterResource(iter Iterator) { + err := iter.PreIter() + if err != nil { + iter.SetErr(err) + return + } + + err = res.Impl.HandleResources(iter.Execute) + if err != nil { + iter.SetErr(err) + return + } + + err = iter.PostIter() + iter.SetErr(err) +} + +func (res *BaseResources) HandleResources(handle HandleT) error { + return nil +} + +func (iter *BaseIterator) SetErr(err error) { + iter.Err = err +} + +func (iter *BaseIterator) GetErr() error { + return iter.Err +} + +func (iter *BaseIterator) SetResult(r interface{}) { + iter.Result = r +} + +func (iter *BaseIterator) GetResult() interface{} { + return iter.Result +} + +func (iter *BaseIterator) PreIter() error { + return nil +} + +func (iter *BaseIterator) Iter() { + iter.Resources.IterResource(iter) +} + +func (iter 
*BaseIterator) PostIter() error { + return nil +} + +func (iter *BaseIterator) Execute(res interface{}) error { + if iter.Executor != nil { + return iter.Executor(res, iter.Impl) + } + return nil +} diff --git a/pkg/vm/engine/aoe/storage/iterator/types.go b/pkg/vm/engine/aoe/storage/iterator/types.go new file mode 100644 index 0000000000000000000000000000000000000000..6c0ec37d5d6f4c3177b69f16097288dab9a222ea --- /dev/null +++ b/pkg/vm/engine/aoe/storage/iterator/types.go @@ -0,0 +1,22 @@ +package it + +import () + +type ExecutorT func(interface{}, Iterator) error +type HandleT func(interface{}) error + +type IResources interface { + IterResource(Iterator) + HandleResources(HandleT) error +} + +type Iterator interface { + PreIter() error + Iter() + PostIter() error + GetResult() interface{} + GetErr() error + SetResult(interface{}) + SetErr(error) + Execute(interface{}) error +} diff --git a/pkg/vm/engine/aoe/storage/layout/base/file.go b/pkg/vm/engine/aoe/storage/layout/base/file.go index e26fee872c0930ffaa2503dd99b99cc69412af59..27ec5c3d99a0351382f4cc25e338bef463b83b1d 100644 --- a/pkg/vm/engine/aoe/storage/layout/base/file.go +++ b/pkg/vm/engine/aoe/storage/layout/base/file.go @@ -69,7 +69,7 @@ type IBaseFile interface { ReadPoint(ptr *Pointer, buf []byte) ReadPart(colIdx uint64, id common.ID, buf []byte) Destory() - MakeVirtalIndexFile(*IndexMeta) IVirtaulFile + MakeVirtualIndexFile(*IndexMeta) IVirtaulFile GetDir() string } diff --git a/pkg/vm/engine/aoe/storage/layout/dataio/blk.go b/pkg/vm/engine/aoe/storage/layout/dataio/blk.go index 6245f86ea6a119b004f09c6c943366111c970270..260db5c206f3e0331282db2f8e63bbc08f489036 100644 --- a/pkg/vm/engine/aoe/storage/layout/dataio/blk.go +++ b/pkg/vm/engine/aoe/storage/layout/dataio/blk.go @@ -68,7 +68,7 @@ func (bf *BlockFile) GetIndexesMeta() *base.IndexesMeta { return bf.Meta.Indexes } -func (bf *BlockFile) MakeVirtalIndexFile(meta *base.IndexMeta) base.IVirtaulFile { +func (bf *BlockFile) MakeVirtualIndexFile(meta 
*base.IndexMeta) base.IVirtaulFile { vf := &EmbbedBlockIndexFile{ EmbbedIndexFile: EmbbedIndexFile{ SegmentFile: bf.SegmentFile, diff --git a/pkg/vm/engine/aoe/storage/layout/dataio/mockseg.go b/pkg/vm/engine/aoe/storage/layout/dataio/mockseg.go index 0b6da58a6aee3c68a3e8bee88af3cfab39553b9b..81e7b6bfc40742224717187619e25f44948f83be 100644 --- a/pkg/vm/engine/aoe/storage/layout/dataio/mockseg.go +++ b/pkg/vm/engine/aoe/storage/layout/dataio/mockseg.go @@ -107,7 +107,7 @@ func (msf *MockSegmentFile) GetDir() string { return filepath.Dir(msf.FileName) } -func (msf *MockSegmentFile) MakeVirtalIndexFile(meta *base.IndexMeta) base.IVirtaulFile { +func (msf *MockSegmentFile) MakeVirtualIndexFile(meta *base.IndexMeta) base.IVirtaulFile { return nil } diff --git a/pkg/vm/engine/aoe/storage/layout/dataio/sortedseg.go b/pkg/vm/engine/aoe/storage/layout/dataio/sortedseg.go index aabb770cee056a6066e8221de594d5dd128292ab..bdb1df59e4814cd18ec8da1b13bc50cf0ad2f6f2 100644 --- a/pkg/vm/engine/aoe/storage/layout/dataio/sortedseg.go +++ b/pkg/vm/engine/aoe/storage/layout/dataio/sortedseg.go @@ -45,7 +45,7 @@ type SortedSegmentFile struct { BlocksMeta map[common.ID]*FileMeta } -func (sf *SortedSegmentFile) MakeVirtalIndexFile(meta *base.IndexMeta) base.IVirtaulFile { +func (sf *SortedSegmentFile) MakeVirtualIndexFile(meta *base.IndexMeta) base.IVirtaulFile { vf := &EmbbedIndexFile{ SegmentFile: sf, Meta: meta, diff --git a/pkg/vm/engine/aoe/storage/layout/dataio/unsortedseg.go b/pkg/vm/engine/aoe/storage/layout/dataio/unsortedseg.go index 149c2ca26c004ae66c87cb71c80f89177d508ebe..77d29bea79192d79d758b6ed4c2741987d68a326 100644 --- a/pkg/vm/engine/aoe/storage/layout/dataio/unsortedseg.go +++ b/pkg/vm/engine/aoe/storage/layout/dataio/unsortedseg.go @@ -78,7 +78,7 @@ func (sf *UnsortedSegmentFile) GetBlockIndexesMeta(id common.ID) *base.IndexesMe return blk.GetIndexesMeta() } -func (sf *UnsortedSegmentFile) MakeVirtalIndexFile(meta *base.IndexMeta) base.IVirtaulFile { +func (sf 
*UnsortedSegmentFile) MakeVirtualIndexFile(meta *base.IndexMeta) base.IVirtaulFile { return nil } @@ -87,7 +87,7 @@ func (sf *UnsortedSegmentFile) MakeVirtualBlkIndexFile(id *common.ID, meta *base if blk == nil { return nil } - return blk.MakeVirtalIndexFile(meta) + return blk.MakeVirtualIndexFile(meta) } func (sf *UnsortedSegmentFile) MakeVirtualPartFile(id *common.ID) base.IVirtaulFile { diff --git a/pkg/vm/engine/aoe/storage/layout/table/col/blk.go b/pkg/vm/engine/aoe/storage/layout/table/col/blk.go index 6d0a9e4db91dba28772b037da050d74c14e59771..678b0cb2541a20ef922ee564fda565316157065a 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/col/blk.go +++ b/pkg/vm/engine/aoe/storage/layout/table/col/blk.go @@ -8,6 +8,7 @@ import ( md "matrixone/pkg/vm/engine/aoe/storage/metadata" "sync" "sync/atomic" + // log "github.com/sirupsen/logrus" ) type IColumnBlock interface { @@ -23,6 +24,7 @@ type IColumnBlock interface { GetIndexHolder() *index.BlockHolder GetColIdx() int CloneWithUpgrade(IColumnSegment, *md.Block) IColumnBlock + EvalFilter(*index.FilterCtx) error String() string Ref() IColumnBlock UnRef() @@ -41,6 +43,10 @@ type ColumnBlock struct { IndexHolder *index.BlockHolder } +func (blk *ColumnBlock) EvalFilter(ctx *index.FilterCtx) error { + return blk.IndexHolder.EvalFilter(blk.ColIdx, ctx) +} + func (blk *ColumnBlock) GetRefs() int64 { return atomic.LoadInt64(&blk.Refs) } diff --git a/pkg/vm/engine/aoe/storage/layout/table/col/blk_test.go b/pkg/vm/engine/aoe/storage/layout/table/col/blk_test.go index 382e9134579570f8d5431a2a701cd20a62968344..3bcb61223dd1f57e63a5ca0a9f7e077bcc80dbd2 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/col/blk_test.go +++ b/pkg/vm/engine/aoe/storage/layout/table/col/blk_test.go @@ -4,6 +4,7 @@ import ( bmgr "matrixone/pkg/vm/engine/aoe/storage/buffer/manager" mgrif "matrixone/pkg/vm/engine/aoe/storage/buffer/manager/iface" dio "matrixone/pkg/vm/engine/aoe/storage/dataio" + it "matrixone/pkg/vm/engine/aoe/storage/iterator" 
"matrixone/pkg/vm/engine/aoe/storage/layout/base" ldio "matrixone/pkg/vm/engine/aoe/storage/layout/dataio" "matrixone/pkg/vm/engine/aoe/storage/layout/table/index" @@ -361,6 +362,18 @@ func makeSegments(indexHolder *index.TableHolder, fsMgr base.IManager, mtBufMgr, return segs } +type testBlkIt struct { + it.BaseIterator +} + +func newTestBlkIt(executor it.ExecutorT, res it.IResources) it.Iterator { + blkIt := new(testBlkIt) + blkIt.Impl = blkIt + blkIt.Executor = executor + blkIt.Resources = res + return blkIt +} + func TestUpgradeStdSegment(t *testing.T) { seg_cnt := uint64(5) blkRows := uint64(10) @@ -450,6 +463,13 @@ func TestUpgradeStdSegment(t *testing.T) { assert.Equal(t, base.PERSISTENT_SORTED_BLK, blk.GetBlockType()) blk.UnRef() } + executor := func(blk interface{}, iter it.Iterator) error { + t.Log(blk) + t.Log(iter) + return nil + } + blkIt := newTestBlkIt(executor, currSeg) + blkIt.Iter() currSeg.UnRef() currSeg = currSeg.GetNext() } diff --git a/pkg/vm/engine/aoe/storage/layout/table/col/seg.go b/pkg/vm/engine/aoe/storage/layout/table/col/seg.go index 8dda609a8cb65830fad428e76ac084191ec4796c..c704f00614586733a445cdf2beff4a7184e66415 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/col/seg.go +++ b/pkg/vm/engine/aoe/storage/layout/table/col/seg.go @@ -6,6 +6,7 @@ import ( "io" bmgrif "matrixone/pkg/vm/engine/aoe/storage/buffer/manager/iface" "matrixone/pkg/vm/engine/aoe/storage/common" + it "matrixone/pkg/vm/engine/aoe/storage/iterator" "matrixone/pkg/vm/engine/aoe/storage/layout/base" "matrixone/pkg/vm/engine/aoe/storage/layout/table/index" md "matrixone/pkg/vm/engine/aoe/storage/metadata" @@ -16,6 +17,7 @@ import ( ) type IColumnSegment interface { + it.IResources io.Closer sync.Locker GetNext() IColumnSegment @@ -43,10 +45,12 @@ type IColumnSegment interface { GetRefs() int64 GetMeta() *md.Segment GetFsManager() base.IManager + EvalFilter(*index.FilterCtx) error } type ColumnSegment struct { sync.RWMutex + it.BaseResources Refs int64 ID common.ID 
ColIdx int @@ -79,15 +83,56 @@ func NewColumnSegment(tblHolder *index.TableHolder, fsMgr base.IManager, mtBufMg SSTBufMgr: sstBufMgr, FsMgr: fsMgr, } + seg.Impl = seg seg.IndexHolder = tblHolder.GetSegment(seg.ID.SegmentID) + var err error if seg.IndexHolder == nil { segHolder := tblHolder.RegisterSegment(seg.ID.AsSegmentID(), indexSegType, nil) seg.IndexHolder = segHolder + id := seg.ID.AsSegmentID() + segFile := fsMgr.GetUnsortedFile(id) + if segType == base.UNSORTED_SEG { + if segFile == nil { + segFile, err = fsMgr.RegisterUnsortedFiles(seg.GetID()) + if err != nil { + panic(err) + } + } + seg.IndexHolder.Init(segFile) + } else { + if segFile != nil { + fsMgr.UpgradeFile(seg.GetID()) + } else { + segFile = fsMgr.GetSortedFile(seg.GetID()) + if segFile == nil { + segFile, err = fsMgr.RegisterSortedFiles(seg.GetID()) + if err != nil { + panic(err) + } + } + } + seg.IndexHolder.Init(segFile) + } } return seg.Ref() } +func (seg *ColumnSegment) EvalFilter(ctx *index.FilterCtx) error { + return seg.IndexHolder.EvalFilter(seg.ColIdx, ctx) +} + +func (seg *ColumnSegment) HandleResources(handle it.HandleT) error { + var err error + for _, blk := range seg.Blocks { + err = handle(blk) + if err != nil { + return err + } + } + return err +} + func (seg *ColumnSegment) GetIndexHolder() *index.SegmentHolder { return seg.IndexHolder } @@ -205,7 +250,15 @@ func (seg *ColumnSegment) CloneWithUpgrade(meta *md.Segment, indexTblHolder *ind } if segIndexHolder.Type == base.UNSORTED_SEG { segIndexHolder = indexTblHolder.UpgradeSegment(seg.ID.SegmentID, base.SORTED_SEG) - // segIndexHolder.Init() + id := seg.ID.AsSegmentID() + segFile := cloned.FsMgr.GetSortedFile(id) + if segFile == nil { + segFile = cloned.FsMgr.UpgradeFile(id) + if segFile == nil { + panic("logic error") + } + } + seg.IndexHolder.Init(segFile) } cloned.IndexHolder = segIndexHolder var prev IColumnBlock diff --git a/pkg/vm/engine/aoe/storage/layout/table/col/stdblk.go 
b/pkg/vm/engine/aoe/storage/layout/table/col/stdblk.go index 937cc904e67cf7beda3354db9f267cdde77d49f8..1aef754961f12a54ad2095d66e0fb64ab52650c5 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/col/stdblk.go +++ b/pkg/vm/engine/aoe/storage/layout/table/col/stdblk.go @@ -48,9 +48,12 @@ func NewStdColumnBlock(seg IColumnSegment, meta *md.Block) IColumnBlock { if segFile != nil { fsMgr.UpgradeFile(seg.GetID()) } else { - _, err := fsMgr.RegisterSortedFiles(seg.GetID()) - if err != nil { - panic(err) + segFile = fsMgr.GetSortedFile(seg.GetID()) + if segFile == nil { + _, err := fsMgr.RegisterSortedFiles(seg.GetID()) + if err != nil { + panic(err) + } } } if indexHolder == nil { diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/blk.go b/pkg/vm/engine/aoe/storage/layout/table/index/blk.go index d4c6356e4d812fea0c551a582b4f596e9b227aa8..ecd05bbe6fd39a461f2ee6d32604f5c72bbd99f5 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/blk.go +++ b/pkg/vm/engine/aoe/storage/layout/table/index/blk.go @@ -9,10 +9,11 @@ import ( ) type BlockHolder struct { - RefHelper + common.RefHelper ID common.ID sync.RWMutex Indexes []*Node + ColIndexes map[int][]int Type base.BlockType BufMgr mgrif.IBufferManager Inited bool @@ -27,6 +28,7 @@ func newBlockHolder(bufMgr mgrif.IBufferManager, id common.ID, t base.BlockType, Inited: false, PostCloseCB: cb, } + holder.ColIndexes = make(map[int][]int) holder.Indexes = make([]*Node, 0) holder.OnZeroCB = holder.close holder.Ref() @@ -43,12 +45,39 @@ func (holder *BlockHolder) Init(segFile base.ISegmentFile) { } for _, meta := range indexesMeta.Data { vf := segFile.MakeVirtualBlkIndexFile(&holder.ID, meta) + col := int(meta.Cols.Slice()[0]) node := newNode(holder.BufMgr, vf, ZoneMapIndexConstructor, meta.Ptr.Len, meta.Cols, nil) + idxes, ok := holder.ColIndexes[col] + if !ok { + idxes = make([]int, 0) + holder.ColIndexes[col] = idxes + } + holder.ColIndexes[col] = append(holder.ColIndexes[col], len(holder.Indexes)) holder.Indexes = 
append(holder.Indexes, node) } holder.Inited = true } +func (holder *BlockHolder) EvalFilter(colIdx int, ctx *FilterCtx) error { + idxes, ok := holder.ColIndexes[colIdx] + if !ok { + // TODO + ctx.BoolRes = true + return nil + } + var err error + for _, idx := range idxes { + node := holder.Indexes[idx].GetManagedNode() + err = node.DataNode.(Index).Eval(ctx) + if err != nil { + node.Close() + return err + } + node.Close() + } + return nil +} + func (holder *BlockHolder) close() { for _, index := range holder.Indexes { index.Unref() @@ -83,5 +112,6 @@ func (holder *BlockHolder) stringNoLock() string { for _, i := range holder.Indexes { s = fmt.Sprintf("%s\n\tIndex: [Refs=%d]", s, i.RefCount()) } + // s = fmt.Sprintf("%s\n%vs, holder.ColIndexes) return s } diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/helper.go b/pkg/vm/engine/aoe/storage/layout/table/index/helper.go index 2a20f4ce2aee80bff936be0af5034535f2bdaa69..2a77d05c8cce7d0a0540e746cff2e2d4b6975cb2 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/helper.go +++ b/pkg/vm/engine/aoe/storage/layout/table/index/helper.go @@ -3,11 +3,13 @@ package index import ( "bytes" "fmt" - log "github.com/sirupsen/logrus" "io" "matrixone/pkg/encoding" "matrixone/pkg/vm/engine/aoe/storage/layout/base" "os" + + "github.com/pilosa/pilosa/roaring" + log "github.com/sirupsen/logrus" ) var ( @@ -28,6 +30,12 @@ func (h *RWHelper) WriteIndexes(indexes []Index) ([]byte, error) { return nil, err } } + for _, i := range indexes { + _, err := buf.Write(encoding.EncodeInt16(i.GetCol())) + if err != nil { + return nil, err + } + } for _, i := range indexes { ibuf, _ := i.Marshall() buf.Write(encoding.EncodeInt32(int32(len(ibuf)))) @@ -59,6 +67,12 @@ func (h *RWHelper) ReadIndexes(f os.File) (indexes []Index, err error) { panic("unsupported") } } + for i := 0; i < int(indexCnt); i++ { + _, err := f.Read(twoBytes) + if err != nil { + panic(fmt.Sprintf("unexpect error: %s", err)) + } + } for i := 0; i < int(indexCnt); i++ { 
_, err := f.Read(fourBytes) if err != nil { @@ -95,8 +109,17 @@ func (h *RWHelper) ReadIndexesMeta(f os.File) (meta *base.IndexesMeta, err error im := new(base.IndexMeta) im.Type = indexType im.Ptr = new(base.Pointer) + im.Cols = roaring.NewBitmap() meta.Data = append(meta.Data, im) } + for i := 0; i < int(indexCnt); i++ { + _, err := f.Read(twoBytes) + if err != nil { + panic(fmt.Sprintf("unexpect error: %s", err)) + } + col := encoding.DecodeInt16(twoBytes) + meta.Data[i].Cols.Add(uint64(col)) + } for i := 0; i < int(indexCnt); i++ { _, err := f.Read(fourBytes) if err != nil { diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/index_test.go b/pkg/vm/engine/aoe/storage/layout/table/index/index_test.go index 40e4072a971d4028699aaa947f578dfd25663583..82c3288afaca0a63ce76cb83cc543f8b982c6a95 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/index_test.go +++ b/pkg/vm/engine/aoe/storage/layout/table/index/index_test.go @@ -72,10 +72,23 @@ func TestTable(t *testing.T) { func TestIndex(t *testing.T) { int32zm := NewZoneMap(types.Type{Oid: types.T_int32, Size: 4}, int32(10), int32(100), int16(0)) - assert.False(t, int32zm.Eq(int32(9))) - assert.True(t, int32zm.Eq(int32(10))) - assert.True(t, int32zm.Eq(int32(100))) - assert.False(t, int32zm.Eq(int32(101))) + ctx := NewFilterCtx(OpEq) + ctx.Val = int32(9) + ctx.Eval(int32zm) + assert.False(t, ctx.BoolRes) + ctx.Reset() + ctx.Val = int32(10) + ctx.Op = OpEq + ctx.Eval(int32zm) + assert.True(t, ctx.BoolRes) + ctx.Val = int32(100) + ctx.Op = OpEq + ctx.Eval(int32zm) + assert.True(t, ctx.BoolRes) + ctx.Val = int32(101) + ctx.Op = OpEq + ctx.Eval(int32zm) + assert.False(t, ctx.BoolRes) } func TestRefs1(t *testing.T) { diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/node.go b/pkg/vm/engine/aoe/storage/layout/table/index/node.go index ff5841298a1e7736221c8715ee0576de9704d3c8..79e924fbe932fb51038a77b80c46f063c15c5bb4 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/node.go +++ 
b/pkg/vm/engine/aoe/storage/layout/table/index/node.go @@ -5,11 +5,12 @@ import ( buf "matrixone/pkg/vm/engine/aoe/storage/buffer" bmgr "matrixone/pkg/vm/engine/aoe/storage/buffer/manager" bmgrif "matrixone/pkg/vm/engine/aoe/storage/buffer/manager/iface" + "matrixone/pkg/vm/engine/aoe/storage/common" ) type Node struct { *bmgr.Node - RefHelper + common.RefHelper Cols *roaring.Bitmap PostCloseCB PostCloseCB } diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/seg.go b/pkg/vm/engine/aoe/storage/layout/table/index/seg.go index aab5483fd82ea75bbfefed682d66c148a4c8aebf..4fb075ae8bf2a13400ddeeb0b785747f9f00fd22 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/seg.go +++ b/pkg/vm/engine/aoe/storage/layout/table/index/seg.go @@ -9,41 +9,17 @@ import ( "sync/atomic" ) -type OnZeroCB func() - -type RefHelper struct { - Refs int64 - OnZeroCB OnZeroCB -} - -func (helper *RefHelper) RefCount() int64 { - return atomic.LoadInt64(&helper.Refs) -} - -func (helper *RefHelper) Ref() { - atomic.AddInt64(&helper.Refs, int64(1)) -} - -func (helper *RefHelper) Unref() { - v := atomic.AddInt64(&helper.Refs, int64(-1)) - if v == 0 { - if helper.OnZeroCB != nil { - helper.OnZeroCB() - } - } else if v < 0 { - panic("logic error") - } -} - type PostCloseCB = func(interface{}) type SegmentHolder struct { - RefHelper + common.RefHelper ID common.ID BufMgr mgrif.IBufferManager + Inited bool self struct { sync.RWMutex - Indexes []*Node + Indexes []*Node + ColIndexes map[int][]int } tree struct { sync.RWMutex @@ -59,6 +35,7 @@ func newSegmentHolder(bufMgr mgrif.IBufferManager, id common.ID, segType base.Se holder := &SegmentHolder{ID: id, Type: segType, BufMgr: bufMgr} holder.tree.Blocks = make([]*BlockHolder, 0) holder.tree.IdMap = make(map[uint64]int) + holder.self.ColIndexes = make(map[int][]int) holder.self.Indexes = make([]*Node, 0) holder.OnZeroCB = holder.close holder.PostCloseCB = cb @@ -66,19 +43,62 @@ func newSegmentHolder(bufMgr mgrif.IBufferManager, id common.ID, 
segType base.Se return holder } +func (holder *SegmentHolder) Init(segFile base.ISegmentFile) { + if holder.Inited { + panic("logic error") + } + indexesMeta := segFile.GetIndexesMeta() + if indexesMeta == nil { + return + } + for _, meta := range indexesMeta.Data { + vf := segFile.MakeVirtualIndexFile(meta) + col := int(meta.Cols.Slice()[0]) + node := newNode(holder.BufMgr, vf, ZoneMapIndexConstructor, meta.Ptr.Len, meta.Cols, nil) + idxes, ok := holder.self.ColIndexes[col] + if !ok { + idxes = make([]int, 0) + holder.self.ColIndexes[col] = idxes + } + holder.self.ColIndexes[col] = append(holder.self.ColIndexes[col], len(holder.self.Indexes)) + holder.self.Indexes = append(holder.self.Indexes, node) + } + holder.Inited = true +} + func (holder *SegmentHolder) close() { for _, blk := range holder.tree.Blocks { blk.Unref() } - for _, index := range holder.self.Indexes { - index.Unref() + for _, colIndex := range holder.self.Indexes { + colIndex.Unref() } if holder.PostCloseCB != nil { holder.PostCloseCB(holder) } } +func (holder *SegmentHolder) EvalFilter(colIdx int, ctx *FilterCtx) error { + idxes, ok := holder.self.ColIndexes[colIdx] + if !ok { + // TODO + ctx.BoolRes = true + return nil + } + var err error + for _, idx := range idxes { + node := holder.self.Indexes[idx].GetManagedNode() + err = node.DataNode.(Index).Eval(ctx) + if err != nil { + node.Close() + return err + } + node.Close() + } + return nil +} + func (holder *SegmentHolder) stringNoLock() string { s := fmt.Sprintf("<IndexSegmentHolder[%s]>[Ty=%v](Cnt=%d)(RefCount=%d)", holder.ID.SegmentString(), holder.Type, holder.tree.BlockCnt, holder.RefCount()) diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/types.go b/pkg/vm/engine/aoe/storage/layout/table/index/types.go index 4cdd3ec72a99fbb5f5db1294d408f0b0741f5410..403cd874f43efa946cad956eb2ad05526a3df85f 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/types.go +++ b/pkg/vm/engine/aoe/storage/layout/table/index/types.go @@ -3,17 +3,67 
@@ package index import ( buf "matrixone/pkg/vm/engine/aoe/storage/buffer" "matrixone/pkg/vm/engine/aoe/storage/layout/base" + + "github.com/pilosa/pilosa/roaring" +) + +type OpType uint8 + +const ( + OpInv OpType = iota + OpEq + OpNe + OpLt + OpLe + OpGt + OpGe + OpIn + OpOut ) -// TODO: Just for index framework implementation placeholder +type FilterCtx struct { + Op OpType + Val interface{} + + // Used for IN | NOT IN + ValSet map[interface{}]bool + + ValMin interface{} + ValMax interface{} + + BoolRes bool + BMRes *roaring.Bitmap + Err error +} + +func NewFilterCtx(t OpType) *FilterCtx { + ctx := &FilterCtx{ + Op: t, + ValSet: make(map[interface{}]bool), + } + return ctx +} + +func (ctx *FilterCtx) Reset() { + ctx.Op = OpInv + ctx.Val = nil + for k := range ctx.ValSet { + delete(ctx.ValSet, k) + } + ctx.ValMin = nil + ctx.ValMax = nil + ctx.BoolRes = false + ctx.BMRes = nil + ctx.Err = nil +} + +func (ctx *FilterCtx) Eval(i Index) error { + return i.Eval(ctx) +} + type Index interface { buf.IMemoryNode Type() base.IndexType - Eq(interface{}) bool - Ne(interface{}) bool - Lt(interface{}) bool - Le(interface{}) bool - Gt(interface{}) bool - Ge(interface{}) bool - Btw(interface{}) bool + GetCol() int16 + Eval(ctx *FilterCtx) error } diff --git a/pkg/vm/engine/aoe/storage/layout/table/index/zonemap.go b/pkg/vm/engine/aoe/storage/layout/table/index/zonemap.go index 3764a98ce8f16c78e173365b00255d103be0610f..e2253014bc5f6f8aad8914a9ae107fd1b6a6b409 100644 --- a/pkg/vm/engine/aoe/storage/layout/table/index/zonemap.go +++ b/pkg/vm/engine/aoe/storage/layout/table/index/zonemap.go @@ -50,6 +50,32 @@ func MockInt32ZmIndexes(cols int) (indexes []Index) { return indexes } +func (i *ZoneMapIndex) GetCol() int16 { + return i.Col +} + +func (i *ZoneMapIndex) Eval(ctx *FilterCtx) error { + switch ctx.Op { + case OpEq: + ctx.BoolRes = i.Eq(ctx.Val) + case OpNe: + ctx.BoolRes = i.Ne(ctx.Val) + case OpGe: + ctx.BoolRes = i.Ge(ctx.Val) + case OpGt: + ctx.BoolRes = i.Gt(ctx.Val) + 
case OpLe: + ctx.BoolRes = i.Le(ctx.Val) + case OpLt: + ctx.BoolRes = i.Lt(ctx.Val) + case OpIn: + ctx.BoolRes = i.Ge(ctx.ValMin) && i.Le(ctx.ValMax) + case OpOut: + ctx.BoolRes = i.Lt(ctx.ValMin) || i.Gt(ctx.ValMax) + } + return nil +} + func (i *ZoneMapIndex) FreeMemory() { if i.FreeFunc != nil { i.FreeFunc(i)