Skip to content
Snippets Groups Projects
Unverified commit 336bed8d, authored by Yee and committed by GitHub
Browse files

Extract dedup vid request data logics (#881)

* Extract dedup vid request data logics

* Refactor

* Increase timeout time
parent 055c5b71
No related branches found
No related tags found
No related merge requests found
......@@ -130,7 +130,7 @@ jobs:
ASAN_OPTIONS: fast_unwind_on_malloc=1
run: ctest -j $(($(nproc)/2+1)) --timeout 400 --output-on-failure
working-directory: build/
timeout-minutes: 15
timeout-minutes: 20
- name: Pytest
env:
NEBULA_TEST_LOGS_DIR: ${{ github.workspace }}/build
......@@ -142,7 +142,7 @@ jobs:
NEBULA_TEST_LOGS_DIR: ${{ github.workspace }}/build
run: make RM_DIR=false J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 15
timeout-minutes: 20
- name: Sanitizer
if: ${{ always() }}
run: |
......
......@@ -6,6 +6,7 @@
nebula_add_library(
executor_obj OBJECT
Executor.cpp
StorageAccessExecutor.cpp
logic/LoopExecutor.cpp
logic/PassThroughExecutor.cpp
logic/StartExecutor.cpp
......
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "executor/StorageAccessExecutor.h"
#include "common/interface/gen-cpp2/meta_types.h"
#include "context/Iterator.h"
#include "context/QueryExpressionContext.h"
#include "util/SchemaUtil.h"
namespace nebula {
namespace graph {
namespace internal {
template <typename VidType>
struct Vid;
template <>
struct Vid<int64_t> {
static int64_t value(const Value &v) {
return v.getInt();
}
};
template <>
struct Vid<std::string> {
static std::string value(const Value &v) {
return v.getStr();
}
};
// Evaluates `expr` for every row reachable from `iter` and collects the
// resulting vids into a single-column (kVid) DataSet.
//
// VidType selects the key type of the dedup set (see Vid<VidType>).
//
// space   - provides spaceDesc.vid_type, against which every value is validated
// exprCtx - expression context, bound to the current row via exprCtx(iter)
// iter    - input iterator; advanced until it is no longer valid
// expr    - expression that yields the vid for the current row
// dedup   - when true, a vid that was already collected is skipped
//
// Values rejected by SchemaUtil::isValidVid are logged at WARNING and skipped.
template <typename VidType>
DataSet buildRequestDataSet(const SpaceInfo &space,
                            QueryExpressionContext &exprCtx,
                            Iterator *iter,
                            Expression *expr,
                            bool dedup) {
    DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr;
    nebula::DataSet vertices({kVid});
    vertices.rows.reserve(iter->size());

    // The set is only consulted when dedup is requested, so only pay for the
    // bucket allocation in that case.
    std::unordered_set<VidType> uniqueSet;
    if (dedup) {
        uniqueSet.reserve(iter->size());
    }

    const auto &vidType = space.spaceDesc.vid_type;
    for (; iter->valid(); iter->next()) {
        auto vid = expr->eval(exprCtx(iter));
        if (!SchemaUtil::isValidVid(vid, vidType)) {
            LOG(WARNING) << "Mismatched vid type: " << vid.type()
                         << ", space vid type: " << SchemaUtil::typeToString(vidType);
            continue;
        }
        // The key is extracted before `vid` is moved into the row below.
        if (dedup && !uniqueSet.emplace(Vid<VidType>::value(vid)).second) {
            continue;
        }
        // Build the row without a braced initializer list: list elements are
        // const, so Row({std::move(vid)}) would copy the value instead of
        // moving it.
        Row row;
        row.values.emplace_back(std::move(vid));
        vertices.emplace_back(std::move(row));
    }
    return vertices;
}
} // namespace internal
// Returns true when the vid type recorded in the space description is INT64.
bool StorageAccessExecutor::isIntVidType(const SpaceInfo &space) const {
    const auto &vidType = space.spaceDesc.vid_type;
    return vidType.type == meta::cpp2::PropertyType::INT64;
}
// Builds the vid request DataSet for the session's current space, dispatching
// to the int64_t or std::string instantiation of internal::buildRequestDataSet
// depending on isIntVidType().
//
// iter  - iterator over the input rows
// expr  - expression evaluated per row to obtain the vid
// dedup - forwarded unchanged to internal::buildRequestDataSet
DataSet StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *iter,
                                                            Expression *expr,
                                                            bool dedup) {
    const auto &space = qctx()->rctx()->session()->space();
    QueryExpressionContext exprCtx(qctx()->ectx());
    return isIntVidType(space)
               ? internal::buildRequestDataSet<int64_t>(space, exprCtx, iter, expr, dedup)
               : internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup);
}
} // namespace graph
} // namespace nebula
......@@ -10,10 +10,17 @@
#include "common/clients/storage/StorageClientBase.h"
#include "context/QueryContext.h"
#include "executor/Executor.h"
#include "service/Session.h"
namespace nebula {
class Expression;
namespace graph {
class Iterator;
struct SpaceInfo;
// It's used for data write/update/query
class StorageAccessExecutor : public Executor {
protected:
......@@ -125,6 +132,10 @@ protected:
folly::stringPrintf("%d(us)/%d(us)", std::get<1>(info), std::get<2>(info)));
}
}
bool isIntVidType(const SpaceInfo &space) const;
DataSet buildRequestDataSetByVidType(Iterator *iter, Expression *expr, bool dedup);
};
} // namespace graph
......
......@@ -12,7 +12,6 @@
#include "common/datatypes/List.h"
#include "common/datatypes/Vertex.h"
#include "context/QueryContext.h"
#include "util/SchemaUtil.h"
#include "util/ScopedTimer.h"
#include "service/GraphFlags.h"
......@@ -23,53 +22,17 @@ using nebula::storage::GraphStorageClient;
namespace nebula {
namespace graph {
folly::Future<Status> GetNeighborsExecutor::execute() {
auto status = buildRequestDataSet();
if (!status.ok()) {
return error(std::move(status));
}
return getNeighbors();
}
Status GetNeighborsExecutor::close() {
// clear the members
reqDs_.rows.clear();
return Executor::close();
}
Status GetNeighborsExecutor::buildRequestDataSet() {
DataSet GetNeighborsExecutor::buildRequestDataSet() {
SCOPED_TIMER(&execTime_);
auto inputVar = gn_->inputVar();
VLOG(1) << node()->outputVar() << " : " << inputVar;
auto& inputResult = ectx_->getResult(inputVar);
auto iter = inputResult.iter();
QueryExpressionContext ctx(ectx_);
DataSet input;
reqDs_.colNames = {kVid};
reqDs_.rows.reserve(iter->size());
auto* src = DCHECK_NOTNULL(gn_->src());
std::unordered_set<Value> uniqueVid;
const auto& spaceInfo = qctx()->rctx()->session()->space();
for (; iter->valid(); iter->next()) {
auto val = Expression::eval(src, ctx(iter.get()));
if (!SchemaUtil::isValidVid(val, spaceInfo.spaceDesc.vid_type)) {
continue;
}
if (gn_->dedup()) {
auto ret = uniqueVid.emplace(val);
if (ret.second) {
reqDs_.rows.emplace_back(Row({std::move(val)}));
}
} else {
reqDs_.rows.emplace_back(Row({std::move(val)}));
}
}
return Status::OK();
auto iter = ectx_->getResult(inputVar).iter();
return buildRequestDataSetByVidType(iter.get(), gn_->src(), gn_->dedup());
}
folly::Future<Status> GetNeighborsExecutor::getNeighbors() {
if (reqDs_.rows.empty()) {
VLOG(1) << "Empty input.";
folly::Future<Status> GetNeighborsExecutor::execute() {
DataSet reqDs = buildRequestDataSet();
if (reqDs.rows.empty()) {
List emptyResult;
return finish(ResultBuilder()
.value(Value(std::move(emptyResult)))
......@@ -81,8 +44,8 @@ folly::Future<Status> GetNeighborsExecutor::getNeighbors() {
GraphStorageClient* storageClient = qctx_->getStorageClient();
return storageClient
->getNeighbors(gn_->space(),
std::move(reqDs_.colNames),
std::move(reqDs_.rows),
std::move(reqDs.colNames),
std::move(reqDs.rows),
gn_->edgeTypes(),
gn_->edgeDirection(),
gn_->statProps(),
......
......@@ -7,19 +7,15 @@
#ifndef EXECUTOR_QUERY_GETNEIGHBORSEXECUTOR_H_
#define EXECUTOR_QUERY_GETNEIGHBORSEXECUTOR_H_
#include <vector>
#include "common/base/StatusOr.h"
#include "common/datatypes/Value.h"
#include "common/datatypes/Vertex.h"
#include "common/interface/gen-cpp2/storage_types.h"
#include "common/clients/storage/GraphStorageClient.h"
#include "executor/StorageAccessExecutor.h"
#include "planner/Query.h"
namespace nebula {
namespace graph {
class GetNeighborsExecutor final : public StorageAccessExecutor {
public:
GetNeighborsExecutor(const PlanNode *node, QueryContext *qctx)
......@@ -29,19 +25,13 @@ public:
folly::Future<Status> execute() override;
Status close() override;
DataSet buildRequestDataSet();
private:
friend class GetNeighborsTest_BuildRequestDataSet_Test;
Status buildRequestDataSet();
folly::Future<Status> getNeighbors();
using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsResponse>;
Status handleResponse(RpcResponse& resps);
private:
DataSet reqDs_;
const GetNeighbors* gn_;
};
......
......@@ -68,40 +68,8 @@ DataSet GetVerticesExecutor::buildRequestDataSet(const GetVertices* gv) {
// Accept Table such as | $a | $b | $c |... as input which one column indicate src
auto valueIter = ectx_->getResult(gv->inputVar()).iter();
VLOG(3) << "GV input var: " << gv->inputVar() << " iter kind: " << valueIter->kind();
auto expCtx = QueryExpressionContext(qctx()->ectx());
const auto &spaceInfo = qctx()->rctx()->session()->space();
vertices.rows.reserve(valueIter->size());
auto dedup = gv->dedup();
if (spaceInfo.spaceDesc.vid_type.type == meta::cpp2::PropertyType::INT64) {
std::unordered_set<int64_t> uniqueSet;
uniqueSet.reserve(valueIter->size());
for (; valueIter->valid(); valueIter->next()) {
auto src = gv->src()->eval(expCtx(valueIter.get()));
if (!SchemaUtil::isValidVid(src, spaceInfo.spaceDesc.vid_type)) {
LOG(WARNING) << "Mismatched vid type: " << src.type();
continue;
}
if (dedup && !uniqueSet.emplace(src.getInt()).second) {
continue;
}
vertices.emplace_back(Row({std::move(src)}));
}
} else {
std::unordered_set<std::string> uniqueSet;
uniqueSet.reserve(valueIter->size());
for (; valueIter->valid(); valueIter->next()) {
auto src = gv->src()->eval(expCtx(valueIter.get()));
if (!SchemaUtil::isValidVid(src, spaceInfo.spaceDesc.vid_type)) {
LOG(WARNING) << "Mismatched vid type: " << src.type();
continue;
}
if (dedup && !uniqueSet.emplace(src.getStr()).second) {
continue;
}
vertices.emplace_back(Row({std::move(src)}));
}
}
return vertices;
return buildRequestDataSetByVidType(valueIter.get(), gv->src(), gv->dedup());
}
} // namespace graph
} // namespace nebula
......@@ -69,8 +69,7 @@ TEST_F(GetNeighborsTest, BuildRequestDataSet) {
gn->setInputVar("input_gn");
auto gnExe = std::make_unique<GetNeighborsExecutor>(gn, qctx_.get());
auto status = gnExe->buildRequestDataSet();
EXPECT_TRUE(status.ok());
auto reqDs = gnExe->buildRequestDataSet();
DataSet expected;
expected.colNames = {kVid};
......@@ -79,7 +78,6 @@ TEST_F(GetNeighborsTest, BuildRequestDataSet) {
row.values.emplace_back(folly::to<std::string>(i));
expected.rows.emplace_back(std::move(row));
}
auto& reqDs = gnExe->reqDs_;
EXPECT_EQ(reqDs, expected);
}
} // namespace graph
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment