diff --git a/pkg/container/vector/types.go b/pkg/container/vector/types.go index 506e8df7f2742b883c6f6f9e067dd4afd2ad450a..98482a450de32a00cd39e1edb762c4dd070d0409 100644 --- a/pkg/container/vector/types.go +++ b/pkg/container/vector/types.go @@ -20,6 +20,8 @@ type Vector interface { Shuffle([]int64) Vector + UnionOne(Vector, int64) error + Read([]byte) error Show() ([]byte, error) diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go index f56f246cd6715421281ae1459dd6269fc3917b3f..89d6e11b193c574fa5e8c44ae850beda5ac73fd7 100644 --- a/pkg/container/vector/vector.go +++ b/pkg/container/vector/vector.go @@ -291,6 +291,10 @@ func (v *Vector) Shuffle(sels []int64) *Vector { return nil } +func (v *Vector) UnionOne(w *Vector, sel int64, proc *process.Process) error { + return nil +} + func (v *Vector) Show() ([]byte, error) { var buf bytes.Buffer diff --git a/pkg/hash/group.go b/pkg/hash/group.go new file mode 100644 index 0000000000000000000000000000000000000000..84c5d9576c77f99ec26489e9f7cd51f32f514a42 --- /dev/null +++ b/pkg/hash/group.go @@ -0,0 +1,447 @@ +package hash + +import ( + "bytes" + "matrixbase/pkg/container/types" + "matrixbase/pkg/container/vector" + "matrixbase/pkg/encoding" + "matrixbase/pkg/vm/process" +) + +func NewGroup(sel int64) *Group { + return &Group{ + Sel: sel, + } +} + +func (g *Group) Free(proc *process.Process) { + if g.Data != nil { + proc.Free(g.Data) + } +} + +func (g *Group) Fill(sels, matched []int64, vecs, gvecs []*vector.Vector, diffs []bool, proc *process.Process) ([]int64, error) { + for i, vec := range vecs { + switch vec.Typ.Oid { + case types.T_int8: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]int8) + gv := gvec.Col.([]int8)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]int8) + gv := gvec.Col.([]int8)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_int16: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]int16) + gv := gvec.Col.([]int16)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]int16) + gv := gvec.Col.([]int16)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_int32: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]int32) + gv := gvec.Col.([]int32)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]int32) + gv := gvec.Col.([]int32)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_int64: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]int64) + gv := gvec.Col.([]int64)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]int64) + gv := gvec.Col.([]int64)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_uint8: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]uint8) + gv := gvec.Col.([]uint8)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]uint8) + gv := gvec.Col.([]uint8)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_uint16: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]uint16) + gv := gvec.Col.([]uint16)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]uint16) + gv := gvec.Col.([]uint16)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_uint32: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]uint32) + gv := gvec.Col.([]uint32)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]uint32) + gv := gvec.Col.([]uint32)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_uint64: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]uint64) + gv := gvec.Col.([]uint64)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]uint64) + gv := gvec.Col.([]uint64)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_float32: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]float32) + gv := gvec.Col.([]float32)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]float32) + gv := gvec.Col.([]float32)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_float64: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.([]float64) + gv := gvec.Col.([]float64)[g.Sel] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.([]float64) + gv := gvec.Col.([]float64)[g.Sel] + for i, sel := range sels { + diffs[i] = diffs[i] || (gv != vs[sel]) + } + } + case types.T_decimal: + case types.T_date: + case types.T_datetime: + case types.T_char: + case types.T_varchar: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.(*types.Bytes) + gvs := gvec.Col.(*types.Bytes) + gv := gvs.Data[gvs.Os[g.Sel] : gvs.Os[g.Sel]+gvs.Ns[g.Sel]] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (bytes.Compare(gv, vs.Data[vs.Os[sel]:vs.Os[sel]+vs.Ns[sel]]) != 0) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.(*types.Bytes) + gvs := gvec.Col.(*types.Bytes) + gv := gvs.Data[gvs.Os[g.Sel] : gvs.Os[g.Sel]+gvs.Ns[g.Sel]] + for i, sel := range sels { + diffs[i] = diffs[i] || (bytes.Compare(gv, vs.Data[vs.Os[sel]:vs.Os[sel]+vs.Ns[sel]]) != 0) + } + } + case types.T_json: + gvec := gvecs[i] + lnull := vec.Nsp.Any() + rnull := gvec.Nsp.Contains(uint64(g.Sel)) + switch { + case lnull && rnull: + for i, sel := range sels { + if !vec.Nsp.Contains(uint64(sel)) { // only null eq null + diffs[i] = true + } + } + case lnull && !rnull: // null is not value + vs := vec.Col.(*types.Bytes) + gvs := gvec.Col.(*types.Bytes) + gv := gvs.Data[gvs.Os[g.Sel] : gvs.Os[g.Sel]+gvs.Ns[g.Sel]] + for i, sel := range sels { + if vec.Nsp.Contains(uint64(sel)) { + diffs[i] = true + } else { + diffs[i] = diffs[i] || (bytes.Compare(gv, vs.Data[vs.Os[sel]:vs.Os[sel]+vs.Ns[sel]]) != 0) + } + } + case !lnull && rnull: // null is not value + for i := range sels { + diffs[i] = true + } + default: + vs := vec.Col.(*types.Bytes) + gvs := gvec.Col.(*types.Bytes) + gv := gvs.Data[gvs.Os[g.Sel] : gvs.Os[g.Sel]+gvs.Ns[g.Sel]] + for i, sel := range sels { + diffs[i] = diffs[i] || (bytes.Compare(gv, vs.Data[vs.Os[sel]:vs.Os[sel]+vs.Ns[sel]]) != 0) + } + } + } + } + n := len(sels) + matched = matched[:0] + remaining := sels[:0] + for i := 0; i < n; i++ { + if diffs[i] { + remaining = append(remaining, sels[i]) + } else { + matched = append(matched, sels[i]) + } + } + if len(matched) > 0 { + length := len(g.Sels) + len(matched) + if cap(g.Sels) < length { + data, err := proc.Alloc(int64(length) * 8) + if err != nil { + return nil, err + } + copy(data, g.Data) + proc.Free(g.Data) + g.Sels = encoding.DecodeInt64Slice(data) + g.Data = data[:length] + g.Sels = g.Sels[:length] + } + g.Sels = append(g.Sels, matched...) + } + return remaining, nil + +} diff --git a/pkg/hash/hash.go b/pkg/hash/hash.go new file mode 100644 index 0000000000000000000000000000000000000000..7adc22fef068b4445bfeb89b0e5fb548f0da6eab --- /dev/null +++ b/pkg/hash/hash.go @@ -0,0 +1 @@ +package hash diff --git a/pkg/hash/rehash.go b/pkg/hash/rehash.go new file mode 100644 index 0000000000000000000000000000000000000000..5493431f58f04c0ececf45bc0ce052486dc31544 --- /dev/null +++ b/pkg/hash/rehash.go @@ -0,0 +1,9 @@ +package hash + +import "matrixbase/pkg/container/vector" + +func Rehash(count int, hs []uint64, vec *vector.Vector) { +} + +func RehashSels(count int, sels []int64, hs []uint64, vec *vector.Vector) { +} diff --git a/pkg/hash/types.go b/pkg/hash/types.go new file mode 100644 index 0000000000000000000000000000000000000000..44a8fa3918916b076d532b8d647cdaeb5135db44 --- /dev/null +++ b/pkg/hash/types.go @@ -0,0 +1,7 @@ +package hash + +type Group struct { + Sel int64 + Data []byte + Sels []int64 +} diff --git a/pkg/sql/colexec/hashgroup/group.go b/pkg/sql/colexec/hashgroup/group.go new file mode 100644 index 0000000000000000000000000000000000000000..758262793b84e6735060b5301aff8c529c58aaef --- /dev/null +++ b/pkg/sql/colexec/hashgroup/group.go @@ -0,0 +1,103 @@ +package hashgroup + +import ( + "matrixbase/pkg/container/vector" + "matrixbase/pkg/hash" + "matrixbase/pkg/vm/process" +) + +func init() { + ZeroBools = make([]bool, UnitLimit) + OneUint64s = make([]uint64, UnitLimit) + for i := range OneUint64s { + OneUint64s[i] = 1 + } +} + +func Prepare(proc *process.Process, arg interface{}) error { + return nil +} + +func Call(proc *process.Process, arg interface{}) (bool, error) { + return false, nil +} + +func (ctr *Container) group(count int, sels []int64, vecs []*vector.Vector, proc *process.Process) error { + var err error + + { + copy(ctr.hashs[:count], OneUint64s[:count]) + if len(sels) == 0 { + ctr.fillHash(count, vecs) + } else { + ctr.fillHashSels(count, sels, vecs) + } + } + copy(ctr.diffs[:count], ZeroBools[:count]) + for _, h := range ctr.hashs { + slot, ok := ctr.slots[h] + if !ok { + continue + } + remaining := ctr.sels[slot] + if gs, ok := ctr.groups[h]; ok { + for _, g := range gs { + if remaining, err = g.Fill(remaining, ctr.matchs, vecs, ctr.vecs, ctr.diffs, proc); err != nil { + return err + } + copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)]) + } + } else { + ctr.groups[h] = make([]*hash.Group, 0, 8) + } + for len(remaining) > 0 { + g := hash.NewGroup(int64(ctr.vecs[0].Length())) + for i, vec := range vecs { + ctr.vecs[i].UnionOne(vec, remaining[0], proc) + } + ctr.groups[h] = append(ctr.groups[h], g) + if remaining, err = g.Fill(remaining, ctr.matchs, vecs, ctr.vecs, ctr.diffs, proc); err != nil { + return err + } + copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)]) + + } + ctr.sels[slot] = ctr.sels[slot][:0] + delete(ctr.slots, h) + } + return nil +} + +func (ctr *Container) fillHash(count int, vecs []*vector.Vector) { + ctr.hashs = ctr.hashs[:count] + for _, vec := range vecs { + hash.Rehash(count, ctr.hashs, vec) + } + nextslot := 0 + for i, h := range ctr.hashs { + slot, ok := ctr.slots[h] + if !ok { + slot = nextslot + ctr.slots[h] = slot + nextslot++ + } + ctr.sels[slot] = append(ctr.sels[slot], int64(i)) + } +} + +func (ctr *Container) fillHashSels(count int, sels []int64, vecs []*vector.Vector) { + ctr.hashs = ctr.hashs[:count] + for _, vec := range vecs { + hash.RehashSels(count, sels, ctr.hashs, vec) + } + nextslot := 0 + for i, h := range ctr.hashs { + slot, ok := ctr.slots[h] + if !ok { + slot = nextslot + ctr.slots[h] = slot + nextslot++ + } + ctr.sels[slot] = append(ctr.sels[slot], sels[i]) + } +} diff --git a/pkg/sql/colexec/hashgroup/types.go b/pkg/sql/colexec/hashgroup/types.go new file mode 100644 index 0000000000000000000000000000000000000000..be0db8424ff8ff7e9cc702f98b06d5b76c6c85be --- /dev/null +++ b/pkg/sql/colexec/hashgroup/types.go @@ -0,0 +1,32 @@ +package hashgroup + +import ( + "matrixbase/pkg/container/vector" + "matrixbase/pkg/hash" + "matrixbase/pkg/sql/colexec/aggregation" +) + +const ( + UnitLimit = 1024 +) + +var ( + ZeroBools []bool + OneUint64s []uint64 +) + +type Container struct { + diffs []bool + matchs []int64 + hashs []uint64 + sels [][]int64 // sels + slots map[uint64]int // hash code -> sels index + vecs []*vector.Vector + groups map[uint64][]*hash.Group // hash code -> group list +} + +type Argument struct { + Attrs []string + Ctr Container + Es []aggregation.Extend +}