Skip to content
Snippets Groups Projects
Unverified Commit 812e5246 authored by Yee's avatar Yee Committed by GitHub
Browse files

Move plan description from QueryContext to ExecutionPlan (#840)

* Move plan description from QueryContext to ExecutionPlan

* Delete outputVar in ExecutionPlan
parent bd34ed6c
No related branches found
No related tags found
No related merge requests found
......@@ -6,8 +6,6 @@
#include "context/QueryContext.h"
#include "common/interface/gen-cpp2/graph_types.h"
namespace nebula {
namespace graph {
......@@ -39,24 +37,5 @@ void QueryContext::init() {
vctx_ = std::make_unique<ValidateContext>(std::make_unique<AnonVarGenerator>(symTable_.get()));
}
void QueryContext::addProfilingData(int64_t planNodeId, ProfilingStats&& profilingStats) {
// return directly if not enable profile
if (!planDescription_) return;
auto found = planDescription_->nodeIndexMap.find(planNodeId);
DCHECK(found != planDescription_->nodeIndexMap.end());
auto idx = found->second;
auto& planNodeDesc = planDescription_->planNodeDescs[idx];
if (planNodeDesc.profiles == nullptr) {
planNodeDesc.profiles.reset(new std::vector<ProfilingStats>());
}
planNodeDesc.profiles->emplace_back(std::move(profilingStats));
}
void QueryContext::fillPlanDescription() {
DCHECK(ep_ != nullptr);
ep_->fillPlanDescription(planDescription_.get());
}
} // namespace graph
} // namespace nebula
......@@ -8,29 +8,24 @@
#define CONTEXT_QUERYCONTEXT_H_
#include "common/base/Base.h"
#include "common/base/ObjectPool.h"
#include "common/charset/Charset.h"
#include "common/clients/meta/MetaClient.h"
#include "common/clients/storage/GraphStorageClient.h"
#include "common/cpp/helpers.h"
#include "common/datatypes/Value.h"
#include "common/meta/SchemaManager.h"
#include "common/meta/IndexManager.h"
#include "common/meta/SchemaManager.h"
#include "context/ExecutionContext.h"
#include "context/Symbols.h"
#include "context/ValidateContext.h"
#include "parser/SequentialSentences.h"
#include "service/RequestContext.h"
#include "util/IdGenerator.h"
#include "common/base/ObjectPool.h"
#include "context/Symbols.h"
namespace nebula {
namespace graph {
namespace cpp2 {
class ProfilingStats;
class PlanDescription;
} // namespace cpp2
/***************************************************************************
*
* The context for each query request
......@@ -98,10 +93,6 @@ public:
return ep_.get();
}
void setPlan(std::unique_ptr<ExecutionPlan> plan) {
ep_ = std::move(plan);
}
meta::SchemaManager* schemaMng() const {
return sm_;
}
......@@ -130,18 +121,6 @@ public:
return idGen_->id();
}
void addProfilingData(int64_t planNodeId, ProfilingStats&& profilingStats);
PlanDescription* planDescription() const {
return planDescription_.get();
}
void setPlanDescription(std::unique_ptr<PlanDescription> planDescription) {
planDescription_ = std::move(planDescription);
}
void fillPlanDescription();
SymbolTable* symTable() const {
return symTable_.get();
}
......@@ -162,9 +141,6 @@ private:
// The Object Pool holds all internal generated objects.
// e.g. expressions, plan nodes, executors
std::unique_ptr<ObjectPool> objPool_;
// plan description for explain and profile query
std::unique_ptr<PlanDescription> planDescription_;
std::unique_ptr<IdGenerator> idGen_;
std::unique_ptr<SymbolTable> symTable_;
};
......
......@@ -532,7 +532,7 @@ Status Executor::close() {
stats.otherStats =
std::make_unique<std::unordered_map<std::string, std::string>>(std::move(otherStats_));
}
qctx()->addProfilingData(node_->id(), std::move(stats));
qctx()->plan()->addProfileStats(node_->id(), std::move(stats));
return Status::OK();
}
......
......@@ -7,6 +7,7 @@
#include "planner/ExecutionPlan.h"
#include "common/graph/Response.h"
#include "common/interface/gen-cpp2/graph_types.h"
#include "planner/Logic.h"
#include "planner/PlanNode.h"
#include "planner/Query.h"
......@@ -29,6 +30,7 @@ static size_t makePlanNodeDesc(const PlanNode* node, PlanDescription* planDesc)
planDesc->nodeIndexMap.emplace(node->id(), planNodeDescPos);
planDesc->planNodeDescs.emplace_back(std::move(*node->explain()));
auto& planNodeDesc = planDesc->planNodeDescs.back();
planNodeDesc.profiles = std::make_unique<std::vector<ProfilingStats>>();
switch (node->kind()) {
case PlanNode::Kind::kStart: {
......@@ -84,11 +86,23 @@ static size_t makePlanNodeDesc(const PlanNode* node, PlanDescription* planDesc)
return planNodeDescPos;
}
void ExecutionPlan::fillPlanDescription(PlanDescription* planDesc) const {
DCHECK(planDesc != nullptr);
planDesc->optimize_time_in_us = optimizeTimeInUs_;
void ExecutionPlan::describe(PlanDescription* planDesc) {
planDescription_ = DCHECK_NOTNULL(planDesc);
planDescription_->optimize_time_in_us = optimizeTimeInUs_;
planDescription_->format = explainFormat_;
makePlanNodeDesc(root_, planDesc);
}
void ExecutionPlan::addProfileStats(int64_t planNodeId, ProfilingStats&& profilingStats) {
// return directly if not enable profile
if (!planDescription_) return;
auto found = planDescription_->nodeIndexMap.find(planNodeId);
DCHECK(found != planDescription_->nodeIndexMap.end());
auto idx = found->second;
auto& planNodeDesc = planDescription_->planNodeDescs[idx];
planNodeDesc.profiles->emplace_back(std::move(profilingStats));
}
} // namespace graph
} // namespace nebula
......@@ -8,10 +8,13 @@
#define PLANNER_EXECUTIONPLAN_H_
#include <cstdint>
#include <memory>
namespace nebula {
struct ProfilingStats;
struct PlanDescription;
struct PlanNodeDescription;
namespace graph {
......@@ -22,14 +25,14 @@ public:
explicit ExecutionPlan(PlanNode* root = nullptr);
~ExecutionPlan();
void setRoot(PlanNode* root) {
root_ = root;
}
int64_t id() const {
return id_;
}
void setRoot(PlanNode* root) {
root_ = root;
}
PlanNode* root() const {
return root_;
}
......@@ -38,12 +41,21 @@ public:
return &optimizeTimeInUs_;
}
void fillPlanDescription(PlanDescription* planDesc) const;
void addProfileStats(int64_t planNodeId, ProfilingStats&& profilingStats);
void describe(PlanDescription* planDesc);
void setExplainFormat(const std::string& format) {
explainFormat_ = format;
}
private:
int32_t optimizeTimeInUs_{0};
int64_t id_{-1};
PlanNode* root_{nullptr};
// plan description for explain and profile query
PlanDescription* planDescription_{nullptr};
std::string explainFormat_;
};
} // namespace graph
......
......@@ -171,7 +171,7 @@ public:
void setOutputVar(const std::string &var);
std::string outputVar(size_t index = 0) const {
const std::string& outputVar(size_t index = 0) const {
DCHECK_LT(index, outputVars_.size());
return outputVars_[index]->name;
}
......
......@@ -14,16 +14,15 @@
#include "planner/ExecutionPlan.h"
#include "planner/PlanNode.h"
#include "scheduler/Scheduler.h"
#include "stats/StatsDef.h"
#include "util/AstUtils.h"
#include "util/ScopedTimer.h"
#include "validator/Validator.h"
#include "util/AstUtils.h"
#include "stats/StatsDef.h"
using nebula::opt::Optimizer;
using nebula::opt::OptRule;
using nebula::opt::RuleSet;
namespace nebula {
namespace graph {
......@@ -33,7 +32,6 @@ QueryInstance::QueryInstance(std::unique_ptr<QueryContext> qctx, Optimizer *opti
scheduler_ = std::make_unique<Scheduler>(qctx_.get());
}
void QueryInstance::execute() {
Status status = validateAndOptimize();
if (!status.ok()) {
......@@ -58,7 +56,6 @@ void QueryInstance::execute() {
.onError([this](const std::exception &e) { onError(Status::Error("%s", e.what())); });
}
Status QueryInstance::validateAndOptimize() {
auto *rctx = qctx()->rctx();
VLOG(1) << "Parsing query: " << rctx->query();
......@@ -67,63 +64,32 @@ Status QueryInstance::validateAndOptimize() {
sentence_ = std::move(result).value();
NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx()));
auto plan = std::make_unique<ExecutionPlan>();
{
SCOPED_TIMER(plan->optimizeTimeInUs());
auto rootStatus = optimizer_->findBestPlan(qctx_.get());
NG_RETURN_IF_ERROR(rootStatus);
plan->setRoot(const_cast<PlanNode *>(std::move(rootStatus).value()));
}
qctx_->setPlan(std::move(plan));
NG_RETURN_IF_ERROR(findBestPlan());
return Status::OK();
}
bool QueryInstance::explainOrContinue() {
if (sentence_->kind() != Sentence::Kind::kExplain) {
return true;
}
qctx_->fillPlanDescription();
auto &resp = qctx_->rctx()->resp();
resp.planDesc = std::make_unique<PlanDescription>();
DCHECK_NOTNULL(qctx_->plan())->describe(resp.planDesc.get());
return static_cast<const ExplainSentence *>(sentence_.get())->isProfile();
}
void QueryInstance::onFinish() {
auto rctx = qctx()->rctx();
VLOG(1) << "Finish query: " << rctx->query();
auto ectx = qctx()->ectx();
auto &spaceName = rctx->session()->space().name;
rctx->resp().spaceName = std::make_unique<std::string>(spaceName);
auto name = qctx()->plan()->root()->outputVar();
if (ectx->exist(name)) {
auto &&value = ectx->moveValue(name);
if (value.type() == Value::Type::DATASET) {
auto result = value.moveDataSet();
if (!result.colNames.empty()) {
rctx->resp().data = std::make_unique<DataSet>(std::move(result));
} else {
LOG(ERROR) << "Empty column name list";
rctx->resp().errorCode = ErrorCode::E_EXECUTION_ERROR;
rctx->resp().errorMsg = std::make_unique<std::string>(
"Internal error: empty column name list");
}
}
}
if (qctx()->planDescription() != nullptr) {
rctx->resp().planDesc = std::make_unique<PlanDescription>(
std::move(*qctx()->planDescription()));
}
fillRespData(&rctx->resp());
auto latency = rctx->duration().elapsedInUSec();
rctx->resp().latencyInUs = latency;
stats::StatsManager::addValue(kQueryLatencyUs, latency);
if (latency > static_cast<uint64_t>(FLAGS_slow_query_threshold_us)) {
stats::StatsManager::addValue(kNumSlowQueries);
stats::StatsManager::addValue(kSlowQueryLatencyUs, latency);
}
addSlowQueryStats(latency);
rctx->finish();
// The `QueryInstance' is the root node holding all resources during the execution.
......@@ -133,7 +99,6 @@ void QueryInstance::onFinish() {
delete this;
}
void QueryInstance::onError(Status status) {
LOG(ERROR) << status;
auto *rctx = qctx()->rctx();
......@@ -179,14 +144,48 @@ void QueryInstance::onError(Status status) {
rctx->resp().errorMsg = std::make_unique<std::string>(status.toString());
auto latency = rctx->duration().elapsedInUSec();
rctx->resp().latencyInUs = latency;
stats::StatsManager::addValue(kQueryLatencyUs, latency);
stats::StatsManager::addValue(kNumQueryErrors);
addSlowQueryStats(latency);
rctx->finish();
delete this;
}
void QueryInstance::addSlowQueryStats(uint64_t latency) const {
stats::StatsManager::addValue(kQueryLatencyUs, latency);
if (latency > static_cast<uint64_t>(FLAGS_slow_query_threshold_us)) {
stats::StatsManager::addValue(kNumSlowQueries);
stats::StatsManager::addValue(kSlowQueryLatencyUs, latency);
}
rctx->finish();
delete this;
}
void QueryInstance::fillRespData(ExecutionResponse *resp) {
auto ectx = DCHECK_NOTNULL(qctx_->ectx());
auto plan = DCHECK_NOTNULL(qctx_->plan());
const auto &name = plan->root()->outputVar();
if (!ectx->exist(name)) return;
auto &&value = ectx->moveValue(name);
if (!value.isDataSet()) return;
// fill dataset
auto result = value.moveDataSet();
if (!result.colNames.empty()) {
resp->data = std::make_unique<DataSet>(std::move(result));
} else {
resp->errorCode = ErrorCode::E_EXECUTION_ERROR;
resp->errorMsg = std::make_unique<std::string>("Internal error: empty column name list");
LOG(ERROR) << "Empty column name list";
}
}
Status QueryInstance::findBestPlan() {
auto plan = qctx_->plan();
SCOPED_TIMER(plan->optimizeTimeInUs());
auto rootStatus = optimizer_->findBestPlan(qctx_.get());
NG_RETURN_IF_ERROR(rootStatus);
auto root = std::move(rootStatus).value();
plan->setRoot(const_cast<PlanNode *>(root));
return Status::OK();
}
} // namespace graph
......
......@@ -52,6 +52,9 @@ private:
Status validateAndOptimize();
// return true if continue to execute
bool explainOrContinue();
void addSlowQueryStats(uint64_t latency) const;
void fillRespData(ExecutionResponse* resp);
Status findBestPlan();
std::unique_ptr<Sentence> sentence_;
std::unique_ptr<QueryContext> qctx_;
......
......@@ -58,9 +58,7 @@ Status ExplainValidator::validateImpl() {
auto status = toExplainFormatType(explain->formatType());
NG_RETURN_IF_ERROR(status);
auto planDesc = std::make_unique<PlanDescription>();
planDesc->format = std::move(status).value();
qctx_->setPlanDescription(std::move(planDesc));
qctx_->plan()->setExplainFormat(std::move(status).value());
NG_RETURN_IF_ERROR(validator_->validate());
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment