diff --git a/pkg/container/vector/types.go b/pkg/container/vector/types.go index b57842d0a40d9f81858370707b7c3e4f9093902f..2adefd8692646acb249636e1a76108d4b8dfc792 100644 --- a/pkg/container/vector/types.go +++ b/pkg/container/vector/types.go @@ -3,6 +3,7 @@ package vector import ( "matrixbase/pkg/container/nulls" "matrixbase/pkg/container/types" + "unsafe" ) /* @@ -42,3 +43,9 @@ type Vector struct { Col interface{} Nsp *nulls.Nulls } + +// emptyInterface is the header for an interface{} value. +type emptyInterface struct { + _ *int + word unsafe.Pointer +} diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go index 2db14ee3ee114c0bd06256053f0c9c139b56c80d..1824f3d0b53f3bb866075d1b8e6a810b5b39329d 100644 --- a/pkg/container/vector/vector.go +++ b/pkg/container/vector/vector.go @@ -122,7 +122,7 @@ func (v *Vector) Reset() { case types.T_char, types.T_varchar, types.T_json: v.Col.(*types.Bytes).Reset() default: - *(*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&v.Col)) + uintptr(strconv.IntSize>>3))) = 0 + *(*int)(unsafe.Pointer(uintptr((*(*emptyInterface)(unsafe.Pointer(&v.Col))).word) + uintptr(strconv.IntSize>>3))) = 0 } } @@ -143,7 +143,7 @@ func (v *Vector) Length() int { case types.T_char, types.T_varchar, types.T_json: return len(v.Col.(*types.Bytes).Offsets) default: - hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v.Col)) + hp := *(*reflect.SliceHeader)((*(*emptyInterface)(unsafe.Pointer(&v.Col))).word) return hp.Len } } @@ -158,10 +158,10 @@ func (v *Vector) Window(start, end int) *Vector { } default: col := v.Col - ptr := unsafe.Pointer(&col) + ptr := (*(*emptyInterface)(unsafe.Pointer(&col))).word data := *(*uintptr)(unsafe.Pointer(uintptr(ptr))) *(*uintptr)(unsafe.Pointer(uintptr(ptr))) = data + uintptr(v.Typ.Size)*uintptr(start) - *(*int)(unsafe.Pointer(uintptr(ptr) + uintptr(strconv.IntSize>>3))) = end - start + 1 + *(*int)(unsafe.Pointer(uintptr(ptr) + uintptr(strconv.IntSize>>3))) = end - start return &Vector{ Typ: v.Typ, Col: col, @@ -308,6 +308,7 @@ func (v *Vector) Copy(w *Vector, vi, wi int64, proc *process.Process) error { if err != nil { return err } + copy(buf[:mempool.CountSize], v.Data[:mempool.CountSize]) copy(buf[mempool.CountSize:], vs.Data[:vs.Offsets[vi]]) copy(buf[mempool.CountSize+vs.Offsets[vi]:], data) o := vs.Offsets[vi] + vs.Lengths[vi] @@ -337,8 +338,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeInt8Slice(data[mempool.CountSize : mempool.CountSize+len(col)]) v.Data = data @@ -356,8 +359,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeInt16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*2]) v.Data = data @@ -375,8 +380,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeInt32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4]) v.Data = data @@ -413,8 +420,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeUint8Slice(data[mempool.CountSize : mempool.CountSize+len(col)]) v.Data = data @@ -432,8 +441,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeUint16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*2]) v.Data = data @@ -451,8 +462,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeUint32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4]) v.Data = data @@ -470,8 +483,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeInt16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8]) v.Data = data @@ -489,8 +504,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeDecimalSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DecimalSize]) v.Data = data @@ -508,8 +525,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeFloat32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4]) v.Data = data @@ -527,8 +546,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeFloat64Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8]) v.Data = data @@ -546,8 +567,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeDateSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DateSize]) v.Data = data @@ -565,8 +588,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } v.Col = encoding.DecodeDecimalSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DatetimeSize]) v.Data = data @@ -589,13 +614,14 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { return err } if v.Data != nil { - copy(data[mempool.CountSize:], v.Data[mempool.CountSize:]) + copy(data, v.Data) proc.Free(v.Data) + } else { + copy(data[:mempool.CountSize], w.Data[:mempool.CountSize]) } data = data[:mempool.CountSize+len(col.Data)] v.Data = data col.Data = data[mempool.CountSize:] - } } col.Lengths = append(col.Lengths, uint32(len(from))) diff --git a/pkg/container/vector/vector_test.go b/pkg/container/vector/vector_test.go index c0851909afb72f8bd1258a492d9fde3082754fb6..aff16b7f2d50749d1b02ede44db616edd7176743 100644 --- a/pkg/container/vector/vector_test.go +++ b/pkg/container/vector/vector_test.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "matrixbase/pkg/container/types" + "matrixbase/pkg/encoding" "matrixbase/pkg/vm/mempool" "matrixbase/pkg/vm/mmu/guest" "matrixbase/pkg/vm/mmu/host" @@ -23,6 +24,7 @@ func TestVector(t *testing.T) { if err := v.Append(vs); err != nil { log.Fatal(err) } + v.Data = encoding.EncodeInt64(1) } hm := host.New(1 << 20) gm := guest.New(1<<20, hm) diff --git a/pkg/sql/colexec/mergetop/top.go b/pkg/sql/colexec/mergetop/top.go new file mode 100644 index 0000000000000000000000000000000000000000..f797fffdd50ea1c04b2d0bcbf12e043af2caaba1 --- /dev/null +++ b/pkg/sql/colexec/mergetop/top.go @@ -0,0 +1,230 @@ +package mergetop + +import ( + "bytes" + "container/heap" + "fmt" + "matrixbase/pkg/compare" + "matrixbase/pkg/container/batch" + "matrixbase/pkg/container/vector" + "matrixbase/pkg/encoding" + "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/process" + "matrixbase/pkg/vm/register" +) + +func String(arg interface{}, buf *bytes.Buffer) { + n := arg.(*Argument) + buf.WriteString("蟿([") + for i, f := range n.Fs { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString(f.String()) + } + buf.WriteString(fmt.Sprintf("], %v)", n.Limit)) +} + +func Prepare(proc *process.Process, arg interface{}) error { + n := arg.(*Argument) + ctr := &n.Ctr + { + ctr.attrs = make([]string, len(n.Fs)) + for i, f := range n.Fs { + ctr.attrs[i] = f.Attr + } + } + { + data, err := proc.Alloc(n.Limit * 8) + if err != nil { + return err + } + sels := encoding.DecodeInt64Slice(data[mempool.CountSize : mempool.CountSize+n.Limit*8]) + for i := int64(0); i < n.Limit; i++ { + sels[i] = i + } + n.Ctr.data = data + n.Ctr.sels = sels[:0] + } + n.Ctr.n = len(n.Fs) + return nil +} + +func Call(proc *process.Process, arg interface{}) (bool, error) { + n := arg.(*Argument) + ctr := &n.Ctr + for { + if len(proc.Reg.Ws) == 0 { + break + } + for i := 0; i < len(proc.Reg.Ws); i++ { + reg := proc.Reg.Ws[i] + v := <-reg.Ch + if v == nil { + reg.Wg.Done() + proc.Reg.Ws = append(proc.Reg.Ws[:i], proc.Reg.Ws[i+1:]...) + i-- + continue + } + bat := v.(*batch.Batch) + if ctr.bat == nil { + bat.Reorder(ctr.attrs) + } else { + bat.Reorder(ctr.bat.Attrs) + } + if err := bat.Prefetch(bat.Attrs, bat.Vecs, proc); err != nil { + reg.Wg.Done() + ctr.clean(bat, proc) + return true, err + } + if ctr.bat == nil { + ctr.bat = batch.New(true, bat.Attrs) + for i, vec := range bat.Vecs { + ctr.bat.Vecs[i] = vector.New(vec.Typ) + } + ctr.cmps = make([]compare.Compare, len(bat.Attrs)) + for i, f := range n.Fs { + n.Ctr.cmps[i] = compare.New(bat.Vecs[i].Typ.Oid, f.Type == Descending) + } + for i, j := len(n.Fs), len(bat.Attrs); i < j; i++ { + n.Ctr.cmps[i] = compare.New(bat.Vecs[i].Typ.Oid, false) + } + } + if len(bat.Sels) == 0 { + if err := ctr.processBatch(n.Limit, bat, proc); err != nil { + reg.Wg.Done() + ctr.clean(bat, proc) + return true, err + } + } else { + if err := ctr.processBatchSels(n.Limit, bat, proc); err != nil { + reg.Wg.Done() + ctr.clean(bat, proc) + return true, err + } + } + bat.Clean(proc) + reg.Wg.Done() + } + } + for i, cmp := range ctr.cmps { + ctr.bat.Vecs[i] = cmp.Vector() + } + sels := make([]int64, len(ctr.sels)) + for i, j := 0, len(ctr.sels); i < j; i++ { + sels[len(sels)-1-i] = heap.Pop(ctr).(int64) + } + ctr.sels = append(ctr.sels, sels...) // no expansion here + ctr.bat.Sels = ctr.sels + ctr.bat.SelsData = ctr.data + ctr.bat.Reduce(ctr.attrs, proc) + proc.Reg.Ax = ctr.bat + ctr.bat = nil + ctr.data = nil + ctr.clean(nil, proc) + return true, nil +} + +func (ctr *Container) processBatch(limit int64, bat *batch.Batch, proc *process.Process) error { + var start int64 + + length := int64(bat.Vecs[0].Length()) + if n := int64(len(ctr.sels)); n < limit { + start = limit - n + if start > length { + start = length + } + for i := int64(0); i < start; i++ { + for j, vec := range ctr.bat.Vecs { + if err := vec.UnionOne(bat.Vecs[j], int64(i), proc); err != nil { + return err + } + } + ctr.sels = append(ctr.sels, n) + n++ + } + if n == limit { + for i, cmp := range ctr.cmps { + cmp.Set(0, ctr.bat.Vecs[i]) + } + heap.Init(ctr) + } + } + if start == length { + return nil + } + for i, cmp := range ctr.cmps { + cmp.Set(1, bat.Vecs[i]) + } + for i, j := start, length; i < j; i++ { + if ctr.compare(1, 0, i, ctr.sels[0]) < 0 { + for _, cmp := range ctr.cmps { + if err := cmp.Copy(1, 0, i, ctr.sels[0], proc); err != nil { + return err + } + } + heap.Fix(ctr, 0) + } + } + return nil +} + +func (ctr *Container) processBatchSels(limit int64, bat *batch.Batch, proc *process.Process) error { + var start int64 + + length := int64(len(bat.Sels)) + if n := int64(len(ctr.sels)); n < limit { + start = limit - n + if start > length { + start = length + } + for i := int64(0); i < start; i++ { + for j, vec := range ctr.bat.Vecs { + if err := vec.UnionOne(bat.Vecs[j], bat.Sels[i], proc); err != nil { + return err + } + } + ctr.sels = append(ctr.sels, n) + n++ + } + if n == limit { + for i, cmp := range ctr.cmps { + cmp.Set(0, ctr.bat.Vecs[i]) + } + heap.Init(ctr) + } + } + if start == length { + return nil + } + for i, cmp := range ctr.cmps { + cmp.Set(1, bat.Vecs[i]) + } + for i, j := start, length; i < j; i++ { + sel := bat.Sels[i] + if ctr.compare(1, 0, sel, ctr.sels[0]) < 0 { + for _, cmp := range ctr.cmps { + if err := cmp.Copy(1, 0, sel, ctr.sels[0], proc); err != nil { + return err + } + } + heap.Fix(ctr, 0) + } + } + return nil +} + +func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) { + if bat != nil { + bat.Clean(proc) + } + if ctr.bat != nil { + ctr.bat.Clean(proc) + } + if ctr.data != nil { + proc.Free(ctr.data) + ctr.data = nil + ctr.sels = nil + } + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/mergetop/types.go b/pkg/sql/colexec/mergetop/types.go new file mode 100644 index 0000000000000000000000000000000000000000..273f4cd97cd53b5e829c89ab3633f168ae4292de --- /dev/null +++ b/pkg/sql/colexec/mergetop/types.go @@ -0,0 +1,91 @@ +package mergetop + +import ( + "fmt" + "matrixbase/pkg/compare" + "matrixbase/pkg/container/batch" +) + +// Direction for ordering results. +type Direction int8 + +// Direction values. +const ( + DefaultDirection Direction = iota + Ascending + Descending +) + +type Container struct { + n int // number of attributes involved in sorting + data []byte + sels []int64 + attrs []string + bat *batch.Batch + cmps []compare.Compare +} + +type Field struct { + Attr string + Type Direction +} + +type Argument struct { + Limit int64 + Fs []Field + Ctr Container +} + +var directionName = [...]string{ + DefaultDirection: "", + Ascending: "ASC", + Descending: "DESC", +} + +func (n Field) String() string { + s := n.Attr + if n.Type != DefaultDirection { + s += " " + n.Type.String() + } + return s +} + +func (i Direction) String() string { + if i < 0 || i > Direction(len(directionName)-1) { + return fmt.Sprintf("Direction(%d)", i) + } + return directionName[i] +} + +func (ctr *Container) compare(vi, vj int, i, j int64) int { + for k := 0; k < ctr.n; k++ { + if r := ctr.cmps[k].Compare(vi, vj, i, j); r != 0 { + return r + } + } + return 0 +} + +// maximum heap +func (ctr *Container) Len() int { + return len(ctr.sels) +} + +func (ctr *Container) Less(i, j int) bool { + return ctr.compare(0, 0, ctr.sels[i], ctr.sels[j]) > 0 +} + +func (ctr *Container) Swap(i, j int) { + ctr.sels[i], ctr.sels[j] = ctr.sels[j], ctr.sels[i] +} + +func (ctr *Container) Push(x interface{}) { + ctr.sels = append(ctr.sels, x.(int64)) +} + +func (ctr *Container) Pop() interface{} { + n := len(ctr.sels) - 1 + x := ctr.sels[n] + ctr.sels = ctr.sels[:n] + return x +} diff --git a/pkg/sql/colexec/top/top.go b/pkg/sql/colexec/top/top.go index d3eb2a6dc1144ffabf9843ea80a291ce7e1eef93..f97f32ef28eadaaa768f15ecc4393e77fb54f1fd 100644 --- a/pkg/sql/colexec/top/top.go +++ b/pkg/sql/colexec/top/top.go @@ -1,22 +1,36 @@ package top import ( + "bytes" "container/heap" + "fmt" "matrixbase/pkg/compare" "matrixbase/pkg/container/batch" - "matrixbase/pkg/container/vector" "matrixbase/pkg/encoding" "matrixbase/pkg/vm/mempool" "matrixbase/pkg/vm/process" "matrixbase/pkg/vm/register" ) +func String(arg interface{}, buf *bytes.Buffer) { + n := arg.(*Argument) + buf.WriteString("蟿([") + for i, f := range n.Fs { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString(f.String()) + } + buf.WriteString(fmt.Sprintf("], %v)", n.Limit)) +} + func Prepare(proc *process.Process, arg interface{}) error { n := arg.(*Argument) + ctr := &n.Ctr { - n.Attrs = make([]string, len(n.Fs)) + ctr.attrs = make([]string, len(n.Fs)) for i, f := range n.Fs { - n.Attrs[i] = f.Attr + ctr.attrs[i] = f.Attr } } { @@ -28,15 +42,11 @@ func Prepare(proc *process.Process, arg interface{}) error { for i := int64(0); i < n.Limit; i++ { sels[i] = i } - n.Ctr.sels = sels - n.Ctr.selsData = data + n.Ctr.data = data + n.Ctr.sels = sels[:n.Limit] } n.Ctr.n = len(n.Fs) - n.Ctr.vecs = make([]*vector.Vector, len(n.Fs)) n.Ctr.cmps = make([]compare.Compare, len(n.Fs)) - for i, f := range n.Fs { - n.Ctr.cmps[i] = compare.New(f.Oid, f.Type == Descending) - } return nil } @@ -46,76 +56,90 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { if proc.Reg.Ax == nil { return false, nil } - n := arg.(Argument) + n := arg.(*Argument) + ctr := &n.Ctr bat := proc.Reg.Ax.(*batch.Batch) - if err = bat.Prefetch(n.Attrs, n.Ctr.vecs, proc); err != nil { - clean(&n.Ctr, bat, proc) + bat.Reorder(ctr.attrs) + { + for i, f := range n.Fs { + n.Ctr.cmps[i] = compare.New(bat.Vecs[i].Typ.Oid, f.Type == Descending) + } + } + if err = bat.Prefetch(ctr.attrs, bat.Vecs, proc); err != nil { + ctr.clean(bat, proc) return false, err } - processBatch(n, bat) - data, err := proc.Alloc(int64(len(n.Ctr.sels)) * 8) + ctr.processBatch(n.Limit, bat) + data, err := proc.Alloc(int64(len(ctr.sels) * 8)) if err != nil { - clean(&n.Ctr, bat, proc) + ctr.clean(bat, proc) return false, err } sels := encoding.DecodeInt64Slice(data[mempool.CountSize:]) - for i, j := 0, len(n.Ctr.sels); i < j; i++ { - sels[len(sels)-1-i] = heap.Pop(&n.Ctr).(int64) + sels = sels[:len(ctr.sels)] + for i, j := 0, len(ctr.sels); i < j; i++ { + sels[len(sels)-1-i] = heap.Pop(ctr).(int64) } if len(bat.Sels) > 0 { proc.Free(bat.SelsData) } bat.Sels = sels bat.SelsData = data - bat.Reduce(n.Attrs, proc) proc.Reg.Ax = bat + ctr.clean(nil, proc) register.FreeRegisters(proc) return false, nil } -func processBatch(n Argument, bat *batch.Batch) { +func (ctr *Container) processBatch(limit int64, bat *batch.Batch) { + for i, cmp := range ctr.cmps { + cmp.Set(0, bat.Vecs[i]) + cmp.Set(1, bat.Vecs[i]) + } if length := int64(len(bat.Sels)); length > 0 { - if length < n.Limit { + if length < limit { for i := int64(0); i < length; i++ { - n.Ctr.sels[i] = bat.Sels[i] + ctr.sels[i] = bat.Sels[i] } - n.Ctr.sels = n.Ctr.sels[:length] - heap.Init(&n.Ctr) + ctr.sels = ctr.sels[:length] + heap.Init(ctr) return } - for i := int64(0); i < n.Limit; i++ { - n.Ctr.sels[i] = bat.Sels[i] + for i := int64(0); i < limit; i++ { + ctr.sels[i] = bat.Sels[i] } - heap.Init(&n.Ctr) - for i, j := n.Limit, length; i < j; i++ { - if n.Ctr.compare(bat.Sels[i], n.Ctr.sels[0]) < 0 { - n.Ctr.sels[0] = bat.Sels[i] + heap.Init(ctr) + for i, j := limit, length; i < j; i++ { + if ctr.compare(bat.Sels[i], ctr.sels[0]) < 0 { + ctr.sels[0] = bat.Sels[i] } - heap.Fix(&n.Ctr, 0) + heap.Fix(ctr, 0) } return } - length := int64(n.Ctr.vecs[0].Length()) - if length < n.Limit { - n.Ctr.sels = n.Ctr.sels[:length] - heap.Init(&n.Ctr) + length := int64(bat.Vecs[0].Length()) + if length < limit { + ctr.sels = ctr.sels[:length] + heap.Init(ctr) return } - heap.Init(&n.Ctr) - for i, j := n.Limit, length; i < j; i++ { - if n.Ctr.compare(i, n.Ctr.sels[0]) < 0 { - n.Ctr.sels[0] = i + heap.Init(ctr) + for i, j := limit, length; i < j; i++ { + if ctr.compare(i, ctr.sels[0]) < 0 { + ctr.sels[0] = i } - heap.Fix(&n.Ctr, 0) + heap.Fix(ctr, 0) } } -func clean(ctr *Container, bat *batch.Batch, proc *process.Process) { - if ctr.selsData != nil { - proc.Free(ctr.selsData) +func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) { + if bat != nil { + bat.Clean(proc) + } + if ctr.data != nil { + proc.Free(ctr.data) + ctr.data = nil ctr.sels = nil - ctr.selsData = nil } - bat.Clean(proc) register.FreeRegisters(proc) } diff --git a/pkg/sql/colexec/top/types.go b/pkg/sql/colexec/top/types.go index cf007562479f400f910fb0f37b3b5175915ed778..b60476d204d50c5e58f0d8eadd2298e7da38ff97 100644 --- a/pkg/sql/colexec/top/types.go +++ b/pkg/sql/colexec/top/types.go @@ -1,9 +1,8 @@ package top import ( + "fmt" "matrixbase/pkg/compare" - "matrixbase/pkg/container/types" - "matrixbase/pkg/container/vector" ) // Direction for ordering results. @@ -17,15 +16,14 @@ const ( ) type Container struct { - n int // number of attributes involved in sorting - sels []int64 - selsData []byte - vecs []*vector.Vector - cmps []compare.Compare + n int // number of attributes involved in sorting + data []byte + sels []int64 + attrs []string + cmps []compare.Compare } type Field struct { - Oid types.T Attr string Type Direction } @@ -33,10 +31,30 @@ type Field struct { type Argument struct { Limit int64 Fs []Field - Attrs []string Ctr Container } +var directionName = [...]string{ + DefaultDirection: "", + Ascending: "ASC", + Descending: "DESC", +} + +func (n Field) String() string { + s := n.Attr + if n.Type != DefaultDirection { + s += " " + n.Type.String() + } + return s +} + +func (i Direction) String() string { + if i < 0 || i > Direction(len(directionName)-1) { + return fmt.Sprintf("Direction(%d)", i) + } + return directionName[i] +} + func (ctr *Container) compare(i, j int64) int { for k := 0; k < ctr.n; k++ { if r := ctr.cmps[k].Compare(0, 0, i, j); r != 0 { diff --git a/pkg/sql/colexec/unittest/top_test.go b/pkg/sql/colexec/unittest/top_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a06d411d1ce81e3c1730ccda3b4ba29890bd6193 --- /dev/null +++ b/pkg/sql/colexec/unittest/top_test.go @@ -0,0 +1,94 @@ +package unittest + +import ( + "fmt" + "matrixbase/pkg/sql/colexec/mergetop" + "matrixbase/pkg/sql/colexec/top" + "matrixbase/pkg/sql/colexec/transfer" + "matrixbase/pkg/vm" + "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/mmu/guest" + "matrixbase/pkg/vm/mmu/host" + "matrixbase/pkg/vm/pipeline" + "matrixbase/pkg/vm/process" + "sync" + "testing" +) + +func TestTop(t *testing.T) { + var wg sync.WaitGroup + var ins vm.Instructions + + hm := host.New(1 << 20) + gm := guest.New(1<<20, hm) + proc := process.New(gm, mempool.New(1<<32, 8)) + { + proc.Refer = make(map[string]uint64) + proc.Reg.Ws = make([]*process.WaitRegister, 2) + for i := 0; i < 2; i++ { + proc.Reg.Ws[i] = &process.WaitRegister{ + Wg: new(sync.WaitGroup), + Ch: make(chan interface{}), + } + } + } + { + var rins vm.Instructions + + rproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8)) + { + rproc.Refer = make(map[string]uint64) + } + { + var fs []top.Field + + fs = append(fs, top.Field{"orderId", top.Descending}) + rins = append(rins, vm.Instruction{vm.Top, &top.Argument{Limit: 3, Fs: fs}}) + } + rins = append(rins, vm.Instruction{vm.Transfer, &transfer.Argument{Mmu: gm, Reg: proc.Reg.Ws[0]}}) + rp := pipeline.New([]uint64{1, 1, 1}, []string{"orderId", "uid", "price"}, rins) + wg.Add(1) + go func() { + fmt.Printf("S[segment 0]: %s\n", rp) + rp.Run(segments("R", rproc)[:1], rproc) + fmt.Printf("S[segment 0] - guest: %v, host: %v\n", rproc.Size(), rproc.HostSize()) + wg.Done() + }() + } + { + var sins vm.Instructions + + sproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8)) + { + sproc.Refer = make(map[string]uint64) + } + { + var fs []top.Field + + fs = append(fs, top.Field{"orderId", top.Descending}) + sins = append(sins, vm.Instruction{vm.Top, &top.Argument{Limit: 3, Fs: fs}}) + } + sins = append(sins, vm.Instruction{vm.Transfer, &transfer.Argument{Mmu: gm, Reg: proc.Reg.Ws[1]}}) + sp := pipeline.New([]uint64{1, 1, 1}, []string{"uid", "price", "orderId"}, sins) + wg.Add(1) + go func() { + fmt.Printf("S[segment 1]: %s\n", sp) + sp.Run(segments("R", sproc)[1:2], sproc) + fmt.Printf("S[segment 1] - guest: %v, host: %v\n", sproc.Size(), sproc.HostSize()) + wg.Done() + }() + } + { + var fs []mergetop.Field + + fs = append(fs, mergetop.Field{"orderId", mergetop.Descending}) + ins = append(ins, vm.Instruction{vm.MergeTop, &mergetop.Argument{Limit: 3, Fs: fs}}) + } + ins = append(ins, vm.Instruction{vm.Output, nil}) + p := pipeline.NewMerge(ins) + fmt.Printf("%s\n", p) + p.RunMerge(proc) + fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize()) + wg.Wait() + fmt.Printf("************\n") +} diff --git a/pkg/vm/types.go b/pkg/vm/types.go index 43ac6d68ca27b4dfc9893181ff88b9025530b48c..045c0f8f987e1e141977c0f30c3b91bda3b27c06 100644 --- a/pkg/vm/types.go +++ b/pkg/vm/types.go @@ -22,6 +22,7 @@ const ( SetNaturalJoin SetSemiDifference // unsuitable name is anti join Output + MergeTop MergeDedup MergeSummarize ) diff --git a/pkg/vm/vm.go b/pkg/vm/vm.go index 4cf3895a22f58ced6834ee8e67ebecc26496e919..0f51a1aec96f1cbf75288bd9dcb1dff7e9714add 100644 --- a/pkg/vm/vm.go +++ b/pkg/vm/vm.go @@ -7,10 +7,12 @@ import ( "matrixbase/pkg/sql/colexec/hashset/natural" "matrixbase/pkg/sql/colexec/limit" "matrixbase/pkg/sql/colexec/mergededup" + "matrixbase/pkg/sql/colexec/mergetop" "matrixbase/pkg/sql/colexec/offset" "matrixbase/pkg/sql/colexec/output" "matrixbase/pkg/sql/colexec/projection" "matrixbase/pkg/sql/colexec/restrict" + "matrixbase/pkg/sql/colexec/top" "matrixbase/pkg/sql/colexec/transfer" "matrixbase/pkg/vm/process" ) @@ -22,6 +24,7 @@ func String(ins Instructions, buf *bytes.Buffer) { } switch in.Op { case Top: + top.String(in.Arg, buf) case Dedup: dedup.String(in.Arg, buf) case Limit: @@ -45,6 +48,8 @@ func String(ins Instructions, buf *bytes.Buffer) { natural.String(in.Arg, buf) case Output: output.String(in.Arg, buf) + case MergeTop: + mergetop.String(in.Arg, buf) case MergeDedup: mergededup.String(in.Arg, buf) } @@ -76,6 +81,9 @@ func Prepare(ins Instructions, proc *process.Process) error { for _, in := range ins { switch in.Op { case Top: + if err := top.Prepare(proc, in.Arg); err != nil { + return err + } case Dedup: if err := dedup.Prepare(proc, in.Arg); err != nil { return err @@ -117,6 +125,10 @@ func Prepare(ins Instructions, proc *process.Process) error { if err := output.Prepare(proc, in.Arg); err != nil { return err } + case MergeTop: + if err := mergetop.Prepare(proc, in.Arg); err != nil { + return err + } case MergeDedup: if err := mergededup.Prepare(proc, in.Arg); err != nil { return err @@ -134,6 +146,7 @@ func Run(ins Instructions, proc *process.Process) (bool, error) { for _, in := range ins { switch in.Op { case Top: + ok, err = top.Call(proc, in.Arg) case Dedup: ok, err = dedup.Call(proc, in.Arg) case Limit: @@ -157,6 +170,8 @@ func Run(ins Instructions, proc *process.Process) (bool, error) { ok, err = natural.Call(proc, in.Arg) case Output: ok, err = output.Call(proc, in.Arg) + case MergeTop: + ok, err = mergetop.Call(proc, in.Arg) case MergeDedup: ok, err = mergededup.Call(proc, in.Arg) }