diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go index d3c5d315e20473270cdb3c26888cb7c8dbef44e6..d17d61fc1d5e97b4ff7344654133d2cd73ae91c8 100644 --- a/pkg/container/batch/batch.go +++ b/pkg/container/batch/batch.go @@ -17,6 +17,18 @@ func New(attrs []string) *Batch { } } +func (bat *Batch) Reorder(attrs []string) { + for i, name := range attrs { + for j, attr := range bat.Attrs { + if name == attr { + bat.Is[i], bat.Is[j] = bat.Is[j], bat.Is[i] + bat.Vecs[i], bat.Vecs[j] = bat.Vecs[j], bat.Vecs[i] + bat.Attrs[i], bat.Attrs[j] = bat.Attrs[j], bat.Attrs[i] + } + } + } +} + func (bat *Batch) Length(proc *process.Process) (int, error) { vec, err := bat.GetVector(bat.Attrs[0], proc) if err != nil { @@ -25,6 +37,17 @@ func (bat *Batch) Length(proc *process.Process) (int, error) { return vec.Length(), nil } +func (bat *Batch) Prefetch(attrs []string, vecs []*vector.Vector, proc *process.Process) error { + var err error + + for i, attr := range attrs { + if vecs[i], err = bat.GetVector(attr, proc); err != nil { + return err + } + } + return nil +} + func (bat *Batch) GetVector(name string, proc *process.Process) (*vector.Vector, error) { for i, attr := range bat.Attrs { if attr != name { @@ -71,6 +94,7 @@ func (bat *Batch) WaitIo() { } } +/* func (bat *Batch) Free(proc *process.Process) { bat.WaitIo() if bat.SelsData != nil { @@ -82,6 +106,7 @@ func (bat *Batch) Free(proc *process.Process) { vec.Free(proc) } } +*/ func (bat *Batch) Clean(proc *process.Process) { bat.WaitIo() @@ -92,13 +117,8 @@ func (bat *Batch) Clean(proc *process.Process) { } for _, vec := range bat.Vecs { if vec.Data != nil { - count := encoding.DecodeUint64(vec.Data[:8]) - if count > 0 { - copy(vec.Data, mempool.OneCount) - proc.Mp.Free(vec.Data) - proc.Gm.Free(int64(cap(vec.Data))) - } - vec.Data = nil + copy(vec.Data, mempool.OneCount) + vec.Free(proc) } } } @@ -111,6 +131,9 @@ func (bat *Batch) Reduce(attrs []string, proc *process.Process) { } bat.Vecs[i].Free(proc) if bat.Vecs[i].Data == nil { + if len(bat.Is) > i { + bat.Is = append(bat.Is[:i], bat.Is[i+1:]...) + } bat.Vecs = append(bat.Vecs[:i], bat.Vecs[i+1:]...) bat.Attrs = append(bat.Attrs[:i], bat.Attrs[i+1:]...) } diff --git a/pkg/hash/join.go b/pkg/hash/joingroup.go similarity index 77% rename from pkg/hash/join.go rename to pkg/hash/joingroup.go index e8cad90888df6f543a012bcf538bf542d447f5d9..9680992e834114b7899c96f16b33d35fb025c2aa 100644 --- a/pkg/hash/join.go +++ b/pkg/hash/joingroup.go @@ -2,37 +2,40 @@ package hash import ( "bytes" + "matrixbase/pkg/container/batch" "matrixbase/pkg/container/types" "matrixbase/pkg/container/vector" "matrixbase/pkg/encoding" "matrixbase/pkg/vm/process" ) -func NewJoin(idx, sel int64) *Join { - return &Join{ +func NewJoinGroup(idx, sel int64) *JoinGroup { + return &JoinGroup{ Idx: idx, Sel: sel, } } -func (m *Join) Free(proc *process.Process) { - if m.Data != nil { - proc.Free(m.Data) - m.Data = nil +func (g *JoinGroup) Free(proc *process.Process) { + if g.Data != nil { + proc.Free(g.Data) + g.Data = nil } - if m.Idata != nil { - proc.Free(m.Idata) - m.Idata = nil + if g.Idata != nil { + proc.Free(g.Idata) + g.Idata = nil } } -func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vector.Vector, gvecs [][]*vector.Vector, diffs []bool, proc *process.Process) ([]int64, error) { +func (g *JoinGroup) Fill(distinct bool, sels, matched []int64, vecs []*vector.Vector, + bats []*batch.Batch, diffs []bool, proc *process.Process) ([]int64, error) { + idx := int64(len(bats) - 1) for i, vec := range vecs { switch vec.Typ.Oid { case types.T_int8: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -42,7 +45,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]int8) - gv := gvec.Col.([]int8)[m.Sel] + gv := gvec.Col.([]int8)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -56,15 +59,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]int8) - gv := gvec.Col.([]int8)[m.Sel] + gv := gvec.Col.([]int8)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_int16: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -74,7 +77,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]int16) - gv := gvec.Col.([]int16)[m.Sel] + gv := gvec.Col.([]int16)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -88,15 +91,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]int16) - gv := gvec.Col.([]int16)[m.Sel] + gv := gvec.Col.([]int16)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_int32: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -106,7 +109,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]int32) - gv := gvec.Col.([]int32)[m.Sel] + gv := gvec.Col.([]int32)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -120,15 +123,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]int32) - gv := gvec.Col.([]int32)[m.Sel] + gv := gvec.Col.([]int32)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_int64: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -138,7 +141,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]int64) - gv := gvec.Col.([]int64)[m.Sel] + gv := gvec.Col.([]int64)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -152,15 +155,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]int64) - gv := gvec.Col.([]int64)[m.Sel] + gv := gvec.Col.([]int64)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_uint8: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -170,7 +173,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]uint8) - gv := gvec.Col.([]uint8)[m.Sel] + gv := gvec.Col.([]uint8)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -184,15 +187,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]uint8) - gv := gvec.Col.([]uint8)[m.Sel] + gv := gvec.Col.([]uint8)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_uint16: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -202,7 +205,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]uint16) - gv := gvec.Col.([]uint16)[m.Sel] + gv := gvec.Col.([]uint16)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -216,15 +219,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]uint16) - gv := gvec.Col.([]uint16)[m.Sel] + gv := gvec.Col.([]uint16)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_uint32: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -234,7 +237,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]uint32) - gv := gvec.Col.([]uint32)[m.Sel] + gv := gvec.Col.([]uint32)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -248,15 +251,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]uint32) - gv := gvec.Col.([]uint32)[m.Sel] + gv := gvec.Col.([]uint32)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_uint64: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -266,7 +269,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]uint64) - gv := gvec.Col.([]uint64)[m.Sel] + gv := gvec.Col.([]uint64)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -280,15 +283,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]uint64) - gv := gvec.Col.([]uint64)[m.Sel] + gv := gvec.Col.([]uint64)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_float32: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -298,7 +301,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]float32) - gv := gvec.Col.([]float32)[m.Sel] + gv := gvec.Col.([]float32)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -312,15 +315,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]float32) - gv := gvec.Col.([]float32)[m.Sel] + gv := gvec.Col.([]float32)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } } case types.T_float64: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -330,7 +333,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } case lnull && !rnull: // null is not value vs := vec.Col.([]float64) - gv := gvec.Col.([]float64)[m.Sel] + gv := gvec.Col.([]float64)[g.Sel] for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -344,7 +347,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } default: vs := vec.Col.([]float64) - gv := gvec.Col.([]float64)[m.Sel] + gv := gvec.Col.([]float64)[g.Sel] for i, sel := range sels { diffs[i] = diffs[i] || (gv != vs[sel]) } @@ -354,9 +357,9 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec case types.T_datetime: case types.T_char: case types.T_varchar: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -367,7 +370,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec case lnull && !rnull: // null is not value vs := vec.Col.(*types.Bytes) gvs := gvec.Col.(*types.Bytes) - gv := gvs.Get(int(m.Sel)) + gv := gvs.Get(int(g.Sel)) for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -382,15 +385,15 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec default: vs := vec.Col.(*types.Bytes) gvs := gvec.Col.(*types.Bytes) - gv := gvs.Get(int(m.Sel)) + gv := gvs.Get(int(g.Sel)) for i, sel := range sels { diffs[i] = diffs[i] || (bytes.Compare(gv, vs.Get(int(sel))) != 0) } } case types.T_json: - gvec := gvecs[m.Idx][i] + gvec := bats[g.Idx].Vecs[i] lnull := vec.Nsp.Any() - rnull := gvec.Nsp.Contains(uint64(m.Sel)) + rnull := gvec.Nsp.Contains(uint64(g.Sel)) switch { case lnull && rnull: for i, sel := range sels { @@ -401,7 +404,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec case lnull && !rnull: // null is not value vs := vec.Col.(*types.Bytes) gvs := gvec.Col.(*types.Bytes) - gv := gvs.Get(int(m.Sel)) + gv := gvs.Get(int(g.Sel)) for i, sel := range sels { if vec.Nsp.Contains(uint64(sel)) { diffs[i] = true @@ -416,7 +419,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec default: vs := vec.Col.(*types.Bytes) gvs := gvec.Col.(*types.Bytes) - gv := gvs.Get(int(m.Sel)) + gv := gvs.Get(int(g.Sel)) for i, sel := range sels { diffs[i] = diffs[i] || (bytes.Compare(gv, vs.Get(int(sel))) != 0) } @@ -435,8 +438,8 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec } } if len(matched) > 0 { - length := len(m.Sels) + len(matched) - if cap(m.Sels) < length { + length := len(g.Sels) + len(matched) + if cap(g.Sels) < length { data, err := proc.Alloc(int64(length) * 8) if err != nil { return nil, err @@ -446,24 +449,24 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec proc.Free(data) return nil, err } - copy(data, m.Data) - copy(idata, m.Idata) - proc.Free(m.Data) - proc.Free(m.Idata) - m.Is = encoding.DecodeInt64Slice(idata) - m.Sels = encoding.DecodeInt64Slice(data) - m.Data = data[:length] - m.Sels = m.Sels[:length] - m.Is = m.Is[:length] - m.Idata = idata[:length] + copy(data, g.Data) + copy(idata, g.Idata) + proc.Free(g.Data) + proc.Free(g.Idata) + g.Is = encoding.DecodeInt64Slice(idata) + g.Sels = encoding.DecodeInt64Slice(data) + g.Data = data[:length] + g.Sels = g.Sels[:length] + g.Is = g.Is[:length] + g.Idata = idata[:length] } for _ = range matched { - m.Is = append(m.Is, idx) + g.Is = append(g.Is, idx) } - m.Sels = append(m.Sels, matched...) + g.Sels = append(g.Sels, matched...) } } else { - if len(m.Sels) > 0 { + if len(g.Sels) > 0 { for i := 0; i < n; i++ { if diffs[i] { remaining = append(remaining, sels[i]) @@ -478,7 +481,7 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec matched = append(matched, sels[i]) } } - if len(matched) > 0 && cap(m.Sels) == 0 { + if len(matched) > 0 && cap(g.Sels) == 0 { data, err := proc.Alloc(8) if err != nil { return nil, err @@ -488,12 +491,12 @@ func (m *Join) Fill(distinct bool, idx int64, sels, matched []int64, vecs []*vec proc.Free(data) return nil, err } - m.Data = data - m.Idata = idata - m.Is = encoding.DecodeInt64Slice(idata) - m.Sels = encoding.DecodeInt64Slice(data) - m.Is = append(m.Is, idx) - m.Sels = append(m.Sels, matched[0]) + g.Data = data + g.Idata = idata + g.Is = encoding.DecodeInt64Slice(idata) + g.Sels = encoding.DecodeInt64Slice(data) + g.Is[0] = idx + g.Sels[0] = matched[0] } } } diff --git a/pkg/hash/types.go b/pkg/hash/types.go index b7a29488e484dfd2bbe95c52b8e68cb10436101b..ea24b8daf010a0d15df4638becea956722d4c5fb 100644 --- a/pkg/hash/types.go +++ b/pkg/hash/types.go @@ -1,6 +1,12 @@ package hash -type Join struct { +type Group struct { + Sel int64 + Data []byte + Sels []int64 +} + +type JoinGroup struct { Idx int64 Sel int64 Data []byte @@ -8,9 +14,3 @@ type Join struct { Is []int64 Sels []int64 } - -type Group struct { - Sel int64 - Data []byte - Sels []int64 -} diff --git a/pkg/sql/colexec/hashgroup/group.go b/pkg/sql/colexec/hashgroup/group.go index 9fee700dc6dd66deafac2083f8b657afd77d95bc..ab738797fe3597612dd397bd6ccd7b596e3f5233 100644 --- a/pkg/sql/colexec/hashgroup/group.go +++ b/pkg/sql/colexec/hashgroup/group.go @@ -48,29 +48,24 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { bat := proc.Reg.Ax.(*batch.Batch) ctr := &n.Ctr gvecs := make([]*vector.Vector, len(n.Gs)) - if err := ctr.prefetch(n.Gs, bat, gvecs, proc); err != nil { + if err := bat.Prefetch(n.Gs, gvecs, proc); err != nil { + ctr.clean(bat, proc) return false, err } if len(bat.Sels) > 0 { if err := ctr.batchGroupSels(bat.Sels, gvecs, proc); err != nil { - ctr.clean(gvecs, proc) + ctr.clean(bat, proc) return false, err } } else { - uvecs := make([]*vector.Vector, len(n.Gs)) - if err := ctr.batchGroup(gvecs, uvecs, proc); err != nil { - ctr.clean(gvecs, proc) + if err := ctr.batchGroup(gvecs, proc); err != nil { + ctr.clean(bat, proc) return false, err } } - { - for _, vec := range gvecs { - vec.Free(proc) - } - } vecs := make([]*vector.Vector, len(n.Es)) - if err := ctr.prefetch(n.Attrs, bat, vecs, proc); err != nil { - ctr.clean(nil, proc) + if err := bat.Prefetch(n.Attrs, vecs, proc); err != nil { + ctr.clean(bat, proc) return false, err } rbat := batch.New(n.Rattrs) @@ -80,26 +75,16 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { } } if err := ctr.eval(ctr.vecs[0].Length(), n.Es, vecs, rbat.Vecs, proc); err != nil { - ctr.clean(vecs, proc) + ctr.clean(bat, proc) return false, err } - ctr.clean(vecs, proc) + ctr.vecs = nil + ctr.clean(bat, proc) proc.Reg.Ax = rbat register.FreeRegisters(proc) return false, nil } -func (ctr *Container) clean(vecs []*vector.Vector, proc *process.Process) { - for _, gs := range ctr.groups { - for _, g := range gs { - g.Free(proc) - } - } - for _, vec := range vecs { - vec.Free(proc) - } -} - func (ctr *Container) eval(length int, es []aggregation.Extend, vecs, rvecs []*vector.Vector, proc *process.Process) error { for i, e := range es { typ := aggregation.ReturnType(e.Op, e.Typ) @@ -115,18 +100,19 @@ func (ctr *Container) eval(length int, es []aggregation.Extend, vecs, rvecs []*v for _, g := range gs { e.Agg.Reset() if err := e.Agg.Fill(g.Sels, vecs[i]); err != nil { + proc.Free(data) return err } v, err := e.Agg.Eval(proc) if err != nil { + proc.Free(data) return err } vs[g.Sel] = v.Col.([]int8)[0] if v.Nsp.Contains(0) { vec.Nsp.Add(uint64(g.Sel)) } - proc.Free(g.Data) - g.Data = nil + v.Free(proc) } } rvecs[i] = vec @@ -137,30 +123,13 @@ func (ctr *Container) eval(length int, es []aggregation.Extend, vecs, rvecs []*v return nil } -func (ctr *Container) prefetch(attrs []string, bat *batch.Batch, vecs []*vector.Vector, proc *process.Process) error { - var err error - - for i, attr := range attrs { - if vecs[i], err = bat.GetVector(attr, proc); err != nil { - for j := 0; j < i; j++ { - vecs[j].Free(proc) - } - return err - } - } - return nil -} - -func (ctr *Container) batchGroup(vecs, uvecs []*vector.Vector, proc *process.Process) error { +func (ctr *Container) batchGroup(vecs []*vector.Vector, proc *process.Process) error { for i, j := 0, vecs[0].Length(); i < j; i += UnitLimit { length := j - i if length > UnitLimit { length = UnitLimit } - for k, vec := range vecs { - uvecs[k] = vec.Window(i, i+length) - } - if err := ctr.unitGroup(i, length, nil, uvecs, proc); err != nil { + if err := ctr.unitGroup(i, length, nil, vecs, proc); err != nil { return err } } @@ -255,3 +224,16 @@ func (ctr *Container) fillHashSels(count int, sels []int64, vecs []*vector.Vecto ctr.sels[slot] = append(ctr.sels[slot], sels[i]) } } + +func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) { + bat.Clean(proc) + fastmap.Pool.Put(ctr.slots) + for _, vec := range ctr.vecs { + vec.Free(proc) + } + for _, gs := range ctr.groups { + for _, g := range gs { + g.Free(proc) + } + } +} diff --git a/pkg/sql/colexec/hashjoin/join.go b/pkg/sql/colexec/hashjoin/join.go new file mode 100644 index 0000000000000000000000000000000000000000..2606ee0920c053bdb96988a1dc48cb2befe33e97 --- /dev/null +++ b/pkg/sql/colexec/hashjoin/join.go @@ -0,0 +1,150 @@ +package hashjoin + +import ( + "matrixbase/pkg/container/batch" + "matrixbase/pkg/container/vector" + "matrixbase/pkg/hash" + "matrixbase/pkg/vm/process" +) + +func Prepare(proc *process.Process, arg interface{}) error { + return nil +} + +// R 猕� S, S is the small relation +func Call(proc *process.Process, arg interface{}) (bool, error) { + n := arg.(Argument) + ctr := &n.Ctr + if err := ctr.build(n.Sattrs, n.Distinct, proc); err != nil { + return false, err + } + return false, nil +} + +func (ctr *Container) build(attrs []string, distinct bool, proc *process.Process) error { + var err error + + ch := proc.Reg.Cs[1] + for { + v := <-ch + if v == nil { + break + } + bat := v.(*batch.Batch) + bat.Reorder(attrs) + if err = bat.Prefetch(attrs, bat.Vecs, proc); err != nil { + return err + } + ctr.bats = append(ctr.bats, bat) + if len(bat.Sels) == 0 { + if err = ctr.fillBatch(distinct, bat.Vecs[:len(attrs)], proc); err != nil { + return err + } + } else { + if err = ctr.fillBatchSels(distinct, bat.Sels, bat.Vecs[:len(attrs)], proc); err != nil { + return err + } + } + } + return nil +} + +func (ctr *Container) fillBatch(distinct bool, vecs []*vector.Vector, proc *process.Process) error { + for i, j := 0, vecs[0].Length(); i < j; i += UnitLimit { + length := j - i + if length > UnitLimit { + length = UnitLimit + } + if err := ctr.fillUnit(distinct, i, length, nil, vecs, proc); err != nil { + return err + } + } + return nil +} + +func (ctr *Container) fillBatchSels(distinct bool, sels []int64, vecs []*vector.Vector, proc *process.Process) error { + for i, j := 0, len(sels); i < j; i += UnitLimit { + length := j - i + if length > UnitLimit { + length = UnitLimit + } + if err := ctr.fillUnit(distinct, 0, length, sels[i:i+length], vecs, proc); err != nil { + return err + } + } + return nil +} + +func (ctr *Container) fillUnit(distinct bool, start, count int, sels []int64, + vecs []*vector.Vector, proc *process.Process) error { + var err error + + { + copy(ctr.hashs[:count], OneUint64s[:count]) + if len(sels) == 0 { + ctr.fillHash(start, count, vecs) + } else { + ctr.fillHashSels(count, sels, vecs) + } + } + copy(ctr.diffs[:count], ZeroBools[:count]) + for i, hs := range ctr.slots.Ks { + for j, h := range hs { + remaining := ctr.sels[ctr.slots.Vs[i][j]] + if gs, ok := ctr.groups[h]; ok { + for _, g := range gs { + if remaining, err = g.Fill(distinct, remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil { + return err + } + copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)]) + } + } else { + ctr.groups[h] = make([]*hash.JoinGroup, 0, 8) + } + for len(remaining) > 0 { + g := hash.NewJoinGroup(int64(len(ctr.bats)-1), int64(remaining[0])) + ctr.groups[h] = append(ctr.groups[h], g) + if remaining, err = g.Fill(distinct, remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil { + return err + } + copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)]) + } + } + } + ctr.slots.Reset() + return nil +} + +func (ctr *Container) fillHash(start, count int, vecs []*vector.Vector) { + ctr.hashs = ctr.hashs[:count] + for _, vec := range vecs { + hash.Rehash(count, ctr.hashs, vec) + } + nextslot := 0 + for i, h := range ctr.hashs { + slot, ok := ctr.slots.Get(h) + if !ok { + slot = nextslot + ctr.slots.Set(h, slot) + nextslot++ + } + ctr.sels[slot] = append(ctr.sels[slot], int64(i+start)) + } +} + +func (ctr *Container) fillHashSels(count int, sels []int64, vecs []*vector.Vector) { + ctr.hashs = ctr.hashs[:count] + for _, vec := range vecs { + hash.RehashSels(count, sels, ctr.hashs, vec) + } + nextslot := 0 + for i, h := range ctr.hashs { + slot, ok := ctr.slots.Get(h) + if !ok { + slot = nextslot + ctr.slots.Set(h, slot) + nextslot++ + } + ctr.sels[slot] = append(ctr.sels[slot], sels[i]) + } +} diff --git a/pkg/sql/colexec/hashjoin/types.go b/pkg/sql/colexec/hashjoin/types.go index d3232d4bd8f6fd0c35b06dd754d4d86cf6f4f383..63cc03d5e22013af71bb101aa678611332d80f63 100644 --- a/pkg/sql/colexec/hashjoin/types.go +++ b/pkg/sql/colexec/hashjoin/types.go @@ -1 +1,36 @@ package hashjoin + +import ( + "matrixbase/pkg/container/batch" + "matrixbase/pkg/hash" + "matrixbase/pkg/intmap/fastmap" + "matrixbase/pkg/sql/join" +) + +const ( + UnitLimit = 1024 +) + +var ( + ZeroBools []bool + OneUint64s []uint64 +) + +type Container struct { + diffs []bool + matchs []int64 + hashs []uint64 + sels [][]int64 // sels + slots *fastmap.Map // hash code -> sels index + bats []*batch.Batch // s relation + groups map[uint64][]*hash.JoinGroup // hash code -> join list +} + +type Argument struct { + Distinct bool + Attrs []string + Rattrs []string + Sattrs []string + Ctr Container + JoinType join.JoinType +} diff --git a/pkg/sql/colexec/limit/limit.go b/pkg/sql/colexec/limit/limit.go index bb7dd2cc419d5637500f705662d57befd74aff58..e92a23fb4d15fcba6aa7a49c1bf5f3e41eca16ec 100644 --- a/pkg/sql/colexec/limit/limit.go +++ b/pkg/sql/colexec/limit/limit.go @@ -19,6 +19,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { if newSeen >= n.Limit { // limit - seen bat.Sels = bat.Sels[:n.Limit-n.Seen] proc.Reg.Ax = bat + register.FreeRegisters(proc) return true, nil } n.Seen = newSeen @@ -28,17 +29,20 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { } length, err := bat.Length(proc) if err != nil { + clean(bat, proc) return false, err } newSeen := n.Seen + length if newSeen >= n.Limit { // limit - seen data, sels, err := newSels(int64(n.Limit-n.Seen), proc) if err != nil { + clean(bat, proc) return true, err } bat.Sels = sels bat.SelsData = data proc.Reg.Ax = bat + register.FreeRegisters(proc) return true, nil } n.Seen = newSeen @@ -58,3 +62,8 @@ func newSels(count int64, proc *process.Process) ([]byte, []int64, error) { } return data, sels, nil } + +func clean(bat *batch.Batch, proc *process.Process) { + bat.Clean(proc) + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/mergesum/mergesum.go b/pkg/sql/colexec/mergesum/summarize.go similarity index 70% rename from pkg/sql/colexec/mergesum/mergesum.go rename to pkg/sql/colexec/mergesum/summarize.go index eba2f2ef584f4009afc80bf920199d5bd781b2db..16b77028f8d5c935ca199524c046edd8c67345fd 100644 --- a/pkg/sql/colexec/mergesum/mergesum.go +++ b/pkg/sql/colexec/mergesum/summarize.go @@ -2,6 +2,7 @@ package mergesum import ( "matrixbase/pkg/container/batch" + "matrixbase/pkg/encoding" "matrixbase/pkg/vm/process" "matrixbase/pkg/vm/register" ) @@ -33,21 +34,26 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { return false, err } } - bat.Free(proc) + bat.Clean(proc) } - rbat := batch.New(n.Attrs) + bat := batch.New(n.Attrs) { var err error for i, e := range n.Es { - rbat.Vecs[i], err = e.Agg.Eval(proc) - if err != nil { - rbat.Vecs = rbat.Vecs[:i] - rbat.Free(proc) + if bat.Vecs[i], err = e.Agg.Eval(proc); err != nil { + bat.Vecs = bat.Vecs[:i] + clean(bat, proc) return false, err } + copy(bat.Vecs[i].Data, encoding.EncodeUint64(1+proc.Refer[n.Attrs[i]])) } } - proc.Reg.Ax = rbat + proc.Reg.Ax = bat register.FreeRegisters(proc) return false, nil } + +func clean(bat *batch.Batch, proc *process.Process) { + bat.Clean(proc) + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/output/output.go b/pkg/sql/colexec/output/output.go index f21461e074d8c5e22a38ced07afd74b26b6738cf..3972e855ce0013ba827e3e31406bba334639a6b8 100644 --- a/pkg/sql/colexec/output/output.go +++ b/pkg/sql/colexec/output/output.go @@ -9,6 +9,6 @@ import ( func Call(proc *process.Process, arg interface{}) (bool, error) { bat := proc.Reg.Ax.(*batch.Batch) fmt.Printf("%s\n", bat) - bat.Free(proc) + bat.Clean(proc) return false, nil } diff --git a/pkg/sql/colexec/projection/projection.go b/pkg/sql/colexec/projection/projection.go index b2b2cee54cd92584f4d02bfc3c9d3a035a702a18..8f676ac4b53689ce05b950eeb76c6d8f0d72b970 100644 --- a/pkg/sql/colexec/projection/projection.go +++ b/pkg/sql/colexec/projection/projection.go @@ -2,6 +2,7 @@ package projection import ( "matrixbase/pkg/container/batch" + "matrixbase/pkg/encoding" "matrixbase/pkg/vm/process" "matrixbase/pkg/vm/register" ) @@ -11,23 +12,45 @@ func Prepare(_ *process.Process, _ interface{}) error { } func Call(proc *process.Process, arg interface{}) (bool, error) { - es := arg.(Argument).Es - attrs := arg.(Argument).Attrs - rbat := batch.New(attrs) + var err error + + n := arg.(Argument) + rbat := batch.New(n.Attrs) bat := proc.Reg.Ax.(*batch.Batch) - for i := range attrs { - vec, _, err := es[i].Eval(bat, proc) - if err != nil { + for i := range n.Attrs { + if rbat.Vecs[i], _, err = n.Es[i].Eval(bat, proc); err != nil { rbat.Vecs = rbat.Vecs[:i] - rbat.Free(proc) + clean(bat, rbat, proc) return false, err } - rbat.Vecs[i] = vec + copy(rbat.Vecs[i].Data, encoding.EncodeUint64(1+proc.Refer[n.Attrs[i]])) + } + { + for _, e := range n.Es { + bat.Reduce(e.Attributes(), proc) + } } - bat.Free(proc) - rbat.Sels = bat.Sels - rbat.SelsData = bat.SelsData - proc.Reg.Ax = rbat + { + mp := make(map[string]uint8) + { + for _, attr := range bat.Attrs { + mp[attr] = 0 + } + } + for i, attr := range rbat.Attrs { + if _, ok := mp[attr]; !ok { + bat.Attrs = append(bat.Attrs, attr) + bat.Vecs = append(bat.Vecs, rbat.Vecs[i]) + } + } + } + proc.Reg.Ax = bat register.FreeRegisters(proc) return false, nil } + +func clean(bat, rbat *batch.Batch, proc *process.Process) { + bat.Clean(proc) + rbat.Clean(proc) + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/restrict/restrict.go b/pkg/sql/colexec/restrict/restrict.go index 821e97e429050b250bf19fdbaed385094f38d8d8..50102a7f065789a08d176110c0017ac9f752bf68 100644 --- a/pkg/sql/colexec/restrict/restrict.go +++ b/pkg/sql/colexec/restrict/restrict.go @@ -17,6 +17,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { bat := proc.Reg.Ax.(*batch.Batch) vec, _, err := n.E.Eval(bat, proc) if err != nil { + clean(bat, proc) return false, err } bat.SelsData = vec.Data @@ -26,3 +27,8 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { register.FreeRegisters(proc) return false, nil } + +func clean(bat *batch.Batch, proc *process.Process) { + bat.Clean(proc) + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/summarize/summarize.go b/pkg/sql/colexec/summarize/summarize.go index 72ee3a9b38c3ad1e5803abcac6a0d2d08da0269e..a4d7768213f0ccba24995f98f5a39b4956938aa7 100644 --- a/pkg/sql/colexec/summarize/summarize.go +++ b/pkg/sql/colexec/summarize/summarize.go @@ -2,6 +2,7 @@ package summarize import ( "matrixbase/pkg/container/batch" + "matrixbase/pkg/encoding" "matrixbase/pkg/vm/process" "matrixbase/pkg/vm/register" ) @@ -22,20 +23,30 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { for i, e := range n.Es { vec, err := bat.GetVector(e.Name, proc) if err != nil { + rbat.Vecs = rbat.Vecs[:i] + clean(bat, rbat, proc) return false, err } if err := e.Agg.Fill(bat.Sels, vec); err != nil { + rbat.Vecs = rbat.Vecs[:i] + clean(bat, rbat, proc) return false, err } - rbat.Vecs[i], err = e.Agg.Eval(proc) - if err != nil { + if rbat.Vecs[i], err = e.Agg.Eval(proc); err != nil { rbat.Vecs = rbat.Vecs[:i] - rbat.Free(proc) + clean(bat, rbat, proc) return false, err } + copy(rbat.Vecs[i].Data, encoding.EncodeUint64(1+proc.Refer[n.Attrs[i]])) } - bat.Free(proc) + bat.Clean(proc) proc.Reg.Ax = rbat register.FreeRegisters(proc) return false, nil } + +func clean(bat, rbat *batch.Batch, proc *process.Process) { + bat.Clean(proc) + rbat.Clean(proc) + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/top/top.go b/pkg/sql/colexec/top/top.go index 0f26ed41eb7ef81c81f8705b988cb7a7145ec86c..e166d39bb937bf9b738e30f5ecb5d144f9ae2dda 100644 --- a/pkg/sql/colexec/top/top.go +++ b/pkg/sql/colexec/top/top.go @@ -12,17 +12,25 @@ import ( func Prepare(proc *process.Process, arg interface{}) error { n := arg.(Argument) - data, err := proc.Alloc(n.Limit * 8) - if err != nil { - return err + { + n.Attrs = make([]string, len(n.Fs)) + for i, f := range n.Fs { + n.Attrs[i] = f.Attr + } } - sels := encoding.DecodeInt64Slice(data) - for i := int64(0); i < n.Limit; i++ { - sels[i] = i + { + data, err := proc.Alloc(n.Limit * 8) + if err != nil { + return err + } + sels := encoding.DecodeInt64Slice(data) + for i := int64(0); i < n.Limit; i++ { + sels[i] = i + } + n.Ctr.sels = sels + n.Ctr.selsData = data } n.Ctr.n = len(n.Fs) - n.Ctr.sels = sels - n.Ctr.selsData = data n.Ctr.vecs = make([]*vector.Vector, len(n.Fs)) n.Ctr.cmps = make([]compare.Compare, len(n.Fs)) for i, f := range n.Fs { @@ -36,22 +44,14 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { n := arg.(Argument) bat := proc.Reg.Ax.(*batch.Batch) - for i, f := range n.Fs { - n.Ctr.vecs[i], err = bat.GetVector(f.Attr, proc) - if err != nil { - for j := 0; j < i; j++ { - n.Ctr.vecs[i].Free(proc) - } - return false, err - } + if err = bat.Prefetch(n.Attrs, n.Ctr.vecs, proc); err != nil { + clean(&n.Ctr, bat, proc) + return false, err } - processBatch(bat, n) + processBatch(n, bat) data, err := proc.Alloc(int64(len(n.Ctr.sels)) * 8) if err != nil { - for _, vec := range n.Ctr.vecs { - vec.Free(proc) - } - proc.Free(n.Ctr.selsData) + clean(&n.Ctr, bat, proc) return false, err } sels := encoding.DecodeInt64Slice(data) @@ -63,12 +63,13 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { } bat.Sels = sels bat.SelsData = data + bat.Reduce(n.Attrs, proc) proc.Reg.Ax = bat register.FreeRegisters(proc) return false, nil } -func processBatch(bat *batch.Batch, n Argument) { +func processBatch(n Argument, bat *batch.Batch) { if length := int64(len(bat.Sels)); length > 0 { if length < n.Limit { for i := int64(0); i < length; i++ { @@ -104,3 +105,13 @@ func processBatch(bat *batch.Batch, n Argument) { heap.Fix(&n.Ctr, 0) } } + +func clean(ctr *Container, bat *batch.Batch, proc *process.Process) { + if ctr.selsData != nil { + proc.Free(ctr.selsData) + ctr.sels = nil + ctr.selsData = nil + } + bat.Clean(proc) + register.FreeRegisters(proc) +} diff --git a/pkg/sql/colexec/top/types.go b/pkg/sql/colexec/top/types.go index c0d37136798419653b18ba3b8e4466c237dd3015..cf007562479f400f910fb0f37b3b5175915ed778 100644 --- a/pkg/sql/colexec/top/types.go +++ b/pkg/sql/colexec/top/types.go @@ -33,6 +33,7 @@ type Field struct { type Argument struct { Limit int64 Fs []Field + Attrs []string Ctr Container } diff --git a/pkg/sql/colexec/unittest/projection_test.go b/pkg/sql/colexec/unittest/projection_test.go index 1521c110e919c29d928eb877661cb87cf11a47dd..908d033d5df9da07a02f5a03b7d9ea319147081f 100644 --- a/pkg/sql/colexec/unittest/projection_test.go +++ b/pkg/sql/colexec/unittest/projection_test.go @@ -21,6 +21,9 @@ func TestProjection(t *testing.T) { var ins vm.Instructions proc := process.New(guest.New(1<<20, host.New(1<<20)), mempool.New(1<<32, 8)) + { + proc.Refer = make(map[string]uint64) + } { var es []extend.Extend @@ -29,6 +32,9 @@ func TestProjection(t *testing.T) { } ins = append(ins, vm.Instruction{vm.Projection, projection.Argument{[]string{"uid"}, es}}) ins = append(ins, vm.Instruction{vm.Output, nil}) + { + proc.Refer["uid"] = 1 + } } p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins) p.Run(segments(proc), proc) diff --git a/pkg/vm/pipeline/pipeline.go b/pkg/vm/pipeline/pipeline.go index ca986ed50dd6cfe0c46240d0edc4d30ebf5ded4d..4fd44cd48f0463b228e4868bc015e85bc4d5a439 100644 --- a/pkg/vm/pipeline/pipeline.go +++ b/pkg/vm/pipeline/pipeline.go @@ -25,7 +25,6 @@ func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, erro } proc.Reg.Ax = bat if end, err := vm.Run(p.ins, proc); err != nil || end { - bat.Clean(proc) return end, err } } diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go index be8fbc34c31c26e66e547d2baf4e4472c96482b0..a5b09b1c71c0457e336fde71b53b2bfedf52317d 100644 --- a/pkg/vm/process/types.go +++ b/pkg/vm/process/types.go @@ -22,7 +22,8 @@ type Register struct { } type Process struct { - Reg Register - Gm *guest.Mmu - Mp *mempool.Mempool + Reg Register + Gm *guest.Mmu + Mp *mempool.Mempool + Refer map[string]uint64 }