Skip to content
Snippets Groups Projects
Commit 7fe9a7d3 authored by nnsgmsone's avatar nnsgmsone Committed by jinhai
Browse files

Add set natural join (#9)

parent 59f973cf
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,8 @@ Please mark all change in change log and use the issue from GitHub
## Feature
* Add set natural join
## Task
- [#4 More documents are needed in Github REPO](https://github.com/matrixorigin/matrixbase/issues/4)
......
......@@ -6,11 +6,12 @@ require (
github.com/aws/aws-sdk-go v1.37.14
github.com/klauspost/compress v1.11.7
github.com/frankban/quicktest v1.11.3 // indirect
github.com/mmcloughlin/avo v0.0.0-20210120082657-d60cc025fc3c // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/pierrec/lz4 v2.6.0+incompatible
github.com/pilosa/pilosa v1.4.0
github.com/traetox/goaio v0.0.0-20171005222435-46641abceb17
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
go.uber.org/zap v1.15.0
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
golang.org/x/text v0.3.3
golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 // indirect
)
......@@ -25,7 +25,9 @@ func (bat *Batch) Reorder(attrs []string) {
for i, name := range attrs {
for j, attr := range bat.Attrs {
if name == attr {
bat.Is[i], bat.Is[j] = bat.Is[j], bat.Is[i]
if len(bat.Is) > j {
bat.Is[i], bat.Is[j] = bat.Is[j], bat.Is[i]
}
bat.Vecs[i], bat.Vecs[j] = bat.Vecs[j], bat.Vecs[i]
bat.Attrs[i], bat.Attrs[j] = bat.Attrs[j], bat.Attrs[i]
}
......
......@@ -29,7 +29,14 @@ type Vector interface {
}
*/
/*
* origin true:
* count || type || bitmap size || bitmap || vector
* origin false:
* count || vector
*/
type Vector struct {
Or bool // true: origin
Data []byte // raw data
Typ types.Type
Col interface{}
......
......@@ -2,6 +2,7 @@ package vector
import (
"bytes"
"errors"
"fmt"
"matrixbase/pkg/container/nulls"
"matrixbase/pkg/container/types"
......@@ -294,6 +295,295 @@ func (v *Vector) Shuffle(sels []int64) *Vector {
}
// UnionOne appends the element at position sel of w to the end of v.
//
// v must not be an origin vector (v.Or must be false); otherwise an error is
// returned and v is left untouched. For every fixed-width type the method
// checks whether v.Data (past the mempool.CountSize header) has room for
// len(w)+1 elements. If not, a new buffer is requested from proc.Alloc, the
// old payload is copied into it, the old buffer is handed back through
// proc.Free, and v.Col is re-pointed at the new buffer before the element is
// appended. For byte-string types the payload, length and offset of the
// selected value are appended to the *types.Bytes column.
//
// If w marks row sel as null, a null is recorded in v.Nsp.
//
// An error from proc.Alloc is returned unchanged.
func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error {
	if v.Or {
		return errors.New("unionone operation cannot be performed for origin vector")
	}
	switch v.Typ.Oid {
	case types.T_int8:
		vs := w.Col.([]int8)
		col := v.Col.([]int8)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < len(vs)+1 {
			data, err := proc.Alloc(int64(len(vs) + 1))
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeInt8Slice(data[mempool.CountSize : mempool.CountSize+len(col)])
			v.Data = data
			col = v.Col.([]int8)
		}
		v.Col = append(col, vs[sel])
	case types.T_int16:
		vs := w.Col.([]int16)
		col := v.Col.([]int16)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*2 {
			data, err := proc.Alloc(int64(len(vs)+1) * 2)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeInt16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*2])
			v.Data = data
			col = v.Col.([]int16)
		}
		v.Col = append(col, vs[sel])
	case types.T_int32:
		vs := w.Col.([]int32)
		col := v.Col.([]int32)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*4 {
			data, err := proc.Alloc(int64(len(vs)+1) * 4)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeInt32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4])
			v.Data = data
			col = v.Col.([]int32)
		}
		v.Col = append(col, vs[sel])
	case types.T_int64:
		vs := w.Col.([]int64)
		col := v.Col.([]int64)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*8 {
			data, err := proc.Alloc(int64(len(vs)+1) * 8)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeInt64Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8])
			v.Data = data
			col = v.Col.([]int64)
		}
		v.Col = append(col, vs[sel])
	case types.T_uint8:
		vs := w.Col.([]uint8)
		col := v.Col.([]uint8)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1) {
			data, err := proc.Alloc(int64(len(vs) + 1))
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeUint8Slice(data[mempool.CountSize : mempool.CountSize+len(col)])
			v.Data = data
			col = v.Col.([]uint8)
		}
		v.Col = append(col, vs[sel])
	case types.T_uint16:
		vs := w.Col.([]uint16)
		col := v.Col.([]uint16)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*2 {
			data, err := proc.Alloc(int64(len(vs)+1) * 2)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeUint16Slice(data[mempool.CountSize : mempool.CountSize+len(col)*2])
			v.Data = data
			col = v.Col.([]uint16)
		}
		v.Col = append(col, vs[sel])
	case types.T_uint32:
		vs := w.Col.([]uint32)
		col := v.Col.([]uint32)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*4 {
			data, err := proc.Alloc(int64(len(vs)+1) * 4)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeUint32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4])
			v.Data = data
			col = v.Col.([]uint32)
		}
		v.Col = append(col, vs[sel])
	case types.T_uint64:
		vs := w.Col.([]uint64)
		col := v.Col.([]uint64)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*8 {
			data, err := proc.Alloc(int64(len(vs)+1) * 8)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			// The decoder must yield []uint64: the assertion two lines below
			// panics for any other element type.
			v.Col = encoding.DecodeUint64Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8])
			v.Data = data
			col = v.Col.([]uint64)
		}
		v.Col = append(col, vs[sel])
	case types.T_decimal:
		vs := w.Col.([]types.Decimal)
		col := v.Col.([]types.Decimal)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*encoding.DecimalSize {
			data, err := proc.Alloc(int64((len(vs) + 1) * encoding.DecimalSize))
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeDecimalSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DecimalSize])
			v.Data = data
			col = v.Col.([]types.Decimal)
		}
		v.Col = append(col, vs[sel])
	case types.T_float32:
		vs := w.Col.([]float32)
		col := v.Col.([]float32)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*4 {
			data, err := proc.Alloc(int64(len(vs)+1) * 4)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeFloat32Slice(data[mempool.CountSize : mempool.CountSize+len(col)*4])
			v.Data = data
			col = v.Col.([]float32)
		}
		v.Col = append(col, vs[sel])
	case types.T_float64:
		vs := w.Col.([]float64)
		col := v.Col.([]float64)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*8 {
			data, err := proc.Alloc(int64(len(vs)+1) * 8)
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeFloat64Slice(data[mempool.CountSize : mempool.CountSize+len(col)*8])
			v.Data = data
			col = v.Col.([]float64)
		}
		v.Col = append(col, vs[sel])
	case types.T_date:
		vs := w.Col.([]types.Date)
		col := v.Col.([]types.Date)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*encoding.DateSize {
			data, err := proc.Alloc(int64((len(vs) + 1) * encoding.DateSize))
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			v.Col = encoding.DecodeDateSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DateSize])
			v.Data = data
			col = v.Col.([]types.Date)
		}
		v.Col = append(col, vs[sel])
	case types.T_datetime:
		vs := w.Col.([]types.Datetime)
		col := v.Col.([]types.Datetime)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < (len(vs)+1)*encoding.DatetimeSize {
			data, err := proc.Alloc(int64((len(vs) + 1) * encoding.DatetimeSize))
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			// The decoder must yield []types.Datetime: the assertion two lines
			// below panics for any other element type.
			v.Col = encoding.DecodeDatetimeSlice(data[mempool.CountSize : mempool.CountSize+len(col)*encoding.DatetimeSize])
			v.Data = data
			col = v.Col.([]types.Datetime)
		}
		v.Col = append(col, vs[sel])
	case types.T_tuple:
		// Tuples are plain Go slices; no pooled buffer is involved.
		vs, ws := v.Col.([][]interface{}), w.Col.([][]interface{})
		vs = append(vs, ws[sel])
		v.Col = vs
	case types.T_char, types.T_varchar, types.T_json:
		vs := w.Col.(*types.Bytes)
		// from is the payload of the selected value inside w's byte area.
		from := vs.Data[vs.Offsets[sel] : vs.Offsets[sel]+vs.Lengths[sel]]
		col := v.Col.(*types.Bytes)
		if v.Data == nil || cap(v.Data[mempool.CountSize:]) < len(col.Data)+len(from) {
			data, err := proc.Alloc(int64((len(col.Data) + len(from))))
			if err != nil {
				return err
			}
			if v.Data != nil {
				copy(data[mempool.CountSize:], v.Data[mempool.CountSize:])
				proc.Free(v.Data)
			}
			// Trim to header + existing payload so the append below writes
			// right after the bytes that are already present.
			data = data[:mempool.CountSize+len(col.Data)]
			v.Data = data
			col.Data = data[mempool.CountSize:]
		}
		col.Lengths = append(col.Lengths, uint32(len(from)))
		// The new offset starts where the previous value ended (0 for the
		// first value).
		if n := len(col.Offsets); n > 0 {
			col.Offsets = append(col.Offsets, col.Offsets[n-1]+col.Lengths[n-1])
		} else {
			col.Offsets = append(col.Offsets, 0)
		}
		col.Data = append(col.Data, from...)
	}
	if w.Nsp.Any() && w.Nsp.Contains(uint64(sel)) {
		// NOTE(review): the element has already been appended above, so if
		// Length() reports the new element count this marks index Length()
		// rather than Length()-1 — confirm against Vector.Length.
		v.Nsp.Add(uint64(v.Length()))
	}
	return nil
}
......@@ -498,6 +788,8 @@ func (v *Vector) Read(data []byte) error {
data = data[mempool.CountSize:]
typ := encoding.DecodeType(data[:encoding.TypeSize])
data = data[encoding.TypeSize:]
v.Typ = typ
v.Or = true
switch typ.Oid {
case types.T_int8:
size := encoding.DecodeUint32(data)
......
......@@ -31,7 +31,10 @@ func (a *count) Fill(sels []int64, vec *vector.Vector) error {
}
func (a *count) Eval(proc *process.Process) (*vector.Vector, error) {
data := proc.Mp.Alloc(8)
data, err := proc.Alloc(8)
if err != nil {
return nil, err
}
vec := vector.New(types.Type{types.T_int64, 8, 8, 0})
copy(data[mempool.CountSize:], encoding.EncodeInt64(a.cnt))
vec.Data = data
......
......@@ -31,7 +31,10 @@ func (a *count) Fill(sels []int64, vec *vector.Vector) error {
}
func (a *count) Eval(proc *process.Process) (*vector.Vector, error) {
data := proc.Mp.Alloc(8)
data, err := proc.Alloc(8)
if err != nil {
return nil, err
}
vec := vector.New(types.Type{types.T_int64, 8, 8, 0})
copy(data[mempool.CountSize:], encoding.EncodeInt64(a.cnt))
vec.Data = data
......
......@@ -89,8 +89,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
func (ctr *Container) eval(length int, es []aggregation.Extend, vecs, rvecs []*vector.Vector, proc *process.Process) error {
for i, e := range es {
typ := aggregation.ReturnType(e.Op, e.Typ)
switch typ {
switch typ := aggregation.ReturnType(e.Op, e.Typ); typ {
case types.T_int8:
data, err := proc.Alloc(int64(length))
if err != nil {
......
......@@ -20,7 +20,7 @@ func init() {
}
// String writes the display label of the set-intersect operator, "R ∩ S",
// to buf exactly once. arg is not used.
func String(arg interface{}, buf *bytes.Buffer) {
	buf.WriteString("R ∩ S")
}
func Prepare(proc *process.Process, arg interface{}) error {
......
package natural
import (
"bytes"
"matrixbase/pkg/container/batch"
"matrixbase/pkg/container/vector"
"matrixbase/pkg/hash"
"matrixbase/pkg/intmap/fastmap"
"matrixbase/pkg/vm/process"
"matrixbase/pkg/vm/register"
)
// init fills the package-level scratch templates: ZeroBools holds UnitLimit
// false values and OneUint64s holds UnitLimit ones.
func init() {
	ones := make([]uint64, UnitLimit)
	for i := 0; i < len(ones); i++ {
		ones[i] = 1
	}
	OneUint64s = ones
	ZeroBools = make([]bool, UnitLimit)
}
// String appends the display label of the natural-join operator to buf.
// arg is not inspected.
func String(arg interface{}, buf *bytes.Buffer) {
	const label = "R ⨝ S"
	buf.WriteString(label)
}
// Prepare resets the operator state stored in arg (which must be an
// *Argument): it installs a fresh Container whose scratch slices are sized
// to UnitLimit, with an empty group table and a slot map taken from
// fastmap.Pool. proc is not used. It always returns nil.
func Prepare(proc *process.Process, arg interface{}) error {
	argument := arg.(*Argument)

	var ctr Container
	ctr.builded = false
	ctr.diffs = make([]bool, UnitLimit)
	ctr.matchs = make([]int64, UnitLimit)
	ctr.hashs = make([]uint64, UnitLimit)
	ctr.sels = make([][]int64, UnitLimit)
	ctr.groups = make(map[uint64][]*hash.SetGroup)
	ctr.slots = fastmap.Pool.Get().(*fastmap.Map)

	argument.Ctr = ctr
	return nil
}
// Call runs one step of the natural join. On the first invocation it runs
// the build phase and, if that succeeds, marks the container as built;
// every invocation then performs one probe step and returns its result.
// A build error is returned as (false, err).
func Call(proc *process.Process, arg interface{}) (bool, error) {
	argument := arg.(*Argument)
	c := &argument.Ctr
	if c.builded {
		return c.probe(argument.R, argument.S, argument.Attrs, proc)
	}
	if err := c.build(argument.Attrs, proc); err != nil {
		return false, err
	}
	c.builded = true
	return c.probe(argument.R, argument.S, argument.Attrs, proc)
}
// build consumes every batch arriving on wait register 1 and indexes it into
// the container's hash groups. (R ⨝ S - S is the smaller relation.)
//
// Each batch is reordered so that the join attributes come first, prefetched,
// appended to ctr.bats and then hashed, either over all rows or over bat.Sels
// when a selection list is present. A nil value on the channel ends the loop.
// reg.Wg.Done() is called once per received value, including the terminating
// nil.
//
// On error the container is cleaned and the error returned. Once a batch has
// been appended to ctr.bats it is released through that list only, so that it
// is not cleaned twice.
func (ctr *Container) build(attrs []string, proc *process.Process) error {
	reg := proc.Reg.Ws[1]
	for {
		v := <-reg.Ch
		if v == nil {
			break
		}
		bat := v.(*batch.Batch)
		bat.Reorder(attrs)
		if err := bat.Prefetch(attrs, bat.Vecs, proc); err != nil {
			// bat is not yet tracked in ctr.bats, so release it explicitly.
			ctr.clean(bat, proc)
			reg.Wg.Done()
			return err
		}
		ctr.bats = append(ctr.bats, bat)

		var err error
		if len(bat.Sels) == 0 {
			err = ctr.buildBatch(bat.Vecs[:len(attrs)], proc)
		} else {
			err = ctr.buildBatchSels(bat.Sels, bat.Vecs[:len(attrs)], proc)
		}
		if err != nil {
			// bat is already part of ctr.bats; clean walks that list, so
			// passing bat here as well would clean it a second time.
			ctr.clean(nil, proc)
			reg.Wg.Done()
			return err
		}
		reg.Wg.Done()
	}
	reg.Wg.Done()
	return nil
}
// probe receives one batch of R from wait register 0 and joins it against
// the groups built from S.
//
// It returns (true, nil) when the input is exhausted (nil received) or when
// no groups remain; in both cases proc.Reg.Ax is set to nil and the
// container is cleaned. Otherwise the joined rows are collected in a new
// batch that is published through proc.Reg.Ax, the input batch is cleaned,
// and (false, nil) is returned. A probe error cleans up and is returned as
// (false, err). reg.Wg.Done() is deferred for every call.
//
// The output attribute list is computed once: the join attributes first,
// then R's remaining attributes prefixed with rName+".", then the remaining
// attributes of the first S batch prefixed with sName+".".
func (ctr *Container) probe(rName, sName string, attrs []string, proc *process.Process) (bool, error) {
	reg := proc.Reg.Ws[0]
	defer reg.Wg.Done()

	msg := <-reg.Ch
	if msg == nil {
		proc.Reg.Ax = nil
		ctr.clean(nil, proc)
		return true, nil
	}
	bat := msg.(*batch.Batch)
	if len(ctr.groups) == 0 {
		reg.Ch = nil
		proc.Reg.Ax = nil
		ctr.clean(bat, proc)
		return true, nil
	}
	bat.Reorder(attrs)

	nKeys := len(attrs)
	if len(ctr.attrs) == 0 {
		ctr.attrs = append(ctr.attrs, attrs...)
		for i := nKeys; i < len(bat.Attrs); i++ {
			ctr.attrs = append(ctr.attrs, rName+"."+bat.Attrs[i])
		}
		for i := nKeys; i < len(ctr.bats[0].Attrs); i++ {
			ctr.attrs = append(ctr.attrs, sName+"."+ctr.bats[0].Attrs[i])
		}
	}

	// Allocate the output batch: one vector per R column, followed by one
	// vector per non-key column of the first S batch.
	ctr.probeState.bat = batch.New(true, ctr.attrs)
	for i, vec := range bat.Vecs {
		ctr.probeState.bat.Vecs[i] = vector.New(vec.Typ)
	}
	next := len(bat.Vecs)
	for j := nKeys; j < len(ctr.bats[0].Vecs); j++ {
		ctr.probeState.bat.Vecs[next] = vector.New(ctr.bats[0].Vecs[j].Typ)
		next++
	}

	var err error
	if len(bat.Sels) == 0 {
		err = ctr.probeBatch(bat, bat.Vecs[:nKeys], proc)
	} else {
		err = ctr.probeBatchSels(bat.Sels, bat, bat.Vecs[:nKeys], proc)
	}
	if err != nil {
		ctr.clean(bat, proc)
		return false, err
	}
	proc.Reg.Ax = ctr.probeState.bat
	bat.Clean(proc)
	ctr.probeState.bat = nil
	return false, nil
}
// buildBatch feeds all rows of vecs into buildUnit in consecutive chunks of
// at most UnitLimit rows. The row count is taken from vecs[0].Length().
// The first error from buildUnit stops the walk and is returned.
func (ctr *Container) buildBatch(vecs []*vector.Vector, proc *process.Process) error {
	total := vecs[0].Length()
	for off := 0; off < total; off += UnitLimit {
		n := total - off
		if n > UnitLimit {
			n = UnitLimit
		}
		err := ctr.buildUnit(off, n, nil, vecs, proc)
		if err != nil {
			return err
		}
	}
	return nil
}
// buildBatchSels feeds the rows listed in sels into buildUnit in consecutive
// chunks of at most UnitLimit selections. The first error from buildUnit
// stops the walk and is returned.
func (ctr *Container) buildBatchSels(sels []int64, vecs []*vector.Vector, proc *process.Process) error {
	total := len(sels)
	for off := 0; off < total; off += UnitLimit {
		n := total - off
		if n > UnitLimit {
			n = UnitLimit
		}
		err := ctr.buildUnit(0, n, sels[off:off+n], vecs, proc)
		if err != nil {
			return err
		}
	}
	return nil
}
// buildUnit indexes one chunk of at most UnitLimit build-side rows.
//
// The hash scratch is seeded from OneUint64s and filled by fillHash (no
// selection list) or fillHashSels. Then, for every distinct hash code
// recorded in ctr.slots, the rows sharing that code are offered to each
// existing group via Fill; rows that are still left over create new groups
// (anchored at the most recently appended batch and the first leftover row)
// until none remain. The per-slot row list and the slot map are reset before
// returning. Any error from Fill is returned immediately.
func (ctr *Container) buildUnit(start, count int, sels []int64,
	vecs []*vector.Vector, proc *process.Process) error {
	var err error

	copy(ctr.hashs[:count], OneUint64s[:count])
	if len(sels) > 0 {
		ctr.fillHashSels(count, sels, vecs)
	} else {
		ctr.fillHash(start, count, vecs)
	}
	copy(ctr.diffs[:count], ZeroBools[:count])

	for i, codes := range ctr.slots.Ks {
		for j, code := range codes {
			slot := ctr.slots.Vs[i][j]
			rows := ctr.sels[slot]

			existing, found := ctr.groups[code]
			if !found {
				ctr.groups[code] = make([]*hash.SetGroup, 0, 8)
			}
			// Offer the rows to every group already known for this code.
			for _, g := range existing {
				rows, err = g.Fill(rows, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc)
				if err != nil {
					return err
				}
				copy(ctr.diffs[:len(rows)], ZeroBools[:len(rows)])
			}
			// Whatever is left starts new groups.
			for len(rows) > 0 {
				g := hash.NewSetGroup(int64(len(ctr.bats)-1), int64(rows[0]))
				ctr.groups[code] = append(ctr.groups[code], g)
				rows, err = g.Fill(rows, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc)
				if err != nil {
					return err
				}
				copy(ctr.diffs[:len(rows)], ZeroBools[:len(rows)])
			}
			ctr.sels[slot] = ctr.sels[slot][:0]
		}
	}
	ctr.slots.Reset()
	return nil
}
// probeBatch probes all rows of bat in consecutive chunks of at most
// UnitLimit rows. The row count is taken from vecs[0].Length(). The first
// error from probeUnit stops the walk and is returned.
func (ctr *Container) probeBatch(bat *batch.Batch, vecs []*vector.Vector, proc *process.Process) error {
	total := vecs[0].Length()
	for off := 0; off < total; off += UnitLimit {
		n := total - off
		if n > UnitLimit {
			n = UnitLimit
		}
		err := ctr.probeUnit(off, n, nil, bat, vecs, proc)
		if err != nil {
			return err
		}
	}
	return nil
}
// probeBatchSels probes the rows of bat listed in sels, in consecutive
// chunks of at most UnitLimit selections. The first error from probeUnit
// stops the walk and is returned.
func (ctr *Container) probeBatchSels(sels []int64, bat *batch.Batch, vecs []*vector.Vector, proc *process.Process) error {
	total := len(sels)
	for off := 0; off < total; off += UnitLimit {
		n := total - off
		if n > UnitLimit {
			n = UnitLimit
		}
		err := ctr.probeUnit(0, n, sels[off:off+n], bat, vecs, proc)
		if err != nil {
			return err
		}
	}
	return nil
}
// probeUnit probes one chunk of at most UnitLimit rows of bat against the
// groups built from S.
//
// The hash scratch is seeded from OneUint64s and filled by fillHash (no
// selection list) or fillHashSels. For every distinct hash code recorded in
// ctr.slots, the rows sharing that code are offered to each group stored
// under that code via Probe. When Probe reports a row (sel >= 0):
//   - the group is removed from the list for that code (the map entry is
//     deleted when the list becomes empty, otherwise the shortened list is
//     stored back), and
//   - one output row is appended to ctr.probeState.bat: all columns of bat
//     at row sel, followed by the non-key columns of the group's batch
//     (ctr.bats[g.Idx]) at row g.Sel.
//
// The per-slot row list is reset for every slot, whether or not the code had
// any group, so that no row indices carry over into the next unit. The slot
// map is reset before returning. Errors from Probe or UnionOne are returned
// immediately.
func (ctr *Container) probeUnit(start, count int, sels []int64, bat *batch.Batch,
	vecs []*vector.Vector, proc *process.Process) error {
	var sel int64
	var err error

	copy(ctr.hashs[:count], OneUint64s[:count])
	if len(sels) == 0 {
		ctr.fillHash(start, count, vecs)
	} else {
		ctr.fillHashSels(count, sels, vecs)
	}
	copy(ctr.diffs[:count], ZeroBools[:count])
	for i, hs := range ctr.slots.Ks {
		for j, h := range hs {
			slot := ctr.slots.Vs[i][j]
			remaining := ctr.sels[slot]
			if gs, ok := ctr.groups[h]; ok {
				for k := 0; k < len(gs); k++ {
					g := gs[k]
					if sel, remaining, err = g.Probe(remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
						return err
					}
					if sel >= 0 {
						// Drop the matched group and revisit index k, which
						// now holds the next group.
						gs = append(gs[:k], gs[k+1:]...)
						k--
						if len(gs) == 0 {
							delete(ctr.groups, h)
						} else {
							// The map still holds the old slice header (old
							// length); store the shortened list back.
							ctr.groups[h] = gs
						}
						// Probe-side columns of the output row.
						for c, vec := range bat.Vecs {
							if err := ctr.probeState.bat.Vecs[c].UnionOne(vec, sel, proc); err != nil {
								return err
							}
						}
						// Build-side non-key columns of the output row.
						out := len(bat.Vecs)
						for c := len(vecs); c < len(ctr.bats[g.Idx].Vecs); c++ {
							if err := ctr.probeState.bat.Vecs[out].UnionOne(ctr.bats[g.Idx].Vecs[c], g.Sel, proc); err != nil {
								return err
							}
							out++
						}
					}
					copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
				}
			}
			// Always clear the slot: slot indices are reused by the next
			// unit, and leftover rows would be probed again there.
			ctr.sels[slot] = ctr.sels[slot][:0]
		}
	}
	ctr.slots.Reset()
	return nil
}
// fillHash hashes count rows over every vector in vecs into ctr.hashs and
// buckets the row indices by hash code: each distinct code is assigned the
// next free slot in ctr.slots, and row i is recorded as i+start in
// ctr.sels[slot].
func (ctr *Container) fillHash(start, count int, vecs []*vector.Vector) {
	ctr.hashs = ctr.hashs[:count]
	for k := range vecs {
		hash.Rehash(count, ctr.hashs, vecs[k])
	}
	used := 0
	for row := 0; row < len(ctr.hashs); row++ {
		code := ctr.hashs[row]
		slot, found := ctr.slots.Get(code)
		if !found {
			slot = used
			ctr.slots.Set(code, slot)
			used++
		}
		ctr.sels[slot] = append(ctr.sels[slot], int64(row+start))
	}
}
// fillHashSels hashes the count rows named by sels over every vector in
// vecs into ctr.hashs and buckets them by hash code: each distinct code is
// assigned the next free slot in ctr.slots, and sels[i] is recorded in
// ctr.sels[slot].
func (ctr *Container) fillHashSels(count int, sels []int64, vecs []*vector.Vector) {
	ctr.hashs = ctr.hashs[:count]
	for k := range vecs {
		hash.RehashSels(count, sels, ctr.hashs, vecs[k])
	}
	used := 0
	for pos := 0; pos < len(ctr.hashs); pos++ {
		code := ctr.hashs[pos]
		slot, found := ctr.slots.Get(code)
		if !found {
			slot = used
			ctr.slots.Set(code, slot)
			used++
		}
		ctr.sels[slot] = append(ctr.sels[slot], sels[pos])
	}
}
// clean releases everything the container holds: the optional batch passed
// in, the slot map (returned to fastmap.Pool), the pending output batch if
// any, every stored build-side batch, and finally the process registers.
func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
	if bat != nil {
		bat.Clean(proc)
	}
	fastmap.Pool.Put(ctr.slots)
	if out := ctr.probeState.bat; out != nil {
		out.Clean(proc)
	}
	for i := range ctr.bats {
		ctr.bats[i].Clean(proc)
	}
	register.FreeRegisters(proc)
}
package natural
import (
"matrixbase/pkg/container/batch"
"matrixbase/pkg/hash"
"matrixbase/pkg/intmap/fastmap"
)
const (
	// UnitLimit is the maximum number of rows processed per build/probe
	// unit; the container's scratch slices are allocated with this length.
	UnitLimit = 1024
)

var (
	// ZeroBools is a template of UnitLimit false values (filled in init),
	// copied over Container.diffs to clear it.
	ZeroBools []bool
	// OneUint64s is a template of UnitLimit ones (filled in init), copied
	// over Container.hashs to seed it before hashing.
	OneUint64s []uint64
)
// Container is the working state of one natural-join operator instance.
type Container struct {
	builded    bool           // set once the build phase has completed
	diffs      []bool         // scratch passed to SetGroup.Fill/Probe; cleared from ZeroBools
	matchs     []int64        // scratch passed to SetGroup.Fill/Probe
	hashs      []uint64       // hash codes of the rows of the current unit
	attrs      []string       // output attribute names, computed on the first probe
	sels       [][]int64      // per-slot list of row indices sharing a hash code
	slots      *fastmap.Map   // hash code -> sels index
	bats       []*batch.Batch // batches of the s relation kept from the build phase
	probeState struct {
		bat *batch.Batch // output relation being assembled by the current probe
	}
	groups map[uint64][]*hash.SetGroup // hash code -> group list
}
// Argument is the instruction argument of the natural-join operator.
type Argument struct {
	R     string    // name used to prefix the non-key output attributes of the probed relation
	S     string    // name used to prefix the non-key output attributes of the built relation
	Attrs []string  // join attributes
	Ctr   Container // operator state; (re)initialised by Prepare
}
package unittest
import (
"fmt"
"matrixbase/pkg/sql/colexec/hashset/natural"
"matrixbase/pkg/sql/colexec/transfer"
"matrixbase/pkg/vm"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/mmu/guest"
"matrixbase/pkg/vm/mmu/host"
"matrixbase/pkg/vm/pipeline"
"matrixbase/pkg/vm/process"
"sync"
"testing"
)
// TestNaturalJoin wires two producer pipelines (R and S) into a merge
// pipeline that runs SetNaturalJoin on "uid" followed by Output, and prints
// the pipelines and memory figures. It makes no assertions; the segments
// helper is defined elsewhere in the package.
func TestNaturalJoin(t *testing.T) {
	var wg sync.WaitGroup
	var ins vm.Instructions

	hm := host.New(1 << 20)
	// proc is the consumer process: it owns the two wait registers the
	// producers transfer into (Ws[0] is fed by R, Ws[1] by S).
	proc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
	{
		proc.Refer = make(map[string]uint64)
		proc.Reg.Ws = make([]*process.WaitRegister, 2)
		for i := 0; i < 2; i++ {
			proc.Reg.Ws[i] = &process.WaitRegister{
				Wg: new(sync.WaitGroup),
				Ch: make(chan interface{}),
			}
		}
	}
	// Producer for relation R: transfers its batches into Ws[0].
	{
		var rins vm.Instructions

		rproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
		{
			rproc.Refer = make(map[string]uint64)
		}
		rins = append(rins, vm.Instruction{vm.Transfer, &transfer.Argument{proc.Reg.Ws[0]}})
		rp := pipeline.New([]uint64{1, 1, 1}, []string{"orderId", "uid", "price"}, rins)
		wg.Add(1)
		go func() {
			fmt.Printf("R: %s\n", rp)
			rp.Run(segments("R", rproc), rproc)
			fmt.Printf("R - guest: %v, host: %v\n", rproc.Size(), rproc.HostSize())
			wg.Done()
		}()
	}
	// Producer for relation S: transfers its batches into Ws[1].
	{
		var sins vm.Instructions

		sproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
		{
			sproc.Refer = make(map[string]uint64)
		}
		sins = append(sins, vm.Instruction{vm.Transfer, &transfer.Argument{proc.Reg.Ws[1]}})
		sp := pipeline.New([]uint64{1, 1, 1}, []string{"uid", "price", "orderId"}, sins)
		wg.Add(1)
		go func() {
			fmt.Printf("S: %s\n", sp)
			sp.Run(segments("S", sproc), sproc)
			fmt.Printf("S - guest: %v, host: %v\n", sproc.Size(), sproc.HostSize())
			wg.Done()
		}()
	}
	// Consumer: natural join on "uid", then output.
	{
		ins = append(ins, vm.Instruction{vm.SetNaturalJoin, &natural.Argument{R: "R", S: "S", Attrs: []string{"uid"}}})
		ins = append(ins, vm.Instruction{vm.Output, nil})
	}
	p := pipeline.NewMerge(ins)
	fmt.Printf("%s\n", p)
	p.RunMerge(proc)
	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
	// Wait for both producer goroutines to finish.
	wg.Wait()
}
......@@ -3,15 +3,16 @@ package register
import (
"matrixbase/pkg/container/types"
"matrixbase/pkg/container/vector"
"matrixbase/pkg/vm/mempool"
"matrixbase/pkg/vm/process"
)
func Get(proc *process.Process, size int64, typ types.Type) (*vector.Vector, error) {
for i, t := range proc.Reg.Ts {
v := t.(*vector.Vector)
if int64(cap(v.Data)) == size {
if int64(cap(v.Data[mempool.CountSize:])) >= size {
vec := vector.New(typ)
vec.Data = v.Data[:size]
vec.Data = v.Data
proc.Reg.Ts = append(proc.Reg.Ts[:i], proc.Reg.Ts[i+1:])
return vec, nil
}
......
......@@ -3,6 +3,7 @@ package vm
import (
"bytes"
"matrixbase/pkg/sql/colexec/hashset/intersect"
"matrixbase/pkg/sql/colexec/hashset/natural"
"matrixbase/pkg/sql/colexec/limit"
"matrixbase/pkg/sql/colexec/offset"
"matrixbase/pkg/sql/colexec/output"
......@@ -37,6 +38,8 @@ func String(ins Instructions, buf *bytes.Buffer) {
case SetIntersect:
intersect.String(in.Arg, buf)
case SetDifference:
case SetNaturalJoin:
natural.String(in.Arg, buf)
case Output:
output.String(in.Arg, buf)
}
......@@ -98,6 +101,10 @@ func Prepare(ins Instructions, proc *process.Process) error {
return err
}
case SetDifference:
case SetNaturalJoin:
if err := natural.Prepare(proc, in.Arg); err != nil {
return err
}
case Output:
if err := output.Prepare(proc, in.Arg); err != nil {
return err
......@@ -133,6 +140,8 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
case SetIntersect:
ok, err = intersect.Call(proc, in.Arg)
case SetDifference:
case SetNaturalJoin:
ok, err = natural.Call(proc, in.Arg)
case Output:
ok, err = output.Call(proc, in.Arg)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment