Skip to content
Snippets Groups Projects
Unverified Commit e403fde8 authored by chenmingsong's avatar chenmingsong Committed by GitHub
Browse files

fix union operator bug and update some comments for hash map (#4208)

parent 8e119bcb
No related branches found
No related tags found
No related merge requests found
......@@ -32,8 +32,11 @@ type Iterator interface {
// the return value is the corresponding group number;
// a value of 0 means that the corresponding value cannot be found
Find(start, count int, vecs []*vector.Vector, scales []int32) ([]uint64, []int64)
// Insert vecs[start, start+count) into hashmap
// the return value corresponds to the corresponding group number(start with 1)
// the return value corresponds to the corresponding group number(start with 1)
// NOTE: for the sake of speed, the Insert method does not update the rows of the hash map.
// If you need that, you should call the hash map's AddGroup function yourself.
Insert(start, count int, vecs []*vector.Vector, scales []int32) ([]uint64, []int64)
}
......
......@@ -38,7 +38,7 @@ func Prepare(_ *process.Process, argument interface{}) error {
func Call(idx int, proc *process.Process, argument interface{}) (bool, error) {
var err error
arg := argument.(*Argument)
// we make this assertion here for now, the real situation of table size
// we make an assertion here for now, the real situation of table size
// should be noted by the execution plan
smallTableIndex, bigTableIndex := 1, 0
......@@ -47,11 +47,11 @@ func Call(idx int, proc *process.Process, argument interface{}) (bool, error) {
analyze.Start()
defer analyze.Stop()
// step1: deal the small table. if new row, put into bat.
// step1: deal the small table. if new row, put it into bat.
if err = arg.ctr.insert(proc, analyze, smallTableIndex); err != nil {
return false, err
}
// step2: deal the big table. if new row, put into bat.
// step2: deal the big table. if new row, put it into bat.
if err = arg.ctr.insert(proc, analyze, bigTableIndex); err != nil {
return false, err
}
......@@ -91,7 +91,7 @@ func (ctr *Container) insert(proc *process.Process, analyze process.Analyze, ind
scales[i] = bat.Vecs[i].Typ.Scale
}
for i := 0; i < count; i += hashmap.UnitLimit {
insertCount := 0
oldHashGroup := ctr.hashTable.GroupCount()
iterator := ctr.hashTable.NewIterator()
n := count - i
......@@ -103,12 +103,14 @@ func (ctr *Container) insert(proc *process.Process, analyze process.Analyze, ind
copy(inserted[:n], restoreInserted[:n])
for j, v := range vs {
if v > ctr.hashTable.GroupCount() {
insertCount++
ctr.hashTable.AddGroup()
inserted[j] = 1
ctr.bat.Zs = append(ctr.bat.Zs, 1)
}
}
newHashGroup := ctr.hashTable.GroupCount()
insertCount := int(newHashGroup - oldHashGroup)
if insertCount > 0 {
for pos := range bat.Vecs {
if err = vector.UnionBatch(ctr.bat.Vecs[pos], bat.Vecs[pos], int64(i), insertCount, inserted[:n], proc.Mp); err != nil {
......
......@@ -34,34 +34,43 @@ type unionTestCase struct {
func TestUnion(t *testing.T) {
proc := testutil.NewProcess()
// [4 rows + 3 rows, 2 columns] union [5 rows + 5 rows, 2 columns]
// [4 rows + 3 rows, 2 columns] union [3 rows + 4 rows, 2 columns]
/*
{1, 1} {1, 1}
{2, 2} {2, 2}
{3, 3} {3, 3}
{4, 4} union {1, 1}
{1, 1} {2, 2}
{2, 2} {3, 3}
{3, 3} {4, 4}
*/
c := newUnionTestCase(
proc,
[]*batch.Batch{
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}),
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}),
}, nil),
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}),
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}),
}, nil),
},
[]*batch.Batch{
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}),
testutil.NewVector(3, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3}),
}, nil),
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(5, types.T_int64.ToType(), proc.Mp, true, nil),
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}),
testutil.NewVector(4, types.T_int64.ToType(), proc.Mp, false, []int64{1, 2, 3, 4}),
}, nil),
},
)
......@@ -72,8 +81,8 @@ func TestUnion(t *testing.T) {
{
result := c.arg.ctr.bat
require.NoError(t, err)
require.Equal(t, 2, len(result.Vecs)) // 2 columns
require.Equal(t, 17, vector.Length(result.Vecs[0])) // 17 = (4+3+5+5) rows
require.Equal(t, 2, len(result.Vecs)) // 2 columns
require.Equal(t, 4, vector.Length(result.Vecs[0])) // 4 rows
}
c.proc.Reg.InputBatch.Clean(c.proc.Mp) // clean the final result
require.Equal(t, int64(0), mheap.Size(c.proc.Mp))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment