Skip to content
Snippets Groups Projects
Unverified Commit f7c5c44b authored by bright-starry-sky's avatar bright-starry-sky Committed by GitHub
Browse files

Executor and Planner for IndexScan (#235)

* lookup Validator | Planner | Executor

* IndexScan Planner and Executor

* Rebase master
parent e594eeb9
No related branches found
No related tags found
No related merge requests found
......@@ -26,6 +26,7 @@ nebula_add_library(
query/UnionExecutor.cpp
query/DataCollectExecutor.cpp
query/DataJoinExecutor.cpp
query/IndexScanExecutor.cpp
admin/SwitchSpaceExecutor.cpp
admin/CreateUserExecutor.cpp
admin/DropUserExecutor.cpp
......
......@@ -7,15 +7,91 @@
#include "executor/query/IndexScanExecutor.h"
#include "planner/PlanNode.h"
#include "util/ScopedTimer.h"
#include "context/QueryContext.h"
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::LookupIndexResp;
using nebula::storage::GraphStorageClient;
namespace nebula {
namespace graph {
folly::Future<Status> IndexScanExecutor::execute() {
SCOPED_TIMER(&execTime_);
// TODO(yee): Get all neighbors by storage client
return start();
return indexScan();
}
folly::Future<Status> IndexScanExecutor::indexScan() {
GraphStorageClient* storageClient = qctx_->getStorageClient();
auto *lookup = asNode<IndexScan>(node());
return storageClient->lookupIndex(lookup->space(),
*lookup->queryContext(),
lookup->isEdge(),
lookup->schemaId(),
*lookup->returnColumns())
.via(runner())
.then([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
return handleResp(std::move(rpcResp));
});
}
template <typename Resp>
Status IndexScanExecutor::handleResp(storage::StorageRpcResponse<Resp> &&rpcResp) {
auto completeness = handleCompleteness(rpcResp);
if (!completeness.ok()) {
return std::move(completeness).status();
}
auto state = std::move(completeness).value();
nebula::DataSet v;
for (auto &resp : rpcResp.responses()) {
checkResponseResult(resp.get_result());
if (resp.__isset.data) {
nebula::DataSet* data = resp.get_data();
// TODO : convert the column name to alias.
if (v.colNames.empty()) {
v.colNames = data->colNames;
}
v.rows.insert(v.rows.end(), data->rows.begin(), data->rows.end());
} else {
state = Result::State::kPartialSuccess;
}
}
return finish(ResultBuilder()
.value(std::move(v))
.iter(Iterator::Kind::kSequential)
.state(state)
.finish());
}
template <typename Resp>
StatusOr<Result::State>
IndexScanExecutor::handleCompleteness(const storage::StorageRpcResponse<Resp> &rpcResp) const {
auto completeness = rpcResp.completeness();
if (completeness != 100) {
const auto &failedCodes = rpcResp.failedParts();
for (auto it = failedCodes.begin(); it != failedCodes.end(); it++) {
LOG(ERROR) << name_ << " failed, error "
<< storage::cpp2::_ErrorCode_VALUES_TO_NAMES.at(it->second) << ", part "
<< it->first;
}
if (completeness == 0) {
return Status::Error("IndexScan from storage failed in executor.");
}
return Result::State::kPartialSuccess;
}
return Result::State::kSuccess;
}
void IndexScanExecutor::checkResponseResult(const storage::cpp2::ResponseCommon& result) const {
auto failedParts = result.get_failed_parts();
if (!failedParts.empty()) {
std::stringstream ss;
for (auto& part : failedParts) {
ss << "error code: " << storage::cpp2::_ErrorCode_VALUES_TO_NAMES.at(part.get_code())
<< ", leader: " << part.get_leader()->host << ":" << part.get_leader()->port
<< ", part id: " << part.get_part_id() << "; ";
}
LOG(ERROR) << ss.str();
}
}
} // namespace graph
......
......@@ -7,7 +7,10 @@
#ifndef EXECUTOR_QUERY_INDEXSCANEXECUTOR_H_
#define EXECUTOR_QUERY_INDEXSCANEXECUTOR_H_
#include "common/interface/gen-cpp2/storage_types.h"
#include "common/clients/storage/GraphStorageClient.h"
#include "executor/Executor.h"
#include "planner/Query.h"
namespace nebula {
namespace graph {
......@@ -15,10 +18,26 @@ namespace graph {
class IndexScanExecutor final : public Executor {
public:
IndexScanExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("IndexScanExecutor", node, qctx) {}
: Executor("IndexScanExecutor", node, qctx) {
gn_ = asNode<IndexScan>(node);
}
private:
folly::Future<Status> execute() override;
folly::Future<Status> indexScan();
template <typename Resp>
Status handleResp(storage::StorageRpcResponse<Resp> &&rpcResp);
template <typename Resp>
StatusOr<Result::State>
handleCompleteness(const storage::StorageRpcResponse<Resp> &rpcResp) const;
void checkResponseResult(const storage::cpp2::ResponseCommon& result) const;
private:
const IndexScan * gn_;
};
} // namespace graph
......
......@@ -403,15 +403,98 @@ private:
*/
class IndexScan final : public Explore {
public:
static IndexScan* make(QueryContext* qctx, PlanNode* input, GraphSpaceID space) {
return qctx->objPool()->add(new IndexScan(qctx->genId(), input, space));
using IndexQueryCtx = std::unique_ptr<std::vector<storage::cpp2::IndexQueryContext>>;
using IndexReturnCols = std::unique_ptr<std::vector<std::string>>;
static IndexScan* make(QueryContext* qctx,
PlanNode* input,
GraphSpaceID space,
IndexQueryCtx&& contexts,
IndexReturnCols&& returnCols,
bool isEdge,
int32_t schemaId,
bool dedup = false,
std::vector<storage::cpp2::OrderBy> orderBy = {},
int64_t limit = std::numeric_limits<int64_t>::max(),
std::string filter = "") {
return qctx->objPool()->add(new IndexScan(qctx->genId(),
input,
space,
std::move(contexts),
std::move(returnCols),
isEdge,
schemaId,
dedup,
std::move(orderBy),
limit,
std::move(filter)));
}
std::unique_ptr<cpp2::PlanNodeDescription> explain() const override;
const std::vector<storage::cpp2::IndexQueryContext>* queryContext() const {
return contexts_.get();
}
const std::vector<std::string>* returnColumns() const {
return returnCols_.get();
}
bool isEdge() const {
return isEdge_;
}
int32_t schemaId() const {
return schemaId_;
}
void setQueryContext(IndexQueryCtx contexts) {
contexts_ = std::move(contexts);
}
void setReturnCols(IndexReturnCols cols) {
returnCols_ = std::move(cols);
}
void setIsEdge(bool isEdge) {
isEdge_ = isEdge;
}
void setSchemaId(int32_t schema) {
schemaId_ = schema;
}
private:
IndexScan(int64_t id,
PlanNode* input,
GraphSpaceID space,
IndexQueryCtx&& contexts,
IndexReturnCols&& returnCols,
bool isEdge,
int32_t schemaId,
bool dedup,
std::vector<storage::cpp2::OrderBy> orderBy,
int64_t limit,
std::string filter)
: Explore(id,
Kind::kIndexScan,
input,
space,
dedup,
limit,
std::move(filter),
std::move(orderBy)) {
contexts_ = std::move(contexts);
returnCols_ = std::move(returnCols);
isEdge_ = isEdge;
schemaId_ = schemaId;
}
private:
IndexScan(int64_t id, PlanNode* input, GraphSpaceID space)
: Explore(id, Kind::kIndexScan, input, space) {}
IndexQueryCtx contexts_;
IndexReturnCols returnCols_;
bool isEdge_;
int32_t schemaId_;
};
/**
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment