diff --git a/src/context/ExecutionContext.cpp b/src/context/ExecutionContext.cpp
index ec511f63e13726c08734acf2f842814eac681f74..dd8c05c107d4bd7c9ffd5270e010c55c1e0d35ef 100644
--- a/src/context/ExecutionContext.cpp
+++ b/src/context/ExecutionContext.cpp
@@ -8,6 +8,8 @@
 
 namespace nebula {
 namespace graph {
+constexpr int64_t ExecutionContext::kLatestVersion;
+constexpr int64_t ExecutionContext::kOldestVersion;
 
 void ExecutionContext::setValue(const std::string& name, Value&& val) {
     ResultBuilder builder;
@@ -65,7 +67,19 @@ const Result& ExecutionContext::getResult(const std::string& name) const {
     }
 }
 
-const std::vector<Result>& ExecutionContext::getHistory(const std::string& name) const {
+const Result& ExecutionContext::getVersionedResult(const std::string& name,
+                                                   int64_t version) const {
+    auto& result = getHistory(name);
+    auto size = result.size();
+    if (static_cast<size_t>(std::abs(version)) >= size) {
+        return Result::kEmptyResult;
+    } else {
+        return result[(size + version - 1) % size ];
+    }
+}
+
+const std::vector<Result>& ExecutionContext::getHistory(
+    const std::string& name) const {
     auto it = valueMap_.find(name);
     if (it != valueMap_.end()) {
         return it->second;
diff --git a/src/context/ExecutionContext.h b/src/context/ExecutionContext.h
index af7a577bc05054ed671e0c569bed42e8e4f63002..25160e4769b56838e703314509949cb9b90707d8 100644
--- a/src/context/ExecutionContext.h
+++ b/src/context/ExecutionContext.h
@@ -31,6 +31,11 @@ class QueryInstance;
  **************************************************************************/
 class ExecutionContext {
 public:
+    // 0 is the latest, -1 is the preveos one, and so on
+    // 1 is the oldest, 2 is the second elder, and so on
+    static constexpr int64_t kLatestVersion = 0;
+    static constexpr int64_t kOldestVersion = 1;
+
     ExecutionContext() = default;
 
     virtual ~ExecutionContext() = default;
@@ -44,6 +49,8 @@ public:
 
     const Result& getResult(const std::string& name) const;
 
+    const Result& getVersionedResult(const std::string& name, int64_t version) const;
+
     size_t numVersions(const std::string& name) const;
 
     // Return all existing history of the value. The front is the latest value
diff --git a/src/context/QueryExpressionContext.cpp b/src/context/QueryExpressionContext.cpp
index 9283ed06add6842f077fb08c82e09958a8038149..24f464d77a265028a7ba2d6bfcac318594279ee9 100644
--- a/src/context/QueryExpressionContext.cpp
+++ b/src/context/QueryExpressionContext.cpp
@@ -22,15 +22,7 @@ const Value& QueryExpressionContext::getVersionedVar(const std::string& var,
     if (ectx_ == nullptr) {
         return Value::kEmpty;
     }
-    auto& result = ectx_->getHistory(var);
-    auto size = result.size();
-    if (version <= 0 && static_cast<size_t>(std::abs(version)) < size) {
-        return result[size + version -1].value();
-    } else if (version > 0 && static_cast<size_t>(version) <= size) {
-        return result[version - 1].value();
-    } else {
-        return Value::kEmpty;
-    }
+    return ectx_->getVersionedResult(var, version).value();
 }
 
 const Value& QueryExpressionContext::getVarProp(const std::string& var,
diff --git a/src/context/ValidateContext.h b/src/context/ValidateContext.h
index ea5d954c331e71da4b10919d1d8464d447fcfe48..e221034efaca39b424dc1ca9c996b59e38e67812 100644
--- a/src/context/ValidateContext.h
+++ b/src/context/ValidateContext.h
@@ -11,7 +11,8 @@
 #include "common/datatypes/Value.h"
 #include "common/charset/Charset.h"
 #include "planner/ExecutionPlan.h"
-#include "util/AnnoVarGenerator.h"
+#include "util/AnonVarGenerator.h"
+#include "util/AnonColGenerator.h"
 
 namespace nebula {
 namespace graph {
@@ -27,7 +28,8 @@ struct SpaceDescription {
 class ValidateContext final {
 public:
     ValidateContext() {
-        varGen_ = std::make_unique<AnnoVarGenerator>();
+        anonVarGen_ = std::make_unique<AnonVarGenerator>();
+        anonColGen_ = std::make_unique<AnonColGenerator>();
     }
 
     void switchToSpace(std::string spaceName, GraphSpaceID spaceId) {
@@ -69,8 +71,12 @@ public:
         return spaces_.back();
     }
 
-    AnnoVarGenerator* varGen() const {
-        return varGen_.get();
+    AnonVarGenerator* anonVarGen() const {
+        return anonVarGen_.get();
+    }
+
+    AnonColGenerator* anonColGen() const {
+        return anonColGen_.get();
     }
 
     void addSchema(const std::string& name,
@@ -91,7 +97,8 @@ private:
     std::vector<SpaceDescription>                       spaces_;
     // vars_ saves all named variable
     std::unordered_map<std::string, ColsDef>            vars_;
-    std::unique_ptr<AnnoVarGenerator>                   varGen_;
+    std::unique_ptr<AnonVarGenerator>                   anonVarGen_;
+    std::unique_ptr<AnonColGenerator>                   anonColGen_;
     using Schemas = std::unordered_map<std::string,
           std::shared_ptr<const meta::NebulaSchemaProvider>>;
     Schemas                                             schemas_;
diff --git a/src/exec/Executor.cpp b/src/exec/Executor.cpp
index 223bac6478688717fd4f17dc679e5504ed50657c..9eab0f8d25c852281a59630604f465a69b2d83a3 100644
--- a/src/exec/Executor.cpp
+++ b/src/exec/Executor.cpp
@@ -37,6 +37,7 @@
 #include "exec/query/ReadIndexExecutor.h"
 #include "exec/query/SortExecutor.h"
 #include "exec/query/UnionExecutor.h"
+#include "exec/query/DataJoinExecutor.h"
 #include "planner/Admin.h"
 #include "planner/Maintain.h"
 #include "planner/Mutate.h"
@@ -357,6 +358,13 @@ Executor *Executor::makeExecutor(const PlanNode *node,
             exec->addDependent(input);
             break;
         }
+        case PlanNode::Kind::kDataJoin: {
+            auto dataJoin = asNode<DataJoin>(node);
+            auto input = makeExecutor(dataJoin->dep(), qctx, visited);
+            exec = new DataJoinExecutor(dataJoin, qctx);
+            exec->addDependent(input);
+            break;
+        }
         case PlanNode::Kind::kUnknown:
         default:
             LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
diff --git a/src/exec/logic/LoopExecutor.cpp b/src/exec/logic/LoopExecutor.cpp
index 9d66374dd88f4879345527137500a930b6f492af..1ae58dc9f274621627405c57850745368952dd9b 100644
--- a/src/exec/logic/LoopExecutor.cpp
+++ b/src/exec/logic/LoopExecutor.cpp
@@ -26,6 +26,7 @@ folly::Future<Status> LoopExecutor::execute() {
     Expression *expr = loopNode->condition();
     QueryExpressionContext ctx(ectx_, nullptr);
     auto value = expr->eval(ctx);
+    VLOG(1) << "Loop condition: " << value;
     DCHECK(value.isBool());
     return finish(ResultBuilder().value(std::move(value)).iter(Iterator::Kind::kDefault).finish());
 }
diff --git a/src/exec/query/DataJoinExecutor.cpp b/src/exec/query/DataJoinExecutor.cpp
index bda208839b2b34e5ff2425fbe2750f0811a5105b..2928f73a0d3da3d28c902543c522f3fe58b80d3a 100644
--- a/src/exec/query/DataJoinExecutor.cpp
+++ b/src/exec/query/DataJoinExecutor.cpp
@@ -23,15 +23,25 @@ folly::Future<Status> DataJoinExecutor::execute() {
 folly::Future<Status> DataJoinExecutor::doInnerJoin() {
     auto* dataJoin = asNode<DataJoin>(node());
 
-    auto lhsIter = ectx_->getResult(dataJoin->vars().first).iter();
+    VLOG(1) << "lhs hist: " << ectx_->getHistory(dataJoin->leftVar().first).size();
+    VLOG(1) << "rhs hist: " << ectx_->getHistory(dataJoin->rightVar().first).size();
+    auto lhsIter = ectx_
+                       ->getVersionedResult(dataJoin->leftVar().first,
+                                            dataJoin->leftVar().second)
+                       .iter();
     DCHECK(!!lhsIter);
+    VLOG(1) << "lhs: " << dataJoin->leftVar().first << " " << lhsIter->size();
     if (!lhsIter->isSequentialIter() && !lhsIter->isJoinIter()) {
         std::stringstream ss;
         ss << "Join executor does not support " << lhsIter->kind();
         return error(Status::Error(ss.str()));
     }
-    auto rhsIter = ectx_->getResult(dataJoin->vars().second).iter();
+    auto rhsIter = ectx_
+                       ->getVersionedResult(dataJoin->rightVar().first,
+                                            dataJoin->rightVar().second)
+                       .iter();
     DCHECK(!!rhsIter);
+    VLOG(1) << "rhs: " << dataJoin->rightVar().first << " " << rhsIter->size();
     if (!rhsIter->isSequentialIter() && !rhsIter->isJoinIter()) {
         std::stringstream ss;
         ss << "Join executor does not support " << lhsIter->kind();
@@ -68,6 +78,7 @@ void DataJoinExecutor::buildHashTable(const std::vector<Expression*>& hashKeys,
             list.values.emplace_back(std::move(val));
         }
 
+        VLOG(1) << "key: " << list;
         hashTable_->add(std::move(list), iter->row());
     }
 }
@@ -83,6 +94,7 @@ void DataJoinExecutor::probe(const std::vector<Expression*>& probeKeys,
             list.values.emplace_back(std::move(val));
         }
 
+        VLOG(1) << "probe: " << list;
         auto range = hashTable_->get(list);
         for (auto i = range.first; i != range.second; ++i) {
             auto row = i->second;
@@ -99,6 +111,7 @@ void DataJoinExecutor::probe(const std::vector<Expression*>& probeKeys,
             size_t size = row->size() + probeIter->row()->size();
             JoinIter::LogicalRowJoin newRow(std::move(values), size,
                                         &resultIter->getColIdxIndices());
+            VLOG(1) << node()->varName() << " : " << newRow;
             resultIter->addRow(std::move(newRow));
         }
     }
diff --git a/src/exec/query/GetNeighborsExecutor.cpp b/src/exec/query/GetNeighborsExecutor.cpp
index d5f2dfe1b4fd266f675ebd015c21393c110ad26d..4f894fe50e4874c7d13630179f2a5017cf815235 100644
--- a/src/exec/query/GetNeighborsExecutor.cpp
+++ b/src/exec/query/GetNeighborsExecutor.cpp
@@ -35,6 +35,7 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
 
 Status GetNeighborsExecutor::buildRequestDataSet() {
     auto& inputVar = gn_->inputVar();
+    VLOG(1) << node()->varName() << " : " << inputVar;
     auto& inputResult = ectx_->getResult(inputVar);
     auto iter = inputResult.iter();
     QueryExpressionContext ctx(ectx_, iter.get());
@@ -58,7 +59,11 @@ Status GetNeighborsExecutor::buildRequestDataSet() {
 folly::Future<Status> GetNeighborsExecutor::getNeighbors() {
     if (reqDs_.rows.empty()) {
         LOG(INFO) << "Empty input.";
-        return folly::makeFuture(Status::OK());
+        DataSet emptyResult;
+        return finish(ResultBuilder()
+                          .value(Value(std::move(emptyResult)))
+                          .iter(Iterator::Kind::kGetNeighbors)
+                          .finish());
     }
     GraphStorageClient* storageClient = qctx_->getStorageClient();
     return storageClient
diff --git a/src/exec/query/ProjectExecutor.cpp b/src/exec/query/ProjectExecutor.cpp
index fea3cb72852b3bd9d28f916d237fc7acbdedd821..78133f10023a0b06518d7c92a63aae980f6fa73e 100644
--- a/src/exec/query/ProjectExecutor.cpp
+++ b/src/exec/query/ProjectExecutor.cpp
@@ -21,17 +21,19 @@ folly::Future<Status> ProjectExecutor::execute() {
     DCHECK(!!iter);
     QueryExpressionContext ctx(ectx_, iter.get());
 
+    VLOG(1) << "input: " << project->inputVar();
     DataSet ds;
     ds.colNames = project->colNames();
     for (; iter->valid(); iter->next()) {
         Row row;
         for (auto& col : columns) {
             Value val = col->expr()->eval(ctx);
-            VLOG(3) << "Project: " << val;
             row.values.emplace_back(std::move(val));
         }
         ds.rows.emplace_back(std::move(row));
     }
+
+    VLOG(1) << node()->varName() << " : " << ds;
     return finish(ResultBuilder().value(Value(std::move(ds))).finish());
 }
 
diff --git a/src/exec/query/test/DataJoinTest.cpp b/src/exec/query/test/DataJoinTest.cpp
index 369d1052a927ebd0c219f9df10f26b7b8f84d6e7..73e0756823a3f3ed6971b5f3c4b1b053f9423429 100644
--- a/src/exec/query/test/DataJoinTest.cpp
+++ b/src/exec/query/test/DataJoinTest.cpp
@@ -91,7 +91,7 @@ void DataJoinTest::testJoin(std::string left, std::string right,
 
     auto* plan = qctx_->plan();
     auto* dataJoin =
-        DataJoin::make(plan, nullptr, {left, right}, std::move(hashKeys),
+        DataJoin::make(plan, nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
                        std::move(probeKeys));
     dataJoin->setColNames(std::vector<std::string>{
         "src", "dst", kVid, "tag_prop", "edge_prop", kDst});
@@ -163,7 +163,7 @@ TEST_F(DataJoinTest, JoinTwice) {
 
         auto* plan = qctx_->plan();
         auto* dataJoin =
-            DataJoin::make(plan, nullptr, {left, right}, std::move(hashKeys),
+            DataJoin::make(plan, nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
                         std::move(probeKeys));
         dataJoin->setColNames(std::vector<std::string>{
             "src", "dst", kVid, "tag_prop", "edge_prop", kDst});
@@ -186,7 +186,7 @@ TEST_F(DataJoinTest, JoinTwice) {
 
     auto* plan = qctx_->plan();
     auto* dataJoin =
-        DataJoin::make(plan, nullptr, {left, right}, std::move(hashKeys),
+        DataJoin::make(plan, nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
                     std::move(probeKeys));
     dataJoin->setColNames(std::vector<std::string>{
         "src", "dst", kVid, "tag_prop", "edge_prop", kDst, "col1"});
diff --git a/src/planner/Query.h b/src/planner/Query.h
index 10cbfb13d8832d1583fc3bbed4ae974ba311e80b..c1354ec52df9299bacc87512f9eaad9e45b201c7 100644
--- a/src/planner/Query.h
+++ b/src/planner/Query.h
@@ -978,10 +978,12 @@ class DataJoin final : public SingleInputNode {
 public:
     static DataJoin* make(ExecutionPlan* plan,
                           PlanNode* input,
-                          std::pair<std::string, std::string> vars,
+                          std::pair<std::string, int64_t> leftVar,
+                          std::pair<std::string, int64_t> rightVar,
                           std::vector<Expression*> hashKeys,
                           std::vector<Expression*> probeKeys) {
-        return new DataJoin(plan, input, std::move(vars), std::move(hashKeys),
+        return new DataJoin(plan, input, std::move(leftVar),
+                            std::move(rightVar), std::move(hashKeys),
                             std::move(probeKeys));
     }
 
@@ -989,8 +991,12 @@ public:
         return "DataJoin";
     }
 
-    const std::pair<std::string, std::string>& vars() const {
-        return vars_;
+    const std::pair<std::string, int64_t>& leftVar() const {
+        return leftVar_;
+    }
+
+    const std::pair<std::string, int64_t>& rightVar() const {
+        return rightVar_;
     }
 
     const std::vector<Expression*>& hashKeys() const {
@@ -1003,15 +1009,19 @@ public:
 
 private:
     DataJoin(ExecutionPlan* plan, PlanNode* input,
-            std::pair<std::string, std::string> vars,
-            std::vector<Expression*> hashKeys, std::vector<Expression*> probeKeys)
+             std::pair<std::string, int64_t> leftVar,
+             std::pair<std::string, int64_t> rightVar,
+             std::vector<Expression*> hashKeys, std::vector<Expression*> probeKeys)
         : SingleInputNode(plan, Kind::kDataJoin, input),
-        vars_(std::move(vars)),
+        leftVar_(std::move(leftVar)),
+        rightVar_(std::move(rightVar)),
         hashKeys_(std::move(hashKeys)),
         probeKeys_(std::move(probeKeys)) {}
 
 private:
-    std::pair<std::string, std::string>     vars_;
+    // var name, var version
+    std::pair<std::string, int64_t>         leftVar_;
+    std::pair<std::string, int64_t>         rightVar_;
     std::vector<Expression*>                hashKeys_;
     std::vector<Expression*>                probeKeys_;
 };
diff --git a/src/util/AnonColGenerator.h b/src/util/AnonColGenerator.h
new file mode 100644
index 0000000000000000000000000000000000000000..3c4b1f36809dadb59a54df3c86e6b4a7283fe9b4
--- /dev/null
+++ b/src/util/AnonColGenerator.h
@@ -0,0 +1,34 @@
+/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef UTIL_ANONCOLGENERATOR_H_
+#define UTIL_ANONCOLGENERATOR_H_
+
+#include "common/base/Base.h"
+
+#include "util/IdGenerator.h"
+
+namespace nebula {
+namespace graph {
+/**
+ * An utility to generate an anonymous column name.
+ */
+class AnonColGenerator final {
+public:
+    AnonColGenerator() {
+        idGen_ = std::make_unique<IdGenerator>();
+    }
+
+    std::string getCol() const {
+        return folly::stringPrintf("__UNAMED_COL_%ld", idGen_->id());
+    }
+
+private:
+    std::unique_ptr<IdGenerator>    idGen_;
+};
+}  // namespace graph
+}  // namespace nebula
+#endif  // UTIL_ANONCOLGENERATOR_H_
diff --git a/src/util/AnnoVarGenerator.h b/src/util/AnonVarGenerator.h
similarity index 64%
rename from src/util/AnnoVarGenerator.h
rename to src/util/AnonVarGenerator.h
index 6c5282a30b7220b1030fba5ea12602ac786bdec1..736f9b2cd1417d79a6e39d84956856fb91100f6b 100644
--- a/src/util/AnnoVarGenerator.h
+++ b/src/util/AnonVarGenerator.h
@@ -4,8 +4,8 @@
  * attached with Common Clause Condition 1.0, found in the LICENSES directory.
  */
 
-#ifndef UTIL_ANNOVARGENERATOR_H_
-#define UTIL_ANNOVARGENERATOR_H_
+#ifndef UTIL_ANONVARGENERATOR_H_
+#define UTIL_ANONVARGENERATOR_H_
 
 #include "common/base/Base.h"
 
@@ -14,16 +14,16 @@
 namespace nebula {
 namespace graph {
 /**
- * An utility to generate an anonymous variable.
+ * An utility to generate an anonymous variable name.
  */
-class AnnoVarGenerator final {
+class AnonVarGenerator final {
 public:
-    AnnoVarGenerator() {
+    AnonVarGenerator() {
         idGen_ = std::make_unique<IdGenerator>();
     }
 
     std::string getVar() const {
-        return folly::stringPrintf("UNAMED_%ld", idGen_->id());
+        return folly::stringPrintf("__UNAMED_VAR_%ld", idGen_->id());
     }
 
 private:
@@ -31,4 +31,4 @@ private:
 };
 }  // namespace graph
 }  // namespace nebula
-#endif  // UTIL_ANNOVARGENERATOR_H_
+#endif  // UTIL_ANONVARGENERATOR_H_
diff --git a/src/validator/AssignmentValidator.cpp b/src/validator/AssignmentValidator.cpp
index f1122ca63be102773caed000d82bab9d3c237f7d..29fb13661640b6d821f0844467d32a554f7a4448 100644
--- a/src/validator/AssignmentValidator.cpp
+++ b/src/validator/AssignmentValidator.cpp
@@ -17,7 +17,7 @@ Status AssignmentValidator::validateImpl() {
     validator_ = makeValidator(assignSentence->sentence(), qctx_);
     NG_RETURN_IF_ERROR(validator_->validate());
 
-    auto outputs = validator_->outputs();
+    auto outputs = validator_->outputCols();
     var_ = *assignSentence->var();
     vctx_->registerVariable(var_, std::move(outputs));
     return Status::OK();
diff --git a/src/validator/GetSubgraphValidator.cpp b/src/validator/GetSubgraphValidator.cpp
index e40b98fd3fb5aeb5c7ff5782e0c7bde48be53088..e913fca1a43b01728c5e6cf881964871360ee0d4 100644
--- a/src/validator/GetSubgraphValidator.cpp
+++ b/src/validator/GetSubgraphValidator.cpp
@@ -155,7 +155,7 @@ Status GetSubgraphValidator::toPlan() {
     auto edgeProps = std::make_unique<std::vector<storage::cpp2::EdgeProp>>();
     auto statProps = std::make_unique<std::vector<storage::cpp2::StatProp>>();
     auto exprs = std::make_unique<std::vector<storage::cpp2::Expr>>();
-    auto vidsToSave = vctx_->varGen()->getVar();
+    auto vidsToSave = vctx_->anonVarGen()->getVar();
     DataSet ds;
     ds.colNames.emplace_back(kVid);
     for (auto& vid : starts_) {
@@ -194,7 +194,7 @@ Status GetSubgraphValidator::toPlan() {
     project->setColNames(deduceColNames(columns));
 
     // ++counter{0} <= steps
-    auto counter = vctx_->varGen()->getVar();
+    auto counter = vctx_->anonVarGen()->getVar();
     qctx_->ectx()->setValue(counter, 0);
     auto* condition = new RelationalExpression(
                 Expression::Kind::kRelLE,
diff --git a/src/validator/GoValidator.cpp b/src/validator/GoValidator.cpp
index 126ecec9c9d176a5db6ebf952a5e92bcc1bd5c5e..18f89ec5aae277c14c9749cd9856567fcef6e7c2 100644
--- a/src/validator/GoValidator.cpp
+++ b/src/validator/GoValidator.cpp
@@ -34,6 +34,8 @@ Status GoValidator::validateImpl() {
         return Status::Error("Only support single input in a go sentence.");
     }
 
+    NG_RETURN_IF_ERROR(buildColumns());
+
     if (!dstTagProps_.empty()) {
         // TODO: inplement get vertex props.
         return Status::Error("Not support get dst yet.");
@@ -44,11 +46,6 @@ Status GoValidator::validateImpl() {
         return Status::Error("Not support over all yet.");
     }
 
-    if (!inputProps_.empty()) {
-        // TODO: inplement get input props.
-        return Status::Error("Not support input prop yet.");
-    }
-
     return Status::OK();
 }
 
@@ -87,7 +84,8 @@ Status GoValidator::validateFrom(const FromClause* from) {
                    << "but was`" << type.value() << "'";
                 return Status::Error(ss.str());
             }
-            src_ = src;
+            srcRef_ = src;
+            firstBeginningSrcVidColName_ = *(static_cast<SymbolPropertyExpression*>(src)->prop());
         }
     } else {
         auto vidList = from->vidList();
@@ -194,38 +192,48 @@ Status GoValidator::toPlan() {
     }
 }
 
-Status GoValidator::oneStep(PlanNode* input, const std::string& inputVarName) {
+Status GoValidator::oneStep(PlanNode* dependencyForGn,
+                            const std::string& inputVarNameForGN,
+                            PlanNode* projectFromJoin) {
     auto* plan = qctx_->plan();
 
-    auto* gn = GetNeighbors::make(plan, input, space_.id);
+    auto* gn = GetNeighbors::make(plan, dependencyForGn, space_.id);
     gn->setSrc(src_);
     gn->setVertexProps(buildSrcVertexProps());
     gn->setEdgeProps(buildEdgeProps());
-    gn->setInputVar(inputVarName);
+    gn->setInputVar(inputVarNameForGN);
+    VLOG(1) << gn->varName();
 
     if (!dstTagProps_.empty()) {
         // TODO: inplement get vertex props.
         return Status::Error("Not support get dst yet.");
     }
 
-    PlanNode *projectInput = gn;
-    if (filter_ != nullptr) {
-        Filter* filterNode = Filter::make(plan, gn, filter_);
-        filterNode->setInputVar(gn->varName());
-        projectInput = filterNode;
+    PlanNode* inputNodeForProjectResult = gn;
+    auto* joinInput = ifBuildJoinPipeInput(gn, projectFromJoin);
+    if (joinInput != nullptr) {
+        inputNodeForProjectResult = joinInput;
     }
 
-    auto* project = Project::make(plan, projectInput, yields_);
-    project->setInputVar(projectInput->varName());
-    project->setColNames(std::vector<std::string>(colNames_));
-
+    if (filter_ != nullptr) {
+        auto* filterNode = Filter::make(plan, inputNodeForProjectResult,
+                    newFilter_ != nullptr ? newFilter_ : filter_);
+        filterNode->setInputVar(inputNodeForProjectResult->varName());
+        filterNode->setColNames(inputNodeForProjectResult->colNames());
+        inputNodeForProjectResult = filterNode;
+    }
+    auto* projectResult =
+        Project::make(plan, inputNodeForProjectResult,
+        newYieldCols_ != nullptr ? newYieldCols_ : yields_);
+    projectResult->setInputVar(inputNodeForProjectResult->varName());
+    projectResult->setColNames(std::vector<std::string>(colNames_));
     if (distinct_) {
-        Dedup* dedupNode = Dedup::make(plan, project);
-        dedupNode->setInputVar(project->varName());
+        Dedup* dedupNode = Dedup::make(plan, projectResult);
+        dedupNode->setInputVar(projectResult->varName());
         dedupNode->setColNames(std::move(colNames_));
         root_ = dedupNode;
     } else {
-        root_ = project;
+        root_ = projectResult;
     }
     tail_ = gn;
     return Status::OK();
@@ -233,52 +241,215 @@ Status GoValidator::oneStep(PlanNode* input, const std::string& inputVarName) {
 
 Status GoValidator::buildNStepsPlan() {
     auto* plan = qctx_->plan();
-    // [->project] -> loop -> project -> gn1 -> bodyStart
+
     auto* bodyStart = StartNode::make(plan);
 
-    std::string input;
+    std::string startVidsVar;
     PlanNode* projectStartVid = nullptr;
-    if (!starts_.empty() && src_ == nullptr) {
-        input = buildInput();
+    if (!starts_.empty() && srcRef_ == nullptr) {
+        startVidsVar = buildConstantInput();
     } else {
         projectStartVid = buildRuntimeInput();
-        input = projectStartVid->varName();
+        startVidsVar = projectStartVid->varName();
     }
+
+    Project* projectLeftVarForJoin = ifBuildLeftVarForTraceJoin(projectStartVid);
+
     auto* gn = GetNeighbors::make(plan, bodyStart, space_.id);
     gn->setSrc(src_);
     gn->setEdgeProps(buildNStepLoopEdgeProps());
-    gn->setInputVar(input);
+    gn->setInputVar(startVidsVar);
+    VLOG(1) << gn->varName();
+
+    Project* projectDstFromGN = projectDstVidsFromGN(gn, startVidsVar);
+
+    Project* projectFromJoin = ifTraceToStartVid(projectLeftVarForJoin, projectDstFromGN);
+
+    auto* loop = Loop::make(
+        plan,
+        projectLeftVarForJoin == nullptr ? projectStartVid
+                                         : projectLeftVarForJoin,
+        projectFromJoin == nullptr ? projectDstFromGN : projectFromJoin,
+        buildNStepLoopCondition(steps_ - 1));
+    VLOG(1) << "loop dep: " << projectLeftVarForJoin;
 
+    auto status = oneStep(loop, projectDstFromGN->varName(),
+            projectFromJoin == nullptr ? projectDstFromGN : projectFromJoin);
+    if (!status.ok()) {
+        return status;
+    }
+    // reset tail_
+    if (projectStartVid != nullptr) {
+        tail_ = projectStartVid;
+    } else if (projectLeftVarForJoin != nullptr) {
+        tail_ = projectLeftVarForJoin;
+    } else {
+        tail_ = loop;
+    }
+    VLOG(1) << "root: " << root_->kind() << " tail: " << tail_->kind();
+    return Status::OK();
+}
+
+PlanNode* GoValidator::ifBuildJoinPipeInput(
+    PlanNode* gn, PlanNode* projectFromJoin) {
+    PlanNode* inputNodeForProjectResult = nullptr;
+    auto* plan = qctx_->plan();
+    if (!inputProps_.empty()) {
+        auto* srcVidCol = new YieldColumn(
+            new VariablePropertyExpression(new std::string(gn->varName()),
+                                        new std::string(kVid)),
+            new std::string(kVid));
+        srcAndEdgePropCols_->addColumn(srcVidCol);
+        auto* project = Project::make(plan, gn, srcAndEdgePropCols_);
+        project->setInputVar(gn->varName());
+        project->setColNames(deduceColNames(srcAndEdgePropCols_));
+        VLOG(1) << project->varName();
+
+        PlanNode* joinInputHashKeyDependency = project;
+        if (steps_ > 1) {
+            auto* joinHashKey = new VariablePropertyExpression(
+                    new std::string(project->varName()), new std::string(kVid));
+            plan->saveObject(joinHashKey);
+            auto* probeKey = new VariablePropertyExpression(
+                    new std::string(projectFromJoin->varName()), new std::string(dstVidColName_));
+            plan->saveObject(probeKey);
+            auto* join = DataJoin::make(
+                plan, project,
+                {project->varName(), ExecutionContext::kLatestVersion},
+                {projectFromJoin->varName(), ExecutionContext::kLatestVersion},
+                {joinHashKey}, {probeKey});
+            std::vector<std::string> colNames = project->colNames();
+            for (auto& col : projectFromJoin->colNames()) {
+                colNames.emplace_back(col);
+            }
+            join->setColNames(std::move(colNames));
+            VLOG(1) << join->varName();
+            joinInputHashKeyDependency = join;
+        }
+
+        auto* joinHashKey = new VariablePropertyExpression(
+            new std::string(joinInputHashKeyDependency->varName()),
+            new std::string(steps_ > 1 ? firstBeginningSrcVidColName_ : kVid));
+        plan->saveObject(joinHashKey);
+        auto* joinInput =
+            DataJoin::make(plan, joinInputHashKeyDependency,
+                           {joinInputHashKeyDependency->varName(),
+                            ExecutionContext::kLatestVersion},
+                           {inputVarName_, ExecutionContext::kLatestVersion},
+                           {joinHashKey}, {steps_ > 1 ? srcRef_ : src_});
+        std::vector<std::string> colNames = joinInputHashKeyDependency->colNames();
+        for (auto& col : outputs_) {
+            colNames.emplace_back(col.first);
+        }
+        joinInput->setColNames(std::move(colNames));
+        VLOG(1) << joinInput->varName();
+        inputNodeForProjectResult = joinInput;
+    }
+
+    return inputNodeForProjectResult;
+}
+
+Project* GoValidator::ifTraceToStartVid(Project* projectLeftVarForJoin,
+                                         Project* projectDstFromGN) {
+    Project* projectJoin = nullptr;
+    auto* plan = qctx_->plan();
+    if (!inputProps_.empty() && projectLeftVarForJoin != nullptr) {
+        auto hashKey = new VariablePropertyExpression(
+                new std::string(projectLeftVarForJoin->varName()),
+                new std::string(dstVidColName_));
+        plan->saveObject(hashKey);
+        auto probeKey = new VariablePropertyExpression(
+            new std::string(projectDstFromGN->varName()), new std::string(srcVidColName_));
+        plan->saveObject(probeKey);
+        auto* join = DataJoin::make(
+            plan, projectDstFromGN,
+            {projectLeftVarForJoin->varName(),
+             ExecutionContext::kLatestVersion},
+            {projectDstFromGN->varName(), ExecutionContext::kLatestVersion},
+            {hashKey}, {probeKey});
+        std::vector<std::string> colNames = projectLeftVarForJoin->colNames();
+        for (auto& col : projectDstFromGN->colNames()) {
+            colNames.emplace_back(col);
+        }
+        join->setColNames(std::move(colNames));
+        VLOG(1) << join->varName();
+
+        auto* columns = new YieldColumns();
+        auto* column =
+            new YieldColumn(new InputPropertyExpression(
+                                new std::string(firstBeginningSrcVidColName_)),
+                            new std::string(firstBeginningSrcVidColName_));
+        columns->addColumn(column);
+        column =
+            new YieldColumn(new InputPropertyExpression(new std::string(kVid)),
+                            new std::string(dstVidColName_));
+        columns->addColumn(column);
+        projectJoin = Project::make(plan, join, plan->saveObject(columns));
+        projectJoin->setInputVar(join->varName());
+        projectJoin->setColNames(deduceColNames(columns));
+        VLOG(1) << projectJoin->varName();
+    }
+    return projectJoin;
+}
+
+Project* GoValidator::projectDstVidsFromGN(PlanNode* gn, const std::string& outputVar) {
+    Project* project = nullptr;
+    auto* plan = qctx_->plan();
     auto* columns = new YieldColumns();
     auto* column = new YieldColumn(
         new EdgePropertyExpression(new std::string("*"), new std::string(kDst)),
         new std::string(kVid));
     columns->addColumn(column);
 
-    auto* project = Project::make(plan, gn, plan->saveObject(columns));
+    srcVidColName_ = vctx_->anonColGen()->getCol();
+    if (!inputProps_.empty()) {
+        column =
+            new YieldColumn(new InputPropertyExpression(new std::string(kVid)),
+                            new std::string(srcVidColName_));
+        columns->addColumn(column);
+    }
+
+    project = Project::make(plan, gn, plan->saveObject(columns));
     project->setInputVar(gn->varName());
+    project->setOutputVar(outputVar);
     project->setColNames(deduceColNames(columns));
+    VLOG(1) << project->varName();
 
-    auto* loop = Loop::make(plan, projectStartVid, project,
-                            buildNStepLoopCondition(steps_ - 1));
+    return project;
+}
 
-    auto status = oneStep(loop, project->varName());
-    if (!status.ok()) {
-        return status;
-    }
-    // reset tail_
-    tail_ = projectStartVid == nullptr ? loop : projectStartVid;
-    VLOG(1) << "root: " << root_->kind() << " tail: " << tail_->kind();
-    return Status::OK();
+Project* GoValidator::ifBuildLeftVarForTraceJoin(PlanNode* projectStartVid) {
+    Project* projectLeftVarForJoin = nullptr;
+    auto* plan = qctx_->plan();
+    dstVidColName_ = vctx_->anonColGen()->getCol();
+    if (!inputProps_.empty()) {
+        auto* columns = new YieldColumns();
+        auto* column =
+            new YieldColumn(Expression::decode(srcRef_->encode()).release(),
+                            new std::string(firstBeginningSrcVidColName_));
+        columns->addColumn(column);
+        column =
+            new YieldColumn(Expression::decode(srcRef_->encode()).release(),
+                            new std::string(dstVidColName_));
+        columns->addColumn(column);
+        plan->saveObject(columns);
+        projectLeftVarForJoin = Project::make(plan, projectStartVid, columns);
+        projectLeftVarForJoin->setInputVar(inputVarName_);
+        projectLeftVarForJoin->setColNames(deduceColNames(columns));
+    }
+
+    return projectLeftVarForJoin;
 }
 
 Status GoValidator::buildOneStepPlan() {
-    std::string inputVarName;
-    if (!starts_.empty() && src_ == nullptr) {
-        inputVarName = buildInput();
+    std::string inputVarNameForGN = inputVarName_;
+    if (!starts_.empty() && srcRef_ == nullptr) {
+        inputVarNameForGN = buildConstantInput();
+    } else {
+        src_ = srcRef_;
     }
 
-    auto status = oneStep(nullptr, inputVarName);
+    auto status = oneStep(nullptr, inputVarNameForGN, nullptr);
     if (!status.ok()) {
         return status;
     }
@@ -287,8 +458,8 @@ Status GoValidator::buildOneStepPlan() {
     return Status::OK();
 }
 
-std::string GoValidator::buildInput() {
-    auto input = vctx_->varGen()->getVar();
+std::string GoValidator::buildConstantInput() {
+    auto input = vctx_->anonVarGen()->getVar();
     DataSet ds;
     ds.colNames.emplace_back(kVid);
     for (auto& vid : starts_) {
@@ -298,9 +469,8 @@ std::string GoValidator::buildInput() {
     }
     qctx_->ectx()->setResult(input, ResultBuilder().value(Value(std::move(ds))).finish());
 
-    auto* vids = new VariablePropertyExpression(
-                    new std::string(input),
-                    new std::string(kVid));
+    auto* vids = new VariablePropertyExpression(new std::string(input),
+                                                new std::string(kVid));
     qctx_->plan()->saveObject(vids);
     src_ = vids;
     return input;
@@ -308,7 +478,7 @@ std::string GoValidator::buildInput() {
 
 PlanNode* GoValidator::buildRuntimeInput() {
     auto* columns = new YieldColumns();
-    auto encode = src_->encode();
+    auto encode = srcRef_->encode();
     auto decode = Expression::decode(encode);
     auto* column = new YieldColumn(decode.release(), new std::string(kVid));
     columns->addColumn(column);
@@ -397,11 +567,11 @@ GetNeighbors::EdgeProps GoValidator::buildEdgeProps() {
 GetNeighbors::EdgeProps GoValidator::buildNStepLoopEdgeProps() {
     GetNeighbors::EdgeProps edgeProps;
     if (!edgeProps_.empty()) {
-        edgeProps = std::make_unique<std::vector<storage::cpp2::EdgeProp>>(edgeProps_.size());
-        std::transform(edgeProps_.begin(), edgeProps_.end(), edgeProps->begin(), [] (auto& edge) {
+        edgeProps = std::make_unique<std::vector<storage::cpp2::EdgeProp>>(edgeTypes_.size());
+        std::transform(edgeTypes_.begin(), edgeTypes_.end(), edgeProps->begin(), [] (auto& type) {
             storage::cpp2::EdgeProp ep;
-            ep.type = edge.first;
-            ep.props = std::vector<std::string>({kDst});
+            ep.type = type;
+            ep.props = {kDst};
             return ep;
         });
     }
@@ -409,8 +579,9 @@ GetNeighbors::EdgeProps GoValidator::buildNStepLoopEdgeProps() {
 }
 
 Expression* GoValidator::buildNStepLoopCondition(int64_t steps) const {
+    VLOG(1) << "steps: " << steps;
     // ++loopSteps{0} <= steps
-    auto loopSteps = vctx_->varGen()->getVar();
+    auto loopSteps = vctx_->anonVarGen()->getVar();
     qctx_->ectx()->setValue(loopSteps, 0);
     auto* condition = new RelationalExpression(
         Expression::Kind::kRelLE,
@@ -423,5 +594,235 @@ Expression* GoValidator::buildNStepLoopCondition(int64_t steps) const {
     return condition;
 }
 
+void GoValidator::extractPropExprs(const Expression* expr) {
+    switch (expr->kind()) {
+        case Expression::Kind::kConstant: {
+            break;
+        }
+        case Expression::Kind::kAdd:
+        case Expression::Kind::kMinus:
+        case Expression::Kind::kMultiply:
+        case Expression::Kind::kDivision:
+        case Expression::Kind::kMod:
+        case Expression::Kind::kRelEQ:
+        case Expression::Kind::kRelNE:
+        case Expression::Kind::kRelLT:
+        case Expression::Kind::kRelLE:
+        case Expression::Kind::kRelGT:
+        case Expression::Kind::kRelGE:
+        case Expression::Kind::kLogicalAnd:
+        case Expression::Kind::kLogicalOr:
+        case Expression::Kind::kLogicalXor: {
+            auto biExpr = static_cast<const BinaryExpression*>(expr);
+            extractPropExprs(biExpr->left());
+            extractPropExprs(biExpr->right());
+            break;
+        }
+        case Expression::Kind::kUnaryPlus:
+        case Expression::Kind::kUnaryNegate:
+        case Expression::Kind::kUnaryNot: {
+            auto unaryExpr = static_cast<const UnaryExpression*>(expr);
+            extractPropExprs(unaryExpr->operand());
+            break;
+        }
+        case Expression::Kind::kFunctionCall: {
+            auto funcExpr = static_cast<const FunctionCallExpression*>(expr);
+            auto& args = funcExpr->args()->args();
+            for (auto iter = args.begin(); iter < args.end(); ++iter) {
+                extractPropExprs(iter->get());
+            }
+            break;
+        }
+        case Expression::Kind::kDstProperty: {
+            auto found = propExprColMap_.find(expr->toString());
+            if (found == propExprColMap_.end()) {
+                auto encode = expr->encode();
+                auto newExpr = Expression::decode(encode);
+                auto col = new YieldColumn(
+                    newExpr.release(), new std::string(vctx_->anonColGen()->getCol()));
+                propExprColMap_.emplace(expr->toString(), col);
+                dstPropCols_->addColumn(col);
+            }
+            break;
+        }
+        case Expression::Kind::kTagProperty:
+        case Expression::Kind::kSrcProperty:
+        case Expression::Kind::kEdgeProperty:
+        case Expression::Kind::kEdgeSrc:
+        case Expression::Kind::kEdgeType:
+        case Expression::Kind::kEdgeRank:
+        case Expression::Kind::kEdgeDst: {
+            auto found = propExprColMap_.find(expr->toString());
+            if (found == propExprColMap_.end()) {
+                auto encode = expr->encode();
+                auto newExpr = Expression::decode(encode);
+                auto col = new YieldColumn(
+                    newExpr.release(), new std::string(vctx_->anonColGen()->getCol()));
+                propExprColMap_.emplace(expr->toString(), col);
+                srcAndEdgePropCols_->addColumn(col);
+            }
+            break;
+        }
+        case Expression::Kind::kInputProperty:
+        case Expression::Kind::kVarProperty: {
+            auto* symPropExpr = static_cast<const SymbolPropertyExpression*>(expr);
+            auto found = propExprColMap_.find(expr->toString());
+            if (found == propExprColMap_.end()) {
+                auto encode = expr->encode();
+                auto newExpr = Expression::decode(encode);
+                auto col = new YieldColumn(
+                    newExpr.release(), new std::string(*symPropExpr->prop()));
+                propExprColMap_.emplace(expr->toString(), col);
+                inputPropCols_->addColumn(col);
+            }
+            break;
+        }
+        case Expression::Kind::kUUID:
+        case Expression::Kind::kVar:
+        case Expression::Kind::kVersionedVar:
+        case Expression::Kind::kSymProperty:
+        case Expression::Kind::kTypeCasting:
+        case Expression::Kind::kUnaryIncr:
+        case Expression::Kind::kUnaryDecr:
+        case Expression::Kind::kRelIn: {
+            LOG(FATAL) << "Not support " << expr->kind();
+            break;
+        }
+    }
+}
+
+std::unique_ptr<Expression> GoValidator::rewriteToInputProp(Expression* expr) {
+    switch (expr->kind()) {
+        case Expression::Kind::kConstant: {
+            break;
+        }
+        case Expression::Kind::kAdd:
+        case Expression::Kind::kMinus:
+        case Expression::Kind::kMultiply:
+        case Expression::Kind::kDivision:
+        case Expression::Kind::kMod:
+        case Expression::Kind::kRelEQ:
+        case Expression::Kind::kRelNE:
+        case Expression::Kind::kRelLT:
+        case Expression::Kind::kRelLE:
+        case Expression::Kind::kRelGT:
+        case Expression::Kind::kRelGE:
+        case Expression::Kind::kLogicalAnd:
+        case Expression::Kind::kLogicalOr:
+        case Expression::Kind::kLogicalXor: {
+            auto biExpr = static_cast<BinaryExpression*>(expr);
+            auto left = rewriteToInputProp(const_cast<Expression*>(biExpr->left()));
+            if (left != nullptr) {
+                biExpr->setLeft(left.release());
+            }
+            auto right = rewriteToInputProp(const_cast<Expression*>(biExpr->right()));
+            if (right != nullptr) {
+                biExpr->setRight(right.release());
+            }
+            break;
+        }
+        case Expression::Kind::kUnaryPlus:
+        case Expression::Kind::kUnaryNegate:
+        case Expression::Kind::kUnaryNot: {
+            auto unaryExpr = static_cast<UnaryExpression*>(expr);
+            auto rewrite = rewriteToInputProp(const_cast<Expression*>(unaryExpr->operand()));
+            if (rewrite != nullptr) {
+                unaryExpr->setOperand(rewrite.release());
+            }
+            break;
+        }
+        case Expression::Kind::kFunctionCall: {
+            auto funcExpr = static_cast<FunctionCallExpression*>(expr);
+            auto* argList = const_cast<ArgumentList*>(funcExpr->args());
+            auto args = argList->moveArgs();
+            for (auto iter = args.begin(); iter < args.end(); ++iter) {
+                auto rewrite = rewriteToInputProp(iter->get());
+                if (rewrite != nullptr) {
+                    *iter = std::move(rewrite);
+                }
+            }
+            argList->setArgs(std::move(args));
+            break;
+        }
+        case Expression::Kind::kTagProperty:
+        case Expression::Kind::kSrcProperty:
+        case Expression::Kind::kDstProperty:
+        case Expression::Kind::kEdgeProperty:
+        case Expression::Kind::kEdgeSrc:
+        case Expression::Kind::kEdgeType:
+        case Expression::Kind::kEdgeRank:
+        case Expression::Kind::kEdgeDst:
+        case Expression::Kind::kVarProperty: {
+            auto found = propExprColMap_.find(expr->toString());
+            DCHECK(found != propExprColMap_.end());
+            auto alias = new std::string(*(found->second->alias()));
+            return std::make_unique<InputPropertyExpression>(alias);
+        }
+        case Expression::Kind::kInputProperty: {
+            break;
+        }
+        case Expression::Kind::kUUID:
+        case Expression::Kind::kVar:
+        case Expression::Kind::kVersionedVar:
+        case Expression::Kind::kSymProperty:
+        case Expression::Kind::kTypeCasting:
+        case Expression::Kind::kUnaryIncr:
+        case Expression::Kind::kUnaryDecr:
+        case Expression::Kind::kRelIn: {
+            LOG(FATAL) << "Not support " << expr->kind();
+            break;
+        }
+    }
+    return nullptr;
+}
+
+Status GoValidator::buildColumns() {
+    if (dstTagProps_.empty() && inputProps_.empty() && varProps_.empty()) {
+        return Status::OK();
+    }
+
+    if (!srcTagProps_.empty() || !edgeProps_.empty()) {
+        srcAndEdgePropCols_ = qctx_->plan()->saveObject(new YieldColumns());
+    }
+
+    if (!dstTagProps_.empty()) {
+        dstPropCols_ = qctx_->plan()->saveObject(new YieldColumns());
+    }
+
+    if (!inputProps_.empty() || !varProps_.empty()) {
+        inputPropCols_ = qctx_->plan()->saveObject(new YieldColumns());
+    }
+
+    if (filter_ != nullptr) {
+        extractPropExprs(filter_);
+        auto newFilter = Expression::decode(filter_->encode());
+        auto rewriteFilter = rewriteToInputProp(newFilter.get());
+        if (rewriteFilter != nullptr) {
+            newFilter_ = rewriteFilter.release();
+        } else {
+            newFilter_ = newFilter.release();
+        }
+        qctx_->plan()->saveObject(newFilter_);
+    }
+
+    newYieldCols_ = qctx_->plan()->saveObject(new YieldColumns());
+    for (auto* yield : yields_->columns()) {
+        extractPropExprs(yield->expr());
+        auto newCol = Expression::decode(yield->expr()->encode());
+        auto rewriteCol = rewriteToInputProp(newCol.get());
+        auto alias = yield->alias() == nullptr
+                         ? nullptr
+                         : new std::string(*(yield->alias()));
+        if (rewriteCol != nullptr) {
+            newYieldCols_->addColumn(
+                new YieldColumn(rewriteCol.release(), alias));
+        } else {
+            newYieldCols_->addColumn(new YieldColumn(newCol.release(), alias));
+        }
+    }
+
+    return Status::OK();
+}
+
 }  // namespace graph
 }  // namespace nebula
diff --git a/src/validator/GoValidator.h b/src/validator/GoValidator.h
index 47b0dd98eac65fe54c9f1e5d740129c125c69101..4361a53ce711451d8eb28d5dff4865cfdd569cc2 100644
--- a/src/validator/GoValidator.h
+++ b/src/validator/GoValidator.h
@@ -33,13 +33,20 @@ private:
 
     Status validateYield(const YieldClause* yield);
 
+    void extractPropExprs(const Expression* expr);
+
+    std::unique_ptr<Expression> rewriteToInputProp(Expression* expr);
+
+    Status buildColumns();
+
     Status buildOneStepPlan();
 
     Status buildNStepsPlan();
 
-    Status oneStep(PlanNode* input, const std::string& inputVarName);
+    Status oneStep(PlanNode* dependencyForGn, const std::string& inputVarNameForGN,
+                   PlanNode* projectFromJoin);
 
-    std::string buildInput();
+    std::string buildConstantInput();
 
     PlanNode* buildRuntimeInput();
 
@@ -53,6 +60,16 @@ private:
 
     Expression* buildNStepLoopCondition(int64_t steps) const;
 
+    Project* ifBuildLeftVarForTraceJoin(PlanNode* projectStartVid);
+
+    Project* projectDstVidsFromGN(PlanNode* gn, const std::string& outputVar);
+
+    Project* ifTraceToStartVid(Project* projectLeftVarForJoin,
+                               Project* projectDstFromGN);
+
+    PlanNode* ifBuildJoinPipeInput(PlanNode* gn,
+                                   PlanNode* projectFromJoin);
+
     enum FromType {
         kConstantExpr,
         kVariable,
@@ -62,6 +79,7 @@ private:
 private:
     int64_t                                                 steps_;
     FromType                                                fromType_{kConstantExpr};
+    Expression*                                             srcRef_{nullptr};
     Expression*                                             src_{nullptr};
     std::vector<Value>                                      starts_;
     bool                                                    isOverAll_{false};
@@ -71,6 +89,18 @@ private:
     std::vector<std::string>                                colNames_;
     YieldColumns*                                           yields_{nullptr};
     bool                                                    distinct_{false};
+
+    // Generated by validator if needed, and the lifecycle of raw pinters would
+    // be managed by object pool
+    YieldColumns*                                           srcAndEdgePropCols_{nullptr};
+    YieldColumns*                                           dstPropCols_{nullptr};
+    YieldColumns*                                           inputPropCols_{nullptr};
+    std::unordered_map<std::string, YieldColumn*>           propExprColMap_;
+    Expression*                                             newFilter_{nullptr};
+    YieldColumns*                                           newYieldCols_{nullptr};
+    std::string                                             srcVidColName_;
+    std::string                                             dstVidColName_;
+    std::string                                             firstBeginningSrcVidColName_;
 };
 }  // namespace graph
 }  // namespace nebula
diff --git a/src/validator/LimitValidator.cpp b/src/validator/LimitValidator.cpp
index 81514eca5caa15c77702ba5a2db79561e3ada4e9..3075e0fe17aaf0ea28d2786ed2a591690ad43442 100644
--- a/src/validator/LimitValidator.cpp
+++ b/src/validator/LimitValidator.cpp
@@ -21,7 +21,7 @@ Status LimitValidator::validateImpl() {
         return Status::SyntaxError("count `%ld' is illegal", count_);
     }
 
-    outputs_ = inputs();
+    outputs_ = inputCols();
     return Status::OK();
 }
 
diff --git a/src/validator/OrderByValidator.cpp b/src/validator/OrderByValidator.cpp
index 70817e82d89dc3f68ae66bb9b8680301802f7c16..41180f2014236842b47b361e5f7505fcc25359d4 100644
--- a/src/validator/OrderByValidator.cpp
+++ b/src/validator/OrderByValidator.cpp
@@ -12,7 +12,7 @@ namespace nebula {
 namespace graph {
 Status OrderByValidator::validateImpl() {
     auto sentence = static_cast<OrderBySentence*>(sentence_);
-    auto inputColNames = inputs();
+    auto inputColNames = inputCols();
     auto factors = sentence->factors();
     for (auto &factor : factors) {
         if (factor->expr()->kind() != Expression::Kind::kInputProperty) {
@@ -32,7 +32,7 @@ Status OrderByValidator::validateImpl() {
         colOrderTypes_.emplace_back(std::make_pair(name, factor->orderType()));
     }
 
-    outputs_ = inputs();
+    outputs_ = inputCols();
     return Status::OK();
 }
 
diff --git a/src/validator/PipeValidator.cpp b/src/validator/PipeValidator.cpp
index 5c72fa1d6c22e5c80872d06c13bffa5401ab89cd..f19c1ac0ebccf5c62c933e1e8d199c8409edf287 100644
--- a/src/validator/PipeValidator.cpp
+++ b/src/validator/PipeValidator.cpp
@@ -21,10 +21,11 @@ Status PipeValidator::validateImpl() {
 
     auto right = pipeSentence->right();
     rValidator_ = makeValidator(right, qctx_);
-    rValidator_->setInputs(lValidator_->outputs());
+    rValidator_->setInputCols(lValidator_->outputCols());
+    rValidator_->setInputVarName(lValidator_->root()->varName());
     NG_RETURN_IF_ERROR(rValidator_->validate());
 
-    outputs_ = rValidator_->outputs();
+    outputs_ = rValidator_->outputCols();
     return Status::OK();
 }
 
diff --git a/src/validator/SequentialValidator.cpp b/src/validator/SequentialValidator.cpp
index f6423565a90e3a7f6384bd220cb8241e1a065190..ecb7c4c3a41d2a54275f7c802411b4e19af78982 100644
--- a/src/validator/SequentialValidator.cpp
+++ b/src/validator/SequentialValidator.cpp
@@ -68,6 +68,7 @@ Status SequentialValidator::toPlan() {
     }
     tail_ = StartNode::make(plan);
     NG_RETURN_IF_ERROR(validators_.front()->appendPlan(tail_));
+    VLOG(1) << "root: " << root_->kind() << " tail: " << tail_->kind();
     return Status::OK();
 }
 
diff --git a/src/validator/SetValidator.cpp b/src/validator/SetValidator.cpp
index 4fea3c97dc2636adbf36047f886792513d634feb..b42afa72a8ec5f85364ef1abf434aa8dcc6d2ed7 100644
--- a/src/validator/SetValidator.cpp
+++ b/src/validator/SetValidator.cpp
@@ -18,8 +18,8 @@ Status SetValidator::validateImpl() {
     rValidator_ = makeValidator(setSentence->right(), qctx_);
     NG_RETURN_IF_ERROR(rValidator_->validate());
 
-    auto lCols = lValidator_->outputs();
-    auto rCols = rValidator_->outputs();
+    auto lCols = lValidator_->outputCols();
+    auto rCols = rValidator_->outputCols();
 
     if (UNLIKELY(lCols != rCols)) {
         return Status::Error("Different columns to UNION/INTERSECT/MINUS");
diff --git a/src/validator/Validator.cpp b/src/validator/Validator.cpp
index 0a3ecdd19636f076324409d664e1dd7da4014a48..dc3f41221fff71a7487477026d45e6175a2d3faa 100644
--- a/src/validator/Validator.cpp
+++ b/src/validator/Validator.cpp
@@ -194,16 +194,8 @@ std::vector<std::string> Validator::deduceColNames(const YieldColumns* cols) con
 std::string Validator::deduceColName(const YieldColumn* col) const {
     if (col->alias() != nullptr) {
         return *col->alias();
-    }
-
-    switch (col->expr()->kind()) {
-        case Expression::Kind::kInputProperty: {
-            auto expr = static_cast<InputPropertyExpression*>(col->expr());
-            return *expr->prop();
-        }
-        default: {
-            return col->expr()->toString();
-        }
+    } else {
+        return col->expr()->toString();
     }
 }
 
diff --git a/src/validator/Validator.h b/src/validator/Validator.h
index f1cc191eb535e883a21e90b795f02ba356cce5e2..0eae4243e7b9556bd11c87f959e05057f475527a 100644
--- a/src/validator/Validator.h
+++ b/src/validator/Validator.h
@@ -30,7 +30,11 @@ public:
 
     Status validate();
 
-    void setInputs(ColsDef&& inputs) {
+    void setInputVarName(std::string name) {
+        inputVarName_ = std::move(name);
+    }
+
+    void setInputCols(ColsDef&& inputs) {
         inputs_ = std::move(inputs);
     }
 
@@ -42,11 +46,11 @@ public:
         return tail_;
     }
 
-    ColsDef outputs() const {
+    ColsDef outputCols() const {
         return outputs_;
     }
 
-    ColsDef inputs() const {
+    ColsDef inputCols() const {
         return inputs_;
     }
 
@@ -95,6 +99,8 @@ protected:
     // The input columns and output columns of a sentence.
     ColsDef                         outputs_;
     ColsDef                         inputs_;
+    // The variable name of the input node.
+    std::string                     inputVarName_;
     // Admin sentences do not requires a space to be chosen.
     bool                            noSpaceRequired_{false};
 
diff --git a/src/validator/test/CMakeLists.txt b/src/validator/test/CMakeLists.txt
index 7b19689ad535080748a4881ad3ce42afa49e3adf..e2bd2e6308974a17a4fbe01e1cf57f4481931c29 100644
--- a/src/validator/test/CMakeLists.txt
+++ b/src/validator/test/CMakeLists.txt
@@ -59,7 +59,6 @@ nebula_add_test(
     OBJECTS ${VALIDATOR_TEST_LIBS}
     LIBRARIES
         gtest
-        gtest_main
         ${THRIFT_LIBRARIES}
         wangle
         proxygenhttpserver
diff --git a/src/validator/test/QueryValidatorTest.cpp b/src/validator/test/QueryValidatorTest.cpp
index 217065aaf24ad3294230d636fb47f6b2a9d22414..fd41f24d1332858a1160cef96a5e8e06de9ad457 100644
--- a/src/validator/test/QueryValidatorTest.cpp
+++ b/src/validator/test/QueryValidatorTest.cpp
@@ -165,6 +165,129 @@ TEST_F(QueryValidatorTest, GoNSteps) {
         };
         EXPECT_TRUE(checkResult(query, expected));
     }
+    {
+        std::string query = "GO 1 STEPS FROM \"1\" OVER like YIELD like._dst AS "
+                            "id | GO 1 STEPS FROM $-.id OVER like YIELD $-.id, like._dst";
+        std::vector<PlanNode::Kind> expected = {
+            PK::kProject,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kStart
+        };
+        EXPECT_TRUE(checkResult(query, expected));
+    }
+    {
+        std::string query = "GO 1 STEPS FROM \"1\" OVER like YIELD like._dst AS "
+                            "id | GO 1 STEPS FROM $-.id OVER like "
+                            "WHERE $-.id == \"2\" YIELD $-.id, like._dst";
+        std::vector<PlanNode::Kind> expected = {
+            PK::kProject,
+            PK::kFilter,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kStart
+        };
+        EXPECT_TRUE(checkResult(query, expected));
+    }
+    {
+        std::string query = "GO 1 STEPS FROM \"1\" OVER like YIELD like._dst AS "
+                            "id | GO 1 STEPS FROM $-.id OVER like "
+                            "WHERE $-.id == \"2\" YIELD DISTINCT $-.id, like._dst";
+        std::vector<PlanNode::Kind> expected = {
+            PK::kDataCollect,
+            PK::kDedup,
+            PK::kProject,
+            PK::kFilter,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kStart
+        };
+        EXPECT_TRUE(checkResult(query, expected));
+    }
+    {
+        std::string query = "GO 1 STEPS FROM \"1\" OVER like YIELD like._dst AS "
+                            "id | GO 2 STEPS FROM $-.id OVER like YIELD $-.id, like._dst";
+        std::vector<PlanNode::Kind> expected = {
+            PK::kProject,
+            PK::kDataJoin,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kLoop,
+            PK::kProject,
+            PK::kProject,
+            PK::kProject,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kGetNeighbors,
+            PK::kStart,
+            PK::kStart,
+        };
+        EXPECT_TRUE(checkResult(query, expected));
+    }
+    {
+        std::string query = "GO 1 STEPS FROM \"1\" OVER like YIELD like._dst AS "
+                            "id | GO 2 STEPS FROM $-.id OVER like "
+                            "WHERE $-.id == \"2\" YIELD $-.id, like._dst";
+        std::vector<PlanNode::Kind> expected = {
+            PK::kProject,
+            PK::kFilter,
+            PK::kDataJoin,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kLoop,
+            PK::kProject,
+            PK::kProject,
+            PK::kProject,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kGetNeighbors,
+            PK::kStart,
+            PK::kStart,
+        };
+        EXPECT_TRUE(checkResult(query, expected));
+    }
+    {
+        std::string query = "GO 1 STEPS FROM \"1\" OVER like YIELD like._dst AS "
+                            "id | GO 2 STEPS FROM $-.id OVER like "
+                            "WHERE $-.id == \"2\" YIELD DISTINCT $-.id, like._dst";
+        std::vector<PlanNode::Kind> expected = {
+            PK::kDataCollect,
+            PK::kDedup,
+            PK::kProject,
+            PK::kFilter,
+            PK::kDataJoin,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kLoop,
+            PK::kProject,
+            PK::kProject,
+            PK::kProject,
+            PK::kDataJoin,
+            PK::kProject,
+            PK::kProject,
+            PK::kGetNeighbors,
+            PK::kGetNeighbors,
+            PK::kStart,
+            PK::kStart,
+        };
+        EXPECT_TRUE(checkResult(query, expected));
+    }
 }
 
 TEST_F(QueryValidatorTest, GoOneStep) {
@@ -476,3 +599,4 @@ TEST_F(QueryValidatorTest, TestMaxAllowedStatements) {
 
 }  // namespace graph
 }  // namespace nebula
+
diff --git a/src/validator/test/ValidatorTestBase.cpp b/src/validator/test/ValidatorTestBase.cpp
index 07d2d1743042ac88553e092e4ab7436ff95c1d7e..80acd68cd123f26e8ef0489234f762b83b1d39a2 100644
--- a/src/validator/test/ValidatorTestBase.cpp
+++ b/src/validator/test/ValidatorTestBase.cpp
@@ -18,3 +18,12 @@ std::ostream& operator<<(std::ostream& os, const std::vector<PlanNode::Kind>& pl
 
 }   // namespace graph
 }   // namespace nebula
+
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    folly::init(&argc, &argv, true);
+    google::SetStderrLogging(google::INFO);
+
+    return RUN_ALL_TESTS();
+}
+
diff --git a/src/validator/test/ValidatorTestBase.h b/src/validator/test/ValidatorTestBase.h
index 5d9baa53ba368a7a0d3566c0f176ec0d2e71e33a..7ca4723c2e2f6fd92c733d3a7b3c70ded66a5820 100644
--- a/src/validator/test/ValidatorTestBase.h
+++ b/src/validator/test/ValidatorTestBase.h
@@ -108,6 +108,7 @@ protected:
 
         while (!queue.empty()) {
             auto node = queue.front();
+            VLOG(1) << "node kind: " << node->kind();
             queue.pop();
             if (visited.find(node->id()) != visited.end()) {
                 continue;
@@ -153,7 +154,8 @@ protected:
                 case PlanNode::Kind::kShowEdges:
                 case PlanNode::Kind::kCreateSnapshot:
                 case PlanNode::Kind::kDropSnapshot:
-                case PlanNode::Kind::kShowSnapshots: {
+                case PlanNode::Kind::kShowSnapshots:
+                case PlanNode::Kind::kDataJoin: {
                     auto* current = static_cast<const SingleInputNode*>(node);
                     queue.emplace(current->dep());
                     break;