# Patch description (header only: everything from the first "diff --git" onward is unchanged).
#
# Purpose: move the "build the vid request DataSet" logic that GetNeighborsExecutor and
# GetVerticesExecutor each carried into one shared helper on StorageAccessExecutor.
#
# .github/workflows/pull_request.yml
#   - timeout-minutes goes from 15 to 20 on the step that runs ctest and on the step that
#     runs "make ... tck".
# src/executor/CMakeLists.txt
#   - adds StorageAccessExecutor.cpp to the executor_obj source list.
# src/executor/StorageAccessExecutor.cpp (new file)
#   - internal::Vid<int64_t> / internal::Vid<std::string>: return the dedup key via
#     Value::getInt() / Value::getStr().
#   - internal::buildRequestDataSet<VidType>(): iterates `iter`, evaluates `expr` per row,
#     logs a warning and skips values rejected by SchemaUtil::isValidVid, skips values already
#     seen when `dedup` is true, and returns a DataSet whose only column is kVid.
#   - StorageAccessExecutor::isIntVidType(): true when the space vid type is PropertyType::INT64.
#   - StorageAccessExecutor::buildRequestDataSetByVidType(): reads the session's space and calls
#     the int64_t or std::string instantiation of the helper above.
# src/executor/StorageAccessExecutor.h
#   - includes service/Session.h, forward-declares Expression, Iterator and SpaceInfo, and
#     declares isIntVidType() and buildRequestDataSetByVidType().
# src/executor/query/GetNeighborsExecutor.{h,cpp}
#   - buildRequestDataSet() now returns the DataSet by value and delegates to
#     buildRequestDataSetByVidType(); it is declared in the public section and the friend
#     declaration for the test is removed.
#   - the reqDs_ member, the close() override and getNeighbors() are removed; execute() now
#     builds the request, returns an empty result when it has no rows, and otherwise issues
#     the storage getNeighbors call.
# src/executor/query/GetVerticesExecutor.cpp
#   - the two hand-written INT64 / string loops are replaced by one call to
#     buildRequestDataSetByVidType().
# src/executor/test/GetNeighborsTest.cpp
#   - the test compares the returned DataSet instead of reading gnExe->reqDs_.
#
# NOTE(review): the line breaks of this patch appear to have been collapsed into spaces (file
#   headers, hunk headers and payload share physical lines) -- restore them before applying.
# NOTE(review): the new StorageAccessExecutor.cpp uses std::unordered_set and std::string, but
#   the include lines it adds list neither <unordered_set> nor <string>; presumably both come
#   in transitively -- confirm.
# NOTE(review): the removed GetNeighbors loop skipped invalid vids silently, while the shared
#   helper logs a WARNING per rejected row -- confirm the extra log volume is acceptable.
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 11f2f8acf57abe070771da68545636a4d78ac178..5e3afb98f9dbfb77172623dac4e299bb551578ec 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -130,7 +130,7 @@ jobs: ASAN_OPTIONS: fast_unwind_on_malloc=1 run: ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure working-directory: build/ - timeout-minutes: 15 + timeout-minutes: 20 - name: Pytest env: NEBULA_TEST_LOGS_DIR: ${{ github.workspace }}/build @@ -142,7 +142,7 @@ jobs: NEBULA_TEST_LOGS_DIR: ${{ github.workspace }}/build run: make RM_DIR=false J=${{ steps.cmake.outputs.j }} tck working-directory: tests/ - timeout-minutes: 15 + timeout-minutes: 20 - name: Sanitizer if: ${{ always() }} run: | diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt index 719cc46c22d6433e062940aa3049daf66499df08..17d54c498c238b8ddc95a3891ea95d1ee7613a88 100644 --- a/src/executor/CMakeLists.txt +++ b/src/executor/CMakeLists.txt @@ -6,6 +6,7 @@ nebula_add_library( executor_obj OBJECT Executor.cpp + StorageAccessExecutor.cpp logic/LoopExecutor.cpp logic/PassThroughExecutor.cpp logic/StartExecutor.cpp diff --git a/src/executor/StorageAccessExecutor.cpp b/src/executor/StorageAccessExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8d129b3b356c367a585087744decbc1edb7500c3 --- /dev/null +++ b/src/executor/StorageAccessExecutor.cpp @@ -0,0 +1,85 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#include "executor/StorageAccessExecutor.h" + +#include "common/interface/gen-cpp2/meta_types.h" +#include "context/Iterator.h" +#include "context/QueryExpressionContext.h" +#include "util/SchemaUtil.h" + +namespace nebula { +namespace graph { + +namespace internal { + +template <typename VidType> +struct Vid; + +template <> +struct Vid<int64_t> { + static int64_t value(const Value &v) { + return v.getInt(); + } +}; + +template <> +struct Vid<std::string> { + static std::string value(const Value &v) { + return v.getStr(); + } +}; + +template <typename VidType> +DataSet buildRequestDataSet(const SpaceInfo &space, + QueryExpressionContext &exprCtx, + Iterator *iter, + Expression *expr, + bool dedup) { + DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr; + nebula::DataSet vertices({kVid}); + vertices.rows.reserve(iter->size()); + + std::unordered_set<VidType> uniqueSet; + uniqueSet.reserve(iter->size()); + + const auto &vidType = space.spaceDesc.vid_type; + + for (; iter->valid(); iter->next()) { + auto vid = expr->eval(exprCtx(iter)); + if (!SchemaUtil::isValidVid(vid, vidType)) { + LOG(WARNING) << "Mismatched vid type: " << vid.type() + << ", space vid type: " << SchemaUtil::typeToString(vidType); + continue; + } + if (dedup && !uniqueSet.emplace(Vid<VidType>::value(vid)).second) { + continue; + } + vertices.emplace_back(Row({std::move(vid)})); + } + return vertices; +} + +} // namespace internal + +bool StorageAccessExecutor::isIntVidType(const SpaceInfo &space) const { + return space.spaceDesc.vid_type.type == meta::cpp2::PropertyType::INT64; +} + +DataSet StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *iter, + Expression *expr, + bool dedup) { + const auto &space = qctx()->rctx()->session()->space(); + QueryExpressionContext exprCtx(qctx()->ectx()); + + if (isIntVidType(space)) { + return internal::buildRequestDataSet<int64_t>(space, exprCtx, iter, expr, dedup); + } + return internal::buildRequestDataSet<std::string>(space, 
exprCtx, iter, expr, dedup); +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/StorageAccessExecutor.h b/src/executor/StorageAccessExecutor.h index 5cd1ccd37c6bbfb24972822d28e2ac68ddc8c85a..f7eec220c1a8f6e4be2537736ae5fd175d1044ea 100644 --- a/src/executor/StorageAccessExecutor.h +++ b/src/executor/StorageAccessExecutor.h @@ -10,10 +10,17 @@ #include "common/clients/storage/StorageClientBase.h" #include "context/QueryContext.h" #include "executor/Executor.h" +#include "service/Session.h" namespace nebula { + +class Expression; + namespace graph { +class Iterator; +struct SpaceInfo; + // It's used for data write/update/query class StorageAccessExecutor : public Executor { protected: @@ -125,6 +132,10 @@ protected: folly::stringPrintf("%d(us)/%d(us)", std::get<1>(info), std::get<2>(info))); } } + + bool isIntVidType(const SpaceInfo &space) const; + + DataSet buildRequestDataSetByVidType(Iterator *iter, Expression *expr, bool dedup); }; } // namespace graph diff --git a/src/executor/query/GetNeighborsExecutor.cpp b/src/executor/query/GetNeighborsExecutor.cpp index 04c7c38a48cb1ee4151508bf71586f853f61b30c..750a9d4bce5c9d95244371c61f6bee2fcff89c01 100644 --- a/src/executor/query/GetNeighborsExecutor.cpp +++ b/src/executor/query/GetNeighborsExecutor.cpp @@ -12,7 +12,6 @@ #include "common/datatypes/List.h" #include "common/datatypes/Vertex.h" #include "context/QueryContext.h" -#include "util/SchemaUtil.h" #include "util/ScopedTimer.h" #include "service/GraphFlags.h" @@ -23,53 +22,17 @@ using nebula::storage::GraphStorageClient; namespace nebula { namespace graph { -folly::Future<Status> GetNeighborsExecutor::execute() { - auto status = buildRequestDataSet(); - if (!status.ok()) { - return error(std::move(status)); - } - return getNeighbors(); -} - -Status GetNeighborsExecutor::close() { - // clear the members - reqDs_.rows.clear(); - return Executor::close(); -} - -Status GetNeighborsExecutor::buildRequestDataSet() { +DataSet 
GetNeighborsExecutor::buildRequestDataSet() { SCOPED_TIMER(&execTime_); auto inputVar = gn_->inputVar(); VLOG(1) << node()->outputVar() << " : " << inputVar; - auto& inputResult = ectx_->getResult(inputVar); - auto iter = inputResult.iter(); - QueryExpressionContext ctx(ectx_); - DataSet input; - reqDs_.colNames = {kVid}; - reqDs_.rows.reserve(iter->size()); - auto* src = DCHECK_NOTNULL(gn_->src()); - std::unordered_set<Value> uniqueVid; - const auto& spaceInfo = qctx()->rctx()->session()->space(); - for (; iter->valid(); iter->next()) { - auto val = Expression::eval(src, ctx(iter.get())); - if (!SchemaUtil::isValidVid(val, spaceInfo.spaceDesc.vid_type)) { - continue; - } - if (gn_->dedup()) { - auto ret = uniqueVid.emplace(val); - if (ret.second) { - reqDs_.rows.emplace_back(Row({std::move(val)})); - } - } else { - reqDs_.rows.emplace_back(Row({std::move(val)})); - } - } - return Status::OK(); + auto iter = ectx_->getResult(inputVar).iter(); + return buildRequestDataSetByVidType(iter.get(), gn_->src(), gn_->dedup()); } -folly::Future<Status> GetNeighborsExecutor::getNeighbors() { - if (reqDs_.rows.empty()) { - VLOG(1) << "Empty input."; +folly::Future<Status> GetNeighborsExecutor::execute() { + DataSet reqDs = buildRequestDataSet(); + if (reqDs.rows.empty()) { List emptyResult; return finish(ResultBuilder() .value(Value(std::move(emptyResult))) @@ -81,8 +44,8 @@ folly::Future<Status> GetNeighborsExecutor::getNeighbors() { GraphStorageClient* storageClient = qctx_->getStorageClient(); return storageClient ->getNeighbors(gn_->space(), - std::move(reqDs_.colNames), - std::move(reqDs_.rows), + std::move(reqDs.colNames), + std::move(reqDs.rows), gn_->edgeTypes(), gn_->edgeDirection(), gn_->statProps(), diff --git a/src/executor/query/GetNeighborsExecutor.h b/src/executor/query/GetNeighborsExecutor.h index 5dfdf57625fdcdd34b5cb40771f858b98078abaa..6032f857ed889b4cadb9125aa5f1a09591f7d15a 100644 --- a/src/executor/query/GetNeighborsExecutor.h +++ 
b/src/executor/query/GetNeighborsExecutor.h @@ -7,19 +7,15 @@ #ifndef EXECUTOR_QUERY_GETNEIGHBORSEXECUTOR_H_ #define EXECUTOR_QUERY_GETNEIGHBORSEXECUTOR_H_ -#include <vector> - #include "common/base/StatusOr.h" -#include "common/datatypes/Value.h" -#include "common/datatypes/Vertex.h" #include "common/interface/gen-cpp2/storage_types.h" -#include "common/clients/storage/GraphStorageClient.h" #include "executor/StorageAccessExecutor.h" #include "planner/Query.h" namespace nebula { namespace graph { + class GetNeighborsExecutor final : public StorageAccessExecutor { public: GetNeighborsExecutor(const PlanNode *node, QueryContext *qctx) @@ -29,19 +25,13 @@ public: folly::Future<Status> execute() override; - Status close() override; + DataSet buildRequestDataSet(); private: - friend class GetNeighborsTest_BuildRequestDataSet_Test; - Status buildRequestDataSet(); - - folly::Future<Status> getNeighbors(); - using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsResponse>; Status handleResponse(RpcResponse& resps); private: - DataSet reqDs_; const GetNeighbors* gn_; }; diff --git a/src/executor/query/GetVerticesExecutor.cpp b/src/executor/query/GetVerticesExecutor.cpp index 8ab0aebe9da26859c537c70755e9021eb406d1e0..e05eda2a8c9c7ddd8cb3ee5e347509b3e3fdbc74 100644 --- a/src/executor/query/GetVerticesExecutor.cpp +++ b/src/executor/query/GetVerticesExecutor.cpp @@ -68,40 +68,8 @@ DataSet GetVerticesExecutor::buildRequestDataSet(const GetVertices* gv) { // Accept Table such as | $a | $b | $c |... 
as input which one column indicate src auto valueIter = ectx_->getResult(gv->inputVar()).iter(); VLOG(3) << "GV input var: " << gv->inputVar() << " iter kind: " << valueIter->kind(); - auto expCtx = QueryExpressionContext(qctx()->ectx()); - const auto &spaceInfo = qctx()->rctx()->session()->space(); - vertices.rows.reserve(valueIter->size()); - auto dedup = gv->dedup(); - if (spaceInfo.spaceDesc.vid_type.type == meta::cpp2::PropertyType::INT64) { - std::unordered_set<int64_t> uniqueSet; - uniqueSet.reserve(valueIter->size()); - for (; valueIter->valid(); valueIter->next()) { - auto src = gv->src()->eval(expCtx(valueIter.get())); - if (!SchemaUtil::isValidVid(src, spaceInfo.spaceDesc.vid_type)) { - LOG(WARNING) << "Mismatched vid type: " << src.type(); - continue; - } - if (dedup && !uniqueSet.emplace(src.getInt()).second) { - continue; - } - vertices.emplace_back(Row({std::move(src)})); - } - } else { - std::unordered_set<std::string> uniqueSet; - uniqueSet.reserve(valueIter->size()); - for (; valueIter->valid(); valueIter->next()) { - auto src = gv->src()->eval(expCtx(valueIter.get())); - if (!SchemaUtil::isValidVid(src, spaceInfo.spaceDesc.vid_type)) { - LOG(WARNING) << "Mismatched vid type: " << src.type(); - continue; - } - if (dedup && !uniqueSet.emplace(src.getStr()).second) { - continue; - } - vertices.emplace_back(Row({std::move(src)})); - } - } - return vertices; + return buildRequestDataSetByVidType(valueIter.get(), gv->src(), gv->dedup()); } + } // namespace graph } // namespace nebula diff --git a/src/executor/test/GetNeighborsTest.cpp b/src/executor/test/GetNeighborsTest.cpp index df0a5651cee5e453e4fb9dfaad5d33ac6b7e5d15..5cac1b96fd1ae6e6de47319bd363bdfd28c5f248 100644 --- a/src/executor/test/GetNeighborsTest.cpp +++ b/src/executor/test/GetNeighborsTest.cpp @@ -69,8 +69,7 @@ TEST_F(GetNeighborsTest, BuildRequestDataSet) { gn->setInputVar("input_gn"); auto gnExe = std::make_unique<GetNeighborsExecutor>(gn, qctx_.get()); - auto status = 
gnExe->buildRequestDataSet(); - EXPECT_TRUE(status.ok()); + auto reqDs = gnExe->buildRequestDataSet(); DataSet expected; expected.colNames = {kVid}; @@ -79,7 +78,6 @@ TEST_F(GetNeighborsTest, BuildRequestDataSet) { row.values.emplace_back(folly::to<std::string>(i)); expected.rows.emplace_back(std::move(row)); } - auto& reqDs = gnExe->reqDs_; EXPECT_EQ(reqDs, expected); } } // namespace graph