Skip to content
Snippets Groups Projects
Commit 6321e9b0 authored by nnsgmsone's avatar nnsgmsone Committed by GitHub
Browse files

Add top (#24)

parent b67ebee7
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@ package vector
import (
"matrixbase/pkg/container/nulls"
"matrixbase/pkg/container/types"
"unsafe"
)
/*
......@@ -42,3 +43,9 @@ type Vector struct {
Col interface{}
Nsp *nulls.Nulls
}
// emptyInterface is the header for an interface{} value.
type emptyInterface struct {
_ *int
word unsafe.Pointer
}
......@@ -122,7 +122,7 @@ func (v *Vector) Reset() {
case types.T_char, types.T_varchar, types.T_json:
v.Col.(*types.Bytes).Reset()
default:
*(*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&v.Col)) + uintptr(strconv.IntSize>>3))) = 0
*(*int)(unsafe.Pointer(uintptr((*(*emptyInterface)(unsafe.Pointer(&v.Col))).word) + uintptr(strconv.IntSize>>3))) = 0
}
}
......@@ -143,7 +143,7 @@ func (v *Vector) Length() int {
case types.T_char, types.T_varchar, types.T_json:
return len(v.Col.(*types.Bytes).Offsets)
default:
hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v.Col))
hp := *(*reflect.SliceHeader)((*(*emptyInterface)(unsafe.Pointer(&v.Col))).word)
return hp.Len
}
}
......@@ -158,10 +158,10 @@ func (v *Vector) Window(start, end int) *Vector {
}
default:
col := v.Col
ptr := unsafe.Pointer(&col)
ptr := (*(*emptyInterface)(unsafe.Pointer(&col))).word
data := *(*uintptr)(unsafe.Pointer(uintptr(ptr)))
*(*uintptr)(unsafe.Pointer(uintptr(ptr))) = data + uintptr(v.Typ.Size)*uintptr(start)
*(*int)(unsafe.Pointer(uintptr(ptr) + uintptr(strconv.IntSize>>3))) = end - start + 1
*(*int)(unsafe.Pointer(uintptr(ptr) + uintptr(strconv.IntSize>>3))) = end - start
return &Vector{
Typ: v.Typ,
Col: col,
......@@ -308,6 +308,7 @@ func (v *Vector) Copy(w *Vector, vi, wi int64, proc *process.Process) error {
if err != nil {
return err
}
copy(buf[:mempool.CountSize], v.Data[:mempool.CountSize])
copy(buf[mempool.CountSize:], vs.Data[:vs.Offsets[vi]])
copy(buf[mempool.CountSize+vs.Offsets[vi]:], data)
o := vs.Offsets[vi] + vs.Lengths[vi]
......@@ -337,8 +338,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeInt8Slice(data[mempool.CountSize : mempool.CountSize+len(col)])
v.Data = data
......@@ -356,8 +359,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeInt16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*2])
v.Data = data
......@@ -375,8 +380,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeInt32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4])
v.Data = data
......@@ -413,8 +420,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeUint8Slice(data[mempool.CountSize : mempool.CountSize+len(col)])
v.Data = data
......@@ -432,8 +441,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeUint16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*2])
v.Data = data
......@@ -451,8 +462,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeUint32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4])
v.Data = data
......@@ -470,8 +483,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeInt16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8])
v.Data = data
......@@ -489,8 +504,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeDecimalSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DecimalSize])
v.Data = data
......@@ -508,8 +525,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeFloat32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4])
v.Data = data
......@@ -527,8 +546,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeFloat64Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8])
v.Data = data
......@@ -546,8 +567,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeDateSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DateSize])
v.Data = data
......@@ -565,8 +588,10 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
v.Col = encoding.DecodeDecimalSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DatetimeSize])
v.Data = data
......@@ -589,13 +614,14 @@ func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
return err
}
if v.Data != nil {
copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
copy(data, v.Data)
proc.Free(v.Data)
} else {
copy(data[:mempool.CountSize], w.Data[:mempool.CountSize])
}
data = data[:mempool.CountSize+len(col.Data)]
v.Data = data
col.Data = data[mempool.CountSize:]
}
}
col.Lengths = append(col.Lengths, uint32(len(from)))
......
......@@ -4,6 +4,7 @@ import (
"fmt"
"log"
"matrixbase/pkg/container/types"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/mmu/guest"
"matrixbase/pkg/vm/mmu/host"
......@@ -23,6 +24,7 @@ func TestVector(t *testing.T) {
if err := v.Append(vs); err != nil {
log.Fatal(err)
}
v.Data = encoding.EncodeInt64(1)
}
hm := host.New(1 << 20)
gm := guest.New(1<<20, hm)
......
package mergetop
import (
"bytes"
"container/heap"
"fmt"
"matrixbase/pkg/compare"
"matrixbase/pkg/container/batch"
"matrixbase/pkg/container/vector"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
func String(arg interface{}, buf *bytes.Buffer) {
n := arg.(*Argument)
buf.WriteString("τ([")
for i, f := range n.Fs {
if i > 0 {
buf.WriteString(", ")
}
buf.WriteString(f.String())
}
buf.WriteString(fmt.Sprintf("], %v)", n.Limit))
}
func Prepare(proc *process.Process, arg interface{}) error {
n := arg.(*Argument)
ctr := &n.Ctr
{
ctr.attrs = make([]string, len(n.Fs))
for i, f := range n.Fs {
ctr.attrs[i] = f.Attr
}
}
{
data, err := proc.Alloc(n.Limit * 8)
if err != nil {
return err
}
sels := encoding.DecodeInt64Slice(data[mempool.CountSize : mempool.CountSize+n.Limit*8])
for i := int64(0); i < n.Limit; i++ {
sels[i] = i
}
n.Ctr.data = data
n.Ctr.sels = sels[:0]
}
n.Ctr.n = len(n.Fs)
return nil
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(*Argument)
ctr := &n.Ctr
for {
if len(proc.Reg.Ws) == 0 {
break
}
for i := 0; i < len(proc.Reg.Ws); i++ {
reg := proc.Reg.Ws[i]
v := <-reg.Ch
if v == nil {
reg.Wg.Done()
proc.Reg.Ws = append(proc.Reg.Ws[:i], proc.Reg.Ws[i+1:]...)
i--
continue
}
bat := v.(*batch.Batch)
if ctr.bat == nil {
bat.Reorder(ctr.attrs)
} else {
bat.Reorder(ctr.bat.Attrs)
}
if err := bat.Prefetch(bat.Attrs, bat.Vecs, proc); err != nil {
reg.Wg.Done()
ctr.clean(bat, proc)
return true, err
}
if ctr.bat == nil {
ctr.bat = batch.New(true, bat.Attrs)
for i, vec := range bat.Vecs {
ctr.bat.Vecs[i] = vector.New(vec.Typ)
}
ctr.cmps = make([]compare.Compare, len(bat.Attrs))
for i, f := range n.Fs {
n.Ctr.cmps[i] = compare.New(bat.Vecs[i].Typ.Oid, f.Type == Descending)
}
for i, j := len(n.Fs), len(bat.Attrs); i < j; i++ {
n.Ctr.cmps[i] = compare.New(bat.Vecs[i].Typ.Oid, false)
}
}
if len(bat.Sels) == 0 {
if err := ctr.processBatch(n.Limit, bat, proc); err != nil {
reg.Wg.Done()
ctr.clean(bat, proc)
return true, err
}
} else {
if err := ctr.processBatchSels(n.Limit, bat, proc); err != nil {
reg.Wg.Done()
ctr.clean(bat, proc)
return true, err
}
}
bat.Clean(proc)
reg.Wg.Done()
}
}
for i, cmp := range ctr.cmps {
ctr.bat.Vecs[i] = cmp.Vector()
}
sels := make([]int64, len(ctr.sels))
for i, j := 0, len(ctr.sels); i < j; i++ {
sels[len(sels)-1-i] = heap.Pop(ctr).(int64)
}
ctr.sels = append(ctr.sels, sels...) // no expansion here
ctr.bat.Sels = ctr.sels
ctr.bat.SelsData = ctr.data
ctr.bat.Reduce(ctr.attrs, proc)
proc.Reg.Ax = ctr.bat
ctr.bat = nil
ctr.data = nil
ctr.clean(nil, proc)
return true, nil
}
func (ctr *Container) processBatch(limit int64, bat *batch.Batch, proc *process.Process) error {
var start int64
length := int64(bat.Vecs[0].Length())
if n := int64(len(ctr.sels)); n < limit {
start = limit - n
if start > length {
start = length
}
for i := int64(0); i < start; i++ {
for j, vec := range ctr.bat.Vecs {
if err := vec.UnionOne(bat.Vecs[j], int64(i), proc); err != nil {
return err
}
}
ctr.sels = append(ctr.sels, n)
n++
}
if n == limit {
for i, cmp := range ctr.cmps {
cmp.Set(0, ctr.bat.Vecs[i])
}
heap.Init(ctr)
}
}
if start == length {
return nil
}
for i, cmp := range ctr.cmps {
cmp.Set(1, bat.Vecs[i])
}
for i, j := start, length; i < j; i++ {
if ctr.compare(1, 0, i, ctr.sels[0]) < 0 {
for _, cmp := range ctr.cmps {
if err := cmp.Copy(1, 0, i, ctr.sels[0], proc); err != nil {
return err
}
}
heap.Fix(ctr, 0)
}
}
return nil
}
func (ctr *Container) processBatchSels(limit int64, bat *batch.Batch, proc *process.Process) error {
var start int64
length := int64(len(bat.Sels))
if n := int64(len(ctr.sels)); n < limit {
start = limit - n
if start > length {
start = length
}
for i := int64(0); i < start; i++ {
for j, vec := range ctr.bat.Vecs {
if err := vec.UnionOne(bat.Vecs[j], bat.Sels[i], proc); err != nil {
return err
}
}
ctr.sels = append(ctr.sels, n)
n++
}
if n == limit {
for i, cmp := range ctr.cmps {
cmp.Set(0, ctr.bat.Vecs[i])
}
heap.Init(ctr)
}
}
if start == length {
return nil
}
for i, cmp := range ctr.cmps {
cmp.Set(1, bat.Vecs[i])
}
for i, j := start, length; i < j; i++ {
sel := bat.Sels[i]
if ctr.compare(1, 0, sel, ctr.sels[0]) < 0 {
for _, cmp := range ctr.cmps {
if err := cmp.Copy(1, 0, sel, ctr.sels[0], proc); err != nil {
return err
}
}
heap.Fix(ctr, 0)
}
}
return nil
}
func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
if bat != nil {
bat.Clean(proc)
}
if ctr.bat != nil {
ctr.bat.Clean(proc)
}
if ctr.data != nil {
proc.Free(ctr.data)
ctr.data = nil
ctr.sels = nil
}
register.FreeRegisters(proc)
}
package mergetop
import (
"fmt"
"matrixbase/pkg/compare"
"matrixbase/pkg/container/batch"
)
// Direction for ordering results.
type Direction int8
// Direction values.
const (
DefaultDirection Direction = iota
Ascending
Descending
)
type Container struct {
n int // number of attributes involved in sorting
data []byte
sels []int64
attrs []string
bat *batch.Batch
cmps []compare.Compare
}
type Field struct {
Attr string
Type Direction
}
type Argument struct {
Limit int64
Fs []Field
Ctr Container
}
var directionName = [...]string{
DefaultDirection: "",
Ascending: "ASC",
Descending: "DESC",
}
func (n Field) String() string {
s := n.Attr
if n.Type != DefaultDirection {
s += " " + n.Type.String()
}
return s
}
func (i Direction) String() string {
if i < 0 || i > Direction(len(directionName)-1) {
return fmt.Sprintf("Direction(%d)", i)
}
return directionName[i]
}
func (ctr *Container) compare(vi, vj int, i, j int64) int {
for k := 0; k < ctr.n; k++ {
if r := ctr.cmps[k].Compare(vi, vj, i, j); r != 0 {
return r
}
}
return 0
}
// maximum heap
func (ctr *Container) Len() int {
return len(ctr.sels)
}
func (ctr *Container) Less(i, j int) bool {
return ctr.compare(0, 0, ctr.sels[i], ctr.sels[j]) > 0
}
func (ctr *Container) Swap(i, j int) {
ctr.sels[i], ctr.sels[j] = ctr.sels[j], ctr.sels[i]
}
func (ctr *Container) Push(x interface{}) {
ctr.sels = append(ctr.sels, x.(int64))
}
func (ctr *Container) Pop() interface{} {
n := len(ctr.sels) - 1
x := ctr.sels[n]
ctr.sels = ctr.sels[:n]
return x
}
package top
import (
"bytes"
"container/heap"
"fmt"
"matrixbase/pkg/compare"
"matrixbase/pkg/container/batch"
"matrixbase/pkg/container/vector"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
func String(arg interface{}, buf *bytes.Buffer) {
n := arg.(*Argument)
buf.WriteString("τ([")
for i, f := range n.Fs {
if i > 0 {
buf.WriteString(", ")
}
buf.WriteString(f.String())
}
buf.WriteString(fmt.Sprintf("], %v)", n.Limit))
}
func Prepare(proc *process.Process, arg interface{}) error {
n := arg.(*Argument)
ctr := &n.Ctr
{
n.Attrs = make([]string, len(n.Fs))
ctr.attrs = make([]string, len(n.Fs))
for i, f := range n.Fs {
n.Attrs[i] = f.Attr
ctr.attrs[i] = f.Attr
}
}
{
......@@ -28,15 +42,11 @@ func Prepare(proc *process.Process, arg interface{}) error {
for i := int64(0); i < n.Limit; i++ {
sels[i] = i
}
n.Ctr.sels = sels
n.Ctr.selsData = data
n.Ctr.data = data
n.Ctr.sels = sels[:n.Limit]
}
n.Ctr.n = len(n.Fs)
n.Ctr.vecs = make([]*vector.Vector, len(n.Fs))
n.Ctr.cmps = make([]compare.Compare, len(n.Fs))
for i, f := range n.Fs {
n.Ctr.cmps[i] = compare.New(f.Oid, f.Type == Descending)
}
return nil
}
......@@ -46,76 +56,90 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
if proc.Reg.Ax == nil {
return false, nil
}
n := arg.(Argument)
n := arg.(*Argument)
ctr := &n.Ctr
bat := proc.Reg.Ax.(*batch.Batch)
if err = bat.Prefetch(n.Attrs, n.Ctr.vecs, proc); err != nil {
clean(&n.Ctr, bat, proc)
bat.Reorder(ctr.attrs)
{
for i, f := range n.Fs {
n.Ctr.cmps[i] = compare.New(bat.Vecs[i].Typ.Oid, f.Type == Descending)
}
}
if err = bat.Prefetch(ctr.attrs, bat.Vecs, proc); err != nil {
ctr.clean(bat, proc)
return false, err
}
processBatch(n, bat)
data, err := proc.Alloc(int64(len(n.Ctr.sels)) * 8)
ctr.processBatch(n.Limit, bat)
data, err := proc.Alloc(int64(len(ctr.sels) * 8))
if err != nil {
clean(&n.Ctr, bat, proc)
ctr.clean(bat, proc)
return false, err
}
sels := encoding.DecodeInt64Slice(data[mempool.CountSize:])
for i, j := 0, len(n.Ctr.sels); i < j; i++ {
sels[len(sels)-1-i] = heap.Pop(&n.Ctr).(int64)
sels = sels[:len(ctr.sels)]
for i, j := 0, len(ctr.sels); i < j; i++ {
sels[len(sels)-1-i] = heap.Pop(ctr).(int64)
}
if len(bat.Sels) > 0 {
proc.Free(bat.SelsData)
}
bat.Sels = sels
bat.SelsData = data
bat.Reduce(n.Attrs, proc)
proc.Reg.Ax = bat
ctr.clean(nil, proc)
register.FreeRegisters(proc)
return false, nil
}
func processBatch(n Argument, bat *batch.Batch) {
func (ctr *Container) processBatch(limit int64, bat *batch.Batch) {
for i, cmp := range ctr.cmps {
cmp.Set(0, bat.Vecs[i])
cmp.Set(1, bat.Vecs[i])
}
if length := int64(len(bat.Sels)); length > 0 {
if length < n.Limit {
if length < limit {
for i := int64(0); i < length; i++ {
n.Ctr.sels[i] = bat.Sels[i]
ctr.sels[i] = bat.Sels[i]
}
n.Ctr.sels = n.Ctr.sels[:length]
heap.Init(&n.Ctr)
ctr.sels = ctr.sels[:length]
heap.Init(ctr)
return
}
for i := int64(0); i < n.Limit; i++ {
n.Ctr.sels[i] = bat.Sels[i]
for i := int64(0); i < limit; i++ {
ctr.sels[i] = bat.Sels[i]
}
heap.Init(&n.Ctr)
for i, j := n.Limit, length; i < j; i++ {
if n.Ctr.compare(bat.Sels[i], n.Ctr.sels[0]) < 0 {
n.Ctr.sels[0] = bat.Sels[i]
heap.Init(ctr)
for i, j := limit, length; i < j; i++ {
if ctr.compare(bat.Sels[i], ctr.sels[0]) < 0 {
ctr.sels[0] = bat.Sels[i]
}
heap.Fix(&n.Ctr, 0)
heap.Fix(ctr, 0)
}
return
}
length := int64(n.Ctr.vecs[0].Length())
if length < n.Limit {
n.Ctr.sels = n.Ctr.sels[:length]
heap.Init(&n.Ctr)
length := int64(bat.Vecs[0].Length())
if length < limit {
ctr.sels = ctr.sels[:length]
heap.Init(ctr)
return
}
heap.Init(&n.Ctr)
for i, j := n.Limit, length; i < j; i++ {
if n.Ctr.compare(i, n.Ctr.sels[0]) < 0 {
n.Ctr.sels[0] = i
heap.Init(ctr)
for i, j := limit, length; i < j; i++ {
if ctr.compare(i, ctr.sels[0]) < 0 {
ctr.sels[0] = i
}
heap.Fix(&n.Ctr, 0)
heap.Fix(ctr, 0)
}
}
func clean(ctr *Container, bat *batch.Batch, proc *process.Process) {
if ctr.selsData != nil {
proc.Free(ctr.selsData)
func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
if bat != nil {
bat.Clean(proc)
}
if ctr.data != nil {
proc.Free(ctr.data)
ctr.data = nil
ctr.sels = nil
ctr.selsData = nil
}
bat.Clean(proc)
register.FreeRegisters(proc)
}
package top
import (
"fmt"
"matrixbase/pkg/compare"
"matrixbase/pkg/container/types"
"matrixbase/pkg/container/vector"
)
// Direction for ordering results.
......@@ -17,15 +16,14 @@ const (
)
type Container struct {
n int // number of attributes involved in sorting
sels []int64
selsData []byte
vecs []*vector.Vector
cmps []compare.Compare
n int // number of attributes involved in sorting
data []byte
sels []int64
attrs []string
cmps []compare.Compare
}
type Field struct {
Oid types.T
Attr string
Type Direction
}
......@@ -33,10 +31,30 @@ type Field struct {
type Argument struct {
Limit int64
Fs []Field
Attrs []string
Ctr Container
}
var directionName = [...]string{
DefaultDirection: "",
Ascending: "ASC",
Descending: "DESC",
}
func (n Field) String() string {
s := n.Attr
if n.Type != DefaultDirection {
s += " " + n.Type.String()
}
return s
}
func (i Direction) String() string {
if i < 0 || i > Direction(len(directionName)-1) {
return fmt.Sprintf("Direction(%d)", i)
}
return directionName[i]
}
func (ctr *Container) compare(i, j int64) int {
for k := 0; k < ctr.n; k++ {
if r := ctr.cmps[k].Compare(0, 0, i, j); r != 0 {
......
package unittest
import (
"fmt"
"matrixbase/pkg/sql/colexec/mergetop"
"matrixbase/pkg/sql/colexec/top"
"matrixbase/pkg/sql/colexec/transfer"
"matrixbase/pkg/vm"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/mmu/guest"
"matrixbase/pkg/vm/mmu/host"
"matrixbase/pkg/vm/pipeline"
"matrixbase/pkg/vm/process"
"sync"
"testing"
)
func TestTop(t *testing.T) {
var wg sync.WaitGroup
var ins vm.Instructions
hm := host.New(1 << 20)
gm := guest.New(1<<20, hm)
proc := process.New(gm, mempool.New(1<<32, 8))
{
proc.Refer = make(map[string]uint64)
proc.Reg.Ws = make([]*process.WaitRegister, 2)
for i := 0; i < 2; i++ {
proc.Reg.Ws[i] = &process.WaitRegister{
Wg: new(sync.WaitGroup),
Ch: make(chan interface{}),
}
}
}
{
var rins vm.Instructions
rproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
{
rproc.Refer = make(map[string]uint64)
}
{
var fs []top.Field
fs = append(fs, top.Field{"orderId", top.Descending})
rins = append(rins, vm.Instruction{vm.Top, &top.Argument{Limit: 3, Fs: fs}})
}
rins = append(rins, vm.Instruction{vm.Transfer, &transfer.Argument{Mmu: gm, Reg: proc.Reg.Ws[0]}})
rp := pipeline.New([]uint64{1, 1, 1}, []string{"orderId", "uid", "price"}, rins)
wg.Add(1)
go func() {
fmt.Printf("S[segment 0]: %s\n", rp)
rp.Run(segments("R", rproc)[:1], rproc)
fmt.Printf("S[segment 0] - guest: %v, host: %v\n", rproc.Size(), rproc.HostSize())
wg.Done()
}()
}
{
var sins vm.Instructions
sproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
{
sproc.Refer = make(map[string]uint64)
}
{
var fs []top.Field
fs = append(fs, top.Field{"orderId", top.Descending})
sins = append(sins, vm.Instruction{vm.Top, &top.Argument{Limit: 3, Fs: fs}})
}
sins = append(sins, vm.Instruction{vm.Transfer, &transfer.Argument{Mmu: gm, Reg: proc.Reg.Ws[1]}})
sp := pipeline.New([]uint64{1, 1, 1}, []string{"uid", "price", "orderId"}, sins)
wg.Add(1)
go func() {
fmt.Printf("S[segment 1]: %s\n", sp)
sp.Run(segments("R", sproc)[1:2], sproc)
fmt.Printf("S[segment 1] - guest: %v, host: %v\n", sproc.Size(), sproc.HostSize())
wg.Done()
}()
}
{
var fs []mergetop.Field
fs = append(fs, mergetop.Field{"orderId", mergetop.Descending})
ins = append(ins, vm.Instruction{vm.MergeTop, &mergetop.Argument{Limit: 3, Fs: fs}})
}
ins = append(ins, vm.Instruction{vm.Output, nil})
p := pipeline.NewMerge(ins)
fmt.Printf("%s\n", p)
p.RunMerge(proc)
fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
wg.Wait()
fmt.Printf("************\n")
}
......@@ -22,6 +22,7 @@ const (
SetNaturalJoin
SetSemiDifference // unsuitable name is anti join
Output
MergeTop
MergeDedup
MergeSummarize
)
......
......@@ -7,10 +7,12 @@ import (
"matrixbase/pkg/sql/colexec/hashset/natural"
"matrixbase/pkg/sql/colexec/limit"
"matrixbase/pkg/sql/colexec/mergededup"
"matrixbase/pkg/sql/colexec/mergetop"
"matrixbase/pkg/sql/colexec/offset"
"matrixbase/pkg/sql/colexec/output"
"matrixbase/pkg/sql/colexec/projection"
"matrixbase/pkg/sql/colexec/restrict"
"matrixbase/pkg/sql/colexec/top"
"matrixbase/pkg/sql/colexec/transfer"
"matrixbase/pkg/vm/process"
)
......@@ -22,6 +24,7 @@ func String(ins Instructions, buf *bytes.Buffer) {
}
switch in.Op {
case Top:
top.String(in.Arg, buf)
case Dedup:
dedup.String(in.Arg, buf)
case Limit:
......@@ -45,6 +48,8 @@ func String(ins Instructions, buf *bytes.Buffer) {
natural.String(in.Arg, buf)
case Output:
output.String(in.Arg, buf)
case MergeTop:
mergetop.String(in.Arg, buf)
case MergeDedup:
mergededup.String(in.Arg, buf)
}
......@@ -76,6 +81,9 @@ func Prepare(ins Instructions, proc *process.Process) error {
for _, in := range ins {
switch in.Op {
case Top:
if err := top.Prepare(proc, in.Arg); err != nil {
return err
}
case Dedup:
if err := dedup.Prepare(proc, in.Arg); err != nil {
return err
......@@ -117,6 +125,10 @@ func Prepare(ins Instructions, proc *process.Process) error {
if err := output.Prepare(proc, in.Arg); err != nil {
return err
}
case MergeTop:
if err := mergetop.Prepare(proc, in.Arg); err != nil {
return err
}
case MergeDedup:
if err := mergededup.Prepare(proc, in.Arg); err != nil {
return err
......@@ -134,6 +146,7 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
for _, in := range ins {
switch in.Op {
case Top:
ok, err = top.Call(proc, in.Arg)
case Dedup:
ok, err = dedup.Call(proc, in.Arg)
case Limit:
......@@ -157,6 +170,8 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
ok, err = natural.Call(proc, in.Arg)
case Output:
ok, err = output.Call(proc, in.Arg)
case MergeTop:
ok, err = mergetop.Call(proc, in.Arg)
case MergeDedup:
ok, err = mergededup.Call(proc, in.Arg)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment