Skip to content
Snippets Groups Projects
Commit c09c3f9b authored by nnsgmsone's avatar nnsgmsone
Browse files

Add offset module

parent 5aa5a603
No related branches found
No related tags found
No related merge requests found
Showing
with 392 additions and 194 deletions
File added
......@@ -145,6 +145,9 @@ func (bat *Batch) Reduce(attrs []string, proc *process.Process) {
func (bat *Batch) String() string {
var buf bytes.Buffer
if len(bat.Sels) > 0 {
fmt.Printf("%v\n", bat.Sels)
}
for i, attr := range bat.Attrs {
buf.WriteString(fmt.Sprintf("%s\n", attr))
buf.WriteString(fmt.Sprintf("\t%s\n", bat.Vecs[i]))
......
......@@ -4,8 +4,6 @@ import (
"fmt"
"matrixbase/pkg/container/types"
"matrixbase/pkg/container/vector"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vectorize/neg"
"matrixbase/pkg/vm/process"
)
......@@ -25,167 +23,169 @@ func unaryCheck(op int, arg types.T, val types.T) bool {
}
var UnaryOps = map[int][]*UnaryOp{
UnaryMinus: {
&UnaryOp{
Typ: types.T_int8,
ReturnType: types.T_int8,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int8)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I8Neg(vs, vs))
return v, nil
/*
UnaryMinus: {
&UnaryOp{
Typ: types.T_int8,
ReturnType: types.T_int8,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int8)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I8Neg(vs, vs))
return v, nil
}
}
}
data, err := p.Alloc(int64(len(vs)))
if err != nil {
return nil, err
}
rs := encoding.DecodeInt8Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I8Neg(vs, rs))
return vec, nil
data, err := p.Alloc(int64(len(vs)))
if err != nil {
return nil, err
}
rs := encoding.DecodeInt8Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I8Neg(vs, rs))
return vec, nil
},
},
},
&UnaryOp{
Typ: types.T_int16,
ReturnType: types.T_int16,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int16)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I16Neg(vs, vs))
return v, nil
&UnaryOp{
Typ: types.T_int16,
ReturnType: types.T_int16,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int16)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I16Neg(vs, vs))
return v, nil
}
}
}
data, err := p.Alloc(int64(len(vs)) * 2)
if err != nil {
return nil, err
}
rs := encoding.DecodeInt16Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I16Neg(vs, rs))
return vec, nil
data, err := p.Alloc(int64(len(vs)) * 2)
if err != nil {
return nil, err
}
rs := encoding.DecodeInt16Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I16Neg(vs, rs))
return vec, nil
},
},
},
&UnaryOp{
Typ: types.T_int32,
ReturnType: types.T_int32,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int32)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I32Neg(vs, vs))
return v, nil
&UnaryOp{
Typ: types.T_int32,
ReturnType: types.T_int32,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int32)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I32Neg(vs, vs))
return v, nil
}
}
}
data, err := p.Alloc(int64(len(vs)) * 4)
if err != nil {
return nil, err
}
rs := encoding.DecodeInt32Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I32Neg(vs, rs))
return vec, nil
data, err := p.Alloc(int64(len(vs)) * 4)
if err != nil {
return nil, err
}
rs := encoding.DecodeInt32Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I32Neg(vs, rs))
return vec, nil
},
},
},
&UnaryOp{
Typ: types.T_int64,
ReturnType: types.T_int64,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int64)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I64Neg(vs, vs))
return v, nil
&UnaryOp{
Typ: types.T_int64,
ReturnType: types.T_int64,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]int64)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.I64Neg(vs, vs))
return v, nil
}
}
data, err := p.Alloc(int64(len(vs)) * 8)
if err != nil {
return nil, err
}
}
data, err := p.Alloc(int64(len(vs)) * 8)
if err != nil {
return nil, err
}
rs := encoding.DecodeInt64Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I64Neg(vs, rs))
return vec, nil
rs := encoding.DecodeInt64Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.I64Neg(vs, rs))
return vec, nil
},
},
},
&UnaryOp{
Typ: types.T_float32,
ReturnType: types.T_float32,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]float32)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.F32Neg(vs, vs))
return v, nil
&UnaryOp{
Typ: types.T_float32,
ReturnType: types.T_float32,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]float32)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.F32Neg(vs, vs))
return v, nil
}
}
data, err := p.Alloc(int64(len(vs)) * 4)
if err != nil {
return nil, err
}
}
data, err := p.Alloc(int64(len(vs)) * 4)
if err != nil {
return nil, err
}
rs := encoding.DecodeFloat32Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.F32Neg(vs, rs))
return vec, nil
rs := encoding.DecodeFloat32Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.F32Neg(vs, rs))
return vec, nil
},
},
},
&UnaryOp{
Typ: types.T_float64,
ReturnType: types.T_float64,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]float64)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.F64Neg(vs, vs))
return v, nil
&UnaryOp{
Typ: types.T_float64,
ReturnType: types.T_float64,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]float64)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.F64Neg(vs, vs))
return v, nil
}
}
}
data, err := p.Alloc(int64(len(vs)) * 8)
if err != nil {
return nil, err
}
rs := encoding.DecodeFloat64Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.F64Neg(vs, rs))
return vec, nil
data, err := p.Alloc(int64(len(vs)) * 8)
if err != nil {
return nil, err
}
rs := encoding.DecodeFloat64Slice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.F64Neg(vs, rs))
return vec, nil
},
},
},
&UnaryOp{
Typ: types.T_decimal,
ReturnType: types.T_decimal,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]types.Decimal)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.DecimalNeg(vs, vs))
return v, nil
&UnaryOp{
Typ: types.T_decimal,
ReturnType: types.T_decimal,
Fn: func(v *vector.Vector, p *process.Process, _ bool) (*vector.Vector, error) {
vs := v.Col.([]types.Decimal)
if v.Data != nil {
if cnt := encoding.DecodeUint64(v.Data[:8]); cnt == 1 {
v.SetCol(neg.DecimalNeg(vs, vs))
return v, nil
}
}
data, err := p.Alloc(int64(len(vs) * encoding.DecimalSize))
if err != nil {
return nil, err
}
}
data, err := p.Alloc(int64(len(vs) * encoding.DecimalSize))
if err != nil {
return nil, err
}
rs := encoding.DecodeDecimalSlice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.DecimalNeg(vs, rs))
return vec, nil
rs := encoding.DecodeDecimalSlice(data)
vec := vector.New(v.Typ)
vec.Col = rs
vec.Nsp = v.Nsp
vec.SetCol(neg.DecimalNeg(vs, rs))
return vec, nil
},
},
},
},
*/
}
......@@ -9,6 +9,7 @@ import (
"matrixbase/pkg/hash"
"matrixbase/pkg/intmap/fastmap"
"matrixbase/pkg/sql/colexec/aggregation"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
......@@ -22,7 +23,7 @@ func init() {
}
func Prepare(proc *process.Process, arg interface{}) error {
n := arg.(Argument)
n := arg.(*Argument)
n.Attrs = make([]string, len(n.Es))
for i, e := range n.Es {
n.Attrs[i] = e.Name
......@@ -44,7 +45,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(Argument)
n := arg.(*Argument)
bat := proc.Reg.Ax.(*batch.Batch)
ctr := &n.Ctr
gvecs := make([]*vector.Vector, len(n.Gs))
......@@ -95,7 +96,7 @@ func (ctr *Container) eval(length int, es []aggregation.Extend, vecs, rvecs []*v
return err
}
vec := vector.New(types.Type{types.T_int8, 1, 1, 0})
vs := encoding.DecodeInt8Slice(data)
vs := encoding.DecodeInt8Slice(data[mempool.CountSize:])
for _, gs := range ctr.groups {
for _, g := range gs {
e.Agg.Reset()
......@@ -116,6 +117,7 @@ func (ctr *Container) eval(length int, es []aggregation.Extend, vecs, rvecs []*v
}
}
rvecs[i] = vec
copy(vec.Data, encoding.EncodeUint64(1+proc.Refer[e.Alias]))
default:
return fmt.Errorf("unsupport type %s", typ)
}
......
......@@ -13,7 +13,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
// R ⨝ S, S is the small relation
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(Argument)
n := arg.(*Argument)
ctr := &n.Ctr
if err := ctr.build(n.Sattrs, n.Distinct, proc); err != nil {
return false, err
......
......@@ -3,6 +3,7 @@ package limit
import (
"matrixbase/pkg/container/batch"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
......@@ -12,9 +13,9 @@ func Prepare(_ *process.Process, _ interface{}) error {
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(Argument)
n := arg.(*Argument)
bat := proc.Reg.Ax.(*batch.Batch)
if length := len(bat.Sels); length > 0 {
if length := uint64(len(bat.Sels)); length > 0 {
newSeen := n.Seen + length
if newSeen >= n.Limit { // limit - seen
bat.Sels = bat.Sels[:n.Limit-n.Seen]
......@@ -32,7 +33,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
clean(bat, proc)
return false, err
}
newSeen := n.Seen + length
newSeen := n.Seen + uint64(length)
if newSeen >= n.Limit { // limit - seen
data, sels, err := newSels(int64(n.Limit-n.Seen), proc)
if err != nil {
......@@ -56,11 +57,11 @@ func newSels(count int64, proc *process.Process) ([]byte, []int64, error) {
if err != nil {
return nil, nil, err
}
sels := encoding.DecodeInt64Slice(data)
sels := encoding.DecodeInt64Slice(data[mempool.CountSize:])
for i := int64(0); i < count; i++ {
sels[i] = i
}
return data, sels, nil
return data, sels[:count], nil
}
func clean(bat *batch.Batch, proc *process.Process) {
......
package limit
type Argument struct {
Seen int // seen is the number of tuples seen so far
Limit int
Seen uint64 // seen is the number of tuples seen so far
Limit uint64
}
......@@ -8,7 +8,7 @@ import (
)
func Prepare(proc *process.Process, arg interface{}) error {
n := arg.(Argument)
n := arg.(*Argument)
n.Attrs = make([]string, len(n.Es))
for i, e := range n.Es {
n.Attrs[i] = e.Alias
......@@ -17,7 +17,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(Argument)
n := arg.(*Argument)
for i, c := range proc.Reg.Cs {
v := <-c
if v == nil {
......
package offset
import (
"matrixbase/pkg/container/batch"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
func Prepare(_ *process.Process, _ interface{}) error {
return nil
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(*Argument)
bat := proc.Reg.Ax.(*batch.Batch)
if n.Seen > n.Offset {
proc.Reg.Ax = bat
register.FreeRegisters(proc)
return false, nil
}
if length := uint64(len(bat.Sels)); length > 0 {
if n.Seen+length > n.Offset {
bat.Sels = bat.Sels[n.Offset-n.Seen:]
proc.Reg.Ax = bat
n.Seen += length
register.FreeRegisters(proc)
return false, nil
}
n.Seen += length
bat.Clean(proc)
proc.Reg.Ax = batch.New(nil)
register.FreeRegisters(proc)
return false, nil
}
length, err := bat.Length(proc)
if err != nil {
clean(bat, proc)
return false, err
}
if n.Seen+uint64(length) > n.Offset {
data, sels, err := newSels(int64(n.Offset-n.Seen), int64(length)-int64(n.Offset-n.Seen), proc)
if err != nil {
clean(bat, proc)
return true, err
}
n.Seen += uint64(length)
bat.Sels = sels
bat.SelsData = data
proc.Reg.Ax = bat
register.FreeRegisters(proc)
return false, nil
}
n.Seen += uint64(length)
bat.Clean(proc)
proc.Reg.Ax = batch.New(nil)
register.FreeRegisters(proc)
return false, nil
}
func newSels(start, count int64, proc *process.Process) ([]byte, []int64, error) {
data, err := proc.Alloc(count * 8)
if err != nil {
return nil, nil, err
}
sels := encoding.DecodeInt64Slice(data[mempool.CountSize:])
for i := int64(0); i < count; i++ {
sels[i] = start + i
}
return data, sels[:count], nil
}
func clean(bat *batch.Batch, proc *process.Process) {
bat.Clean(proc)
register.FreeRegisters(proc)
}
package offset
type Argument struct {
Seen uint64 // seen is the number of tuples seen so far
Offset uint64
}
......@@ -14,7 +14,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
func Call(proc *process.Process, arg interface{}) (bool, error) {
var err error
n := arg.(Argument)
n := arg.(*Argument)
rbat := batch.New(n.Attrs)
bat := proc.Reg.Ax.(*batch.Batch)
for i := range n.Attrs {
......
......@@ -7,13 +7,13 @@ import (
)
func Prepare(_ *process.Process, arg interface{}) error {
n := arg.(Argument)
n := arg.(*Argument)
n.Attrs = n.E.Attributes()
return nil
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(Argument)
n := arg.(*Argument)
bat := proc.Reg.Ax.(*batch.Batch)
vec, _, err := n.E.Eval(bat, proc)
if err != nil {
......
......@@ -8,7 +8,7 @@ import (
)
func Prepare(proc *process.Process, arg interface{}) error {
n := arg.(Argument)
n := arg.(*Argument)
n.Attrs = make([]string, len(n.Es))
for i, e := range n.Es {
n.Attrs[i] = e.Alias
......@@ -17,7 +17,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
}
func Call(proc *process.Process, arg interface{}) (bool, error) {
n := arg.(Argument)
n := arg.(*Argument)
rbat := batch.New(n.Attrs)
bat := proc.Reg.Ax.(*batch.Batch)
for i, e := range n.Es {
......
......@@ -6,12 +6,13 @@ import (
"matrixbase/pkg/container/batch"
"matrixbase/pkg/container/vector"
"matrixbase/pkg/encoding"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
func Prepare(proc *process.Process, arg interface{}) error {
n := arg.(Argument)
n := arg.(*Argument)
{
n.Attrs = make([]string, len(n.Fs))
for i, f := range n.Fs {
......@@ -23,7 +24,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
if err != nil {
return err
}
sels := encoding.DecodeInt64Slice(data)
sels := encoding.DecodeInt64Slice(data[mempool.CountSize:])
for i := int64(0); i < n.Limit; i++ {
sels[i] = i
}
......@@ -54,7 +55,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
clean(&n.Ctr, bat, proc)
return false, err
}
sels := encoding.DecodeInt64Slice(data)
sels := encoding.DecodeInt64Slice(data[mempool.CountSize:])
for i, j := 0, len(n.Ctr.sels); i < j; i++ {
sels[len(sels)-1-i] = heap.Pop(&n.Ctr).(int64)
}
......
package unittest
import (
"fmt"
"matrixbase/pkg/container/types"
"matrixbase/pkg/sql/colexec/extend"
"matrixbase/pkg/sql/colexec/limit"
"matrixbase/pkg/sql/colexec/projection"
"matrixbase/pkg/vm"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/mmu/guest"
"matrixbase/pkg/vm/mmu/host"
"matrixbase/pkg/vm/pipeline"
"matrixbase/pkg/vm/process"
"testing"
)
func TestLimit(t *testing.T) {
var ins vm.Instructions
proc := process.New(guest.New(1<<20, host.New(1<<20)), mempool.New(1<<32, 8))
{
proc.Refer = make(map[string]uint64)
}
{
var es []extend.Extend
{
es = append(es, &extend.Attribute{"uid", types.T_varchar})
}
ins = append(ins, vm.Instruction{vm.Projection, projection.Argument{[]string{"uid"}, es}})
{
proc.Refer["uid"] = 1
}
}
{
ins = append(ins, vm.Instruction{vm.Limit, &limit.Argument{Limit: 1}})
ins = append(ins, vm.Instruction{vm.Output, nil})
}
fmt.Printf("limit\n")
p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
p.Run(segments(proc), proc)
fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
}
package unittest
import (
"fmt"
"matrixbase/pkg/container/types"
"matrixbase/pkg/sql/colexec/extend"
"matrixbase/pkg/sql/colexec/offset"
"matrixbase/pkg/sql/colexec/projection"
"matrixbase/pkg/vm"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/mmu/guest"
"matrixbase/pkg/vm/mmu/host"
"matrixbase/pkg/vm/pipeline"
"matrixbase/pkg/vm/process"
"testing"
)
func TestOffset(t *testing.T) {
var ins vm.Instructions
proc := process.New(guest.New(1<<20, host.New(1<<20)), mempool.New(1<<32, 8))
{
proc.Refer = make(map[string]uint64)
}
{
var es []extend.Extend
{
es = append(es, &extend.Attribute{"uid", types.T_varchar})
}
ins = append(ins, vm.Instruction{vm.Projection, projection.Argument{[]string{"uid"}, es}})
{
proc.Refer["uid"] = 1
}
}
{
ins = append(ins, vm.Instruction{vm.Offset, &offset.Argument{Offset: 19}})
ins = append(ins, vm.Instruction{vm.Output, nil})
}
fmt.Printf("offset\n")
p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
p.Run(segments(proc), proc)
fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
}
......@@ -2,13 +2,10 @@ package unittest
import (
"fmt"
"log"
"matrixbase/pkg/container/types"
"matrixbase/pkg/sql/colexec/extend"
"matrixbase/pkg/sql/colexec/projection"
"matrixbase/pkg/vm"
"matrixbase/pkg/vm/engine"
"matrixbase/pkg/vm/engine/memEngine"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/mmu/guest"
"matrixbase/pkg/vm/mmu/host"
......@@ -36,21 +33,8 @@ func TestProjection(t *testing.T) {
proc.Refer["uid"] = 1
}
}
fmt.Printf("projection\n")
p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
p.Run(segments(proc), proc)
fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
}
func segments(proc *process.Process) []engine.Segment {
e := memEngine.NewTestEngine()
r, err := e.Relation("test")
if err != nil {
log.Fatal(err)
}
ids := r.Segments()
segs := make([]engine.Segment, len(ids))
for i, id := range ids {
segs[i] = r.Segment(id, proc)
}
return segs
}
package unittest
import (
"log"
"matrixbase/pkg/vm/engine"
"matrixbase/pkg/vm/engine/memEngine"
"matrixbase/pkg/vm/process"
)
func segments(proc *process.Process) []engine.Segment {
e := memEngine.NewTestEngine()
r, err := e.Relation("test")
if err != nil {
log.Fatal(err)
}
ids := r.Segments()
segs := make([]engine.Segment, len(ids))
for i, id := range ids {
segs[i] = r.Segment(id, proc)
}
return segs
}
......@@ -6,6 +6,7 @@ const (
Limit
Group
Order
Offset
Transfer
Restrict
Summarize
......
package vm
import (
"matrixbase/pkg/sql/colexec/limit"
"matrixbase/pkg/sql/colexec/offset"
"matrixbase/pkg/sql/colexec/output"
"matrixbase/pkg/sql/colexec/projection"
"matrixbase/pkg/sql/colexec/restrict"
......@@ -13,8 +15,15 @@ func Prepare(ins Instructions, proc *process.Process) error {
case Nub:
case Top:
case Limit:
if err := limit.Prepare(proc, in.Arg); err != nil {
return err
}
case Group:
case Order:
case Offset:
if err := offset.Prepare(proc, in.Arg); err != nil {
return err
}
case Transfer:
case Restrict:
if err := restrict.Prepare(proc, in.Arg); err != nil {
......@@ -41,6 +50,7 @@ func Prepare(ins Instructions, proc *process.Process) error {
}
func Run(ins Instructions, proc *process.Process) (bool, error) {
var ok bool
var end bool
var err error
......@@ -49,14 +59,17 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
case Nub:
case Top:
case Limit:
ok, err = limit.Call(proc, in.Arg)
case Group:
case Order:
case Offset:
ok, err = offset.Call(proc, in.Arg)
case Transfer:
case Restrict:
end, err = restrict.Call(proc, in.Arg)
ok, err = restrict.Call(proc, in.Arg)
case Summarize:
case Projection:
end, err = projection.Call(proc, in.Arg)
ok, err = projection.Call(proc, in.Arg)
case SetUnion:
case SetIntersect:
case SetDifference:
......@@ -68,14 +81,14 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
case InnerJoin:
case NaturalJoin:
case Output:
end, err = output.Call(proc, in.Arg)
ok, err = output.Call(proc, in.Arg)
}
if err != nil {
return false, err
}
if end {
return true, nil
if ok {
end = true
}
}
return false, nil
return end, nil
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment