From 0300d399551e1cc3bf84a4527924446fcee3d86c Mon Sep 17 00:00:00 2001
From: nnsgmsone <nnsmgsone@outlook.com>
Date: Fri, 19 Mar 2021 16:08:19 +0800
Subject: [PATCH] Add set module

---
 pkg/container/batch/batch.go                  |   3 +-
 pkg/hash/{joingroup.go => setgroup.go}        | 104 ++------
 pkg/hash/types.go                             |  10 +-
 pkg/sql/colexec/hashgroup/group.go            |   3 +
 pkg/sql/colexec/hashjoin/join.go              | 243 ------------------
 pkg/sql/colexec/hashjoin/types.go             |  44 ----
 pkg/sql/colexec/limit/limit.go                |   3 +
 pkg/sql/colexec/mergesum/summarize.go         |  10 +-
 pkg/sql/colexec/offset/offset.go              |   3 +
 pkg/sql/colexec/output/output.go              |   4 +-
 pkg/sql/colexec/projection/projection.go      |   5 +-
 pkg/sql/colexec/restrict/restrict.go          |   3 +
 pkg/sql/colexec/summarize/summarize.go        |   3 +
 pkg/sql/colexec/top/top.go                    |   3 +
 pkg/sql/colexec/transfer/transfer.go          |  20 +-
 pkg/sql/colexec/transfer/types.go             |   4 +-
 .../colexec/unittest/extendProjection_test.go |   2 +-
 pkg/sql/colexec/unittest/intersect_test.go    |  76 ++++++
 pkg/sql/colexec/unittest/limit_test.go        |   2 +-
 pkg/sql/colexec/unittest/offset_test.go       |   2 +-
 pkg/sql/colexec/unittest/projection_test.go   |   2 +-
 pkg/sql/colexec/unittest/segments.go          |   4 +-
 pkg/sql/join/types.go                         |  14 -
 pkg/vm/engine/memEngine/testEngine.go         | 122 ++++++++-
 pkg/vm/instruction.go                         |  34 ---
 pkg/vm/pipeline/pipeline.go                   |  26 ++
 pkg/vm/process/types.go                       |   8 +-
 pkg/vm/types.go                               |  14 +-
 pkg/vm/vm.go                                  |  54 ++--
 29 files changed, 343 insertions(+), 482 deletions(-)
 rename pkg/hash/{joingroup.go => setgroup.go} (90%)
 delete mode 100644 pkg/sql/colexec/hashjoin/join.go
 delete mode 100644 pkg/sql/colexec/hashjoin/types.go
 create mode 100644 pkg/sql/colexec/unittest/intersect_test.go
 delete mode 100644 pkg/sql/join/types.go
 delete mode 100644 pkg/vm/instruction.go

diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go
index 0c990c8e5..d9d1c4389 100644
--- a/pkg/container/batch/batch.go
+++ b/pkg/container/batch/batch.go
@@ -118,7 +118,7 @@ func (bat *Batch) Reduce(attrs []string, proc *process.Process) {
 		bat.Cow()
 	}
 	for _, attr := range attrs {
-		for i := range bat.Attrs {
+		for i := 0; i < len(bat.Attrs); i++ {
 			if bat.Attrs[i] != attr {
 				continue
 			}
@@ -129,6 +129,7 @@ func (bat *Batch) Reduce(attrs []string, proc *process.Process) {
 				}
 				bat.Vecs = append(bat.Vecs[:i], bat.Vecs[i+1:]...)
 				bat.Attrs = append(bat.Attrs[:i], bat.Attrs[i+1:]...)
+				i--
 			}
 			break
 		}
diff --git a/pkg/hash/joingroup.go b/pkg/hash/setgroup.go
similarity index 90%
rename from pkg/hash/joingroup.go
rename to pkg/hash/setgroup.go
index 353735eb7..ee4b4634a 100644
--- a/pkg/hash/joingroup.go
+++ b/pkg/hash/setgroup.go
@@ -5,30 +5,21 @@ import (
 	"matrixbase/pkg/container/batch"
 	"matrixbase/pkg/container/types"
 	"matrixbase/pkg/container/vector"
-	"matrixbase/pkg/encoding"
 	"matrixbase/pkg/vm/process"
 )
 
-func NewJoinGroup(idx, sel int64) *JoinGroup {
-	return &JoinGroup{
+func NewSetGroup(idx, sel int64) *SetGroup {
+	return &SetGroup{
 		Idx: idx,
 		Sel: sel,
 	}
 }
 
-func (g *JoinGroup) Free(proc *process.Process) {
-	if g.Data != nil {
-		proc.Free(g.Data)
-		g.Data = nil
-	}
-	if g.Idata != nil {
-		proc.Free(g.Idata)
-		g.Idata = nil
-	}
+func (g *SetGroup) Free(_ *process.Process) {
 }
 
-func (g *JoinGroup) Probe(sels, matched []int64, vecs []*vector.Vector,
-	bats []*batch.Batch, diffs []bool, proc *process.Process) ([]int64, []int64, error) {
+func (g *SetGroup) Probe(sels, matched []int64, vecs []*vector.Vector,
+	bats []*batch.Batch, diffs []bool, proc *process.Process) (int64, []int64, error) {
 	for i, vec := range vecs {
 		switch vec.Typ.Oid {
 		case types.T_int8:
@@ -426,21 +417,23 @@ func (g *JoinGroup) Probe(sels, matched []int64, vecs []*vector.Vector,
 		}
 	}
 	n := len(sels)
-	remaining := sels[:0]
 	matched = matched[:0]
+	remaining := sels[:0]
 	for i := 0; i < n; i++ {
 		if diffs[i] {
 			remaining = append(remaining, sels[i])
-		} else {
+		} else if len(matched) == 0 {
 			matched = append(matched, sels[i])
 		}
 	}
-	return matched, remaining, nil
+	if len(matched) == 0 {
+		return -1, remaining, nil
+	}
+	return matched[0], remaining, nil
 }
 
-func (g *JoinGroup) Fill(distinct bool, sels, matched []int64, vecs []*vector.Vector,
+func (g *SetGroup) Fill(sels, matched []int64, vecs []*vector.Vector,
 	bats []*batch.Batch, diffs []bool, proc *process.Process) ([]int64, error) {
-	idx := int64(len(bats) - 1)
 	for i, vec := range vecs {
 		switch vec.Typ.Oid {
 		case types.T_int8:
@@ -839,76 +832,9 @@ func (g *JoinGroup) Fill(distinct bool, sels, matched []int64, vecs []*vector.Ve
 	}
 	n := len(sels)
 	remaining := sels[:0]
-	if !distinct {
-		matched = matched[:0]
-		for i := 0; i < n; i++ {
-			if diffs[i] {
-				remaining = append(remaining, sels[i])
-			} else {
-				matched = append(matched, sels[i])
-			}
-		}
-		if len(matched) > 0 {
-			length := len(g.Sels) + len(matched)
-			if cap(g.Sels) < length {
-				data, err := proc.Alloc(int64(length) * 8)
-				if err != nil {
-					return nil, err
-				}
-				idata, err := proc.Alloc(int64(length) * 8)
-				if err != nil {
-					proc.Free(data)
-					return nil, err
-				}
-				copy(data, g.Data)
-				copy(idata, g.Idata)
-				proc.Free(g.Data)
-				proc.Free(g.Idata)
-				g.Is = encoding.DecodeInt64Slice(idata)
-				g.Sels = encoding.DecodeInt64Slice(data)
-				g.Data = data[:length]
-				g.Sels = g.Sels[:length]
-				g.Is = g.Is[:length]
-				g.Idata = idata[:length]
-			}
-			for _ = range matched {
-				g.Is = append(g.Is, idx)
-			}
-			g.Sels = append(g.Sels, matched...)
-		}
-	} else {
-		if len(g.Sels) > 0 {
-			for i := 0; i < n; i++ {
-				if diffs[i] {
-					remaining = append(remaining, sels[i])
-				}
-			}
-		} else {
-			matched = matched[:0]
-			for i := 0; i < n; i++ {
-				if diffs[i] {
-					remaining = append(remaining, sels[i])
-				} else if len(matched) == 0 {
-					matched = append(matched, sels[i])
-				}
-			}
-			if len(matched) > 0 && cap(g.Sels) == 0 {
-				data, err := proc.Alloc(8)
-				if err != nil {
-					return nil, err
-				}
-				idata, err := proc.Alloc(8)
-				if err != nil {
-					proc.Free(data)
-					return nil, err
-				}
-				g.Data = data
-				g.Idata = idata
-				g.Is = encoding.DecodeInt64Slice(idata)
-				g.Sels = encoding.DecodeInt64Slice(data)
-				g.Is[0] = idx
-				g.Sels[0] = matched[0]
-			}
+	for i := 0; i < n; i++ {
+		if diffs[i] {
+			remaining = append(remaining, sels[i])
 		}
 	}
 	return remaining, nil
diff --git a/pkg/hash/types.go b/pkg/hash/types.go
index ea24b8daf..a92f510c3 100644
--- a/pkg/hash/types.go
+++ b/pkg/hash/types.go
@@ -6,11 +6,7 @@ type Group struct {
 	Sels []int64
 }
 
-type JoinGroup struct {
-	Idx   int64
-	Sel   int64
-	Data  []byte
-	Idata []byte
-	Is    []int64
-	Sels  []int64
+type SetGroup struct {
+	Idx int64
+	Sel int64
 }
diff --git a/pkg/sql/colexec/hashgroup/group.go b/pkg/sql/colexec/hashgroup/group.go
index 9b41a167b..ea754c887 100644
--- a/pkg/sql/colexec/hashgroup/group.go
+++ b/pkg/sql/colexec/hashgroup/group.go
@@ -35,6 +35,7 @@ func Prepare(proc *process.Process, arg interface{}) error {
 	copy(n.Rattrs[len(n.Es):], n.Gs)
 	n.Ctr = Container{
 		diffs:  make([]bool, UnitLimit),
+		matchs: make([]int64, UnitLimit),
 		hashs:  make([]uint64, UnitLimit),
 		sels:   make([][]int64, UnitLimit),
 		groups: make(map[uint64][]*hash.Group),
@@ -187,6 +188,7 @@ func (ctr *Container) unitGroup(start int, count int, sels []int64, vecs []*vect
 				}
 				copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
 			}
+			ctr.sels[ctr.slots.Vs[i][j]] = ctr.sels[ctr.slots.Vs[i][j]][:0]
 		}
 	}
 	ctr.slots.Reset()
@@ -238,4 +240,5 @@ func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
 			g.Free(proc)
 		}
 	}
+	register.FreeRegisters(proc)
 }
diff --git a/pkg/sql/colexec/hashjoin/join.go b/pkg/sql/colexec/hashjoin/join.go
deleted file mode 100644
index eff12eee8..000000000
--- a/pkg/sql/colexec/hashjoin/join.go
+++ /dev/null
@@ -1,243 +0,0 @@
-package hashjoin
-
-import (
-	"matrixbase/pkg/container/batch"
-	"matrixbase/pkg/container/vector"
-	"matrixbase/pkg/hash"
-	"matrixbase/pkg/intmap/fastmap"
-	"matrixbase/pkg/vm/process"
-)
-
-func Prepare(proc *process.Process, arg interface{}) error {
-	return nil
-}
-
-// R ⨝ S, S is the small relation
-func Call(proc *process.Process, arg interface{}) (bool, error) {
-	n := arg.(*Argument)
-	ctr := &n.Ctr
-	if !ctr.builded {
-		if err := ctr.build(n.Sattrs, n.Distinct, proc); err != nil {
-			return false, err
-		}
-		ctr.builded = true
-	}
-	return ctr.probe(n.Rattrs, n.Distinct, proc)
-}
-
-func (ctr *Container) build(attrs []string, distinct bool, proc *process.Process) error {
-	var err error
-
-	ch := proc.Reg.Cs[1]
-	for {
-		v := <-ch
-		if v == nil {
-			break
-		}
-		bat := v.(*batch.Batch)
-		bat.Reorder(attrs)
-		if err = bat.Prefetch(attrs, bat.Vecs, proc); err != nil {
-			return err
-		}
-		ctr.bats = append(ctr.bats, bat)
-		if len(bat.Sels) == 0 {
-			if err = ctr.fillBatch(distinct, bat.Vecs[:len(attrs)], proc); err != nil {
-				return err
-			}
-		} else {
-			if err = ctr.fillBatchSels(distinct, bat.Sels, bat.Vecs[:len(attrs)], proc); err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-func (ctr *Container) probe(attrs []string, distinct bool, proc *process.Process) (bool, error) {
-	if bat := ctr.probeState.bat; bat != nil {
-		return false, ctr.probeBatch(distinct, bat.Vecs[:len(attrs)], proc)
-	}
-	v := <-proc.Reg.Cs[0]
-	if v == nil {
-		return true, nil
-	}
-	bat := v.(*batch.Batch)
-	bat.Reorder(attrs)
-	ctr.probeState.bat = bat
-	ctr.probeState.start = 0
-	return false, ctr.probeBatch(distinct, bat.Vecs[:len(attrs)], proc)
-}
-
-func (ctr *Container) fillBatch(distinct bool, vecs []*vector.Vector, proc *process.Process) error {
-	for i, j := 0, vecs[0].Length(); i < j; i += UnitLimit {
-		length := j - i
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.fillUnit(distinct, i, length, nil, vecs, proc); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (ctr *Container) fillBatchSels(distinct bool, sels []int64, vecs []*vector.Vector, proc *process.Process) error {
-	for i, j := 0, len(sels); i < j; i += UnitLimit {
-		length := j - i
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.fillUnit(distinct, 0, length, sels[i:i+length], vecs, proc); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (ctr *Container) fillUnit(distinct bool, start, count int, sels []int64,
-	vecs []*vector.Vector, proc *process.Process) error {
-	var err error
-
-	{
-		copy(ctr.hashs[:count], OneUint64s[:count])
-		if len(sels) == 0 {
-			ctr.fillHash(start, count, vecs)
-		} else {
-			ctr.fillHashSels(count, sels, vecs)
-		}
-	}
-	copy(ctr.diffs[:count], ZeroBools[:count])
-	for i, hs := range ctr.slots.Ks {
-		for j, h := range hs {
-			remaining := ctr.sels[ctr.slots.Vs[i][j]]
-			if gs, ok := ctr.groups[h]; ok {
-				for _, g := range gs {
-					if remaining, err = g.Fill(distinct, remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
-						return err
-					}
-					copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
-				}
-			} else {
-				ctr.groups[h] = make([]*hash.JoinGroup, 0, 8)
-			}
-			for len(remaining) > 0 {
-				g := hash.NewJoinGroup(int64(len(ctr.bats)-1), int64(remaining[0]))
-				ctr.groups[h] = append(ctr.groups[h], g)
-				if remaining, err = g.Fill(distinct, remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
-					return err
-				}
-				copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
-			}
-		}
-	}
-	ctr.slots.Reset()
-	return nil
-}
-
-func (ctr *Container) fillHash(start, count int, vecs []*vector.Vector) {
-	ctr.hashs = ctr.hashs[:count]
-	for _, vec := range vecs {
-		hash.Rehash(count, ctr.hashs, vec)
-	}
-	nextslot := 0
-	for i, h := range ctr.hashs {
-		slot, ok := ctr.slots.Get(h)
-		if !ok {
-			slot = nextslot
-			ctr.slots.Set(h, slot)
-			nextslot++
-		}
-		ctr.sels[slot] = append(ctr.sels[slot], int64(i+start))
-	}
-}
-
-func (ctr *Container) fillHashSels(count int, sels []int64, vecs []*vector.Vector) {
-	ctr.hashs = ctr.hashs[:count]
-	for _, vec := range vecs {
-		hash.RehashSels(count, sels, ctr.hashs, vec)
-	}
-	nextslot := 0
-	for i, h := range ctr.hashs {
-		slot, ok := ctr.slots.Get(h)
-		if !ok {
-			slot = nextslot
-			ctr.slots.Set(h, slot)
-			nextslot++
-		}
-		ctr.sels[slot] = append(ctr.sels[slot], sels[i])
-	}
-}
-
-func (ctr *Container) probeBatch(distinct bool, vecs []*vector.Vector, proc *process.Process) error {
-	defer func() { ctr.probeState.size = 0 }()
-	for ; ctr.probeState.start < ctr.probeState.end; ctr.probeState.start += UnitLimit {
-		length := ctr.probeState.end - ctr.probeState.start
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.probeUnit(distinct, ctr.probeState.start, length, nil, vecs, proc); err != nil {
-			return err
-		}
-		if ctr.probeState.size > ctr.probeState.limit {
-			ctr.probeState.start += UnitLimit
-			return nil
-		}
-	}
-	ctr.probeState.bat = nil
-	return nil
-}
-
-func (ctr *Container) probeBatchSels(distinct bool, sels []int64, vecs []*vector.Vector, proc *process.Process) error {
-	defer func() { ctr.probeState.size = 0 }()
-	for ; ctr.probeState.start < ctr.probeState.end; ctr.probeState.start += UnitLimit {
-		length := ctr.probeState.end - ctr.probeState.start
-		if length > UnitLimit {
-			length = UnitLimit
-		}
-		if err := ctr.probeUnit(distinct, 0, length, sels[ctr.probeState.start:ctr.probeState.start+length], vecs, proc); err != nil {
-			return err
-		}
-		if ctr.probeState.size > ctr.probeState.limit {
-			ctr.probeState.start += UnitLimit
-			return nil
-		}
-	}
-	ctr.probeState.bat = nil
-	return nil
-}
-
-func (ctr *Container) probeUnit(distinct bool, start, count int, sels []int64,
-	vecs []*vector.Vector, proc *process.Process) error {
-	var err error
-
-	{
-		copy(ctr.hashs[:count], OneUint64s[:count])
-		if len(sels) == 0 {
-			ctr.fillHash(start, count, vecs)
-		} else {
-			ctr.fillHashSels(count, sels, vecs)
-		}
-	}
-	copy(ctr.diffs[:count], ZeroBools[:count])
-	for i, hs := range ctr.slots.Ks {
-		for j, h := range hs {
-			remaining := ctr.sels[ctr.slots.Vs[i][j]]
-			if gs, ok := ctr.groups[h]; ok {
-				for _, g := range gs {
-					if ctr.matchs, remaining, err = g.Probe(remaining, ctr.matchs, vecs, ctr.bats, ctr.diffs, proc); err != nil {
-						return err
-					}
-					// product
-					copy(ctr.diffs[:len(remaining)], ZeroBools[:len(remaining)])
-				}
-			}
-		}
-	}
-	ctr.slots.Reset()
-	return nil
-}
-
-func (ctr *Container) clean(bat *batch.Batch, proc *process.Process) {
-	bat.Clean(proc)
-	fastmap.Pool.Put(ctr.slots)
-}
diff --git a/pkg/sql/colexec/hashjoin/types.go b/pkg/sql/colexec/hashjoin/types.go
deleted file mode 100644
index 89aa56854..000000000
--- a/pkg/sql/colexec/hashjoin/types.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package hashjoin
-
-import (
-	"matrixbase/pkg/container/batch"
-	"matrixbase/pkg/hash"
-	"matrixbase/pkg/intmap/fastmap"
-	"matrixbase/pkg/sql/join"
-)
-
-const (
-	UnitLimit = 1024
-)
-
-var (
-	ZeroBools  []bool
-	OneUint64s []uint64
-)
-
-type Container struct {
-	builded    bool
-	diffs      []bool
-	matchs     []int64
-	hashs      []uint64
-	sels       [][]int64      // sels
-	slots      *fastmap.Map   // hash code -> sels index
-	bats       []*batch.Batch // s relation
-	probeState struct {
-		size  int
-		limit int
-		start int
-		end   int
-		bat   *batch.Batch
-	}
-	groups map[uint64][]*hash.JoinGroup // hash code -> join list
-}
-
-type Argument struct {
-	Distinct bool
-	Attrs    []string
-	Rattrs   []string
-	Sattrs   []string
-	Ctr      Container
-	JoinType join.JoinType
-}
diff --git a/pkg/sql/colexec/limit/limit.go b/pkg/sql/colexec/limit/limit.go
index e66414993..fe4cd3e3d 100644
--- a/pkg/sql/colexec/limit/limit.go
+++ b/pkg/sql/colexec/limit/limit.go
@@ -20,6 +20,9 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	if length := uint64(len(bat.Sels)); length > 0 {
diff --git a/pkg/sql/colexec/mergesum/summarize.go b/pkg/sql/colexec/mergesum/summarize.go
index 88d46c110..cfe8a5b13 100644
--- a/pkg/sql/colexec/mergesum/summarize.go
+++ b/pkg/sql/colexec/mergesum/summarize.go
@@ -18,10 +18,13 @@ func Prepare(proc *process.Process, arg interface{}) error {
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	n := arg.(*Argument)
-	for i, c := range proc.Reg.Cs {
-		v := <-c
+	for i := 0; i < len(proc.Reg.Ws); i++ {
+		reg := proc.Reg.Ws[i]
+		v := <-reg.Ch
 		if v == nil {
-			proc.Reg.Cs = append(proc.Reg.Cs[:i], proc.Reg.Cs[i:]...)
+			reg.Wg.Done()
+			proc.Reg.Ws = append(proc.Reg.Ws[:i], proc.Reg.Ws[i:]...)
+			i--
 			continue
 		}
 		bat := v.(*batch.Batch)
@@ -34,6 +37,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
 				return false, err
 			}
 		}
+		reg.Wg.Done()
 		bat.Clean(proc)
 	}
 	bat := batch.New(true, n.Attrs)
diff --git a/pkg/sql/colexec/offset/offset.go b/pkg/sql/colexec/offset/offset.go
index 73412d442..73bc5de0e 100644
--- a/pkg/sql/colexec/offset/offset.go
+++ b/pkg/sql/colexec/offset/offset.go
@@ -20,6 +20,9 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	if n.Seen > n.Offset {
diff --git a/pkg/sql/colexec/output/output.go b/pkg/sql/colexec/output/output.go
index 533ae43b3..feed54e5e 100644
--- a/pkg/sql/colexec/output/output.go
+++ b/pkg/sql/colexec/output/output.go
@@ -16,8 +16,8 @@ func Prepare(_ *process.Process, _ interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
-	bat := proc.Reg.Ax.(*batch.Batch)
-	if bat != nil {
+	if proc.Reg.Ax != nil {
+		bat := proc.Reg.Ax.(*batch.Batch)
 		fmt.Printf("%s\n", bat)
 		bat.Clean(proc)
 	}
diff --git a/pkg/sql/colexec/projection/projection.go b/pkg/sql/colexec/projection/projection.go
index efccc39bb..a06d774ba 100644
--- a/pkg/sql/colexec/projection/projection.go
+++ b/pkg/sql/colexec/projection/projection.go
@@ -27,9 +27,12 @@ func Prepare(_ *process.Process, _ interface{}) error {
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	var err error
 
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
-	rbat := batch.New(true, n.Attrs)
 	bat := proc.Reg.Ax.(*batch.Batch)
+	rbat := batch.New(true, n.Attrs)
 	for i := range n.Attrs {
 		if rbat.Vecs[i], _, err = n.Es[i].Eval(bat, proc); err != nil {
 			rbat.Vecs = rbat.Vecs[:i]
diff --git a/pkg/sql/colexec/restrict/restrict.go b/pkg/sql/colexec/restrict/restrict.go
index ef90d1ee3..4db09336c 100644
--- a/pkg/sql/colexec/restrict/restrict.go
+++ b/pkg/sql/colexec/restrict/restrict.go
@@ -20,6 +20,9 @@ func Prepare(_ *process.Process, arg interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	vec, _, err := n.E.Eval(bat, proc)
diff --git a/pkg/sql/colexec/summarize/summarize.go b/pkg/sql/colexec/summarize/summarize.go
index 28d1b33d9..44abb7f11 100644
--- a/pkg/sql/colexec/summarize/summarize.go
+++ b/pkg/sql/colexec/summarize/summarize.go
@@ -17,6 +17,9 @@ func Prepare(proc *process.Process, arg interface{}) error {
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(*Argument)
 	rbat := batch.New(true, n.Attrs)
 	bat := proc.Reg.Ax.(*batch.Batch)
diff --git a/pkg/sql/colexec/top/top.go b/pkg/sql/colexec/top/top.go
index 2c89a424b..d3eb2a6dc 100644
--- a/pkg/sql/colexec/top/top.go
+++ b/pkg/sql/colexec/top/top.go
@@ -43,6 +43,9 @@ func Prepare(proc *process.Process, arg interface{}) error {
 func Call(proc *process.Process, arg interface{}) (bool, error) {
 	var err error
 
+	if proc.Reg.Ax == nil {
+		return false, nil
+	}
 	n := arg.(Argument)
 	bat := proc.Reg.Ax.(*batch.Batch)
 	if err = bat.Prefetch(n.Attrs, n.Ctr.vecs, proc); err != nil {
diff --git a/pkg/sql/colexec/transfer/transfer.go b/pkg/sql/colexec/transfer/transfer.go
index 810935935..065a20a65 100644
--- a/pkg/sql/colexec/transfer/transfer.go
+++ b/pkg/sql/colexec/transfer/transfer.go
@@ -1,20 +1,30 @@
 package transfer
 
 import (
+	"bytes"
 	"matrixbase/pkg/container/batch"
 	"matrixbase/pkg/vm/process"
 )
 
+func String(_ interface{}, buf *bytes.Buffer) {
+	buf.WriteString("transfer")
+}
+
 func Prepare(_ *process.Process, _ interface{}) error {
 	return nil
 }
 
 func Call(proc *process.Process, arg interface{}) (bool, error) {
-	n := arg.(Argument)
-	bat := proc.Reg.Ax.(*batch.Batch)
-	if bat != nil {
-		n.Ch <- bat
+	reg := arg.(*Argument).Reg
+	if reg.Ch == nil {
+		if proc.Reg.Ax != nil {
+			bat := proc.Reg.Ax.(*batch.Batch)
+			bat.Clean(proc)
+		}
+		return true, nil
 	}
-	n.Ch <- nil
+	reg.Wg.Add(1)
+	reg.Ch <- proc.Reg.Ax
+	reg.Wg.Wait()
 	return false, nil
 }
diff --git a/pkg/sql/colexec/transfer/types.go b/pkg/sql/colexec/transfer/types.go
index 69612851f..46c9c660b 100644
--- a/pkg/sql/colexec/transfer/types.go
+++ b/pkg/sql/colexec/transfer/types.go
@@ -1,5 +1,7 @@
 package transfer
 
+import "matrixbase/pkg/vm/process"
+
 type Argument struct {
-	Ch chan interface{}
+	Reg *process.WaitRegister
 }
diff --git a/pkg/sql/colexec/unittest/extendProjection_test.go b/pkg/sql/colexec/unittest/extendProjection_test.go
index 8afc1b342..f379058bf 100644
--- a/pkg/sql/colexec/unittest/extendProjection_test.go
+++ b/pkg/sql/colexec/unittest/extendProjection_test.go
@@ -35,7 +35,7 @@ func TestExtendProjection(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1}, []string{"price"}, ins)
 	fmt.Printf("%s\n", p)
-	if _, err := p.Run(segments(proc), proc); err != nil {
+	if _, err := p.Run(segments("R", proc), proc); err != nil {
 		log.Fatal(err)
 	}
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
diff --git a/pkg/sql/colexec/unittest/intersect_test.go b/pkg/sql/colexec/unittest/intersect_test.go
new file mode 100644
index 000000000..e8fbb6c52
--- /dev/null
+++ b/pkg/sql/colexec/unittest/intersect_test.go
@@ -0,0 +1,76 @@
+package unittest
+
+import (
+	"fmt"
+	"matrixbase/pkg/sql/colexec/hashset/intersect"
+	"matrixbase/pkg/sql/colexec/transfer"
+	"matrixbase/pkg/vm"
+	"matrixbase/pkg/vm/mempool"
+	"matrixbase/pkg/vm/mmu/guest"
+	"matrixbase/pkg/vm/mmu/host"
+	"matrixbase/pkg/vm/pipeline"
+	"matrixbase/pkg/vm/process"
+	"sync"
+	"testing"
+)
+
+func TestIntersect(t *testing.T) {
+	var wg sync.WaitGroup
+	var ins vm.Instructions
+
+	hm := host.New(1 << 20)
+	proc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
+	{
+		proc.Refer = make(map[string]uint64)
+		proc.Reg.Ws = make([]*process.WaitRegister, 2)
+		for i := 0; i < 2; i++ {
+			proc.Reg.Ws[i] = &process.WaitRegister{
+				Wg: new(sync.WaitGroup),
+				Ch: make(chan interface{}),
+			}
+		}
+	}
+	{
+		var rins vm.Instructions
+
+		rproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
+		{
+			rproc.Refer = make(map[string]uint64)
+		}
+		rins = append(rins, vm.Instruction{vm.Transfer, &transfer.Argument{proc.Reg.Ws[0]}})
+		rp := pipeline.New([]uint64{1}, []string{"orderId"}, rins)
+		wg.Add(1)
+		go func() {
+			fmt.Printf("R: %s\n", rp)
+			rp.Run(segments("R", rproc), rproc)
+			fmt.Printf("R - guest: %v, host: %v\n", rproc.Size(), rproc.HostSize())
+			wg.Done()
+		}()
+	}
+	{
+		var sins vm.Instructions
+
+		sproc := process.New(guest.New(1<<20, hm), mempool.New(1<<32, 8))
+		{
+			sproc.Refer = make(map[string]uint64)
+		}
+		sins = append(sins, vm.Instruction{vm.Transfer, &transfer.Argument{proc.Reg.Ws[1]}})
+		sp := pipeline.New([]uint64{1}, []string{"orderId"}, sins)
+		wg.Add(1)
+		go func() {
+			fmt.Printf("S: %s\n", sp)
+			sp.Run(segments("S", sproc), sproc)
+			fmt.Printf("S - guest: %v, host: %v\n", sproc.Size(), sproc.HostSize())
+			wg.Done()
+		}()
+	}
+	{
+		ins = append(ins, vm.Instruction{vm.SetIntersect, &intersect.Argument{}})
+		ins = append(ins, vm.Instruction{vm.Output, nil})
+	}
+	p := pipeline.NewMerge(ins)
+	fmt.Printf("%s\n", p)
+	p.RunMerge(proc)
+	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
+	wg.Wait()
+}
diff --git a/pkg/sql/colexec/unittest/limit_test.go b/pkg/sql/colexec/unittest/limit_test.go
index dd30fcd39..19950ebfa 100644
--- a/pkg/sql/colexec/unittest/limit_test.go
+++ b/pkg/sql/colexec/unittest/limit_test.go
@@ -36,6 +36,6 @@ func TestLimit(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
 	fmt.Printf("%s\n", p)
-	p.Run(segments(proc), proc)
+	p.Run(segments("R", proc), proc)
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
 }
diff --git a/pkg/sql/colexec/unittest/offset_test.go b/pkg/sql/colexec/unittest/offset_test.go
index 61af304ad..804c54bf9 100644
--- a/pkg/sql/colexec/unittest/offset_test.go
+++ b/pkg/sql/colexec/unittest/offset_test.go
@@ -36,6 +36,6 @@ func TestOffset(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
 	fmt.Printf("%s\n", p)
-	p.Run(segments(proc), proc)
+	p.Run(segments("R", proc), proc)
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
 }
diff --git a/pkg/sql/colexec/unittest/projection_test.go b/pkg/sql/colexec/unittest/projection_test.go
index ab1ae3823..6823b5b5c 100644
--- a/pkg/sql/colexec/unittest/projection_test.go
+++ b/pkg/sql/colexec/unittest/projection_test.go
@@ -32,6 +32,6 @@ func TestProjection(t *testing.T) {
 	}
 	p := pipeline.New([]uint64{1, 1}, []string{"uid", "orderId"}, ins)
 	fmt.Printf("%s\n", p)
-	p.Run(segments(proc), proc)
+	p.Run(segments("R", proc), proc)
 	fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize())
 }
diff --git a/pkg/sql/colexec/unittest/segments.go b/pkg/sql/colexec/unittest/segments.go
index 336355f3a..b49c84a6f 100644
--- a/pkg/sql/colexec/unittest/segments.go
+++ b/pkg/sql/colexec/unittest/segments.go
@@ -7,9 +7,9 @@ import (
 	"matrixbase/pkg/vm/process"
 )
 
-func segments(proc *process.Process) []engine.Segment {
+func segments(name string, proc *process.Process) []engine.Segment {
 	e := memEngine.NewTestEngine()
-	r, err := e.Relation("test")
+	r, err := e.Relation(name)
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/pkg/sql/join/types.go b/pkg/sql/join/types.go
deleted file mode 100644
index c3611093a..000000000
--- a/pkg/sql/join/types.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package join
-
-const (
-	Inner = iota
-	Left
-	Right
-	Full
-	Semi
-	Anti
-	ExceptAll
-	IntersectAll
-)
-
-type JoinType uint8
diff --git a/pkg/vm/engine/memEngine/testEngine.go b/pkg/vm/engine/memEngine/testEngine.go
index 7db0995f3..82165ba42 100644
--- a/pkg/vm/engine/memEngine/testEngine.go
+++ b/pkg/vm/engine/memEngine/testEngine.go
@@ -14,6 +14,12 @@ import (
 
 func NewTestEngine() engine.Engine {
 	e := New(kv.New())
+	CreateR(e)
+	CreateS(e)
+	return e
+}
+
+func CreateR(e engine.Engine) {
 	{
 		var attrs []metadata.Attribute
 
@@ -34,11 +40,11 @@ func NewTestEngine() engine.Engine {
 				Type: types.Type{types.T(types.T_float64), 8, 8, 0},
 			})
 		}
-		if err := e.Create("test", attrs); err != nil {
+		if err := e.Create("R", attrs); err != nil {
 			log.Fatal(err)
 		}
 	}
-	r, err := e.Relation("test")
+	r, err := e.Relation("R")
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -122,5 +128,115 @@ func NewTestEngine() engine.Engine {
 			log.Fatal(err)
 		}
 	}
-	return e
+}
+
+func CreateS(e engine.Engine) {
+	{
+		var attrs []metadata.Attribute
+
+		{
+			attrs = append(attrs, metadata.Attribute{
+				Alg:  compress.Lz4,
+				Name: "orderId",
+				Type: types.Type{types.T(types.T_varchar), 24, 0, 0},
+			})
+			attrs = append(attrs, metadata.Attribute{
+				Alg:  compress.Lz4,
+				Name: "uid",
+				Type: types.Type{types.T(types.T_varchar), 24, 0, 0},
+			})
+			attrs = append(attrs, metadata.Attribute{
+				Alg:  compress.Lz4,
+				Name: "price",
+				Type: types.Type{types.T(types.T_float64), 8, 8, 0},
+			})
+		}
+		if err := e.Create("S", attrs); err != nil {
+			log.Fatal(err)
+		}
+	}
+	r, err := e.Relation("S")
+	if err != nil {
+		log.Fatal(err)
+	}
+	{
+		bat := batch.New(true, []string{"orderId", "uid", "price"})
+		{
+			{
+				vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+				vs := make([][]byte, 10)
+				for i := 0; i < 10; i++ {
+					vs[i] = []byte(fmt.Sprintf("%v", i*2))
+				}
+				if err := vec.Append(vs); err != nil {
+					log.Fatal(err)
+				}
+				bat.Vecs[0] = vec
+			}
+			{
+				vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+				vs := make([][]byte, 10)
+				for i := 0; i < 10; i++ {
+					vs[i] = []byte(fmt.Sprintf("%v", i%2))
+				}
+				if err := vec.Append(vs); err != nil {
+					log.Fatal(err)
+				}
+				bat.Vecs[1] = vec
+			}
+			{
+				vec := vector.New(types.Type{types.T(types.T_float64), 8, 8, 0})
+				vs := make([]float64, 10)
+				for i := 0; i < 10; i++ {
+					vs[i] = float64(i)
+				}
+				if err := vec.Append(vs); err != nil {
+					log.Fatal(err)
+				}
+				bat.Vecs[2] = vec
+			}
+		}
+		if err := r.Write(bat); err != nil {
+			log.Fatal(err)
+		}
+	}
+	{
+		bat := batch.New(true, []string{"orderId", "uid", "price"})
+		{
+			vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+			vs := make([][]byte, 10)
+			for i := 10; i < 20; i++ {
+				vs[i-10] = []byte(fmt.Sprintf("%v", i*2))
+			}
+			if err := vec.Append(vs); err != nil {
+				log.Fatal(err)
+			}
+			bat.Vecs[0] = vec
+		}
+		{
+			vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0})
+			vs := make([][]byte, 10)
+			for i := 10; i < 20; i++ {
+				vs[i-10] = []byte(fmt.Sprintf("%v", i%2))
+			}
+			if err := vec.Append(vs); err != nil {
+				log.Fatal(err)
+			}
+			bat.Vecs[1] = vec
+		}
+		{
+			vec := vector.New(types.Type{types.T(types.T_float64), 8, 8, 0})
+			vs := make([]float64, 10)
+			for i := 10; i < 20; i++ {
+				vs[i-10] = float64(i)
+			}
+			if err := vec.Append(vs); err != nil {
+				log.Fatal(err)
+			}
+			bat.Vecs[2] = vec
+		}
+		if err := r.Write(bat); err != nil {
+			log.Fatal(err)
+		}
+	}
 }
diff --git a/pkg/vm/instruction.go b/pkg/vm/instruction.go
deleted file mode 100644
index cbffab2eb..000000000
--- a/pkg/vm/instruction.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package vm
-
-import (
-	"bytes"
-)
-
-func (ins Instructions) String() string {
-	var buf bytes.Buffer
-
-	for _, in := range ins {
-		switch in.Op {
-		case Nub:
-		case Top:
-		case Limit:
-		case Group:
-		case Order:
-		case Transfer:
-		case Restrict:
-		case Summarize:
-		case Projection:
-		case SetUnion:
-		case SetIntersect:
-		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
-		}
-	}
-	return buf.String()
-}
diff --git a/pkg/vm/pipeline/pipeline.go b/pkg/vm/pipeline/pipeline.go
index e7f58f097..591f39278 100644
--- a/pkg/vm/pipeline/pipeline.go
+++ b/pkg/vm/pipeline/pipeline.go
@@ -15,6 +15,12 @@ func New(cs []uint64, attrs []string, ins vm.Instructions) *Pipeline {
 	}
 }
 
+func NewMerge(ins vm.Instructions) *Pipeline {
+	return &Pipeline{
+		ins: ins,
+	}
+}
+
 func (p *Pipeline) String() string {
 	var buf bytes.Buffer
 
@@ -24,6 +30,7 @@ func (p *Pipeline) String() string {
 
 func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, error) {
 	if err := vm.Prepare(p.ins, proc); err != nil {
+		vm.Clean(p.ins, proc)
 		return false, err
 	}
 	for _, seg := range segs {
@@ -36,5 +43,24 @@ func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, erro
 			return end, err
 		}
 	}
+	{
+		proc.Reg.Ax = nil
+		if end, err := vm.Run(p.ins, proc); err != nil || end {
+			return end, err
+		}
+	}
+	return false, nil
+}
+
+func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) {
+	if err := vm.Prepare(p.ins, proc); err != nil {
+		vm.Clean(p.ins, proc)
+		return false, err
+	}
+	for {
+		if end, err := vm.Run(p.ins, proc); err != nil || end {
+			return end, err
+		}
+	}
 	return false, nil
 }
diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go
index a5b09b1c7..c025c9c95 100644
--- a/pkg/vm/process/types.go
+++ b/pkg/vm/process/types.go
@@ -3,6 +3,7 @@ package process
 import (
 	"matrixbase/pkg/vm/mempool"
 	"matrixbase/pkg/vm/mmu/guest"
+	"sync"
 )
 
 /*
@@ -15,10 +16,15 @@ type Process interface {
 }
 */
 
+type WaitRegister struct {
+	Wg *sync.WaitGroup
+	Ch chan interface{}
+}
+
 type Register struct {
 	Ax interface{}
 	Ts []interface{}
-	Cs []chan interface{}
+	Ws []*WaitRegister
 }
 
 type Process struct {
diff --git a/pkg/vm/types.go b/pkg/vm/types.go
index f3e86eb1c..fbdd79550 100644
--- a/pkg/vm/types.go
+++ b/pkg/vm/types.go
@@ -14,13 +14,13 @@ const (
 	SetUnion
 	SetIntersect
 	SetDifference
-	MultisetUnion
-	MultisetIntersect
-	MultisetDifference
-	EqJoin
-	SemiJoin
-	InnerJoin
-	NaturalJoin
+	SetFullJoin
+	SetLeftJoin
+	SetSemiJoin
+	SetInnerJoin
+	SetRightJoin
+	SetNaturalJoin
+	SetSemiDifference // unsuitable name is anti join
 	Output
 	MergeSummarize
 )
diff --git a/pkg/vm/vm.go b/pkg/vm/vm.go
index 48976689a..e6d68e389 100644
--- a/pkg/vm/vm.go
+++ b/pkg/vm/vm.go
@@ -2,11 +2,13 @@ package vm
 
 import (
 	"bytes"
+	"matrixbase/pkg/sql/colexec/hashset/intersect"
 	"matrixbase/pkg/sql/colexec/limit"
 	"matrixbase/pkg/sql/colexec/offset"
 	"matrixbase/pkg/sql/colexec/output"
 	"matrixbase/pkg/sql/colexec/projection"
 	"matrixbase/pkg/sql/colexec/restrict"
+	"matrixbase/pkg/sql/colexec/transfer"
 	"matrixbase/pkg/vm/process"
 )
 
@@ -25,6 +27,7 @@ func String(ins Instructions, buf *bytes.Buffer) {
 		case Offset:
 			offset.String(in.Arg, buf)
 		case Transfer:
+			transfer.String(in.Arg, buf)
 		case Restrict:
 			restrict.String(in.Arg, buf)
 		case Summarize:
@@ -32,20 +35,35 @@ func String(ins Instructions, buf *bytes.Buffer) {
 			projection.String(in.Arg, buf)
 		case SetUnion:
 		case SetIntersect:
+			intersect.String(in.Arg, buf)
 		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
 		case Output:
 			output.String(in.Arg, buf)
 		}
 	}
 }
 
+func Clean(ins Instructions, proc *process.Process) {
+	for _, in := range ins {
+		switch in.Op {
+		case Nub:
+		case Top:
+		case Limit:
+		case Group:
+		case Order:
+		case Offset:
+		case Transfer:
+		case Restrict:
+		case Summarize:
+		case Projection:
+		case SetUnion:
+		case SetIntersect:
+		case SetDifference:
+		case Output:
+		}
+	}
+}
+
 func Prepare(ins Instructions, proc *process.Process) error {
 	for _, in := range ins {
 		switch in.Op {
@@ -62,6 +80,9 @@ func Prepare(ins Instructions, proc *process.Process) error {
 				return err
 			}
 		case Transfer:
+			if err := transfer.Prepare(proc, in.Arg); err != nil {
+				return err
+			}
 		case Restrict:
 			if err := restrict.Prepare(proc, in.Arg); err != nil {
 				return err
@@ -73,14 +94,10 @@ func Prepare(ins Instructions, proc *process.Process) error {
 			}
 		case SetUnion:
 		case SetIntersect:
+			if err := intersect.Prepare(proc, in.Arg); err != nil {
+				return err
+			}
 		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
 		case Output:
 			if err := output.Prepare(proc, in.Arg); err != nil {
 				return err
@@ -106,6 +123,7 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
 		case Offset:
 			ok, err = offset.Call(proc, in.Arg)
 		case Transfer:
+			ok, err = transfer.Call(proc, in.Arg)
 		case Restrict:
 			ok, err = restrict.Call(proc, in.Arg)
 		case Summarize:
@@ -113,14 +131,8 @@ func Run(ins Instructions, proc *process.Process) (bool, error) {
 			ok, err = projection.Call(proc, in.Arg)
 		case SetUnion:
 		case SetIntersect:
+			ok, err = intersect.Call(proc, in.Arg)
 		case SetDifference:
-		case MultisetUnion:
-		case MultisetIntersect:
-		case MultisetDifference:
-		case EqJoin:
-		case SemiJoin:
-		case InnerJoin:
-		case NaturalJoin:
 		case Output:
 			ok, err = output.Call(proc, in.Arg)
 		}
-- 
GitLab