From ce0e5d07844f63670e4cdfc044733e7b33a5abcf Mon Sep 17 00:00:00 2001 From: nnsgmsone <nnsmgsone@outlook.com> Date: Tue, 16 Aug 2022 21:22:40 +0800 Subject: [PATCH] Add hashbuild for pushdown (#4536) Approved by: @yingfeng --- pkg/common/hashmap/joinmap.go | 79 +++++ pkg/common/hashmap/types.go | 11 + pkg/sql/colexec/anti/{anti.go => join.go} | 103 ++----- .../anti/{anti_test.go => join_test.go} | 29 +- pkg/sql/colexec/anti/types.go | 7 +- pkg/sql/colexec/dispatch/dispatch.go | 9 +- pkg/sql/colexec/hashbuild/build.go | 170 +++++++++++ pkg/sql/colexec/hashbuild/build_test.go | 158 ++++++++++ pkg/sql/colexec/{union => hashbuild}/types.go | 34 ++- pkg/sql/colexec/join/join.go | 92 +----- pkg/sql/colexec/join/join_test.go | 29 +- pkg/sql/colexec/join/types.go | 6 +- pkg/sql/colexec/left/join.go | 92 ++---- pkg/sql/colexec/left/join_test.go | 28 +- pkg/sql/colexec/left/types.go | 6 +- .../colexec/loopanti/{loopanti.go => join.go} | 28 +- .../{loopanti_test.go => join_test.go} | 28 +- pkg/sql/colexec/loopanti/types.go | 2 + pkg/sql/colexec/loopjoin/join.go | 28 +- pkg/sql/colexec/loopjoin/join_test.go | 29 +- pkg/sql/colexec/loopjoin/types.go | 2 + pkg/sql/colexec/loopleft/join.go | 21 +- pkg/sql/colexec/loopleft/join_test.go | 27 +- pkg/sql/colexec/loopsemi/join.go | 30 +- pkg/sql/colexec/loopsemi/join_test.go | 28 +- pkg/sql/colexec/loopsemi/types.go | 2 + pkg/sql/colexec/loopsingle/join.go | 23 +- pkg/sql/colexec/loopsingle/join_test.go | 27 +- pkg/sql/colexec/loopsingle/types.go | 2 +- pkg/sql/colexec/mergegroup/group.go | 2 - pkg/sql/colexec/product/product.go | 28 +- pkg/sql/colexec/product/product_test.go | 28 +- pkg/sql/colexec/product/types.go | 2 + pkg/sql/colexec/semi/join.go | 80 +---- pkg/sql/colexec/semi/join_test.go | 29 +- pkg/sql/colexec/semi/types.go | 9 +- pkg/sql/colexec/single/join.go | 86 ++---- pkg/sql/colexec/single/join_test.go | 48 ++- pkg/sql/colexec/single/types.go | 4 +- pkg/sql/colexec/union/union.go | 139 --------- pkg/sql/colexec/union/union_test.go | 130 -------- pkg/sql/compile/compile.go | 83 ++++-- pkg/sql/compile/operator.go | 101 ++++++- pkg/sql/compile/scope.go | 277 ++++++++++-------- pkg/sql/compile/types.go | 3 + pkg/vm/overload.go | 11 +- pkg/vm/process/process.go | 1 + pkg/vm/process/types.go | 3 +- pkg/vm/types.go | 3 +- 49 files changed, 1132 insertions(+), 1065 deletions(-) create mode 100644 pkg/common/hashmap/joinmap.go rename pkg/sql/colexec/anti/{anti.go => join.go} (74%) rename pkg/sql/colexec/anti/{anti_test.go => join_test.go} (88%) create mode 100644 pkg/sql/colexec/hashbuild/build.go create mode 100644 pkg/sql/colexec/hashbuild/build_test.go rename pkg/sql/colexec/{union => hashbuild}/types.go (60%) rename pkg/sql/colexec/loopanti/{loopanti.go => join.go} (87%) rename pkg/sql/colexec/loopanti/{loopanti_test.go => join_test.go} (89%) delete mode 100644 pkg/sql/colexec/union/union.go delete mode 100644 pkg/sql/colexec/union/union_test.go diff --git a/pkg/common/hashmap/joinmap.go b/pkg/common/hashmap/joinmap.go new file mode 100644 index 000000000..1997e0b0e --- /dev/null +++ b/pkg/common/hashmap/joinmap.go @@ -0,0 +1,79 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hashmap + +import ( + "sync/atomic" + + "github.com/matrixorigin/matrixone/pkg/pb/plan" +) + +func NewJoinMap(sels [][]int64, expr *plan.Expr, mp *StrHashMap, hasNull bool) *JoinMap { + return &JoinMap{ + cnt: 1, + mp: mp, + expr: expr, + sels: sels, + hasNull: hasNull, + } +} + +func (jm *JoinMap) Sels() [][]int64 { + return jm.sels +} + +func (jm *JoinMap) Map() *StrHashMap { + return jm.mp +} + +func (jm *JoinMap) Expr() *plan.Expr { + return jm.expr +} + +func (jm *JoinMap) HasNull() bool { + return jm.hasNull +} + +func (jm *JoinMap) Dup() *JoinMap { + m0 := &StrHashMap{ + m: jm.mp.m, + hashMap: jm.mp.hashMap, + hasNull: jm.mp.hasNull, + ibucket: jm.mp.ibucket, + nbucket: jm.mp.nbucket, + values: make([]uint64, UnitLimit), + zValues: make([]int64, UnitLimit), + keys: make([][]byte, UnitLimit), + strHashStates: make([][3]uint64, UnitLimit), + } + return &JoinMap{ + mp: m0, + expr: jm.expr, + sels: jm.sels, + hasNull: jm.hasNull, + cnt: atomic.LoadInt64(&jm.cnt), + } +} + +func (jm *JoinMap) IncRef(ref int64) { + atomic.AddInt64(&jm.cnt, ref) +} + +func (jm *JoinMap) Free() { + if atomic.AddInt64(&jm.cnt, -1) != 0 { + return + } + jm.mp.Free() +} diff --git a/pkg/common/hashmap/types.go b/pkg/common/hashmap/types.go index 8027a968f..092c39d09 100644 --- a/pkg/common/hashmap/types.go +++ b/pkg/common/hashmap/types.go @@ -17,6 +17,7 @@ package hashmap import ( "github.com/matrixorigin/matrixone/pkg/container/hashtable" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/vm/mheap" ) @@ -58,6 +59,16 @@ type Iterator interface { Find(start, count int, vecs []*vector.Vector, inBuckets []uint8) (vs []uint64, zvs []int64) } +// JoinMap is used for join +type JoinMap struct { + cnt int64 + sels [][]int64 + // push-down filter expression, possibly a bloomfilter + expr *plan.Expr + mp *StrHashMap + hasNull bool +} + // StrHashMap key is []byte, value is an uint64 value (starting from 1) // // each time a new key is inserted, the hashtable returns a last-value+1 or, if the old key is inserted, the value corresponding to that key diff --git a/pkg/sql/colexec/anti/anti.go b/pkg/sql/colexec/anti/join.go similarity index 74% rename from pkg/sql/colexec/anti/anti.go rename to pkg/sql/colexec/anti/join.go index 9807c7b24..46619cc12 100644 --- a/pkg/sql/colexec/anti/anti.go +++ b/pkg/sql/colexec/anti/join.go @@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { } func Prepare(proc *process.Process, arg any) error { - var err error - ap := arg.(*Argument) ap.ctr = new(container) - if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { - return err - } ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) - ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) + ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) return nil } @@ -54,8 +49,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { case Build: if err := ctr.build(ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } return true, err } ctr.state = Probe @@ -63,8 +59,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { bat := <-proc.Reg.MergeReceivers[0].Ch if bat == nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } if ctr.bat != nil { ctr.bat.Clean(proc.GetMheap()) } @@ -73,19 +70,21 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if bat.Length() == 0 { continue } - if ctr.bat == nil { + if ctr.bat == nil || ctr.bat.Length() == 0 { if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } proc.SetInputBatch(nil) return true, err } } else { if err := ctr.probe(bat, ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } proc.SetInputBatch(nil) return true, err } @@ -99,66 +98,11 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - if ctr.bat != nil { - ctr.bat.ExpandNulls() - } - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) - } - if ctr.bat == nil || ctr.bat.Length() == 0 { - return nil - } - if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil { - return err - } - defer ctr.freeJoinCondition(proc) - - itr := ctr.mp.NewIterator() - count := ctr.bat.Length() - for i := 0; i < count; i += hashmap.UnitLimit { - n := count - i - if n > hashmap.UnitLimit { - n = hashmap.UnitLimit - } - rows := ctr.mp.GroupCount() - vals, zvals, err := itr.Insert(i, n, ctr.vecs) - if err != nil { - ctr.bat.Clean(proc.GetMheap()) - return err - } - for k, v := range vals[:n] { - if zvals[k] == 0 { - ctr.hasNull = true - continue - } - if v > rows { - ctr.sels = append(ctr.sels, proc.GetMheap().GetSels()) - } - ai := int64(v) - 1 - ctr.sels[ai] = append(ctr.sels[ai], int64(i+k)) - } + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat + ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup() + ctr.hasNull = ctr.mp.HasNull() } return nil } @@ -211,7 +155,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() - itr := ctr.mp.NewIterator() + itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i if n > hashmap.UnitLimit { @@ -275,10 +219,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { } } } - -func (ctr *container) freeSels(proc *process.Process) { - for i := range ctr.sels { - proc.GetMheap().PutSels(ctr.sels[i]) - } - ctr.sels = nil -} diff --git a/pkg/sql/colexec/anti/anti_test.go b/pkg/sql/colexec/anti/join_test.go similarity index 88% rename from pkg/sql/colexec/anti/anti_test.go rename to pkg/sql/colexec/anti/join_test.go index 743335313..84e5d938c 100644 --- a/pkg/sql/colexec/anti/anti_test.go +++ b/pkg/sql/colexec/anti/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -40,6 +41,7 @@ type antiTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -78,6 +80,7 @@ func TestString(t *testing.T) { func TestAnti(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -86,9 +89,7 @@ func TestAnti(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -141,6 +142,7 @@ func BenchmarkAnti(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -149,9 +151,7 @@ func BenchmarkAnti(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -196,12 +196,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Result: rp, Conditions: cs, }, + barg: &hashbuild.Argument{ + Typs: ts, + NeedHashMap: true, + Conditions: cs[1], + }, } } +func hashBuild(t *testing.T, tc antiTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/anti/types.go b/pkg/sql/colexec/anti/types.go index e52f5b514..c05312d28 100644 --- a/pkg/sql/colexec/anti/types.go +++ b/pkg/sql/colexec/anti/types.go @@ -17,6 +17,7 @@ package anti import ( "github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/sql/plan" ) @@ -37,15 +38,14 @@ type container struct { hasNull bool - sels [][]int64 - inBuckets []uint8 bat *batch.Batch evecs []evalVector vecs []*vector.Vector - mp *hashmap.StrHashMap + + mp *hashmap.JoinMap } type Argument struct { @@ -53,5 +53,6 @@ type Argument struct { Ibucket uint64 Nbucket uint64 Result []int32 + Typs []types.Type Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/dispatch/dispatch.go b/pkg/sql/colexec/dispatch/dispatch.go index 0b6462583..208c041bf 100644 --- a/pkg/sql/colexec/dispatch/dispatch.go +++ b/pkg/sql/colexec/dispatch/dispatch.go @@ -18,6 +18,7 @@ import ( "bytes" "sync/atomic" + "github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -42,7 +43,7 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) { case reg.Ch <- nil: } } - return false, nil + return true, nil } vecs := ap.vecs[:0] for i := range bat.Vecs { @@ -62,6 +63,12 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) { } if ap.All { atomic.AddInt64(&bat.Cnt, int64(len(ap.Regs))-1) + if bat.Ht != nil { + jm, ok := bat.Ht.(*hashmap.JoinMap) + if ok { + jm.IncRef(int64(len(ap.Regs)) - 1) + } + } for _, reg := range ap.Regs { select { case <-reg.Ctx.Done(): diff --git a/pkg/sql/colexec/hashbuild/build.go b/pkg/sql/colexec/hashbuild/build.go new file mode 100644 index 000000000..a77ad888f --- /dev/null +++ b/pkg/sql/colexec/hashbuild/build.go @@ -0,0 +1,170 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hashbuild + +import ( + "bytes" + + "github.com/matrixorigin/matrixone/pkg/common/hashmap" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/sql/plan" + "github.com/matrixorigin/matrixone/pkg/vm/process" +) + +func String(_ any, buf *bytes.Buffer) { + buf.WriteString(" hash build ") +} + +func Prepare(proc *process.Process, arg any) error { + var err error + + ap := arg.(*Argument) + ap.ctr = new(container) + if ap.NeedHashMap { + if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { + return err + } + ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions)) + ap.ctr.evecs = make([]evalVector, len(ap.Conditions)) + } + ap.ctr.bat = batch.NewWithSize(len(ap.Typs)) + ap.ctr.bat.Zs = proc.GetMheap().GetSels() + for i, typ := range ap.Typs { + ap.ctr.bat.Vecs[i] = vector.New(typ) + } + return nil +} + +func Call(idx int, proc *process.Process, arg any) (bool, error) { + anal := proc.GetAnalyze(idx) + anal.Start() + defer anal.Stop() + ap := arg.(*Argument) + ctr := ap.ctr + for { + switch ctr.state { + case Build: + if err := ctr.build(ap, proc, anal); err != nil { + ctr.state = End + ctr.mp.Free() + return true, err + } + ctr.state = End + default: + if ctr.bat != nil { + if ap.NeedHashMap { + ctr.bat.Ht = hashmap.NewJoinMap(ctr.sels, nil, ctr.mp, ctr.hasNull) + } + proc.SetInputBatch(ctr.bat) + ctr.bat = nil + } else { + proc.SetInputBatch(nil) + } + return true, nil + } + } +} + +func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { + var err error + + for { + bat := <-proc.Reg.MergeReceivers[0].Ch + if bat == nil { + break + } + if bat.Length() == 0 { + continue + } + anal.Input(bat) + anal.Alloc(int64(bat.Size())) + if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { + bat.Clean(proc.GetMheap()) + ctr.bat.Clean(proc.GetMheap()) + return err + } + bat.Clean(proc.GetMheap()) + } + if ctr.bat == nil || ctr.bat.Length() == 0 || !ap.NeedHashMap { + return nil + } + if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions, proc); err != nil { + return err + } + defer ctr.freeJoinCondition(proc) + + itr := ctr.mp.NewIterator() + count := ctr.bat.Length() + for i := 0; i < count; i += hashmap.UnitLimit { + n := count - i + if n > hashmap.UnitLimit { + n = hashmap.UnitLimit + } + rows := ctr.mp.GroupCount() + vals, zvals, err := itr.Insert(i, n, ctr.vecs) + if err != nil { + return err + } + for k, v := range vals[:n] { + if zvals[k] == 0 { + ctr.hasNull = true + continue + } + if v == 0 { + continue + } + if v > rows { + ctr.sels = append(ctr.sels, make([]int64, 0, 8)) + } + ai := int64(v) - 1 + ctr.sels[ai] = append(ctr.sels[ai], int64(i+k)) + } + } + return nil +} + +func (ctr *container) evalJoinCondition(bat *batch.Batch, conds []*plan.Expr, proc *process.Process) error { + for i, cond := range conds { + vec, err := colexec.EvalExpr(bat, proc, cond) + if err != nil || vec.ConstExpand(proc.GetMheap()) == nil { + for j := 0; j < i; j++ { + if ctr.evecs[j].needFree { + vector.Clean(ctr.evecs[j].vec, proc.GetMheap()) + } + } + return err + } + ctr.vecs[i] = vec + ctr.evecs[i].vec = vec + ctr.evecs[i].needFree = true + for j := range bat.Vecs { + if bat.Vecs[j] == vec { + ctr.evecs[i].needFree = false + break + } + } + } + return nil +} + +func (ctr *container) freeJoinCondition(proc *process.Process) { + for i := range ctr.evecs { + if ctr.evecs[i].needFree { + ctr.evecs[i].vec.Free(proc.GetMheap()) + } + } +} diff --git a/pkg/sql/colexec/hashbuild/build_test.go b/pkg/sql/colexec/hashbuild/build_test.go new file mode 100644 index 000000000..36341b331 --- /dev/null +++ b/pkg/sql/colexec/hashbuild/build_test.go @@ -0,0 +1,158 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hashbuild + +import ( + "bytes" + "context" + "testing" + + "github.com/matrixorigin/matrixone/pkg/common/hashmap" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/testutil" + "github.com/matrixorigin/matrixone/pkg/vm/mheap" + "github.com/matrixorigin/matrixone/pkg/vm/process" + "github.com/stretchr/testify/require" +) + +const ( + Rows = 10 // default rows + BenchmarkRows = 100000 // default rows for benchmark +) + +// add unit tests for cases +type buildTestCase struct { + arg *Argument + flgs []bool // flgs[i] == true: nullable + types []types.Type + proc *process.Process + cancel context.CancelFunc +} + +var ( + tcs []buildTestCase +) + +func init() { + tcs = []buildTestCase{ + newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, + []*plan.Expr{ + newExpr(0, types.Type{Oid: types.T_int8}), + }), + newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, + []*plan.Expr{ + newExpr(0, types.Type{Oid: types.T_int8}), + }), + } +} + +func TestString(t *testing.T) { + buf := new(bytes.Buffer) + for _, tc := range tcs { + String(tc.arg, buf) + } +} + +func TestBuild(t *testing.T) { + for _, tc := range tcs[:1] { + err := Prepare(tc.proc, tc.arg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{} + tc.proc.Reg.MergeReceivers[0].Ch <- nil + for { + ok, err := Call(0, tc.proc, tc.arg) + require.NoError(t, err) + require.Equal(t, true, ok) + mp := tc.proc.Reg.InputBatch.Ht.(*hashmap.JoinMap) + mp.Free() + tc.proc.Reg.InputBatch.Clean(tc.proc.Mp) + break + } + require.Equal(t, int64(0), mheap.Size(tc.proc.Mp)) + } +} + +func BenchmarkBuild(b *testing.B) { + for i := 0; i < b.N; i++ { + tcs = []buildTestCase{ + newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_int8}}, + []*plan.Expr{ + newExpr(0, types.Type{Oid: types.T_int8}), + }), + } + t := new(testing.T) + for _, tc := range tcs { + err := Prepare(tc.proc, tc.arg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{} + tc.proc.Reg.MergeReceivers[0].Ch <- nil + for { + ok, err := Call(0, tc.proc, tc.arg) + require.NoError(t, err) + require.Equal(t, true, ok) + mp := tc.proc.Reg.InputBatch.Ht.(*hashmap.JoinMap) + mp.Free() + tc.proc.Reg.InputBatch.Clean(tc.proc.Mp) + break + } + } + } +} + +func newExpr(pos int32, typ types.Type) *plan.Expr { + return &plan.Expr{ + Typ: &plan.Type{ + Size: typ.Size, + Scale: typ.Scale, + Width: typ.Width, + Id: int32(typ.Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + ColPos: pos, + }, + }, + } +} + +func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, cs []*plan.Expr) buildTestCase { + proc := process.New(m) + proc.Reg.MergeReceivers = make([]*process.WaitRegister, 1) + ctx, cancel := context.WithCancel(context.Background()) + proc.Reg.MergeReceivers[0] = &process.WaitRegister{ + Ctx: ctx, + Ch: make(chan *batch.Batch, 10), + } + return buildTestCase{ + types: ts, + flgs: flgs, + proc: proc, + cancel: cancel, + arg: &Argument{ + Typs: ts, + Conditions: cs, + NeedHashMap: true, + }, + } +} + +// create a new block based on the type information, flgs[i] == ture: has null +func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { + return testutil.NewBatch(ts, false, int(rows), proc.Mp) +} diff --git a/pkg/sql/colexec/union/types.go b/pkg/sql/colexec/hashbuild/types.go similarity index 60% rename from pkg/sql/colexec/union/types.go rename to pkg/sql/colexec/hashbuild/types.go index d321208a1..74eed2381 100644 --- a/pkg/sql/colexec/union/types.go +++ b/pkg/sql/colexec/hashbuild/types.go @@ -12,32 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. -package union +package hashbuild import ( "github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/sql/plan" ) const ( Build = iota - Probe End ) +type evalVector struct { + needFree bool + vec *vector.Vector +} + type container struct { state int - // hash table related. - hashTable *hashmap.StrHashMap + hasNull bool + + sels [][]int64 - // bat records the final result of union operator. bat *batch.Batch + + evecs []evalVector + vecs []*vector.Vector + + mp *hashmap.StrHashMap } type Argument struct { - // attribute which need not do serialization work - ctr *container - Ibucket uint64 // index in buckets - Nbucket uint64 // buckets count + ctr *container + // need to generate a push-down filter expression + NeedExpr bool + NeedHashMap bool + Ibucket uint64 + Nbucket uint64 + Typs []types.Type + Conditions []*plan.Expr } diff --git a/pkg/sql/colexec/join/join.go b/pkg/sql/colexec/join/join.go index 7ed700d00..39ab94d56 100644 --- a/pkg/sql/colexec/join/join.go +++ b/pkg/sql/colexec/join/join.go @@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { } func Prepare(proc *process.Process, arg any) error { - var err error - ap := arg.(*Argument) ap.ctr = new(container) - if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { - return err - } ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) - ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) + ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) return nil } @@ -55,7 +50,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if err := ctr.build(ap, proc, anal); err != nil { ctr.state = End ctr.mp.Free() - ctr.freeSels(proc) return true, err } ctr.state = Probe @@ -64,7 +58,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if bat == nil { ctr.state = End ctr.mp.Free() - ctr.freeSels(proc) if ctr.bat != nil { ctr.bat.Clean(proc.GetMheap()) } @@ -73,14 +66,13 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if bat.Length() == 0 { continue } - if ctr.bat == nil { + if ctr.bat == nil || ctr.bat.Length() == 0 { bat.Clean(proc.GetMheap()) continue } if err := ctr.probe(bat, ap, proc, anal); err != nil { ctr.state = End ctr.mp.Free() - ctr.freeSels(proc) proc.SetInputBatch(nil) return true, err } @@ -93,63 +85,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - ctr.bat.Zs = proc.GetMheap().GetSels() - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) - } - if ctr.bat == nil || ctr.bat.Length() == 0 { - return nil - } - if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil { - return err - } - defer ctr.freeJoinCondition(proc) - - itr := ctr.mp.NewIterator() - count := ctr.bat.Length() - for i := 0; i < count; i += hashmap.UnitLimit { - n := count - i - if n > hashmap.UnitLimit { - n = hashmap.UnitLimit - } - rows := ctr.mp.GroupCount() - vals, zvals, err := itr.Insert(i, n, ctr.vecs) - if err != nil { - return err - } - for k, v := range vals { - if zvals[k] == 0 { - continue - } - if v > rows { - ctr.sels = append(ctr.sels, proc.GetMheap().GetSels()) - } - ai := int64(v) - 1 - ctr.sels[ai] = append(ctr.sels[ai], int64(i+k)) - } - } + bat := <-proc.Reg.MergeReceivers[1].Ch + ctr.bat = bat + ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup() return nil } @@ -170,7 +108,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() - itr := ctr.mp.NewIterator() + mSels := ctr.mp.Sels() + itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i if n > hashmap.UnitLimit { @@ -179,16 +118,10 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces copy(ctr.inBuckets, hashmap.OneUInt8s) vals, zvals := itr.Find(i, n, ctr.vecs, ctr.inBuckets) for k := 0; k < n; k++ { - if ctr.inBuckets[k] == 0 { + if ctr.inBuckets[k] == 0 || zvals[k] == 0 || vals[k] == 0 { continue } - if zvals[k] == 0 { - continue - } - if vals[k] == 0 { - continue - } - sels := ctr.sels[vals[k]-1] + sels := mSels[vals[k]-1] for _, sel := range sels { for j, rp := range ap.Result { if rp.Rel == 0 { @@ -244,10 +177,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { } } } - -func (ctr *container) freeSels(proc *process.Process) { - for i := range ctr.sels { - proc.GetMheap().PutSels(ctr.sels[i]) - } - ctr.sels = nil -} diff --git a/pkg/sql/colexec/join/join_test.go b/pkg/sql/colexec/join/join_test.go index 5cf0e536b..864ae79af 100644 --- a/pkg/sql/colexec/join/join_test.go +++ b/pkg/sql/colexec/join/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -40,6 +41,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -87,6 +89,7 @@ func TestString(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -95,9 +98,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -132,6 +133,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -140,9 +142,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -187,12 +187,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Result: rp, Conditions: cs, }, + barg: &hashbuild.Argument{ + Typs: ts, + NeedHashMap: true, + Conditions: cs[1], + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/join/types.go b/pkg/sql/colexec/join/types.go index 3de60eb56..4e433a4e8 100644 --- a/pkg/sql/colexec/join/types.go +++ b/pkg/sql/colexec/join/types.go @@ -17,6 +17,7 @@ package join import ( "github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/sql/plan" ) @@ -35,8 +36,6 @@ type evalVector struct { type container struct { state int - sels [][]int64 - inBuckets []uint8 bat *batch.Batch @@ -44,7 +43,7 @@ type container struct { evecs []evalVector vecs []*vector.Vector - mp *hashmap.StrHashMap + mp *hashmap.JoinMap } type ResultPos struct { @@ -57,5 +56,6 @@ type Argument struct { Ibucket uint64 // index in buckets Nbucket uint64 // buckets count Result []ResultPos + Typs []types.Type Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/left/join.go b/pkg/sql/colexec/left/join.go index 255997c1c..ca7186470 100644 --- a/pkg/sql/colexec/left/join.go +++ b/pkg/sql/colexec/left/join.go @@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { } func Prepare(proc *process.Process, arg any) error { - var err error - ap := arg.(*Argument) ap.ctr = new(container) - if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { - return err - } ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) - ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) + ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.bat = batch.NewWithSize(len(ap.Typs)) ap.ctr.bat.Zs = proc.GetMheap().GetSels() for i, typ := range ap.Typs { @@ -59,8 +54,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { case Build: if err := ctr.build(ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } return true, err } ctr.state = Probe @@ -68,8 +64,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { bat := <-proc.Reg.MergeReceivers[0].Ch if bat == nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } if ctr.bat != nil { ctr.bat.Clean(proc.GetMheap()) } @@ -81,8 +78,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if ctr.bat.Length() == 0 { if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } proc.SetInputBatch(nil) return true, err } @@ -90,8 +88,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } else { if err := ctr.probe(bat, ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() - ctr.freeSels(proc) + if ctr.mp != nil { + ctr.mp.Free() + } proc.SetInputBatch(nil) return true, err } @@ -105,55 +104,10 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) - } - if ctr.bat == nil || ctr.bat.Length() == 0 { - return nil - } - if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil { - return err - } - defer ctr.freeJoinCondition(proc) - - itr := ctr.mp.NewIterator() - count := ctr.bat.Length() - for i := 0; i < count; i += hashmap.UnitLimit { - n := count - i - if n > hashmap.UnitLimit { - n = hashmap.UnitLimit - } - rows := ctr.mp.GroupCount() - vals, zvals, err := itr.Insert(i, n, ctr.vecs) - if err != nil { - return err - } - for k, v := range vals[:n] { - if zvals[k] == 0 { - continue - } - if v > rows { - ctr.sels = append(ctr.sels, proc.GetMheap().GetSels()) - } - ai := int64(v) - 1 - ctr.sels[ai] = append(ctr.sels[ai], int64(i+k)) - } + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat + ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup() } return nil } @@ -216,7 +170,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() - itr := ctr.mp.NewIterator() + mSels := ctr.mp.Sels() + itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i if n > hashmap.UnitLimit { @@ -245,7 +200,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) continue } - sels := ctr.sels[vals[k]-1] + sels := mSels[vals[k]-1] for _, sel := range sels { for j, rp := range ap.Result { if rp.Rel == 0 { @@ -301,10 +256,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { } } } - -func (ctr *container) freeSels(proc *process.Process) { - for i := range ctr.sels { - proc.GetMheap().PutSels(ctr.sels[i]) - } - ctr.sels = nil -} diff --git a/pkg/sql/colexec/left/join_test.go b/pkg/sql/colexec/left/join_test.go index 1e68302b3..59070d815 100644 --- a/pkg/sql/colexec/left/join_test.go +++ b/pkg/sql/colexec/left/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -40,6 +41,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -87,6 +89,7 @@ func TestString(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -95,9 +98,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -151,6 +152,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -159,9 +161,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -210,9 +210,25 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Result: rp, Conditions: cs, }, + barg: &hashbuild.Argument{ + Typs: ts, + NeedHashMap: true, + Conditions: cs[1], + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/left/types.go b/pkg/sql/colexec/left/types.go index a93a0ddfd..436031700 100644 --- a/pkg/sql/colexec/left/types.go +++ b/pkg/sql/colexec/left/types.go @@ -36,8 +36,6 @@ type evalVector struct { type container struct { state int - sels [][]int64 - inBuckets []uint8 bat *batch.Batch @@ -45,7 +43,7 @@ type container struct { evecs []evalVector vecs []*vector.Vector - mp *hashmap.StrHashMap + mp *hashmap.JoinMap } type ResultPos struct { @@ -57,7 +55,7 @@ type Argument struct { ctr *container Ibucket uint64 Nbucket uint64 - Typs []types.Type Result []ResultPos + Typs []types.Type Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/loopanti/loopanti.go b/pkg/sql/colexec/loopanti/join.go similarity index 87% rename from pkg/sql/colexec/loopanti/loopanti.go rename to pkg/sql/colexec/loopanti/join.go index 042f0c899..296e37f87 100644 --- a/pkg/sql/colexec/loopanti/loopanti.go +++ b/pkg/sql/colexec/loopanti/join.go @@ -83,31 +83,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - ctr.bat.Zs = proc.GetMheap().GetSels() - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat } return nil } diff --git a/pkg/sql/colexec/loopanti/loopanti_test.go b/pkg/sql/colexec/loopanti/join_test.go similarity index 89% rename from pkg/sql/colexec/loopanti/loopanti_test.go rename to pkg/sql/colexec/loopanti/join_test.go index f4797dbd1..f23389d4d 100644 --- a/pkg/sql/colexec/loopanti/loopanti_test.go +++ b/pkg/sql/colexec/loopanti/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" @@ -43,6 +44,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -74,6 +76,7 @@ func TestPrepare(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -82,10 +85,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -125,6 +125,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -133,9 +134,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -202,12 +201,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32) joinT proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Cond: cond, Result: rp, }, + barg: &hashbuild.Argument{ + Typs: ts, + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/loopanti/types.go b/pkg/sql/colexec/loopanti/types.go index b10680e01..a495d7853 100644 --- a/pkg/sql/colexec/loopanti/types.go +++ b/pkg/sql/colexec/loopanti/types.go @@ -16,6 +16,7 @@ package loopanti import ( "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" ) @@ -34,4 +35,5 @@ type Argument struct { ctr *container Result []int32 Cond *plan.Expr + Typs []types.Type } diff --git a/pkg/sql/colexec/loopjoin/join.go b/pkg/sql/colexec/loopjoin/join.go index 6b9e48b5c..b15035da4 100644 --- a/pkg/sql/colexec/loopjoin/join.go +++ b/pkg/sql/colexec/loopjoin/join.go @@ -77,31 +77,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - ctr.bat.Zs = proc.GetMheap().GetSels() - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat } return nil } diff --git a/pkg/sql/colexec/loopjoin/join_test.go b/pkg/sql/colexec/loopjoin/join_test.go index 2e573df62..652d7a77b 100644 --- a/pkg/sql/colexec/loopjoin/join_test.go +++ b/pkg/sql/colexec/loopjoin/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" @@ -43,6 +44,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -54,7 +56,6 @@ func init() { gm := guest.New(1<<30, hm) tcs = []joinTestCase{ newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}), - // newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}), } } @@ -74,6 +75,7 @@ func TestPrepare(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -82,10 +84,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -106,6 +105,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -114,9 +114,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -183,12 +181,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) j proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Cond: cond, Result: rp, }, + barg: &hashbuild.Argument{ + Typs: ts, + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/loopjoin/types.go b/pkg/sql/colexec/loopjoin/types.go index bc2046cd9..4c9c75652 100644 --- a/pkg/sql/colexec/loopjoin/types.go +++ b/pkg/sql/colexec/loopjoin/types.go @@ -16,6 +16,7 @@ package loopjoin import ( "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" ) @@ -39,4 +40,5 @@ type Argument struct { ctr *container Cond *plan.Expr Result []ResultPos + Typs []types.Type } diff --git a/pkg/sql/colexec/loopleft/join.go b/pkg/sql/colexec/loopleft/join.go index bda5d6072..fe7c35dcc 100644 --- a/pkg/sql/colexec/loopleft/join.go +++ b/pkg/sql/colexec/loopleft/join.go @@ -88,24 +88,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat } return nil } diff --git a/pkg/sql/colexec/loopleft/join_test.go b/pkg/sql/colexec/loopleft/join_test.go index bff6db1c6..314e9f110 100644 --- a/pkg/sql/colexec/loopleft/join_test.go +++ b/pkg/sql/colexec/loopleft/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" @@ -43,6 +44,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -74,6 +76,7 @@ func TestPrepare(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -82,10 +85,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -125,6 +125,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -133,9 +134,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -206,9 +205,23 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) j Cond: cond, Result: rp, }, + barg: &hashbuild.Argument{ + Typs: ts, + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/loopsemi/join.go b/pkg/sql/colexec/loopsemi/join.go index e89b8efe7..bff9276d7 100644 --- a/pkg/sql/colexec/loopsemi/join.go +++ b/pkg/sql/colexec/loopsemi/join.go @@ -59,7 +59,7 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if bat.Length() == 0 { continue } - if ctr.bat == nil { + if ctr.bat == nil || ctr.bat.Length() == 0 { bat.Clean(proc.GetMheap()) continue } @@ -77,31 +77,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - ctr.bat.Zs = proc.GetMheap().GetSels() - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat } return nil } diff --git a/pkg/sql/colexec/loopsemi/join_test.go b/pkg/sql/colexec/loopsemi/join_test.go index dfd875186..8f7cb1613 100644 --- a/pkg/sql/colexec/loopsemi/join_test.go +++ b/pkg/sql/colexec/loopsemi/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" @@ -43,6 +44,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -74,6 +76,7 @@ func TestPrepare(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -82,10 +85,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -106,6 +106,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -114,9 +115,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -183,12 +182,27 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32) joinT proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Cond: cond, Result: rp, }, + barg: &hashbuild.Argument{ + Typs: ts, + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/loopsemi/types.go b/pkg/sql/colexec/loopsemi/types.go index d8e338704..4acce2f24 100644 --- a/pkg/sql/colexec/loopsemi/types.go +++ b/pkg/sql/colexec/loopsemi/types.go @@ -16,6 +16,7 @@ package loopsemi import ( "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" ) @@ -34,4 +35,5 @@ type Argument struct { ctr *container Result []int32 Cond *plan.Expr + Typs []types.Type } diff --git a/pkg/sql/colexec/loopsingle/join.go b/pkg/sql/colexec/loopsingle/join.go index c7203dd57..c78fa5d78 100644 --- a/pkg/sql/colexec/loopsingle/join.go +++ b/pkg/sql/colexec/loopsingle/join.go @@ -26,7 +26,7 @@ import ( ) func String(_ any, buf *bytes.Buffer) { - buf.WriteString(" loop left join ") + buf.WriteString(" loop single join ") } func Prepare(proc *process.Process, arg any) error { @@ -89,24 +89,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat } return nil } diff --git a/pkg/sql/colexec/loopsingle/join_test.go b/pkg/sql/colexec/loopsingle/join_test.go index f9e29da0a..40b719995 100644 --- a/pkg/sql/colexec/loopsingle/join_test.go +++ b/pkg/sql/colexec/loopsingle/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" @@ -43,6 +44,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -74,6 +76,7 @@ func TestPrepare(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -82,10 +85,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - //tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -125,6 +125,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -133,9 +134,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -206,9 +205,23 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) j Cond: cond, Result: rp, }, + barg: &hashbuild.Argument{ + Typs: ts, + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/loopsingle/types.go b/pkg/sql/colexec/loopsingle/types.go index c1b72b818..1efcbdb4a 100644 --- a/pkg/sql/colexec/loopsingle/types.go +++ b/pkg/sql/colexec/loopsingle/types.go @@ -38,7 +38,7 @@ type ResultPos struct { type Argument struct { ctr *container - Typs []types.Type Cond *plan.Expr Result []ResultPos + Typs []types.Type } diff --git a/pkg/sql/colexec/mergegroup/group.go b/pkg/sql/colexec/mergegroup/group.go index 359736ef8..2884d3873 100644 --- a/pkg/sql/colexec/mergegroup/group.go +++ b/pkg/sql/colexec/mergegroup/group.go @@ -207,7 +207,6 @@ func (ctr *container) processH8(bat *batch.Batch, proc *process.Process) error { } if flg { ctr.bat = bat - ctr.intHashMap.AddGroups(ctr.intHashMap.Cardinality()) } return nil } @@ -237,7 +236,6 @@ func (ctr *container) processHStr(bat *batch.Batch, proc *process.Process) error } if flg { ctr.bat = bat - ctr.strHashMap.AddGroups(ctr.strHashMap.Cardinality()) } return nil } diff --git a/pkg/sql/colexec/product/product.go b/pkg/sql/colexec/product/product.go index f19166278..803b2e02e 100644 --- a/pkg/sql/colexec/product/product.go +++ b/pkg/sql/colexec/product/product.go @@ -77,31 +77,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - ctr.bat.Zs = proc.GetMheap().GetSels() - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat } return nil } diff --git a/pkg/sql/colexec/product/product_test.go b/pkg/sql/colexec/product/product_test.go index 33cf8be5e..79444b2da 100644 --- a/pkg/sql/colexec/product/product_test.go +++ b/pkg/sql/colexec/product/product_test.go @@ -21,6 +21,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/mmu/guest" @@ -41,6 +42,7 @@ type productTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -72,6 +74,7 @@ func TestPrepare(t *testing.T) { func TestProduct(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -80,10 +83,7 @@ func TestProduct(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -104,6 +104,7 @@ func BenchmarkProduct(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -112,9 +113,7 @@ func BenchmarkProduct(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -143,11 +142,26 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) p proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Result: rp, }, + barg: &hashbuild.Argument{ + Typs: ts, + }, } } +func hashBuild(t *testing.T, tc productTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/product/types.go b/pkg/sql/colexec/product/types.go index 07e2bcbb9..b0e3568be 100644 --- a/pkg/sql/colexec/product/types.go +++ b/pkg/sql/colexec/product/types.go @@ -16,6 +16,7 @@ package product import ( "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" ) const ( @@ -37,4 +38,5 @@ type ResultPos struct { type Argument struct { ctr *container Result []ResultPos + Typs []types.Type } diff --git a/pkg/sql/colexec/semi/join.go b/pkg/sql/colexec/semi/join.go index 19b7ed8c9..1f5900608 100644 --- a/pkg/sql/colexec/semi/join.go +++ b/pkg/sql/colexec/semi/join.go @@ -30,16 +30,11 @@ func String(_ any, buf *bytes.Buffer) { } func Prepare(proc *process.Process, arg any) error { - var err error - ap := arg.(*Argument) ap.ctr = new(container) - if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { - return err - } ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) - ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) + ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) return nil } @@ -55,7 +50,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if err := ctr.build(ap, proc, anal); err != nil { ctr.state = End ctr.mp.Free() - ctr.freeSels(proc) return true, err } ctr.state = Probe @@ -64,7 +58,6 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if bat == nil { ctr.state = End ctr.mp.Free() - ctr.freeSels(proc) if ctr.bat != nil { ctr.bat.Clean(proc.GetMheap()) } @@ -73,14 +66,13 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if bat.Length() == 0 { continue } - if ctr.bat == nil { + if ctr.bat == nil || ctr.bat.Length() == 0 { bat.Clean(proc.GetMheap()) continue } if err := ctr.probe(bat, ap, proc, anal); err != nil { ctr.state = End ctr.mp.Free() - ctr.freeSels(proc) proc.SetInputBatch(nil) return true, err } @@ -93,62 +85,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - if ctr.bat == nil { - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i, vec := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(vec.Typ) - } - ctr.bat.Zs = proc.GetMheap().GetSels() - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) - } - if ctr.bat == nil || ctr.bat.Length() == 0 { - return nil - } - if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil { - return err - } - defer ctr.freeJoinCondition(proc) - itr := ctr.mp.NewIterator() - count := ctr.bat.Length() - for i := 0; i < count; i += hashmap.UnitLimit { - n := count - i - if n > hashmap.UnitLimit { - n = hashmap.UnitLimit - } - rows := ctr.mp.GroupCount() - vals, zvals, err := itr.Insert(i, n, ctr.vecs) - if err != nil { - return err - } - for k, v := range vals[:n] { - if zvals[k] == 0 { - continue - } - if v > rows { - ctr.sels = append(ctr.sels, proc.GetMheap().GetSels()) - } - ai := int64(v) - 1 - ctr.sels[ai] = append(ctr.sels[ai], int64(i+k)) - } - } + bat := <-proc.Reg.MergeReceivers[1].Ch + ctr.bat = bat + ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup() return nil } @@ -165,7 +104,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() - itr := ctr.mp.NewIterator() + itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i if n > hashmap.UnitLimit { @@ -229,10 +168,3 @@ func (ctr *container) freeJoinCondition(proc *process.Process) { } } } - -func (ctr *container) freeSels(proc *process.Process) { - for i := range ctr.sels { - proc.GetMheap().PutSels(ctr.sels[i]) - } - ctr.sels = nil -} diff --git a/pkg/sql/colexec/semi/join_test.go b/pkg/sql/colexec/semi/join_test.go index b58548b8d..8ac26255a 100644 --- a/pkg/sql/colexec/semi/join_test.go +++ b/pkg/sql/colexec/semi/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -40,6 +41,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -78,6 +80,7 @@ func TestString(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -86,9 +89,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -123,6 +124,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -131,9 +133,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -178,12 +178,29 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] proc: proc, cancel: cancel, arg: &Argument{ + Typs: ts, Result: rp, Conditions: cs, }, + barg: &hashbuild.Argument{ + Typs: ts, + NeedHashMap: true, + Conditions: cs[1], + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/semi/types.go b/pkg/sql/colexec/semi/types.go index a239a8764..10773893b 100644 --- a/pkg/sql/colexec/semi/types.go +++ b/pkg/sql/colexec/semi/types.go @@ -17,6 +17,7 @@ package semi import ( "github.com/matrixorigin/matrixone/pkg/common/hashmap" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/sql/plan" ) @@ -35,15 +36,14 @@ type evalVector struct { type container struct { state int - sels [][]int64 - inBuckets []uint8 - bat *batch.Batch + + bat *batch.Batch evecs []evalVector vecs []*vector.Vector - mp *hashmap.StrHashMap + mp *hashmap.JoinMap } type Argument struct { @@ -51,5 +51,6 @@ type Argument struct { Ibucket uint64 // index in buckets Nbucket uint64 // buckets count Result []int32 + Typs []types.Type Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/single/join.go b/pkg/sql/colexec/single/join.go index d422ff35b..b1f2f70b2 100644 --- a/pkg/sql/colexec/single/join.go +++ b/pkg/sql/colexec/single/join.go @@ -31,17 +31,11 @@ func String(_ any, buf *bytes.Buffer) { } func Prepare(proc *process.Process, arg any) error { - var err error - ap := arg.(*Argument) ap.ctr = new(container) - ap.ctr.sels = make([]int64, 0, 65536) - if ap.ctr.mp, err = hashmap.NewStrMap(false, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { - return err - } ap.ctr.inBuckets = make([]uint8, hashmap.UnitLimit) - ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.evecs = make([]evalVector, len(ap.Conditions[0])) + ap.ctr.vecs = make([]*vector.Vector, len(ap.Conditions[0])) ap.ctr.bat = batch.NewWithSize(len(ap.Typs)) ap.ctr.bat.Zs = proc.GetMheap().GetSels() for i, typ := range ap.Typs { @@ -61,7 +55,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { case Build: if err := ctr.build(ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() + if ctr.mp != nil { + ctr.mp.Free() + } return true, err } ctr.state = Probe @@ -69,7 +65,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { bat := <-proc.Reg.MergeReceivers[0].Ch if bat == nil { ctr.state = End - ctr.mp.Free() + if ctr.mp != nil { + ctr.mp.Free() + } if ctr.bat != nil { ctr.bat.Clean(proc.GetMheap()) } @@ -81,7 +79,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { if ctr.bat.Length() == 0 { if err := ctr.emptyProbe(bat, ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() + if ctr.mp != nil { + ctr.mp.Free() + } proc.SetInputBatch(nil) return true, err } @@ -89,7 +89,9 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } else { if err := ctr.probe(bat, ap, proc, anal); err != nil { ctr.state = End - ctr.mp.Free() + if ctr.mp != nil { + ctr.mp.Free() + } proc.SetInputBatch(nil) return true, err } @@ -103,56 +105,10 @@ func Call(idx int, proc *process.Process, arg any) (bool, error) { } func (ctr *container) build(ap *Argument, proc *process.Process, anal process.Analyze) error { - var err error - - for { - bat := <-proc.Reg.MergeReceivers[1].Ch - if bat == nil { - break - } - if bat.Length() == 0 { - continue - } - anal.Input(bat) - anal.Alloc(int64(bat.Size())) - if ctr.bat, err = ctr.bat.Append(proc.GetMheap(), bat); err != nil { - bat.Clean(proc.GetMheap()) - ctr.bat.Clean(proc.GetMheap()) - return err - } - bat.Clean(proc.GetMheap()) - } - if ctr.bat == nil || ctr.bat.Length() == 0 { - return nil - } - if err := ctr.evalJoinCondition(ctr.bat, ap.Conditions[1], proc); err != nil { - return err - } - defer ctr.freeJoinCondition(proc) - rows := ctr.mp.GroupCount() - itr := ctr.mp.NewIterator() - count := ctr.bat.Length() - for i := 0; i < count; i += hashmap.UnitLimit { - n := count - i - if n > hashmap.UnitLimit { - n = hashmap.UnitLimit - } - vals, zvals, err := itr.Insert(i, n, ctr.vecs) - if err != nil { - return err - } - for k, v := range vals[:n] { - if zvals[k] == 0 { - continue - } - if v > rows { - rows++ - ctr.mp.AddGroup() - ctr.sels = append(ctr.sels, int64(i+k)) - } else { - ctr.sels[v-1] = -1 - } - } + bat := <-proc.Reg.MergeReceivers[1].Ch + if bat != nil { + ctr.bat = bat + ctr.mp = bat.Ht.(*hashmap.JoinMap).Dup() } return nil } @@ -215,7 +171,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() - itr := ctr.mp.NewIterator() + mSels := ctr.mp.Sels() + itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i if n > hashmap.UnitLimit { @@ -244,10 +201,11 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) continue } - sel := ctr.sels[vals[k]-1] - if sel == -1 { + sels := mSels[vals[k]-1] + if len(sels) > 1 { return errors.New("scalar subquery returns more than 1 row") } + sel := sels[0] for j, rp := range ap.Result { if rp.Rel == 0 { if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.GetMheap()); err != nil { diff --git a/pkg/sql/colexec/single/join_test.go b/pkg/sql/colexec/single/join_test.go index 8148c72eb..aa883ef8e 100644 --- a/pkg/sql/colexec/single/join_test.go +++ b/pkg/sql/colexec/single/join_test.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -40,6 +41,7 @@ type joinTestCase struct { types []types.Type proc *process.Process cancel context.CancelFunc + barg *hashbuild.Argument } var ( @@ -57,26 +59,6 @@ func init() { newExpr(0, types.Type{Oid: types.T_int8}), }, }), - /* - newTestCase(testutil.NewMheap(), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}, - [][]*plan.Expr{ - { - newExpr(0, types.Type{Oid: types.T_int8}), - }, - { - newExpr(0, types.Type{Oid: types.T_int8}), - }, - }), - newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_decimal64}}, []ResultPos{{0, 0}}, - [][]*plan.Expr{ - { - newExpr(0, types.Type{Oid: types.T_decimal64}), - }, - { - newExpr(0, types.Type{Oid: types.T_decimal64, Scale: 1}), - }, - }), - */ } } @@ -89,6 +71,7 @@ func TestString(t *testing.T) { func TestJoin(t *testing.T) { for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -97,9 +80,7 @@ func TestJoin(t *testing.T) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -152,6 +133,7 @@ func BenchmarkJoin(b *testing.B) { } t := new(testing.T) for _, tc := range tcs { + bat := hashBuild(t, tc) err := Prepare(tc.proc, tc.arg) require.NoError(t, err) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) @@ -160,9 +142,7 @@ func BenchmarkJoin(b *testing.B) { tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) - tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{} - tc.proc.Reg.MergeReceivers[1].Ch <- nil + tc.proc.Reg.MergeReceivers[1].Ch <- bat for { if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil { break @@ -211,9 +191,25 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Result: rp, Conditions: cs, }, + barg: &hashbuild.Argument{ + Typs: ts, + NeedHashMap: true, + Conditions: cs[1], + }, } } +func hashBuild(t *testing.T, tc joinTestCase) *batch.Batch { + err := hashbuild.Prepare(tc.proc, tc.barg) + require.NoError(t, err) + tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows) + tc.proc.Reg.MergeReceivers[0].Ch <- nil + ok, err := hashbuild.Call(0, tc.proc, tc.barg) + require.NoError(t, err) + require.Equal(t, true, ok) + return tc.proc.Reg.InputBatch +} + // create a new block based on the type information, flgs[i] == ture: has null func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch { return testutil.NewBatch(ts, false, int(rows), proc.Mp) diff --git a/pkg/sql/colexec/single/types.go b/pkg/sql/colexec/single/types.go index 222670809..10e8979e0 100644 --- a/pkg/sql/colexec/single/types.go +++ b/pkg/sql/colexec/single/types.go @@ -36,8 +36,6 @@ type evalVector struct { type container struct { state int - sels []int64 - inBuckets []uint8 bat *batch.Batch @@ -45,7 +43,7 @@ type container struct { evecs []evalVector vecs []*vector.Vector - mp *hashmap.StrHashMap + mp *hashmap.JoinMap } type ResultPos struct { diff --git a/pkg/sql/colexec/union/union.go b/pkg/sql/colexec/union/union.go deleted file mode 100644 index 1cffdaa49..000000000 --- a/pkg/sql/colexec/union/union.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package union - -import ( - "bytes" - - "github.com/matrixorigin/matrixone/pkg/common/hashmap" - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/vector" - "github.com/matrixorigin/matrixone/pkg/vm/process" -) - -func String(_ any, buf *bytes.Buffer) { - buf.WriteString(" union ") -} - -func Prepare(proc *process.Process, argument any) error { - var err error - - ap := argument.(*Argument) - ap.ctr = new(container) - if ap.ctr.hashTable, err = hashmap.NewStrMap(true, ap.Ibucket, ap.Nbucket, proc.GetMheap()); err != nil { - return err - } - return nil -} - -func Call(idx int, proc *process.Process, argument any) (bool, error) { - ap := argument.(*Argument) - ctr := ap.ctr - analyze := proc.GetAnalyze(idx) - analyze.Start() - defer analyze.Stop() - for { - switch ctr.state { - case Build: - end, err := ctr.insert(ap, proc, analyze, 1) - if err != nil { - ctr.state = End - ctr.hashTable.Free() - return true, err - } - if end { - ctr.state = Probe - continue - } - return false, nil - case Probe: - end, err := ctr.insert(ap, proc, analyze, 0) - if err != nil { - ctr.state = End - ctr.hashTable.Free() - return true, err - } - if end { - ctr.hashTable.Free() - ctr.state = End - } - return end, nil - default: - proc.SetInputBatch(nil) - return true, nil - } - } -} - -// insert function use Table[index] to probe the HashTable. -// if row data doesn't in HashTable, append it to bat and update the HashTable. -func (ctr *container) insert(ap *Argument, proc *process.Process, analyze process.Analyze, index int) (bool, error) { - inserted := make([]uint8, hashmap.UnitLimit) - restoreInserted := make([]uint8, hashmap.UnitLimit) - - for { - bat := <-proc.Reg.MergeReceivers[index].Ch - if bat == nil { - return true, nil - } - if bat.Length() == 0 { - continue - } - defer bat.Clean(proc.Mp) - ctr.bat = batch.NewWithSize(len(bat.Vecs)) - for i := range bat.Vecs { - ctr.bat.Vecs[i] = vector.New(bat.Vecs[i].Typ) - } - - analyze.Input(bat) - count := len(bat.Zs) - - for i := 0; i < count; i += hashmap.UnitLimit { - oldHashGroup := ctr.hashTable.GroupCount() - iterator := ctr.hashTable.NewIterator() - - n := count - i - if n > hashmap.UnitLimit { - n = hashmap.UnitLimit - } - rowCount := ctr.hashTable.GroupCount() - vs, _, err := iterator.Insert(i, n, bat.Vecs) - if err != nil { - return false, err - } - copy(inserted[:n], restoreInserted[:n]) - for j, v := range vs { - if v > rowCount { - inserted[j] = 1 - ctr.bat.Zs = append(ctr.bat.Zs, 1) - } - } - - newHashGroup := ctr.hashTable.GroupCount() - insertCount := int(newHashGroup - oldHashGroup) - if insertCount > 0 { - for pos := range bat.Vecs { - if err = vector.UnionBatch(ctr.bat.Vecs[pos], bat.Vecs[pos], int64(i), insertCount, inserted[:n], proc.Mp); err != nil { - ctr.bat.Clean(proc.Mp) - return false, err - } - } - } - } - analyze.Output(ctr.bat) - proc.SetInputBatch(ctr.bat) - return false, nil - } -} diff --git a/pkg/sql/colexec/union/union_test.go b/pkg/sql/colexec/union/union_test.go deleted file mode 100644 index 37b0f1867..000000000 --- a/pkg/sql/colexec/union/union_test.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package union - -import ( - "context" - "testing" - - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/container/vector" - "github.com/matrixorigin/matrixone/pkg/testutil" - "github.com/matrixorigin/matrixone/pkg/vm/mheap" - "github.com/matrixorigin/matrixone/pkg/vm/process" - "github.com/stretchr/testify/require" -) - -type unionTestCase struct { - proc *process.Process - arg *Argument - cancel context.CancelFunc -} - -func TestUnion(t *testing.T) { - proc := testutil.NewProcess() - // [4 rows + 3 rows, 2 columns] union [3 rows + 4 rows, 2 columns] - /* - {1, 1} {1, 1} - {2, 2} {2, 2} - {3, 3} {3, 3} - {4, 4} union {1, 1} - {1, 1} {2, 2} - {2, 2} {3, 3} - {3, 3} {4, 4} - */ - c := newUnionTestCase( - proc, - []*batch.Batch{ - testutil.NewBatchWithVectors( - []*vector.Vector{ - testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}), - testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}), - }, nil), - - testutil.NewBatchWithVectors( - []*vector.Vector{ - testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}), - testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}), - }, nil), - }, - - []*batch.Batch{ - testutil.NewBatchWithVectors( - []*vector.Vector{ - testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}), - testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}), - }, nil), - - testutil.NewBatchWithVectors( - []*vector.Vector{ - testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}), - testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}), - }, nil), - }, - ) - - err := Prepare(c.proc, c.arg) - require.NoError(t, err) - cnt := 0 - for { - end, err := Call(0, c.proc, c.arg) - if end { - break - } - result := c.proc.InputBatch() - require.NoError(t, err) - if result != nil { - cnt += result.Length() - require.Equal(t, 2, len(result.Vecs)) // 2 columns - c.proc.InputBatch().Clean(c.proc.Mp) // clean the final result - } - } - require.Equal(t, 4, cnt) // 4 rows - require.Equal(t, int64(0), mheap.Size(c.proc.Mp)) -} - -func newUnionTestCase(proc *process.Process, leftBatches, rightBatches []*batch.Batch) unionTestCase { - ctx, cancel := context.WithCancel(context.Background()) - proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2) - { - c := make(chan *batch.Batch, len(leftBatches)+1) - for i := range leftBatches { - c <- leftBatches[i] - } - c <- nil - proc.Reg.MergeReceivers[0] = &process.WaitRegister{ - Ctx: ctx, - Ch: c, - } - } - { - c := make(chan *batch.Batch, len(rightBatches)+1) - for i := range rightBatches { - c <- rightBatches[i] - } - c <- nil - proc.Reg.MergeReceivers[1] = &process.WaitRegister{ - Ctx: ctx, - Ch: c, - } - } - arg := new(Argument) - return unionTestCase{ - proc: proc, - arg: arg, - cancel: cancel, - } -} diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 26b02d771..1467f2bb7 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -529,7 +529,7 @@ func (c *Compile) compileProjection(n *plan.Node, ss []*Scope) []*Scope { func (c *Compile) compileUnion(n *plan.Node, ss []*Scope, children []*Scope, ns []*plan.Node) []*Scope { ss = append(ss, children...) - rs := c.newGroupScopeList(validScopeCount(ss)) + rs := c.newScopeList(validScopeCount(ss)) regs := extraGroupRegisters(rs) for i := range ss { if !ss[i].IsEnd { @@ -554,7 +554,7 @@ func (c *Compile) compileUnion(n *plan.Node, ss []*Scope, children []*Scope, ns } func (c *Compile) compileMinusAndIntersect(n *plan.Node, ss []*Scope, children []*Scope, nodeType plan.Node_NodeType) []*Scope { - rs, left, right := c.newJoinScopeListWithBucket(c.newGroupScopeList(2), ss, children) + rs, left, right := c.newJoinScopeListWithBucket(c.newScopeList(2), ss, children) switch nodeType { case plan.Node_MINUS: for i := range rs { @@ -585,6 +585,10 @@ func (c *Compile) compileUnionAll(n *plan.Node, ss []*Scope, children []*Scope) func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scope, joinTyp plan.Node_JoinFlag) []*Scope { rs, chp := c.newJoinScopeList(ss, children) isEq := isEquiJoin(n.OnList) + typs := make([]types.Type, len(right.ProjectList)) + for i, expr := range right.ProjectList { + typs[i] = dupType(expr.Typ) + } switch joinTyp { case plan.Node_INNER: if len(n.OnList) == 0 { @@ -592,7 +596,7 @@ func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scop rs[i].appendInstruction(vm.Instruction{ Op: vm.Product, Idx: c.anal.curr, - Arg: constructProduct(n, c.proc), + Arg: constructProduct(n, typs, c.proc), }) } } else { @@ -601,13 +605,13 @@ func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scop rs[i].appendInstruction(vm.Instruction{ Op: vm.Join, Idx: c.anal.curr, - Arg: constructJoin(n, c.proc), + Arg: constructJoin(n, typs, c.proc), }) } else { rs[i].appendInstruction(vm.Instruction{ Op: vm.LoopJoin, Idx: c.anal.curr, - Arg: constructLoopJoin(n, c.proc), + Arg: constructLoopJoin(n, typs, c.proc), }) } } @@ -618,21 +622,17 @@ func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scop rs[i].appendInstruction(vm.Instruction{ Op: vm.Semi, Idx: c.anal.curr, - Arg: constructSemi(n, c.proc), + Arg: constructSemi(n, typs, c.proc), }) } else { rs[i].appendInstruction(vm.Instruction{ Op: vm.LoopSemi, Idx: c.anal.curr, - Arg: constructLoopSemi(n, c.proc), + Arg: constructLoopSemi(n, typs, c.proc), }) } } case plan.Node_LEFT: - typs := make([]types.Type, len(right.ProjectList)) - for i, expr := range right.ProjectList { - typs[i] = dupType(expr.Typ) - } for i := range rs { if isEq { rs[i].appendInstruction(vm.Instruction{ @@ -649,10 +649,6 @@ func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scop } } case plan.Node_SINGLE: - typs := make([]types.Type, len(right.ProjectList)) - for i, expr := range right.ProjectList { - typs[i] = dupType(expr.Typ) - } for i := range rs { if isEq { rs[i].appendInstruction(vm.Instruction{ @@ -674,13 +670,13 @@ func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scop rs[i].appendInstruction(vm.Instruction{ Op: vm.Anti, Idx: c.anal.curr, - Arg: constructAnti(n, c.proc), + Arg: constructAnti(n, typs, c.proc), }) } else { rs[i].appendInstruction(vm.Instruction{ Op: vm.LoopAnti, Idx: c.anal.curr, - Arg: constructLoopAnti(n, c.proc), + Arg: constructLoopAnti(n, typs, c.proc), }) } } @@ -790,7 +786,7 @@ func (c *Compile) compileAgg(n *plan.Node, ss []*Scope, ns []*plan.Node) []*Scop } func (c *Compile) compileGroup(n *plan.Node, ss []*Scope, ns []*plan.Node) []*Scope { - rs := c.newGroupScopeList(validScopeCount(ss)) + rs := c.newScopeList(validScopeCount(ss)) regs := extraGroupRegisters(rs) for i := range ss { if !ss[i].IsEnd { @@ -843,16 +839,16 @@ func (c *Compile) newMergeScope(ss []*Scope) *Scope { return rs } -func (c *Compile) newGroupScopeList(childrenCount int) []*Scope { +func (c *Compile) newScopeList(childrenCount int) []*Scope { var ss []*Scope for _, n := range c.cnList { - ss = append(ss, c.newGroupScopeListWithNode(n.Mcpu, childrenCount)...) + ss = append(ss, c.newScopeListWithNode(n.Mcpu, childrenCount)...) } return ss } -func (c *Compile) newGroupScopeListWithNode(mcpu, childrenCount int) []*Scope { +func (c *Compile) newScopeListWithNode(mcpu, childrenCount int) []*Scope { ss := make([]*Scope, mcpu) for i := range ss { ss[i] = new(Scope) @@ -885,7 +881,6 @@ func (c *Compile) newJoinScopeListWithBucket(rs, ss, children []*Scope) ([]*Scop } func (c *Compile) newJoinScopeList(ss []*Scope, children []*Scope) ([]*Scope, *Scope) { - regs := make([]*process.WaitRegister, 0, len(ss)) chp := c.newMergeScope(children) chp.IsEnd = true rs := make([]*Scope, len(ss)) @@ -895,10 +890,11 @@ func (c *Compile) newJoinScopeList(ss []*Scope, children []*Scope) ([]*Scope, *S continue } rs[i] = new(Scope) - rs[i].Magic = Merge + rs[i].Magic = Remote + rs[i].IsJoin = true + rs[i].NodeInfo = ss[i].NodeInfo rs[i].PreScopes = []*Scope{ss[i]} rs[i].Proc = process.NewWithAnalyze(c.proc, c.ctx, 2, c.anal.Nodes()) - regs = append(regs, rs[i].Proc.Reg.MergeReceivers[1]) ss[i].appendInstruction(vm.Instruction{ Op: vm.Connector, Arg: &connector.Argument{ @@ -908,11 +904,48 @@ func (c *Compile) newJoinScopeList(ss []*Scope, children []*Scope) ([]*Scope, *S } chp.Instructions = append(chp.Instructions, vm.Instruction{ Op: vm.Dispatch, - Arg: constructDispatch(true, regs), + Arg: constructDispatch(true, extraJoinRegisters(rs, 1)), }) return rs, chp } +func (c *Compile) newLeftScope(s *Scope, ss []*Scope) *Scope { + rs := &Scope{ + Magic: Merge, + } + rs.appendInstruction(vm.Instruction{ + Op: vm.Merge, + Arg: &merge.Argument{}, + }) + rs.appendInstruction(vm.Instruction{ + Op: vm.Dispatch, + Arg: constructDispatch(false, extraJoinRegisters(ss, 0)), + }) + rs.IsEnd = true + rs.Proc = process.NewWithAnalyze(s.Proc, c.ctx, 1, c.anal.Nodes()) + rs.Proc.Reg.MergeReceivers[0] = s.Proc.Reg.MergeReceivers[0] + return rs +} + +func (c *Compile) newRightScope(s *Scope, ss []*Scope) *Scope { + rs := &Scope{ + Magic: Merge, + } + rs.appendInstruction(vm.Instruction{ + Op: vm.HashBuild, + Idx: s.Instructions[0].Idx, + Arg: constructHashBuild(s.Instructions[0]), + }) + rs.appendInstruction(vm.Instruction{ + Op: vm.Dispatch, + Arg: constructDispatch(true, extraJoinRegisters(ss, 1)), + }) + rs.IsEnd = true + rs.Proc = process.NewWithAnalyze(s.Proc, c.ctx, 1, c.anal.Nodes()) + rs.Proc.Reg.MergeReceivers[0] = s.Proc.Reg.MergeReceivers[1] + return rs +} + // Number of cpu's available on the current machine func (c *Compile) NumCPU() int { return runtime.NumCPU() diff --git a/pkg/sql/compile/operator.go b/pkg/sql/compile/operator.go index 25a382408..d3838a275 100644 --- a/pkg/sql/compile/operator.go +++ b/pkg/sql/compile/operator.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/matrixorigin/matrixone/pkg/sql/colexec/anti" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/colexec/intersect" "github.com/matrixorigin/matrixone/pkg/sql/colexec/loopanti" "github.com/matrixorigin/matrixone/pkg/sql/colexec/minus" @@ -300,18 +301,19 @@ func constructTop(n *plan.Node, proc *process.Process) *top.Argument { } } -func constructJoin(n *plan.Node, proc *process.Process) *join.Argument { +func constructJoin(n *plan.Node, typs []types.Type, proc *process.Process) *join.Argument { result := make([]join.ResultPos, len(n.ProjectList)) for i, expr := range n.ProjectList { result[i].Rel, result[i].Pos = constructJoinResult(expr) } return &join.Argument{ + Typs: typs, Result: result, Conditions: constructJoinConditions(n.OnList), } } -func constructSemi(n *plan.Node, proc *process.Process) *semi.Argument { +func constructSemi(n *plan.Node, typs []types.Type, proc *process.Process) *semi.Argument { result := make([]int32, len(n.ProjectList)) for i, expr := range n.ProjectList { rel, pos := constructJoinResult(expr) @@ -321,6 +323,7 @@ func constructSemi(n *plan.Node, proc *process.Process) *semi.Argument { result[i] = pos } return &semi.Argument{ + Typs: typs, Result: result, Conditions: constructJoinConditions(n.OnList), } @@ -350,15 +353,15 @@ func constructSingle(n *plan.Node, typs []types.Type, proc *process.Process) *si } } -func constructProduct(n *plan.Node, proc *process.Process) *product.Argument { +func constructProduct(n *plan.Node, typs []types.Type, proc *process.Process) *product.Argument { result := make([]product.ResultPos, len(n.ProjectList)) for i, expr := range n.ProjectList { result[i].Rel, result[i].Pos = constructJoinResult(expr) } - return &product.Argument{Result: result} + return &product.Argument{Typs: typs, Result: result} } -func constructAnti(n *plan.Node, proc *process.Process) *anti.Argument { +func constructAnti(n *plan.Node, typs []types.Type, proc *process.Process) *anti.Argument { result := make([]int32, len(n.ProjectList)) for i, expr := range n.ProjectList { rel, pos := constructJoinResult(expr) @@ -368,6 +371,7 @@ func constructAnti(n *plan.Node, proc *process.Process) *anti.Argument { result[i] = pos } return &anti.Argument{ + Typs: typs, Result: result, Conditions: constructJoinConditions(n.OnList), } @@ -521,18 +525,19 @@ func constructMergeOrder(n *plan.Node, proc *process.Process) *mergeorder.Argume } } -func constructLoopJoin(n *plan.Node, proc *process.Process) *loopjoin.Argument { +func constructLoopJoin(n *plan.Node, typs []types.Type, proc *process.Process) *loopjoin.Argument { result := make([]loopjoin.ResultPos, len(n.ProjectList)) for i, expr := range n.ProjectList { result[i].Rel, result[i].Pos = constructJoinResult(expr) } return &loopjoin.Argument{ + Typs: typs, Result: result, Cond: colexec.RewriteFilterExprList(n.OnList), } } -func constructLoopSemi(n *plan.Node, proc *process.Process) *loopsemi.Argument { +func constructLoopSemi(n *plan.Node, typs []types.Type, proc *process.Process) *loopsemi.Argument { result := make([]int32, len(n.ProjectList)) for i, expr := range n.ProjectList { rel, pos := constructJoinResult(expr) @@ -542,6 +547,7 @@ func constructLoopSemi(n *plan.Node, proc *process.Process) *loopsemi.Argument { result[i] = pos } return &loopsemi.Argument{ + Typs: typs, Result: result, Cond: colexec.RewriteFilterExprList(n.OnList), } @@ -570,7 +576,7 @@ func constructLoopSingle(n *plan.Node, typs []types.Type, proc *process.Process) } } -func constructLoopAnti(n *plan.Node, proc *process.Process) *loopanti.Argument { +func constructLoopAnti(n *plan.Node, typs []types.Type, proc *process.Process) *loopanti.Argument { result := make([]int32, len(n.ProjectList)) for i, expr := range n.ProjectList { rel, pos := constructJoinResult(expr) @@ -580,11 +586,90 @@ func constructLoopAnti(n *plan.Node, proc *process.Process) *loopanti.Argument { result[i] = pos } return &loopanti.Argument{ + Typs: typs, Result: result, Cond: colexec.RewriteFilterExprList(n.OnList), } } +func constructHashBuild(in vm.Instruction) *hashbuild.Argument { + switch in.Op { + case vm.Anti: + arg := in.Arg.(*anti.Argument) + return &hashbuild.Argument{ + NeedHashMap: true, + Typs: arg.Typs, + Conditions: arg.Conditions[1], + } + case vm.Join: + arg := in.Arg.(*join.Argument) + return &hashbuild.Argument{ + NeedHashMap: true, + Typs: arg.Typs, + Conditions: arg.Conditions[1], + } + case vm.Left: + arg := in.Arg.(*left.Argument) + return &hashbuild.Argument{ + NeedHashMap: true, + Typs: arg.Typs, + Conditions: arg.Conditions[1], + } + case vm.Semi: + arg := in.Arg.(*semi.Argument) + return &hashbuild.Argument{ + NeedHashMap: true, + Typs: arg.Typs, + Conditions: arg.Conditions[1], + } + case vm.Single: + arg := in.Arg.(*single.Argument) + return &hashbuild.Argument{ + NeedHashMap: true, + Typs: arg.Typs, + Conditions: arg.Conditions[1], + } + case vm.Product: + arg := in.Arg.(*product.Argument) + return &hashbuild.Argument{ + NeedHashMap: false, + Typs: arg.Typs, + } + case vm.LoopAnti: + arg := in.Arg.(*loopanti.Argument) + return &hashbuild.Argument{ + NeedHashMap: false, + Typs: arg.Typs, + } + case vm.LoopJoin: + arg := in.Arg.(*loopjoin.Argument) + return &hashbuild.Argument{ + NeedHashMap: false, + Typs: arg.Typs, + } + case vm.LoopLeft: + arg := in.Arg.(*loopleft.Argument) + return &hashbuild.Argument{ + NeedHashMap: false, + Typs: arg.Typs, + } + case vm.LoopSemi: + arg := in.Arg.(*loopsemi.Argument) + return &hashbuild.Argument{ + NeedHashMap: false, + Typs: arg.Typs, + } + case vm.LoopSingle: + arg := in.Arg.(*loopsingle.Argument) + return &hashbuild.Argument{ + NeedHashMap: false, + Typs: arg.Typs, + } + default: + panic(errors.New(errno.SyntaxErrororAccessRuleViolation, fmt.Sprintf("unsupport join type '%v'", in.Op))) + } +} + func constructJoinResult(expr *plan.Expr) (int32, int32) { e, ok := expr.Expr.(*plan.Expr_Col) if !ok { diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 64558712f..d2ff95f57 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -15,15 +15,14 @@ package compile import ( - "context" "fmt" "github.com/matrixorigin/matrixone/pkg/sql/colexec/connector" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/merge" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" "github.com/matrixorigin/matrixone/pkg/sql/colexec/limit" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/merge" "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergelimit" "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergeoffset" @@ -126,6 +125,9 @@ func (s *Scope) RemoteRun(c *Compile) error { func (s *Scope) ParallelRun(c *Compile) error { var rds []engine.Reader + if s.IsJoin { + return s.JoinRun(c) + } if s.DataSource == nil { return s.MergeRun(c) } @@ -155,154 +157,189 @@ func (s *Scope) ParallelRun(c *Compile) error { } ss[i].Proc = process.NewWithAnalyze(s.Proc, c.ctx, 0, c.anal.Nodes()) } - { - var flg bool + s = newParallelScope(c, s, ss) + return s.MergeRun(c) +} + +func (s *Scope) JoinRun(c *Compile) error { + mcpu := s.NodeInfo.Mcpu + if mcpu < 1 { + mcpu = 1 + } + chp := s.PreScopes[0] + chp.IsEnd = true + ss := make([]*Scope, mcpu) + for i := 0; i < mcpu; i++ { + ss[i] = &Scope{ + Magic: Merge, + } + ss[i].Proc = process.NewWithAnalyze(s.Proc, c.ctx, 2, c.anal.Nodes()) + ss[i].Proc.Reg.MergeReceivers[1].Ch = make(chan *batch.Batch, 10) + } + left, right := c.newLeftScope(s, ss), c.newRightScope(s, ss) + s = newParallelScope(c, s, ss) + s.PreScopes = append(s.PreScopes, chp) + s.PreScopes = append(s.PreScopes, left) + s.PreScopes = append(s.PreScopes, right) + return s.MergeRun(c) +} + +func newParallelScope(c *Compile, s *Scope, ss []*Scope) *Scope { + var flg bool - for i, in := range s.Instructions { - if flg { - break + for i, in := range s.Instructions { + if flg { + break + } + switch in.Op { + case vm.Top: + flg = true + arg := in.Arg.(*top.Argument) + s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) + s.Instructions[0] = vm.Instruction{ + Op: vm.MergeTop, + Idx: in.Idx, + Arg: &mergetop.Argument{ + Fs: arg.Fs, + Limit: arg.Limit, + }, } - switch in.Op { - case vm.Top: - flg = true - arg := in.Arg.(*top.Argument) - s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) - s.Instructions[0] = vm.Instruction{ - Op: vm.MergeTop, + for i := range ss { + ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ + Op: vm.Top, Idx: in.Idx, - Arg: &mergetop.Argument{ + Arg: &top.Argument{ Fs: arg.Fs, Limit: arg.Limit, }, - } - for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ - Op: vm.Top, - Idx: in.Idx, - Arg: &top.Argument{ - Fs: arg.Fs, - Limit: arg.Limit, - }, - }) - } - case vm.Order: - flg = true - arg := in.Arg.(*order.Argument) - s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) - s.Instructions[0] = vm.Instruction{ - Op: vm.MergeOrder, + }) + } + case vm.Order: + flg = true + arg := in.Arg.(*order.Argument) + s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) + s.Instructions[0] = vm.Instruction{ + Op: vm.MergeOrder, + Idx: in.Idx, + Arg: &mergeorder.Argument{ + Fs: arg.Fs, + }, + } + for i := range ss { + ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ + Op: vm.Order, Idx: in.Idx, - Arg: &mergeorder.Argument{ + Arg: &order.Argument{ Fs: arg.Fs, }, - } - for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ - Op: vm.Order, - Idx: in.Idx, - Arg: &order.Argument{ - Fs: arg.Fs, - }, - }) - } - case vm.Limit: - flg = true - arg := in.Arg.(*limit.Argument) - s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) - s.Instructions[0] = vm.Instruction{ - Op: vm.MergeLimit, + }) + } + case vm.Limit: + flg = true + arg := in.Arg.(*limit.Argument) + s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) + s.Instructions[0] = vm.Instruction{ + Op: vm.MergeLimit, + Idx: in.Idx, + Arg: &mergelimit.Argument{ + Limit: arg.Limit, + }, + } + for i := range ss { + ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ + Op: vm.Limit, Idx: in.Idx, - Arg: &mergelimit.Argument{ + Arg: &limit.Argument{ Limit: arg.Limit, }, - } - for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ - Op: vm.Limit, - Idx: in.Idx, - Arg: &limit.Argument{ - Limit: arg.Limit, - }, - }) - } - case vm.Group: - flg = true - arg := in.Arg.(*group.Argument) - s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) - s.Instructions[0] = vm.Instruction{ - Op: vm.MergeGroup, - Arg: &mergegroup.Argument{ - NeedEval: false, + }) + } + case vm.Group: + flg = true + arg := in.Arg.(*group.Argument) + s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) + s.Instructions[0] = vm.Instruction{ + Op: vm.MergeGroup, + Arg: &mergegroup.Argument{ + NeedEval: false, + }, + } + for i := range ss { + ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ + Op: vm.Group, + Arg: &group.Argument{ + Aggs: arg.Aggs, + Exprs: arg.Exprs, + Types: arg.Types, }, - } - for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ - Op: vm.Group, - Arg: &group.Argument{ - Aggs: arg.Aggs, - Exprs: arg.Exprs, - Types: arg.Types, - }, - }) - } - case vm.Offset: - flg = true - arg := in.Arg.(*offset.Argument) - s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) - s.Instructions[0] = vm.Instruction{ - Op: vm.MergeOffset, - Arg: &mergeoffset.Argument{ + }) + } + case vm.Offset: + flg = true + arg := in.Arg.(*offset.Argument) + s.Instructions = append(s.Instructions[:1], s.Instructions[i+1:]...) + s.Instructions[0] = vm.Instruction{ + Op: vm.MergeOffset, + Arg: &mergeoffset.Argument{ + Offset: arg.Offset, + }, + } + for i := range ss { + ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ + Op: vm.Offset, + Arg: &offset.Argument{ Offset: arg.Offset, }, - } - for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ - Op: vm.Offset, - Arg: &offset.Argument{ - Offset: arg.Offset, - }, - }) - } - default: - for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, dupInstruction(in)) - } + }) } - } - if !flg { + default: for i := range ss { - ss[i].Instructions = ss[i].Instructions[:len(ss[i].Instructions)-1] - } - s.Instructions[0] = vm.Instruction{ - Op: vm.Merge, - Arg: &merge.Argument{}, + ss[i].Instructions = append(ss[i].Instructions, dupInstruction(in)) } - s.Instructions[1] = s.Instructions[len(s.Instructions)-1] - s.Instructions = s.Instructions[:2] } } - ctx, cancel := context.WithCancel(c.ctx) + if !flg { + for i := range ss { + ss[i].Instructions = ss[i].Instructions[:len(ss[i].Instructions)-1] + } + s.Instructions[0] = vm.Instruction{ + Op: vm.Merge, + Arg: &merge.Argument{}, + } + s.Instructions[1] = s.Instructions[len(s.Instructions)-1] + s.Instructions = s.Instructions[:2] + } s.Magic = Merge s.PreScopes = ss - s.Proc.Cancel = cancel - s.Proc.Reg.MergeReceivers = make([]*process.WaitRegister, len(ss)) + cnt := 0 + for _, s := range ss { + if s.IsEnd { + continue + } + cnt++ + } + s.Proc.Reg.MergeReceivers = make([]*process.WaitRegister, cnt) { - for i := 0; i < len(ss); i++ { + for i := 0; i < cnt; i++ { s.Proc.Reg.MergeReceivers[i] = &process.WaitRegister{ - Ctx: ctx, + Ctx: s.Proc.Ctx, Ch: make(chan *batch.Batch, 1), } } } + j := 0 for i := range ss { - ss[i].Instructions = append(ss[i].Instructions, vm.Instruction{ - Op: vm.Connector, - Arg: &connector.Argument{ - Reg: s.Proc.Reg.MergeReceivers[i], - }, - }) + if !ss[i].IsEnd { + ss[i].appendInstruction(vm.Instruction{ + Op: vm.Connector, + Arg: &connector.Argument{ + Reg: s.Proc.Reg.MergeReceivers[i], + }, + }) + j++ + } } - return s.MergeRun(c) + return s } func (s *Scope) appendInstruction(in vm.Instruction) { diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index 2875f2a02..faf7371b4 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -76,6 +76,9 @@ type Scope struct { // 2 - execution unit that requires remote call. Magic int + // IsEnd means the pipeline is join + IsJoin bool + // IsEnd means the pipeline is end IsEnd bool diff --git a/pkg/vm/overload.go b/pkg/vm/overload.go index 8d63dcb4d..ff926780d 100644 --- a/pkg/vm/overload.go +++ b/pkg/vm/overload.go @@ -18,13 +18,13 @@ import ( "bytes" "github.com/matrixorigin/matrixone/pkg/sql/colexec/anti" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" "github.com/matrixorigin/matrixone/pkg/sql/colexec/intersect" "github.com/matrixorigin/matrixone/pkg/sql/colexec/loopanti" "github.com/matrixorigin/matrixone/pkg/sql/colexec/minus" "github.com/matrixorigin/matrixone/pkg/sql/colexec/loopsingle" "github.com/matrixorigin/matrixone/pkg/sql/colexec/single" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/union" "github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion" @@ -93,9 +93,10 @@ var stringFunc = [...]func(any, *bytes.Buffer){ Insert: insert.String, Update: update.String, - Union: union.String, Minus: minus.String, Intersect: intersect.String, + + HashBuild: hashbuild.String, } var prepareFunc = [...]func(*process.Process, any) error{ @@ -133,9 +134,10 @@ var prepareFunc = [...]func(*process.Process, any) error{ Insert: insert.Prepare, Update: update.Prepare, - Union: union.Prepare, Minus: minus.Prepare, Intersect: intersect.Prepare, + + HashBuild: hashbuild.Prepare, } var execFunc = [...]func(int, *process.Process, any) (bool, error){ @@ -173,7 +175,8 @@ var execFunc = [...]func(int, *process.Process, any) (bool, error){ Insert: insert.Call, Update: update.Call, - Union: union.Call, Minus: minus.Call, Intersect: intersect.Call, + + HashBuild: hashbuild.Call, } diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go index 1d5df6fa9..a0a8436cd 100644 --- a/pkg/vm/process/process.go +++ b/pkg/vm/process/process.go @@ -51,6 +51,7 @@ func NewFromProc(p *Process, ctx context.Context, regNumber int) *Process { proc.SessionInfo = p.SessionInfo // reg and cancel + proc.Ctx = newctx proc.Cancel = cancel proc.Reg.MergeReceivers = make([]*WaitRegister, regNumber) for i := 0; i < regNumber; i++ { diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go index 4cb6e620b..f7c74b1e5 100644 --- a/pkg/vm/process/types.go +++ b/pkg/vm/process/types.go @@ -111,7 +111,8 @@ type Process struct { SessionInfo SessionInfo - // snapshot is transaction context + Ctx context.Context + Cancel context.CancelFunc } diff --git a/pkg/vm/types.go b/pkg/vm/types.go index 0c240b293..80dcb07fb 100644 --- a/pkg/vm/types.go +++ b/pkg/vm/types.go @@ -49,9 +49,10 @@ const ( Insert Update - Union Minus Intersect + + HashBuild ) // Instruction contains relational algebra -- GitLab