diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go
index 0c990c8e5a5e7e24497c1e48ed20f180fbdcfdc5..d9d1c4389facd2e62ac5fb49034d83e33fb59ee0 100644
--- a/pkg/container/batch/batch.go
+++ b/pkg/container/batch/batch.go
@@ -118,7 +118,7 @@ func (bat *Batch) Reduce(attrs []string, proc *process.Process) {
 		bat.Cow()
 	}
 	for _, attr := range attrs {
-		for i := range bat.Attrs {
+		for i := 0; i < len(bat.Attrs); i++ {
 			if bat.Attrs[i] != attr {
 				continue
 			}
@@ -129,6 +129,7 @@ func (bat *Batch) Reduce(attrs []string, proc *process.Process) {
 				}
 				bat.Vecs = append(bat.Vecs[:i], bat.Vecs[i+1:]...)
 				bat.Attrs = append(bat.Attrs[:i], bat.Attrs[i+1:]...)
+				i--
 			}
 			break
 		}
diff --git a/pkg/hash/joingroup.go b/pkg/hash/setgroup.go
similarity index 90%
rename from pkg/hash/joingroup.go
rename to pkg/hash/setgroup.go
index 353735eb7b2afa8210681b9e779021dbdf9b9207..ee4b4634ab7d05c9082e1de9091a3ba2bfa7f4ca 100644
--- a/pkg/hash/joingroup.go
+++ b/pkg/hash/setgroup.go
@@ -5,30 +5,21 @@ import (
 	"matrixbase/pkg/container/batch"
 	"matrixbase/pkg/container/types"
 	"matrixbase/pkg/container/vector"
-	"matrixbase/pkg/encoding"
 	"matrixbase/pkg/vm/process"
 )
 
-func NewJoinGroup(idx, sel int64) *JoinGroup {
-	return &JoinGroup{
+func NewSetGroup(idx, sel int64) *SetGroup {
+	return &SetGroup{
 		Idx: idx,
 		Sel: sel,
 	}
 }
 
-func (g *JoinGroup) Free(proc *process.Process) {
-	if g.Data != nil {
-		proc.Free(g.Data)
-		g.Data = nil
-	}
-	if g.Idata != nil {
-		proc.Free(g.Idata)
-		g.Idata = nil
-	}
+func (g *SetGroup) Free(_ *process.Process) {
 }
 
-func (g *JoinGroup) Probe(sels, matched []int64, vecs []*vector.Vector,
-	bats []*batch.Batch, diffs []bool, proc *process.Process) ([]int64, []int64, error) {
+func (g *SetGroup) Probe(sels, matched []int64, vecs []*vector.Vector,
+	bats []*batch.Batch, diffs []bool, proc *process.Process) (int64, []int64, error) {
 	for i, vec := range vecs {
 		switch vec.Typ.Oid {
 		case types.T_int8:
@@ -426,21 +417,23 @@ func (g *JoinGroup) Probe(sels, matched []int64, vecs []*vector.Vector,
 		}
 	}
 	n := len(sels)
-	remaining := sels[:0]
 	matched = matched[:0]
+	remaining := sels[:0]
 	for i := 0; i < n; i++ {
 		if diffs[i] {
 			remaining = append(remaining, sels[i])
-		} else {
+		} else if len(matched) == 0 {
 			matched = append(matched, sels[i])
 		}
 	}
-	return matched, remaining, nil
+	if len(matched) == 0 {
+		return -1, remaining, nil
+	}
+	return matched[0], remaining, nil
 }
 
-func (g *JoinGroup) Fill(distinct bool, sels, matched []int64, vecs []*vector.Vector,
+func (g *SetGroup) Fill(sels, matched []int64, vecs []*vector.Vector,
 	bats []*batch.Batch, diffs []bool, proc *process.Process) ([]int64, error) {
-	idx := int64(len(bats) - 1)
 	for i, vec := range vecs {
 		switch vec.Typ.Oid {
 		case types.T_int8:
@@ -839,76 +832,9 @@ func (g *JoinGroup) Fill(distinct bool, sels, matched []int64, vecs []*vector.Ve
 	}
 	n := len(sels)
 	remaining := sels[:0]
-	if !distinct {
-		matched = matched[:0]
-		for i := 0; i < n; i++ {
-			if diffs[i] {
-				remaining = append(remaining, sels[i])
-			} else {
-				matched = append(matched, sels[i])
-			}
-		}
-		if len(matched) > 0 {
-			length := len(g.Sels) + len(matched)
-			if cap(g.Sels) < length {
-				data, err := proc.Alloc(int64(length) * 8)
-				if err != nil {
-					return nil, err
-				}
-				idata, err := proc.Alloc(int64(length) * 8)
-				if err != nil {
-					proc.Free(data)
-					return nil, err
-				}
-				copy(data, g.Data)
-				copy(idata, g.Idata)
-				proc.Free(g.Data)
-				proc.Free(g.Idata)
-				g.Is = encoding.DecodeInt64Slice(idata)
-				g.Sels = encoding.DecodeInt64Slice(data)
-				g.Data = data[:length]
-				g.Sels = g.Sels[:length]
-				g.Is = g.Is[:length]
-				g.Idata = idata[:length]
-			}
-			for _ = range matched {
-				g.Is = append(g.Is, idx)
-			}
-			g.Sels = append(g.Sels, matched...)
-		}
-	} else {
-		if len(g.Sels) > 0 {
-			for i := 0; i < n; i++ {
-				if diffs[i] {
-					remaining = append(remaining, sels[i])
-				}
-			}
-		} else {
-			matched = matched[:0]
-			for i := 0; i < n; i++ {
-				if diffs[i] {
-					remaining = append(remaining, sels[i])
-				} else if len(matched) == 0 {
-					matched = append(matched, sels[i])
-				}
-			}
-			if len(matched) > 0 && cap(g.Sels) == 0 {
-				data, err := proc.Alloc(8)
-				if err != nil {
-					return nil, err
-				}
-				idata, err := proc.Alloc(8)
-				if err != nil {
-					proc.Free(data)
-					return nil, err
-				}
-				g.Data = data
-				g.Idata = idata
-				g.Is = encoding.DecodeInt64Slice(idata)
-				g.Sels = encoding.DecodeInt64Slice(data)
-				g.Is[0] = idx
-				g.Sels[0] = matched[0]
-			}
+	for i := 0; i < n; i++ {
+		if diffs[i] {
+			remaining = append(remaining, sels[i])
 		}
 	}
 	return remaining, nil
diff --git a/pkg/hash/types.go b/pkg/hash/types.go
index ea24b8daf010a0d15df4638becea956722d4c5fb..a92f510c3b1b3485f0d2da8b8668dbf5944db2ea 100644
--- a/pkg/hash/types.go
+++ b/pkg/hash/types.go
@@ -6,11 +6,7 @@ type Group struct {
 	Sels []int64
 }
 
-type JoinGroup struct {
-	Idx   int64
-	Sel   int64
-	Data  []byte
-	Idata []byte
-	Is    []int64
-	Sels  []int64
+type SetGroup struct {
+	Idx int64
+	Sel int64
 }
diff --git a/pkg/sql/colexec/hashgroup/group.go b/pkg/sql/colexec/hashgroup/group.go
index 9b41a167ba30268cb1fa115a54fbf0d9d2a9098a..ea754c887c65b71ce750f182b22d14c8809e60bc 100644
--- a/pkg/sql/colexec/hashgroup/group.go
+++ b/pkg/sql/colexec/hashgroup/group.go
@@ -35,6 +35,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	copy(n.Rattrs[len(n.Es):], n.Gs)
 	n.Ctr = Container{
 		diffs:  make([]bool, UnitLimit),
+		matchs: make([]int64, UnitLimit),
 		hashs:  make([]uint64, UnitLimit),
 		sels:   make([][]int64, UnitLimit),
 		groups: make(map[uint64][]*hash.Group),
@@ -187,6 +188,7 @@ func (ctr *Container) unitGroup(start int, count int, sels []int64, vecs []*vect
 				}
 				copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
 			}
+			ctr.sels[ctr.slots.Vs[i][j]] = ctr.sels[ctr.slots.Vs[i][j]][:0]
 		}
 	}
 	ctr.slots.Reset()
@@ -238,4 +240,5 @@ func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
 			g.Free(proc)
 		}
 	}
+	register.FreeRegisters(proc)
 }
diff --git a/pkg/sql/colexec/hashjoin/join.go b/pkg/sql/colexec/hashjoin/join.go
deleted file mode 100644
index eff12eee8d932ade632eedd1457b2fdd2a565da7..0000000000000000000000000000000000000000
--- a/pkg/sql/colexec/hashjoin/join.go
+++ /dev/null
@@ -1,243 +0,0 @@
-package hashjoin
-
-import (
-	"matrixbase/pkg/container/batch"
-	"matrixbase/pkg/container/vector"
-	"matrixbase/pkg/hash"
-	"matrixbase/pkg/intmap/fastmap"
-	"matrixbase/pkg/vm/process"
-)
-
-func Prepare(proc *process.Process, arg interface{}) error {
-	return nil
-}
-
-// R ⨝ S, S is the small relation
-func Call(proc *process.Process, arg interface{}) (bool, error) {
-	n := arg.(*Argument)
-	ctr := &n.Ctr
-	if !ctr.builded {
-		if err := ctr.build(n.Sattrs, n.Distinct, proc); err != nil {
-			return false, err
-		}
-		ctr.builded = true
-	}
-	return ctr.probe(n.Rattrs, n.Distinct, proc)
-}
-
-func (ctr *Container) build(attrs []string, distinct bool, proc *process.Process) error {
-	var err error
-
-	ch := proc.Reg.Cs[1]
-	for {
-		v := <-ch
-		if v == nil {
-			break
-		}
-		bat := v.(*batch.Batch)
-		bat.Reorder(attrs)
-		if err = bat.Prefetch(attrs, bat.Vecs, proc); err != nil {
-			return err
-		}
-		ctr.bats = append(ctr.bats, bat)
-		if len(bat.Sels) == 0 {
-			if err = ctr.fillBatch(distinct, bat.Vecs[:len(attrs)], proc); err != nil {
-				return err
-			}
-		} else {
-			if err = ctr.fillBatchSels(distinct, bat.Sels, bat.Vecs[:len(attrs)], proc); err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-func (ctr *Container) probe(attrs []string, distinct bool, proc *process.Process) (bool, error) {
-	if bat := ctr.probeState.bat; bat != nil {
-		return false, ctr.probeBatch(distinct, bat.Vecs[:len(attrs)], proc)
-	}
-	v := <-proc.Reg.Cs[0]
-	if v == nil {
-		return true, nil
-	}
-	bat := v.(*batch.Batch)
-	bat.Reorder(attrs)
-	ctr.probeState.bat = bat
-	ctr.probeState.start = 0
-	return false, ctr.probeBatch(distinct, bat.Vecs[:len(attrs)], proc)
-}
-
-func (ctr *Container) fillBatch(distinct bool, vecs []*vector.Vector, proc *process.Process) error {
-	for i, j := 0, vecs[0].Length(); i < j; i += UnitLimit {
-		length := j - i
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.fillUnit(distinct, i, length, nil, vecs, proc); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (ctr *Container) fillBatchSels(distinct bool, sels []int64, vecs []*vector.Vector, proc *process.Process) error {
-	for i, j := 0, len(sels); i < j; i += UnitLimit {
-		length := j - i
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.fillUnit(distinct, 0, length, sels[i:i+length], vecs, proc); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (ctr *Container) fillUnit(distinct bool, start, count int, sels []int64,
-	vecs []*vector.Vector, proc *process.Process) error {
-	var err error
-
-	{
-		copy(ctr.hashs[:count], OneUint64s[:count])
-		if len(sels) == 0 {
-			ctr.fillHash(start, count, vecs)
-		} else {
-			ctr.fillHashSels(count, sels, vecs)
-		}
-	}
-	copy(ctr.diffs[:count], ZeroBools[:count])
-	for i, hs := range ctr.slots.Ks {
-		for j, h := range hs {
-			remaining := ctr.sels[ctr.slots.Vs[i][j]]
-			if gs, ok := ctr.groups[h]; ok {
-				for _, g := range gs {
-					if remaining, err = g.Fill(distinct, remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
-						return err
-					}
-					copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
-				}
-			} else {
-				ctr.groups[h] = make([]*hash.JoinGroup, 0, 8)
-			}
-			for len(remaining) > 0 {
-				g := hash.NewJoinGroup(int64(len(ctr.bats)-1), int64(remaining[0]))
-				ctr.groups[h] = append(ctr.groups[h], g)
-				if remaining, err = g.Fill(distinct, remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
-					return err
-				}
-				copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
-			}
-		}
-	}
-	ctr.slots.Reset()
-	return nil
-}
-
-func (ctr *Container) fillHash(start, count int, vecs []*vector.Vector) {
-	ctr.hashs = ctr.hashs[:count]
-	for _, vec := range vecs {
-		hash.Rehash(count, ctr.hashs, vec)
-	}
-	nextslot := 0
-	for i, h := range ctr.hashs {
-		slot, ok := ctr.slots.Get(h)
-		if !ok {
-			slot = nextslot
-			ctr.slots.Set(h, slot)
-			nextslot++
-		}
-		ctr.sels[slot] = append(ctr.sels[slot], int64(i+start))
-	}
-}
-
-func (ctr *Container) fillHashSels(count int, sels []int64, vecs []*vector.Vector) {
-	ctr.hashs = ctr.hashs[:count]
-	for _, vec := range vecs {
-		hash.RehashSels(count, sels, ctr.hashs, vec)
-	}
-	nextslot := 0
-	for i, h := range ctr.hashs {
-		slot, ok := ctr.slots.Get(h)
-		if !ok {
-			slot = nextslot
-			ctr.slots.Set(h, slot)
-			nextslot++
-		}
-		ctr.sels[slot] = append(ctr.sels[slot], sels[i])
-	}
-}
-
-func (ctr *Container) probeBatch(distinct bool, vecs []*vector.Vector, proc *process.Process) error {
-	defer func() { ctr.probeState.size = 0 }()
-	for ; ctr.probeState.start < ctr.probeState.end; ctr.probeState.start += UnitLimit {
-		length := ctr.probeState.end - ctr.probeState.start
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.probeUnit(distinct, ctr.probeState.start, length, nil, vecs, proc); err != nil {
-			return err
-		}
-		if ctr.probeState.size > ctr.probeState.limit {
-			ctr.probeState.start += UnitLimit
-			return nil
-		}
-	}
-	ctr.probeState.bat = nil
-	return nil
-}
-
-func (ctr *Container) probeBatchSels(distinct bool, sels []int64, vecs []*vector.Vector, proc *process.Process) error {
-	defer func() { ctr.probeState.size = 0 }()
-	for ; ctr.probeState.start < ctr.probeState.end; ctr.probeState.start += UnitLimit {
-		length := ctr.probeState.end - ctr.probeState.start
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.probeUnit(distinct, 0, length, sels[ctr.probeState.start:ctr.probeState.start+length], vecs, proc); err != nil {
-			return err
-		}
-		if ctr.probeState.size > ctr.probeState.limit {
-			ctr.probeState.start += UnitLimit
-			return nil
-		}
-	}
-	ctr.probeState.bat = nil
-	return nil
-}
-
-func (ctr *Container) probeUnit(distinct bool, start, count int, sels []int64,
-	vecs []*vector.Vector, proc *process.Process) error {
-	var err error
-
-	{
-		copy(ctr.hashs[:count], OneUint64s[:count])
-		if len(sels) == 0 {
-			ctr.fillHash(start, count, vecs)
-		} else {
-			ctr.fillHashSels(count, sels, vecs)
-		}
-	}
-	copy(ctr.diffs[:count], ZeroBools[:count])
-	for i, hs := range ctr.slots.Ks {
-		for j, h := range hs {
-			remaining := ctr.sels[ctr.slots.Vs[i][j]]
-			if gs, ok := ctr.groups[h]; ok {
-				for _, g := range gs {
-					if ctr.matchs, remaining, err = g.Probe(remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
-						return err
-					}
-					// product
-					copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
-				}
-			}
-		}
-	}
-	ctr.slots.Reset()
-	return nil
-}
-
-func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
-	bat.Clean(proc)
-	fastmap.Pool.Put(ctr.slots)
-}
diff --git a/pkg/sql/colexec/hashjoin/types.go b/pkg/sql/colexec/hashjoin/types.go
deleted file mode 100644
index 89aa568549060323c6f39dddf400391eb04bd76f..0000000000000000000000000000000000000000
--- a/pkg/sql/colexec/hashjoin/types.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package hashjoin
-
-import (
-	"matrixbase/pkg/container/batch"
-	"matrixbase/pkg/hash"
-	"matrixbase/pkg/intmap/fastmap"
-	"matrixbase/pkg/sql/join"
-)
-
-const (
-	UnitLimit = 1024
-)
-
-var (
-	ZeroBools  []bool
-	OneUint64s []uint64
-)
-
-type Container struct {
-	builded    bool
-	diffs      []bool
-	matchs     []int64
-	hashs      []uint64
-	sels       [][]int64      // sels
-	slots      *fastmap.Map   // hash code -> sels index
-	bats       []*batch.Batch // s relation
-	probeState struct {
-		size  int
-		limit int
-		start int
-		end   int
-		bat   *batch.Batch
-	}
-	groups map[uint64][]*hash.JoinGroup // hash code -> join list
-}
-
-type Argument struct {
-	Distinct bool
-	Attrs    []string
-	Rattrs   []string
-	Sattrs   []string
-	Ctr      Container
-	JoinType join.JoinType
-}
diff --git a/pkg/sql/colexec/limit/limit.go b/pkg/sql/colexec/limit/limit.go
index e664149938826ce01d649757bdf7bef266e19ea1..fe4cd3e3d1899025af5d6831419e461502f3c50b 100644
--- a/pkg/sql/colexec/limit/limit.go
+++ b/pkg/sql/colexec/limit/limit.go
@@ -20,6 +20,9 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	if length := uint64(len(bat.Sels)); length > 0 {
diff --git a/pkg/sql/colexec/mergesum/summarize.go b/pkg/sql/colexec/mergesum/summarize.go
index 88d46c110d9e4bb4c8a3c4af76f0a0df8d18465f..cfe8a5b138b54147584645762a0b4521d8ebafd4 100644
--- a/pkg/sql/colexec/mergesum/summarize.go
+++ b/pkg/sql/colexec/mergesum/summarize.go
@@ -18,10 +18,13 @@ func Prepare(proc *process.Process, arg interface{}) error {
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
-	for i, c := range proc.Reg.Cs {
-		v := <-c
+	for i := 0; i < len(proc.Reg.Ws); i++ {
+		reg := proc.Reg.Ws[i]
+		v := <-reg.Ch
 		if v == nil {
-			proc.Reg.Cs = append(proc.Reg.Cs[:i], proc.Reg.Cs[i:]...)
+			reg.Wg.Done()
+			proc.Reg.Ws = append(proc.Reg.Ws[:i], proc.Reg.Ws[i:]...)
+			i--
 			continue
 		}
 		bat := v.(*batch.Batch)
@@ -34,6 +37,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
 				return false, err
 			}
 		}
+		reg.Wg.Done()
 		bat.Clean(proc)
 	}
 	bat := batch.New(true, n.Attrs)
diff --git a/pkg/sql/colexec/offset/offset.go b/pkg/sql/colexec/offset/offset.go
index 73412d4427f86d351055a5a7d137758b0a9b919d..73bc5de0eab3e595575d29bc43d159bbbbf9cdb0 100644
--- a/pkg/sql/colexec/offset/offset.go
+++ b/pkg/sql/colexec/offset/offset.go
@@ -20,6 +20,9 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	if n.Seen > n.Offset {
diff --git a/pkg/sql/colexec/output/output.go b/pkg/sql/colexec/output/output.go
index 533ae43b35b99e6f20c315e8f17780fae394eac0..feed54e5ed2583f641d1b7366d14651373b2d5e4 100644
--- a/pkg/sql/colexec/output/output.go
+++ b/pkg/sql/colexec/output/output.go
@@ -16,8 +16,8 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
-	bat := proc.Reg.Ax.(*batch.Batch)
-	if bat != nil {
+	if proc.Reg.Ax != nil {
+		bat := proc.Reg.Ax.(*batch.Batch)
 		fmt.Printf("%s\n", bat)
 		bat.Clean(proc)
 	}
diff --git a/pkg/sql/colexec/projection/projection.go b/pkg/sql/colexec/projection/projection.go
index efccc39bb5ffbc1914d89bcc59d826c64bf031cc..a06d774bad6b939c47bb3b4703aa719e00b1c295 100644
--- a/pkg/sql/colexec/projection/projection.go
+++ b/pkg/sql/colexec/projection/projection.go
@@ -27,9 +27,12 @@ func Prepare(_ *process.Process, _ interface{}) error {
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	var err error
 
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
-	rbat := batch.New(true, n.Attrs)
 	bat := proc.Reg.Ax.(*batch.Batch)
+	rbat := batch.New(true, n.Attrs)
 	for i := range n.Attrs {
 		if rbat.Vecs[i], _, err = n.Es[i].Eval(bat, proc); err != nil {
 			rbat.Vecs = rbat.Vecs[:i]
diff --git a/pkg/sql/colexec/restrict/restrict.go b/pkg/sql/colexec/restrict/restrict.go
index ef90d1ee387a7472a2719c591574615ef5b51998..4db09336c8bd7d45ee91874229ac5c957445ccd6 100644
--- a/pkg/sql/colexec/restrict/restrict.go
+++ b/pkg/sql/colexec/restrict/restrict.go
@@ -20,6 +20,9 @@ func Prepare(_ *process.Process, arg interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	vec, _, err := n.E.Eval(bat, proc)
diff --git a/pkg/sql/colexec/summarize/summarize.go b/pkg/sql/colexec/summarize/summarize.go
index 28d1b33d96cf215e7095450cfbbfa3980702a151..44abb7f119a1411e85e1bd5774ff46d533b69562 100644
--- a/pkg/sql/colexec/summarize/summarize.go
+++ b/pkg/sql/colexec/summarize/summarize.go
@@ -17,6 +17,9 @@ func Prepare(proc *process.Process, arg interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	rbat := batch.New(true, n.Attrs)
 	bat := proc.Reg.Ax.(*batch.Batch)
diff --git a/pkg/sql/colexec/top/top.go b/pkg/sql/colexec/top/top.go
index 2c89a424b8821a5875ac965deaed0cc659292aad..d3eb2a6dc1144ffabf9843ea80a291ce7e1eef93 100644
--- a/pkg/sql/colexec/top/top.go
+++ b/pkg/sql/colexec/top/top.go
@@ -43,6 +43,9 @@ func Prepare(proc *process.Process, arg interface{}) error {
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	var err error
 
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	if err = bat.Prefetch(n.Attrs, n.Ctr.vecs, proc); err != nil {
diff --git a/pkg/sql/colexec/transfer/transfer.go b/pkg/sql/colexec/transfer/transfer.go
index 81093593538b19521f060ea2ceb0ff549ce38385..065a20a658493a2d9e31f684788f2850659d3caf 100644
--- a/pkg/sql/colexec/transfer/transfer.go
+++ b/pkg/sql/colexec/transfer/transfer.go
@@ -1,20 +1,30 @@
 package transfer
 
 import (
+	"bytes"
 	"matrixbase/pkg/container/batch"
 	"matrixbase/pkg/vm/process"
 )
 
+func String(_ interface{}, buf *bytes.Buffer) {
+	buf.WriteString("transfer")
+}
+
 func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
-	n := arg.(Argument)
-	bat := proc.Reg.Ax.(*batch.Batch)
-	if bat != nil {
-		n.Ch <- bat
+	reg := arg.(*Argument).Reg
+	if reg.Ch == nil {
+		if proc.Reg.Ax != nil {
+			bat := proc.Reg.Ax.(*batch.Batch)
+			bat.Clean(proc)
+		}
+		return true, nil
 	}
-	n.Ch <- nil
+	reg.Wg.Add(1)
+	reg.Ch <- proc.Reg.Ax
+	reg.Wg.Wait()
 	return false, nil
 }
diff --git a/pkg/sql/colexec/transfer/types.go b/pkg/sql/colexec/transfer/types.go
index 69612851fbe5b782318a48ad16900c8e035bd763..46c9c660b2228ef520fbaee4b219dded60269f2e 100644
--- a/pkg/sql/colexec/transfer/types.go
+++ b/pkg/sql/colexec/transfer/types.go
@@ -1,5 +1,7 @@
 package transfer
 
+import "matrixbase/pkg/vm/process"
+
 type Argument struct {
-	Ch chan interface{}
+	Reg *process.WaitRegister
 }
diff --git a/pkg/sql/colexec/unittest/extendProjection_test.go b/pkg/sql/colexec/unittest/extendProjection_test.go
index 8afc1b342654c8dcc94cccbf14b2a2afeabea8a1..f379058bfbf2749a6e66a709843431927b467ec6 100644
--- a/pkg/sql/colexec/unittest/extendProjection_test.go
+++ b/pkg/sql/colexec/unittest/extendProjection_test.go
@@ -35,7 +35,7 @@ func TestExtendProjection(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1}, []string{"price"}, ins)
 	fmt.Printf("%s\n", p)
-	if _, err := p.Run(segments(proc), proc); err != nil {
+	if _, err := p.Run(segments("R", proc), proc); err != nil {
 		log.Fatal(err)
 	}
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
diff --git a/pkg/sql/colexec/unittest/intersect_test.go b/pkg/sql/colexec/unittest/intersect_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e8fbb6c5251570fe75f7c846c6d8a88c30ef61b2
--- /dev/null
+++ b/pkg/sql/colexec/unittest/intersect_test.go
@@ -0,0 +1,76 @@
+package unittest
+
+import (
+	"fmt"
+	"matrixbase/pkg/sql/colexec/hashset/intersect"
+	"matrixbase/pkg/sql/colexec/transfer"
+	"matrixbase/pkg/vm"
+	"matrixbase/pkg/vm/mempool"
+	"matrixbase/pkg/vm/mmu/guest"
+	"matrixbase/pkg/vm/mmu/host"
+	"matrixbase/pkg/vm/pipeline"
+	"matrixbase/pkg/vm/process"
+	"sync"
+	"testing"
+)
+
+func TestIntersect(t *testing.T) {
+	var wg sync.WaitGroup
+	var ins vm.Instructions
+
+	hm := host.New(1 << 20)
+	proc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
+	{
+		proc.Refer = make(map[string]uint64)
+		proc.Reg.Ws = make([]*process.WaitRegister, 2)
+		for i := 0; i < 2; i++ {
+			proc.Reg.Ws[i] = &process.WaitRegister{
+				Wg: new(sync.WaitGroup),
+				Ch: make(chan interface{}),
+			}
+		}
+	}
+	{
+		var rins vm.Instructions
+
+		rproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
+		{
+			rproc.Refer = make(map[string]uint64)
+		}
+		rins = append(rins, vm.Instruction{vm.Transfer, &transfer.Argument{proc.Reg.Ws[0]}})
+		rp := pipeline.New([]uint64{1}, []string{"orderId"}, rins)
+		wg.Add(1)
+		go func() {
+			fmt.Printf("R: %s\n", rp)
+			rp.Run(segments("R", rproc), rproc)
+			fmt.Printf("R - guest: %v, host: %v\n", rproc.Size(), rproc.HostSize())
+			wg.Done()
+		}()
+	}
+	{
+		var sins vm.Instructions
+
+		sproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
+		{
+			sproc.Refer = make(map[string]uint64)
+		}
+		sins = append(sins, vm.Instruction{vm.Transfer, &transfer.Argument{proc.Reg.Ws[1]}})
+		sp := pipeline.New([]uint64{1}, []string{"orderId"}, sins)
+		wg.Add(1)
+		go func() {
+			fmt.Printf("S: %s\n", sp)
+			sp.Run(segments("S", sproc), sproc)
+			fmt.Printf("S - guest: %v, host: %v\n", sproc.Size(), sproc.HostSize())
+			wg.Done()
+		}()
+	}
+	{
+		ins = append(ins, vm.Instruction{vm.SetIntersect, &intersect.Argument{}})
+		ins = append(ins, vm.Instruction{vm.Output, nil})
+	}
+	p := pipeline.NewMerge(ins)
+	fmt.Printf("%s\n", p)
+	p.RunMerge(proc)
+	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
+	wg.Wait()
+}
diff --git a/pkg/sql/colexec/unittest/limit_test.go b/pkg/sql/colexec/unittest/limit_test.go
index dd30fcd39222a8f1cdb8d210610d3aaf442f0673..19950ebfa8c5e202401a03749d2e24b85e7d4d7a 100644
--- a/pkg/sql/colexec/unittest/limit_test.go
+++ b/pkg/sql/colexec/unittest/limit_test.go
@@ -36,6 +36,6 @@ func TestLimit(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
 	fmt.Printf("%s\n", p)
-	p.Run(segments(proc), proc)
+	p.Run(segments("R", proc), proc)
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
 }
diff --git a/pkg/sql/colexec/unittest/offset_test.go b/pkg/sql/colexec/unittest/offset_test.go
index 61af304ad85f0d7b6ba7ef3186ca4c3478d7b475..804c54bf9d0f8626ff7d12d64ecc478d4f98694f 100644
--- a/pkg/sql/colexec/unittest/offset_test.go
+++ b/pkg/sql/colexec/unittest/offset_test.go
@@ -36,6 +36,6 @@ func TestOffset(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
 	fmt.Printf("%s\n", p)
-	p.Run(segments(proc), proc)
+	p.Run(segments("R", proc), proc)
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
 }
diff --git a/pkg/sql/colexec/unittest/projection_test.go b/pkg/sql/colexec/unittest/projection_test.go
index ab1ae3823a0be32e4a2224fa07946e15ffe57569..6823b5b5cd800b8eba10424fc7d755fbbba2dde4 100644
--- a/pkg/sql/colexec/unittest/projection_test.go
+++ b/pkg/sql/colexec/unittest/projection_test.go
@@ -32,6 +32,6 @@ func TestProjection(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
 	fmt.Printf("%s\n", p)
-	p.Run(segments(proc), proc)
+	p.Run(segments("R", proc), proc)
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
 }
diff --git a/pkg/sql/colexec/unittest/segments.go b/pkg/sql/colexec/unittest/segments.go
index 336355f3ab22759b35079fed81d369614f9d3cbd..b49c84a6f9cbcb1ad7a3e7c93dd26b2a45c8ab29 100644
--- a/pkg/sql/colexec/unittest/segments.go
+++ b/pkg/sql/colexec/unittest/segments.go
@@ -7,9 +7,9 @@ import (
 	"matrixbase/pkg/vm/process"
 )
 
-func segments(proc *process.Process) []engine.Segment {
+func segments(name string, proc *process.Process) []engine.Segment {
 	e := memEngine.NewTestEngine()
-	r, err := e.Relation("test")
+	r, err := e.Relation(name)
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/pkg/sql/join/types.go b/pkg/sql/join/types.go
deleted file mode 100644
index c3611093af1658e541e2b055c82bae45e391f5fc..0000000000000000000000000000000000000000
--- a/pkg/sql/join/types.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package join
-
-const (
-	Inner = iota
-	Left
-	Right
-	Full
-	Semi
-	Anti
-	ExceptAll
-	IntersectAll
-)
-
-type JoinType uint8
diff --git a/pkg/vm/engine/memEngine/testEngine.go b/pkg/vm/engine/memEngine/testEngine.go
index 7db0995f373614efc613444496b6e6fc1a75304e..82165ba424835c2dc45a8ce130e65cd50fdb1604 100644
--- a/pkg/vm/engine/memEngine/testEngine.go
+++ b/pkg/vm/engine/memEngine/testEngine.go
@@ -14,6 +14,12 @@ import (
 
 func NewTestEngine() engine.Engine {
 	e := New(kv.New())
+	CreateR(e)
+	CreateS(e)
+	return e
+}
+
+func CreateR(e engine.Engine) {
 	{
 		var attrs []metadata.Attribute
 
@@ -34,11 +40,11 @@ func NewTestEngine() engine.Engine {
 				Type: types.Type{types.T(types.T_float64), 8, 8, 0},
 			})
 		}
-		if err := e.Create("test", attrs); err != nil {
+		if err := e.Create("R", attrs); err != nil {
 			log.Fatal(err)
 		}
 	}
-	r, err := e.Relation("test")
+	r, err := e.Relation("R")
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -122,5 +128,115 @@ func NewTestEngine() engine.Engine {
 			log.Fatal(err)
 		}
 	}
-	return e
+}
+
+func CreateS(e engine.Engine) {
+	{
+		var attrs []metadata.Attribute
+
+		{
+			attrs = append(attrs, metadata.Attribute{
+				Alg:  compress.Lz4,
+				Name: "orderId",
+				Type: types.Type{types.T(types.T_varchar), 24, 0, 0},
+			})
+			attrs = append(attrs, metadata.Attribute{
+				Alg:  compress.Lz4,
+				Name: "uid",
+				Type: types.Type{types.T(types.T_varchar), 24, 0, 0},
+			})
+			attrs = append(attrs, metadata.Attribute{
+				Alg:  compress.Lz4,
+				Name: "price",
+				Type: types.Type{types.T(types.T_float64), 8, 8, 0},
+			})
+		}
+		if err := e.Create("S", attrs); err != nil {
+			log.Fatal(err)
+		}
+	}
+	r, err := e.Relation("S")
+	if err != nil {
+		log.Fatal(err)
+	}
+	{
+		bat := batch.New(true, []string{"orderId", "uid", "price"})
+		{
+			{
+				vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+				vs := make([][]byte, 10)
+				for i := 0; i < 10; i++ {
+					vs[i] = []byte(fmt.Sprintf("%v", i*2))
+				}
+				if err := vec.Append(vs); err != nil {
+					log.Fatal(err)
+				}
+				bat.Vecs[0] = vec
+			}
+			{
+				vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+				vs := make([][]byte, 10)
+				for i := 0; i < 10; i++ {
+					vs[i] = []byte(fmt.Sprintf("%v", i%2))
+				}
+				if err := vec.Append(vs); err != nil {
+					log.Fatal(err)
+				}
+				bat.Vecs[1] = vec
+			}
+			{
+				vec := vector.New(types.Type{types.T(types.T_float64), 8, 8, 0})
+				vs := make([]float64, 10)
+				for i := 0; i < 10; i++ {
+					vs[i] = float64(i)
+				}
+				if err := vec.Append(vs); err != nil {
+					log.Fatal(err)
+				}
+				bat.Vecs[2] = vec
+			}
+		}
+		if err := r.Write(bat); err != nil {
+			log.Fatal(err)
+		}
+	}
+	{
+		bat := batch.New(true, []string{"orderId", "uid", "price"})
+		{
+			vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+			vs := make([][]byte, 10)
+			for i := 10; i < 20; i++ {
+				vs[i-10] = []byte(fmt.Sprintf("%v", i*2))
+			}
+			if err := vec.Append(vs); err != nil {
+				log.Fatal(err)
+			}
+			bat.Vecs[0] = vec
+		}
+		{
+			vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+			vs := make([][]byte, 10)
+			for i := 10; i < 20; i++ {
+				vs[i-10] = []byte(fmt.Sprintf("%v", i%2))
+			}
+			if err := vec.Append(vs); err != nil {
+				log.Fatal(err)
+			}
+			bat.Vecs[1] = vec
+		}
+		{
+			vec := vector.New(types.Type{types.T(types.T_float64), 8, 8, 0})
+			vs := make([]float64, 10)
+			for i := 10; i < 20; i++ {
+				vs[i-10] = float64(i)
+			}
+			if err := vec.Append(vs); err != nil {
+				log.Fatal(err)
+			}
+			bat.Vecs[2] = vec
+		}
+		if err := r.Write(bat); err != nil {
+			log.Fatal(err)
+		}
+	}
 }
diff --git a/pkg/vm/instruction.go b/pkg/vm/instruction.go
deleted file mode 100644
index cbffab2ebec3dcbaf8b7207254bb976949ebbc88..0000000000000000000000000000000000000000
--- a/pkg/vm/instruction.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package vm
-
-import (
-	"bytes"
-)
-
-func (ins Instructions) String() string {
-	var buf bytes.Buffer
-
-	for _, in := range ins {
-		switch in.Op {
-		case Nub:
-		case Top:
-		case Limit:
-		case Group:
-		case Order:
-		case Transfer:
-		case Restrict:
-		case Summarize:
-		case Projection:
-		case SetUnion:
-		case SetIntersect:
-		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
-		}
-	}
-	return buf.String()
-}
diff --git a/pkg/vm/pipeline/pipeline.go b/pkg/vm/pipeline/pipeline.go
index e7f58f097cb9497f403e8d45bd836b64ae4f9d67..591f392783fdb77c507ddd890ee12674d05ef36e 100644
--- a/pkg/vm/pipeline/pipeline.go
+++ b/pkg/vm/pipeline/pipeline.go
@@ -15,6 +15,12 @@ func New(cs []uint64, attrs []string, ins vm.Instructions) *Pipeline {
 	}
 }
 
+func NewMerge(ins vm.Instructions) *Pipeline {
+	return &Pipeline{
+		ins: ins,
+	}
+}
+
 func (p *Pipeline) String() string {
 	var buf bytes.Buffer
 
@@ -24,6 +30,7 @@ func (p *Pipeline) String() string {
 
 func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, error) {
 	if err := vm.Prepare(p.ins, proc); err != nil {
+		vm.Clean(p.ins, proc)
 		return false, err
 	}
 	for _, seg := range segs {
@@ -36,5 +43,24 @@ func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, erro
 			return end, err
 		}
 	}
+	{
+		proc.Reg.Ax = nil
+		if end, err := vm.Run(p.ins, proc); err != nil || end {
+			return end, err
+		}
+	}
+	return false, nil
+}
+
+func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) {
+	if err := vm.Prepare(p.ins, proc); err != nil {
+		vm.Clean(p.ins, proc)
+		return false, err
+	}
+	for {
+		if end, err := vm.Run(p.ins, proc); err != nil || end {
+			return end, err
+		}
+	}
 	return false, nil
 }
diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go
index a5b09b1c71c0457e336fde71b53b2bfedf52317d..c025c9c95d853e41ea90332d58ad5c20a1342529 100644
--- a/pkg/vm/process/types.go
+++ b/pkg/vm/process/types.go
@@ -3,6 +3,7 @@ package process
 import (
 	"matrixbase/pkg/vm/mempool"
 	"matrixbase/pkg/vm/mmu/guest"
+	"sync"
 )
 
 /*
@@ -15,10 +16,15 @@ type Process interface {
 }
 */
 
+type WaitRegister struct {
+	Wg *sync.WaitGroup
+	Ch chan interface{}
+}
+
 type Register struct {
 	Ax interface{}
 	Ts []interface{}
-	Cs []chan interface{}
+	Ws []*WaitRegister
 }
 
 type Process struct {
diff --git a/pkg/vm/types.go b/pkg/vm/types.go
index f3e86eb1c1071929d145b78063ca3edf8cc69f10..fbdd79550f9bc13d23085735e1253dcf975821db 100644
--- a/pkg/vm/types.go
+++ b/pkg/vm/types.go
@@ -14,13 +14,13 @@ const (
 	SetUnion
 	SetIntersect
 	SetDifference
-	MultisetUnion
-	MultisetIntersect
-	MultisetDifference
-	EqJoin
-	SemiJoin
-	InnerJoin
-	NaturalJoin
+	SetFullJoin
+	SetLeftJoin
+	SetSemiJoin
+	SetInnerJoin
+	SetRightJoin
+	SetNaturalJoin
+	SetSemiDifference // unsuitable name is anti join
 	Output
 	MergeSummarize
 )
diff --git a/pkg/vm/vm.go b/pkg/vm/vm.go
index 48976689aaf9939e5ffe0c1bf72816d45a6c5d20..e6d68e3894e5eabc73e9ca03326731dbd8a1a765 100644
--- a/pkg/vm/vm.go
+++ b/pkg/vm/vm.go
@@ -2,11 +2,13 @@ package vm
 
 import (
 	"bytes"
+	"matrixbase/pkg/sql/colexec/hashset/intersect"
 	"matrixbase/pkg/sql/colexec/limit"
 	"matrixbase/pkg/sql/colexec/offset"
 	"matrixbase/pkg/sql/colexec/output"
 	"matrixbase/pkg/sql/colexec/projection"
 	"matrixbase/pkg/sql/colexec/restrict"
+	"matrixbase/pkg/sql/colexec/transfer"
 	"matrixbase/pkg/vm/process"
 )
 
@@ -25,6 +27,7 @@ func String(ins Instructions, buf *bytes.Buffer) {
 		case Offset:
 			offset.String(in.Arg, buf)
 		case Transfer:
+			transfer.String(in.Arg, buf)
 		case Restrict:
 			restrict.String(in.Arg, buf)
 		case Summarize:
@@ -32,20 +35,35 @@ func String(ins Instructions, buf *bytes.Buffer) {
 			projection.String(in.Arg, buf)
 		case SetUnion:
 		case SetIntersect:
+			intersect.String(in.Arg, buf)
 		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
 		case Output:
 			output.String(in.Arg, buf)
 		}
 	}
 }
 
+func Clean(ins Instructions, proc *process.Process) {
+	for _, in := range ins {
+		switch in.Op {
+		case Nub:
+		case Top:
+		case Limit:
+		case Group:
+		case Order:
+		case Offset:
+		case Transfer:
+		case Restrict:
+		case Summarize:
+		case Projection:
+		case SetUnion:
+		case SetIntersect:
+		case SetDifference:
+		case Output:
+		}
+	}
+}
+
 func Prepare(ins Instructions, proc *process.Process) error {
 	for _, in := range ins {
 		switch in.Op {
@@ -62,6 +80,9 @@ func Prepare(ins Instructions, proc *process.Process) error {
 				return err
 			}
 		case Transfer:
+			if err := transfer.Prepare(proc, in.Arg); err != nil {
+				return err
+			}
 		case Restrict:
 			if err := restrict.Prepare(proc, in.Arg); err != nil {
 				return err
@@ -73,14 +94,10 @@ func Prepare(ins Instructions, proc *process.Process) error {
 			}
 		case SetUnion:
 		case SetIntersect:
+			if err := intersect.Prepare(proc, in.Arg); err != nil {
+				return err
+			}
 		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
 		case Output:
 			if err := output.Prepare(proc, in.Arg); err != nil {
 				return err
@@ -106,6 +123,7 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
 		case Offset:
 			ok, err = offset.Call(proc, in.Arg)
 		case Transfer:
+			ok, err = transfer.Call(proc, in.Arg)
 		case Restrict:
 			ok, err = restrict.Call(proc, in.Arg)
 		case Summarize:
@@ -113,14 +131,8 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
 			ok, err = projection.Call(proc, in.Arg)
 		case SetUnion:
 		case SetIntersect:
+			ok, err = intersect.Call(proc, in.Arg)
 		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
 		case Output:
 			ok, err = output.Call(proc, in.Arg)
 		}