diff --git a/src/context/ExecutionContext.cpp b/src/context/ExecutionContext.cpp index 47b4f40c1609b94ade89256bf470d5c6746882ae..c8efef02407f40cb366572665498ec762bd67562 100644 --- a/src/context/ExecutionContext.cpp +++ b/src/context/ExecutionContext.cpp @@ -13,29 +13,24 @@ const ExecResult ExecResult::kEmptyResult = ExecResult(); const std::vector<ExecResult> ExecResult::kEmptyResultList; void ExecutionContext::setValue(const std::string& name, Value&& val) { - auto& hist = valueMap_[name]; - hist.emplace_back(ExecResult::buildDefault(std::move(val))); + setResult(name, ExecResult::buildDefault(std::move(val))); } - void ExecutionContext::setResult(const std::string& name, ExecResult&& result) { auto& hist = valueMap_[name]; hist.emplace_back(std::move(result)); } - void ExecutionContext::deleteValue(const std::string& name) { valueMap_.erase(name); } - size_t ExecutionContext::numVersions(const std::string& name) const { auto it = valueMap_.find(name); CHECK(it != valueMap_.end()); return it->second.size(); } - // Only keep the last several versoins of the Value void ExecutionContext::truncHistory(const std::string& name, size_t numVersionsToKeep) { auto it = valueMap_.find(name); @@ -48,18 +43,11 @@ void ExecutionContext::truncHistory(const std::string& name, size_t numVersionsT } } - // Get the latest version of the value const Value& ExecutionContext::getValue(const std::string& name) const { - auto it = valueMap_.find(name); - if (it != valueMap_.end()) { - return it->second.back().value(); - } else { - return Value::kEmpty; - } + return getResult(name).value(); } - Value ExecutionContext::moveValue(const std::string& name) { auto it = valueMap_.find(name); if (it != valueMap_.end() && !it->second.empty()) { @@ -69,7 +57,6 @@ Value ExecutionContext::moveValue(const std::string& name) { } } - const ExecResult& ExecutionContext::getResult(const std::string& name) const { auto it = valueMap_.find(name); if (it != valueMap_.end()) { @@ -79,7 +66,6 @@ const 
ExecResult& ExecutionContext::getResult(const std::string& name) const { } } - const std::vector<ExecResult>& ExecutionContext::getHistory(const std::string& name) const { auto it = valueMap_.find(name); if (it != valueMap_.end()) { @@ -88,5 +74,5 @@ const std::vector<ExecResult>& ExecutionContext::getHistory(const std::string& n return ExecResult::kEmptyResultList; } } -} // namespace graph -} // namespace nebula +} // namespace graph +} // namespace nebula diff --git a/src/context/ExecutionContext.h b/src/context/ExecutionContext.h index 9b2ecb33cc7b7cd57974f58fabe5b8c9c6588ba4..a009bcabbd82e00fd151e9f38cb88c7614167816 100644 --- a/src/context/ExecutionContext.h +++ b/src/context/ExecutionContext.h @@ -56,19 +56,23 @@ public: static const ExecResult kEmptyResult; static const std::vector<ExecResult> kEmptyResultList; + static ExecResult buildDefault(std::shared_ptr<Value> val) { + return ExecResult(val); + } + static ExecResult buildDefault(Value&& val) { - return ExecResult(std::move(val)); + return ExecResult(std::make_shared<Value>(std::move(val))); } static ExecResult buildGetNeighbors(Value&& val, State&& stat) { - ExecResult result(std::move(val), std::move(stat)); + ExecResult result(std::make_shared<Value>(std::move(val)), std::move(stat)); auto iter = std::make_unique<GetNeighborsIter>(result.valuePtr()); result.setIter(std::move(iter)); return result; } static ExecResult buildSequential(Value&& val, State&& stat) { - ExecResult result(std::move(val), std::move(stat)); + ExecResult result(std::make_shared<Value>(std::move(val)), std::move(stat)); auto iter = std::make_unique<SequentialIter>(result.valuePtr()); result.setIter(std::move(iter)); return result; @@ -104,13 +108,12 @@ private: state_(State::Stat::kUnExecuted, ""), iter_(std::make_unique<DefaultIter>(value_)) {} - explicit ExecResult(Value&& val) - : value_(std::make_shared<Value>(std::move(val))), + explicit ExecResult(std::shared_ptr<Value> val) + : value_(val), state_(State::Stat::kSuccess, 
""), iter_(std::make_unique<DefaultIter>(value_)) {} - ExecResult(Value&& val, State stat) - : value_(std::make_shared<Value>(std::move(val))), state_(stat) {} + ExecResult(std::shared_ptr<Value> val, State stat) : value_(val), state_(stat) {} private: std::shared_ptr<Value> value_; diff --git a/src/context/Iterator.cpp b/src/context/Iterator.cpp index 7d520b8f9ee2cd6a5e9516421252f60bf0cfe498..5a7345cc34fef24babc03138484996047f42a637 100644 --- a/src/context/Iterator.cpp +++ b/src/context/Iterator.cpp @@ -157,8 +157,7 @@ const Value& GetNeighborsIter::getColumn(const std::string& col) const { if (found == index.end()) { return Value::kNullValue; } - auto row = currentRow(); - return row->values[found->second]; + return row()->values[found->second]; } const Value& GetNeighborsIter::getTagProp(const std::string& tag, @@ -177,7 +176,7 @@ const Value& GetNeighborsIter::getTagProp(const std::string& tag, return Value::kNullValue; } auto colId = index->second.first; - auto& row = *currentRow(); + auto& row = *this->row(); DCHECK_GT(row.size(), colId); if (!row[colId].isList()) { return Value::kNullBadType; @@ -226,7 +225,7 @@ Value GetNeighborsIter::getVertex() const { vertex.vid = vidVal.getStr(); auto& tagPropMap = tagPropMaps_[segment]; for (auto& tagProp : tagPropMap) { - auto& row = *currentRow(); + auto& row = *this->row(); auto& tagPropNameList = tagProp.second.second; auto tagColId = tagProp.second.first; if (!row[tagColId].isList()) { diff --git a/src/context/Iterator.h b/src/context/Iterator.h index d270dc94663633cd444627142881c0f8933007e2..d943135730ad64ef88efdf0d51977b21974ded5d 100644 --- a/src/context/Iterator.h +++ b/src/context/Iterator.h @@ -22,6 +22,7 @@ public: kDefault, kGetNeighbors, kSequential, + kUnion, }; explicit Iterator(std::shared_ptr<Value> value, Kind kind) @@ -29,6 +30,10 @@ public: virtual ~Iterator() = default; + Kind kind() const { + return kind_; + } + virtual std::unique_ptr<Iterator> copy() const = 0; virtual bool valid() const = 0; 
@@ -37,6 +42,8 @@ public: virtual void erase() = 0; + virtual const Row* row() const = 0; + // Reset iterator position to `pos' from begin. Must be sure that the `pos' position // is lower than `size()' before resetting void reset(size_t pos = 0) { @@ -48,6 +55,10 @@ public: next(); } + virtual std::shared_ptr<Value> valuePtr() const { + return value_; + } + virtual const Value& value() const { return *value_; } @@ -63,10 +74,7 @@ public: } // The derived class should rewrite get prop if the Value is kind of dataset. - virtual const Value& getColumn(const std::string& col) const { - UNUSED(col); - return Value::kEmpty; - } + virtual const Value& getColumn(const std::string& col) const = 0; virtual const Value& getTagProp(const std::string& tag, const std::string& prop) const { @@ -94,7 +102,6 @@ protected: virtual void doReset(size_t pos) = 0; std::shared_ptr<Value> value_; - Kind kind_; }; @@ -123,6 +130,14 @@ public: return 1; } + const Value& getColumn(const std::string& /* col */) const override { + return Value::kEmpty; + } + + const Row* row() const override { + return nullptr; + } + private: void doReset(size_t pos) override { counter_ = pos; @@ -203,6 +218,11 @@ public: return edges; } + const Row* row() const override { + auto& current = *iter_; + return std::get<1>(current); + } + private: void doReset(size_t pos) override { iter_ = logicalRows_.begin() + pos; @@ -255,11 +275,6 @@ private: return std::get<0>(current); } - inline const Row* currentRow() const { - auto& current = *iter_; - return std::get<1>(current); - } - inline const std::string& currentEdgeName() const { auto& current = *iter_; return std::get<2>(current); @@ -318,10 +333,9 @@ public: } void next() override { - if (!valid()) { - return; + if (valid()) { + ++iter_; } - ++iter_; } void erase() override { @@ -346,6 +360,10 @@ public: } } + const Row* row() const override { + return *iter_; + } + private: void doReset(size_t pos) override { iter_ = rows_.begin() + pos; @@ -356,6 +374,105 @@
private: std::unordered_map<std::string, int64_t> colIndex_; }; +// Chains two iterators: yields every row of `left', then every row of `right'. +class UnionIterator final : public Iterator { +public: + UnionIterator(std::unique_ptr<Iterator> left, std::unique_ptr<Iterator> right) + : Iterator(left->valuePtr(), Kind::kUnion), + left_(std::move(left)), + right_(std::move(right)) {} + + std::unique_ptr<Iterator> copy() const override { + auto iter = std::make_unique<UnionIterator>(left_->copy(), right_->copy()); + iter->reset(); + return iter; + } + + bool valid() const override { + return left_->valid() || right_->valid(); + } + + void next() override { + if (left_->valid()) { + left_->next(); + } else { + if (right_->valid()) { + right_->next(); + } + } + } + + size_t size() const override { + return left_->size() + right_->size(); + } + + void erase() override { + if (left_->valid()) { + left_->erase(); + } else { + if (right_->valid()) { + right_->erase(); + } + } + } + + const Value& getColumn(const std::string& col) const override { + if (left_->valid()) { + return left_->getColumn(col); + } + if (right_->valid()) { + return right_->getColumn(col); + } + return Value::kEmpty; + } + + const Row* row() const override { + if (left_->valid()) return left_->row(); + if (right_->valid()) return right_->row(); + return nullptr; + } + +private: + void doReset(size_t pos) override { + if (pos < left_->size()) { + left_->reset(pos); + right_->reset(); + } else { + const auto offset = pos - left_->size(); + // Exhaust the left side, otherwise valid()/row()/getColumn() keep reading from it + while (left_->valid()) { + left_->next(); + } + right_->reset(offset); + } + } + + std::unique_ptr<Iterator> left_; + std::unique_ptr<Iterator> right_; +}; + } // namespace graph } // namespace nebula + +namespace std { + +template <> +struct equal_to<const nebula::Row*> { + bool operator()(const nebula::Row* lhs, const nebula::Row* rhs) const { + // A null row (DefaultIter::row() returns one) only equals another null row + if (lhs == nullptr || rhs == nullptr) return lhs == rhs; + return *lhs == *rhs; + } +}; + +template <> +struct hash<const nebula::Row*> { + size_t operator()(const nebula::Row* row) const { + // Never dereference a null row; every null row hashes to the same value + return row == nullptr ? 0 : hash<nebula::Row>()(*row); + } +}; + +} // namespace std + #endif // CONTEXT_ITERATOR_H_ diff --git a/src/context/test/IteratorTest.cpp
b/src/context/test/IteratorTest.cpp index 7f4e607a15306a0e3681e9eda04c8446249686ac..ca03300caf8028d8088941917750971b9310f65e 100644 --- a/src/context/test/IteratorTest.cpp +++ b/src/context/test/IteratorTest.cpp @@ -11,6 +11,7 @@ namespace nebula { namespace graph { + TEST(IteratorTest, Default) { auto constant = std::make_shared<Value>(1); DefaultIter iter(constant); @@ -545,6 +546,63 @@ TEST(IteratorTest, TestHead) { } } +TEST(IteratorTest, TestUnionIterator) { + std::vector<std::string> colNames = {"col1", "col2"}; + DataSet lds; + lds.colNames = colNames; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + auto lIter = std::make_unique<SequentialIter>(std::make_shared<Value>(lds)); + + DataSet rds; + rds.colNames = colNames; + rds.rows = { + Row({Value(3), Value("row3")}), + }; + auto rIter = std::make_unique<SequentialIter>(std::make_shared<Value>(rds)); + + // next and valid + { + auto uIter = std::make_unique<UnionIterator>(lIter->copy(), rIter->copy()); + for (; lIter->valid(); lIter->next()) { + EXPECT_TRUE(uIter->valid()); + for (auto &col : colNames) { + EXPECT_EQ(lIter->getColumn(col), uIter->getColumn(col)); + } + uIter->next(); + } + + for (; rIter->valid(); rIter->next()) { + EXPECT_TRUE(uIter->valid()); + for (auto &col : colNames) { + EXPECT_EQ(rIter->getColumn(col), uIter->getColumn(col)); + } + uIter->next(); + } + } + + // erase + { + // The loops above exhausted lIter and rIter; rewind them so the checks below run + lIter->reset(); + rIter->reset(); + auto uIter = std::make_unique<UnionIterator>(lIter->copy(), rIter->copy()); + for (; lIter->valid(); lIter->next()) { + uIter->erase(); + } + + for (; rIter->valid(); rIter->next()) { + EXPECT_TRUE(uIter->valid()); + for (auto &col : colNames) { + EXPECT_EQ(rIter->getColumn(col), uIter->getColumn(col)); + } + uIter->next(); + } + } +} + } // namespace graph } // namespace nebula diff --git a/src/exec/CMakeLists.txt b/src/exec/CMakeLists.txt index ccd8a4e87e4a423587537dd6b7ad1eb7e5a5e500..c9af0007908d2361931cff03fe2b0cd4d5d3af2a 100644 --- a/src/exec/CMakeLists.txt +++
b/src/exec/CMakeLists.txt @@ -22,6 +22,7 @@ nebula_add_library( query/ProjectExecutor.cpp query/SortExecutor.cpp query/ReadIndexExecutor.cpp + query/SetExecutor.cpp query/UnionExecutor.cpp query/DataCollectExecutor.cpp admin/SwitchSpaceExecutor.cpp diff --git a/src/exec/Executor.cpp b/src/exec/Executor.cpp index 3f7ccb532aa522222ed73f0a8386c20893d733a3..61c1479850f475ad3e4734cbeafdab3205b6ed29 100644 --- a/src/exec/Executor.cpp +++ b/src/exec/Executor.cpp @@ -9,6 +9,7 @@ #include <folly/String.h> #include <folly/executors/InlineExecutor.h> +#include "context/ExecutionContext.h" #include "context/QueryContext.h" #include "exec/ExecutionError.h" #include "exec/admin/CreateSpaceExecutor.h" @@ -256,16 +257,12 @@ Executor *Executor::makeExecutor(const PlanNode *node, return qctx->objPool()->add(exec); } -int64_t Executor::id() const { - return node()->id(); -} - Executor::Executor(const std::string &name, const PlanNode *node, QueryContext *qctx) - : name_(name), node_(node), qctx_(qctx) { - DCHECK(!!node_); - DCHECK(!!qctx_); - - ectx_ = qctx_->ectx(); + : id_(DCHECK_NOTNULL(node)->id()), + name_(name), + node_(DCHECK_NOTNULL(node)), + qctx_(DCHECK_NOTNULL(qctx)) { + ectx_ = qctx->ectx(); // Initialize the position in ExecutionContext for each executor before execution plan // starting to run. 
This will avoid lock something for thread safety in real execution if (!ectx_->exist(node->varName())) { diff --git a/src/exec/Executor.h b/src/exec/Executor.h index 05d8303fb01d821abd0f160f30a8099fec59a433..cfa84c3f6c3365b2893ba88cc9bb9fc1d7414a0e 100644 --- a/src/exec/Executor.h +++ b/src/exec/Executor.h @@ -11,6 +11,7 @@ #include <set> #include <string> #include <vector> + #include <folly/futures/Future.h> #include "common/base/Status.h" @@ -32,14 +33,18 @@ public: virtual ~Executor() {} - // Implementation interface of operation logic + // Each executor inherited from this class should get input values from ExecutionContext, + // execute expression evaluation and save output result back to ExecutionContext after + // computation virtual folly::Future<Status> execute() = 0; QueryContext *qctx() const { return qctx_; } - int64_t id() const; + int64_t id() const { + return id_; + } const std::string &name() const { return name_; @@ -89,9 +94,11 @@ protected: Status finish(nebula::Value &&value); Status finish(ExecResult &&result); - // Dump some execution logging messages + // Dump some execution logging messages, only for debugging + // TODO(yee): Remove it after implementing profile function void dumpLog() const; + int64_t id_; // Executor name std::string name_; diff --git a/src/exec/query/IntersectExecutor.cpp b/src/exec/query/IntersectExecutor.cpp index ae894708b64425684f30b00b38774b6946b0b18c..50b23e92d41b084fb46ed19aa4a75aa8bb70b981 100644 --- a/src/exec/query/IntersectExecutor.cpp +++ b/src/exec/query/IntersectExecutor.cpp @@ -6,15 +6,47 @@ #include "exec/query/IntersectExecutor.h" +#include <unordered_set> + #include "planner/PlanNode.h" +#include "planner/Query.h" namespace nebula { namespace graph { folly::Future<Status> IntersectExecutor::execute() { dumpLog(); - // TODO(yee): - return start(); + NG_RETURN_IF_ERROR(checkInputDataSets()); + + auto lIter = getLeftInputDataIter(); + auto rIter = getRightInputDataIter(); + + // Duplicate rows in the right input are normal, so a rejected insert is not an error + std::unordered_set<const Row *>
hashSet; + for (; rIter->valid(); rIter->next()) { + hashSet.insert(rIter->row()); + } + + if (hashSet.empty()) { + auto value = lIter->valuePtr(); + DataSet ds; + ds.colNames = value->getDataSet().colNames; + return finish(ExecResult::buildSequential(Value(std::move(ds)), State())); + } + + while (lIter->valid()) { + auto iter = hashSet.find(lIter->row()); + if (iter == hashSet.end()) { + lIter->erase(); + } else { + lIter->next(); + } + } + + auto result = ExecResult::buildDefault(lIter->valuePtr()); + result.setIter(std::move(lIter)); + + return finish(std::move(result)); } } // namespace graph diff --git a/src/exec/query/IntersectExecutor.h b/src/exec/query/IntersectExecutor.h index ca43d57c190a6204ac694e0080df3ca5c35985da..ed8d73b8293bf1feb2399a1554aafd61274ae4f8 100644 --- a/src/exec/query/IntersectExecutor.h +++ b/src/exec/query/IntersectExecutor.h @@ -7,15 +7,15 @@ #ifndef EXEC_QUERY_INTERSECTEXECUTOR_H_ #define EXEC_QUERY_INTERSECTEXECUTOR_H_ -#include "exec/Executor.h" +#include "exec/query/SetExecutor.h" namespace nebula { namespace graph { -class IntersectExecutor : public Executor { +class IntersectExecutor : public SetExecutor { public: IntersectExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("IntersectExecutor", node, qctx) {} + : SetExecutor("IntersectExecutor", node, qctx) {} folly::Future<Status> execute() override; }; diff --git a/src/exec/query/MinusExecutor.cpp b/src/exec/query/MinusExecutor.cpp index 4dd13fc737bbde42b77f126896c1aa7c4407d8b5..a860d6b6e0d4f7b7dc791f3fd2655da0c9a6a017 100644 --- a/src/exec/query/MinusExecutor.cpp +++ b/src/exec/query/MinusExecutor.cpp @@ -6,7 +6,9 @@ #include "exec/query/MinusExecutor.h" -#include "planner/PlanNode.h" +#include <unordered_set> + +#include "planner/Query.h" namespace nebula { namespace graph { @@ -14,8 +16,32 @@ namespace graph {
folly::Future<Status> MinusExecutor::execute() { dumpLog(); - // TODO(yee): - return start(); + NG_RETURN_IF_ERROR(checkInputDataSets()); + + auto lIter = getLeftInputDataIter(); + auto rIter = getRightInputDataIter(); + + // Duplicate rows in the right input are normal, so a rejected insert is not an error + std::unordered_set<const Row *> hashSet; + for (; rIter->valid(); rIter->next()) { + hashSet.insert(rIter->row()); + } + + if (!hashSet.empty()) { + while (lIter->valid()) { + auto iter = hashSet.find(lIter->row()); + if (iter == hashSet.end()) { + lIter->next(); + } else { + lIter->erase(); + } + } + } + + auto result = ExecResult::buildDefault(lIter->valuePtr()); + result.setIter(std::move(lIter)); + + return finish(std::move(result)); } } // namespace graph diff --git a/src/exec/query/MinusExecutor.h b/src/exec/query/MinusExecutor.h index 58e253f6561920c2e01f44bca8a9fac242b56ef2..9b30caded275a309fd8574942af1e6a8a7dc5dfd 100644 --- a/src/exec/query/MinusExecutor.h +++ b/src/exec/query/MinusExecutor.h @@ -7,15 +7,15 @@ #ifndef EXEC_QUERY_MINUSEXECUTOR_H_ #define EXEC_QUERY_MINUSEXECUTOR_H_ -#include "exec/Executor.h" +#include "exec/query/SetExecutor.h" namespace nebula { namespace graph { -class MinusExecutor : public Executor { +class MinusExecutor : public SetExecutor { public: MinusExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("MinusExecutor", node, qctx) {} + : SetExecutor("MinusExecutor", node, qctx) {} folly::Future<Status> execute() override; }; diff --git a/src/exec/query/SetExecutor.cpp b/src/exec/query/SetExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..12e43a9e448cffe33db9ecba08e068cc8c6c3d10 --- /dev/null +++ b/src/exec/query/SetExecutor.cpp @@ -0,0 +1,59 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "exec/query/SetExecutor.h" + +#include <sstream> + +#include "common/datatypes/DataSet.h" +#include "context/ExecutionContext.h" +#include "planner/Query.h" + +namespace nebula { +namespace graph { + +Status SetExecutor::checkInputDataSets() { + auto setNode = asNode<SetOp>(node()); + auto& lResult = ectx_->getResult(setNode->leftInputVar()); + auto& rResult = ectx_->getResult(setNode->rightInputVar()); + + auto& leftData = lResult.iter()->value(); + auto& rightData = rResult.iter()->value(); + + if (UNLIKELY(!leftData.isDataSet() || !rightData.isDataSet())) { + std::stringstream ss; + ss << "Invalid data types of dependencies: " << leftData.type() << " vs. " + << rightData.type() << "."; + return Status::Error(ss.str()); + } + + const auto& lds = leftData.getDataSet(); // bind by reference: only colNames is read + const auto& rds = rightData.getDataSet(); + + if (LIKELY(lds.colNames == rds.colNames)) { + return Status::OK(); + } + + auto lcols = folly::join(",", lds.colNames); + auto rcols = folly::join(",", rds.colNames); + return Status::Error( + "Data sets have different columns: <%s> vs. <%s>", lcols.c_str(), rcols.c_str()); +} + +std::unique_ptr<Iterator> SetExecutor::getLeftInputDataIter() const { + const auto& left = asNode<SetOp>(node())->leftInputVar(); + auto& result = ectx_->getResult(left); + return result.iter(); +} + +std::unique_ptr<Iterator> SetExecutor::getRightInputDataIter() const { + const auto& right = asNode<SetOp>(node())->rightInputVar(); + auto& result = ectx_->getResult(right); + return result.iter(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/exec/query/SetExecutor.h b/src/exec/query/SetExecutor.h new file mode 100644 index 0000000000000000000000000000000000000000..afd6469824d8de83ca532aef79616013b3f65fba --- /dev/null +++ b/src/exec/query/SetExecutor.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXEC_QUERY_SETEXECUTOR_H_ +#define EXEC_QUERY_SETEXECUTOR_H_ + +#include <memory> + +#include "exec/Executor.h" + +namespace nebula { +namespace graph { + +// Forward declaration; the class is defined in namespace nebula::graph +class Iterator; + +class SetExecutor : public Executor { +public: + Status checkInputDataSets(); + std::unique_ptr<Iterator> getLeftInputDataIter() const; + std::unique_ptr<Iterator> getRightInputDataIter() const; + +protected: + SetExecutor(const std::string &name, const PlanNode *node, QueryContext *qctx) + : Executor(name, node, qctx) {} +}; + +} // namespace graph +} // namespace nebula + +#endif // EXEC_QUERY_SETEXECUTOR_H_ diff --git a/src/exec/query/UnionExecutor.cpp b/src/exec/query/UnionExecutor.cpp index 3f8eedacdbe75de16afcdc6474baf7848e95b5e4..ec73624a23373ed4b9c6446caaf43c5158cbcc8c 100644 --- a/src/exec/query/UnionExecutor.cpp +++ b/src/exec/query/UnionExecutor.cpp @@ -6,15 +6,20 @@ #include "exec/query/UnionExecutor.h" -#include "planner/PlanNode.h" +#include "context/ExecutionContext.h" namespace nebula { namespace graph { folly::Future<Status> UnionExecutor::execute() { dumpLog(); - // TODO(yee): implement union results of left and right inputs - return start(); + NG_RETURN_IF_ERROR(checkInputDataSets()); + auto left = getLeftInputDataIter(); + auto right = getRightInputDataIter(); + auto result = ExecResult::buildDefault(left->valuePtr()); + auto iter = std::make_unique<UnionIterator>(std::move(left), std::move(right)); + result.setIter(std::move(iter)); + return finish(std::move(result)); } } // namespace graph diff --git a/src/exec/query/UnionExecutor.h b/src/exec/query/UnionExecutor.h index 14ccdb145160c08792bd7b669d751326771238df..239d8a1fd703fdd3ad7f530fccb5372e31479fa0 100644 --- a/src/exec/query/UnionExecutor.h +++ b/src/exec/query/UnionExecutor.h @@ -7,15 +7,15 @@ #ifndef EXEC_QUERY_UNIONEXECUTOR_H_ #define
EXEC_QUERY_UNIONEXECUTOR_H_ -#include "exec/Executor.h" +#include "exec/query/SetExecutor.h" namespace nebula { namespace graph { -class UnionExecutor : public Executor { +class UnionExecutor : public SetExecutor { public: UnionExecutor(const PlanNode *node, QueryContext *qctx) - : Executor("UnionExecutor", node, qctx) {} + : SetExecutor("UnionExecutor", node, qctx) {} folly::Future<Status> execute() override; }; diff --git a/src/exec/query/test/CMakeLists.txt b/src/exec/query/test/CMakeLists.txt index 4133462987d449e24c0a6b3d55fbbdd538b78352..724858880aa6e591ad1df83d33ae1291e26ffe01 100644 --- a/src/exec/query/test/CMakeLists.txt +++ b/src/exec/query/test/CMakeLists.txt @@ -46,6 +46,7 @@ nebula_add_test( ProjectTest.cpp GetNeighborsTest.cpp DataCollectTest.cpp + SetExecutorTest.cpp OBJECTS ${EXEC_QUERY_TEST_LIBS} LIBRARIES diff --git a/src/exec/query/test/SetExecutorTest.cpp b/src/exec/query/test/SetExecutorTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..66fe312ecb3f10aa610c4f26e8523b196f104d3a --- /dev/null +++ b/src/exec/query/test/SetExecutorTest.cpp @@ -0,0 +1,439 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#include "exec/query/UnionExecutor.h" + +#include <algorithm> +#include <memory> + +#include <folly/String.h> +#include <gtest/gtest.h> + +#include "context/ExecutionContext.h" +#include "context/QueryContext.h" +#include "planner/Query.h" + +using folly::stringPrintf; + +namespace nebula { +namespace graph { + +class SetExecutorTest : public ::testing::Test { +public: + void SetUp() override { + qctx_ = std::make_unique<QueryContext>(); + plan_ = qctx_->plan(); + } + + static bool diffDataSet(const DataSet& lhs, const DataSet& rhs) { + if (lhs.colNames != rhs.colNames) return false; + if (lhs.rows.size() != rhs.rows.size()) return false; + + // Compare rows lexicographically; std::sort requires a strict weak ordering, which a + // test requiring every column to be smaller does not provide + auto comp = [](const Row& l, const Row& r) -> bool { + return l.values < r.values; + }; + + // Following sort will change the input data sets, so make the copies + auto l = lhs; + auto r = rhs; + std::sort(l.rows.begin(), l.rows.end(), comp); + std::sort(r.rows.begin(), r.rows.end(), comp); + return l.rows == r.rows; + } + +protected: + std::unique_ptr<QueryContext> qctx_; + ExecutionPlan* plan_; +}; + +TEST_F(SetExecutorTest, TestUnionAll) { + auto testUnion = [this](const DataSet& lds, const DataSet& rds, const DataSet& expected) { + auto left = StartNode::make(plan_); + auto right = StartNode::make(plan_); + auto unionNode = Union::make(plan_, left, right); + unionNode->setLeftVar(left->varName()); + unionNode->setRightVar(right->varName()); + + auto unionExecutor = Executor::makeExecutor(unionNode, qctx_.get()); + // Must save the values after constructing executors + qctx_->ectx()->setResult(left->varName(), ExecResult::buildSequential(Value(lds), State())); + qctx_->ectx()->setResult(right->varName(), + ExecResult::buildSequential(Value(rds), State())); + auto future = unionExecutor->execute(); + EXPECT_TRUE(std::move(future).get().ok()); + + auto& result = qctx_->ectx()->getResult(unionNode->varName()); + EXPECT_TRUE(result.value().isDataSet());
+ DataSet resultDS; + resultDS.colNames = result.value().getDataSet().colNames; + for (auto iter = result.iter(); iter->valid(); iter->next()) { + Row row; + for (auto& col : resultDS.colNames) { + row.values.emplace_back(iter->getColumn(col)); + } + resultDS.emplace_back(std::move(row)); + } + + EXPECT_TRUE(diffDataSet(resultDS, expected)); + }; + + std::vector<std::string> colNames = {"col1", "col2"}; + // Left and right are not empty + { + DataSet lds; + lds.colNames = colNames; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + DataSet rds; + rds.colNames = colNames; + rds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + + DataSet expected; + expected.colNames = colNames; + expected.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + Row({Value(3), Value("row3")}), + }; + + testUnion(lds, rds, expected); + } + // Left is empty + { + DataSet lds; + lds.colNames = colNames; + + DataSet rds; + rds.colNames = colNames; + rds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + DataSet expected; + expected.colNames = colNames; + expected.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + + testUnion(lds, rds, expected); + } + // Right is empty + { + DataSet lds; + lds.colNames = colNames; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + DataSet rds; + rds.colNames = colNames; + + DataSet expected; + expected.colNames = colNames; + expected.rows = { + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + testUnion(lds, rds, expected); + } + // All empty + { + DataSet lds; + lds.colNames = colNames; + + DataSet rds; + rds.colNames = colNames; + + DataSet expected; + expected.colNames = colNames; + + testUnion(lds, rds, expected); + } +} + +TEST_F(SetExecutorTest, TestUnionDifferentColumns) { + auto left = 
StartNode::make(plan_); + auto right = StartNode::make(plan_); + auto unionNode = Union::make(plan_, left, right); + unionNode->setLeftVar(left->varName()); + unionNode->setRightVar(right->varName()); + + auto unionExecutor = Executor::makeExecutor(unionNode, qctx_.get()); + + DataSet lds; + lds.colNames = {"col1"}; + DataSet rds; + rds.colNames = {"col1", "col2"}; + + // Must save the values after constructing executors + qctx_->ectx()->setValue(left->varName(), Value(lds)); + qctx_->ectx()->setValue(right->varName(), Value(rds)); + auto future = unionExecutor->execute(); + auto status = std::move(future).get(); + + EXPECT_FALSE(status.ok()); + + auto expected = "Data sets have different columns: <col1> vs. <col1,col2>"; + EXPECT_EQ(status.toString(), expected); +} + +TEST_F(SetExecutorTest, TestUnionDifferentValueType) { + auto left = StartNode::make(plan_); + auto right = StartNode::make(plan_); + auto unionNode = Union::make(plan_, left, right); + unionNode->setLeftVar(left->varName()); + unionNode->setRightVar(right->varName()); + + auto unionExecutor = Executor::makeExecutor(unionNode, qctx_.get()); + + List lst; + DataSet rds; + + // Must save the values after constructing executors + qctx_->ectx()->setValue(left->varName(), Value(lst)); + qctx_->ectx()->setValue(right->varName(), Value(rds)); + auto future = unionExecutor->execute(); + auto status = std::move(future).get(); + + EXPECT_FALSE(status.ok()); + + std::stringstream ss; + ss << "Invalid data types of dependencies: " << Value::Type::LIST << " vs. 
" + << Value::Type::DATASET << "."; + EXPECT_EQ(status.toString(), ss.str()); +} + +TEST_F(SetExecutorTest, TestIntersect) { + auto testInterset = [this](const DataSet& lds, const DataSet& rds, const DataSet& expected) { + auto left = StartNode::make(plan_); + auto right = StartNode::make(plan_); + auto intersect = Intersect::make(plan_, left, right); + intersect->setLeftVar(left->varName()); + intersect->setRightVar(right->varName()); + + auto executor = Executor::makeExecutor(intersect, qctx_.get()); + qctx_->ectx()->setResult(left->varName(), ExecResult::buildSequential(Value(lds), State())); + qctx_->ectx()->setResult(right->varName(), + ExecResult::buildSequential(Value(rds), State())); + + auto fut = executor->execute(); + auto status = std::move(fut).get(); + EXPECT_TRUE(status.ok()); + + auto& result = qctx_->ectx()->getResult(intersect->varName()); + EXPECT_TRUE(result.value().isDataSet()); + + DataSet ds; + ds.colNames = lds.colNames; + for (auto iter = result.iter(); iter->valid(); iter->next()) { + Row row; + for (auto& col : ds.colNames) { + row.values.emplace_back(iter->getColumn(col)); + } + ds.emplace_back(std::move(row)); + } + EXPECT_TRUE(diffDataSet(ds, expected)); + }; + // Right and left are not empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + DataSet rds; + rds.colNames = {"col1", "col2"}; + rds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + DataSet expected; + expected.colNames = {"col1", "col2"}; + expected.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + }; + testInterset(lds, rds, expected); + } + // Right is empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + DataSet rds; + rds.colNames = {"col1", "col2"}; 
+ + DataSet expected; + expected.colNames = {"col1", "col2"}; + + /* TestIntersect, right-is-empty case: rds has no rows and expected carries only column names, so the test expects an empty result. */ testInterset(lds, rds, expected); + } + // Left is empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + + DataSet rds; + rds.colNames = {"col1", "col2"}; + rds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + + DataSet expected; + expected.colNames = {"col1", "col2"}; + + /* Left input has no rows; the test expects an empty result. */ testInterset(lds, rds, expected); + } + // All empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + + DataSet rds; + rds.colNames = {"col1", "col2"}; + + DataSet expected; + expected.colNames = {"col1", "col2"}; + + /* Both inputs have no rows; the test expects an empty result. */ testInterset(lds, rds, expected); + } +} + +/* Exercises the Minus plan node through its executor. The testMinus helper wires two StartNode inputs into a Minus node, stores lds and rds in the execution context under the two input variable names, runs the executor, and compares the rows read back from the result iterator against expected. */ TEST_F(SetExecutorTest, TestMinus) { + auto testMinus = [this](const DataSet& lds, const DataSet& rds, const DataSet& expected) { + auto left = StartNode::make(plan_); + auto right = StartNode::make(plan_); + auto minus = Minus::make(plan_, left, right); + minus->setLeftVar(left->varName()); + minus->setRightVar(right->varName()); + + auto executor = Executor::makeExecutor(minus, qctx_.get()); + /* Publish both inputs as sequential results under the variable names set on the Minus node above. */ qctx_->ectx()->setResult(left->varName(), ExecResult::buildSequential(Value(lds), State())); + qctx_->ectx()->setResult(right->varName(), + ExecResult::buildSequential(Value(rds), State())); + + /* Run the executor and take the status out of the returned future. */ auto fut = executor->execute(); + auto status = std::move(fut).get(); + EXPECT_TRUE(status.ok()); + + /* The output is looked up under the variable name of the Minus node itself. */ auto& result = qctx_->ectx()->getResult(minus->varName()); + EXPECT_TRUE(result.value().isDataSet()); + + /* Rebuild a DataSet from the result iterator, reading each column by the names in lds.colNames, so it can be compared with expected. */ DataSet ds; + ds.colNames = lds.colNames; + for (auto iter = result.iter(); iter->valid(); iter->next()) { + Row row; + for (auto& col : ds.colNames) { + row.values.emplace_back(iter->getColumn(col)); + } + ds.emplace_back(std::move(row)); + } + /* NOTE(review): diffDataSet is defined outside this chunk; presumably it returns true when the two data sets match - confirm. */ EXPECT_TRUE(diffDataSet(ds, expected)); + }; + // Left and right are not empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + DataSet rds; + rds.colNames = {"col1", 
"col2"}; + rds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + + DataSet expected; + expected.colNames = {"col1", "col2"}; + expected.rows = { + Row({Value(2), Value("row2")}), + }; + + /* Rows that also appear in rds, here both copies of (1, row1), are expected to be removed, leaving only (2, row2). */ testMinus(lds, rds, expected); + } + // Left is empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + + DataSet rds; + rds.colNames = {"col1", "col2"}; + rds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(3), Value("row3")}), + }; + + DataSet expected; + expected.colNames = {"col1", "col2"}; + + /* Left input has no rows; the test expects an empty result. */ testMinus(lds, rds, expected); + } + // Right is empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + lds.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + DataSet rds; + rds.colNames = {"col1", "col2"}; + + DataSet expected; + expected.colNames = {"col1", "col2"}; + expected.rows = { + Row({Value(1), Value("row1")}), + Row({Value(1), Value("row1")}), + Row({Value(2), Value("row2")}), + }; + + /* Right input has no rows; every left row, including the duplicate (1, row1), is expected in the result. */ testMinus(lds, rds, expected); + } + // All empty + { + DataSet lds; + lds.colNames = {"col1", "col2"}; + + DataSet rds; + rds.colNames = {"col1", "col2"}; + + DataSet expected; + expected.colNames = {"col1", "col2"}; + + /* Both inputs have no rows; the test expects an empty result. */ testMinus(lds, rds, expected); + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/validator/test/ValidatorTestBase.h b/src/validator/test/ValidatorTestBase.h index d3e73f675861cec94619cf6ce471d10ca5140739..822e3ba6082f41bd46a5d9db1d181535375daf6e 100644 --- a/src/validator/test/ValidatorTestBase.h +++ b/src/validator/test/ValidatorTestBase.h @@ -17,9 +17,8 @@ namespace nebula { namespace graph { /* Streams a list of plan node kinds as a bracketed, comma-separated list of their PlanNode::toString names, and returns os. */ std::ostream& operator<<(std::ostream& os, const std::vector<PlanNode::Kind>& plan) { - std::vector<const char*> kinds; - kinds.reserve(plan.size()); - std::transform(plan.cbegin(), plan.cend(), std::back_inserter(kinds), PlanNode::toString); + /* Size the vector up front so std::transform can write through kinds.begin(). */ std::vector<const char*> kinds(plan.size()); + std::transform(plan.cbegin(), plan.cend(), 
kinds.begin(), PlanNode::toString); /* Join the kind names with a comma and a space, wrapped in square brackets. */ os << "[" << folly::join(", ", kinds) << "]"; return os; }