Skip to content
Snippets Groups Projects
Commit 48f83b0c authored by dutor's avatar dutor Committed by GitHub
Browse files

Implement GoExecutor (#245)

* [WIP] Implement GoExecutor

* Address @laura.ding 's comments

* Addressed @dangleptr's comments

* Addressed @zlcook's comment

* Addressed @sherman-the-tank's and @dangleptr's comments

* Fixed a stupid error

* Reverted changes on ThriftClientManager

* Addressed @sherman-the-tank's comments

close #176  
close #177 
parent 0f59e00f
No related branches found
No related tags found
No related merge requests found
Showing
with 274 additions and 69 deletions
install(FILES nebula-graphd.conf.default DESTINATION etc) install(
FILES nebula-graphd.conf.default nebula-storaged.conf.default nebula-metad.conf.default
DESTINATION etc
)
########## basics ########## ########## basics ##########
# Whether run as a daemon process # Whether to run as a daemon process
--daemonize=true --daemonize=true
# The file to host the process id
--pid_file=pids/nebula-graphd.pid
########## logging ########## ########## logging ##########
# Directory to host logging files, which must already exist # The directory to host logging files, which must already exists
--log_dir=logs --log_dir=logs
# 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively # Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0 --minloglevel=0
# verbose loging level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging # Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
--v=4 --v=4
# maximum seconds to buffer the log messages # Maximum seconds to buffer the log messages
--logbufsecs=0 --logbufsecs=0
# Whether to redirect stdout and stderr to separate output files # Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true --redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir. # Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=stdout.log --stdout_log_file=stdout.log
--stderr_log_file=stderr.log --stderr_log_file=stderr.log
# File to host the process id, which also resides in log_dir
--pid_file=nebula-graphd.pid
########## networking ########## ########## networking ##########
# Network device to listen on # Network device to listen on
--listen_netdev=any --listen_netdev=any
# Port to listen on # Port to listen on
--port=3699 --port=3699
# seconds before we close the idle connections, 0 for infinite # To turn on SO_REUSEPORT or not
--reuse_port=false
# Backlog of the listen socket, adjust this together with net.core.somaxconn
--listen_backlog=1024
# Seconds before the idle connections are closed, 0 for never closed
--client_idle_timeout_secs=0 --client_idle_timeout_secs=0
# seconds before we expire the idle sessions, 0 for inifnite # Seconds before the idle sessions are expired, 0 for no expiration
--session_idle_timeout_secs=60000 --session_idle_timeout_secs=60000
# number of threads to accept incoming connections # The number of threads to accept incoming connections
--num_accept_threads=1 --num_accept_threads=1
# number of networking IO threads, 0 for number of physical CPU cores # The number of networking IO threads, 0 for number of physical CPU cores
--num_netio_threads=0 --num_netio_threads=0
# turn on SO_REUSEPORT or not # HTTP service port
--reuse_port=false --ws_http_port=0
# Backlog of the listen socket, adjust this together with net.core.somaxconn # HTTP2 service port
--listen_backlog=1024 --ws_h2_portt=0
...@@ -7,6 +7,7 @@ add_executable( ...@@ -7,6 +7,7 @@ add_executable(
$<TARGET_OBJECTS:console_obj> $<TARGET_OBJECTS:console_obj>
$<TARGET_OBJECTS:client_cpp_obj> $<TARGET_OBJECTS:client_cpp_obj>
$<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:graph_thrift_obj> $<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:time_obj> $<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:thread_obj> $<TARGET_OBJECTS:thread_obj>
......
...@@ -10,6 +10,9 @@ ...@@ -10,6 +10,9 @@
#include "console/CliManager.h" #include "console/CliManager.h"
#include "client/cpp/GraphClient.h" #include "client/cpp/GraphClient.h"
DECLARE_string(u);
DECLARE_string(p);
namespace nebula { namespace nebula {
namespace graph { namespace graph {
...@@ -18,6 +21,10 @@ const int32_t kMaxUsernameLen = 16; ...@@ -18,6 +21,10 @@ const int32_t kMaxUsernameLen = 16;
const int32_t kMaxPasswordLen = 24; const int32_t kMaxPasswordLen = 24;
const int32_t kMaxCommandLineLen = 1024; const int32_t kMaxCommandLineLen = 1024;
CliManager::CliManager() {
::using_history();
}
bool CliManager::connect(const std::string& addr, bool CliManager::connect(const std::string& addr,
uint16_t port, uint16_t port,
...@@ -32,11 +39,15 @@ bool CliManager::connect(const std::string& addr, ...@@ -32,11 +39,15 @@ bool CliManager::connect(const std::string& addr,
pass[kMaxPasswordLen] = '\0'; pass[kMaxPasswordLen] = '\0';
// Make sure username is not empty // Make sure username is not empty
for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(user); i++) { if (FLAGS_u.empty()) {
// Need to interactively get the username for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(user); i++) {
std::cout << "Username: "; // Need to interactively get the username
std::cin.getline(user, kMaxUsernameLen); std::cout << "Username: ";
user[kMaxUsernameLen] = '\0'; std::cin.getline(user, kMaxUsernameLen);
user[kMaxUsernameLen] = '\0';
}
} else {
strcpy(user, FLAGS_u.c_str()); // NOLINT
} }
if (!strlen(user)) { if (!strlen(user)) {
std::cout << "Authentication failed: " std::cout << "Authentication failed: "
...@@ -45,11 +56,15 @@ bool CliManager::connect(const std::string& addr, ...@@ -45,11 +56,15 @@ bool CliManager::connect(const std::string& addr,
} }
// Make sure password is not empty // Make sure password is not empty
for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(pass); i++) { if (FLAGS_p.empty()) {
// Need to interactively get the password for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(pass); i++) {
std::cout << "Password: "; // Need to interactively get the password
std::cin.getline(pass, kMaxPasswordLen); std::cout << "Password: ";
pass[kMaxPasswordLen] = '\0'; std::cin.getline(pass, kMaxPasswordLen);
pass[kMaxPasswordLen] = '\0';
}
} else {
strcpy(pass, FLAGS_p.c_str()); // NOLINT
} }
if (!strlen(pass)) { if (!strlen(pass)) {
std::cout << "Authentication failed: " std::cout << "Authentication failed: "
...@@ -85,26 +100,52 @@ void CliManager::loop() { ...@@ -85,26 +100,52 @@ void CliManager::loop() {
std::string cmd; std::string cmd;
loadHistory(); loadHistory();
while (true) { while (true) {
if (!readLine(cmd)) { std::string line;
if (!readLine(line, !cmd.empty())) {
break; break;
} }
if (cmd.empty()) { if (line.empty()) {
cmd.clear();
continue;
}
if (line.back() == '\\') {
line.resize(line.size() - 1);
if (cmd.empty()) {
cmd = line;
} else if (cmd.back() == ' ') {
cmd += line;
} else {
cmd = cmd + " " + line;
}
continue; continue;
} }
cmd += line;
if (!cmdProcessor_->process(cmd)) { if (!cmdProcessor_->process(cmd)) {
break; break;
} }
cmd.clear();
} }
saveHistory(); saveHistory();
} }
bool CliManager::readLine(std::string &line) { bool CliManager::readLine(std::string &line, bool linebreak) {
auto ok = true; auto ok = true;
char prompt[256]; char prompt[256];
static auto color = 0u; static auto color = 0u;
::snprintf(prompt, sizeof(prompt), "\033[1;%umnebula> \033[0m", color++ % 6 + 31); ::snprintf(prompt, sizeof(prompt),
auto *input = ::readline(prompt); "\001" // RL_PROMPT_START_IGNORE
"\033[1;%um" // color codes start
"\002" // RL_PROMPT_END_IGNORE
"nebula> " // prompt
"\001" // RL_PROMPT_START_IGNORE
"\033[0m" // restore color code
"\002", // RL_PROMPT_END_IGNORE
color++ % 6 + 31);
auto *input = ::readline(linebreak ? "": prompt);
do { do {
// EOF // EOF
......
...@@ -15,7 +15,7 @@ namespace graph { ...@@ -15,7 +15,7 @@ namespace graph {
class CliManager final { class CliManager final {
public: public:
CliManager() = default; CliManager();
~CliManager() = default; ~CliManager() = default;
bool connect(const std::string& addr, bool connect(const std::string& addr,
...@@ -27,7 +27,7 @@ public: ...@@ -27,7 +27,7 @@ public:
void loop(); void loop();
bool readLine(std::string &line); bool readLine(std::string &line, bool linebreak = false);
void updateHistory(const char *line); void updateHistory(const char *line);
......
...@@ -347,12 +347,12 @@ void CmdProcessor::processServerCmd(folly::StringPiece cmd) { ...@@ -347,12 +347,12 @@ void CmdProcessor::processServerCmd(folly::StringPiece cmd) {
if (std::regex_search(*msg, result, range)) { if (std::regex_search(*msg, result, range)) {
auto start = folly::to<size_t>(result[1].str()); auto start = folly::to<size_t>(result[1].str());
auto end = folly::to<size_t>(result[2].str()); auto end = folly::to<size_t>(result[2].str());
verbose = "syntax error near `" + std::string(&cmd[start-1], &cmd[end]) + "'"; verbose = "syntax error near `" + std::string(&cmd[start-1], end - start + 1) + "'";
} else if (std::regex_search(*msg, result, single)) { } else if (std::regex_search(*msg, result, single)) {
auto start = folly::to<size_t>(result[1].str()); auto start = folly::to<size_t>(result[1].str());
auto end = start + 8; auto end = start + 8;
end = end > cmd.size() ? cmd.size() : end; end = end > cmd.size() ? cmd.size() : end;
verbose = "syntax error near `" + std::string(&cmd[start-1], &cmd[end]) + "'"; verbose = "syntax error near `" + std::string(&cmd[start-1], end - start + 1) + "'";
} }
std::cout << "[ERROR (" << static_cast<int32_t>(res) std::cout << "[ERROR (" << static_cast<int32_t>(res)
<< ")]: " << verbose << "\n"; << ")]: " << verbose << "\n";
......
...@@ -6,24 +6,42 @@ ...@@ -6,24 +6,42 @@
#include "base/Base.h" #include "base/Base.h"
#include "console/CliManager.h" #include "console/CliManager.h"
#include "fs/FileUtils.h"
DEFINE_string(addr, "127.0.0.1", "Nebula daemon IP address"); DEFINE_string(addr, "127.0.0.1", "Nebula daemon IP address");
DEFINE_int32(port, 34500, "Nebula daemon listening port"); DEFINE_int32(port, 0, "Nebula daemon listening port");
DEFINE_string(username, "", "Username used to authenticate"); DEFINE_string(u, "", "Username used to authenticate");
DEFINE_string(password, "", "Password used to authenticate"); DEFINE_string(p, "", "Password used to authenticate");
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
folly::init(&argc, &argv, true); folly::init(&argc, &argv, true);
using nebula::graph::CliManager; using nebula::graph::CliManager;
using nebula::fs::FileUtils;
if (FLAGS_port == 0) {
// If port not provided, we use the one in etc/nebula-graphd.conf
auto path = FileUtils::readLink("/proc/self/exe").value();
auto dir = FileUtils::dirname(path.c_str());
static const std::regex pattern("--port=([0-9]+)");
FileUtils::FileLineIterator iter(dir + "/../etc/nebula-graphd.conf", &pattern);
if (iter.valid()) {
auto &smatch = iter.matched();
FLAGS_port = folly::to<int>(smatch[1].str());
}
}
if (FLAGS_port == 0) {
fprintf(stderr, "--port must be specified\n");
return EXIT_FAILURE;
}
CliManager cli; CliManager cli;
if (!cli.connect(FLAGS_addr, FLAGS_port, FLAGS_username, FLAGS_password)) { if (!cli.connect(FLAGS_addr, FLAGS_port, FLAGS_u, FLAGS_p)) {
exit(1); return EXIT_FAILURE;
} }
cli.loop(); cli.loop();
return EXIT_SUCCESS;
} }
...@@ -8,6 +8,9 @@ add_executable( ...@@ -8,6 +8,9 @@ add_executable(
$<TARGET_OBJECTS:graph_thrift_obj> $<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj> $<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj> $<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:time_obj> $<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj> $<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj> $<TARGET_OBJECTS:stats_obj>
...@@ -18,6 +21,7 @@ add_executable( ...@@ -18,6 +21,7 @@ add_executable(
$<TARGET_OBJECTS:thrift_obj> $<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:schema_obj> $<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:ws_obj> $<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:dataman_obj>
) )
nebula_link_libraries( nebula_link_libraries(
nebula-graphd nebula-graphd
...@@ -26,7 +30,6 @@ nebula_link_libraries( ...@@ -26,7 +30,6 @@ nebula_link_libraries(
${THRIFT_LIBRARIES} ${THRIFT_LIBRARIES}
wangle wangle
) )
install(TARGETS nebula-graphd DESTINATION bin)
add_executable( add_executable(
...@@ -78,6 +81,7 @@ add_executable( ...@@ -78,6 +81,7 @@ add_executable(
$<TARGET_OBJECTS:time_obj> $<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj> $<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj> $<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ws_obj> $<TARGET_OBJECTS:ws_obj>
) )
nebula_link_libraries( nebula_link_libraries(
...@@ -88,4 +92,9 @@ nebula_link_libraries( ...@@ -88,4 +92,9 @@ nebula_link_libraries(
${THRIFT_LIBRARIES} ${THRIFT_LIBRARIES}
wangle wangle
) )
install(TARGETS nebula-metad DESTINATION bin)
install(
TARGETS nebula-graphd nebula-storaged nebula-metad
DESTINATION bin
)
...@@ -54,6 +54,12 @@ int main(int argc, char *argv[]) { ...@@ -54,6 +54,12 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
if (FLAGS_daemonize) {
google::SetStderrLogging(google::FATAL);
} else {
google::SetStderrLogging(google::INFO);
}
// Setup logging // Setup logging
auto status = setupLogging(); auto status = setupLogging();
if (!status.ok()) { if (!status.ok()) {
...@@ -62,7 +68,7 @@ int main(int argc, char *argv[]) { ...@@ -62,7 +68,7 @@ int main(int argc, char *argv[]) {
} }
// Detect if the server has already been started // Detect if the server has already been started
auto pidPath = FLAGS_log_dir + "/" + FLAGS_pid_file; auto pidPath = FLAGS_pid_file;
status = ProcessUtils::isPidAvailable(pidPath); status = ProcessUtils::isPidAvailable(pidPath);
if (!status.ok()) { if (!status.ok()) {
LOG(ERROR) << status; LOG(ERROR) << status;
...@@ -84,13 +90,6 @@ int main(int argc, char *argv[]) { ...@@ -84,13 +90,6 @@ int main(int argc, char *argv[]) {
} }
} }
// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}
LOG(INFO) << "Starting Graph HTTP Service"; LOG(INFO) << "Starting Graph HTTP Service";
nebula::WebService::registerHandler("/graph", [] { nebula::WebService::registerHandler("/graph", [] {
return new nebula::graph::GraphHttpHandler(); return new nebula::graph::GraphHttpHandler();
...@@ -112,10 +111,10 @@ int main(int argc, char *argv[]) { ...@@ -112,10 +111,10 @@ int main(int argc, char *argv[]) {
localIP = std::move(result).value(); localIP = std::move(result).value();
} }
auto interface = std::make_shared<GraphService>();
gServer = std::make_unique<apache::thrift::ThriftServer>(); gServer = std::make_unique<apache::thrift::ThriftServer>();
auto interface = std::make_shared<GraphService>(gServer->getIOThreadPool());
gServer->setInterface(interface); gServer->setInterface(std::move(interface));
gServer->setAddress(localIP, FLAGS_port); gServer->setAddress(localIP, FLAGS_port);
gServer->setReusePort(FLAGS_reuse_port); gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
...@@ -131,6 +130,13 @@ int main(int argc, char *argv[]) { ...@@ -131,6 +130,13 @@ int main(int argc, char *argv[]) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads); gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
} }
// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port); FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port);
try { try {
gServer->serve(); // Blocking wait until shut down via gServer->stop() gServer->serve(); // Blocking wait until shut down via gServer->stop()
......
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "base/Base.h"
#include "graph/AssignmentExecutor.h"
#include "graph/TraverseExecutor.h"
namespace nebula {
namespace graph {
AssignmentExecutor::AssignmentExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<AssignmentSentence*>(sentence);
}
Status AssignmentExecutor::prepare() {
var_ = sentence_->var();
executor_ = TraverseExecutor::makeTraverseExecutor(sentence_->sentence(), ectx());
auto status = executor_->prepare();
if (!status.ok()) {
FLOG_ERROR("Prepare executor `%s' failed: %s",
executor_->name(), status.toString().c_str());
return status;
}
auto onError = [this] (Status s) {
DCHECK(onError_);
onError_(std::move(s));
};
auto onFinish = [this] () {
DCHECK(onFinish_);
onFinish_();
};
auto onResult = [this] (std::unique_ptr<InterimResult> result) {
ectx()->variableHolder()->add(*var_, std::move(result));
};
executor_->setOnError(onError);
executor_->setOnFinish(onFinish);
executor_->setOnResult(onResult);
return Status::OK();
}
void AssignmentExecutor::execute() {
executor_->execute();
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef GRAPH_ASSIGNMENTEXECUTOR_H_
#define GRAPH_ASSIGNMENTEXECUTOR_H_
#include "base/Base.h"
#include "graph/Executor.h"
namespace nebula {
namespace graph {
class TraverseExecutor;
class AssignmentExecutor final : public Executor {
public:
AssignmentExecutor(Sentence *sentence, ExecutionContext *ectx);
const char* name() const override {
return "AssignmentExecutor";
}
Status MUST_USE_RESULT prepare() override;
void execute() override;
private:
AssignmentSentence *sentence_{nullptr};
std::unique_ptr<TraverseExecutor> executor_;
std::string *var_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_ASSIGNMENTEXECUTOR_H_
...@@ -23,7 +23,10 @@ add_library( ...@@ -23,7 +23,10 @@ add_library(
DescribeEdgeExecutor.cpp DescribeEdgeExecutor.cpp
InsertVertexExecutor.cpp InsertVertexExecutor.cpp
InsertEdgeExecutor.cpp InsertEdgeExecutor.cpp
AssignmentExecutor.cpp
ShowExecutor.cpp ShowExecutor.cpp
InterimResult.cpp
VariableHolder.cpp
mock/PropertiesSchema.cpp mock/PropertiesSchema.cpp
mock/EdgeSchema.cpp mock/EdgeSchema.cpp
mock/TagSchema.cpp mock/TagSchema.cpp
......
...@@ -27,12 +27,17 @@ public: ...@@ -27,12 +27,17 @@ public:
id_ = id; id_ = id;
} }
const std::string& space() const { GraphSpaceID space() const {
return space_; return space_;
} }
void setSpace(std::string space) { void setSpace(const std::string &name, GraphSpaceID space) {
space_ = std::move(space); spaceName_ = name;
space_ = space;
}
const std::string& spaceName() const {
return spaceName_;
} }
uint64_t idleSeconds() const; uint64_t idleSeconds() const;
...@@ -58,8 +63,9 @@ private: ...@@ -58,8 +63,9 @@ private:
private: private:
int64_t id_{0}; int64_t id_{0};
GraphSpaceID space_{0};
time::Duration idleDuration_; time::Duration idleDuration_;
std::string space_; std::string spaceName_;
std::string user_; std::string user_;
}; };
......
...@@ -25,8 +25,8 @@ Status DescribeEdgeExecutor::prepare() { ...@@ -25,8 +25,8 @@ Status DescribeEdgeExecutor::prepare() {
void DescribeEdgeExecutor::execute() { void DescribeEdgeExecutor::execute() {
auto *name = sentence_->name(); auto *name = sentence_->name();
auto space = ectx()->rctx()->session()->space(); auto space = ectx()->rctx()->session()->space();
auto edgeType = meta::SchemaManager::toEdgeType(*name);
auto schema = meta::SchemaManager::getEdgeSchema(space, *name); auto schema = meta::SchemaManager::getEdgeSchema(space, edgeType);
resp_ = std::make_unique<cpp2::ExecutionResponse>(); resp_ = std::make_unique<cpp2::ExecutionResponse>();
do { do {
......
...@@ -25,7 +25,8 @@ Status DescribeTagExecutor::prepare() { ...@@ -25,7 +25,8 @@ Status DescribeTagExecutor::prepare() {
void DescribeTagExecutor::execute() { void DescribeTagExecutor::execute() {
auto *name = sentence_->name(); auto *name = sentence_->name();
auto space = ectx()->rctx()->session()->space(); auto space = ectx()->rctx()->session()->space();
auto schema = meta::SchemaManager::getTagSchema(space, *name); auto tagId = meta::SchemaManager::toTagID(*name);
auto schema = meta::SchemaManager::getTagSchema(space, tagId);
resp_ = std::make_unique<cpp2::ExecutionResponse>(); resp_ = std::make_unique<cpp2::ExecutionResponse>();
......
...@@ -12,22 +12,26 @@ ...@@ -12,22 +12,26 @@
#include "graph/RequestContext.h" #include "graph/RequestContext.h"
#include "parser/SequentialSentences.h" #include "parser/SequentialSentences.h"
#include "graph/mock/SchemaManager.h" #include "graph/mock/SchemaManager.h"
#include "graph/mock/StorageService.h" #include "graph/VariableHolder.h"
/** /**
* ExecutionContext holds context infos in the execution process, e.g. clients of storage or meta services. * ExecutionContext holds context infos in the execution process, e.g. clients of storage or meta services.
*/ */
namespace nebula { namespace nebula {
namespace storage {
class StorageClient;
} // namespace storage
namespace graph { namespace graph {
class ExecutionContext final : public cpp::NonCopyable, public cpp::NonMovable { class ExecutionContext final : public cpp::NonCopyable, public cpp::NonMovable {
public: public:
using RequestContextPtr = std::unique_ptr<RequestContext<cpp2::ExecutionResponse>>; using RequestContextPtr = std::unique_ptr<RequestContext<cpp2::ExecutionResponse>>;
ExecutionContext(RequestContextPtr rctx, SchemaManager *sm, StorageService *storage) { ExecutionContext(RequestContextPtr rctx, SchemaManager *sm, storage::StorageClient *storage) {
rctx_ = std::move(rctx); rctx_ = std::move(rctx);
sm_ = sm; sm_ = sm;
storage_ = storage; storage_ = storage;
variableHolder_ = std::make_unique<VariableHolder>();
} }
~ExecutionContext() = default; ~ExecutionContext() = default;
...@@ -40,14 +44,19 @@ public: ...@@ -40,14 +44,19 @@ public:
return sm_; return sm_;
} }
StorageService* storage() const { storage::StorageClient* storage() const {
return storage_; return storage_;
} }
VariableHolder* variableHolder() const {
return variableHolder_.get();
}
private: private:
RequestContextPtr rctx_; RequestContextPtr rctx_;
SchemaManager *sm_{nullptr}; SchemaManager *sm_{nullptr};
StorageService *storage_{nullptr}; storage::StorageClient *storage_{nullptr};
std::unique_ptr<VariableHolder> variableHolder_;
}; };
} // namespace graph } // namespace graph
......
...@@ -8,13 +8,14 @@ ...@@ -8,13 +8,14 @@
#include "graph/ExecutionEngine.h" #include "graph/ExecutionEngine.h"
#include "graph/ExecutionContext.h" #include "graph/ExecutionContext.h"
#include "graph/ExecutionPlan.h" #include "graph/ExecutionPlan.h"
#include "storage/client/StorageClient.h"
namespace nebula { namespace nebula {
namespace graph { namespace graph {
ExecutionEngine::ExecutionEngine() { ExecutionEngine::ExecutionEngine(std::unique_ptr<storage::StorageClient> storage) {
schemaManager_ = std::make_unique<SchemaManager>(); schemaManager_ = std::make_unique<SchemaManager>();
storage_ = std::make_unique<StorageService>(schemaManager_.get()); storage_ = std::move(storage);
} }
...@@ -26,7 +27,7 @@ void ExecutionEngine::execute(RequestContextPtr rctx) { ...@@ -26,7 +27,7 @@ void ExecutionEngine::execute(RequestContextPtr rctx) {
auto ectx = std::make_unique<ExecutionContext>(std::move(rctx), auto ectx = std::make_unique<ExecutionContext>(std::move(rctx),
schemaManager_.get(), schemaManager_.get(),
storage_.get()); storage_.get());
// TODO(dutor) add support to execution plan // TODO(dutor) add support to plan cache
auto plan = new ExecutionPlan(std::move(ectx)); auto plan = new ExecutionPlan(std::move(ectx));
plan->execute(); plan->execute();
......
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
#include "graph/RequestContext.h" #include "graph/RequestContext.h"
#include "gen-cpp2/GraphService.h" #include "gen-cpp2/GraphService.h"
#include "graph/mock/SchemaManager.h" #include "graph/mock/SchemaManager.h"
#include "graph/mock/StorageService.h"
/** /**
* ExecutinoEngine is responsible to create and manage ExecutionPlan. * ExecutinoEngine is responsible to create and manage ExecutionPlan.
...@@ -21,11 +20,15 @@ ...@@ -21,11 +20,15 @@
*/ */
namespace nebula { namespace nebula {
namespace storage {
class StorageClient;
} // namespace storage
namespace graph { namespace graph {
class ExecutionEngine final : public cpp::NonCopyable, public cpp::NonMovable { class ExecutionEngine final : public cpp::NonCopyable, public cpp::NonMovable {
public: public:
ExecutionEngine(); explicit ExecutionEngine(std::unique_ptr<storage::StorageClient> storage);
~ExecutionEngine(); ~ExecutionEngine();
using RequestContextPtr = std::unique_ptr<RequestContext<cpp2::ExecutionResponse>>; using RequestContextPtr = std::unique_ptr<RequestContext<cpp2::ExecutionResponse>>;
...@@ -33,7 +36,7 @@ public: ...@@ -33,7 +36,7 @@ public:
private: private:
std::unique_ptr<SchemaManager> schemaManager_; std::unique_ptr<SchemaManager> schemaManager_;
std::unique_ptr<StorageService> storage_; std::unique_ptr<storage::StorageClient> storage_;
}; };
} // namespace graph } // namespace graph
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "graph/InsertVertexExecutor.h" #include "graph/InsertVertexExecutor.h"
#include "graph/InsertEdgeExecutor.h" #include "graph/InsertEdgeExecutor.h"
#include "graph/ShowExecutor.h" #include "graph/ShowExecutor.h"
#include "graph/AssignmentExecutor.h"
namespace nebula { namespace nebula {
namespace graph { namespace graph {
...@@ -66,6 +67,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) { ...@@ -66,6 +67,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kShow: case Sentence::Kind::kShow:
executor = std::make_unique<ShowExecutor>(sentence, ectx()); executor = std::make_unique<ShowExecutor>(sentence, ectx());
break; break;
case Sentence::Kind::kAssignment:
executor = std::make_unique<AssignmentExecutor>(sentence, ectx());
break;
case Sentence::Kind::kUnknown: case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown"; LOG(FATAL) << "Sentence kind unknown";
break; break;
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "base/Status.h" #include "base/Status.h"
#include "cpp/helpers.h" #include "cpp/helpers.h"
#include "graph/ExecutionContext.h" #include "graph/ExecutionContext.h"
#include "gen-cpp2/common_types.h"
/** /**
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment