Skip to content
Snippets Groups Projects
Unverified Commit 387f3fa5 authored by chenmingsong's avatar chenmingsong Committed by GitHub
Browse files

implementing UNION colexec operator (#4197)

parent 8e712dca
No related branches found
No related tags found
No related merge requests found
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package union
import (
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch"
)
type Argument struct {
// attribute which need not do serialization work
ctr Container
}
type Container struct {
// hash table related.
hashTable *hashmap.StrHashMap
// bat records the final result of union operator.
bat *batch.Batch
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package union
import (
"bytes"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
func String(_ interface{}, buf *bytes.Buffer) {
buf.WriteString(" union ")
}
func Prepare(_ *process.Process, argument interface{}) error {
arg := argument.(*Argument)
{
arg.ctr.bat = nil
arg.ctr.hashTable = hashmap.NewStrMap(true)
}
return nil
}
func Call(idx int, proc *process.Process, argument interface{}) (bool, error) {
var err error
arg := argument.(*Argument)
// we make this assertion here for now, the real situation of table size
// should be noted by the execution plan
smallTableIndex, bigTableIndex := 1, 0
// prepare the analysis work
analyze := proc.GetAnalyze(idx)
analyze.Start()
defer analyze.Stop()
// step1: deal the small table. if new row, put into bat.
if err = arg.ctr.insert(proc, analyze, smallTableIndex); err != nil {
return false, err
}
// step2: deal the big table. if new row, put into bat.
if err = arg.ctr.insert(proc, analyze, bigTableIndex); err != nil {
return false, err
}
// step3: return
analyze.Output(arg.ctr.bat)
proc.Reg.InputBatch = arg.ctr.bat
return true, nil
}
// insert function use Table[index] to probe the HashTable.
// if row data doesn't in HashTable, append it to bat and update the HashTable.
func (ctr *Container) insert(proc *process.Process, analyze process.Analyze, index int) error {
var err error
inserted := make([]uint8, hashmap.UnitLimit)
restoreInserted := make([]uint8, hashmap.UnitLimit)
for {
bat := <-proc.Reg.MergeReceivers[index].Ch
if bat == nil {
return nil
}
if len(bat.Zs) == 0 {
continue
}
if ctr.bat == nil {
ctr.bat = batch.NewWithSize(len(bat.Vecs))
for i := range bat.Vecs {
ctr.bat.Vecs[i] = vector.New(bat.Vecs[i].Typ)
}
}
analyze.Input(bat)
count := len(bat.Zs)
scales := make([]int32, len(bat.Vecs))
for i := range scales {
scales[i] = bat.Vecs[i].Typ.Scale
}
for i := 0; i < count; i += hashmap.UnitLimit {
insertCount := 0
iterator := ctr.hashTable.NewIterator()
n := count - i
if n > hashmap.UnitLimit {
n = hashmap.UnitLimit
}
vs, _ := iterator.Insert(i, n, bat.Vecs, scales)
copy(inserted[:n], restoreInserted[:n])
for j, v := range vs {
if v > ctr.hashTable.GroupCount() {
insertCount++
inserted[j] = 1
ctr.bat.Zs = append(ctr.bat.Zs, 1)
}
}
if insertCount > 0 {
for pos := range bat.Vecs {
if err = vector.UnionBatch(ctr.bat.Vecs[pos], bat.Vecs[pos], int64(i), insertCount, inserted[:n], proc.Mp); err != nil {
return err
}
}
}
}
bat.Clean(proc.Mp)
}
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package union
import (
"context"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/stretchr/testify/require"
"testing"
)
type unionTestCase struct {
proc *process.Process
arg *Argument
cancel context.CancelFunc
}
func TestUnion(t *testing.T) {
proc := testutil.NewProcess()
// [4 rows + 3 rows, 2 columns] union [5 rows + 5 rows, 2 columns]
c := newUnionTestCase(
proc,
[]*batch.Batch{
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, true, nil),
}, nil),
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, true, nil),
}, nil),
},
[]*batch.Batch{
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
}, nil),
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
}, nil),
},
)
err := Prepare(c.proc, c.arg)
require.NoError(t, err)
_, err = Call(0, c.proc, c.arg)
{
result := c.arg.ctr.bat
require.NoError(t, err)
require.Equal(t, 2, len(result.Vecs)) // 2 columns
require.Equal(t, 17, vector.Length(result.Vecs[0])) // 17 = (4+3+5+5) rows
}
c.proc.Reg.InputBatch.Clean(c.proc.Mp) // clean the final result
require.Equal(t, int64(0), mheap.Size(c.proc.Mp))
}
func newUnionTestCase(proc *process.Process, leftBatches, rightBatches []*batch.Batch) unionTestCase {
ctx, cancel := context.WithCancel(context.Background())
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
{
c := make(chan *batch.Batch, len(leftBatches)+1)
for i := range leftBatches {
c <- leftBatches[i]
}
c <- nil
proc.Reg.MergeReceivers[0] = &process.WaitRegister{
Ctx: ctx,
Ch: c,
}
}
{
c := make(chan *batch.Batch, len(rightBatches)+1)
for i := range rightBatches {
c <- rightBatches[i]
}
c <- nil
proc.Reg.MergeReceivers[1] = &process.WaitRegister{
Ctx: ctx,
Ch: c,
}
}
arg := &Argument{}
return unionTestCase{
proc: proc,
arg: arg,
cancel: cancel,
}
}
......@@ -50,6 +50,19 @@ func NewBatch(ts []types.Type, random bool, n int, m *mheap.Mheap) *batch.Batch
return bat
}
func NewBatchWithVectors(vs []*vector.Vector, zs []int64) *batch.Batch {
bat := batch.NewWithSize(len(vs))
if len(vs) > 0 {
l := vector.Length(vs[0])
if zs == nil {
zs = MakeBatchZs(l, false)
}
bat.Zs = zs
bat.Vecs = vs
}
return bat
}
func NewVector(n int, typ types.Type, m *mheap.Mheap, random bool, Values interface{}) *vector.Vector {
switch typ.Oid {
case types.T_bool:
......
......@@ -16,6 +16,7 @@ package vm
import (
"bytes"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/union"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion"
......@@ -83,6 +84,8 @@ var stringFunc = [...]func(interface{}, *bytes.Buffer){
Deletion: deletion.String,
Insert: insert.String,
Update: update.String,
Union: union.String,
}
var prepareFunc = [...]func(*process.Process, interface{}) error{
......@@ -117,6 +120,8 @@ var prepareFunc = [...]func(*process.Process, interface{}) error{
Deletion: deletion.Prepare,
Insert: insert.Prepare,
Update: update.Prepare,
Union: union.Prepare,
}
var execFunc = [...]func(int, *process.Process, interface{}) (bool, error){
......@@ -151,4 +156,6 @@ var execFunc = [...]func(int, *process.Process, interface{}) (bool, error){
Deletion: deletion.Call,
Insert: insert.Call,
Update: update.Call,
Union: union.Call,
}
......@@ -46,8 +46,13 @@ const (
Deletion
Insert
Update
Union
Minus
)
var _ = Minus
// Instruction contains relational algebra
type Instruction struct {
// Op specified the operator code of an instruction.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment