Skip to content
Snippets Groups Projects
Unverified Commit b8f9ac5a authored by Shylock Hg's avatar Shylock Hg Committed by GitHub
Browse files

Remove the useless executors. (#41)


Co-authored-by: default avatarYee <2520865+yixinglu@users.noreply.github.com>
parent fd717a88
No related branches found
No related tags found
No related merge requests found
Showing
with 0 additions and 1571 deletions
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "base/Base.h"
#include "graph/AlterEdgeExecutor.h"
#include "graph/SchemaHelper.h"
namespace nebula {
namespace graph {
AlterEdgeExecutor::AlterEdgeExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<AlterEdgeSentence*>(sentence);
}
Status AlterEdgeExecutor::prepare() {
return Status::OK();
}
Status AlterEdgeExecutor::getSchema() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
return status;
}
const auto& schemaOpts = sentence_->getSchemaOpts();
const auto& schemaProps = sentence_->getSchemaProps();
return SchemaHelper::alterSchema(schemaOpts, schemaProps, options_, schemaProp_);
}
void AlterEdgeExecutor::execute() {
auto status = getSchema();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();
auto future = mc->alterEdgeSchema(spaceId, *name, std::move(options_), std::move(schemaProp_));
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_ALTEREDGEEXECUTOR_H_
#define GRAPH_ALTEREDGEEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class AlterEdgeExecutor final : public Executor {
public:
AlterEdgeExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "AlterEdgeExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
Status getSchema();
private:
AlterEdgeSentence *sentence_{nullptr};
std::vector<nebula::meta::cpp2::AlterSchemaItem> options_;
nebula::cpp2::SchemaProp schemaProp_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_ALTEREDGEEXECUTOR_H_
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "base/Base.h"
#include "graph/AlterTagExecutor.h"
#include "graph/SchemaHelper.h"
namespace nebula {
namespace graph {
AlterTagExecutor::AlterTagExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<AlterTagSentence*>(sentence);
}
Status AlterTagExecutor::prepare() {
return Status::OK();
}
Status AlterTagExecutor::getSchema() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
return status;
}
const auto& schemaOpts = sentence_->getSchemaOpts();
const auto& schemaProps = sentence_->getSchemaProps();
return SchemaHelper::alterSchema(schemaOpts, schemaProps, options_, schemaProp_);
}
void AlterTagExecutor::execute() {
auto status = getSchema();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();
auto future = mc->alterTagSchema(spaceId, *name, std::move(options_), std::move(schemaProp_));
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_ALTERTAGEXECUTOR_H_
#define GRAPH_ALTERTAGEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class AlterTagExecutor final : public Executor {
public:
AlterTagExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "AlterTagExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
Status getSchema();
private:
AlterTagSentence *sentence_{nullptr};
std::vector<nebula::meta::cpp2::AlterSchemaItem> options_;
nebula::cpp2::SchemaProp schemaProp_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_ALTERTAGEXECUTOR_H_
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "base/Base.h"
#include "graph/AssignmentExecutor.h"
#include "graph/TraverseExecutor.h"
namespace nebula {
namespace graph {
AssignmentExecutor::AssignmentExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<AssignmentSentence*>(sentence);
}
Status AssignmentExecutor::prepare() {
var_ = sentence_->var();
executor_ = TraverseExecutor::makeTraverseExecutor(sentence_->sentence(), ectx());
auto onError = [this] (Status s) {
DCHECK(onError_);
onError_(std::move(s));
};
auto onFinish = [this] (Executor::ProcessControl ctr) {
DCHECK(onFinish_);
onFinish_(ctr);
};
auto onResult = [this] (std::unique_ptr<InterimResult> result) {
ectx()->variableHolder()->add(*var_, std::move(result));
};
executor_->setOnError(onError);
executor_->setOnFinish(onFinish);
executor_->setOnResult(onResult);
auto status = executor_->prepare();
if (!status.ok()) {
FLOG_ERROR("Prepare executor `%s' failed: %s",
executor_->name(), status.toString().c_str());
return status;
}
return Status::OK();
}
void AssignmentExecutor::execute() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
executor_->execute();
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_ASSIGNMENTEXECUTOR_H_
#define GRAPH_ASSIGNMENTEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class TraverseExecutor;
class AssignmentExecutor final : public Executor {
public:
AssignmentExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "AssignmentExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
AssignmentSentence *sentence_{nullptr};
std::unique_ptr<TraverseExecutor> executor_;
const std::string *var_{nullptr};
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_ASSIGNMENTEXECUTOR_H_
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "graph/BalanceExecutor.h"
namespace nebula {
namespace graph {
BalanceExecutor::BalanceExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<BalanceSentence*>(sentence);
}
Status BalanceExecutor::prepare() {
return Status::OK();
}
void BalanceExecutor::execute() {
auto showType = sentence_->subType();
switch (showType) {
case BalanceSentence::SubType::kLeader:
balanceLeader();
break;
case BalanceSentence::SubType::kData:
balanceData();
break;
case BalanceSentence::SubType::kDataStop:
balanceData(true);
break;
case BalanceSentence::SubType::kShowBalancePlan:
showBalancePlan();
break;
case BalanceSentence::SubType::kUnknown:
onError_(Status::Error("Type unknown"));
break;
}
}
void BalanceExecutor::balanceLeader() {
auto future = ectx()->getMetaClient()->balanceLeader();
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}
auto ret = std::move(resp).value();
if (!ret) {
DCHECK(onError_);
onError_(Status::Error("Balance leader failed"));
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
void BalanceExecutor::balanceData(bool isStop) {
std::vector<HostAddr> hostDelList;
auto hostDel = sentence_->hostDel();
if (hostDel != nullptr) {
hostDelList = hostDel->hosts();
}
auto future = ectx()->getMetaClient()->balance(std::move(hostDelList),
isStop);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}
auto balanceId = std::move(resp).value();
resp_ = std::make_unique<cpp2::ExecutionResponse>();
std::vector<std::string> header{"ID"};
resp_->set_column_names(std::move(header));
std::vector<cpp2::RowValue> rows;
std::vector<cpp2::ColumnValue> row;
row.resize(1);
row[0].set_integer(balanceId);
rows.emplace_back();
rows.back().set_columns(std::move(row));
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
void BalanceExecutor::showBalancePlan() {
auto id = sentence_->balanceId();
auto future = ectx()->getMetaClient()->showBalance(id);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}
auto tasks = std::move(resp).value();
resp_ = std::make_unique<cpp2::ExecutionResponse>();
std::vector<std::string> header{"balanceId, spaceId:partId, src->dst", "status"};
resp_->set_column_names(std::move(header));
std::vector<cpp2::RowValue> rows;
rows.reserve(tasks.size());
int32_t succeeded = 0;
int32_t failed = 0;
int32_t inProgress = 0;
int32_t invalid = 0;
for (auto& task : tasks) {
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_str(std::move(task.get_id()));
switch (task.get_result()) {
case meta::cpp2::TaskResult::SUCCEEDED:
row[1].set_str("succeeded");
succeeded++;
break;
case meta::cpp2::TaskResult::FAILED:
row[1].set_str("failed");
failed++;
break;
case meta::cpp2::TaskResult::IN_PROGRESS:
row[1].set_str("in progress");
inProgress++;
break;
case meta::cpp2::TaskResult::INVALID:
row[1].set_str("invalid");
invalid++;
break;
}
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
int32_t total = static_cast<int32_t>(rows.size());
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_str(
folly::stringPrintf("Total:%d, Succeeded:%d, Failed:%d, In Progress:%d, Invalid:%d",
total, succeeded, failed, inProgress, invalid));
row[1].set_str(folly::stringPrintf("%f%%",
total == 0 ? 100 : (100 - static_cast<float>(inProgress) / total * 100)));
rows.emplace_back();
rows.back().set_columns(std::move(row));
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
void BalanceExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
if (resp_) {
resp = std::move(*resp_);
} else {
Executor::setupResponse(resp);
}
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_BALANCEEXECUTOR_H_
#define GRAPH_BALANCEEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class BalanceExecutor final : public Executor {
public:
BalanceExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "BalanceExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
void balanceLeader();
void balanceData(bool isStop = false);
void stopBalanceData();
void showBalancePlan();
void setupResponse(cpp2::ExecutionResponse &resp) override;
private:
BalanceSentence *sentence_{nullptr};
std::unique_ptr<cpp2::ExecutionResponse> resp_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_BALANCEEXECUTOR_H_
# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License,
# attached with Common Clause Condition 1.0, found in the LICENSES directory.
nebula_add_library(
graph_obj OBJECT
GraphFlags.cpp
GraphService.cpp
ClientSession.cpp
SessionManager.cpp
ExecutionEngine.cpp
ExecutionContext.cpp
ExecutionPlan.cpp
Executor.cpp
TraverseExecutor.cpp
SequentialExecutor.cpp
UseExecutor.cpp
GoExecutor.cpp
PipeExecutor.cpp
CreateEdgeExecutor.cpp
CreateTagExecutor.cpp
AlterEdgeExecutor.cpp
AlterTagExecutor.cpp
DropTagExecutor.cpp
DropEdgeExecutor.cpp
DescribeTagExecutor.cpp
DescribeEdgeExecutor.cpp
InsertVertexExecutor.cpp
UpdateVertexExecutor.cpp
InsertEdgeExecutor.cpp
UpdateEdgeExecutor.cpp
AssignmentExecutor.cpp
InterimResult.cpp
VariableHolder.cpp
CreateSpaceExecutor.cpp
DropSpaceExecutor.cpp
DescribeSpaceExecutor.cpp
ShowExecutor.cpp
YieldExecutor.cpp
DownloadExecutor.cpp
OrderByExecutor.cpp
IngestExecutor.cpp
ConfigExecutor.cpp
BalanceExecutor.cpp
SchemaHelper.cpp
FetchVerticesExecutor.cpp
FetchEdgesExecutor.cpp
FetchExecutor.cpp
SetExecutor.cpp
FindExecutor.cpp
MatchExecutor.cpp
DeleteVertexExecutor.cpp
DeleteEdgesExecutor.cpp
FindPathExecutor.cpp
LimitExecutor.cpp
GroupByExecutor.cpp
ReturnExecutor.cpp
CreateSnapshotExecutor.cpp
DropSnapshotExecutor.cpp
)
nebula_add_library(
graph_http_handler OBJECT
GraphHttpHandler.cpp
)
nebula_add_subdirectory(test)
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "graph/ConfigExecutor.h"
#include "base/Configuration.h"
namespace nebula {
namespace graph {
const std::string kConfigUnknown = "UNKNOWN"; // NOLINT
ConfigExecutor::ConfigExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<ConfigSentence*>(sentence);
}
Status ConfigExecutor::prepare() {
configItem_ = sentence_->configItem();
return Status::OK();
}
void ConfigExecutor::execute() {
auto showType = sentence_->subType();
switch (showType) {
case ConfigSentence::SubType::kShow:
showVariables();
break;
case ConfigSentence::SubType::kSet:
setVariables();
break;
case ConfigSentence::SubType::kGet:
getVariables();
break;
case ConfigSentence::SubType::kUnknown:
onError_(Status::Error("Type unknown"));
break;
}
}
void ConfigExecutor::showVariables() {
meta::cpp2::ConfigModule module = meta::cpp2::ConfigModule::ALL;
if (configItem_ != nullptr) {
if (configItem_->getModule() != nullptr) {
module = toThriftConfigModule(*configItem_->getModule());
}
}
if (module == meta::cpp2::ConfigModule::UNKNOWN) {
DCHECK(onError_);
onError_(Status::Error("Parse config module error"));
return;
}
auto future = ectx()->gflagsManager()->listConfigs(module);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp.status()));
return;
}
std::vector<std::string> header{"module", "name", "type", "mode", "value"};
resp_ = std::make_unique<cpp2::ExecutionResponse>();
resp_->set_column_names(std::move(header));
auto configs = std::move(resp.value());
std::vector<cpp2::RowValue> rows;
for (const auto &item : configs) {
auto row = genRow(item);
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error(folly::stringPrintf("Internal error : %s", e.what().c_str())));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
void ConfigExecutor::setVariables() {
meta::cpp2::ConfigModule module = meta::cpp2::ConfigModule::UNKNOWN;
std::string name;
VariantType value;
meta::cpp2::ConfigType type;
if (configItem_ != nullptr) {
if (configItem_->getModule() != nullptr) {
module = toThriftConfigModule(*configItem_->getModule());
}
if (configItem_->getName() != nullptr) {
name = *configItem_->getName();
}
if (configItem_->getValue() != nullptr) {
auto v = configItem_->getValue()->eval();
if (!v.ok()) {
DCHECK(onError_);
onError_(v.status());
return;
}
value = v.value();
switch (value.which()) {
case VAR_INT64:
type = meta::cpp2::ConfigType::INT64;
break;
case VAR_DOUBLE:
type = meta::cpp2::ConfigType::DOUBLE;
break;
case VAR_BOOL:
type = meta::cpp2::ConfigType::BOOL;
break;
case VAR_STR:
type = meta::cpp2::ConfigType::STRING;
break;
default:
DCHECK(onError_);
onError_(Status::Error("Parse value type error"));
return;
}
} else if (configItem_->getUpdateItems() != nullptr) {
auto status = configItem_->getUpdateItems()->toEvaledString();
if (!status.ok()) {
DCHECK(onError_);
onError_(status.status());
return;
}
value = status.value();
// all nested options are regarded as string
type = meta::cpp2::ConfigType::NESTED;
}
}
if (module == meta::cpp2::ConfigModule::UNKNOWN) {
DCHECK(onError_);
onError_(Status::Error("Parse config module error"));
return;
}
bool isForce = sentence_->isForce();
auto future = ectx()->gflagsManager()->setConfig(module, name, type, value, isForce);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto && resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp.status()));
return;
}
resp_ = std::make_unique<cpp2::ExecutionResponse>();
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error(folly::stringPrintf("Internal error : %s",
e.what().c_str())));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
void ConfigExecutor::getVariables() {
meta::cpp2::ConfigModule module = meta::cpp2::ConfigModule::UNKNOWN;
std::string name;
if (configItem_ != nullptr) {
if (configItem_->getModule() != nullptr) {
module = toThriftConfigModule(*configItem_->getModule());
}
if (configItem_->getName() != nullptr) {
name = *configItem_->getName();
}
}
if (module == meta::cpp2::ConfigModule::UNKNOWN) {
DCHECK(onError_);
onError_(Status::Error("Parse config module error"));
return;
}
auto future = ectx()->gflagsManager()->getConfig(module, name);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto && resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp.status()));
return;
}
std::vector<std::string> header{"module", "name", "type", "mode", "value"};
resp_ = std::make_unique<cpp2::ExecutionResponse>();
resp_->set_column_names(std::move(header));
auto configs = std::move(resp.value());
std::vector<cpp2::RowValue> rows;
for (const auto &item : configs) {
auto row = genRow(item);
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error(folly::stringPrintf("Internal error : %s", e.what().c_str())));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
std::vector<cpp2::ColumnValue> ConfigExecutor::genRow(const meta::cpp2::ConfigItem& item) {
std::vector<cpp2::ColumnValue> row;
row.resize(5);
row[0].set_str(ConfigModuleToString(item.get_module()));
row[1].set_str(item.get_name());
row[2].set_str(ConfigTypeToString(item.get_type()));
row[3].set_str(ConfigModeToString(item.get_mode()));
// TODO: Console must have same type of the same column over different lines,
// so we transform all kinds of value to string for now.
VariantType value;
switch (item.get_type()) {
case meta::cpp2::ConfigType::INT64:
value = *reinterpret_cast<const int64_t*>(item.get_value().data());
row[4].set_str(std::to_string(boost::get<int64_t>(value)));
break;
case meta::cpp2::ConfigType::DOUBLE:
value = *reinterpret_cast<const double*>(item.get_value().data());
row[4].set_str(std::to_string(boost::get<double>(value)));
break;
case meta::cpp2::ConfigType::BOOL:
value = *reinterpret_cast<const bool*>(item.get_value().data());
row[4].set_str(boost::get<bool>(value) ? "True" : "False");
break;
case meta::cpp2::ConfigType::STRING:
value = item.get_value();
row[4].set_str(boost::get<std::string>(value));
break;
case meta::cpp2::ConfigType::NESTED:
value = item.get_value();
Configuration conf;
auto status = conf.parseFromString(boost::get<std::string>(value));
if (!status.ok()) {
row[4].set_str(boost::get<std::string>(value));
} else {
row[4].set_str(conf.dumpToPrettyString());
}
break;
}
return row;
}
void ConfigExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
resp = std::move(*resp_);
}
meta::cpp2::ConfigModule toThriftConfigModule(const nebula::ConfigModule& mode) {
switch (mode) {
case nebula::ConfigModule::ALL:
return meta::cpp2::ConfigModule::ALL;
case nebula::ConfigModule::GRAPH:
return meta::cpp2::ConfigModule::GRAPH;
case nebula::ConfigModule::META:
return meta::cpp2::ConfigModule::META;
case nebula::ConfigModule::STORAGE:
return meta::cpp2::ConfigModule::STORAGE;
default:
return meta::cpp2::ConfigModule::UNKNOWN;
}
}
std::string ConfigModuleToString(const meta::cpp2::ConfigModule& module) {
auto it = meta::cpp2::_ConfigModule_VALUES_TO_NAMES.find(module);
if (it == meta::cpp2::_ConfigModule_VALUES_TO_NAMES.end()) {
return kConfigUnknown;
} else {
return it->second;
}
}
std::string ConfigModeToString(const meta::cpp2::ConfigMode& mode) {
auto it = meta::cpp2::_ConfigMode_VALUES_TO_NAMES.find(mode);
if (it == meta::cpp2::_ConfigMode_VALUES_TO_NAMES.end()) {
return kConfigUnknown;
} else {
return it->second;
}
}
std::string ConfigTypeToString(const meta::cpp2::ConfigType& type) {
auto it = meta::cpp2::_ConfigType_VALUES_TO_NAMES.find(type);
if (it == meta::cpp2::_ConfigType_VALUES_TO_NAMES.end()) {
return kConfigUnknown;
} else {
return it->second;
}
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_CONFIGEXECUTOR_H_
#define GRAPH_CONFIGEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
#include "meta/ClientBasedGflagsManager.h"
namespace nebula {
namespace graph {
class ConfigExecutor final : public Executor {
public:
ConfigExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "ConfigExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
void setupResponse(cpp2::ExecutionResponse &resp) override;
void showVariables();
void setVariables();
void getVariables();
private:
std::vector<cpp2::ColumnValue> genRow(const meta::cpp2::ConfigItem& item);
ConfigSentence *sentence_{nullptr};
std::unique_ptr<cpp2::ExecutionResponse> resp_;
ConfigRowItem *configItem_{nullptr};
};
meta::cpp2::ConfigModule toThriftConfigModule(const nebula::ConfigModule& mode);
std::string ConfigModuleToString(const meta::cpp2::ConfigModule& module);
std::string ConfigModeToString(const meta::cpp2::ConfigMode& mode);
std::string ConfigTypeToString(const meta::cpp2::ConfigType& type);
} // namespace graph
} // namespace nebula
#endif // GRAPH_CONFIGEXECUTOR_H_
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "base/Base.h"
#include "graph/CreateEdgeExecutor.h"
#include "dataman/ResultSchemaProvider.h"
#include "graph/SchemaHelper.h"
namespace nebula {
namespace graph {
CreateEdgeExecutor::CreateEdgeExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<CreateEdgeSentence*>(sentence);
}
Status CreateEdgeExecutor::prepare() {
return Status::OK();
}
Status CreateEdgeExecutor::getSchema() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
return status;
}
const auto& specs = sentence_->columnSpecs();
const auto& schemaProps = sentence_->getSchemaProps();
return SchemaHelper::createSchema(specs, schemaProps, schema_);
}
void CreateEdgeExecutor::execute() {
auto status = getSchema();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();
auto future = mc->createEdgeSchema(spaceId, *name, schema_, sentence_->isIfNotExist());
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_CREATEEDGEEXECUTOR_H_
#define GRAPH_CREATEEDGEEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class CreateEdgeExecutor final : public Executor {
public:
CreateEdgeExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "CreateEdgeExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
Status getSchema();
private:
CreateEdgeSentence *sentence_{nullptr};
nebula::cpp2::Schema schema_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_DEFINEEDGEEXECUTOR_H_
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "graph/CreateSnapshotExecutor.h"
namespace nebula {
namespace graph {
CreateSnapshotExecutor::CreateSnapshotExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<CreateSnapshotSentence*>(sentence);
}
Status CreateSnapshotExecutor::prepare() {
return Status::OK();
}
void CreateSnapshotExecutor::execute() {
auto future = ectx()->getMetaClient()->createSnapshot();
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}
auto ret = std::move(resp).value();
if (!ret) {
DCHECK(onError_);
onError_(Status::Error("Balance leader failed"));
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_CREATESNAPSHOT_H_
#define GRAPH_CREATESNAPSHOT_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class CreateSnapshotExecutor final : public Executor {
public:
CreateSnapshotExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "CreateSnapshotExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
CreateSnapshotSentence *sentence_{nullptr};
std::unique_ptr<cpp2::ExecutionResponse> resp_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_CREATESNAPSHOT_H_
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "graph/CreateSpaceExecutor.h"
namespace nebula {
namespace graph {
CreateSpaceExecutor::CreateSpaceExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<CreateSpaceSentence*>(sentence);
}
Status CreateSpaceExecutor::prepare() {
spaceName_ = sentence_->name();
for (auto &item : sentence_->getOpts()) {
switch (item->getOptType()) {
case SpaceOptItem::PARTITION_NUM:
partNum_ = item->get_partition_num();
if (partNum_ <= 0) {
return Status::Error("Partition_num value should be greater than zero");
}
break;
case SpaceOptItem::REPLICA_FACTOR:
replicaFactor_ = item->get_replica_factor();
if (replicaFactor_ <= 0) {
return Status::Error("Replica_factor value should be greater than zero");
}
break;
}
}
return Status::OK();
}
void CreateSpaceExecutor::execute() {
auto future = ectx()->getMetaClient()->createSpace(
*spaceName_, partNum_, replicaFactor_, sentence_->isIfNotExist());
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}
auto spaceId = std::move(resp).value();
if (spaceId <= 0) {
DCHECK(onError_);
onError_(Status::Error("Create space failed"));
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_CREATESPACEEXECUTOR_H_
#define GRAPH_CREATESPACEEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class CreateSpaceExecutor final : public Executor {
public:
CreateSpaceExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "CreateSpaceExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
CreateSpaceSentence *sentence_{nullptr};
const std::string *spaceName_{nullptr};
// TODO Due to the currently design of the createSpace interface,
// it's impossible to express *not specified*, so we use 0 to indicate this.
int32_t partNum_{0};
int32_t replicaFactor_{0};
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_CREATESPACEEXECUTOR_H_
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "base/Base.h"
#include "graph/CreateTagExecutor.h"
#include "dataman/ResultSchemaProvider.h"
#include "graph/SchemaHelper.h"
namespace nebula {
namespace graph {
CreateTagExecutor::CreateTagExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<CreateTagSentence*>(sentence);
}
Status CreateTagExecutor::prepare() {
return Status::OK();
}
Status CreateTagExecutor::getSchema() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
return status;
}
const auto& specs = sentence_->columnSpecs();
const auto& schemaProps = sentence_->getSchemaProps();
return SchemaHelper::createSchema(specs, schemaProps, schema_);
}
void CreateTagExecutor::execute() {
auto status = getSchema();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();
auto future = mc->createTagSchema(spaceId, *name, schema_, sentence_->isIfNotExist());
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_CREATETAGEXECUTOR_H_
#define GRAPH_CREATETAGEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class CreateTagExecutor final : public Executor {
public:
CreateTagExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "CreateTagExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
Status getSchema();
private:
CreateTagSentence *sentence_{nullptr};
nebula::cpp2::Schema schema_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_DEFINETAGEXECUTOR_H_
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "base/Base.h"
#include "graph/DeleteEdgesExecutor.h"
#include "meta/SchemaManager.h"
#include "filter/Expressions.h"
#include "storage/client/StorageClient.h"
namespace nebula {
namespace graph {
DeleteEdgesExecutor::DeleteEdgesExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<DeleteEdgesSentence*>(sentence);
}
Status DeleteEdgesExecutor::prepare() {
Status status;
do {
status = checkIfGraphSpaceChosen();
if (!status.ok()) {
break;
}
spaceId_ = ectx()->rctx()->session()->space();
expCtx_ = std::make_unique<ExpressionContext>();
expCtx_->setSpace(spaceId_);
expCtx_->setStorageClient(ectx()->getStorageClient());
auto edgeStatus = ectx()->schemaManager()->toEdgeType(spaceId_, *sentence_->edge());
if (!edgeStatus.ok()) {
status = edgeStatus.status();
break;
}
edgeType_ = edgeStatus.value();
auto schema = ectx()->schemaManager()->getEdgeSchema(spaceId_, edgeType_);
if (schema == nullptr) {
status = Status::Error("No schema found for '%s'", sentence_->edge()->c_str());
break;
}
} while (false);
return status;
}
void DeleteEdgesExecutor::execute() {
auto status = setupEdgeKeys();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
// TODO Need to consider distributed transaction because in-edges/out-edges
// may be in different partitions
auto future = ectx()->getStorageClient()->deleteEdges(spaceId_, std::move(edgeKeys_));
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
auto completeness = resp.completeness();
if (completeness != 100) {
// TODO Need to consider atomic issues
DCHECK(onError_);
onError_(Status::Error("Internal Error"));
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
};
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}
Status DeleteEdgesExecutor::setupEdgeKeys() {
auto status = Status::OK();
auto edgeKeysExpr = sentence_->keys()->keys();
for (auto *keyExpr : edgeKeysExpr) {
auto *srcExpr = keyExpr->srcid();
srcExpr->setContext(expCtx_.get());
auto *dstExpr = keyExpr->dstid();
dstExpr->setContext(expCtx_.get());
auto rank = keyExpr->rank();
status = srcExpr->prepare();
if (!status.ok()) {
break;
}
status = dstExpr->prepare();
if (!status.ok()) {
break;
}
auto value = srcExpr->eval();
if (!value.ok()) {
return value.status();
}
auto srcid = value.value();
value = dstExpr->eval();
if (!value.ok()) {
return value.status();
}
auto dstid = value.value();
if (!Expression::isInt(srcid) || !Expression::isInt(dstid)) {
status = Status::Error("ID should be of type integer.");
break;
}
storage::cpp2::EdgeKey outkey;
outkey.set_src(Expression::asInt(srcid));
outkey.set_edge_type(edgeType_);
outkey.set_dst(Expression::asInt(dstid));
outkey.set_ranking(rank);
storage::cpp2::EdgeKey inkey;
inkey.set_src(Expression::asInt(dstid));
inkey.set_edge_type(-edgeType_);
inkey.set_dst(Expression::asInt(srcid));
inkey.set_ranking(rank);
edgeKeys_.emplace_back(std::move(outkey));
edgeKeys_.emplace_back(std::move(inkey));
}
return status;
}
} // namespace graph
} // namespace nebula
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment