diff --git a/pkg/sql/colexec2/connector/connector_test.go b/pkg/sql/colexec2/connector/connector_test.go
index e6f3865416716f626f035839454bbe148f439761..140f30799f2447b614352574490d2b762baf2d98 100644
--- a/pkg/sql/colexec2/connector/connector_test.go
+++ b/pkg/sql/colexec2/connector/connector_test.go
@@ -71,12 +71,31 @@ func TestPrepare(t *testing.T) {
 func TestConnector(t *testing.T) {
 	for _, tc := range tcs {
 		Prepare(tc.proc, tc.arg)
-		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
+		bat := newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.InputBatch = bat
+		{
+			for _, vec := range bat.Vecs {
+				if vec.Or {
+					mheap.Free(tc.proc.Mp, vec.Data)
+				}
+			}
+		}
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = &batch.Batch{}
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		for {
+			bat := <-tc.arg.Reg.Ch
+			if bat == nil {
+				break
+			}
+			if len(bat.Zs) == 0 {
+				continue
+			}
+			batch.Clean(bat, tc.proc.Mp)
+		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
diff --git a/pkg/sql/colexec2/group/group.go b/pkg/sql/colexec2/group/group.go
index 6d3c5a3b95878bb6ec4e9a3bb8264398292a04d7..5ebbcd610f6026b1a18c8515d9603b0db67b8bf5 100644
--- a/pkg/sql/colexec2/group/group.go
+++ b/pkg/sql/colexec2/group/group.go
@@ -423,6 +423,9 @@ func (ctr *Container) processHStr(bat *batch.Batch, ap *Argument, proc *process.
 		if err := ctr.batchFill(i, n, bat, ap, proc); err != nil {
 			return err
 		}
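+		// reset the per-row key buffers so keys assembled for this unit of rows
+		// do not leak into the keys built for the next UnitLimit-sized unit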
+		for k := 0; k < n; k++ {
+			ctr.hstr.keys[k] = ctr.hstr.keys[k][:0]
+		}
 	}
 	return nil
 }
@@ -505,13 +508,13 @@ func fillStringGroup[T any](ctr *Container, vec *vector.Vector, keys []T, n int,
 	}
 }
 
-func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz uint32, start int) {
-	vs := vector.DecodeFixedCol[T](vec, int(sz))
-	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*1)[:len(vs)*1]
+func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz int, start int) {
+	vs := vector.DecodeFixedCol[T](vec, sz)
+	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*sz)[:len(vs)*sz]
 	if !nulls.Any(vec.Nsp) {
 		for i := 0; i < n; i++ {
 			ctr.hstr.keys[i] = append(ctr.hstr.keys[i], byte(0))
-			ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*1:(i+start+1)*1]...)
+			ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*sz:(i+start+1)*sz]...)
 		}
 	} else {
 		for i := 0; i < n; i++ {
@@ -519,7 +522,7 @@ func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz uint32, s
 				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], byte(1))
 			} else {
 				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], byte(0))
-				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*1:(i+start+1)*1]...)
+				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*sz:(i+start+1)*sz]...)
 			}
 		}
 	}
diff --git a/pkg/sql/colexec2/group/group_test.go b/pkg/sql/colexec2/group/group_test.go
index 633a1c2cd64f973e02fa08d3b7694be2cbded184..e266375728542dbac531a825afb5c645d639f322 100644
--- a/pkg/sql/colexec2/group/group_test.go
+++ b/pkg/sql/colexec2/group/group_test.go
@@ -116,12 +116,16 @@ func TestGroup(t *testing.T) {
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
-func BenchmarkTop(b *testing.B) {
+func BenchmarkGroup(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		hm := host.New(1 << 30)
 		gm := guest.New(1<<30, hm)
@@ -140,6 +144,9 @@ func BenchmarkTop(b *testing.B) {
 			Call(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
 			Call(tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
 		}
 	}
 }
diff --git a/pkg/sql/colexec2/join/join.go b/pkg/sql/colexec2/join/join.go
new file mode 100644
index 0000000000000000000000000000000000000000..fd5c1e4ca89500fde0fc558f5613329defc85e05
--- /dev/null
+++ b/pkg/sql/colexec2/join/join.go
@@ -0,0 +1,491 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package join
+
+import (
+	"bytes"
+	"unsafe"
+
+	batch "github.com/matrixorigin/matrixone/pkg/container/batch2"
+	"github.com/matrixorigin/matrixone/pkg/container/hashtable"
+	"github.com/matrixorigin/matrixone/pkg/container/nulls"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	process "github.com/matrixorigin/matrixone/pkg/vm/process2"
+)
+
+func init() {
+	OneInt64s = make([]int64, UnitLimit)
+	for i := range OneInt64s {
+		OneInt64s[i] = 1
+	}
+}
+
+func String(_ interface{}, buf *bytes.Buffer) {
+	buf.WriteString(" ⨝ ")
+}
+
+func Prepare(proc *process.Process, arg interface{}) error {
+	ap := arg.(*Argument)
+	ap.ctr = new(Container)
+	ap.ctr.keys = make([][]byte, UnitLimit)
+	ap.ctr.values = make([]uint64, UnitLimit)
+	ap.ctr.zValues = make([]int64, UnitLimit)
+	ap.ctr.inserted = make([]uint8, UnitLimit)
+	ap.ctr.zInserted = make([]uint8, UnitLimit)
+	ap.ctr.strHashStates = make([][3]uint64, UnitLimit)
+	ap.ctr.strHashMap = &hashtable.StringHashMap{}
+	ap.ctr.strHashMap.Init()
+	mp := make(map[int32]int)
+	for i, cond := range ap.Conditions[0] { // align the scales of the decimal join keys on both sides
+		mp[ap.Conditions[1][i].Pos]++
+		switch cond.Typ.Oid {
+		case types.T_decimal64:
+			typ := ap.Conditions[1][i]
+			if typ.Scale > cond.Typ.Scale {
+				ap.Conditions[0][i].Scale = typ.Scale - cond.Typ.Scale
+			} else if typ.Scale < cond.Typ.Scale {
+				ap.Conditions[1][i].Scale = cond.Typ.Scale - typ.Scale
+			}
+		case types.T_decimal128:
+			typ := ap.Conditions[1][i]
+			if typ.Scale > cond.Typ.Scale {
+				ap.Conditions[0][i].Scale = typ.Scale - cond.Typ.Scale
+			} else if typ.Scale < cond.Typ.Scale {
+				ap.Conditions[1][i].Scale = cond.Typ.Scale - typ.Scale
+			}
+		}
+	}
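+	// flg records whether some build-side result column (Rel == 1) is not part of
+	// the join condition; if so, the build phase must keep the whole build batch
+	// and track, per hash-table group, which rows belong to it (ctr.sels)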
+	{
+		flg := false
+		for _, rp := range ap.Result {
+			if rp.Rel == 1 {
+				ap.ctr.poses = append(ap.ctr.poses, rp.Pos)
+				if _, ok := mp[rp.Pos]; ok {
+					continue
+				}
+				flg = true
+			}
+		}
+		ap.ctr.flg = flg
+	}
+	ap.ctr.decimal64Slice = make([]types.Decimal64, UnitLimit)
+	ap.ctr.decimal128Slice = make([]types.Decimal128, UnitLimit)
+	return nil
+}
+
+func Call(proc *process.Process, arg interface{}) (bool, error) {
+	ap := arg.(*Argument)
+	ctr := ap.ctr
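+	// Call drives a small state machine: Build drains the build side
+	// (MergeReceivers[1]) into the string hash table, Probe joins one batch from
+	// the probe side (MergeReceivers[0]) per call, and End reports the operator
+	// as finished with a nil output batch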
+	for {
+		switch ctr.state {
+		case Build:
+			if err := ctr.build(ap, proc); err != nil {
+				ctr.state = End
+				return true, err
+			}
+			ctr.state = Probe
+		case Probe:
+			bat := <-proc.Reg.MergeReceivers[0].Ch
+			if bat == nil {
+				ctr.state = End
+				batch.Clean(ctr.bat, proc.Mp)
+				continue
+			}
+			if len(bat.Zs) == 0 {
+				continue
+			}
+			if err := ctr.probe(bat, ap, proc); err != nil {
+				ctr.state = End
+				proc.Reg.InputBatch = nil
+				return true, err
+			}
+			return false, nil
+		default:
+			proc.Reg.InputBatch = nil
+			return true, nil
+		}
+	}
+}
+
+func (ctr *Container) build(ap *Argument, proc *process.Process) error {
+	if ap.IsPreBuild {
+		bat := <-proc.Reg.MergeReceivers[1].Ch
+		ctr.bat = bat
+		ctr.strHashMap = bat.Ht.(*hashtable.StringHashMap)
+	}
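+	// with flg set every build row has to stay addressable by its join key, so the
+	// build batches are concatenated into ctr.bat and ctr.sels keeps the row numbers
+	// of each hash-table group; otherwise only one row per distinct key (plus the
+	// summed Zs) is retained below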
+	if ctr.flg {
+		var err error
+
+		for {
+			bat := <-proc.Reg.MergeReceivers[1].Ch
+			if bat == nil {
+				break
+			}
+			if len(bat.Zs) == 0 {
+				continue
+			}
+			if ctr.bat == nil {
+				ctr.bat = batch.New(len(bat.Vecs))
+				for i, vec := range bat.Vecs {
+					ctr.bat.Vecs[i] = vector.New(vec.Typ)
+				}
+			}
+			if ctr.bat, err = ctr.bat.Append(proc.Mp, bat); err != nil {
+				batch.Clean(bat, proc.Mp)
+				batch.Clean(ctr.bat, proc.Mp)
+				return err
+			}
+			batch.Clean(bat, proc.Mp)
+		}
+		count := len(ctr.bat.Zs)
+		for i := 0; i < count; i += UnitLimit {
+			n := count - i
+			if n > UnitLimit {
+				n = UnitLimit
+			}
+			copy(ctr.zValues[:n], OneInt64s[:n])
+			for _, cond := range ap.Conditions[1] {
+				vec := ctr.bat.Vecs[cond.Pos]
+				switch typLen := vec.Typ.Oid.FixedLength(); typLen {
+				case 1:
+					fillGroupStr[uint8](ctr, vec, n, 1, i)
+				case 2:
+					fillGroupStr[uint16](ctr, vec, n, 2, i)
+				case 4:
+					fillGroupStr[uint32](ctr, vec, n, 4, i)
+				case 8:
+					fillGroupStr[uint64](ctr, vec, n, 8, i)
+				case -8:
+					if cond.Scale > 0 {
+						fillGroupStrWithDecimal64(ctr, vec, n, i, cond.Scale)
+					} else {
+						fillGroupStr[uint64](ctr, vec, n, 8, i)
+					}
+				case -16:
+					if cond.Scale > 0 {
+						fillGroupStrWithDecimal128(ctr, vec, n, i, cond.Scale)
+					} else {
+						fillGroupStr[types.Decimal128](ctr, vec, n, 16, i)
+					}
+				default:
+					vs := vec.Col.(*types.Bytes)
+					if !nulls.Any(vec.Nsp) {
+						for k := 0; k < n; k++ {
+							ctr.keys[k] = append(ctr.keys[k], vs.Get(int64(i+k))...)
+						}
+					} else {
+						for k := 0; k < n; k++ {
+							if vec.Nsp.Np.Contains(uint64(i + k)) {
+								ctr.zValues[k] = 0
+							} else {
+								ctr.keys[k] = append(ctr.keys[k], vs.Get(int64(i+k))...)
+							}
+						}
+					}
+				}
+			}
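+			// pad keys shorter than 16 bytes with StrKeyPadding so every key handed
+			// to the string hash map is at least 16 bytes long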
+			for k := 0; k < n; k++ {
+				if l := len(ctr.keys[k]); l < 16 {
+					ctr.keys[k] = append(ctr.keys[k], hashtable.StrKeyPadding[l:]...)
+				}
+			}
+			ctr.strHashMap.InsertStringBatchWithRing(ctr.zValues, ctr.strHashStates, ctr.keys[:n], ctr.values)
+			for k, v := range ctr.values[:n] {
+				if ctr.zValues[k] == 0 {
+					continue
+				}
+				if v > ctr.rows {
+					ctr.rows++
+					ctr.sels = append(ctr.sels, make([]int64, 0, 8))
+				}
+				ai := int64(v) - 1
+				ctr.sels[ai] = append(ctr.sels[ai], int64(i+k))
+			}
+			for k := 0; k < n; k++ {
+				ctr.keys[k] = ctr.keys[k][:0]
+			}
+		}
+		return nil
+	}
+	for {
+		bat := <-proc.Reg.MergeReceivers[1].Ch
+		if bat == nil {
+			return nil
+		}
+		if len(bat.Zs) == 0 {
+			continue
+		}
+		if ctr.bat == nil {
+			ctr.bat = batch.New(len(bat.Vecs))
+			for _, pos := range ctr.poses {
+				ctr.bat.Vecs[pos] = vector.New(bat.Vecs[pos].Typ)
+			}
+		}
+		count := len(bat.Zs)
+		for i := 0; i < count; i += UnitLimit {
+			n := count - i
+			if n > UnitLimit {
+				n = UnitLimit
+			}
+			copy(ctr.zValues[:n], OneInt64s[:n])
+			for _, cond := range ap.Conditions[1] {
+				vec := bat.Vecs[cond.Pos]
+				switch typLen := vec.Typ.Oid.FixedLength(); typLen {
+				case 1:
+					fillGroupStr[uint8](ctr, vec, n, 1, i)
+				case 2:
+					fillGroupStr[uint16](ctr, vec, n, 2, i)
+				case 4:
+					fillGroupStr[uint32](ctr, vec, n, 4, i)
+				case 8:
+					fillGroupStr[uint64](ctr, vec, n, 8, i)
+				case -8:
+					if cond.Scale > 0 {
+						fillGroupStrWithDecimal64(ctr, vec, n, i, cond.Scale)
+					} else {
+						fillGroupStr[uint64](ctr, vec, n, 8, i)
+					}
+				case -16:
+					if cond.Scale > 0 {
+						fillGroupStrWithDecimal128(ctr, vec, n, i, cond.Scale)
+					} else {
+						fillGroupStr[types.Decimal128](ctr, vec, n, 16, i)
+					}
+				default:
+					vs := vec.Col.(*types.Bytes)
+					if !nulls.Any(vec.Nsp) {
+						for k := 0; k < n; k++ {
+							ctr.keys[k] = append(ctr.keys[k], vs.Get(int64(i+k))...)
+						}
+					} else {
+						for k := 0; k < n; k++ {
+							if vec.Nsp.Np.Contains(uint64(i + k)) {
+								ctr.zValues[k] = 0
+							} else {
+								ctr.keys[k] = append(ctr.keys[k], vs.Get(int64(i+k))...)
+							}
+						}
+					}
+				}
+			}
+			for k := 0; k < n; k++ {
+				if l := len(ctr.keys[k]); l < 16 {
+					ctr.keys[k] = append(ctr.keys[k], hashtable.StrKeyPadding[l:]...)
+				}
+			}
+			ctr.strHashMap.InsertStringBatchWithRing(ctr.zValues, ctr.strHashStates, ctr.keys[:n], ctr.values)
+			cnt := 0
+			copy(ctr.inserted[:n], ctr.zInserted[:n])
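+			// InsertStringBatchWithRing returns 1-based group ids in ctr.values;
+			// ids greater than ctr.rows are new groups, which get marked in
+			// ctr.inserted so their key columns can be copied via UnionBatch below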
+			for k, v := range ctr.values[:n] {
+				if ctr.zValues[k] == 0 {
+					continue
+				}
+				if v > ctr.rows {
+					cnt++
+					ctr.rows++
+					ctr.inserted[k] = 1
+					ctr.bat.Zs = append(ctr.bat.Zs, 0)
+				}
+				ai := int64(v) - 1
+				ctr.bat.Zs[ai] += bat.Zs[i+k]
+			}
+			if cnt > 0 {
+				for _, pos := range ctr.poses {
+					if err := vector.UnionBatch(ctr.bat.Vecs[pos], bat.Vecs[pos], int64(i), cnt, ctr.inserted[:n], proc.Mp); err != nil {
+						batch.Clean(bat, proc.Mp)
+						batch.Clean(ctr.bat, proc.Mp)
+						return err
+					}
+				}
+			}
+			for k := 0; k < n; k++ {
+				ctr.keys[k] = ctr.keys[k][:0]
+			}
+		}
+		batch.Clean(bat, proc.Mp)
+	}
+}
+
+func (ctr *Container) probe(bat *batch.Batch, ap *Argument, proc *process.Process) error {
+	defer batch.Clean(bat, proc.Mp)
+	rbat := batch.New(len(ap.Result))
+	for i, rp := range ap.Result {
+		if rp.Rel == 0 {
+			rbat.Vecs[i] = vector.New(bat.Vecs[rp.Pos].Typ)
+		} else {
+			rbat.Vecs[i] = vector.New(ctr.bat.Vecs[rp.Pos].Typ)
+		}
+	}
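+	// the probe side is processed in UnitLimit-sized units: keys are encoded the
+	// same way as during the build phase, looked up in the hash table, and every
+	// match is copied column by column into the result batch rbat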
+	count := len(bat.Zs)
+	for i := 0; i < count; i += UnitLimit {
+		n := count - i
+		if n > UnitLimit {
+			n = UnitLimit
+		}
+		copy(ctr.zValues[:n], OneInt64s[:n])
+		for _, cond := range ap.Conditions[0] {
+			vec := bat.Vecs[cond.Pos]
+			switch typLen := vec.Typ.Oid.FixedLength(); typLen {
+			case 1:
+				fillGroupStr[uint8](ctr, vec, n, 1, i)
+			case 2:
+				fillGroupStr[uint16](ctr, vec, n, 2, i)
+			case 4:
+				fillGroupStr[uint32](ctr, vec, n, 4, i)
+			case 8:
+				fillGroupStr[uint64](ctr, vec, n, 8, i)
+			case -8:
+				if cond.Scale > 0 {
+					fillGroupStrWithDecimal64(ctr, vec, n, i, cond.Scale)
+				} else {
+					fillGroupStr[uint64](ctr, vec, n, 8, i)
+				}
+			case -16:
+				if cond.Scale > 0 {
+					fillGroupStrWithDecimal128(ctr, vec, n, i, cond.Scale)
+				} else {
+					fillGroupStr[types.Decimal128](ctr, vec, n, 16, i)
+				}
+			default:
+				vs := vec.Col.(*types.Bytes)
+				if !nulls.Any(vec.Nsp) {
+					for k := 0; k < n; k++ {
+						ctr.keys[k] = append(ctr.keys[k], vs.Get(int64(i+k))...)
+					}
+				} else {
+					for k := 0; k < n; k++ {
+						if vec.Nsp.Np.Contains(uint64(i + k)) {
+							ctr.zValues[k] = 0
+						} else {
+							ctr.keys[k] = append(ctr.keys[k], vs.Get(int64(i+k))...)
+						}
+					}
+				}
+			}
+		}
+		for k := 0; k < n; k++ {
+			if l := len(ctr.keys[k]); l < 16 {
+				ctr.keys[k] = append(ctr.keys[k], hashtable.StrKeyPadding[l:]...)
+			}
+		}
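+		// FindStringBatch fills ctr.values with 1-based group ids; a zero value
+		// means the key never appeared on the build side and the row is skipped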
+		ctr.strHashMap.FindStringBatch(ctr.strHashStates, ctr.keys[:n], ctr.values)
+		for k := 0; k < n; k++ {
+			ctr.keys[k] = ctr.keys[k][:0]
+		}
+		for k := 0; k < n; k++ {
+			if ctr.zValues[k] == 0 {
+				continue
+			}
+			if ctr.values[k] == 0 {
+				continue
+			}
+			if ctr.flg {
+				sels := ctr.sels[ctr.values[k]-1]
+				for _, sel := range sels {
+					for j, rp := range ap.Result {
+						if rp.Rel == 0 {
+							if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.Mp); err != nil {
+								batch.Clean(rbat, proc.Mp)
+								return err
+							}
+						} else {
+							if err := vector.UnionOne(rbat.Vecs[j], ctr.bat.Vecs[rp.Pos], sel, proc.Mp); err != nil {
+								batch.Clean(rbat, proc.Mp)
+								return err
+							}
+						}
+					}
+					rbat.Zs = append(rbat.Zs, ctr.bat.Zs[sel])
+				}
+			} else {
+				sel := int64(ctr.values[k] - 1)
+				for j, rp := range ap.Result {
+					if rp.Rel == 0 {
+						if err := vector.UnionOne(rbat.Vecs[j], bat.Vecs[rp.Pos], int64(i+k), proc.Mp); err != nil {
+							batch.Clean(rbat, proc.Mp)
+							return err
+						}
+					} else {
+						if err := vector.UnionOne(rbat.Vecs[j], ctr.bat.Vecs[rp.Pos], sel, proc.Mp); err != nil {
+							batch.Clean(rbat, proc.Mp)
+							return err
+						}
+					}
+				}
+				rbat.Zs = append(rbat.Zs, ctr.bat.Zs[sel])
+			}
+		}
+	}
+	proc.Reg.InputBatch = rbat
+	return nil
+}
+
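+// fillGroupStr appends the raw sz-byte fixed-width encoding of rows
+// [start, start+n) of vec to the per-row key buffers in ctr.keys, and zeroes
+// ctr.zValues for rows whose key is NULL so they are ignored by the hash table.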
+func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz int, start int) {
+	vs := vector.DecodeFixedCol[T](vec, sz)
+	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*sz)[:len(vs)*sz]
+	if !nulls.Any(vec.Nsp) {
+		for i := 0; i < n; i++ {
+			ctr.keys[i] = append(ctr.keys[i], data[(i+start)*sz:(i+start+1)*sz]...)
+		}
+	} else {
+		for i := 0; i < n; i++ {
+			if vec.Nsp.Np.Contains(uint64(i + start)) {
+				ctr.zValues[i] = 0
+			} else {
+				ctr.keys[i] = append(ctr.keys[i], data[(i+start)*sz:(i+start+1)*sz]...)
+			}
+		}
+	}
+}
+
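+// fillGroupStrWithDecimal64 rescales the decimal64 key values by the
+// condition's scale difference before encoding them, so equal values on both
+// sides of the join produce identical key bytes.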
+func fillGroupStrWithDecimal64(ctr *Container, vec *vector.Vector, n int, start int, scale int32) {
+	src := vector.DecodeFixedCol[types.Decimal64](vec, 8)
+	vs := types.AlignDecimal64UsingScaleDiffBatch(src[start:start+n], ctr.decimal64Slice[:n], scale)
+	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*8)[:len(vs)*8]
+	if !nulls.Any(vec.Nsp) {
+		for i := 0; i < n; i++ {
+			ctr.keys[i] = append(ctr.keys[i], data[(i)*8:(i+1)*8]...)
+		}
+	} else {
+		for i := 0; i < n; i++ {
+			if vec.Nsp.Np.Contains(uint64(i + start)) {
+				ctr.zValues[i] = 0
+			} else {
+				ctr.keys[i] = append(ctr.keys[i], data[(i)*8:(i+1)*8]...)
+			}
+		}
+	}
+}
+
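+// fillGroupStrWithDecimal128 is the decimal128 counterpart: the values are
+// rescaled into ctr.decimal128Slice and their 16-byte encodings are appended
+// to the key buffers.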
+func fillGroupStrWithDecimal128(ctr *Container, vec *vector.Vector, n int, start int, scale int32) {
+	src := vector.DecodeFixedCol[types.Decimal128](vec, 16)
+	vs := ctr.decimal128Slice[:n]
+	types.AlignDecimal128UsingScaleDiffBatch(src[start:start+n], vs, scale)
+	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*16)[:len(vs)*16]
+	if !nulls.Any(vec.Nsp) {
+		for i := 0; i < n; i++ {
+			ctr.keys[i] = append(ctr.keys[i], data[(i)*16:(i+1)*16]...)
+		}
+	} else {
+		for i := 0; i < n; i++ {
+			if vec.Nsp.Np.Contains(uint64(i + start)) {
+				ctr.zValues[i] = 0
+			} else {
+				ctr.keys[i] = append(ctr.keys[i], data[(i)*16:(i+1)*16]...)
+			}
+		}
+	}
+}
diff --git a/pkg/sql/colexec2/join/join_test.go b/pkg/sql/colexec2/join/join_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..014da864045e947d9a6477f10d35a6def002bf9d
--- /dev/null
+++ b/pkg/sql/colexec2/join/join_test.go
@@ -0,0 +1,382 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package join
+
+import (
+	"bytes"
+	"context"
+	"strconv"
+	"testing"
+
+	batch "github.com/matrixorigin/matrixone/pkg/container/batch2"
+	"github.com/matrixorigin/matrixone/pkg/container/nulls"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/encoding"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	process "github.com/matrixorigin/matrixone/pkg/vm/process2"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows          = 10     // default rows
+	BenchmarkRows = 100000 // default rows for benchmark
+)
+
+// add unit tests for cases
+type joinTestCase struct {
+	arg    *Argument
+	flgs   []bool // flgs[i] == true: nullable
+	types  []types.Type
+	proc   *process.Process
+	cancel context.CancelFunc
+}
+
+var (
+	tcs []joinTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []joinTestCase{
+		newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{0, 0, types.Type{Oid: types.T_int8}},
+				},
+				{
+					{0, 0, types.Type{Oid: types.T_int8}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{0, 0, types.Type{Oid: types.T_int8}},
+				},
+				{
+					{0, 0, types.Type{Oid: types.T_int8}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_decimal64}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{0, 0, types.Type{Oid: types.T_decimal64}},
+				},
+				{
+					{0, 1, types.Type{Oid: types.T_decimal64}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_decimal64}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{0, 0, types.Type{Oid: types.T_decimal64}},
+				},
+				{
+					{0, 1, types.Type{Oid: types.T_decimal64}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_decimal128}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{0, 0, types.Type{Oid: types.T_decimal128}},
+				},
+				{
+					{0, 1, types.Type{Oid: types.T_decimal128}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_decimal128}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{0, 0, types.Type{Oid: types.T_decimal128}},
+				},
+				{
+					{0, 1, types.Type{Oid: types.T_decimal128}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{false, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{1, 0, types.Type{Oid: types.T_int64}},
+				},
+				{
+					{1, 0, types.Type{Oid: types.T_int64}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{true, true}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_int64}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{1, 0, types.Type{Oid: types.T_int64}},
+				},
+				{
+					{1, 0, types.Type{Oid: types.T_int64}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{false, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_decimal64}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{1, 0, types.Type{Oid: types.T_decimal64}},
+				},
+				{
+					{1, 1, types.Type{Oid: types.T_decimal64}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{true, true}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_decimal64}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{1, 0, types.Type{Oid: types.T_decimal64}},
+				},
+				{
+					{1, 1, types.Type{Oid: types.T_decimal64}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{false, false}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_decimal128}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{1, 0, types.Type{Oid: types.T_decimal128}},
+				},
+				{
+					{1, 1, types.Type{Oid: types.T_decimal128}},
+				},
+			}),
+		newTestCase(mheap.New(gm), []bool{true, true}, []types.Type{{Oid: types.T_int8}, {Oid: types.T_decimal128}}, []ResultPos{{0, 0}, {1, 0}},
+			[][]Condition{
+				{
+					{1, 0, types.Type{Oid: types.T_decimal128}},
+				},
+				{
+					{1, 1, types.Type{Oid: types.T_decimal128}},
+				},
+			}),
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		Prepare(tc.proc, tc.arg)
+	}
+}
+
+func TestJoin(t *testing.T) {
+	for _, tc := range tcs {
+		Prepare(tc.proc, tc.arg)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- nil
+		tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[1].Ch <- nil
+		for {
+			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				break
+			}
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
+	}
+}
+
+func BenchmarkJoin(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		hm := host.New(1 << 30)
+		gm := guest.New(1<<30, hm)
+		tcs = []joinTestCase{
+			newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
+				[][]Condition{
+					{
+						{0, 0, types.Type{Oid: types.T_int8}},
+					},
+					{
+						{0, 0, types.Type{Oid: types.T_int8}},
+					},
+				}),
+			newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}},
+				[][]Condition{
+					{
+						{0, 0, types.Type{Oid: types.T_int8}},
+					},
+					{
+						{0, 0, types.Type{Oid: types.T_int8}},
+					},
+				}),
+		}
+		t := new(testing.T)
+		for _, tc := range tcs {
+			Prepare(tc.proc, tc.arg)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- nil
+			tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[1].Ch <- nil
+			for {
+				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					break
+				}
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
+		}
+	}
+}
+
+func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, cs [][]Condition) joinTestCase {
+	proc := process.New(m)
+	proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	proc.Reg.MergeReceivers[0] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 10),
+	}
+	proc.Reg.MergeReceivers[1] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	return joinTestCase{
+		types:  ts,
+		flgs:   flgs,
+		proc:   proc,
+		cancel: cancel,
+		arg: &Argument{
+			Result:     rp,
+			Conditions: cs,
+		},
+	}
+}
+
+// create a new batch based on the type information, flgs[i] == true: has null
+func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	bat := batch.New(len(ts))
+	bat.InitZsOne(int(rows))
+	for i := range bat.Vecs {
+		vec := vector.New(ts[i])
+		switch vec.Typ.Oid {
+		case types.T_int8:
+			data, err := mheap.Alloc(proc.Mp, rows*1)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt8Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int8(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_int16:
+			data, err := mheap.Alloc(proc.Mp, rows*2)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt16Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int16(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_int32:
+			data, err := mheap.Alloc(proc.Mp, rows*4)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt32Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int32(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_int64:
+			data, err := mheap.Alloc(proc.Mp, rows*8)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt64Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int64(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_decimal64:
+			data, err := mheap.Alloc(proc.Mp, rows*8)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeDecimal64Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = types.Decimal64(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_decimal128:
+			data, err := mheap.Alloc(proc.Mp, rows*16)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeDecimal128Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i].Lo = int64(i)
+				vs[i].Hi = int64(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+
+		case types.T_char, types.T_varchar:
+			size := 0
+			vs := make([][]byte, rows)
+			for i := range vs {
+				vs[i] = []byte(strconv.Itoa(i))
+				size += len(vs[i])
+			}
+			data, err := mheap.Alloc(proc.Mp, int64(size))
+			require.NoError(t, err)
+			data = data[:0]
+			col := new(types.Bytes)
+			o := uint32(0)
+			for _, v := range vs {
+				data = append(data, v...)
+				col.Offsets = append(col.Offsets, o)
+				o += uint32(len(v))
+				col.Lengths = append(col.Lengths, uint32(len(v)))
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			col.Data = data
+			vec.Col = col
+			vec.Data = data
+		}
+		bat.Vecs[i] = vec
+	}
+	return bat
+}
diff --git a/pkg/sql/colexec2/join/types.go b/pkg/sql/colexec2/join/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..732f9651a0deb4e8c3823cf670b43db6b9764d55
--- /dev/null
+++ b/pkg/sql/colexec2/join/types.go
@@ -0,0 +1,74 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package join
+
+import (
+	batch "github.com/matrixorigin/matrixone/pkg/container/batch2"
+	"github.com/matrixorigin/matrixone/pkg/container/hashtable"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+)
+
+const (
+	Build = iota
+	Probe
+	End
+)
+
+const (
+	UnitLimit = 256
+)
+
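+// OneInt64s is a slice of ones (filled in init); it is copied into zValues at
+// the start of every UnitLimit-sized unit and entries are zeroed for NULL keys.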
+var OneInt64s []int64
+
+type Container struct {
+	flg           bool // indicates whether additional columns need to be copied
+	state         int
+	rows          uint64
+	keys          [][]byte
+	values        []uint64
+	zValues       []int64
+	hashes        []uint64
+	inserted      []uint8
+	zInserted     []uint8
+	strHashStates [][3]uint64
+	strHashMap    *hashtable.StringHashMap
+
+	poses []int32 // positions of the vectors that need to be copied
+
+	sels [][]int64
+
+	bat *batch.Batch
+
+	decimal64Slice  []types.Decimal64
+	decimal128Slice []types.Decimal128
+}
+
+type ResultPos struct {
+	Rel int32
+	Pos int32
+}
+
+type Condition struct {
+	Pos   int32
+	Scale int32
+	Typ   types.Type
+}
+
+type Argument struct {
+	ctr        *Container
+	IsPreBuild bool // the hash table is pre-built
+	Result     []ResultPos
+	Conditions [][]Condition
+}
diff --git a/pkg/sql/colexec2/limit/limit_test.go b/pkg/sql/colexec2/limit/limit_test.go
index 79979cfd3a6fd4c4ce37895d9c172eca0bcbef1f..80c35ade029799f4242d108641a589f5c6e928f4 100644
--- a/pkg/sql/colexec2/limit/limit_test.go
+++ b/pkg/sql/colexec2/limit/limit_test.go
@@ -101,12 +101,19 @@ func TestLimit(t *testing.T) {
 		Prepare(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -132,6 +139,9 @@ func BenchmarkLimit(b *testing.B) {
 			Prepare(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
 			Call(tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
 			tc.proc.Reg.InputBatch = &batch.Batch{}
 			Call(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
diff --git a/pkg/sql/colexec2/mergegroup/group.go b/pkg/sql/colexec2/mergegroup/group.go
index d038a6eb9e7b2031db005eb51124be9a4c389b7c..223f46559da7ba47f24724d4e4e679d02ab734c7 100644
--- a/pkg/sql/colexec2/mergegroup/group.go
+++ b/pkg/sql/colexec2/mergegroup/group.go
@@ -428,6 +428,9 @@ func (ctr *Container) processHStr(bat *batch.Batch, proc *process.Process) error
 			}
 		}
 		ctr.strHashMap.InsertStringBatch(ctr.strHashStates, ctr.hstr.keys[:n], ctr.values)
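+		// truncate the per-row key buffers once the unit has been inserted,
+		// otherwise the next unit would append onto the previous unit's keys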
+		for k := 0; k < n; k++ {
+			ctr.hstr.keys[k] = ctr.hstr.keys[k][:0]
+		}
 		if !flg {
 			if err := ctr.batchFill(i, n, bat, proc); err != nil {
 				return err
@@ -519,13 +522,13 @@ func fillStringGroup[T any](ctr *Container, vec *vector.Vector, keys []T, n int,
 	}
 }
 
-func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz uint32, start int) {
-	vs := vector.DecodeFixedCol[T](vec, int(sz))
-	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*1)[:len(vs)*1]
+func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz int, start int) {
+	vs := vector.DecodeFixedCol[T](vec, sz)
+	data := unsafe.Slice((*byte)(unsafe.Pointer(&vs[0])), cap(vs)*sz)[:len(vs)*sz]
 	if !nulls.Any(vec.Nsp) {
 		for i := 0; i < n; i++ {
 			ctr.hstr.keys[i] = append(ctr.hstr.keys[i], byte(0))
-			ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*1:(i+start+1)*1]...)
+			ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*sz:(i+start+1)*sz]...)
 		}
 	} else {
 		for i := 0; i < n; i++ {
@@ -533,9 +536,8 @@ func fillGroupStr[T any](ctr *Container, vec *vector.Vector, n int, sz uint32, s
 				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], byte(1))
 			} else {
 				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], byte(0))
-				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*1:(i+start+1)*1]...)
+				ctr.hstr.keys[i] = append(ctr.hstr.keys[i], data[(i+start)*sz:(i+start+1)*sz]...)
 			}
 		}
 	}
-
 }
diff --git a/pkg/sql/colexec2/mergegroup/group_test.go b/pkg/sql/colexec2/mergegroup/group_test.go
index efc20a5d3ef2b82a4d645d04d465797f24e86be6..a485fcd6a9600f68b7e7a65b619601dc9cce6c24 100644
--- a/pkg/sql/colexec2/mergegroup/group_test.go
+++ b/pkg/sql/colexec2/mergegroup/group_test.go
@@ -121,9 +121,21 @@ func TestGroup(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
 			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
 				break
 			}
 		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					batch.Clean(bat, tc.proc.Mp)
+				}
+			}
+		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -146,9 +158,20 @@ func BenchmarkGroup(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
 				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+					}
 					break
 				}
 			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						batch.Clean(bat, tc.proc.Mp)
+					}
+				}
+			}
 		}
 	}
 }
diff --git a/pkg/sql/colexec2/mergelimit/limit_test.go b/pkg/sql/colexec2/mergelimit/limit_test.go
index a1ef603eaafd1f0665a1681cb4e5d499bbf8c233..bec719772967124db89e63f571e3aaeaf191561c 100644
--- a/pkg/sql/colexec2/mergelimit/limit_test.go
+++ b/pkg/sql/colexec2/mergelimit/limit_test.go
@@ -82,9 +82,24 @@ func TestLimit(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
 			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
 				break
 			}
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
+		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					batch.Clean(bat, tc.proc.Mp)
+				}
+			}
 		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -109,8 +124,22 @@ func BenchmarkLimit(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
 				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+					}
 					break
 				}
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
+			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						batch.Clean(bat, tc.proc.Mp)
+					}
+				}
 			}
 		}
 	}
diff --git a/pkg/sql/colexec2/mergeoffset/offset.go b/pkg/sql/colexec2/mergeoffset/offset.go
index 9947ce08f03359d9f4fdd55a235d42b35e9fb7be..837a6dea3a6df448037463cd225cd9c844bba141 100644
--- a/pkg/sql/colexec2/mergeoffset/offset.go
+++ b/pkg/sql/colexec2/mergeoffset/offset.go
@@ -36,8 +36,8 @@ func Prepare(_ *process.Process, arg interface{}) error {
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
 	for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
-		rec := proc.Reg.MergeReceivers[i]
-		bat := <-rec.Ch
+		reg := proc.Reg.MergeReceivers[i]
+		bat := <-reg.Ch
 		// deal special case for bat
 		{
 			// 1. the last batch at this receiver
@@ -54,6 +54,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
 		}
 
 		if n.ctr.seen > n.Offset {
+			batch.Clean(bat, proc.Mp)
 			return false, nil
 		}
 		length := len(bat.Zs)
diff --git a/pkg/sql/colexec2/mergeoffset/offset_test.go b/pkg/sql/colexec2/mergeoffset/offset_test.go
index 80b2f7b3021853d1dc032b6210b2c96e5aead3f3..a5330338e384ba7a846652dd12ff90e56f8e0556 100644
--- a/pkg/sql/colexec2/mergeoffset/offset_test.go
+++ b/pkg/sql/colexec2/mergeoffset/offset_test.go
@@ -84,9 +84,24 @@ func TestOffset(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
 			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
 				break
 			}
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
+		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					batch.Clean(bat, tc.proc.Mp)
+				}
+			}
 		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -112,9 +127,24 @@ func BenchmarkOffset(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
 				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+					}
 					break
 				}
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
+			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						batch.Clean(bat, tc.proc.Mp)
+					}
+				}
 			}
 		}
 	}
 }
diff --git a/pkg/sql/colexec2/mergeorder/order_test.go b/pkg/sql/colexec2/mergeorder/order_test.go
index d15b8731bad9ea032cd942efc3f43b83115b5d29..14223e20fade6c93de1da5c5a9539aa6f0690480 100644
--- a/pkg/sql/colexec2/mergeorder/order_test.go
+++ b/pkg/sql/colexec2/mergeorder/order_test.go
@@ -85,9 +85,21 @@ func TestOrder(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
 			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
 				break
 			}
 		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					batch.Clean(bat, tc.proc.Mp)
+				}
+			}
+		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -110,9 +122,20 @@ func BenchmarkOrder(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
 				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+					}
 					break
 				}
 			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						batch.Clean(bat, tc.proc.Mp)
+					}
+				}
+			}
 		}
 	}
 }
diff --git a/pkg/sql/colexec2/mergetop/top_test.go b/pkg/sql/colexec2/mergetop/top_test.go
index 8f3f6bcde2fe94ebc7563090ed16a58ff829e611..e043d1a01c9e034336fec55a3d4712df25a595f4 100644
--- a/pkg/sql/colexec2/mergetop/top_test.go
+++ b/pkg/sql/colexec2/mergetop/top_test.go
@@ -86,9 +86,21 @@ func TestTop(t *testing.T) {
 		tc.proc.Reg.MergeReceivers[1].Ch <- nil
 		for {
 			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+				}
 				break
 			}
 		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					batch.Clean(bat, tc.proc.Mp)
+				}
+			}
+		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -111,9 +123,20 @@ func BenchmarkTop(b *testing.B) {
 			tc.proc.Reg.MergeReceivers[1].Ch <- nil
 			for {
 				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+					}
 					break
 				}
 			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						batch.Clean(bat, tc.proc.Mp)
+					}
+				}
+			}
 		}
 	}
 }
diff --git a/pkg/sql/colexec2/offset/offset_test.go b/pkg/sql/colexec2/offset/offset_test.go
index 88a0469071d402bae45bff46122103854e8b51af..5e4dba5e8d08750c58736c2f067b0735d703e757 100644
--- a/pkg/sql/colexec2/offset/offset_test.go
+++ b/pkg/sql/colexec2/offset/offset_test.go
@@ -101,16 +101,23 @@ func TestOffset(t *testing.T) {
 		Prepare(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
-func BenchmarkLimit(b *testing.B) {
+func BenchmarkOffset(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		hm := host.New(1 << 30)
 		gm := guest.New(1<<30, hm)
@@ -132,6 +139,9 @@ func BenchmarkLimit(b *testing.B) {
 			Prepare(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
 			Call(tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
 			tc.proc.Reg.InputBatch = &batch.Batch{}
 			Call(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
diff --git a/pkg/sql/colexec2/order/order_test.go b/pkg/sql/colexec2/order/order_test.go
index b3442f125f5f3d9c83aa7991e4499a9765c48df1..30beefe1378b4d0d44e595914e398603965546fc 100644
--- a/pkg/sql/colexec2/order/order_test.go
+++ b/pkg/sql/colexec2/order/order_test.go
@@ -75,12 +75,19 @@ func TestOrder(t *testing.T) {
 		Prepare(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = &batch.Batch{}
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -99,8 +106,14 @@ func BenchmarkOrder(b *testing.B) {
 			Prepare(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
 			Call(tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
 			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
 			Call(tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
 			tc.proc.Reg.InputBatch = &batch.Batch{}
 			Call(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
diff --git a/pkg/sql/colexec2/output/output_test.go b/pkg/sql/colexec2/output/output_test.go
index 31639feb829f8f37b4ab30f93b421a9c10be9127..3b5b6a8d430730d79eda8969743de336a7ef2a79 100644
--- a/pkg/sql/colexec2/output/output_test.go
+++ b/pkg/sql/colexec2/output/output_test.go
@@ -79,7 +79,7 @@ func TestPrepare(t *testing.T) {
 	}
 }
 
-func TestLimit(t *testing.T) {
+func TestOutput(t *testing.T) {
 	for _, tc := range tcs {
 		Prepare(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
@@ -90,6 +90,7 @@ func TestLimit(t *testing.T) {
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
diff --git a/pkg/sql/colexec2/product/product.go b/pkg/sql/colexec2/product/product.go
new file mode 100644
index 0000000000000000000000000000000000000000..28215614906f31a6a799e14cd638ca624e44d1fb
--- /dev/null
+++ b/pkg/sql/colexec2/product/product.go
@@ -0,0 +1,127 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package product
+
+import (
+	"bytes"
+
+	batch "github.com/matrixorigin/matrixone/pkg/container/batch2"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	process "github.com/matrixorigin/matrixone/pkg/vm/process2"
+)
+
+func String(_ interface{}, buf *bytes.Buffer) {
+	buf.WriteString(" ⨯ ")
+}
+
+func Prepare(proc *process.Process, arg interface{}) error {
+	ap := arg.(*Argument)
+	ap.ctr = new(Container)
+	return nil
+}
+
+func Call(proc *process.Process, arg interface{}) (bool, error) {
+	ap := arg.(*Argument)
+	ctr := ap.ctr
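+	// like the join operator, product runs Build then Probe: Build buffers the whole
+	// second input (MergeReceivers[1]) into ctr.bat, and each Probe call crosses one
+	// batch from the first input (MergeReceivers[0]) with it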
+	for {
+		switch ctr.state {
+		case Build:
+			if err := ctr.build(ap, proc); err != nil {
+				ctr.state = End
+				return true, err
+			}
+			ctr.state = Probe
+		case Probe:
+			bat := <-proc.Reg.MergeReceivers[0].Ch
+			if bat == nil {
+				ctr.state = End
+				batch.Clean(ctr.bat, proc.Mp)
+				continue
+			}
+			if len(bat.Zs) == 0 {
+				continue
+			}
+			if err := ctr.probe(bat, ap, proc); err != nil {
+				ctr.state = End
+				proc.Reg.InputBatch = nil
+				return true, err
+			}
+			return false, nil
+		default:
+			proc.Reg.InputBatch = nil
+			return true, nil
+		}
+	}
+}
+
+func (ctr *Container) build(ap *Argument, proc *process.Process) error {
+	var err error
+
+	for {
+		bat := <-proc.Reg.MergeReceivers[1].Ch
+		if bat == nil {
+			break
+		}
+		if len(bat.Zs) == 0 {
+			continue
+		}
+		if ctr.bat == nil {
+			ctr.bat = batch.New(len(bat.Vecs))
+			for i, vec := range bat.Vecs {
+				ctr.bat.Vecs[i] = vector.New(vec.Typ)
+			}
+		}
+		if ctr.bat, err = ctr.bat.Append(proc.Mp, bat); err != nil {
+			batch.Clean(bat, proc.Mp)
+			batch.Clean(ctr.bat, proc.Mp)
+			return err
+		}
+		batch.Clean(bat, proc.Mp)
+	}
+	return nil
+}
+
+func (ctr *Container) probe(bat *batch.Batch, ap *Argument, proc *process.Process) error {
+	defer batch.Clean(bat, proc.Mp)
+	rbat := batch.New(len(ap.Result))
+	for i, rp := range ap.Result {
+		if rp.Rel == 0 {
+			rbat.Vecs[i] = vector.New(bat.Vecs[rp.Pos].Typ)
+		} else {
+			rbat.Vecs[i] = vector.New(ctr.bat.Vecs[rp.Pos].Typ)
+		}
+	}
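+	// the nested loops below form the Cartesian product: every probe row i is
+	// paired with every buffered build row j and the requested columns of the
+	// pair are unioned into rbat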
+	count := len(bat.Zs)
+	for i := 0; i < count; i++ {
+		for j := 0; j < len(ctr.bat.Zs); j++ {
+			for k, rp := range ap.Result {
+				if rp.Rel == 0 {
+					if err := vector.UnionOne(rbat.Vecs[k], bat.Vecs[rp.Pos], int64(i), proc.Mp); err != nil {
+						batch.Clean(rbat, proc.Mp)
+						return err
+					}
+				} else {
+					if err := vector.UnionOne(rbat.Vecs[k], ctr.bat.Vecs[rp.Pos], int64(j), proc.Mp); err != nil {
+						batch.Clean(rbat, proc.Mp)
+						return err
+					}
+				}
+			}
+			rbat.Zs = append(rbat.Zs, ctr.bat.Zs[j])
+		}
+	}
+	proc.Reg.InputBatch = rbat
+	return nil
+}
diff --git a/pkg/sql/colexec2/product/product_test.go b/pkg/sql/colexec2/product/product_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..3a72053e9afff6c39bc9b8d70876e5349aaf0146
--- /dev/null
+++ b/pkg/sql/colexec2/product/product_test.go
@@ -0,0 +1,260 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package product
+
+import (
+	"bytes"
+	"context"
+	"strconv"
+	"testing"
+
+	batch "github.com/matrixorigin/matrixone/pkg/container/batch2"
+	"github.com/matrixorigin/matrixone/pkg/container/nulls"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/encoding"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	process "github.com/matrixorigin/matrixone/pkg/vm/process2"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows          = 10     // default rows
+	BenchmarkRows = 100000 // default rows for benchmark
+)
+
+// add unit tests for cases
+type productTestCase struct {
+	arg    *Argument
+	flgs   []bool // flgs[i] == true: nullable
+	types  []types.Type
+	proc   *process.Process
+	cancel context.CancelFunc
+}
+
+var (
+	tcs []productTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []productTestCase{
+		newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
+		newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		Prepare(tc.proc, tc.arg)
+	}
+}
+
+func TestProduct(t *testing.T) {
+	for _, tc := range tcs {
+		Prepare(tc.proc, tc.arg)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- nil
+		tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[1].Ch <- nil
+		for {
+			if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+				break
+			}
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
+	}
+}
+
+func BenchmarkProduct(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		hm := host.New(1 << 30)
+		gm := guest.New(1<<30, hm)
+		tcs = []productTestCase{
+			newTestCase(mheap.New(gm), []bool{false}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
+			newTestCase(mheap.New(gm), []bool{true}, []types.Type{{Oid: types.T_int8}}, []ResultPos{{0, 0}, {1, 0}}),
+		}
+		t := new(testing.T)
+		for _, tc := range tcs {
+			Prepare(tc.proc, tc.arg)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- nil
+			tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.flgs, tc.types, tc.proc, Rows)
+			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[1].Ch <- nil
+			for {
+				if ok, err := Call(tc.proc, tc.arg); ok || err != nil {
+					break
+				}
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
+		}
+	}
+}
+
+func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos) productTestCase {
+	proc := process.New(m)
+	proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	proc.Reg.MergeReceivers[0] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 10),
+	}
+	proc.Reg.MergeReceivers[1] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 4),
+	}
+	return productTestCase{
+		types:  ts,
+		flgs:   flgs,
+		proc:   proc,
+		cancel: cancel,
+		arg: &Argument{
+			Result: rp,
+		},
+	}
+}
+
+// create a new batch based on the type information, flgs[i] == true: has null
+func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	bat := batch.New(len(ts))
+	bat.InitZsOne(int(rows))
+	for i := range bat.Vecs {
+		vec := vector.New(ts[i])
+		switch vec.Typ.Oid {
+		case types.T_int8:
+			data, err := mheap.Alloc(proc.Mp, rows*1)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt8Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int8(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_int16:
+			data, err := mheap.Alloc(proc.Mp, rows*2)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt16Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int16(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_int32:
+			data, err := mheap.Alloc(proc.Mp, rows*4)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt32Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int32(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_int64:
+			data, err := mheap.Alloc(proc.Mp, rows*8)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeInt64Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = int64(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_decimal64:
+			data, err := mheap.Alloc(proc.Mp, rows*8)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeDecimal64Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i] = types.Decimal64(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+		case types.T_decimal128:
+			data, err := mheap.Alloc(proc.Mp, rows*16)
+			require.NoError(t, err)
+			vec.Data = data
+			vs := encoding.DecodeDecimal128Slice(vec.Data)[:rows]
+			for i := range vs {
+				vs[i].Lo = int64(i)
+				vs[i].Hi = int64(i)
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			vec.Col = vs
+
+		case types.T_char, types.T_varchar:
+			size := 0
+			vs := make([][]byte, rows)
+			for i := range vs {
+				vs[i] = []byte(strconv.Itoa(i))
+				size += len(vs[i])
+			}
+			data, err := mheap.Alloc(proc.Mp, int64(size))
+			require.NoError(t, err)
+			data = data[:0]
+			col := new(types.Bytes)
+			o := uint32(0)
+			for _, v := range vs {
+				data = append(data, v...)
+				col.Offsets = append(col.Offsets, o)
+				o += uint32(len(v))
+				col.Lengths = append(col.Lengths, uint32(len(v)))
+			}
+			if flgs[i] {
+				nulls.Add(vec.Nsp, uint64(0))
+			}
+			col.Data = data
+			vec.Col = col
+			vec.Data = data
+		}
+		bat.Vecs[i] = vec
+	}
+	return bat
+}
diff --git a/pkg/sql/colexec2/product/types.go b/pkg/sql/colexec2/product/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..5d449b0c76987c3d830afdbc650ab94d35b0fe6e
--- /dev/null
+++ b/pkg/sql/colexec2/product/types.go
@@ -0,0 +1,40 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package product
+
+import (
+	batch "github.com/matrixorigin/matrixone/pkg/container/batch2"
+)
+
+const (
+	Build = iota
+	Probe
+	End
+)
+
+type Container struct {
+	state int
+	bat   *batch.Batch
+}
+
+type ResultPos struct {
+	Rel int32
+	Pos int32
+}
+
+type Argument struct {
+	ctr    *Container
+	Result []ResultPos
+}
diff --git a/pkg/sql/colexec2/top/top_test.go b/pkg/sql/colexec2/top/top_test.go
index 8d91a45e4506feafcf9eae333815ce9ae64c7c99..12a134c33a60478d38e4d309287e1f9c17e724ea 100644
--- a/pkg/sql/colexec2/top/top_test.go
+++ b/pkg/sql/colexec2/top/top_test.go
@@ -80,8 +80,12 @@ func TestTop(t *testing.T) {
 		Call(tc.proc, tc.arg)
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+		}
 		tc.proc.Reg.InputBatch = nil
 		Call(tc.proc, tc.arg)
+		require.Equal(t, mheap.Size(tc.proc.Mp), int64(0))
 	}
 }
 
@@ -104,6 +108,9 @@ func BenchmarkTop(b *testing.B) {
 			Call(tc.proc, tc.arg)
 			tc.proc.Reg.InputBatch = nil
 			Call(tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				batch.Clean(tc.proc.Reg.InputBatch, tc.proc.Mp)
+			}
 		}
 	}
 }
diff --git a/pkg/vm/mmu/guest/mmu.go b/pkg/vm/mmu/guest/mmu.go
index 7a84d5fe15bcd8436c77a1a071350902c85bcb57..20fa2fefcb920b44f0c19882e41d6eba7215ccf2 100644
--- a/pkg/vm/mmu/guest/mmu.go
+++ b/pkg/vm/mmu/guest/mmu.go
@@ -45,9 +45,6 @@ func (m *Mmu) Free(size int64) {
 }
 
 func (m *Mmu) Alloc(size int64) error {
-	if m.Size() == 0 {
-		return nil
-	}
 	if atomic.LoadInt64(&m.size)+size > m.Limit {
 		return mmu.OutOfMemory
 	}