diff --git a/pkg/container/types/types.go b/pkg/container/types/types.go
index f671816e2ec19280a47dd9d9826810d5387c4f2a..e32db54f12f660f1641a22f6d23f3f1ace40f418 100644
--- a/pkg/container/types/types.go
+++ b/pkg/container/types/types.go
@@ -175,6 +175,10 @@ func (t Type) TypeSize() int {
 	return t.Oid.TypeLen()
 }
 
+func (t Type) IsBoolean() bool {
+	return t.Oid == T_bool
+}
+
 func (t Type) IsString() bool {
 	return t.Oid == T_char || t.Oid == T_varchar
 }
diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go
index e169ac14f1a59952a3c6e304d901e10314320a7b..513723f9630be1d77c854a81060faa9a613c430e 100644
--- a/pkg/container/vector/vector.go
+++ b/pkg/container/vector/vector.go
@@ -79,6 +79,10 @@ func (v *Vector) Count() int {
 	return Length(v)
 }
 
+func (v *Vector) Size() int {
+	return len(v.Data)
+}
+
 func (v *Vector) GetType() types.Type {
 	return v.Typ
 }
@@ -545,6 +549,7 @@ func (v *Vector) ConstVectorIsNull() bool {
 func (v *Vector) Free(m *mheap.Mheap) {
 	if v.Data != nil {
 		mheap.Free(m, v.Data)
+		v.Data = nil
 	}
 }
 
diff --git a/pkg/sql/colexec2/limit/limit.go b/pkg/sql/colexec2/limit/limit.go
new file mode 100644
index 0000000000000000000000000000000000000000..91fce72162895157cbd3b982bce86448eac9f353
--- /dev/null
+++ b/pkg/sql/colexec2/limit/limit.go
@@ -0,0 +1,65 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package limit
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+func String(arg interface{}, buf *bytes.Buffer) {
+	n := arg.(*Argument)
+	buf.WriteString(fmt.Sprintf("limit(%v)", n.Limit))
+}
+
+func Prepare(_ *process.Process, _ interface{}) error {
+	return nil
+}
+
+// returning only the first n tuples from its input
+func Call(idx int, proc *process.Process, arg interface{}) (bool, error) {
+	bat := proc.InputBatch()
+	if bat == nil {
+		return true, nil
+	}
+	if bat.Length() == 0 {
+		return false, nil
+	}
+	ap := arg.(*Argument)
+	anal := proc.GetAnalyze(idx)
+	anal.Start()
+	defer anal.Stop()
+	anal.Input(bat)
+	if ap.Seen >= ap.Limit {
+		proc.Reg.InputBatch = nil
+		bat.Clean(proc.Mp)
+		return true, nil
+	}
+	length := bat.Length()
+	newSeen := ap.Seen + uint64(length)
+	if newSeen >= ap.Limit { // limit - seen
+		batch.SetLength(bat, int(ap.Limit-ap.Seen))
+		ap.Seen = newSeen
+		anal.Output(bat)
+		proc.SetInputBatch(bat)
+		return true, nil
+	}
+	anal.Output(bat)
+	ap.Seen = newSeen
+	return false, nil
+}
diff --git a/pkg/sql/colexec2/limit/limit_test.go b/pkg/sql/colexec2/limit/limit_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..7711c1c1be6b002159f56be349bc4cfc74997a49
--- /dev/null
+++ b/pkg/sql/colexec2/limit/limit_test.go
@@ -0,0 +1,157 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package limit
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/testutil"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows          = 10      // default rows
+	BenchmarkRows = 1000000 // default rows for benchmark
+)
+
+// add unit tests for cases
+type limitTestCase struct {
+	arg   *Argument
+	types []types.Type
+	proc  *process.Process
+}
+
+var (
+	tcs []limitTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []limitTestCase{
+		{
+			proc: process.New(mheap.New(gm)),
+			types: []types.Type{
+				{Oid: types.T_int8},
+			},
+			arg: &Argument{
+				Seen:  0,
+				Limit: 8,
+			},
+		},
+		{
+			proc: process.New(mheap.New(gm)),
+			types: []types.Type{
+				{Oid: types.T_int8},
+			},
+			arg: &Argument{
+				Seen:  0,
+				Limit: 10,
+			},
+		},
+		{
+			proc: process.New(mheap.New(gm)),
+			types: []types.Type{
+				{Oid: types.T_int8},
+			},
+			arg: &Argument{
+				Seen:  0,
+				Limit: 12,
+			},
+		},
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+	}
+}
+
+func TestLimit(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
+		_, _ = Call(0, tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+		}
+		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
+		_, _ = Call(0, tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+		}
+		tc.proc.Reg.InputBatch = &batch.Batch{}
+		_, _ = Call(0, tc.proc, tc.arg)
+		tc.proc.Reg.InputBatch = nil
+		_, _ = Call(0, tc.proc, tc.arg)
+		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
+	}
+}
+
+func BenchmarkLimit(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		hm := host.New(1 << 30)
+		gm := guest.New(1<<30, hm)
+		tcs = []limitTestCase{
+			{
+				proc: process.New(mheap.New(gm)),
+				types: []types.Type{
+					{Oid: types.T_int8},
+				},
+				arg: &Argument{
+					Seen:  0,
+					Limit: 8,
+				},
+			},
+		}
+
+		t := new(testing.T)
+		for _, tc := range tcs {
+			err := Prepare(tc.proc, tc.arg)
+			require.NoError(t, err)
+			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
+			_, _ = Call(0, tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+			}
+			tc.proc.Reg.InputBatch = &batch.Batch{}
+			_, _ = Call(0, tc.proc, tc.arg)
+			tc.proc.Reg.InputBatch = nil
+			_, _ = Call(0, tc.proc, tc.arg)
+		}
+	}
+}
+
+// create a new block based on the type information
+func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	return testutil.NewBatch(ts, false, int(rows), proc.Mp)
+}
diff --git a/pkg/sql/colexec2/limit/types.go b/pkg/sql/colexec2/limit/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..ef71c274ad722fd067c3371293cef3cc0bb2156f
--- /dev/null
+++ b/pkg/sql/colexec2/limit/types.go
@@ -0,0 +1,20 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package limit
+
+type Argument struct {
+	Seen  uint64 // seen is the number of tuples seen so far
+	Limit uint64
+}
diff --git a/pkg/sql/colexec2/merge/merge.go b/pkg/sql/colexec2/merge/merge.go
new file mode 100644
index 0000000000000000000000000000000000000000..5fbaca60a6abeb67525132c66e73f97632732774
--- /dev/null
+++ b/pkg/sql/colexec2/merge/merge.go
@@ -0,0 +1,57 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package merge
+
+import (
+	"bytes"
+
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+func String(_ interface{}, buf *bytes.Buffer) {
+	buf.WriteString(" merge ")
+}
+
+func Prepare(_ *process.Process, arg interface{}) error {
+	ap := arg.(*Argument)
+	ap.ctr = new(Container)
+	return nil
+}
+
+func Call(_ int, proc *process.Process, arg interface{}) (bool, error) {
+	ap := arg.(*Argument)
+	for {
+		if len(proc.Reg.MergeReceivers) == 0 {
+			return true, nil
+		}
+		reg := proc.Reg.MergeReceivers[ap.ctr.i]
+		bat := <-reg.Ch
+		if bat == nil {
+			proc.Reg.MergeReceivers = append(proc.Reg.MergeReceivers[:ap.ctr.i], proc.Reg.MergeReceivers[ap.ctr.i+1:]...)
+			if ap.ctr.i >= len(proc.Reg.MergeReceivers) {
+				ap.ctr.i = 0
+			}
+			continue
+		}
+		if bat.Length() == 0 {
+			continue
+		}
+		proc.SetInputBatch(bat)
+		if ap.ctr.i = ap.ctr.i + 1; ap.ctr.i >= len(proc.Reg.MergeReceivers) {
+			ap.ctr.i = 0
+		}
+		return false, nil
+	}
+}
diff --git a/pkg/sql/colexec2/merge/merge_test.go b/pkg/sql/colexec2/merge/merge_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..cd44d35ffdef470512fccbc3a0551268cd94d18a
--- /dev/null
+++ b/pkg/sql/colexec2/merge/merge_test.go
@@ -0,0 +1,128 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package merge
+
+import (
+	"bytes"
+	"context"
+	"testing"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/testutil"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows = 10 // default rows
+)
+
+// add unit tests for cases
+type mergeTestCase struct {
+	arg    *Argument
+	types  []types.Type
+	proc   *process.Process
+	cancel context.CancelFunc
+}
+
+var (
+	tcs []mergeTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []mergeTestCase{
+		newTestCase(mheap.New(gm)),
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+	}
+}
+
+func TestMerge(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[0].Ch <- nil
+		tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[1].Ch <- nil
+		for {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+				}
+				break
+			}
+			if tc.proc.Reg.InputBatch != nil {
+				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+			}
+		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					bat.Clean(tc.proc.Mp)
+				}
+			}
+		}
+		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
+	}
+}
+
+func newTestCase(m *mheap.Mheap) mergeTestCase {
+	proc := process.New(m)
+	proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	proc.Reg.MergeReceivers[0] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	proc.Reg.MergeReceivers[1] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	return mergeTestCase{
+		proc: proc,
+		types: []types.Type{
+			{Oid: types.T_int8},
+		},
+		arg:    new(Argument),
+		cancel: cancel,
+	}
+}
+
+// create a new block based on the type information
+func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	return testutil.NewBatch(ts, false, int(rows), proc.Mp)
+}
diff --git a/pkg/sql/colexec2/merge/types.go b/pkg/sql/colexec2/merge/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..2f04050cb2ea991ec94cdfdba2210667aba0f45b
--- /dev/null
+++ b/pkg/sql/colexec2/merge/types.go
@@ -0,0 +1,23 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package merge
+
+type Container struct {
+	i int
+}
+
+type Argument struct {
+	ctr *Container
+}
diff --git a/pkg/sql/colexec2/mergelimit/limit.go b/pkg/sql/colexec2/mergelimit/limit.go
new file mode 100644
index 0000000000000000000000000000000000000000..859a59a7dc21dd31b28beb5dd8424722ae265cf2
--- /dev/null
+++ b/pkg/sql/colexec2/mergelimit/limit.go
@@ -0,0 +1,97 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mergelimit
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+func String(arg interface{}, buf *bytes.Buffer) {
+	ap := arg.(*Argument)
+	buf.WriteString(fmt.Sprintf("mergeLimit(%d)", ap.Limit))
+}
+
+func Prepare(_ *process.Process, arg interface{}) error {
+	ap := arg.(*Argument)
+	ap.ctr = new(container)
+	ap.ctr.seen = 0
+	return nil
+}
+
+func Call(idx int, proc *process.Process, arg interface{}) (bool, error) {
+	ap := arg.(*Argument)
+	for {
+		switch ap.ctr.state {
+		case Eval:
+			anal := proc.GetAnalyze(idx)
+			defer anal.Stop()
+			ok, err := ap.ctr.eval(ap, proc, anal)
+			if err != nil {
+				return ok, err
+			}
+			if ok {
+				ap.ctr.state = End
+			}
+			return ok, err
+		default:
+			proc.SetInputBatch(nil)
+			return true, nil
+		}
+	}
+}
+
+func (ctr *container) eval(ap *Argument, proc *process.Process, anal process.Analyze) (bool, error) {
+	for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
+		reg := proc.Reg.MergeReceivers[i]
+		bat := <-reg.Ch
+
+		// 1. the last batch at this receiver
+		if bat == nil {
+			proc.Reg.MergeReceivers = append(proc.Reg.MergeReceivers[:i], proc.Reg.MergeReceivers[i+1:]...)
+			i--
+			continue
+		}
+		// 2. an empty batch
+		if bat.Length() == 0 {
+			i--
+			continue
+		}
+		anal.Input(bat)
+		if ap.ctr.seen >= ap.Limit {
+			proc.SetInputBatch(nil)
+			bat.Clean(proc.Mp)
+			return true, nil
+		}
+		newSeen := ap.ctr.seen + uint64(bat.Length())
+		if newSeen < ap.Limit {
+			ap.ctr.seen = newSeen
+			anal.Output(bat)
+			proc.SetInputBatch(bat)
+			return false, nil
+		} else {
+			num := int(newSeen - ap.Limit)
+			batch.SetLength(bat, bat.Length()-num)
+			ap.ctr.seen = newSeen
+			proc.Reg.InputBatch = bat
+			return false, nil
+		}
+	}
+	return true, nil
+
+}
diff --git a/pkg/sql/colexec2/mergelimit/limit_test.go b/pkg/sql/colexec2/mergelimit/limit_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..52d76a0a32b199638fb87b1c8476e4b929310c35
--- /dev/null
+++ b/pkg/sql/colexec2/mergelimit/limit_test.go
@@ -0,0 +1,176 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mergelimit
+
+import (
+	"bytes"
+	"context"
+	"testing"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/testutil"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows          = 10      // default rows
+	BenchmarkRows = 1000000 // default rows for benchmark
+)
+
+// add unit tests for cases
+type limitTestCase struct {
+	arg    *Argument
+	types  []types.Type
+	proc   *process.Process
+	cancel context.CancelFunc
+}
+
+var (
+	tcs []limitTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []limitTestCase{
+		newTestCase(mheap.New(gm), 8),
+		newTestCase(mheap.New(gm), 10),
+		newTestCase(mheap.New(gm), 12),
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+	}
+}
+
+func TestLimit(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[0].Ch <- nil
+		tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[1].Ch <- nil
+		for {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+				}
+				break
+			}
+			if tc.proc.Reg.InputBatch != nil {
+				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+			}
+		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					bat.Clean(tc.proc.Mp)
+				}
+			}
+		}
+		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
+	}
+}
+
+func BenchmarkLimit(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		hm := host.New(1 << 30)
+		gm := guest.New(1<<30, hm)
+		tcs = []limitTestCase{
+			newTestCase(mheap.New(gm), 8),
+			newTestCase(mheap.New(gm), 10),
+			newTestCase(mheap.New(gm), 12),
+		}
+
+		t := new(testing.T)
+		for _, tc := range tcs {
+			err := Prepare(tc.proc, tc.arg)
+			require.NoError(t, err)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.types, tc.proc, BenchmarkRows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[0].Ch <- nil
+			tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.types, tc.proc, BenchmarkRows)
+			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[1].Ch <- nil
+			for {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+					}
+					break
+				}
+				if tc.proc.Reg.InputBatch != nil {
+					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+				}
+			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						bat.Clean(tc.proc.Mp)
+					}
+				}
+			}
+		}
+	}
+}
+
+func newTestCase(m *mheap.Mheap, limit uint64) limitTestCase {
+	proc := process.New(m)
+	proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	proc.Reg.MergeReceivers[0] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	proc.Reg.MergeReceivers[1] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	return limitTestCase{
+		proc: proc,
+		types: []types.Type{
+			{Oid: types.T_int8},
+		},
+		arg: &Argument{
+			Limit: limit,
+		},
+		cancel: cancel,
+	}
+}
+
+// create a new block based on the type information
+func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	return testutil.NewBatch(ts, false, int(rows), proc.Mp)
+}
diff --git a/pkg/sql/colexec2/mergelimit/types.go b/pkg/sql/colexec2/mergelimit/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..b30cd0fff5849b1ab703c6e5edf80b433dc6f7a8
--- /dev/null
+++ b/pkg/sql/colexec2/mergelimit/types.go
@@ -0,0 +1,32 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mergelimit
+
+const (
+	Eval = iota
+	End
+)
+
+type container struct {
+	state int
+	seen  uint64
+}
+
+type Argument struct {
+	// Limit records the limit number of this operator
+	Limit uint64
+	// ctr stores the attributes needn't do Serialization work
+	ctr *container
+}
diff --git a/pkg/sql/colexec2/mergeoffset/offset.go b/pkg/sql/colexec2/mergeoffset/offset.go
new file mode 100644
index 0000000000000000000000000000000000000000..af9e91fa80487ac6322487bc4bbff48490238505
--- /dev/null
+++ b/pkg/sql/colexec2/mergeoffset/offset.go
@@ -0,0 +1,105 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mergeoffset
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+func String(arg interface{}, buf *bytes.Buffer) {
+	ap := arg.(*Argument)
+	buf.WriteString(fmt.Sprintf("mergeOffset(%d)", ap.Offset))
+}
+
+func Prepare(_ *process.Process, arg interface{}) error {
+	ap := arg.(*Argument)
+	ap.ctr = new(container)
+	ap.ctr.seen = 0
+	return nil
+}
+
+func Call(idx int, proc *process.Process, arg interface{}) (bool, error) {
+	ap := arg.(*Argument)
+	for {
+		switch ap.ctr.state {
+		case Eval:
+			anal := proc.GetAnalyze(idx)
+			defer anal.Stop()
+			ok, err := ap.ctr.eval(ap, proc, anal)
+			if err != nil {
+				return ok, err
+			}
+			if ok {
+				ap.ctr.state = End
+			}
+			return ok, err
+		default:
+			proc.SetInputBatch(nil)
+			return true, nil
+		}
+	}
+}
+
+func (ctr *container) eval(ap *Argument, proc *process.Process, anal process.Analyze) (bool, error) {
+	for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
+		reg := proc.Reg.MergeReceivers[i]
+		bat := <-reg.Ch
+		// 1. the last batch at this receiver
+		if bat == nil {
+			proc.Reg.MergeReceivers = append(proc.Reg.MergeReceivers[:i], proc.Reg.MergeReceivers[i+1:]...)
+			i--
+			continue
+		}
+		// 2. an empty batch
+		if bat.Length() == 0 {
+			i--
+			continue
+		}
+		anal.Input(bat)
+		if ap.ctr.seen > ap.Offset {
+			anal.Output(bat)
+			proc.SetInputBatch(bat)
+			return false, nil
+		}
+		length := len(bat.Zs)
+		// bat = PartOne + PartTwo, and PartTwo is required.
+		if ap.ctr.seen+uint64(length) > ap.Offset {
+			sels := newSels(int64(ap.Offset-ap.ctr.seen), int64(length)-int64(ap.Offset-ap.ctr.seen), proc)
+			ap.ctr.seen += uint64(length)
+			batch.Shrink(bat, sels)
+			proc.PutSels(sels)
+			anal.Output(bat)
+			proc.SetInputBatch(bat)
+			return false, nil
+		}
+		ap.ctr.seen += uint64(length)
+		bat.Clean(proc.Mp)
+		proc.SetInputBatch(nil)
+		i--
+	}
+	return true, nil
+}
+
+func newSels(start, count int64, proc *process.Process) []int64 {
+	sels := proc.GetSels()
+	for i := int64(0); i < count; i++ {
+		sels = append(sels, start+i)
+	}
+	return sels[:count]
+}
diff --git a/pkg/sql/colexec2/mergeoffset/offset_test.go b/pkg/sql/colexec2/mergeoffset/offset_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..9e264156a7f2b17df16d781305ad552d9789b302
--- /dev/null
+++ b/pkg/sql/colexec2/mergeoffset/offset_test.go
@@ -0,0 +1,177 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mergeoffset
+
+import (
+	"bytes"
+	"context"
+	"testing"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/testutil"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows          = 10      // default rows
+	BenchmarkRows = 1000000 // default rows for benchmark
+)
+
+// add unit tests for cases
+type offsetTestCase struct {
+	arg    *Argument
+	types  []types.Type
+	proc   *process.Process
+	cancel context.CancelFunc
+}
+
+var (
+	tcs []offsetTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []offsetTestCase{
+		newTestCase(mheap.New(gm), 8),
+		//		newTestCase(mheap.New(gm), 10),
+		//		newTestCase(mheap.New(gm), 12),
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+	}
+}
+
+func TestOffset(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+		tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[0].Ch <- nil
+		tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.types, tc.proc, Rows)
+		tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+		tc.proc.Reg.MergeReceivers[1].Ch <- nil
+		for {
+			if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
+				if tc.proc.Reg.InputBatch != nil {
+					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+				}
+				break
+			}
+			if tc.proc.Reg.InputBatch != nil {
+				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+			}
+		}
+		for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+			for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+				bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+				if bat != nil {
+					bat.Clean(tc.proc.Mp)
+				}
+			}
+		}
+		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
+	}
+}
+
+func BenchmarkOffset(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		hm := host.New(1 << 30)
+		gm := guest.New(1<<30, hm)
+		tcs = []offsetTestCase{
+			newTestCase(mheap.New(gm), 8),
+			newTestCase(mheap.New(gm), 10),
+			newTestCase(mheap.New(gm), 12),
+		}
+
+		t := new(testing.T)
+		for _, tc := range tcs {
+			err := Prepare(tc.proc, tc.arg)
+			require.NoError(t, err)
+			tc.proc.Reg.MergeReceivers[0].Ch <- newBatch(t, tc.types, tc.proc, BenchmarkRows)
+			tc.proc.Reg.MergeReceivers[0].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[0].Ch <- nil
+			tc.proc.Reg.MergeReceivers[1].Ch <- newBatch(t, tc.types, tc.proc, BenchmarkRows)
+			tc.proc.Reg.MergeReceivers[1].Ch <- &batch.Batch{}
+			tc.proc.Reg.MergeReceivers[1].Ch <- nil
+			for {
+				if ok, err := Call(0, tc.proc, tc.arg); ok || err != nil {
+					if tc.proc.Reg.InputBatch != nil {
+						tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+					}
+					break
+				}
+				if tc.proc.Reg.InputBatch != nil {
+					tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+				}
+			}
+			for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
+				for len(tc.proc.Reg.MergeReceivers[i].Ch) > 0 {
+					bat := <-tc.proc.Reg.MergeReceivers[i].Ch
+					if bat != nil {
+						bat.Clean(tc.proc.Mp)
+					}
+				}
+			}
+
+		}
+	}
+}
+
+func newTestCase(m *mheap.Mheap, offset uint64) offsetTestCase {
+	proc := process.New(m)
+	proc.Reg.MergeReceivers = make([]*process.WaitRegister, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	proc.Reg.MergeReceivers[0] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	proc.Reg.MergeReceivers[1] = &process.WaitRegister{
+		Ctx: ctx,
+		Ch:  make(chan *batch.Batch, 3),
+	}
+	return offsetTestCase{
+		proc: proc,
+		types: []types.Type{
+			{Oid: types.T_int8},
+		},
+		arg: &Argument{
+			Offset: offset,
+		},
+		cancel: cancel,
+	}
+}
+
+// create a new block based on the type information
+func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	return testutil.NewBatch(ts, false, int(rows), proc.Mp)
+}
diff --git a/pkg/sql/colexec2/mergeoffset/types.go b/pkg/sql/colexec2/mergeoffset/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..b1eeb5945d901f1c1d045db2b7485db5e16a6f45
--- /dev/null
+++ b/pkg/sql/colexec2/mergeoffset/types.go
@@ -0,0 +1,32 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mergeoffset
+
+const (
+	Eval = iota
+	End
+)
+
+type container struct {
+	state int
+	seen  uint64
+}
+
+type Argument struct {
+	// Offset records the offset number of mergeOffset operator
+	Offset uint64
+	// ctr contains the attributes needn't do serialization work
+	ctr *container
+}
diff --git a/pkg/sql/colexec2/offset/offset.go b/pkg/sql/colexec2/offset/offset.go
new file mode 100644
index 0000000000000000000000000000000000000000..397316d43803dffd57918f909b67170ec40fee08
--- /dev/null
+++ b/pkg/sql/colexec2/offset/offset.go
@@ -0,0 +1,71 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package offset
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+func String(arg interface{}, buf *bytes.Buffer) {
+	n := arg.(*Argument)
+	buf.WriteString(fmt.Sprintf("offset(%v)", n.Offset))
+}
+
+func Prepare(_ *process.Process, _ interface{}) error {
+	return nil
+}
+
+func Call(idx int, proc *process.Process, arg interface{}) (bool, error) {
+	bat := proc.InputBatch()
+	if bat == nil {
+		return true, nil
+	}
+	if bat.Length() == 0 {
+		return false, nil
+	}
+	ap := arg.(*Argument)
+	anal := proc.GetAnalyze(idx)
+	anal.Start()
+	defer anal.Stop()
+	anal.Input(bat)
+	if ap.Seen > ap.Offset {
+		return false, nil
+	}
+	length := bat.Length()
+	if ap.Seen+uint64(length) > ap.Offset {
+		sels := newSels(int64(ap.Offset-ap.Seen), int64(length)-int64(ap.Offset-ap.Seen), proc)
+		ap.Seen += uint64(length)
+		bat.Shrink(sels)
+		proc.PutSels(sels)
+		proc.SetInputBatch(bat)
+		return false, nil
+	}
+	ap.Seen += uint64(length)
+	bat.Clean(proc.Mp)
+	proc.SetInputBatch(&batch.Batch{})
+	return false, nil
+}
+
+func newSels(start, count int64, proc *process.Process) []int64 {
+	sels := proc.GetSels()
+	for i := int64(0); i < count; i++ {
+		sels = append(sels, start+i)
+	}
+	return sels[:count]
+}
diff --git a/pkg/sql/colexec2/offset/offset_test.go b/pkg/sql/colexec2/offset/offset_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..5714256b1d8e185de5f51ea62775f4b9f82ea1e9
--- /dev/null
+++ b/pkg/sql/colexec2/offset/offset_test.go
@@ -0,0 +1,157 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package offset
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/testutil"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	Rows          = 10      // default rows
+	BenchmarkRows = 1000000 // default rows for benchmark
+)
+
+// add unit tests for cases
+type offsetTestCase struct {
+	arg   *Argument
+	types []types.Type
+	proc  *process.Process
+}
+
+var (
+	tcs []offsetTestCase
+)
+
+func init() {
+	hm := host.New(1 << 30)
+	gm := guest.New(1<<30, hm)
+	tcs = []offsetTestCase{
+		{
+			proc: process.New(mheap.New(gm)),
+			types: []types.Type{
+				{Oid: types.T_int8},
+			},
+			arg: &Argument{
+				Seen:   0,
+				Offset: 8,
+			},
+		},
+		{
+			proc: process.New(mheap.New(gm)),
+			types: []types.Type{
+				{Oid: types.T_int8},
+			},
+			arg: &Argument{
+				Seen:   0,
+				Offset: 10,
+			},
+		},
+		{
+			proc: process.New(mheap.New(gm)),
+			types: []types.Type{
+				{Oid: types.T_int8},
+			},
+			arg: &Argument{
+				Seen:   0,
+				Offset: 12,
+			},
+		},
+	}
+}
+
+func TestString(t *testing.T) {
+	buf := new(bytes.Buffer)
+	for _, tc := range tcs {
+		String(tc.arg, buf)
+	}
+}
+
+func TestPrepare(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+	}
+}
+
+func TestOffset(t *testing.T) {
+	for _, tc := range tcs {
+		err := Prepare(tc.proc, tc.arg)
+		require.NoError(t, err)
+		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
+		_, _ = Call(0, tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+		}
+		tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, Rows)
+		_, _ = Call(0, tc.proc, tc.arg)
+		if tc.proc.Reg.InputBatch != nil {
+			tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+		}
+		tc.proc.Reg.InputBatch = &batch.Batch{}
+		_, _ = Call(0, tc.proc, tc.arg)
+		tc.proc.Reg.InputBatch = nil
+		_, _ = Call(0, tc.proc, tc.arg)
+		require.Equal(t, int64(0), mheap.Size(tc.proc.Mp))
+	}
+}
+
+func BenchmarkOffset(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		hm := host.New(1 << 30)
+		gm := guest.New(1<<30, hm)
+		tcs = []offsetTestCase{
+			{
+				proc: process.New(mheap.New(gm)),
+				types: []types.Type{
+					{Oid: types.T_int8},
+				},
+				arg: &Argument{
+					Seen:   0,
+					Offset: 8,
+				},
+			},
+		}
+
+		t := new(testing.T)
+		for _, tc := range tcs {
+			err := Prepare(tc.proc, tc.arg)
+			require.NoError(t, err)
+			tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, BenchmarkRows)
+			_, _ = Call(0, tc.proc, tc.arg)
+			if tc.proc.Reg.InputBatch != nil {
+				tc.proc.Reg.InputBatch.Clean(tc.proc.Mp)
+			}
+			tc.proc.Reg.InputBatch = &batch.Batch{}
+			_, _ = Call(0, tc.proc, tc.arg)
+			tc.proc.Reg.InputBatch = nil
+			_, _ = Call(0, tc.proc, tc.arg)
+		}
+	}
+}
+
+// create a new block based on the type information
+func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
+	return testutil.NewBatch(ts, false, int(rows), proc.Mp)
+}
diff --git a/pkg/sql/colexec2/offset/types.go b/pkg/sql/colexec2/offset/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..d8dfb8d82de404ab5c87cea87d2eb5a8aaa997cb
--- /dev/null
+++ b/pkg/sql/colexec2/offset/types.go
@@ -0,0 +1,20 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package offset
+
+type Argument struct {
+	Seen   uint64 // seen is the number of tuples seen so far
+	Offset uint64
+}
diff --git a/pkg/sql/colexec2/restrict/restrict.go b/pkg/sql/colexec2/restrict/restrict.go
new file mode 100644
index 0000000000000000000000000000000000000000..1e5de2a452b0a411da4084e5f45e01980dbd0ab3
--- /dev/null
+++ b/pkg/sql/colexec2/restrict/restrict.go
@@ -0,0 +1,78 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package restrict
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/sql/errors"
+
+	"github.com/matrixorigin/matrixone/pkg/sql/colexec"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+func String(arg interface{}, buf *bytes.Buffer) {
+	ap := arg.(*Argument)
+	buf.WriteString(fmt.Sprintf("filter(%s)", ap.E))
+}
+
+func Prepare(_ *process.Process, _ interface{}) error {
+	return nil
+}
+
+func Call(idx int, proc *process.Process, arg interface{}) (bool, error) {
+	bat := proc.InputBatch()
+	if bat == nil {
+		return true, nil
+	}
+	if bat.Length() == 0 {
+		return false, nil
+	}
+	ap := arg.(*Argument)
+	anal := proc.GetAnalyze(idx)
+	anal.Start()
+	defer anal.Stop()
+	anal.Input(bat)
+	vec, err := colexec.EvalExpr(bat, proc, ap.E)
+	if err != nil {
+		bat.Clean(proc.Mp)
+		return false, err
+	}
+	defer vec.Free(proc.Mp)
+	anal.Alloc(int64(vec.Size()))
+	if !vec.GetType().IsBoolean() {
+		return false, errors.New("", "Only bool expression can be used as filter condition.")
+	}
+	bs := vector.GetColumn[bool](vec)
+	if vec.IsScalar() {
+		if !bs[0] {
+			bat.Shrink(nil)
+		}
+	} else {
+		sels := proc.GetSels()
+		for i, b := range bs {
+			if b {
+				sels = append(sels, int64(i))
+			}
+		}
+		bat.Shrink(sels)
+		proc.PutSels(sels)
+	}
+	anal.Output(bat)
+	proc.SetInputBatch(bat)
+	return false, nil
+}
diff --git a/pkg/sql/colexec2/restrict/types.go b/pkg/sql/colexec2/restrict/types.go
new file mode 100644
index 0000000000000000000000000000000000000000..c248ecb417f335ffbdb5de9b0177c6b412027795
--- /dev/null
+++ b/pkg/sql/colexec2/restrict/types.go
@@ -0,0 +1,21 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package restrict
+
+import "github.com/matrixorigin/matrixone/pkg/pb/plan"
+
+type Argument struct {
+	E *plan.Expr
+}
diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go
index 07e0ab74cb05da77018ee2d285050b02c5e666c5..2defc87b5233bb555f285fa01b4515675bc99d38 100644
--- a/pkg/vm/process/process.go
+++ b/pkg/vm/process/process.go
@@ -67,6 +67,18 @@ func PutSels(sels []int64, proc *Process) {
 	proc.Reg.Ss = append(proc.Reg.Ss, sels)
 }
 
+func (proc *Process) OperatorMemoryLimit() int64 {
+	return proc.Lim.Size
+}
+
+func (proc *Process) SetInputBatch(bat *batch.Batch) {
+	proc.Reg.InputBatch = bat
+}
+
+func (proc *Process) InputBatch() *batch.Batch {
+	return proc.Reg.InputBatch
+}
+
 func (proc *Process) GetSels() []int64 {
 	if len(proc.Reg.Ss) == 0 {
 		return make([]int64, 0, 16)