diff --git a/src/context/Iterator.cpp b/src/context/Iterator.cpp
index 2a786411f45485ccce8d6fcf453e0bd369f8444d..9de9d67e02ae42d62038afedea19871a00d64d0b 100644
--- a/src/context/Iterator.cpp
+++ b/src/context/Iterator.cpp
@@ -15,89 +15,92 @@ namespace graph {
 
 GetNeighborsIter::GetNeighborsIter(std::shared_ptr<Value> value)
     : Iterator(value, Kind::kGetNeighbors) {
-    if (!value->isList()) {
+    auto status = processList(value);
+    if (UNLIKELY(!status.ok())) {
+        LOG(ERROR) << status;
         clear();
         return;
     }
-    int64_t segment = 0;
-    for (auto& val : value_->getList().values) {
-        if (!val.isDataSet()) {
-            clear();
-            return;
+    iter_ = logicalRows_.begin();
+    valid_ = true;
+}
+
+Status GetNeighborsIter::processList(std::shared_ptr<Value> value) {
+    if (UNLIKELY(!value->isList())) {
+        std::stringstream ss;
+        ss << "Value type is not list, type: " << value->type();
+        return Status::Error(ss.str());
+    }
+    size_t idx = 0;
+    for (auto& val : value->getList().values) {
+        if (UNLIKELY(!val.isDataSet())) {
+            return Status::Error("There is a value in list which is not a data set.");
         }
-        auto& ds = val.getDataSet();
-        auto& colNames = ds.colNames;
-        auto buildResult = buildIndex(colNames);
-        if (!buildResult.ok()) {
-            LOG(ERROR) << "Build index error: " << buildResult.status();
-            clear();
-            return;
+        auto status = makeDataSetIndex(val.getDataSet(), idx++);
+        NG_RETURN_IF_ERROR(status);
+        dsIndices_.emplace_back(std::move(status).value());
+    }
+    return Status::OK();
+}
+
+StatusOr<GetNeighborsIter::DataSetIndex> GetNeighborsIter::makeDataSetIndex(const DataSet& ds,
+                                                                            size_t idx) {
+    DataSetIndex dsIndex;
+    dsIndex.ds = &ds;
+    auto buildResult = buildIndex(&dsIndex);
+    NG_RETURN_IF_ERROR(buildResult);
+    int64_t edgeStartIndex = std::move(buildResult).value();
+    if (edgeStartIndex < 0) {
+        for (auto& row : dsIndex.ds->rows) {
+            logicalRows_.emplace_back(LogicalRow{idx, &row, "", nullptr});
         }
-        int64_t edgeStartIndex = buildResult.value();
-        segments_.emplace_back(&ds);
-        if (edgeStartIndex < 0) {
-            for (auto& row : ds.rows) {
-                logicalRows_.emplace_back(
-                        std::make_tuple(segment, &row, "", nullptr));
+    } else {
+        makeLogicalRowByEdge(edgeStartIndex, idx, dsIndex);
+    }
+    return dsIndex;
+}
+
+void GetNeighborsIter::makeLogicalRowByEdge(int64_t edgeStartIndex,
+                                            size_t idx,
+                                            const DataSetIndex& dsIndex) {
+    for (auto& row : dsIndex.ds->rows) {
+        auto& cols = row.values;
+        for (size_t column = edgeStartIndex; column < cols.size() - 1; ++column) {
+            if (!cols[column].isList()) {
+                // Ignore the bad value.
+                continue;
             }
-        } else {
-            for (auto& row : ds.rows) {
-                auto& cols = row.values;
-                for (size_t column = edgeStartIndex; column < cols.size() - 1; ++column) {
-                    if (!cols[column].isList()) {
-                        // Ignore the bad value.
-                        continue;
-                    }
-                    for (auto& edge : cols[column].getList().values) {
-                        if (!edge.isList()) {
-                            // Ignore the bad value.
-                            continue;
-                        }
-                        auto& tagEdgeNameIndex = tagEdgeNameIndices_[segment];
-                        auto edgeName = tagEdgeNameIndex.find(column);
-                        DCHECK(edgeName != tagEdgeNameIndex.end());
-                        logicalRows_.emplace_back(
-                                std::make_tuple(segment, &row, edgeName->second, &edge.getList()));
-                    }
+            for (auto& edge : cols[column].getList().values) {
+                if (!edge.isList()) {
+                    // Ignore the bad value.
+                    continue;
                 }
+                auto edgeName = dsIndex.tagEdgeNameIndices.find(column);
+                DCHECK(edgeName != dsIndex.tagEdgeNameIndices.end());
+                logicalRows_.emplace_back(LogicalRow{idx, &row, edgeName->second, &edge.getList()});
             }
         }
-        ++segment;
     }
-    iter_ = logicalRows_.begin();
-    valid_ = true;
 }
 
-StatusOr<int64_t> GetNeighborsIter::buildIndex(const std::vector<std::string>& colNames) {
-    if (colNames.size() < 3
-            || colNames[0] != "_vid"
-            || colNames[1].find("_stats") != 0
-            || colNames.back().find("_expr") != 0) {
+bool checkColumnNames(const std::vector<std::string>& colNames) {
+    return colNames.size() < 3 || colNames[0] != nebula::kVid || colNames[1].find("_stats") != 0 ||
+           colNames.back().find("_expr") != 0;
+}
+
+StatusOr<int64_t> GetNeighborsIter::buildIndex(DataSetIndex* dsIndex) {
+    auto& colNames = dsIndex->ds->colNames;
+    if (UNLIKELY(checkColumnNames(colNames))) {
         return Status::Error("Bad column names.");
     }
-    Status status;
-    std::unordered_map<std::string, size_t> colIndex;
-    TagEdgeNameIdxMap tagEdgeNameIndex;
     int64_t edgeStartIndex = -1;
-    tagPropIndices_.emplace_back();
-    edgePropIndices_.emplace_back();
-    tagPropMaps_.emplace_back();
-    edgePropMaps_.emplace_back();
     for (size_t i = 0; i < colNames.size(); ++i) {
-        colIndex.emplace(colNames[i], i);
+        dsIndex->colIndices.emplace(colNames[i], i);
         auto& colName = colNames[i];
         if (colName.find("_tag") == 0) {
-            status = buildPropIndex(colName, i, false,
-                    tagEdgeNameIndex, tagPropIndices_.back(), tagPropMaps_.back());
-            if (!status.ok()) {
-                return status;
-            }
+            NG_RETURN_IF_ERROR(buildPropIndex(colName, i, false, dsIndex));
         } else if (colName.find("_edge") == 0) {
-            status = buildPropIndex(colName, i, true,
-                    tagEdgeNameIndex, edgePropIndices_.back(), edgePropMaps_.back());
-            if (!status.ok()) {
-                return status;
-            }
+            NG_RETURN_IF_ERROR(buildPropIndex(colName, i, true, dsIndex));
             if (edgeStartIndex < 0) {
                 edgeStartIndex = i;
             }
@@ -105,50 +108,44 @@ StatusOr<int64_t> GetNeighborsIter::buildIndex(const std::vector<std::string>& c
             // It is "_vid", "_stats", "_expr" in this situation.
         }
     }
-    tagEdgeNameIndices_.emplace_back(std::move(tagEdgeNameIndex));
-    colIndices_.emplace_back(std::move(colIndex));
+
     return edgeStartIndex;
 }
 
 Status GetNeighborsIter::buildPropIndex(const std::string& props,
                                        size_t columnId,
                                        bool isEdge,
-                                       TagEdgeNameIdxMap& tagEdgeNameIndex,
-                                       TagEdgePropIdxMap& tagEdgePropIdxMap,
-                                       TagEdgePropMap& tagEdgePropMap) {
+                                       DataSetIndex* dsIndex) {
     std::vector<std::string> pieces;
     folly::split(":", props, pieces);
-    PropIdxMap kv;
-    if (pieces.size() < 2) {
+    if (UNLIKELY(pieces.size() < 2)) {
         return Status::Error("Bad column name format: %s", props.c_str());
     }
 
+    PropIndex propIdx;
     // if size == 2, it is the tag defined without props.
     if (pieces.size() > 2) {
         for (size_t i = 2; i < pieces.size(); ++i) {
-            kv.emplace(pieces[i], i - 2);
+            propIdx.propIndices.emplace(pieces[i], i - 2);
         }
     }
 
+    propIdx.colIdx = columnId;
+    propIdx.propList.resize(pieces.size() - 2);
+    std::move(pieces.begin() + 2, pieces.end(), propIdx.propList.begin());
     std::string name = pieces[1];
     if (isEdge) {
         // The first character of the tag/edge name is +/-.
         // It's not used for now.
-        if (name.find("+") != 0 && name.find("-") != 0) {
+        if (UNLIKELY(name.find("+") != 0 && name.find("-") != 0)) {
             return Status::Error("Bad edge name: %s", name.c_str());
         }
-        auto edgeName = name.substr(1, name.size());
-        tagEdgePropIdxMap.emplace(edgeName, std::make_pair(columnId, std::move(kv)));
-        pieces.erase(pieces.begin(), pieces.begin() + 2);
-        auto propList = std::make_pair(columnId, std::move(pieces));
-        tagEdgePropMap.emplace(edgeName, std::move(propList));
-        tagEdgeNameIndex.emplace(columnId, edgeName);
+        name = name.substr(1, name.size());
+        dsIndex->tagEdgeNameIndices.emplace(columnId, name);
+        dsIndex->edgePropsMap.emplace(name, std::move(propIdx));
     } else {
-        tagEdgePropIdxMap.emplace(name, std::make_pair(columnId, std::move(kv)));
-        pieces.erase(pieces.begin(), pieces.begin() + 2);
-        auto propList = std::make_pair(columnId, std::move(pieces));
-        tagEdgePropMap.emplace(name, std::move(propList));
-        tagEdgeNameIndex.emplace(columnId, name);
+        dsIndex->tagEdgeNameIndices.emplace(columnId, name);
+        dsIndex->tagPropsMap.emplace(name, std::move(propIdx));
     }
 
     return Status::OK();
@@ -159,7 +156,7 @@ const Value& GetNeighborsIter::getColumn(const std::string& col) const {
         return Value::kNullValue;
     }
     auto segment = currentSeg();
-    auto& index = colIndices_[segment];
+    auto& index = dsIndices_[segment].colIndices;
     auto found = index.find(col);
     if (found == index.end()) {
         return Value::kNullValue;
@@ -174,15 +171,16 @@ const Value& GetNeighborsIter::getTagProp(const std::string& tag,
     }
 
     auto segment = currentSeg();
-    auto index = tagPropIndices_[segment].find(tag);
-    if (index == tagPropIndices_[segment].end()) {
+    auto &tagPropIndices = dsIndices_[segment].tagPropsMap;
+    auto index = tagPropIndices.find(tag);
+    if (index == tagPropIndices.end()) {
         return Value::kNullValue;
     }
-    auto propIndex = index->second.second.find(prop);
-    if (propIndex == index->second.second.end()) {
+    auto propIndex = index->second.propIndices.find(prop);
+    if (propIndex == index->second.propIndices.end()) {
         return Value::kNullValue;
     }
-    auto colId = index->second.first;
+    auto colId = index->second.colIdx;
     auto& row = *this->row();
     DCHECK_GT(row.size(), colId);
     if (!row[colId].isList()) {
@@ -204,13 +202,13 @@ const Value& GetNeighborsIter::getEdgeProp(const std::string& edge,
         return Value::kNullValue;
     }
     auto segment = currentSeg();
-    auto index = edgePropIndices_[segment].find(currentEdge);
-    if (index == edgePropIndices_[segment].end()) {
+    auto index = dsIndices_[segment].edgePropsMap.find(currentEdge);
+    if (index == dsIndices_[segment].edgePropsMap.end()) {
         VLOG(1) << "No edge found: " << edge;
         return Value::kNullValue;
     }
-    auto propIndex = index->second.second.find(prop);
-    if (propIndex == index->second.second.end()) {
+    auto propIndex = index->second.propIndices.find(prop);
+    if (propIndex == index->second.propIndices.end()) {
         VLOG(1) << "No edge prop found: " << prop;
         return Value::kNullValue;
     }
@@ -224,17 +222,17 @@ Value GetNeighborsIter::getVertex() const {
     }
 
     auto segment = currentSeg();
-    auto vidVal = getColumn("_vid");
+    auto vidVal = getColumn(nebula::kVid);
     if (!vidVal.isStr()) {
         return Value::kNullBadType;
     }
     Vertex vertex;
     vertex.vid = vidVal.getStr();
-    auto& tagPropMap = tagPropMaps_[segment];
+    auto& tagPropMap = dsIndices_[segment].tagPropsMap;
     for (auto& tagProp : tagPropMap) {
         auto& row = *this->row();
-        auto& tagPropNameList = tagProp.second.second;
-        auto tagColId = tagProp.second.first;
+        auto& tagPropNameList = tagProp.second.propList;
+        auto tagColId = tagProp.second.colIdx;
         if (!row[tagColId].isList()) {
             // Ignore the bad value.
             continue;
@@ -280,12 +278,12 @@ Value GetNeighborsIter::getEdge() const {
     edge.ranking = rank.getInt();
     edge.type = 0;
 
-    auto& edgePropMap = edgePropMaps_[segment];
+    auto& edgePropMap = dsIndices_[segment].edgePropsMap;
     auto edgeProp = edgePropMap.find(edgeName);
     if (edgeProp == edgePropMap.end()) {
         return Value::kNullValue;
     }
-    auto& edgeNamePropList = edgeProp->second.second;
+    auto& edgeNamePropList = edgeProp->second.propList;
     auto& propList = currentEdgeProps()->values;
     DCHECK_EQ(edgeNamePropList.size(), propList.size());
     for (size_t i = 0; i < propList.size(); ++i) {
diff --git a/src/context/Iterator.h b/src/context/Iterator.h
index d943135730ad64ef88efdf0d51977b21974ded5d..9387484610c7541eb59b64f98bfed73bd01d9c50 100644
--- a/src/context/Iterator.h
+++ b/src/context/Iterator.h
@@ -9,6 +9,8 @@
 
 #include <memory>
 
+#include <gtest/gtest_prod.h>
+
 #include "common/datatypes/Value.h"
 #include "common/datatypes/List.h"
 #include "common/datatypes/DataSet.h"
@@ -219,8 +221,7 @@ public:
     }
 
     const Row* row() const override {
-        auto& current = *iter_;
-        return std::get<1>(current);
+        return iter_->row;
     }
 
 private:
@@ -230,81 +231,64 @@ private:
 
     void clear() {
         valid_ = false;
-        colIndices_.clear();
-        tagEdgeNameIndices_.clear();
-        tagPropIndices_.clear();
-        edgePropIndices_.clear();
-        tagPropMaps_.clear();
-        edgePropMaps_.clear();
-        segments_.clear();
+        dsIndices_.clear();
         logicalRows_.clear();
     }
 
-    // Maps the origin column names with its column index, each response
-    // has a segment.
-    // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 |
-    // -> {_vid : 0, _stats : 1, _tag:t1:p1:p2 : 2, _edge:d1:p1:p2 : 3}
-    using ColumnIndex = std::vector<std::unordered_map<std::string, size_t>>;
-    // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 |
-    // -> {t1 : 2, e1 : 3}
-    using TagEdgeNameIdxMap = std::unordered_map<size_t, std::string>;
-    using TagEdgeNameIndex = std::vector<TagEdgeNameIdxMap>;
-
-    // _tag:t1:p1:p2  ->  {t1 : {p1 : 0, p2 : 1}}
-    // _edge:e1:p1:p2  ->  {e1 : {p1 : 0, p2 : 1}}
-    using PropIdxMap = std::unordered_map<std::string, size_t>;
-    // {tag/edge name : [column_idx, PropIdxMap]}
-    using TagEdgePropIdxMap = std::unordered_map<std::string, std::pair<size_t, PropIdxMap>>;
-    // Maps the property name with its index, each response has a segment
-    // in PropIndex.
-    using PropIndex = std::vector<TagEdgePropIdxMap>;
-
-    // LogicalRow: <segment_id, row, edge_name, edge_props>
-    using LogicalRow = std::tuple<size_t, const Row*, std::string, const List*>;
-
-    using PropList = std::vector<std::string>;
-    // _tag:t1:p1:p2  ->  {t1 : [column_idx, {p1, p2}]}
-    // _edge:e1:p1:p2  ->  {e1 : [columns_idx, {p1, p2}]}
-    using TagEdgePropMap = std::unordered_map<std::string, std::pair<size_t, PropList>>;
-    // Maps the tag/edge with its properties, each response has a segment
-    // in PropMaps
-    using PropMaps = std::vector<TagEdgePropMap>;
-
     inline size_t currentSeg() const {
-        auto& current = *iter_;
-        return std::get<0>(current);
+        return iter_->dsIdx;
     }
 
     inline const std::string& currentEdgeName() const {
-        auto& current = *iter_;
-        return std::get<2>(current);
+        return iter_->edgeName;
     }
 
     inline const List* currentEdgeProps() const {
-        auto& current = *iter_;
-        return std::get<3>(current);
+        return iter_->edgeProps;
     }
 
-    StatusOr<int64_t> buildIndex(const std::vector<std::string>& colNames);
+    struct PropIndex {
+        size_t colIdx;
+        std::vector<std::string> propList;
+        std::unordered_map<std::string, size_t> propIndices;
+    };
+
+    struct DataSetIndex {
+        const DataSet* ds;
+        // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 |
+        // -> {_vid : 0, _stats : 1, _tag:t1:p1:p2 : 2, _edge:d1:p1:p2 : 3}
+        std::unordered_map<std::string, size_t> colIndices;
+        // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 |
+        // -> {2 : t1, 3 : e1}
+        std::unordered_map<size_t, std::string> tagEdgeNameIndices;
+        // _tag:t1:p1:p2  ->  {t1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]}
+        std::unordered_map<std::string, PropIndex> tagPropsMap;
+        // _edge:e1:p1:p2  ->  {e1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]}
+        std::unordered_map<std::string, PropIndex> edgePropsMap;
+    };
 
+    struct LogicalRow {
+        size_t dsIdx;
+        const Row* row;
+        std::string edgeName;
+        const List* edgeProps;
+    };
+
+    StatusOr<int64_t> buildIndex(DataSetIndex* dsIndex);
     Status buildPropIndex(const std::string& props,
                           size_t columnId,
                           bool isEdge,
-                          TagEdgeNameIdxMap& tagEdgeNameIndex,
-                          TagEdgePropIdxMap& tagEdgePropIdxMap,
-                          TagEdgePropMap& tagEdgePropMap);
-
-    friend class IteratorTest_TestHead_Test;
-    bool                                    valid_{false};
-    ColumnIndex                             colIndices_;
-    TagEdgeNameIndex                        tagEdgeNameIndices_;
-    PropIndex                               tagPropIndices_;
-    PropIndex                               edgePropIndices_;
-    PropMaps                                tagPropMaps_;
-    PropMaps                                edgePropMaps_;
-    std::vector<const DataSet*>             segments_;
-    std::vector<LogicalRow>                 logicalRows_;
-    std::vector<LogicalRow>::iterator       iter_;
+                          DataSetIndex* dsIndex);
+    Status processList(std::shared_ptr<Value> value);
+    StatusOr<DataSetIndex> makeDataSetIndex(const DataSet& ds, size_t idx);
+    void makeLogicalRowByEdge(int64_t edgeStartIndex, size_t idx, const DataSetIndex& dsIndex);
+
+    FRIEND_TEST(IteratorTest, TestHead);
+
+    bool valid_{false};
+    std::vector<LogicalRow> logicalRows_;
+    std::vector<LogicalRow>::iterator iter_;
+    std::vector<DataSetIndex> dsIndices_;
 };
 
 class SequentialIter final : public Iterator {