Skip to content
Snippets Groups Projects
Unverified Commit ce0e5d07 authored by nnsgmsone's avatar nnsgmsone Committed by GitHub
Browse files

Add hashbuild for pushdown (#4536)

Approved by: @yingfeng
parent 8d94b449
No related branches found
No related tags found
No related merge requests found
Showing
with 627 additions and 341 deletions
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hashmap
import (
"sync/atomic"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
)
func NewJoinMap(sels [][]int64, expr *plan.Expr, mp *StrHashMap, hasNull bool) *JoinMap {
return &JoinMap{
cnt: 1,
mp: mp,
expr: expr,
sels: sels,
hasNull: hasNull,
}
}
func (jm *JoinMap) Sels() [][]int64 {
return jm.sels
}
func (jm *JoinMap) Map() *StrHashMap {
return jm.mp
}
func (jm *JoinMap) Expr() *plan.Expr {
return jm.expr
}
func (jm *JoinMap) HasNull() bool {
return jm.hasNull
}
func (jm *JoinMap) Dup() *JoinMap {
m0 := &StrHashMap{
m: jm.mp.m,
hashMap: jm.mp.hashMap,
hasNull: jm.mp.hasNull,
ibucket: jm.mp.ibucket,
nbucket: jm.mp.nbucket,
values: make([]uint64, UnitLimit),
zValues: make([]int64, UnitLimit),
keys: make([][]byte, UnitLimit),
strHashStates: make([][3]uint64, UnitLimit),
}
return &JoinMap{
mp: m0,
expr: jm.expr,
sels: jm.sels,
hasNull: jm.hasNull,
cnt: atomic.LoadInt64(&jm.cnt),
}
}
func (jm *JoinMap) IncRef(ref int64) {
atomic.AddInt64(&jm.cnt, ref)
}
func (jm *JoinMap) Free() {
if atomic.AddInt64(&jm.cnt, -1) != 0 {
return
}
jm.mp.Free()
}
...@@ -17,6 +17,7 @@ package hashmap ...@@ -17,6 +17,7 @@ package hashmap
import ( import (
"github.com/matrixorigin/matrixone/pkg/container/hashtable" "github.com/matrixorigin/matrixone/pkg/container/hashtable"
"github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mheap"
) )
...@@ -58,6 +59,16 @@ type Iterator interface { ...@@ -58,6 +59,16 @@ type Iterator interface {
Find(start, count int, vecs []*vector.Vector, inBuckets []uint8) (vs []uint64, zvs []int64) Find(start, count int, vecs []*vector.Vector, inBuckets []uint8) (vs []uint64, zvs []int64)
} }
// JoinMap is used for join
type JoinMap struct {
cnt int64
sels [][]int64
// push-down filter expression, possibly a bloomfilter
expr *plan.Expr
mp *StrHashMap
hasNull bool
}
// StrHashMap key is []byte, value is an uint64 value (starting from 1) // StrHashMap key is []byte, value is an uint64 value (starting from 1)
// //
// each time a new key is inserted, the hashtable returns a last-value+1 or, if the old key is inserted, the value corresponding to that key // each time a new key is inserted, the hashtable returns a last-value+1 or, if the old key is inserted, the value corresponding to that key
......
...@@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { ...@@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) {
} }
func Prepare(proc *process.Process, arg any) error { func Prepare(proc *process.Process, arg any) error {
var err error
ap := arg.(*Argument) ap := arg.(*Argument)
ap.ctr = new(container) ap.ctr = new(container)
if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil {
return err
}
ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit)
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0]))
ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0]))
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0]))
return nil return nil
} }
...@@ -54,8 +49,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -54,8 +49,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
case Build: case Build:
if err := ctr.build(ap, proc, anal); err != nil { if err := ctr.build(ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
return true, err return true, err
} }
ctr.state = Probe ctr.state = Probe
...@@ -63,8 +59,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -63,8 +59,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
bat := <-proc.Reg.MergeReceivers[0].Ch bat := <-proc.Reg.MergeReceivers[0].Ch
if bat == nil { if bat == nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
if ctr.bat != nil { if ctr.bat != nil {
ctr.bat.Clean(proc.GetMheap()) ctr.bat.Clean(proc.GetMheap())
} }
...@@ -73,19 +70,21 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -73,19 +70,21 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
if bat.Length() == 0 { if bat.Length() == 0 {
continue continue
} }
if ctr.bat == nil { if ctr.bat == nil || ctr.bat.Length() == 0 {
if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil { if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
proc.SetInputBatch(nil) proc.SetInputBatch(nil)
return true, err return true, err
} }
} else { } else {
if err := ctr.probe(bat, ap, proc, anal); err != nil { if err := ctr.probe(bat, ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
proc.SetInputBatch(nil) proc.SetInputBatch(nil)
return true, err return true, err
} }
...@@ -99,66 +98,11 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -99,66 +98,11 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
} }
func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error {
var err error bat := <-proc.Reg.MergeReceivers[1].Ch
if bat != nil {
for { ctr.bat = bat
bat := <-proc.Reg.MergeReceivers[1].Ch ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup()
if bat == nil { ctr.hasNull = ctr.mp.HasNull()
if ctr.bat != nil {
ctr.bat.ExpandNulls()
}
break
}
if bat.Length() == 0 {
continue
}
if ctr.bat == nil {
ctr.bat = batch.NewWithSize(len(bat.Vecs))
for i, vec := range bat.Vecs {
ctr.bat.Vecs[i] = vector.New(vec.Typ)
}
}
anal.Input(bat)
anal.Alloc(int64(bat.Size()))
if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil {
bat.Clean(proc.GetMheap())
ctr.bat.Clean(proc.GetMheap())
return err
}
bat.Clean(proc.GetMheap())
}
if ctr.bat == nil || ctr.bat.Length() == 0 {
return nil
}
if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil {
return err
}
defer ctr.freeJoinCondition(proc)
itr := ctr.mp.NewIterator()
count := ctr.bat.Length()
for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i
if n > hashmap.UnitLimit {
n = hashmap.UnitLimit
}
rows := ctr.mp.GroupCount()
vals, zvals, err := itr.Insert(i, n, ctr.vecs)
if err != nil {
ctr.bat.Clean(proc.GetMheap())
return err
}
for k, v := range vals[:n] {
if zvals[k] == 0 {
ctr.hasNull = true
continue
}
if v > rows {
ctr.sels = append(ctr.sels, proc.GetMheap().GetSels())
}
ai := int64(v) - 1
ctr.sels[ai] = append(ctr.sels[ai], int64(i+k))
}
} }
return nil return nil
} }
...@@ -211,7 +155,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces ...@@ -211,7 +155,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces
} }
defer ctr.freeJoinCondition(proc) defer ctr.freeJoinCondition(proc)
count := bat.Length() count := bat.Length()
itr := ctr.mp.NewIterator() itr := ctr.mp.Map().NewIterator()
for i := 0; i < count; i += hashmap.UnitLimit { for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i n := count - i
if n > hashmap.UnitLimit { if n > hashmap.UnitLimit {
...@@ -275,10 +219,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { ...@@ -275,10 +219,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) {
} }
} }
} }
func (ctr *container) freeSels(proc *process.Process) {
for i := range ctr.sels {
proc.GetMheap().PutSels(ctr.sels[i])
}
ctr.sels = nil
}
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mheap"
"github.com/matrixorigin/matrixone/pkg/vm/process" "github.com/matrixorigin/matrixone/pkg/vm/process"
...@@ -40,6 +41,7 @@ type antiTestCase struct { ...@@ -40,6 +41,7 @@ type antiTestCase struct {
types []types.Type types []types.Type
proc *process.Process proc *process.Process
cancel context.CancelFunc cancel context.CancelFunc
barg *hashbuild.Argument
} }
var ( var (
...@@ -78,6 +80,7 @@ func TestString(t *testing.T) { ...@@ -78,6 +80,7 @@ func TestString(t *testing.T) {
func TestAnti(t *testing.T) { func TestAnti(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -86,9 +89,7 @@ func TestAnti(t *testing.T) { ...@@ -86,9 +89,7 @@ func TestAnti(t *testing.T) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -141,6 +142,7 @@ func BenchmarkAnti(b *testing.B) { ...@@ -141,6 +142,7 @@ func BenchmarkAnti(b *testing.B) {
} }
t := new(testing.T) t := new(testing.T)
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -149,9 +151,7 @@ func BenchmarkAnti(b *testing.B) { ...@@ -149,9 +151,7 @@ func BenchmarkAnti(b *testing.B) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -196,12 +196,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] ...@@ -196,12 +196,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs []
proc: proc, proc: proc,
cancel: cancel, cancel: cancel,
arg: &Argument{ arg: &Argument{
Typs: ts,
Result: rp, Result: rp,
Conditions: cs, Conditions: cs,
}, },
barg: &hashbuild.Argument{
Typs: ts,
NeedHashMap: true,
Conditions: cs[1],
},
} }
} }
func hashBuild(t *testing.T, tc antiTestCase) *batch.Batch {
err := hashbuild.Prepare(tc.proc, tc.barg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil
ok, err := hashbuild.Call(0, tc.proc, tc.barg)
require.NoError(t, err)
require.Equal(t, true, ok)
return tc.proc.Reg.InputBatch
}
// create a new block based on the type information, flgs[i] == ture: has null // create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
return testutil.NewBatch(ts, false, int(rows), proc.Mp) return testutil.NewBatch(ts, false, int(rows), proc.Mp)
......
...@@ -17,6 +17,7 @@ package anti ...@@ -17,6 +17,7 @@ package anti
import ( import (
"github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/plan"
) )
...@@ -37,15 +38,14 @@ type container struct { ...@@ -37,15 +38,14 @@ type container struct {
hasNull bool hasNull bool
sels [][]int64
inBuckets []uint8 inBuckets []uint8
bat *batch.Batch bat *batch.Batch
evecs []evalVector evecs []evalVector
vecs []*vector.Vector vecs []*vector.Vector
mp *hashmap.StrHashMap
mp *hashmap.JoinMap
} }
type Argument struct { type Argument struct {
...@@ -53,5 +53,6 @@ type Argument struct { ...@@ -53,5 +53,6 @@ type Argument struct {
Ibucket uint64 Ibucket uint64
Nbucket uint64 Nbucket uint64
Result []int32 Result []int32
Typs []types.Type
Conditions [][]*plan.Expr Conditions [][]*plan.Expr
} }
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"bytes" "bytes"
"sync/atomic" "sync/atomic"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/process" "github.com/matrixorigin/matrixone/pkg/vm/process"
) )
...@@ -42,7 +43,7 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) { ...@@ -42,7 +43,7 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) {
case reg.Ch <- nil: case reg.Ch <- nil:
} }
} }
return false, nil return true, nil
} }
vecs := ap.vecs[:0] vecs := ap.vecs[:0]
for i := range bat.Vecs { for i := range bat.Vecs {
...@@ -62,6 +63,12 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) { ...@@ -62,6 +63,12 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) {
} }
if ap.All { if ap.All {
atomic.AddInt64(&bat.Cnt, int64(len(ap.Regs))-1) atomic.AddInt64(&bat.Cnt, int64(len(ap.Regs))-1)
if bat.Ht != nil {
jm, ok := bat.Ht.(*hashmap.JoinMap)
if ok {
jm.IncRef(int64(len(ap.Regs)) - 1)
}
}
for _, reg := range ap.Regs { for _, reg := range ap.Regs {
select { select {
case <-reg.Ctx.Done(): case <-reg.Ctx.Done():
......
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hashbuild
import (
"bytes"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
func String(_ any, buf *bytes.Buffer) {
buf.WriteString(" hash build ")
}
func Prepare(proc *process.Process, arg any) error {
var err error
ap := arg.(*Argument)
ap.ctr = new(container)
if ap.NeedHashMap {
if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil {
return err
}
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions))
ap.ctr.evecs = make([]evalVector, len(ap.Conditions))
}
ap.ctr.bat = batch.NewWithSize(len(ap.Typs))
ap.ctr.bat.Zs = proc.GetMheap().GetSels()
for i, typ := range ap.Typs {
ap.ctr.bat.Vecs[i] = vector.New(typ)
}
return nil
}
func Call(idx int, proc *process.Process, arg any) (bool, error) {
anal := proc.GetAnalyze(idx)
anal.Start()
defer anal.Stop()
ap := arg.(*Argument)
ctr := ap.ctr
for {
switch ctr.state {
case Build:
if err := ctr.build(ap, proc, anal); err != nil {
ctr.state = End
ctr.mp.Free()
return true, err
}
ctr.state = End
default:
if ctr.bat != nil {
if ap.NeedHashMap {
ctr.bat.Ht = hashmap.NewJoinMap(ctr.sels, nil, ctr.mp, ctr.hasNull)
}
proc.SetInputBatch(ctr.bat)
ctr.bat = nil
} else {
proc.SetInputBatch(nil)
}
return true, nil
}
}
}
func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error {
var err error
for {
bat := <-proc.Reg.MergeReceivers[0].Ch
if bat == nil {
break
}
if bat.Length() == 0 {
continue
}
anal.Input(bat)
anal.Alloc(int64(bat.Size()))
if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil {
bat.Clean(proc.GetMheap())
ctr.bat.Clean(proc.GetMheap())
return err
}
bat.Clean(proc.GetMheap())
}
if ctr.bat == nil || ctr.bat.Length() == 0 || !ap.NeedHashMap {
return nil
}
if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions, proc); err != nil {
return err
}
defer ctr.freeJoinCondition(proc)
itr := ctr.mp.NewIterator()
count := ctr.bat.Length()
for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i
if n > hashmap.UnitLimit {
n = hashmap.UnitLimit
}
rows := ctr.mp.GroupCount()
vals, zvals, err := itr.Insert(i, n, ctr.vecs)
if err != nil {
return err
}
for k, v := range vals[:n] {
if zvals[k] == 0 {
ctr.hasNull = true
continue
}
if v == 0 {
continue
}
if v > rows {
ctr.sels = append(ctr.sels, make([]int64, 0, 8))
}
ai := int64(v) - 1
ctr.sels[ai] = append(ctr.sels[ai], int64(i+k))
}
}
return nil
}
func (ctr *container) evalJoinCondition(bat *batch.Batch, conds []*plan.Expr, proc *process.Process) error {
for i, cond := range conds {
vec, err := colexec.EvalExpr(bat, proc, cond)
if err != nil || vec.ConstExpand(proc.GetMheap()) == nil {
for j := 0; j < i; j++ {
if ctr.evecs[j].needFree {
vector.Clean(ctr.evecs[j].vec, proc.GetMheap())
}
}
return err
}
ctr.vecs[i] = vec
ctr.evecs[i].vec = vec
ctr.evecs[i].needFree = true
for j := range bat.Vecs {
if bat.Vecs[j] == vec {
ctr.evecs[i].needFree = false
break
}
}
}
return nil
}
func (ctr *container) freeJoinCondition(proc *process.Process) {
for i := range ctr.evecs {
if ctr.evecs[i].needFree {
ctr.evecs[i].vec.Free(proc.GetMheap())
}
}
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hashbuild
import (
"bytes"
"context"
"testing"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/stretchr/testify/require"
)
const (
Rows = 10 // default rows
BenchmarkRows = 100000 // default rows for benchmark
)
// add unit tests for cases
type buildTestCase struct {
arg *Argument
flgs []bool // flgs[i] == true: nullable
types []types.Type
proc *process.Process
cancel context.CancelFunc
}
var (
tcs []buildTestCase
)
func init() {
tcs = []buildTestCase{
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}},
[]*plan.Expr{
newExpr(0, types.Type{Oid: types.T_int8}),
}),
newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}},
[]*plan.Expr{
newExpr(0, types.Type{Oid: types.T_int8}),
}),
}
}
func TestString(t *testing.T) {
buf := new(bytes.Buffer)
for _, tc := range tcs {
String(tc.arg, buf)
}
}
func TestBuild(t *testing.T) {
for _, tc := range tcs[:1] {
err := Prepare(tc.proc, tc.arg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[0].Ch <- nil
for {
ok, err := Call(0, tc.proc, tc.arg)
require.NoError(t, err)
require.Equal(t, true, ok)
mp := tc.proc.Reg.InputBatch.Ht.(*hashmap.JoinMap)
mp.Free()
tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
break
}
require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
}
}
func BenchmarkBuild(b *testing.B) {
for i := 0; i < b.N; i++ {
tcs = []buildTestCase{
newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}},
[]*plan.Expr{
newExpr(0, types.Type{Oid: types.T_int8}),
}),
}
t := new(testing.T)
for _, tc := range tcs {
err := Prepare(tc.proc, tc.arg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[0].Ch <- nil
for {
ok, err := Call(0, tc.proc, tc.arg)
require.NoError(t, err)
require.Equal(t, true, ok)
mp := tc.proc.Reg.InputBatch.Ht.(*hashmap.JoinMap)
mp.Free()
tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
break
}
}
}
}
func newExpr(pos int32, typ types.Type) *plan.Expr {
return &plan.Expr{
Typ: &plan.Type{
Size: typ.Size,
Scale: typ.Scale,
Width: typ.Width,
Id: int32(typ.Oid),
},
Expr: &plan.Expr_Col{
Col: &plan.ColRef{
ColPos: pos,
},
},
}
}
func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, cs []*plan.Expr) buildTestCase {
proc := process.New(m)
proc.Reg.MergeReceivers = make([]*process.WaitRegister, 1)
ctx, cancel := context.WithCancel(context.Background())
proc.Reg.MergeReceivers[0] = &process.WaitRegister{
Ctx: ctx,
Ch: make(chan *batch.Batch, 10),
}
return buildTestCase{
types: ts,
flgs: flgs,
proc: proc,
cancel: cancel,
arg: &Argument{
Typs: ts,
Conditions: cs,
NeedHashMap: true,
},
}
}
// create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
return testutil.NewBatch(ts, false, int(rows), proc.Mp)
}
...@@ -12,32 +12,48 @@ ...@@ -12,32 +12,48 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package union package hashbuild
import ( import (
"github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
) )
const ( const (
Build = iota Build = iota
Probe
End End
) )
type evalVector struct {
needFree bool
vec *vector.Vector
}
type container struct { type container struct {
state int state int
// hash table related. hasNull bool
hashTable *hashmap.StrHashMap
sels [][]int64
// bat records the final result of union operator.
bat *batch.Batch bat *batch.Batch
evecs []evalVector
vecs []*vector.Vector
mp *hashmap.StrHashMap
} }
type Argument struct { type Argument struct {
// attribute which need not do serialization work ctr *container
ctr *container // need to generate a push-down filter expression
Ibucket uint64 // index in buckets NeedExpr bool
Nbucket uint64 // buckets count NeedHashMap bool
Ibucket uint64
Nbucket uint64
Typs []types.Type
Conditions []*plan.Expr
} }
...@@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { ...@@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) {
} }
func Prepare(proc *process.Process, arg any) error { func Prepare(proc *process.Process, arg any) error {
var err error
ap := arg.(*Argument) ap := arg.(*Argument)
ap.ctr = new(container) ap.ctr = new(container)
if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil {
return err
}
ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit)
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0]))
ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0]))
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0]))
return nil return nil
} }
...@@ -55,7 +50,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -55,7 +50,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
if err := ctr.build(ap, proc, anal); err != nil { if err := ctr.build(ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() ctr.mp.Free()
ctr.freeSels(proc)
return true, err return true, err
} }
ctr.state = Probe ctr.state = Probe
...@@ -64,7 +58,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -64,7 +58,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
if bat == nil { if bat == nil {
ctr.state = End ctr.state = End
ctr.mp.Free() ctr.mp.Free()
ctr.freeSels(proc)
if ctr.bat != nil { if ctr.bat != nil {
ctr.bat.Clean(proc.GetMheap()) ctr.bat.Clean(proc.GetMheap())
} }
...@@ -73,14 +66,13 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -73,14 +66,13 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
if bat.Length() == 0 { if bat.Length() == 0 {
continue continue
} }
if ctr.bat == nil { if ctr.bat == nil || ctr.bat.Length() == 0 {
bat.Clean(proc.GetMheap()) bat.Clean(proc.GetMheap())
continue continue
} }
if err := ctr.probe(bat, ap, proc, anal); err != nil { if err := ctr.probe(bat, ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() ctr.mp.Free()
ctr.freeSels(proc)
proc.SetInputBatch(nil) proc.SetInputBatch(nil)
return true, err return true, err
} }
...@@ -93,63 +85,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -93,63 +85,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
} }
func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error {
var err error bat := <-proc.Reg.MergeReceivers[1].Ch
ctr.bat = bat
for { ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup()
bat := <-proc.Reg.MergeReceivers[1].Ch
if bat == nil {
break
}
if bat.Length() == 0 {
continue
}
if ctr.bat == nil {
ctr.bat = batch.NewWithSize(len(bat.Vecs))
for i, vec := range bat.Vecs {
ctr.bat.Vecs[i] = vector.New(vec.Typ)
}
ctr.bat.Zs = proc.GetMheap().GetSels()
}
anal.Input(bat)
anal.Alloc(int64(bat.Size()))
if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil {
bat.Clean(proc.GetMheap())
ctr.bat.Clean(proc.GetMheap())
return err
}
bat.Clean(proc.GetMheap())
}
if ctr.bat == nil || ctr.bat.Length() == 0 {
return nil
}
if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil {
return err
}
defer ctr.freeJoinCondition(proc)
itr := ctr.mp.NewIterator()
count := ctr.bat.Length()
for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i
if n > hashmap.UnitLimit {
n = hashmap.UnitLimit
}
rows := ctr.mp.GroupCount()
vals, zvals, err := itr.Insert(i, n, ctr.vecs)
if err != nil {
return err
}
for k, v := range vals {
if zvals[k] == 0 {
continue
}
if v > rows {
ctr.sels = append(ctr.sels, proc.GetMheap().GetSels())
}
ai := int64(v) - 1
ctr.sels[ai] = append(ctr.sels[ai], int64(i+k))
}
}
return nil return nil
} }
...@@ -170,7 +108,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces ...@@ -170,7 +108,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces
} }
defer ctr.freeJoinCondition(proc) defer ctr.freeJoinCondition(proc)
count := bat.Length() count := bat.Length()
itr := ctr.mp.NewIterator() mSels := ctr.mp.Sels()
itr := ctr.mp.Map().NewIterator()
for i := 0; i < count; i += hashmap.UnitLimit { for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i n := count - i
if n > hashmap.UnitLimit { if n > hashmap.UnitLimit {
...@@ -179,16 +118,10 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces ...@@ -179,16 +118,10 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces
copy(ctr.inBuckets, hashmap.OneUInt8s) copy(ctr.inBuckets, hashmap.OneUInt8s)
vals, zvals := itr.Find(i, n, ctr.vecs, ctr.inBuckets) vals, zvals := itr.Find(i, n, ctr.vecs, ctr.inBuckets)
for k := 0; k < n; k++ { for k := 0; k < n; k++ {
if ctr.inBuckets[k] == 0 { if ctr.inBuckets[k] == 0 || zvals[k] == 0 || vals[k] == 0 {
continue continue
} }
if zvals[k] == 0 { sels := mSels[vals[k]-1]
continue
}
if vals[k] == 0 {
continue
}
sels := ctr.sels[vals[k]-1]
for _, sel := range sels { for _, sel := range sels {
for j, rp := range ap.Result { for j, rp := range ap.Result {
if rp.Rel == 0 { if rp.Rel == 0 {
...@@ -244,10 +177,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { ...@@ -244,10 +177,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) {
} }
} }
} }
func (ctr *container) freeSels(proc *process.Process) {
for i := range ctr.sels {
proc.GetMheap().PutSels(ctr.sels[i])
}
ctr.sels = nil
}
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mheap"
"github.com/matrixorigin/matrixone/pkg/vm/process" "github.com/matrixorigin/matrixone/pkg/vm/process"
...@@ -40,6 +41,7 @@ type joinTestCase struct { ...@@ -40,6 +41,7 @@ type joinTestCase struct {
types []types.Type types []types.Type
proc *process.Process proc *process.Process
cancel context.CancelFunc cancel context.CancelFunc
barg *hashbuild.Argument
} }
var ( var (
...@@ -87,6 +89,7 @@ func TestString(t *testing.T) { ...@@ -87,6 +89,7 @@ func TestString(t *testing.T) {
func TestJoin(t *testing.T) { func TestJoin(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -95,9 +98,7 @@ func TestJoin(t *testing.T) { ...@@ -95,9 +98,7 @@ func TestJoin(t *testing.T) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -132,6 +133,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -132,6 +133,7 @@ func BenchmarkJoin(b *testing.B) {
} }
t := new(testing.T) t := new(testing.T)
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -140,9 +142,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -140,9 +142,7 @@ func BenchmarkJoin(b *testing.B) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -187,12 +187,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c ...@@ -187,12 +187,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c
proc: proc, proc: proc,
cancel: cancel, cancel: cancel,
arg: &Argument{ arg: &Argument{
Typs: ts,
Result: rp, Result: rp,
Conditions: cs, Conditions: cs,
}, },
barg: &hashbuild.Argument{
Typs: ts,
NeedHashMap: true,
Conditions: cs[1],
},
} }
} }
func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch {
err := hashbuild.Prepare(tc.proc, tc.barg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil
ok, err := hashbuild.Call(0, tc.proc, tc.barg)
require.NoError(t, err)
require.Equal(t, true, ok)
return tc.proc.Reg.InputBatch
}
// create a new block based on the type information, flgs[i] == ture: has null // create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
return testutil.NewBatch(ts, false, int(rows), proc.Mp) return testutil.NewBatch(ts, false, int(rows), proc.Mp)
......
...@@ -17,6 +17,7 @@ package join ...@@ -17,6 +17,7 @@ package join
import ( import (
"github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/plan"
) )
...@@ -35,8 +36,6 @@ type evalVector struct { ...@@ -35,8 +36,6 @@ type evalVector struct {
type container struct { type container struct {
state int state int
sels [][]int64
inBuckets []uint8 inBuckets []uint8
bat *batch.Batch bat *batch.Batch
...@@ -44,7 +43,7 @@ type container struct { ...@@ -44,7 +43,7 @@ type container struct {
evecs []evalVector evecs []evalVector
vecs []*vector.Vector vecs []*vector.Vector
mp *hashmap.StrHashMap mp *hashmap.JoinMap
} }
type ResultPos struct { type ResultPos struct {
...@@ -57,5 +56,6 @@ type Argument struct { ...@@ -57,5 +56,6 @@ type Argument struct {
Ibucket uint64 // index in buckets Ibucket uint64 // index in buckets
Nbucket uint64 // buckets count Nbucket uint64 // buckets count
Result []ResultPos Result []ResultPos
Typs []types.Type
Conditions [][]*plan.Expr Conditions [][]*plan.Expr
} }
...@@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { ...@@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) {
} }
func Prepare(proc *process.Process, arg any) error { func Prepare(proc *process.Process, arg any) error {
var err error
ap := arg.(*Argument) ap := arg.(*Argument)
ap.ctr = new(container) ap.ctr = new(container)
if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil {
return err
}
ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit)
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0]))
ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0]))
ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0]))
ap.ctr.bat = batch.NewWithSize(len(ap.Typs)) ap.ctr.bat = batch.NewWithSize(len(ap.Typs))
ap.ctr.bat.Zs = proc.GetMheap().GetSels() ap.ctr.bat.Zs = proc.GetMheap().GetSels()
for i, typ := range ap.Typs { for i, typ := range ap.Typs {
...@@ -59,8 +54,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -59,8 +54,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
case Build: case Build:
if err := ctr.build(ap, proc, anal); err != nil { if err := ctr.build(ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
return true, err return true, err
} }
ctr.state = Probe ctr.state = Probe
...@@ -68,8 +64,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -68,8 +64,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
bat := <-proc.Reg.MergeReceivers[0].Ch bat := <-proc.Reg.MergeReceivers[0].Ch
if bat == nil { if bat == nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
if ctr.bat != nil { if ctr.bat != nil {
ctr.bat.Clean(proc.GetMheap()) ctr.bat.Clean(proc.GetMheap())
} }
...@@ -81,8 +78,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -81,8 +78,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
if ctr.bat.Length() == 0 { if ctr.bat.Length() == 0 {
if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil { if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
proc.SetInputBatch(nil) proc.SetInputBatch(nil)
return true, err return true, err
} }
...@@ -90,8 +88,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -90,8 +88,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
} else { } else {
if err := ctr.probe(bat, ap, proc, anal); err != nil { if err := ctr.probe(bat, ap, proc, anal); err != nil {
ctr.state = End ctr.state = End
ctr.mp.Free() if ctr.mp != nil {
ctr.freeSels(proc) ctr.mp.Free()
}
proc.SetInputBatch(nil) proc.SetInputBatch(nil)
return true, err return true, err
} }
...@@ -105,55 +104,10 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -105,55 +104,10 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
} }
func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error {
var err error bat := <-proc.Reg.MergeReceivers[1].Ch
if bat != nil {
for { ctr.bat = bat
bat := <-proc.Reg.MergeReceivers[1].Ch ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup()
if bat == nil {
break
}
if bat.Length() == 0 {
continue
}
anal.Input(bat)
anal.Alloc(int64(bat.Size()))
if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil {
bat.Clean(proc.GetMheap())
ctr.bat.Clean(proc.GetMheap())
return err
}
bat.Clean(proc.GetMheap())
}
if ctr.bat == nil || ctr.bat.Length() == 0 {
return nil
}
if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil {
return err
}
defer ctr.freeJoinCondition(proc)
itr := ctr.mp.NewIterator()
count := ctr.bat.Length()
for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i
if n > hashmap.UnitLimit {
n = hashmap.UnitLimit
}
rows := ctr.mp.GroupCount()
vals, zvals, err := itr.Insert(i, n, ctr.vecs)
if err != nil {
return err
}
for k, v := range vals[:n] {
if zvals[k] == 0 {
continue
}
if v > rows {
ctr.sels = append(ctr.sels, proc.GetMheap().GetSels())
}
ai := int64(v) - 1
ctr.sels[ai] = append(ctr.sels[ai], int64(i+k))
}
} }
return nil return nil
} }
...@@ -216,7 +170,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces ...@@ -216,7 +170,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces
} }
defer ctr.freeJoinCondition(proc) defer ctr.freeJoinCondition(proc)
count := bat.Length() count := bat.Length()
itr := ctr.mp.NewIterator() mSels := ctr.mp.Sels()
itr := ctr.mp.Map().NewIterator()
for i := 0; i < count; i += hashmap.UnitLimit { for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i n := count - i
if n > hashmap.UnitLimit { if n > hashmap.UnitLimit {
...@@ -245,7 +200,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces ...@@ -245,7 +200,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces
rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) rbat.Zs = append(rbat.Zs, bat.Zs[i+k])
continue continue
} }
sels := ctr.sels[vals[k]-1] sels := mSels[vals[k]-1]
for _, sel := range sels { for _, sel := range sels {
for j, rp := range ap.Result { for j, rp := range ap.Result {
if rp.Rel == 0 { if rp.Rel == 0 {
...@@ -301,10 +256,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { ...@@ -301,10 +256,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) {
} }
} }
} }
func (ctr *container) freeSels(proc *process.Process) {
for i := range ctr.sels {
proc.GetMheap().PutSels(ctr.sels[i])
}
ctr.sels = nil
}
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mheap"
"github.com/matrixorigin/matrixone/pkg/vm/process" "github.com/matrixorigin/matrixone/pkg/vm/process"
...@@ -40,6 +41,7 @@ type joinTestCase struct { ...@@ -40,6 +41,7 @@ type joinTestCase struct {
types []types.Type types []types.Type
proc *process.Process proc *process.Process
cancel context.CancelFunc cancel context.CancelFunc
barg *hashbuild.Argument
} }
var ( var (
...@@ -87,6 +89,7 @@ func TestString(t *testing.T) { ...@@ -87,6 +89,7 @@ func TestString(t *testing.T) {
func TestJoin(t *testing.T) { func TestJoin(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -95,9 +98,7 @@ func TestJoin(t *testing.T) { ...@@ -95,9 +98,7 @@ func TestJoin(t *testing.T) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -151,6 +152,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -151,6 +152,7 @@ func BenchmarkJoin(b *testing.B) {
} }
t := new(testing.T) t := new(testing.T)
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -159,9 +161,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -159,9 +161,7 @@ func BenchmarkJoin(b *testing.B) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -210,9 +210,25 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c ...@@ -210,9 +210,25 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c
Result: rp, Result: rp,
Conditions: cs, Conditions: cs,
}, },
barg: &hashbuild.Argument{
Typs: ts,
NeedHashMap: true,
Conditions: cs[1],
},
} }
} }
func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch {
err := hashbuild.Prepare(tc.proc, tc.barg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil
ok, err := hashbuild.Call(0, tc.proc, tc.barg)
require.NoError(t, err)
require.Equal(t, true, ok)
return tc.proc.Reg.InputBatch
}
// create a new block based on the type information, flgs[i] == ture: has null // create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
return testutil.NewBatch(ts, false, int(rows), proc.Mp) return testutil.NewBatch(ts, false, int(rows), proc.Mp)
......
...@@ -36,8 +36,6 @@ type evalVector struct { ...@@ -36,8 +36,6 @@ type evalVector struct {
type container struct { type container struct {
state int state int
sels [][]int64
inBuckets []uint8 inBuckets []uint8
bat *batch.Batch bat *batch.Batch
...@@ -45,7 +43,7 @@ type container struct { ...@@ -45,7 +43,7 @@ type container struct {
evecs []evalVector evecs []evalVector
vecs []*vector.Vector vecs []*vector.Vector
mp *hashmap.StrHashMap mp *hashmap.JoinMap
} }
type ResultPos struct { type ResultPos struct {
...@@ -57,7 +55,7 @@ type Argument struct { ...@@ -57,7 +55,7 @@ type Argument struct {
ctr *container ctr *container
Ibucket uint64 Ibucket uint64
Nbucket uint64 Nbucket uint64
Typs []types.Type
Result []ResultPos Result []ResultPos
Typs []types.Type
Conditions [][]*plan.Expr Conditions [][]*plan.Expr
} }
...@@ -83,31 +83,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -83,31 +83,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
} }
func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error {
var err error bat := <-proc.Reg.MergeReceivers[1].Ch
if bat != nil {
for { ctr.bat = bat
bat := <-proc.Reg.MergeReceivers[1].Ch
if bat == nil {
break
}
if bat.Length() == 0 {
continue
}
if ctr.bat == nil {
ctr.bat = batch.NewWithSize(len(bat.Vecs))
for i, vec := range bat.Vecs {
ctr.bat.Vecs[i] = vector.New(vec.Typ)
}
ctr.bat.Zs = proc.GetMheap().GetSels()
}
anal.Input(bat)
anal.Alloc(int64(bat.Size()))
if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil {
bat.Clean(proc.GetMheap())
ctr.bat.Clean(proc.GetMheap())
return err
}
bat.Clean(proc.GetMheap())
} }
return nil return nil
} }
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mheap"
...@@ -43,6 +44,7 @@ type joinTestCase struct { ...@@ -43,6 +44,7 @@ type joinTestCase struct {
types []types.Type types []types.Type
proc *process.Process proc *process.Process
cancel context.CancelFunc cancel context.CancelFunc
barg *hashbuild.Argument
} }
var ( var (
...@@ -74,6 +76,7 @@ func TestPrepare(t *testing.T) { ...@@ -74,6 +76,7 @@ func TestPrepare(t *testing.T) {
func TestJoin(t *testing.T) { func TestJoin(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -82,10 +85,7 @@ func TestJoin(t *testing.T) { ...@@ -82,10 +85,7 @@ func TestJoin(t *testing.T) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -125,6 +125,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -125,6 +125,7 @@ func BenchmarkJoin(b *testing.B) {
} }
t := new(testing.T) t := new(testing.T)
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -133,9 +134,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -133,9 +134,7 @@ func BenchmarkJoin(b *testing.B) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -202,12 +201,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32) joinT ...@@ -202,12 +201,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32) joinT
proc: proc, proc: proc,
cancel: cancel, cancel: cancel,
arg: &Argument{ arg: &Argument{
Typs: ts,
Cond: cond, Cond: cond,
Result: rp, Result: rp,
}, },
barg: &hashbuild.Argument{
Typs: ts,
},
} }
} }
func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch {
err := hashbuild.Prepare(tc.proc, tc.barg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil
ok, err := hashbuild.Call(0, tc.proc, tc.barg)
require.NoError(t, err)
require.Equal(t, true, ok)
return tc.proc.Reg.InputBatch
}
// create a new block based on the type information, flgs[i] == ture: has null // create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
return testutil.NewBatch(ts, false, int(rows), proc.Mp) return testutil.NewBatch(ts, false, int(rows), proc.Mp)
......
...@@ -16,6 +16,7 @@ package loopanti ...@@ -16,6 +16,7 @@ package loopanti
import ( import (
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/plan"
) )
...@@ -34,4 +35,5 @@ type Argument struct { ...@@ -34,4 +35,5 @@ type Argument struct {
ctr *container ctr *container
Result []int32 Result []int32
Cond *plan.Expr Cond *plan.Expr
Typs []types.Type
} }
...@@ -77,31 +77,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { ...@@ -77,31 +77,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) {
} }
func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error {
var err error bat := <-proc.Reg.MergeReceivers[1].Ch
if bat != nil {
for { ctr.bat = bat
bat := <-proc.Reg.MergeReceivers[1].Ch
if bat == nil {
break
}
if bat.Length() == 0 {
continue
}
if ctr.bat == nil {
ctr.bat = batch.NewWithSize(len(bat.Vecs))
for i, vec := range bat.Vecs {
ctr.bat.Vecs[i] = vector.New(vec.Typ)
}
ctr.bat.Zs = proc.GetMheap().GetSels()
}
anal.Input(bat)
anal.Alloc(int64(bat.Size()))
if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil {
bat.Clean(proc.GetMheap())
ctr.bat.Clean(proc.GetMheap())
return err
}
bat.Clean(proc.GetMheap())
} }
return nil return nil
} }
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mheap"
...@@ -43,6 +44,7 @@ type joinTestCase struct { ...@@ -43,6 +44,7 @@ type joinTestCase struct {
types []types.Type types []types.Type
proc *process.Process proc *process.Process
cancel context.CancelFunc cancel context.CancelFunc
barg *hashbuild.Argument
} }
var ( var (
...@@ -54,7 +56,6 @@ func init() { ...@@ -54,7 +56,6 @@ func init() {
gm := guest.New(1<<30, hm) gm := guest.New(1<<30, hm)
tcs = []joinTestCase{ tcs = []joinTestCase{
newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}), newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
// newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
} }
} }
...@@ -74,6 +75,7 @@ func TestPrepare(t *testing.T) { ...@@ -74,6 +75,7 @@ func TestPrepare(t *testing.T) {
func TestJoin(t *testing.T) { func TestJoin(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -82,10 +84,7 @@ func TestJoin(t *testing.T) { ...@@ -82,10 +84,7 @@ func TestJoin(t *testing.T) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -106,6 +105,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -106,6 +105,7 @@ func BenchmarkJoin(b *testing.B) {
} }
t := new(testing.T) t := new(testing.T)
for _, tc := range tcs { for _, tc := range tcs {
bat := hashBuild(t, tc)
err := Prepare(tc.proc, tc.arg) err := Prepare(tc.proc, tc.arg)
require.NoError(t, err) require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
...@@ -114,9 +114,7 @@ func BenchmarkJoin(b *testing.B) { ...@@ -114,9 +114,7 @@ func BenchmarkJoin(b *testing.B) {
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil tc.proc.Reg.MergeReceivers[0].Ch <- nil
tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[1].Ch <- bat
tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
tc.proc.Reg.MergeReceivers[1].Ch <- nil
for { for {
if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
break break
...@@ -183,12 +181,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) j ...@@ -183,12 +181,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) j
proc: proc, proc: proc,
cancel: cancel, cancel: cancel,
arg: &Argument{ arg: &Argument{
Typs: ts,
Cond: cond, Cond: cond,
Result: rp, Result: rp,
}, },
barg: &hashbuild.Argument{
Typs: ts,
},
} }
} }
func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch {
err := hashbuild.Prepare(tc.proc, tc.barg)
require.NoError(t, err)
tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
tc.proc.Reg.MergeReceivers[0].Ch <- nil
ok, err := hashbuild.Call(0, tc.proc, tc.barg)
require.NoError(t, err)
require.Equal(t, true, ok)
return tc.proc.Reg.InputBatch
}
// create a new block based on the type information, flgs[i] == ture: has null // create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
return testutil.NewBatch(ts, false, int(rows), proc.Mp) return testutil.NewBatch(ts, false, int(rows), proc.Mp)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment