diff --git a/pkg/sql/colexec/agg/agg.go b/pkg/sql/colexec/agg/agg.go index b4d906c68f2fb6d35ad8c8f6de3a869c01b3ed45..922150c1f3c9c7ca91b01e39759a3d50e853bb0e 100644 --- a/pkg/sql/colexec/agg/agg.go +++ b/pkg/sql/colexec/agg/agg.go @@ -25,16 +25,18 @@ import ( func NewUnaryAgg[T1, T2 any](priv any, isCount bool, ityp, otyp types.Type, grows func(int), eval func([]T2) []T2, merge func(int64, int64, T2, T2, bool, bool, any) (T2, bool), - fill func(int64, T1, T2, int64, bool, bool) (T2, bool)) Agg[*UnaryAgg[T1, T2]] { + fill func(int64, T1, T2, int64, bool, bool) (T2, bool), + batchFill func(any, any, int64, int64, []uint64, []int64, *nulls.Nulls) error) Agg[*UnaryAgg[T1, T2]] { return &UnaryAgg[T1, T2]{ - priv: priv, - otyp: otyp, - eval: eval, - fill: fill, - merge: merge, - grows: grows, - isCount: isCount, - ityps: []types.Type{ityp}, + priv: priv, + otyp: otyp, + eval: eval, + fill: fill, + merge: merge, + grows: grows, + batchFill: batchFill, + isCount: isCount, + ityps: []types.Type{ityp}, } } @@ -140,6 +142,24 @@ func (a *UnaryAgg[T1, T2]) BatchFill(start int64, os []uint8, vps []uint64, zs [ return nil } vs := vector.GetColumn[T1](vec) + if a.batchFill != nil { + if err := a.batchFill(a.vs, vs, start, int64(len(os)), vps, zs, vec.GetNulls()); err != nil { + return err + } + nsp := vec.GetNulls() + if nsp.Any() { + for i := range os { + if !nsp.Contains(uint64(i) + uint64(start)) { + a.es[vps[i]-1] = false + } + } + } else { + for i := range os { + a.es[vps[i]-1] = false + } + } + return nil + } for i := range os { hasNull := vec.GetNulls().Contains(uint64(i) + uint64(start)) if vps[i] == 0 { diff --git a/pkg/sql/colexec/agg/anyvalue/anyvalue_test.go b/pkg/sql/colexec/agg/anyvalue/anyvalue_test.go index 30fcdabc49eea0372ff7a3de8b3c5d9854f4fa0a..174df2ab565a74835ed82904ebdaea2e88886ca8 100644 --- a/pkg/sql/colexec/agg/anyvalue/anyvalue_test.go +++ b/pkg/sql/colexec/agg/anyvalue/anyvalue_test.go @@ -44,7 +44,7 @@ func TestAnyvalue(t *testing.T) { vec2 := testutil.NewVector(Rows, testTyp, m, false, vs2) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(a, true, testTyp, testTyp, a.Grows, a.Eval, a.Merge, a.Fill) + agg := agg.NewUnaryAgg(a, true, testTyp, testTyp, a.Grows, a.Eval, a.Merge, a.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -57,13 +57,13 @@ func TestAnyvalue(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(a2, true, testTyp, testTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill) + agg0 := agg.NewUnaryAgg(a2, true, testTyp, testTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(a3, true, testTyp, testTyp, a3.Grows, a3.Eval, a3.Merge, a3.Fill) + agg1 := agg.NewUnaryAgg(a3, true, testTyp, testTyp, a3.Grows, a3.Eval, a3.Merge, a3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -100,7 +100,7 @@ func TestDecimalAnyva(t *testing.T) { vec2 := testutil.MakeDecimal128Vector(input2, nil, testTyp) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(da, true, testTyp, testTyp, da.Grows, da.Eval, da.Merge, da.Fill) + agg := agg.NewUnaryAgg(da, true, testTyp, testTyp, da.Grows, da.Eval, da.Merge, da.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -113,13 +113,13 @@ func TestDecimalAnyva(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(da2, true, testTyp, testTyp, da2.Grows, da2.Eval, da2.Merge, da2.Fill) + agg0 := agg.NewUnaryAgg(da2, true, testTyp, testTyp, da2.Grows, da2.Eval, da2.Merge, da2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(da3, true, testTyp, testTyp, da3.Grows, da3.Eval, da3.Merge, da3.Fill) + agg1 := agg.NewUnaryAgg(da3, true, testTyp, testTyp, da3.Grows, da3.Eval, da3.Merge, da3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/approxcd/approxcd_test.go b/pkg/sql/colexec/agg/approxcd/approxcd_test.go index 3dc341f45ac411bf3fc84e6ee6d814e22ec18195..a1429e42ed96eb9b4444aad5177c2622c9679024 100644 --- a/pkg/sql/colexec/agg/approxcd/approxcd_test.go +++ b/pkg/sql/colexec/agg/approxcd/approxcd_test.go @@ -41,7 +41,7 @@ func TestApproxcdCount(t *testing.T) { vec2 := testutil.NewVector(Rows, testTyp, m, false, vs2) { a := NewApproxc[int8]() - agg := agg.NewUnaryAgg(a, true, testTyp, retTyp, a.Grows, a.Eval, a.Merge, a.Fill) + agg := agg.NewUnaryAgg(a, true, testTyp, retTyp, a.Grows, a.Eval, a.Merge, a.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -55,13 +55,13 @@ func TestApproxcdCount(t *testing.T) { { a1 := NewApproxc[int8]() a2 := NewApproxc[int8]() - agg0 := agg.NewUnaryAgg(a1, true, testTyp, retTyp, a1.Grows, a1.Eval, a1.Merge, a1.Fill) + agg0 := agg.NewUnaryAgg(a1, true, testTyp, retTyp, a1.Grows, a1.Eval, a1.Merge, a1.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(a2, true, testTyp, retTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill) + agg1 := agg.NewUnaryAgg(a2, true, testTyp, retTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/avg/avg.go b/pkg/sql/colexec/agg/avg/avg.go index a1498892981b0ea084d661977436616f36806fee..f952650399d20e32e4201c2cac269ec4408b50eb 100644 --- a/pkg/sql/colexec/agg/avg/avg.go +++ b/pkg/sql/colexec/agg/avg/avg.go @@ -15,7 +15,9 @@ package avg import ( + "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/vectorize/sum" ) func ReturnType(typs []types.Type) types.Type { @@ -112,6 +114,19 @@ func (a *Decimal64Avg) Merge(xIndex int64, yIndex int64, x types.Decimal128, y t return x, xEmpty } +func (a *Decimal64Avg) BatchFill(rs, vs any, start, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + if err := sum.Decimal64Sum128(rs.([]types.Decimal128), vs.([]types.Decimal64), start, count, vps, zs, nsp); err != nil { + return err + } + for i := int64(0); i < count; i++ { + if nsp.Contains(uint64(i + start)) { + continue + } + j := vps[i] - 1 + a.cnts[j] += zs[j] + } + return nil +} func NewD128Avg() *Decimal128Avg { return &Decimal128Avg{} } @@ -149,3 +164,17 @@ func (a *Decimal128Avg) Merge(xIndex int64, yIndex int64, x types.Decimal128, y return x, xEmpty } + +func (a *Decimal128Avg) BatchFill(rs, vs any, start, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + if err := sum.Decimal128Sum(rs.([]types.Decimal128), vs.([]types.Decimal128), start, count, vps, zs, nsp); err != nil { + return err + } + for i := int64(0); i < count; i++ { + if nsp.Contains(uint64(i + start)) { + continue + } + j := vps[i] - 1 + a.cnts[j] += zs[j] + } + return nil +} diff --git a/pkg/sql/colexec/agg/avg/avg_test.go b/pkg/sql/colexec/agg/avg/avg_test.go index eb632e82f3db7d4b6834a7c2b2a918834463fd22..966a922806236c231951c8121c6cf142c794c9ff 100644 --- a/pkg/sql/colexec/agg/avg/avg_test.go +++ b/pkg/sql/colexec/agg/avg/avg_test.go @@ -47,7 +47,7 @@ func TestAvg(t *testing.T) { expected2 := []float64{14.5} { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(a1, true, testTyp, retTyp, a1.Grows, a1.Eval, a1.Merge, a1.Fill) + agg := agg.NewUnaryAgg(a1, true, testTyp, retTyp, a1.Grows, a1.Eval, a1.Merge, a1.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -60,13 +60,13 @@ func TestAvg(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(a2, true, testTyp, retTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill) + agg0 := agg.NewUnaryAgg(a2, true, testTyp, retTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec1}) } - agg1 := agg.NewUnaryAgg(a3, true, testTyp, retTyp, a3.Grows, a3.Eval, a3.Merge, a3.Fill) + agg1 := agg.NewUnaryAgg(a3, true, testTyp, retTyp, a3.Grows, a3.Eval, a3.Merge, a3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -103,7 +103,7 @@ func TestDecimal128Avg(t *testing.T) { vec2 := testutil.MakeDecimal128Vector(input2, nil, testTyp) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(a1, true, testTyp, testTyp, a1.Grows, a1.Eval, a1.Merge, a1.Fill) + agg := agg.NewUnaryAgg(a1, true, testTyp, testTyp, a1.Grows, a1.Eval, a1.Merge, a1.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -116,13 +116,13 @@ func TestDecimal128Avg(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(a2, true, testTyp, testTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill) + agg0 := agg.NewUnaryAgg(a2, true, testTyp, testTyp, a2.Grows, a2.Eval, a2.Merge, a2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(a3, true, testTyp, testTyp, a3.Grows, a3.Eval, a3.Merge, a3.Fill) + agg1 := agg.NewUnaryAgg(a3, true, testTyp, testTyp, a3.Grows, a3.Eval, a3.Merge, a3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/bit_and/bit_and_test.go b/pkg/sql/colexec/agg/bit_and/bit_and_test.go index 995b6924e27f41c7947174c29b8120185c192ada..ff765b746f17345d3aa5c9be90518239fb7ea4a4 100644 --- a/pkg/sql/colexec/agg/bit_and/bit_and_test.go +++ b/pkg/sql/colexec/agg/bit_and/bit_and_test.go @@ -36,7 +36,7 @@ func TestBitAnd(t *testing.T) { m := mheap.New(guest.New(1<<30, host.New(1<<30))) vec := testutil.NewVector(Rows, types.New(types.T_int8, 0, 0, 0), m, false, nil) { - agg := agg.NewUnaryAgg(ba, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), ba.Grows, ba.Eval, ba.Merge, ba.Fill) + agg := agg.NewUnaryAgg(ba, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), ba.Grows, ba.Eval, ba.Merge, ba.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -48,13 +48,13 @@ func TestBitAnd(t *testing.T) { v.Free(m) } { - agg0 := agg.NewUnaryAgg(ba, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), ba.Grows, ba.Eval, ba.Merge, ba.Fill) + agg0 := agg.NewUnaryAgg(ba, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), ba.Grows, ba.Eval, ba.Merge, ba.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(ba, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), ba.Grows, ba.Eval, ba.Merge, ba.Fill) + agg1 := agg.NewUnaryAgg(ba, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_uint64, 0, 0, 0), ba.Grows, ba.Eval, ba.Merge, ba.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/bit_or/bit_or_test.go b/pkg/sql/colexec/agg/bit_or/bit_or_test.go index ee899c881d11fbc862f96621920a428699393519..70060a291794b252e3cadf6f108c2b69e2a64ce8 100644 --- a/pkg/sql/colexec/agg/bit_or/bit_or_test.go +++ b/pkg/sql/colexec/agg/bit_or/bit_or_test.go @@ -36,7 +36,7 @@ func TestBitOr(t *testing.T) { m := mheap.New(guest.New(1<<30, host.New(1<<30))) vec := testutil.NewVector(Rows, types.New(types.T_int8, 0, 0, 0), m, false, nil) { - agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill) + agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -49,7 +49,7 @@ func TestBitOr(t *testing.T) { } { vec0 := testutil.NewVector(2, types.New(types.T_int8, 0, 0, 0), m, false, []int8{2, 2}) - agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill) + agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) agg.Fill(0, int64(0), 2, []*vector.Vector{vec0}) @@ -61,13 +61,13 @@ func TestBitOr(t *testing.T) { vec0.Free(m) } { - agg0 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill) + agg0 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill) + agg1 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bo.Grows, bo.Eval, bo.Merge, bo.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/bit_xor/bit_xor_test.go b/pkg/sql/colexec/agg/bit_xor/bit_xor_test.go index ed2fea5fbb8afb6e8e9c4fca2ed509ea12ee2bec..285b4f91f69979ce640605cce3bbfe82f3d42069 100644 --- a/pkg/sql/colexec/agg/bit_xor/bit_xor_test.go +++ b/pkg/sql/colexec/agg/bit_xor/bit_xor_test.go @@ -36,7 +36,7 @@ func TestBitXor(t *testing.T) { m := mheap.New(guest.New(1<<30, host.New(1<<30))) vec := testutil.NewVector(Rows, types.New(types.T_int8, 0, 0, 0), m, false, nil) { - agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill) + agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -49,7 +49,7 @@ func TestBitXor(t *testing.T) { } { vec0 := testutil.NewVector(2, types.New(types.T_int8, 0, 0, 0), m, false, []int8{2, 2}) - agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill) + agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) agg.Fill(0, int64(0), 2, []*vector.Vector{vec0}) @@ -62,7 +62,7 @@ func TestBitXor(t *testing.T) { } { vec0 := testutil.NewVector(3, types.New(types.T_int8, 0, 0, 0), m, false, []int8{1, 2, 2}) - agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill) + agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) agg.Fill(0, int64(0), 1, []*vector.Vector{vec0}) @@ -75,13 +75,13 @@ func TestBitXor(t *testing.T) { vec0.Free(m) } { - agg0 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill) + agg0 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill) + agg1 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), bx.Grows, bx.Eval, bx.Merge, bx.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/count/count_test.go b/pkg/sql/colexec/agg/count/count_test.go index 28f6cac3b9852ce013a3fedd129375e880f7e156..ef39406d38e3a47614cdfc9fd86db4f79fc8739c 100644 --- a/pkg/sql/colexec/agg/count/count_test.go +++ b/pkg/sql/colexec/agg/count/count_test.go @@ -36,7 +36,7 @@ func TestCount(t *testing.T) { m := mheap.New(guest.New(1<<30, host.New(1<<30))) vec := testutil.NewVector(Rows, types.New(types.T_int8, 0, 0, 0), m, true, nil) { - agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), c.Grows, c.Eval, c.Merge, c.Fill) + agg := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), c.Grows, c.Eval, c.Merge, c.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -48,13 +48,13 @@ func TestCount(t *testing.T) { v.Free(m) } { - agg0 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), c.Grows, c.Eval, c.Merge, c.Fill) + agg0 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), c.Grows, c.Eval, c.Merge, c.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), c.Grows, c.Eval, c.Merge, c.Fill) + agg1 := agg.NewUnaryAgg(nil, true, types.New(types.T_int8, 0, 0, 0), types.New(types.T_int64, 0, 0, 0), c.Grows, c.Eval, c.Merge, c.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/max/max_test.go b/pkg/sql/colexec/agg/max/max_test.go index 9fc21855687b030bec2fc89f9c283e4d890b7618..966f2e2a2cbc5374d5a3dbee7d1579eeb83676de 100644 --- a/pkg/sql/colexec/agg/max/max_test.go +++ b/pkg/sql/colexec/agg/max/max_test.go @@ -43,7 +43,7 @@ func TestMax(t *testing.T) { vec2 := testutil.NewVector(Rows, testTyp, m, false, vs2) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mx.Grows, mx.Eval, mx.Merge, mx.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mx.Grows, mx.Eval, mx.Merge, mx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -56,13 +56,13 @@ func TestMax(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mx.Grows, mx.Eval, mx.Merge, mx.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mx.Grows, mx.Eval, mx.Merge, mx.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mx.Grows, mx.Eval, mx.Merge, mx.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mx.Grows, mx.Eval, mx.Merge, mx.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -97,7 +97,7 @@ func TestDecimalMax(t *testing.T) { vec2 := testutil.MakeDecimal128Vector(input2, nil, testTyp) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -110,13 +110,13 @@ func TestDecimalMax(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -151,7 +151,7 @@ func TestBoollMax(t *testing.T) { vec2 := testutil.MakeBoolVector(input2) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -164,13 +164,13 @@ func TestBoollMax(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmx.Grows, dmx.Eval, dmx.Merge, dmx.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -205,7 +205,7 @@ func TestStrlMax(t *testing.T) { vec2 := testutil.MakeVarcharVector(input2, nil) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smx.Grows, smx.Eval, smx.Merge, smx.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smx.Grows, smx.Eval, smx.Merge, smx.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -218,13 +218,13 @@ func TestStrlMax(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smx.Grows, smx.Eval, smx.Merge, smx.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smx.Grows, smx.Eval, smx.Merge, smx.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smx.Grows, smx.Eval, smx.Merge, smx.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smx.Grows, smx.Eval, smx.Merge, smx.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/min/min_test.go b/pkg/sql/colexec/agg/min/min_test.go index a33377b43f6f63f9521219d601d41b086bb9a896..6c38150e6d7cea696d28128b7df451a045cf9973 100644 --- a/pkg/sql/colexec/agg/min/min_test.go +++ b/pkg/sql/colexec/agg/min/min_test.go @@ -44,7 +44,7 @@ func TestMin(t *testing.T) { vec2 := testutil.NewVector(Rows, testTyp, m, false, vs2) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mn.Grows, mn.Eval, mn.Merge, mn.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mn.Grows, mn.Eval, mn.Merge, mn.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -57,13 +57,13 @@ func TestMin(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mn2.Grows, mn2.Eval, mn2.Merge, mn2.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mn2.Grows, mn2.Eval, mn2.Merge, mn2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mn3.Grows, mn3.Eval, mn3.Merge, mn3.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, mn3.Grows, mn3.Eval, mn3.Merge, mn3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -98,7 +98,7 @@ func TestDecimalMin(t *testing.T) { vec2 := testutil.MakeDecimal128Vector(input2, nil, testTyp) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -111,13 +111,13 @@ func TestDecimalMin(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -152,7 +152,7 @@ func TestBoollMin(t *testing.T) { vec2 := testutil.MakeBoolVector(input2) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -165,13 +165,13 @@ func TestBoollMin(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, dmn.Grows, dmn.Eval, dmn.Merge, dmn.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -206,7 +206,7 @@ func TestStrlMin(t *testing.T) { vec2 := testutil.MakeVarcharVector(input2, nil) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smn.Grows, smn.Eval, smn.Merge, smn.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smn.Grows, smn.Eval, smn.Merge, smn.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -219,13 +219,13 @@ func TestStrlMin(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smn.Grows, smn.Eval, smn.Merge, smn.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smn.Grows, smn.Eval, smn.Merge, smn.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smn.Grows, smn.Eval, smn.Merge, smn.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, smn.Grows, smn.Eval, smn.Merge, smn.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/stddevpop/stddevpop_test.go b/pkg/sql/colexec/agg/stddevpop/stddevpop_test.go index 3d85a263b7dd722cf3f17b4d338ff1cc5cc1ed2a..83ad047c8b3e18bf84a1ddd40782245f829137f2 100644 --- a/pkg/sql/colexec/agg/stddevpop/stddevpop_test.go +++ b/pkg/sql/colexec/agg/stddevpop/stddevpop_test.go @@ -39,7 +39,7 @@ func TestStddevpop(t *testing.T) { m := mheap.New(guest.New(1<<30, host.New(1<<30))) vec := testutil.NewVector(Rows, inputType, m, false, nil) { - agg := agg.NewUnaryAgg(sdp1, true, inputType, types.New(types.T_float64, 0, 0, 0), sdp1.Grows, sdp1.Eval, sdp1.Merge, sdp1.Fill) + agg := agg.NewUnaryAgg(sdp1, true, inputType, types.New(types.T_float64, 0, 0, 0), sdp1.Grows, sdp1.Eval, sdp1.Merge, sdp1.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -51,13 +51,13 @@ func TestStddevpop(t *testing.T) { v.Free(m) } { - agg0 := agg.NewUnaryAgg(sdp2, true, inputType, types.New(types.T_float64, 0, 0, 0), sdp2.Grows, sdp2.Eval, sdp2.Merge, sdp2.Fill) + agg0 := agg.NewUnaryAgg(sdp2, true, inputType, types.New(types.T_float64, 0, 0, 0), sdp2.Grows, sdp2.Eval, sdp2.Merge, sdp2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(sdp3, true, inputType, types.New(types.T_float64, 0, 0, 0), sdp3.Grows, sdp3.Eval, sdp3.Merge, sdp3.Fill) + agg1 := agg.NewUnaryAgg(sdp3, true, inputType, types.New(types.T_float64, 0, 0, 0), sdp3.Grows, sdp3.Eval, sdp3.Merge, sdp3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/sum/sum.go b/pkg/sql/colexec/agg/sum/sum.go index dde2b932a6607e0650730362a8cfbf4806a50ed5..92d10ef891f7af39b261bd8ae9b9e6ef70675ee1 100644 --- a/pkg/sql/colexec/agg/sum/sum.go +++ b/pkg/sql/colexec/agg/sum/sum.go @@ -17,7 +17,9 @@ package sum import ( "fmt" + "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/vectorize/sum" ) func ReturnType(typs []types.Type) types.Type { @@ -93,6 +95,10 @@ func (s *Decimal64Sum) Merge(_ int64, _ int64, x types.Decimal64, y types.Decima return x, xEmpty } +func (s *Decimal64Sum) BatchFill(rs, vs any, start, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + return sum.Decimal64Sum(rs.([]types.Decimal64), vs.([]types.Decimal64), start, count, vps, zs, nsp) +} + func NewD128Sum() *Decimal128Sum { return &Decimal128Sum{} } @@ -120,3 +126,7 @@ func (s *Decimal128Sum) Merge(_ int64, _ int64, x types.Decimal128, y types.Deci } return x, xEmpty } + +func (s *Decimal128Sum) BatchFill(rs, vs any, start, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + return sum.Decimal128Sum(rs.([]types.Decimal128), vs.([]types.Decimal128), start, count, vps, zs, nsp) +} diff --git a/pkg/sql/colexec/agg/sum/sum_test.go b/pkg/sql/colexec/agg/sum/sum_test.go index ac3feb9755dcffef80eca725dc0f7a319b699caa..f4bfa5dd0919f110bee4788005f91e40dfd4c6a2 100644 --- a/pkg/sql/colexec/agg/sum/sum_test.go +++ b/pkg/sql/colexec/agg/sum/sum_test.go @@ -41,7 +41,7 @@ func TestSum(t *testing.T) { expected := []int64{45} { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, types.New(types.T_int64, 0, 0, 0), s.Grows, s.Eval, s.Merge, s.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, types.New(types.T_int64, 0, 0, 0), s.Grows, s.Eval, s.Merge, s.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -54,13 +54,13 @@ func TestSum(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, types.New(types.T_int64, 0, 0, 0), s.Grows, s.Eval, s.Merge, s.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, types.New(types.T_int64, 0, 0, 0), s.Grows, s.Eval, s.Merge, s.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, types.New(types.T_int64, 0, 0, 0), s.Grows, s.Eval, s.Merge, s.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, types.New(types.T_int64, 0, 0, 0), s.Grows, s.Eval, s.Merge, s.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -92,7 +92,7 @@ func TestDecimal128Sum(t *testing.T) { vec := testutil.MakeDecimal128Vector(input1, nil, testTyp) { // test single agg with Grow & Fill function - agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, ds.Grows, ds.Eval, ds.Merge, ds.Fill) + agg := agg.NewUnaryAgg(nil, true, testTyp, testTyp, ds.Grows, ds.Eval, ds.Merge, ds.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -105,13 +105,13 @@ func TestDecimal128Sum(t *testing.T) { } { // test two agg with Merge function - agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, ds.Grows, ds.Eval, ds.Merge, ds.Fill) + agg0 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, ds.Grows, ds.Eval, ds.Merge, ds.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, ds.Grows, ds.Eval, ds.Merge, ds.Fill) + agg1 := agg.NewUnaryAgg(nil, true, testTyp, testTyp, ds.Grows, ds.Eval, ds.Merge, ds.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/agg/types.go b/pkg/sql/colexec/agg/types.go index 7015f8760309dec68c3e672ed4179103fcc39c01..9a5601f3539bcde209b16ab827afcb9d8ecaf0c1 100644 --- a/pkg/sql/colexec/agg/types.go +++ b/pkg/sql/colexec/agg/types.go @@ -16,6 +16,7 @@ package agg import ( "github.com/matrixorigin/matrixone/pkg/common/hashmap" + "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/vm/mheap" @@ -116,6 +117,9 @@ type UnaryAgg[T1, T2 any] struct { // fifth represents whether it is a new group // sixth represents whether the value to be fed is null fill func(int64, T1, T2, int64, bool, bool) (T2, bool) + + // Optional optimisation function for functions where cgo is used in a single pass. + batchFill func(any, any, int64, int64, []uint64, []int64, *nulls.Nulls) error } // UnaryDistAgg generic aggregation function with one input vector and with distinct diff --git a/pkg/sql/colexec/agg/variance/variance_test.go b/pkg/sql/colexec/agg/variance/variance_test.go index 23b6809f536a94f1334b9a9fcf600100ba47c6ab..1cb5bef62fa081aaf956505d010849e4c16bac28 100644 --- a/pkg/sql/colexec/agg/variance/variance_test.go +++ b/pkg/sql/colexec/agg/variance/variance_test.go @@ -39,7 +39,7 @@ func TestVariance(t *testing.T) { m := mheap.New(guest.New(1<<30, host.New(1<<30))) vec := testutil.NewVector(Rows, inputType, m, false, nil) { - agg := agg.NewUnaryAgg(variance1, true, inputType, types.New(types.T_float64, 0, 0, 0), variance1.Grows, variance1.Eval, variance1.Merge, variance1.Fill) + agg := agg.NewUnaryAgg(variance1, true, inputType, types.New(types.T_float64, 0, 0, 0), variance1.Grows, variance1.Eval, variance1.Merge, variance1.Fill, nil) err := agg.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { @@ -51,13 +51,13 @@ func TestVariance(t *testing.T) { v.Free(m) } { - agg0 := agg.NewUnaryAgg(variance2, true, inputType, types.New(types.T_float64, 0, 0, 0), variance2.Grows, variance2.Eval, variance2.Merge, variance2.Fill) + agg0 := agg.NewUnaryAgg(variance2, true, inputType, types.New(types.T_float64, 0, 0, 0), variance2.Grows, variance2.Eval, variance2.Merge, variance2.Fill, nil) err := agg0.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { agg0.Fill(0, int64(i), 1, []*vector.Vector{vec}) } - agg1 := agg.NewUnaryAgg(variance3, true, inputType, types.New(types.T_float64, 0, 0, 0), variance3.Grows, variance3.Eval, variance3.Merge, variance3.Fill) + agg1 := agg.NewUnaryAgg(variance3, true, inputType, types.New(types.T_float64, 0, 0, 0), variance3.Grows, variance3.Eval, variance3.Merge, variance3.Fill, nil) err = agg1.Grows(1, m) require.NoError(t, err) for i := 0; i < Rows; i++ { diff --git a/pkg/sql/colexec/aggregate/aggregate.go b/pkg/sql/colexec/aggregate/aggregate.go index 0381058ae4e1f4dde679caf736dcde9c171ffd84..474f5868fa82b7852139b32fe47927957e84f58d 100755 --- a/pkg/sql/colexec/aggregate/aggregate.go +++ b/pkg/sql/colexec/aggregate/aggregate.go @@ -213,13 +213,13 @@ func NewAvg(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, aggPriv.BatchFill) case types.T_decimal128: aggPriv := avg.NewD128Avg() if dist { return agg.NewUnaryDistAgg(false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, aggPriv.BatchFill) } panic(fmt.Errorf("unsupport type '%s' for avg", typ)) } @@ -251,13 +251,13 @@ func NewSum(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, aggPriv.BatchFill) case types.T_decimal128: aggPriv := sum.NewD128Sum() if dist { return agg.NewUnaryDistAgg(false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, aggPriv.BatchFill) } panic(fmt.Errorf("unsupport type '%s' for sum", typ)) } @@ -269,7 +269,7 @@ func NewMax(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_int8: return newGenericMax[int8](typ, dist) case types.T_int16: @@ -295,19 +295,19 @@ func NewMax(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_varchar: aggPriv := max.NewStrMax() if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_blob: aggPriv := max.NewStrMax() if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_date: return newGenericMax[types.Date](typ, dist) case types.T_datetime: @@ -319,13 +319,13 @@ func NewMax(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_decimal128: aggPriv := max.NewD128Max() if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } panic(fmt.Errorf("unsupport type '%s' for anyvalue", typ)) } @@ -337,7 +337,7 @@ func NewMin(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_int8: return newGenericMin[int8](typ, dist) case types.T_int16: @@ -363,19 +363,19 @@ func NewMin(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_varchar: aggPriv := min.NewStrMin() if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_blob: aggPriv := min.NewStrMin() if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_date: return newGenericMin[types.Date](typ, dist) case types.T_datetime: @@ -387,13 +387,13 @@ func NewMin(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_decimal128: aggPriv := min.NewD128Min() if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } panic(fmt.Errorf("unsupport type '%s' for anyvalue", typ)) } @@ -547,13 +547,13 @@ func NewVariance(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_decimal128: aggPriv := variance.New3() if dist { return agg.NewUnaryDistAgg(false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } panic(fmt.Errorf("unsupport type '%s' for avg", typ)) @@ -586,13 +586,13 @@ func NewStdDevPop(typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) case types.T_decimal128: aggPriv := stddevpop.New3() if dist { return agg.NewUnaryDistAgg(false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } panic(fmt.Errorf("unsupport type '%s' for avg", typ)) @@ -603,7 +603,7 @@ func newGenericAnyValue[T any](typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, anyvalue.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, anyvalue.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, anyvalue.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericSum[T1 sum.Numeric, T2 sum.ReturnTyp](typ types.Type, dist bool) agg.Agg[any] { @@ -611,7 +611,7 @@ func newGenericSum[T1 sum.Numeric, T2 sum.ReturnTyp](typ types.Type, dist bool) if dist { return agg.NewUnaryDistAgg(false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, sum.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericAvg[T sum.Numeric](typ types.Type, dist bool) agg.Agg[any] { @@ -619,7 +619,7 @@ func newGenericAvg[T sum.Numeric](typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, avg.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericMax[T max.Compare](typ types.Type, dist bool) agg.Agg[any] { @@ -627,7 +627,7 @@ func newGenericMax[T max.Compare](typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, max.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericMin[T max.Compare](typ types.Type, dist bool) agg.Agg[any] { @@ -635,7 +635,7 @@ func newGenericMin[T max.Compare](typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, min.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericCount[T types.Generic | count.Decimal128AndString](typ types.Type, dist bool, isStar bool) agg.Agg[any] { @@ -643,7 +643,7 @@ func newGenericCount[T types.Generic | count.Decimal128AndString](typ types.Type if dist { return agg.NewUnaryDistAgg(true, typ, count.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, true, typ, count.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, true, typ, count.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericApproxcd[T any](typ types.Type, dist bool) agg.Agg[any] { @@ -651,7 +651,7 @@ func newGenericApproxcd[T any](typ types.Type, dist bool) agg.Agg[any] { if dist { return agg.NewUnaryDistAgg(false, typ, approxcd.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, approxcd.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, approxcd.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericBitOr[T types.Ints | types.UInts | types.Floats](typ types.Type, dist bool) agg.Agg[any] { @@ -659,7 +659,7 @@ func newGenericBitOr[T types.Ints | types.UInts | types.Floats](typ types.Type, if dist { return agg.NewUnaryDistAgg(false, typ, bit_or.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, bit_or.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, bit_or.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericBitXor[T types.Ints | types.UInts | types.Floats](typ types.Type, dist bool) agg.Agg[any] { @@ -667,7 +667,7 @@ func newGenericBitXor[T types.Ints | types.UInts | types.Floats](typ types.Type, if dist { return agg.NewUnaryDistAgg(false, typ, bit_xor.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, bit_xor.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, bit_xor.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericBitAnd[T types.Ints | types.UInts | types.Floats](typ types.Type, dist bool) agg.Agg[any] { @@ -675,7 +675,7 @@ func newGenericBitAnd[T types.Ints | types.UInts | types.Floats](typ types.Type, if dist { return agg.NewUnaryDistAgg(false, typ, bit_and.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, bit_and.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, bit_and.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericVariance[T types.Ints | types.UInts | types.Floats](typ types.Type, dist bool) agg.Agg[any] { @@ -683,7 +683,7 @@ func newGenericVariance[T types.Ints | types.UInts | types.Floats](typ types.Typ if dist { return agg.NewUnaryDistAgg(false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, variance.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } func newGenericStdDevPop[T types.Ints | types.UInts | types.Floats](typ types.Type, dist bool) agg.Agg[any] { @@ -691,5 +691,5 @@ func newGenericStdDevPop[T types.Ints | types.UInts | types.Floats](typ types.Ty if dist { return agg.NewUnaryDistAgg(false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) } - return agg.NewUnaryAgg(aggPriv, false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill) + return agg.NewUnaryAgg(aggPriv, false, typ, stddevpop.ReturnType([]types.Type{typ}), aggPriv.Grows, aggPriv.Eval, aggPriv.Merge, aggPriv.Fill, nil) } diff --git a/pkg/sql/colexec/anti/join.go b/pkg/sql/colexec/anti/join.go index 46619cc12490fe50f4e1dc34454aa0552137127a..a728e6a48cd9f1e4a84083da141ed2f8f93ba119 100644 --- a/pkg/sql/colexec/anti/join.go +++ b/pkg/sql/colexec/anti/join.go @@ -155,6 +155,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() + mSels := ctr.mp.Sels() itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i @@ -170,16 +171,43 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces if zvals[k] == 0 { continue } - if vals[k] != 0 { + if vals[k] == 0 { + for j, pos := range ap.Result { + if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[pos], int64(i+k), proc.GetMheap()); err != nil { + rbat.Clean(proc.GetMheap()) + return err + } + } + rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) continue } - for j, pos := range ap.Result { - if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[pos], int64(i+k), proc.GetMheap()); err != nil { - rbat.Clean(proc.GetMheap()) - return err + if ap.Cond != nil { + flg := true // mark no tuple satisfies the condition + sels := mSels[vals[k]-1] + for _, sel := range sels { + vec, err := colexec.JoinFilterEvalExprInBucket(bat, ctr.bat, i+k, int(sel), proc, ap.Cond) + if err != nil { + return err + } + bs := vec.Col.([]bool) + if bs[0] { + flg = false + vec.Free(proc.Mp) + break + } + vec.Free(proc.Mp) + } + if !flg { + continue } + for j, pos := range ap.Result { + if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[pos], int64(i+k), proc.GetMheap()); err != nil { + rbat.Clean(proc.GetMheap()) + return err + } + } + rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) } - rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) } } rbat.ExpandNulls() diff --git a/pkg/sql/colexec/anti/join_test.go b/pkg/sql/colexec/anti/join_test.go index 84e5d938c7eabea0f2cc5349decf20cb035a453c..4d2693f36c96226a08ec51e97781e5e2f089fee8 100644 --- a/pkg/sql/colexec/anti/join_test.go +++ b/pkg/sql/colexec/anti/join_test.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -190,6 +191,44 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] Ctx: ctx, Ch: make(chan *batch.Batch, 3), } + fid := function.EncodeOverloadID(function.EQUAL, 4) + args := make([]*plan.Expr, 0, 2) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 0, + ColPos: 0, + }, + }, + }) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 1, + ColPos: 0, + }, + }, + }) + cond := &plan.Expr{ + Typ: &plan.Type{ + Size: 1, + Id: int32(types.T_bool), + }, + Expr: &plan.Expr_F{ + F: &plan.Function{ + Args: args, + Func: &plan.ObjectRef{Obj: fid}, + }, + }, + } return antiTestCase{ types: ts, flgs: flgs, @@ -199,6 +238,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] Typs: ts, Result: rp, Conditions: cs, + Cond: cond, }, barg: &hashbuild.Argument{ Typs: ts, diff --git a/pkg/sql/colexec/anti/types.go b/pkg/sql/colexec/anti/types.go index c05312d284c2468d7e453c358d7b991aa253dc7a..bfb10897d09fa1effc17de098df1249c51737502 100644 --- a/pkg/sql/colexec/anti/types.go +++ b/pkg/sql/colexec/anti/types.go @@ -54,5 +54,6 @@ type Argument struct { Nbucket uint64 Result []int32 Typs []types.Type + Cond *plan.Expr Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/expr_eval.go b/pkg/sql/colexec/expr_eval.go index fcdb3f813a7ce690b00065c6133f881944b50fcb..d3e32cda73405af9d9b1f0b1e04bb556627d436a 100644 --- a/pkg/sql/colexec/expr_eval.go +++ b/pkg/sql/colexec/expr_eval.go @@ -259,6 +259,98 @@ func JoinFilterEvalExpr(r, s *batch.Batch, rRow int, proc *process.Process, expr } } +func JoinFilterEvalExprInBucket(r, s *batch.Batch, rRow, sRow int, proc *process.Process, expr *plan.Expr) (*vector.Vector, error) { + var vec *vector.Vector + e := expr.Expr + switch t := e.(type) { + case *plan.Expr_C: + if t.C.GetIsnull() { + vec = vector.NewConst(types.Type{Oid: types.T(expr.Typ.GetId())}, 1) + nulls.Add(vec.Nsp, 0) + } else { + switch t.C.GetValue().(type) { + case *plan.Const_Bval: + vec = vector.NewConst(constBType, 1) + vec.Col = []bool{t.C.GetBval()} + case *plan.Const_Ival: + vec = vector.NewConst(constIType, 1) + vec.Col = []int64{t.C.GetIval()} + case *plan.Const_Dval: + vec = vector.NewConst(constDType, 1) + vec.Col = []float64{t.C.GetDval()} + case *plan.Const_Sval: + vec = vector.NewConst(constSType, 1) + sval := t.C.GetSval() + vec.Col = &types.Bytes{ + Data: []byte(sval), + Offsets: []uint32{0}, + Lengths: []uint32{uint32(len(sval))}, + } + default: + return nil, errors.New(errno.SyntaxErrororAccessRuleViolation, fmt.Sprintf("unimplemented const expression %v", t.C.GetValue())) + } + } + return vec, nil + case *plan.Expr_T: + // return a vector recorded type information but without real data + return vector.New(types.Type{ + Oid: types.T(t.T.Typ.GetId()), + Width: t.T.Typ.GetWidth(), + Scale: t.T.Typ.GetScale(), + Precision: t.T.Typ.GetPrecision(), + }), nil + case *plan.Expr_Col: + if t.Col.RelPos == 0 { + return r.Vecs[t.Col.ColPos].ToConst(rRow), nil + } + return s.Vecs[t.Col.ColPos].ToConst(sRow), nil + case *plan.Expr_F: + overloadId := t.F.Func.GetObj() + f, err := function.GetFunctionByID(overloadId) + if err != nil { + return nil, err + } + vs := make([]*vector.Vector, len(t.F.Args)) + for i := range vs { + v, err := JoinFilterEvalExprInBucket(r, s, rRow, sRow, proc, t.F.Args[i]) + if err != nil { + mp := make(map[*vector.Vector]uint8) + for i := range s.Vecs { + mp[s.Vecs[i]] = 0 + } + for j := 0; j < i; j++ { + if _, ok := mp[vs[j]]; !ok { + vector.Clean(vs[j], proc.Mp) + } + } + return nil, err + } + vs[i] = v + } + defer func() { + mp := make(map[*vector.Vector]uint8) + for i := range s.Vecs { + mp[s.Vecs[i]] = 0 + } + for i := range vs { + if _, ok := mp[vs[i]]; !ok { + vector.Clean(vs[i], proc.Mp) + } + } + }() + vec, err = f.VecFn(vs, proc) + if err != nil { + return nil, err + } + vec.Length = 1 + vec.FillDefaultValue() + return vec, nil + default: + // *plan.Expr_Corr, *plan.Expr_List, *plan.Expr_P, *plan.Expr_V, *plan.Expr_Sub + return nil, errors.New(errno.SyntaxErrororAccessRuleViolation, fmt.Sprintf("unsupported eval expr '%v'", t)) + } +} + // RewriteFilterExprList will convert an expression list to be an AndExpr func RewriteFilterExprList(list []*plan.Expr) *plan.Expr { l := len(list) @@ -276,6 +368,28 @@ func RewriteFilterExprList(list []*plan.Expr) *plan.Expr { } } +func SplitAndExprs(list []*plan.Expr) []*plan.Expr { + exprs := make([]*plan.Expr, 0, len(list)) + for i := range list { + exprs = append(exprs, splitAndExpr(list[i])...) + } + return exprs +} + +func splitAndExpr(expr *plan.Expr) []*plan.Expr { + exprs := make([]*plan.Expr, 0, 1) + if e, ok := expr.Expr.(*plan.Expr_F); ok { + fid, _ := function.DecodeOverloadID(e.F.Func.GetObj()) + if fid == function.AND { + exprs = append(exprs, splitAndExpr(e.F.Args[0])...) + exprs = append(exprs, splitAndExpr(e.F.Args[1])...) + return exprs + } + } + exprs = append(exprs, expr) + return exprs +} + func makeAndExpr(left, right *plan.Expr) *plan.Expr_F { return &plan.Expr_F{ F: &plan.Function{ diff --git a/pkg/sql/colexec/join/join.go b/pkg/sql/colexec/join/join.go index 39ab94d56101934dfde3f9a2f2ff965d29154e7a..ce6717c100b2c444ddfbcd08a478c0b3b0e023f8 100644 --- a/pkg/sql/colexec/join/join.go +++ b/pkg/sql/colexec/join/join.go @@ -123,6 +123,18 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } sels := mSels[vals[k]-1] for _, sel := range sels { + if ap.Cond != nil { + vec, err := colexec.JoinFilterEvalExprInBucket(bat, ctr.bat, i+k, int(sel), proc, ap.Cond) + if err != nil { + return err + } + bs := vec.Col.([]bool) + if !bs[0] { + vec.Free(proc.Mp) + continue + } + vec.Free(proc.Mp) + } for j, rp := range ap.Result { if rp.Rel == 0 { if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.Mp); err != nil { diff --git a/pkg/sql/colexec/join/join_test.go b/pkg/sql/colexec/join/join_test.go index 864ae79af13473398611d83338c9e1a96dda12fb..3e70c42742c862c608de2123e1be6ed4757aa0da 100644 --- a/pkg/sql/colexec/join/join_test.go +++ b/pkg/sql/colexec/join/join_test.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -68,15 +69,6 @@ func init() { newExpr(0, types.Type{Oid: types.T_int8}), }, }), - newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_decimal64}}, []ResultPos{{0, 0}}, - [][]*plan.Expr{ - { - newExpr(0, types.Type{Oid: types.T_decimal64}), - }, - { - newExpr(0, types.Type{Oid: types.T_decimal64, Scale: 1}), - }, - }), } } @@ -181,6 +173,44 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Ctx: ctx, Ch: make(chan *batch.Batch, 3), } + fid := function.EncodeOverloadID(function.EQUAL, 4) + args := make([]*plan.Expr, 0, 2) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 0, + ColPos: 0, + }, + }, + }) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 1, + ColPos: 0, + }, + }, + }) + cond := &plan.Expr{ + Typ: &plan.Type{ + Size: 1, + Id: int32(types.T_bool), + }, + Expr: &plan.Expr_F{ + F: &plan.Function{ + Args: args, + Func: &plan.ObjectRef{Obj: fid}, + }, + }, + } return joinTestCase{ types: ts, flgs: flgs, @@ -190,6 +220,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Typs: ts, Result: rp, Conditions: cs, + Cond: cond, }, barg: &hashbuild.Argument{ Typs: ts, diff --git a/pkg/sql/colexec/join/types.go b/pkg/sql/colexec/join/types.go index 4e433a4e8b4fa3501fc9d2108aaca5cd2ddc32cc..1207de1f2aea38d57fb3a9f1907c95d8e119bfac 100644 --- a/pkg/sql/colexec/join/types.go +++ b/pkg/sql/colexec/join/types.go @@ -57,5 +57,6 @@ type Argument struct { Nbucket uint64 // buckets count Result []ResultPos Typs []types.Type + Cond *plan.Expr Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/left/join.go b/pkg/sql/colexec/left/join.go index ca71864708e07d566c8e1dd886d92199cb956c56..a273a2df4c9e83f5c80653944cd5184b65c6b1b2 100644 --- a/pkg/sql/colexec/left/join.go +++ b/pkg/sql/colexec/left/join.go @@ -201,8 +201,22 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces continue } sels := mSels[vals[k]-1] + flg := true for _, sel := range sels { for j, rp := range ap.Result { + if ap.Cond != nil { + vec, err := colexec.JoinFilterEvalExprInBucket(bat, ctr.bat, i+k, int(sel), proc, ap.Cond) + if err != nil { + return err + } + bs := vec.Col.([]bool) + if !bs[0] { + vec.Free(proc.Mp) + continue + } + vec.Free(proc.Mp) + } + flg = false if rp.Rel == 0 { if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.GetMheap()); err != nil { rbat.Clean(proc.GetMheap()) @@ -217,6 +231,23 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } rbat.Zs = append(rbat.Zs, ctr.bat.Zs[sel]) } + if flg { + for j, rp := range ap.Result { + if rp.Rel == 0 { + if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.GetMheap()); err != nil { + rbat.Clean(proc.GetMheap()) + return err + } + } else { + if err := vector.UnionNull(rbat.Vecs[j], ctr.bat.Vecs[rp.Pos], proc.GetMheap()); err != nil { + rbat.Clean(proc.GetMheap()) + return err + } + } + } + rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) + continue + } } } rbat.ExpandNulls() diff --git a/pkg/sql/colexec/left/join_test.go b/pkg/sql/colexec/left/join_test.go index 59070d8156a169d48fef8fa36f9c0ec2a55c7695..c97d3e207a980337f2b23de6ec82e077ab784ee3 100644 --- a/pkg/sql/colexec/left/join_test.go +++ b/pkg/sql/colexec/left/join_test.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -68,15 +69,6 @@ func init() { newExpr(0, types.Type{Oid: types.T_int8}), }, }), - newTestCase(testutil.NewMheap(), []bool{false}, []types.Type{{Oid: types.T_decimal64}}, []ResultPos{{0, 0}}, - [][]*plan.Expr{ - { - newExpr(0, types.Type{Oid: types.T_decimal64}), - }, - { - newExpr(0, types.Type{Oid: types.T_decimal64, Scale: 1}), - }, - }), } } @@ -200,6 +192,44 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Ctx: ctx, Ch: make(chan *batch.Batch, 3), } + fid := function.EncodeOverloadID(function.EQUAL, 4) + args := make([]*plan.Expr, 0, 2) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 0, + ColPos: 0, + }, + }, + }) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 1, + ColPos: 0, + }, + }, + }) + cond := &plan.Expr{ + Typ: &plan.Type{ + Size: 1, + Id: int32(types.T_bool), + }, + Expr: &plan.Expr_F{ + F: &plan.Function{ + Args: args, + Func: &plan.ObjectRef{Obj: fid}, + }, + }, + } return joinTestCase{ types: ts, flgs: flgs, @@ -209,6 +239,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Typs: ts, Result: rp, Conditions: cs, + Cond: cond, }, barg: &hashbuild.Argument{ Typs: ts, diff --git a/pkg/sql/colexec/left/types.go b/pkg/sql/colexec/left/types.go index 436031700547fc2e27c9c1fe553ee44564c96808..5a400978322918853f4da54d124d1feabd47b628 100644 --- a/pkg/sql/colexec/left/types.go +++ b/pkg/sql/colexec/left/types.go @@ -57,5 +57,6 @@ type Argument struct { Nbucket uint64 Result []ResultPos Typs []types.Type + Cond *plan.Expr Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/semi/join.go b/pkg/sql/colexec/semi/join.go index 1f5900608e97bdd58e7929d3557409d1eb854173..aaa206e0c55076c542012697adccfaea7c284edf 100644 --- a/pkg/sql/colexec/semi/join.go +++ b/pkg/sql/colexec/semi/join.go @@ -104,6 +104,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces } defer ctr.freeJoinCondition(proc) count := bat.Length() + mSels := ctr.mp.Sels() itr := ctr.mp.Map().NewIterator() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i @@ -113,14 +114,28 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces copy(ctr.inBuckets, hashmap.OneUInt8s) vals, zvals := itr.Find(i, n, ctr.vecs, ctr.inBuckets) for k := 0; k < n; k++ { - if ctr.inBuckets[k] == 0 { + if ctr.inBuckets[k] == 0 || zvals[k] == 0 || vals[k] == 0 { continue } - if zvals[k] == 0 { - continue - } - if vals[k] == 0 { - continue + if ap.Cond != nil { + flg := true // mark no tuple satisfies the condition + sels := mSels[vals[k]-1] + for _, sel := range sels { + vec, err := colexec.JoinFilterEvalExprInBucket(bat, ctr.bat, i+k, int(sel), proc, ap.Cond) + if err != nil { + return err + } + bs := vec.Col.([]bool) + if bs[0] { + flg = false + vec.Free(proc.Mp) + break + } + vec.Free(proc.Mp) + } + if flg { + continue + } } for j, pos := range ap.Result { if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[pos], int64(i+k), proc.GetMheap()); err != nil { diff --git a/pkg/sql/colexec/semi/join_test.go b/pkg/sql/colexec/semi/join_test.go index 8ac26255a952f21979e554dadb93e5df8444edc8..4085811784df2fc61a495ca06c7c9a09faca74d4 100644 --- a/pkg/sql/colexec/semi/join_test.go +++ b/pkg/sql/colexec/semi/join_test.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -172,6 +173,44 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] Ctx: ctx, Ch: make(chan *batch.Batch, 3), } + fid := function.EncodeOverloadID(function.EQUAL, 4) + args := make([]*plan.Expr, 0, 2) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 0, + ColPos: 0, + }, + }, + }) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 1, + ColPos: 0, + }, + }, + }) + cond := &plan.Expr{ + Typ: &plan.Type{ + Size: 1, + Id: int32(types.T_bool), + }, + Expr: &plan.Expr_F{ + F: &plan.Function{ + Args: args, + Func: &plan.ObjectRef{Obj: fid}, + }, + }, + } return joinTestCase{ types: ts, flgs: flgs, @@ -181,6 +220,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []int32, cs [] Typs: ts, Result: rp, Conditions: cs, + Cond: cond, }, barg: &hashbuild.Argument{ Typs: ts, diff --git a/pkg/sql/colexec/semi/types.go b/pkg/sql/colexec/semi/types.go index 10773893bde2dc913030455dd9f283430491e2bd..3985f1d3592549510756d3fd43bc0e850c10b9a3 100644 --- a/pkg/sql/colexec/semi/types.go +++ b/pkg/sql/colexec/semi/types.go @@ -52,5 +52,6 @@ type Argument struct { Nbucket uint64 // buckets count Result []int32 Typs []types.Type + Cond *plan.Expr Conditions [][]*plan.Expr } diff --git a/pkg/sql/colexec/single/join.go b/pkg/sql/colexec/single/join.go index b1f2f70b29b9d5e0ae1ea34229c2e2b7fe98553f..900beece79dd3521685032ebb7169859e7c0d483 100644 --- a/pkg/sql/colexec/single/join.go +++ b/pkg/sql/colexec/single/join.go @@ -201,11 +201,44 @@ func (ctr *container) probe(bat *batch.Batch, ap *Argument, proc *process.Proces rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) continue } + idx := 0 + cnt := 0 sels := mSels[vals[k]-1] - if len(sels) > 1 { + if ap.Cond != nil { + for j, sel := range sels { + vec, err := colexec.JoinFilterEvalExprInBucket(bat, ctr.bat, i+k, int(sel), proc, ap.Cond) + if err != nil { + return err + } + bs := vec.Col.([]bool) + if bs[0] { + cnt++ + idx = j + } + vec.Free(proc.Mp) + } + } + if (ap.Cond != nil && cnt > 1) || (ap.Cond == nil && len(sels) > 1) { return errors.New("scalar subquery returns more than 1 row") } - sel := sels[0] + if ap.Cond != nil && cnt == 0 { + for j, rp := range ap.Result { + if rp.Rel == 0 { + if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.GetMheap()); err != nil { + rbat.Clean(proc.GetMheap()) + return err + } + } else { + if err := vector.UnionNull(rbat.Vecs[j], ctr.bat.Vecs[rp.Pos], proc.GetMheap()); err != nil { + rbat.Clean(proc.GetMheap()) + return err + } + } + } + rbat.Zs = append(rbat.Zs, bat.Zs[i+k]) + continue + } + sel := sels[idx] for j, rp := range ap.Result { if rp.Rel == 0 { if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.GetMheap()); err != nil { diff --git a/pkg/sql/colexec/single/join_test.go b/pkg/sql/colexec/single/join_test.go index aa883ef8e3416cc54cabbc4b975b09d342a2ad9d..9e672a48026ad72c1d4d726caff1be02266277f4 100644 --- a/pkg/sql/colexec/single/join_test.go +++ b/pkg/sql/colexec/single/join_test.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec/hashbuild" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/mheap" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -181,6 +182,44 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Ctx: ctx, Ch: make(chan *batch.Batch, 3), } + fid := function.EncodeOverloadID(function.EQUAL, 4) + args := make([]*plan.Expr, 0, 2) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 0, + ColPos: 0, + }, + }, + }) + args = append(args, &plan.Expr{ + Typ: &plan.Type{ + Size: ts[0].Size, + Id: int32(ts[0].Oid), + }, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: 1, + ColPos: 0, + }, + }, + }) + cond := &plan.Expr{ + Typ: &plan.Type{ + Size: 1, + Id: int32(types.T_bool), + }, + Expr: &plan.Expr_F{ + F: &plan.Function{ + Args: args, + Func: &plan.ObjectRef{Obj: fid}, + }, + }, + } return joinTestCase{ types: ts, flgs: flgs, @@ -190,6 +229,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c Typs: ts, Result: rp, Conditions: cs, + Cond: cond, }, barg: &hashbuild.Argument{ Typs: ts, diff --git a/pkg/sql/colexec/single/types.go b/pkg/sql/colexec/single/types.go index 10e8979e0d4bb76bc5fc8f730304ad78a1639503..cbcd13ae8c5e2e1ceaa90168cd7c90131d3b7737 100644 --- a/pkg/sql/colexec/single/types.go +++ b/pkg/sql/colexec/single/types.go @@ -55,7 +55,8 @@ type Argument struct { ctr *container Ibucket uint64 Nbucket uint64 - Typs []types.Type Result []ResultPos + Typs []types.Type + Cond *plan.Expr Conditions [][]*plan.Expr } diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 2303b854592a9b49e9f589d224308854c03dcc50..8e7f4e03ce0ec2bc1562f1ac4bdc2458f103de8a 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -689,8 +689,9 @@ func (c *Compile) compileJoin(n, right *plan.Node, ss []*Scope, children []*Scop } } case plan.Node_ANTI: + _, conds := extraJoinConditions(n.OnList) for i := range rs { - if isEq && len(n.OnList) == 1 { + if isEq && len(conds) == 1 { rs[i].appendInstruction(vm.Instruction{ Op: vm.Anti, Idx: c.anal.curr, diff --git a/pkg/sql/compile/operator.go b/pkg/sql/compile/operator.go index 1f0352da8bbae7671db63095b68ed10e4356e7b3..391fb06958d5e4ecfee715f36d6f78176c0a44d6 100644 --- a/pkg/sql/compile/operator.go +++ b/pkg/sql/compile/operator.go @@ -91,17 +91,22 @@ func dupInstruction(in vm.Instruction) vm.Instruction { } case *join.Argument: rin.Arg = &join.Argument{ + Typs: arg.Typs, + Cond: arg.Cond, Result: arg.Result, Conditions: arg.Conditions, } case *semi.Argument: rin.Arg = &semi.Argument{ + Typs: arg.Typs, + Cond: arg.Cond, Result: arg.Result, Conditions: arg.Conditions, } case *left.Argument: rin.Arg = &left.Argument{ Typs: arg.Typs, + Cond: arg.Cond, Result: arg.Result, Conditions: arg.Conditions, } @@ -116,15 +121,19 @@ func dupInstruction(in vm.Instruction) vm.Instruction { case *single.Argument: rin.Arg = &single.Argument{ Typs: arg.Typs, + Cond: arg.Cond, Result: arg.Result, Conditions: arg.Conditions, } case *product.Argument: rin.Arg = &product.Argument{ + Typs: arg.Typs, Result: arg.Result, } case *anti.Argument: rin.Arg = &anti.Argument{ + Typs: arg.Typs, + Cond: arg.Cond, Result: arg.Result, Conditions: arg.Conditions, } @@ -151,11 +160,13 @@ func dupInstruction(in vm.Instruction) vm.Instruction { } case *loopjoin.Argument: rin.Arg = &loopjoin.Argument{ + Typs: arg.Typs, Cond: arg.Cond, Result: arg.Result, } case *loopsemi.Argument: rin.Arg = &loopsemi.Argument{ + Typs: arg.Typs, Cond: arg.Cond, Result: arg.Result, } @@ -167,11 +178,13 @@ func dupInstruction(in vm.Instruction) vm.Instruction { } case *loopanti.Argument: rin.Arg = &loopanti.Argument{ + Typs: arg.Typs, Cond: arg.Cond, Result: arg.Result, } case *loopsingle.Argument: rin.Arg = &loopsingle.Argument{ + Typs: arg.Typs, Cond: arg.Cond, Result: arg.Result, } @@ -327,10 +340,12 @@ func constructJoin(n *plan.Node, typs []types.Type, proc *process.Process) *join for i, expr := range n.ProjectList { result[i].Rel, result[i].Pos = constructJoinResult(expr) } + cond, conds := extraJoinConditions(n.OnList) return &join.Argument{ Typs: typs, Result: result, - Conditions: constructJoinConditions(n.OnList), + Cond: cond, + Conditions: constructJoinConditions(conds), } } @@ -343,10 +358,12 @@ func constructSemi(n *plan.Node, typs []types.Type, proc *process.Process) *semi } result[i] = pos } + cond, conds := extraJoinConditions(n.OnList) return &semi.Argument{ Typs: typs, Result: result, - Conditions: constructJoinConditions(n.OnList), + Cond: cond, + Conditions: constructJoinConditions(conds), } } @@ -355,10 +372,12 @@ func constructLeft(n *plan.Node, typs []types.Type, proc *process.Process) *left for i, expr := range n.ProjectList { result[i].Rel, result[i].Pos = constructJoinResult(expr) } + cond, conds := extraJoinConditions(n.OnList) return &left.Argument{ Typs: typs, Result: result, - Conditions: constructJoinConditions(n.OnList), + Cond: cond, + Conditions: constructJoinConditions(conds), } } @@ -367,10 +386,12 @@ func constructSingle(n *plan.Node, typs []types.Type, proc *process.Process) *si for i, expr := range n.ProjectList { result[i].Rel, result[i].Pos = constructJoinResult(expr) } + cond, conds := extraJoinConditions(n.OnList) return &single.Argument{ Typs: typs, Result: result, - Conditions: constructJoinConditions(n.OnList), + Cond: cond, + Conditions: constructJoinConditions(conds), } } @@ -391,10 +412,12 @@ func constructAnti(n *plan.Node, typs []types.Type, proc *process.Process) *anti } result[i] = pos } + cond, conds := extraJoinConditions(n.OnList) return &anti.Argument{ Typs: typs, Result: result, - Conditions: constructJoinConditions(n.OnList), + Cond: cond, + Conditions: constructJoinConditions(conds), } } @@ -738,6 +761,22 @@ func constructJoinCondition(expr *plan.Expr) (*plan.Expr, *plan.Expr) { } func isEquiJoin(exprs []*plan.Expr) bool { + for _, expr := range exprs { + if e, ok := expr.Expr.(*plan.Expr_F); ok { + if !supportedJoinCondition(e.F.Func.GetObj()) { + continue + } + lpos, rpos := hasColExpr(e.F.Args[0], -1), hasColExpr(e.F.Args[1], -1) + if lpos == -1 || rpos == -1 || (lpos == rpos) { + continue + } + return true + } + } + return false || isEquiJoin0(exprs) +} + +func isEquiJoin0(exprs []*plan.Expr) bool { for _, expr := range exprs { if e, ok := expr.Expr.(*plan.Expr_F); ok { if !supportedJoinCondition(e.F.Func.GetObj()) { @@ -752,6 +791,30 @@ func isEquiJoin(exprs []*plan.Expr) bool { return true } +func extraJoinConditions(exprs []*plan.Expr) (*plan.Expr, []*plan.Expr) { + exprs = colexec.SplitAndExprs(exprs) + eqConds := make([]*plan.Expr, 0, len(exprs)) + notEqConds := make([]*plan.Expr, 0, len(exprs)) + for i, expr := range exprs { + if e, ok := expr.Expr.(*plan.Expr_F); ok { + if !supportedJoinCondition(e.F.Func.GetObj()) { + notEqConds = append(notEqConds, exprs[i]) + continue + } + lpos, rpos := hasColExpr(e.F.Args[0], -1), hasColExpr(e.F.Args[1], -1) + if lpos == -1 || rpos == -1 || (lpos == rpos) { + notEqConds = append(notEqConds, exprs[i]) + continue + } + eqConds = append(eqConds, exprs[i]) + } + } + if len(notEqConds) == 0 { + return nil, eqConds + } + return colexec.RewriteFilterExprList(notEqConds), eqConds +} + func supportedJoinCondition(id int64) bool { fid, _ := function.DecodeOverloadID(id) return fid == function.EQUAL diff --git a/pkg/vectorize/sum/sum.go b/pkg/vectorize/sum/sum.go index a5b61abb15c99d0f4928fbc8f53326ebc85de8d0..5cc81c9d389ca2611037aabefaacf02cd9a66dee 100644 --- a/pkg/vectorize/sum/sum.go +++ b/pkg/vectorize/sum/sum.go @@ -14,7 +14,19 @@ package sum +/* +#include "mo.h" +#cgo CFLAGS: -I../../../cgo +#cgo LDFLAGS: -L../../../cgo -lmo -lm +*/ +import "C" + import ( + "unsafe" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/nulls" + "github.com/matrixorigin/matrixone/pkg/container/types" "golang.org/x/exp/constraints" ) @@ -97,6 +109,33 @@ func floatSumSels[T constraints.Float](xs []T, sels []int64) T { return res } +func Decimal64Sum(rs, vs []types.Decimal64, start int64, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + rc := C.Decimal64_VecSum((*C.int64_t)(unsafe.Pointer(&rs[0])), (*C.int64_t)(unsafe.Pointer(&vs[0])), + (C.int64_t)(start), (C.int64_t)(count), (*C.uint64_t)(&vps[0]), (*C.int64_t)(&zs[0]), (*C.uint64_t)(nulls.Ptr(nsp))) + if rc != 0 { + return moerr.NewError(moerr.OUT_OF_RANGE, "Decimal64 mult overflow") + } + return nil +} + +func Decimal64Sum128(rs []types.Decimal128, vs []types.Decimal64, start int64, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + rc := C.Decimal64_VecSumToDecimal128((*C.int64_t)(unsafe.Pointer(&rs[0])), (*C.int64_t)(unsafe.Pointer(&vs[0])), + (C.int64_t)(start), (C.int64_t)(count), (*C.uint64_t)(&vps[0]), (*C.int64_t)(&zs[0]), (*C.uint64_t)(nulls.Ptr(nsp))) + if rc != 0 { + return moerr.NewError(moerr.OUT_OF_RANGE, "Decimal64 mult overflow") + } + return nil +} + +func Decimal128Sum(rs, vs []types.Decimal128, start int64, count int64, vps []uint64, zs []int64, nsp *nulls.Nulls) error { + rc := C.Decimal128_VecSum((*C.int64_t)(unsafe.Pointer(&rs[0])), (*C.int64_t)(unsafe.Pointer(&vs[0])), + (C.int64_t)(start), (C.int64_t)(count), (*C.uint64_t)(&vps[0]), (*C.int64_t)(&zs[0]), (*C.uint64_t)(nulls.Ptr(nsp))) + if rc != 0 { + return moerr.NewError(moerr.OUT_OF_RANGE, "Decimal64 mult overflow") + } + return nil +} + /* func VecSum(rs, vs []uint64, start int64, count int64, vps []uint64, zs []int64, nulls bitmap) { for i := int64(0); i < count; i++ {