From dcfed520c9b5f15d5353c7070cf207c37eb1a21c Mon Sep 17 00:00:00 2001
From: nnsgmsone <nnsmgsone@outlook.com>
Date: Tue, 19 Jul 2022 09:31:54 +0800
Subject: [PATCH] Add analyze information (#4070)

---
 pkg/container/batch/batch.go                  |  9 +++
 pkg/sql/colexec/complement/complement.go      |  2 +-
 pkg/sql/colexec/complement/complement_test.go |  4 +-
 pkg/sql/colexec/connector/connector.go        |  2 +-
 pkg/sql/colexec/connector/connector_test.go   |  6 +-
 pkg/sql/colexec/deletion/deletion.go          |  5 +-
 pkg/sql/colexec/dispatch/dispatch.go          |  2 +-
 pkg/sql/colexec/dispatch/dispatch_test.go     |  6 +-
 pkg/sql/colexec/group/group.go                |  2 +-
 pkg/sql/colexec/group/group_test.go           | 18 +++---
 pkg/sql/colexec/insert/insert.go              |  3 +-
 pkg/sql/colexec/insert/insert_test.go         |  4 +-
 pkg/sql/colexec/join/join.go                  |  2 +-
 pkg/sql/colexec/join/join_test.go             |  4 +-
 pkg/sql/colexec/left/join.go                  |  2 +-
 pkg/sql/colexec/left/join_test.go             |  4 +-
 pkg/sql/colexec/limit/limit.go                |  2 +-
 pkg/sql/colexec/limit/limit_test.go           | 14 ++--
 pkg/sql/colexec/loopcomplement/complement.go  |  2 +-
 .../colexec/loopcomplement/complement_test.go |  4 +-
 pkg/sql/colexec/loopjoin/join.go              |  2 +-
 pkg/sql/colexec/loopjoin/join_test.go         |  4 +-
 pkg/sql/colexec/loopleft/join.go              |  2 +-
 pkg/sql/colexec/loopleft/join_test.go         |  4 +-
 pkg/sql/colexec/loopsemi/join.go              |  2 +-
 pkg/sql/colexec/loopsemi/join_test.go         |  4 +-
 pkg/sql/colexec/merge/merge.go                |  2 +-
 pkg/sql/colexec/merge/merge_test.go           |  2 +-
 pkg/sql/colexec/mergegroup/group.go           |  2 +-
 pkg/sql/colexec/mergegroup/group_test.go      |  4 +-
 pkg/sql/colexec/mergelimit/limit.go           |  2 +-
 pkg/sql/colexec/mergelimit/limit_test.go      |  4 +-
 pkg/sql/colexec/mergeoffset/offset.go         |  2 +-
 pkg/sql/colexec/mergeoffset/offset_test.go    |  4 +-
 pkg/sql/colexec/mergeorder/order.go           |  2 +-
 pkg/sql/colexec/mergeorder/order_test.go      |  4 +-
 pkg/sql/colexec/mergetop/top.go               |  2 +-
 pkg/sql/colexec/mergetop/top_test.go          |  4 +-
 pkg/sql/colexec/offset/offset.go              |  2 +-
 pkg/sql/colexec/offset/offset_test.go         | 14 ++--
 pkg/sql/colexec/order/order.go                |  2 +-
 pkg/sql/colexec/order/order_test.go           | 16 ++---
 pkg/sql/colexec/output/output.go              |  2 +-
 pkg/sql/colexec/output/output_test.go         |  8 +--
 pkg/sql/colexec/product/product.go            |  2 +-
 pkg/sql/colexec/product/product_test.go       |  4 +-
 pkg/sql/colexec/projection/projection.go      |  2 +-
 pkg/sql/colexec/projection/projection_test.go |  8 +--
 pkg/sql/colexec/restrict/restrict.go          | 12 +++-
 pkg/sql/colexec/semi/join.go                  |  2 +-
 pkg/sql/colexec/semi/join_test.go             |  4 +-
 pkg/sql/colexec/top/top.go                    |  2 +-
 pkg/sql/colexec/top/top_test.go               | 18 +++---
 pkg/sql/colexec/update/update.go              |  2 +-
 pkg/vm/overload.go                            |  2 +-
 pkg/vm/process/analyze.go                     | 64 +++++++++++++++++++
 pkg/vm/process/process.go                     | 22 ++++++-
 pkg/vm/process/types.go                       | 19 +++++-
 pkg/vm/vm.go                                  |  4 +-
 59 files changed, 238 insertions(+), 122 deletions(-)
 create mode 100644 pkg/vm/process/analyze.go

diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go
index 3e85c8ec3..3aed5b6c3 100644
--- a/pkg/container/batch/batch.go
+++ b/pkg/container/batch/batch.go
@@ -234,6 +234,15 @@ func (bat *Batch) Shuffle(sels []int64, m *mheap.Mheap) error {
 	return nil
 }
 
+func (bat *Batch) Size() int {
+	var size int
+
+	for _, vec := range bat.Vecs {
+		size += len(vec.Data)
+	}
+	return size
+}
+
 func (bat *Batch) Length() int {
 	return len(bat.Zs)
 }
diff --git a/pkg/sql/colexec/complement/complement.go b/pkg/sql/colexec/complement/complement.go
index 53265e9dc..d5764c6d4 100644
--- a/pkg/sql/colexec/complement/complement.go
+++ b/pkg/sql/colexec/complement/complement.go
@@ -73,7 +73,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/complement/complement_test.go b/pkg/sql/colexec/complement/complement_test.go
index ba1b296f9..fd3aa8854 100644
--- a/pkg/sql/colexec/complement/complement_test.go
+++ b/pkg/sql/colexec/complement/complement_test.go
@@ -235,7 +235,7 @@ func TestComplement(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -282,7 +282,7 @@ func BenchmarkComplement(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/connector/connector.go b/pkg/sql/colexec/connector/connector.go
index 6ae248299..22036a30c 100644
--- a/pkg/sql/colexec/connector/connector.go
+++ b/pkg/sql/colexec/connector/connector.go
@@ -29,7 +29,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	reg := ap.Reg
 	bat := proc.Reg.InputBatch
diff --git a/pkg/sql/colexec/connector/connector_test.go b/pkg/sql/colexec/connector/connector_test.go
index 54e56d627..e168c9b12 100644
--- a/pkg/sql/colexec/connector/connector_test.go
+++ b/pkg/sql/colexec/connector/connector_test.go
@@ -80,11 +80,11 @@ func TestConnector(t *testing.T) {
 				}
 			}
 		}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		for {
 			bat := <-tc.arg.Reg.Ch
 			if bat == nil {
diff --git a/pkg/sql/colexec/deletion/deletion.go b/pkg/sql/colexec/deletion/deletion.go
index d41c30f18..823804c3c 100644
--- a/pkg/sql/colexec/deletion/deletion.go
+++ b/pkg/sql/colexec/deletion/deletion.go
@@ -16,9 +16,10 @@ package deletion
 
 import (
 	"bytes"
+	"sync/atomic"
+
 	"github.com/matrixorigin/matrixone/pkg/container/batch"
 	"github.com/matrixorigin/matrixone/pkg/vm/process"
-	"sync/atomic"
 )
 
 func String(arg interface{}, buf *bytes.Buffer) {
@@ -29,7 +30,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	p := arg.(*Argument)
 	bat := proc.Reg.InputBatch
 	if bat == nil || len(bat.Zs) == 0 {
diff --git a/pkg/sql/colexec/dispatch/dispatch.go b/pkg/sql/colexec/dispatch/dispatch.go
index 2474a1206..b46721a0b 100644
--- a/pkg/sql/colexec/dispatch/dispatch.go
+++ b/pkg/sql/colexec/dispatch/dispatch.go
@@ -32,7 +32,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	bat := proc.Reg.InputBatch
 	if bat == nil {
diff --git a/pkg/sql/colexec/dispatch/dispatch_test.go b/pkg/sql/colexec/dispatch/dispatch_test.go
index c3ec8a9e4..7e5f1a54c 100644
--- a/pkg/sql/colexec/dispatch/dispatch_test.go
+++ b/pkg/sql/colexec/dispatch/dispatch_test.go
@@ -81,11 +81,11 @@ func TestDispatch(t *testing.T) {
 				}
 			}
 		}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		for {
 			bat := <-tc.arg.Regs[0].Ch
 			if bat == nil {
diff --git a/pkg/sql/colexec/group/group.go b/pkg/sql/colexec/group/group.go
index 266f3bfbe..dfa18c862 100644
--- a/pkg/sql/colexec/group/group.go
+++ b/pkg/sql/colexec/group/group.go
@@ -56,7 +56,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	if len(ap.Exprs) == 0 {
 		return ap.ctr.process(ap, proc)
diff --git a/pkg/sql/colexec/group/group_test.go b/pkg/sql/colexec/group/group_test.go
index 6a19ed690..fe3572a89 100644
--- a/pkg/sql/colexec/group/group_test.go
+++ b/pkg/sql/colexec/group/group_test.go
@@ -109,22 +109,22 @@ func TestGroup(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = nil
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = nil
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
@@ -143,16 +143,16 @@ func BenchmarkGroup(b *testing.B) {
 			err := Prepare(tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.flgs, tc.types, tc.proc, BenchmarkRows)
-			_, err = Call(tc.proc, tc.arg)
+			_, err = Call(0, tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.flgs, tc.types, tc.proc, BenchmarkRows)
-			_, err = Call(tc.proc, tc.arg)
+			_, err = Call(0, tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = &batch.Batch{}
-			_, err = Call(tc.proc, tc.arg)
+			_, err = Call(0, tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = nil
-			_, err = Call(tc.proc, tc.arg)
+			_, err = Call(0, tc.proc, tc.arg)
 			require.NoError(t, err)
 			if tc.proc.Reg.InputBatch != nil {
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/insert/insert.go b/pkg/sql/colexec/insert/insert.go
index 3d953120f..fc69ec591 100644
--- a/pkg/sql/colexec/insert/insert.go
+++ b/pkg/sql/colexec/insert/insert.go
@@ -17,6 +17,7 @@ package insert
 import (
 	"bytes"
 	"fmt"
+
 	"github.com/matrixorigin/matrixone/pkg/container/nulls"
 	"github.com/matrixorigin/matrixone/pkg/container/types"
 	"github.com/matrixorigin/matrixone/pkg/container/vector"
@@ -42,7 +43,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
 	bat := proc.Reg.InputBatch
 	if bat == nil {
diff --git a/pkg/sql/colexec/insert/insert_test.go b/pkg/sql/colexec/insert/insert_test.go
index cda9efdbe..812b6c4d8 100644
--- a/pkg/sql/colexec/insert/insert_test.go
+++ b/pkg/sql/colexec/insert/insert_test.go
@@ -66,7 +66,7 @@ func TestInsertOperator(t *testing.T) {
 		},
 	}
 	proc.Reg.InputBatch = batch1
-	_, err := Call(proc, &argument1)
+	_, err := Call(0, proc, &argument1)
 	require.NoError(t, err)
 	println(argument1.TargetTable.(*mockRelation).result.Vecs)
 	{
@@ -104,6 +104,6 @@ func TestInsertOperator(t *testing.T) {
 		},
 	}
 	proc.Reg.InputBatch = batch2
-	_, err2 := Call(proc, &argument2)
+	_, err2 := Call(0, proc, &argument2)
 	require.Errorf(t, err2, "should return error when insert null into primary key column")
 }
diff --git a/pkg/sql/colexec/join/join.go b/pkg/sql/colexec/join/join.go
index 203dbf4a7..07b78ffc5 100644
--- a/pkg/sql/colexec/join/join.go
+++ b/pkg/sql/colexec/join/join.go
@@ -83,7 +83,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/join/join_test.go b/pkg/sql/colexec/join/join_test.go
index 7126175b5..d3f1daf33 100644
--- a/pkg/sql/colexec/join/join_test.go
+++ b/pkg/sql/colexec/join/join_test.go
@@ -236,7 +236,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -283,7 +283,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/left/join.go b/pkg/sql/colexec/left/join.go
index c38cf72dc..2d3c86649 100644
--- a/pkg/sql/colexec/left/join.go
+++ b/pkg/sql/colexec/left/join.go
@@ -83,7 +83,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/left/join_test.go b/pkg/sql/colexec/left/join_test.go
index 919db3dd0..7e66864f7 100644
--- a/pkg/sql/colexec/left/join_test.go
+++ b/pkg/sql/colexec/left/join_test.go
@@ -236,7 +236,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -283,7 +283,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/limit/limit.go b/pkg/sql/colexec/limit/limit.go
index 9b20a5c6b..9106c009e 100644
--- a/pkg/sql/colexec/limit/limit.go
+++ b/pkg/sql/colexec/limit/limit.go
@@ -32,7 +32,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 // returning only the first n tuples from its input
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	bat := proc.Reg.InputBatch
 	if bat == nil {
 		return true, nil
diff --git a/pkg/sql/colexec/limit/limit_test.go b/pkg/sql/colexec/limit/limit_test.go
index 7aeaca5b1..7711c1c1b 100644
--- a/pkg/sql/colexec/limit/limit_test.go
+++ b/pkg/sql/colexec/limit/limit_test.go
@@ -100,19 +100,19 @@ func TestLimit(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
 }
@@ -139,14 +139,14 @@ func BenchmarkLimit(b *testing.B) {
 			err := Prepare(tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			if tc.proc.Reg.InputBatch != nil {
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 			}
 			tc.proc.Reg.InputBatch = &batch.Batch{}
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 		}
 	}
 }
diff --git a/pkg/sql/colexec/loopcomplement/complement.go b/pkg/sql/colexec/loopcomplement/complement.go
index 21da437da..06580aabd 100644
--- a/pkg/sql/colexec/loopcomplement/complement.go
+++ b/pkg/sql/colexec/loopcomplement/complement.go
@@ -34,7 +34,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/loopcomplement/complement_test.go b/pkg/sql/colexec/loopcomplement/complement_test.go
index 27d67924e..b69142910 100644
--- a/pkg/sql/colexec/loopcomplement/complement_test.go
+++ b/pkg/sql/colexec/loopcomplement/complement_test.go
@@ -87,7 +87,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -118,7 +118,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/loopjoin/join.go b/pkg/sql/colexec/loopjoin/join.go
index 4827b64d8..b6ae70ad4 100644
--- a/pkg/sql/colexec/loopjoin/join.go
+++ b/pkg/sql/colexec/loopjoin/join.go
@@ -33,7 +33,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/loopjoin/join_test.go b/pkg/sql/colexec/loopjoin/join_test.go
index 6689b42f3..3e3364fcc 100644
--- a/pkg/sql/colexec/loopjoin/join_test.go
+++ b/pkg/sql/colexec/loopjoin/join_test.go
@@ -87,7 +87,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -118,7 +118,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/loopleft/join.go b/pkg/sql/colexec/loopleft/join.go
index 3acaa5f42..310fcf63c 100644
--- a/pkg/sql/colexec/loopleft/join.go
+++ b/pkg/sql/colexec/loopleft/join.go
@@ -34,7 +34,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/loopleft/join_test.go b/pkg/sql/colexec/loopleft/join_test.go
index a7b757266..970f5567a 100644
--- a/pkg/sql/colexec/loopleft/join_test.go
+++ b/pkg/sql/colexec/loopleft/join_test.go
@@ -87,7 +87,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -118,7 +118,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/loopsemi/join.go b/pkg/sql/colexec/loopsemi/join.go
index be46aecbf..6deb44659 100644
--- a/pkg/sql/colexec/loopsemi/join.go
+++ b/pkg/sql/colexec/loopsemi/join.go
@@ -33,7 +33,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/loopsemi/join_test.go b/pkg/sql/colexec/loopsemi/join_test.go
index 1f012466b..b2ee5c664 100644
--- a/pkg/sql/colexec/loopsemi/join_test.go
+++ b/pkg/sql/colexec/loopsemi/join_test.go
@@ -87,7 +87,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -118,7 +118,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/merge/merge.go b/pkg/sql/colexec/merge/merge.go
index 3632f507b..c73db0810 100644
--- a/pkg/sql/colexec/merge/merge.go
+++ b/pkg/sql/colexec/merge/merge.go
@@ -30,7 +30,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
 	for {
 		if len(proc.Reg.MergeReceivers) == 0 {
diff --git a/pkg/sql/colexec/merge/merge_test.go b/pkg/sql/colexec/merge/merge_test.go
index 83c5c86c1..be703a638 100644
--- a/pkg/sql/colexec/merge/merge_test.go
+++ b/pkg/sql/colexec/merge/merge_test.go
@@ -78,7 +78,7 @@ func TestMerge(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				if tc.proc.Reg.InputBatch != nil {
 					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 				}
diff --git a/pkg/sql/colexec/mergegroup/group.go b/pkg/sql/colexec/mergegroup/group.go
index 3923efb5a..4355fde87 100644
--- a/pkg/sql/colexec/mergegroup/group.go
+++ b/pkg/sql/colexec/mergegroup/group.go
@@ -37,7 +37,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/mergegroup/group_test.go b/pkg/sql/colexec/mergegroup/group_test.go
index 6554e462d..5e9e59a93 100644
--- a/pkg/sql/colexec/mergegroup/group_test.go
+++ b/pkg/sql/colexec/mergegroup/group_test.go
@@ -118,7 +118,7 @@ func TestGroup(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				if tc.proc.Reg.InputBatch != nil {
 					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 				}
@@ -156,7 +156,7 @@ func BenchmarkGroup(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					if tc.proc.Reg.InputBatch != nil {
 						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 					}
diff --git a/pkg/sql/colexec/mergelimit/limit.go b/pkg/sql/colexec/mergelimit/limit.go
index 4ee9c4e2e..cd01a418d 100644
--- a/pkg/sql/colexec/mergelimit/limit.go
+++ b/pkg/sql/colexec/mergelimit/limit.go
@@ -33,7 +33,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
 	for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
 		reg := proc.Reg.MergeReceivers[i]
diff --git a/pkg/sql/colexec/mergelimit/limit_test.go b/pkg/sql/colexec/mergelimit/limit_test.go
index 4a8addc0e..90f28100f 100644
--- a/pkg/sql/colexec/mergelimit/limit_test.go
+++ b/pkg/sql/colexec/mergelimit/limit_test.go
@@ -81,7 +81,7 @@ func TestLimit(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				if tc.proc.Reg.InputBatch != nil {
 					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 				}
@@ -124,7 +124,7 @@ func BenchmarkLimit(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					if tc.proc.Reg.InputBatch != nil {
 						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 					}
diff --git a/pkg/sql/colexec/mergeoffset/offset.go b/pkg/sql/colexec/mergeoffset/offset.go
index ba6718a7c..458afb18b 100644
--- a/pkg/sql/colexec/mergeoffset/offset.go
+++ b/pkg/sql/colexec/mergeoffset/offset.go
@@ -33,7 +33,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
 	for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
 		reg := proc.Reg.MergeReceivers[i]
diff --git a/pkg/sql/colexec/mergeoffset/offset_test.go b/pkg/sql/colexec/mergeoffset/offset_test.go
index ff09a6bc2..0ea1e33b0 100644
--- a/pkg/sql/colexec/mergeoffset/offset_test.go
+++ b/pkg/sql/colexec/mergeoffset/offset_test.go
@@ -81,7 +81,7 @@ func TestOffset(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				if tc.proc.Reg.InputBatch != nil {
 					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 				}
@@ -124,7 +124,7 @@ func BenchmarkOffset(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					if tc.proc.Reg.InputBatch != nil {
 						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 					}
diff --git a/pkg/sql/colexec/mergeorder/order.go b/pkg/sql/colexec/mergeorder/order.go
index 41e5266ee..58dbc8ea8 100644
--- a/pkg/sql/colexec/mergeorder/order.go
+++ b/pkg/sql/colexec/mergeorder/order.go
@@ -44,7 +44,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/mergeorder/order_test.go b/pkg/sql/colexec/mergeorder/order_test.go
index b977fb1c9..bad24eb18 100644
--- a/pkg/sql/colexec/mergeorder/order_test.go
+++ b/pkg/sql/colexec/mergeorder/order_test.go
@@ -86,7 +86,7 @@ func TestOrder(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				if tc.proc.Reg.InputBatch != nil {
 					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 				}
@@ -124,7 +124,7 @@ func BenchmarkOrder(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					if tc.proc.Reg.InputBatch != nil {
 						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 					}
diff --git a/pkg/sql/colexec/mergetop/top.go b/pkg/sql/colexec/mergetop/top.go
index ffcce6e28..d5ee0f20c 100644
--- a/pkg/sql/colexec/mergetop/top.go
+++ b/pkg/sql/colexec/mergetop/top.go
@@ -47,7 +47,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/mergetop/top_test.go b/pkg/sql/colexec/mergetop/top_test.go
index 79e920154..809d3cc04 100644
--- a/pkg/sql/colexec/mergetop/top_test.go
+++ b/pkg/sql/colexec/mergetop/top_test.go
@@ -87,7 +87,7 @@ func TestTop(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				if tc.proc.Reg.InputBatch != nil {
 					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 				}
@@ -125,7 +125,7 @@ func BenchmarkTop(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					if tc.proc.Reg.InputBatch != nil {
 						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 					}
diff --git a/pkg/sql/colexec/offset/offset.go b/pkg/sql/colexec/offset/offset.go
index f9efd80f6..865363379 100644
--- a/pkg/sql/colexec/offset/offset.go
+++ b/pkg/sql/colexec/offset/offset.go
@@ -31,7 +31,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	bat := proc.Reg.InputBatch
 	if bat == nil {
 		return true, nil
diff --git a/pkg/sql/colexec/offset/offset_test.go b/pkg/sql/colexec/offset/offset_test.go
index db2853c16..5714256b1 100644
--- a/pkg/sql/colexec/offset/offset_test.go
+++ b/pkg/sql/colexec/offset/offset_test.go
@@ -100,19 +100,19 @@ func TestOffset(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
 }
@@ -139,14 +139,14 @@ func BenchmarkOffset(b *testing.B) {
 			err := Prepare(tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			if tc.proc.Reg.InputBatch != nil {
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 			}
 			tc.proc.Reg.InputBatch = &batch.Batch{}
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 		}
 	}
 }
diff --git a/pkg/sql/colexec/order/order.go b/pkg/sql/colexec/order/order.go
index 058352daf..062c964f2 100644
--- a/pkg/sql/colexec/order/order.go
+++ b/pkg/sql/colexec/order/order.go
@@ -50,7 +50,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	bat := proc.Reg.InputBatch
 	if bat == nil {
 		return true, nil
diff --git a/pkg/sql/colexec/order/order_test.go b/pkg/sql/colexec/order/order_test.go
index 69088e01b..21181c966 100644
--- a/pkg/sql/colexec/order/order_test.go
+++ b/pkg/sql/colexec/order/order_test.go
@@ -75,19 +75,19 @@ func TestOrder(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
 }
@@ -105,19 +105,19 @@ func BenchmarkOrder(b *testing.B) {
 			err := Prepare(tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			if tc.proc.Reg.InputBatch != nil {
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 			}
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			if tc.proc.Reg.InputBatch != nil {
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 			}
 			tc.proc.Reg.InputBatch = &batch.Batch{}
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 		}
 	}
 }
diff --git a/pkg/sql/colexec/output/output.go b/pkg/sql/colexec/output/output.go
index 56798d02e..784cb4efd 100644
--- a/pkg/sql/colexec/output/output.go
+++ b/pkg/sql/colexec/output/output.go
@@ -28,7 +28,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	if bat := proc.Reg.InputBatch; bat != nil && len(bat.Zs) > 0 {
 		if err := ap.Func(ap.Data, bat); err != nil {
diff --git a/pkg/sql/colexec/output/output_test.go b/pkg/sql/colexec/output/output_test.go
index 33b2ef91c..89e0638c9 100644
--- a/pkg/sql/colexec/output/output_test.go
+++ b/pkg/sql/colexec/output/output_test.go
@@ -83,16 +83,16 @@ func TestOutput(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = nil
-		_, err = Call(tc.proc, tc.arg)
+		_, err = Call(0, tc.proc, tc.arg)
 		require.NoError(t, err)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
diff --git a/pkg/sql/colexec/product/product.go b/pkg/sql/colexec/product/product.go
index 35f8b5d09..4b4416943 100644
--- a/pkg/sql/colexec/product/product.go
+++ b/pkg/sql/colexec/product/product.go
@@ -32,7 +32,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/product/product_test.go b/pkg/sql/colexec/product/product_test.go
index 91a54c99b..33cf8be5e 100644
--- a/pkg/sql/colexec/product/product_test.go
+++ b/pkg/sql/colexec/product/product_test.go
@@ -85,7 +85,7 @@ func TestProduct(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -116,7 +116,7 @@ func BenchmarkProduct(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/projection/projection.go b/pkg/sql/colexec/projection/projection.go
index 9281d5fca..d4c4899bd 100644
--- a/pkg/sql/colexec/projection/projection.go
+++ b/pkg/sql/colexec/projection/projection.go
@@ -38,7 +38,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	bat := proc.Reg.InputBatch
 	if bat == nil {
 		return true, nil
diff --git a/pkg/sql/colexec/projection/projection_test.go b/pkg/sql/colexec/projection/projection_test.go
index 9c29c80de..7eb9b51e8 100644
--- a/pkg/sql/colexec/projection/projection_test.go
+++ b/pkg/sql/colexec/projection/projection_test.go
@@ -81,19 +81,19 @@ func TestProjection(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
 }
diff --git a/pkg/sql/colexec/restrict/restrict.go b/pkg/sql/colexec/restrict/restrict.go
index e7151bb20..1d5192bca 100644
--- a/pkg/sql/colexec/restrict/restrict.go
+++ b/pkg/sql/colexec/restrict/restrict.go
@@ -34,15 +34,19 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(idx int, proc *process.Process, arg interface{}) (bool, error) {
 	bat := proc.Reg.InputBatch
 	if bat == nil {
 		return true, nil
 	}
-	if len(bat.Zs) == 0 {
+	if bat.Length() == 0 {
 		return false, nil
 	}
 	ap := arg.(*Argument)
+	anal := proc.GetAnalyze(idx)
+	anal.Start()
+	defer anal.Stop()
+	anal.Input(bat)
 	vec, err := colexec.EvalExpr(bat, proc, ap.E)
 	if err != nil {
 		bat.Clean(proc.Mp)
@@ -58,14 +62,16 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
 			bat.Shrink(nil)
 		}
 	} else {
-		sels := make([]int64, 0, 8)
+		sels := proc.GetSels()
 		for i, b := range bs {
 			if b {
 				sels = append(sels, int64(i))
 			}
 		}
 		bat.Shrink(sels)
+		proc.PutSels(sels)
 	}
+	anal.Output(bat)
 	proc.Reg.InputBatch = bat
 	return false, nil
 }
diff --git a/pkg/sql/colexec/semi/join.go b/pkg/sql/colexec/semi/join.go
index f976d91c2..4a4090502 100644
--- a/pkg/sql/colexec/semi/join.go
+++ b/pkg/sql/colexec/semi/join.go
@@ -73,7 +73,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/semi/join_test.go b/pkg/sql/colexec/semi/join_test.go
index 8fda25fa9..3ff1a1096 100644
--- a/pkg/sql/colexec/semi/join_test.go
+++ b/pkg/sql/colexec/semi/join_test.go
@@ -236,7 +236,7 @@ func TestJoin(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
-			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 				break
 			}
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
@@ -283,7 +283,7 @@ func BenchmarkJoin(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
-				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
 					break
 				}
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
diff --git a/pkg/sql/colexec/top/top.go b/pkg/sql/colexec/top/top.go
index a0668e57b..c269a889d 100644
--- a/pkg/sql/colexec/top/top.go
+++ b/pkg/sql/colexec/top/top.go
@@ -46,7 +46,7 @@ func Prepare(_ *process.Process, arg interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	ap := arg.(*Argument)
 	ctr := ap.ctr
 	for {
diff --git a/pkg/sql/colexec/top/top_test.go b/pkg/sql/colexec/top/top_test.go
index 036fca97d..1bd316d23 100644
--- a/pkg/sql/colexec/top/top_test.go
+++ b/pkg/sql/colexec/top/top_test.go
@@ -74,18 +74,18 @@ func TestTop(t *testing.T) {
 		err := Prepare(tc.proc, tc.arg)
 		require.NoError(t, err)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = &batch.Batch{}
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		if tc.proc.Reg.InputBatch != nil {
 			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 		}
 		tc.proc.Reg.InputBatch = nil
-		_, _ = Call(tc.proc, tc.arg)
+		_, _ = Call(0, tc.proc, tc.arg)
 		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
 	}
 }
@@ -103,13 +103,13 @@ func BenchmarkTop(b *testing.B) {
 			err := Prepare(tc.proc, tc.arg)
 			require.NoError(t, err)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = &batch.Batch{}
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
-			_, _ = Call(tc.proc, tc.arg)
+			_, _ = Call(0, tc.proc, tc.arg)
 			if tc.proc.Reg.InputBatch != nil {
 				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
 			}
diff --git a/pkg/sql/colexec/update/update.go b/pkg/sql/colexec/update/update.go
index b86dd2de3..f0abaf98d 100644
--- a/pkg/sql/colexec/update/update.go
+++ b/pkg/sql/colexec/update/update.go
@@ -30,7 +30,7 @@ func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
-func Call(proc *process.Process, arg interface{}) (bool, error) {
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
 	p := arg.(*Argument)
 	bat := proc.Reg.InputBatch
 	if bat == nil || len(bat.Zs) == 0 {
diff --git a/pkg/vm/overload.go b/pkg/vm/overload.go
index f1f764f93..c32b1fcc4 100644
--- a/pkg/vm/overload.go
+++ b/pkg/vm/overload.go
@@ -119,7 +119,7 @@ var prepareFunc = [...]func(*process.Process, interface{}) error{
 	Update:   update.Prepare,
 }
 
-var execFunc = [...]func(*process.Process, interface{}) (bool, error){
+var execFunc = [...]func(int, *process.Process, interface{}) (bool, error){
 	Top:        top.Call,
 	Join:       join.Call,
 	Semi:       semi.Call,
diff --git a/pkg/vm/process/analyze.go b/pkg/vm/process/analyze.go
new file mode 100644
index 000000000..12343513c
--- /dev/null
+++ b/pkg/vm/process/analyze.go
@@ -0,0 +1,64 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package process
+
+import (
+	"sync/atomic"
+	"time"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+)
+
+func NewAnalyzeInfo(nodeId int32) *AnalyzeInfo {
+	return &AnalyzeInfo{
+		NodeId:       nodeId,
+		InputRows:    0,
+		OutputRows:   0,
+		TimeConsumed: 0,
+		InputSize:    0,
+		OutputSize:   0,
+		MemorySize:   0,
+	}
+}
+
+func (a *analyze) Start() {
+	a.start = time.Now()
+}
+
+func (a *analyze) Stop() {
+	if a.analInfo != nil {
+		atomic.AddInt64(&a.analInfo.TimeConsumed, int64(time.Now().Sub(a.start)/time.Millisecond))
+	}
+}
+
+func (a *analyze) Alloc(size int64) {
+	if a.analInfo != nil {
+		atomic.AddInt64(&a.analInfo.MemorySize, size)
+	}
+}
+
+func (a *analyze) Input(bat *batch.Batch) {
+	if a.analInfo != nil {
+		atomic.AddInt64(&a.analInfo.InputSize, int64(bat.Size()))
+		atomic.AddInt64(&a.analInfo.InputRows, int64(bat.Length()))
+	}
+}
+
+func (a *analyze) Output(bat *batch.Batch) {
+	if a.analInfo != nil {
+		atomic.AddInt64(&a.analInfo.OutputSize, int64(bat.Size()))
+		atomic.AddInt64(&a.analInfo.OutputRows, int64(bat.Length()))
+	}
+}
diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go
index fccc29e1c..07e0ab74c 100644
--- a/pkg/vm/process/process.go
+++ b/pkg/vm/process/process.go
@@ -39,7 +39,7 @@ func NewFromProc(m *mheap.Mheap, p *Process, regNumber int) *Process {
 	proc.Id = p.Id
 	proc.Lim = p.Lim
 	proc.Snapshot = p.Snapshot
-	proc.AnalInfo = p.AnalInfo
+	proc.AnalInfos = p.AnalInfos
 	proc.SessionInfo = p.SessionInfo
 
 	// reg and cancel
@@ -67,6 +67,26 @@ func PutSels(sels []int64, proc *Process) {
 	proc.Reg.Ss = append(proc.Reg.Ss, sels)
 }
 
+func (proc *Process) GetSels() []int64 {
+	if len(proc.Reg.Ss) == 0 {
+		return make([]int64, 0, 16)
+	}
+	sels := proc.Reg.Ss[0]
+	proc.Reg.Ss = proc.Reg.Ss[1:]
+	return sels[:0]
+}
+
+func (proc *Process) GetAnalyze(idx int) Analyze {
+	if idx >= len(proc.AnalInfos) {
+		return &analyze{analInfo: nil}
+	}
+	return &analyze{analInfo: proc.AnalInfos[idx]}
+}
+
+func (proc *Process) PutSels(sels []int64) {
+	proc.Reg.Ss = append(proc.Reg.Ss, sels)
+}
+
 func (proc *Process) GetBoolTyp(typ types.Type) (typ2 types.Type) {
 	typ.Oid = types.T_bool
 	return typ
diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go
index 3834135c3..d532e9d9e 100644
--- a/pkg/vm/process/types.go
+++ b/pkg/vm/process/types.go
@@ -16,11 +16,21 @@ package process
 
 import (
 	"context"
+	"time"
 
 	"github.com/matrixorigin/matrixone/pkg/container/batch"
 	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
 )
 
+// Analyze analyze information for operator
+type Analyze interface {
+	Stop()
+	Start()
+	Alloc(int64)
+	Input(*batch.Batch)
+	Output(*batch.Batch)
+}
+
 // WaitRegister channel
 type WaitRegister struct {
 	Ctx context.Context
@@ -63,7 +73,7 @@ type SessionInfo struct {
 	Version      string
 }
 
-// explain analyze information for query
+// AnalyzeInfo  analyze information for query
 type AnalyzeInfo struct {
 	// NodeId, index of query's node list
 	NodeId int32
@@ -97,7 +107,7 @@ type Process struct {
 	// snapshot is transaction context
 	Snapshot []byte
 
-	AnalInfo *AnalyzeInfo
+	AnalInfos []*AnalyzeInfo
 
 	SessionInfo SessionInfo
 
@@ -105,6 +115,11 @@ type Process struct {
 	Cancel context.CancelFunc
 }
 
+type analyze struct {
+	start    time.Time
+	analInfo *AnalyzeInfo
+}
+
 func (si *SessionInfo) GetUser() string {
 	return si.User
 }
diff --git a/pkg/vm/vm.go b/pkg/vm/vm.go
index e7bb3dd78..2c9f47a0b 100644
--- a/pkg/vm/vm.go
+++ b/pkg/vm/vm.go
@@ -49,8 +49,8 @@ func Run(ins Instructions, proc *process.Process) (end bool, err error) {
 			err = moerr.NewPanicError(e)
 		}
 	}()
-	for _, in := range ins {
-		if ok, err = execFunc[in.Op](proc, in.Arg); err != nil {
+	for i, in := range ins {
+		if ok, err = execFunc[in.Op](i, proc, in.Arg); err != nil {
 			return ok || end, err
 		}
 		if ok { // ok is true shows that at least one operator has done its work
-- 
GitLab