diff --git a/src/exec/CMakeLists.txt b/src/exec/CMakeLists.txt index f716057fc070c91eb2656fb2f82b847c1bfbfd84..711d850419440bfba9a78ac7e09bbca4661ba056 100644 --- a/src/exec/CMakeLists.txt +++ b/src/exec/CMakeLists.txt @@ -27,6 +27,7 @@ nebula_add_library( query/DataCollectExecutor.cpp query/DataJoinExecutor.cpp admin/SwitchSpaceExecutor.cpp + admin/SubmitJobExecutor.cpp admin/ShowHostsExecutor.cpp admin/SpaceExecutor.cpp admin/SnapshotExecutor.cpp diff --git a/src/exec/Executor.cpp b/src/exec/Executor.cpp index b4b6b5eb6381c1107bbe5f5ba73eafe4b9b9985b..e49012a3e81ab04404e037486ce8a3bcc058bd7c 100644 --- a/src/exec/Executor.cpp +++ b/src/exec/Executor.cpp @@ -13,6 +13,7 @@ #include "context/ExecutionContext.h" #include "context/QueryContext.h" #include "exec/ExecutionError.h" +#include "exec/admin/SubmitJobExecutor.h" #include "exec/admin/ShowHostsExecutor.h" #include "exec/admin/SnapshotExecutor.h" #include "exec/admin/SpaceExecutor.h" @@ -402,6 +403,13 @@ Executor *Executor::makeExecutor(const PlanNode *node, exec->dependsOn(input); break; } + case PlanNode::Kind::kSubmitJob: { + auto submitJob = asNode<SubmitJob>(node); + auto input = makeExecutor(submitJob->dep(), qctx, visited); + exec = new SubmitJobExecutor(submitJob, qctx); + exec->dependsOn(input); + break; + } case PlanNode::Kind::kShowHosts: { auto showHosts = asNode<ShowHosts>(node); auto input = makeExecutor(showHosts->dep(), qctx, visited); diff --git a/src/exec/admin/SubmitJobExecutor.cpp b/src/exec/admin/SubmitJobExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..092c61921279e82ac23c6b0422515b684b2f1cb4 --- /dev/null +++ b/src/exec/admin/SubmitJobExecutor.cpp @@ -0,0 +1,130 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "exec/admin/SubmitJobExecutor.h" + +#include "planner/Admin.h" +#include "context/QueryContext.h" +#include "util/ScopedTimer.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> SubmitJobExecutor::execute() { + SCOPED_TIMER(&execTime_); + + auto *sjNode = asNode<SubmitJob>(node()); + auto jobOp = sjNode->jobOp(); + meta::cpp2::AdminCmd cmd = meta::cpp2::AdminCmd::COMPACT; + if (jobOp == meta::cpp2::AdminJobOp::ADD) { + std::vector<std::string> params; + folly::split(" ", sjNode->params().front(), params, true); + if (params.front() == "compact") { + cmd = meta::cpp2::AdminCmd::COMPACT; + } else if (params.front() == "flush") { + cmd = meta::cpp2::AdminCmd::FLUSH; + } else { + DLOG(FATAL) << "Unknown job command " << params.front(); + return Status::Error("Unknown job command %s", params.front().c_str()); + } + } + return qctx()->getMetaClient()->submitJob(jobOp, cmd, sjNode->params()) + .via(runner()) + .then([jobOp, this](StatusOr<meta::cpp2::AdminJobResult> &&resp) { + SCOPED_TIMER(&execTime_); + + if (!resp.ok()) { + LOG(ERROR) << resp.status().toString(); + return std::move(resp).status(); + } + switch (jobOp) { + case meta::cpp2::AdminJobOp::ADD: { + nebula::DataSet v({"New Job Id"}); + DCHECK(resp.value().__isset.job_id); + if (!resp.value().__isset.job_id) { + return Status::Error("Response unexpected."); + } + v.emplace_back(nebula::Row({*DCHECK_NOTNULL(resp.value().get_job_id())})); + return finish(std::move(v)); + } + case meta::cpp2::AdminJobOp::RECOVER: { + nebula::DataSet v({"Recovered job num"}); + DCHECK(resp.value().__isset.recovered_job_num); + if (!resp.value().__isset.recovered_job_num) { + return Status::Error("Response unexpected."); + } + v.emplace_back( + nebula::Row({*DCHECK_NOTNULL(resp.value().get_recovered_job_num())})); + return finish(std::move(v)); + } + case meta::cpp2::AdminJobOp::SHOW: { + nebula::DataSet v( + {"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"}); + DCHECK(resp.value().__isset.job_desc); + if (!resp.value().__isset.job_desc) { + return Status::Error("Response unexpected."); + } + DCHECK(resp.value().__isset.task_desc); + if (!resp.value().__isset.task_desc) { + return Status::Error("Response unexpected"); + } + auto &jobDesc = *resp.value().get_job_desc(); + // job desc + v.emplace_back( + nebula::Row( + {jobDesc.front().get_id(), + meta::cpp2::_AdminCmd_VALUES_TO_NAMES.at(jobDesc.front().get_cmd()), + meta::cpp2::_JobStatus_VALUES_TO_NAMES.at(jobDesc.front().get_status()), + jobDesc.front().get_start_time(), + jobDesc.front().get_stop_time(), + })); + // tasks desc + auto &tasksDesc = *resp.value().get_task_desc(); + for (const auto & taskDesc : tasksDesc) { + v.emplace_back(nebula::Row({ + taskDesc.get_task_id(), + taskDesc.get_host().host, + meta::cpp2::_JobStatus_VALUES_TO_NAMES.at(taskDesc.get_status()), + taskDesc.get_start_time(), + taskDesc.get_stop_time(), + })); + } + return finish(std::move(v)); + } + case meta::cpp2::AdminJobOp::SHOW_All: { + nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"}); + DCHECK(resp.value().__isset.job_desc); + if (!resp.value().__isset.job_desc) { + return Status::Error("Response unexpected"); + } + const auto &jobsDesc = *resp.value().get_job_desc(); + for (const auto &jobDesc : jobsDesc) { + v.emplace_back(nebula::Row({ + jobDesc.get_id(), + meta::cpp2::_AdminCmd_VALUES_TO_NAMES.at(jobDesc.get_cmd()), + meta::cpp2::_JobStatus_VALUES_TO_NAMES.at(jobDesc.get_status()), + jobDesc.get_start_time(), + jobDesc.get_stop_time(), + })); + } + return finish(std::move(v)); + } + case meta::cpp2::AdminJobOp::STOP: { + nebula::DataSet v({"Result"}); + v.emplace_back(nebula::Row({ + "Job stopped" + })); + return finish(std::move(v)); + } + // no default so the compiler will warning when lack + } + DLOG(FATAL) << "Unknown job operation " << static_cast<int>(jobOp); + return Status::Error("Unkown job job operation %d.", static_cast<int>(jobOp)); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/exec/admin/SubmitJobExecutor.h b/src/exec/admin/SubmitJobExecutor.h new file mode 100644 index 0000000000000000000000000000000000000000..e0d75c1e6ce2b1971f4bc99f039ec4a7c5ad90f2 --- /dev/null +++ b/src/exec/admin/SubmitJobExecutor.h @@ -0,0 +1,26 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXEC_ADMIN_SUBMIT_JOB_EXECUTOR_H_ +#define EXEC_ADMIN_SUBMIT_JOB_EXECUTOR_H_ + +#include "exec/Executor.h" + +namespace nebula { +namespace graph { + +class SubmitJobExecutor final : public Executor { +public: + SubmitJobExecutor(const PlanNode *node, QueryContext *ectx) + : Executor("SubmitJobExecutor", node, ectx) {} + + folly::Future<Status> execute() override; +}; + +} // namespace graph +} // namespace nebula + +#endif // EXEC_ADMIN_SUBMIT_JOB_EXECUTOR_H_ diff --git a/src/mock/MetaCache.cpp b/src/mock/MetaCache.cpp index 1646f978b76529ddbe70782352af6d190b021152..9b6a39093ce97ba09285f57426deb5a71d9520c8 100644 --- a/src/mock/MetaCache.cpp +++ b/src/mock/MetaCache.cpp @@ -323,6 +323,118 @@ std::unordered_map<PartitionID, std::vector<HostAddr>> MetaCache::getParts() { return parts; } +ErrorOr<meta::cpp2::ErrorCode, meta::cpp2::AdminJobResult> +MetaCache::runAdminJob(const meta::cpp2::AdminJobReq& req) { + meta::cpp2::AdminJobResult result; + switch (req.get_op()) { + case meta::cpp2::AdminJobOp::ADD: { + folly::RWSpinLock::WriteHolder wh(jobLock_); + auto jobId = incId(); + jobs_.emplace(jobId, JobDesc{ + req.get_cmd(), + req.get_paras(), + meta::cpp2::JobStatus::QUEUE, + 0, + 0 + }); + std::vector<TaskDesc> descs; + int32_t iTask = 0; + for (const auto &host : hostSet_) { + descs.reserve(hostSet_.size()); + descs.emplace_back(TaskDesc{ + ++iTask, + host, + meta::cpp2::JobStatus::QUEUE, + 0, + 0 + }); + } + tasks_.emplace(jobId, std::move(descs)); + result.set_job_id(jobId); + return result; + } + case meta::cpp2::AdminJobOp::RECOVER: { + uint32_t jobNum = 0; + folly::RWSpinLock::WriteHolder wh(jobLock_); + for (auto &job : jobs_) { + if (job.second.status_ == meta::cpp2::JobStatus::FAILED) { + job.second.status_ = meta::cpp2::JobStatus::QUEUE; + ++jobNum; + } + } + result.set_recovered_job_num(jobNum); + return result; + } + case meta::cpp2::AdminJobOp::SHOW: { + folly::RWSpinLock::ReadHolder rh(jobLock_); + auto ret = checkJobId(req); + if (!ok(ret)) { + return error(ret); + } + auto job = value(ret); + result.set_job_id(job->first); + std::vector<meta::cpp2::JobDesc> jobsDesc; + meta::cpp2::JobDesc jobDesc; + jobDesc.set_id(job->first); + jobDesc.set_cmd(job->second.cmd_); + jobDesc.set_status(job->second.status_); + jobDesc.set_start_time(job->second.startTime_); + jobDesc.set_stop_time(job->second.stopTime_); + jobsDesc.emplace_back(std::move(jobDesc)); + result.set_job_desc(std::move(jobsDesc)); + + // tasks + const auto tasks = tasks_.find(job->first); + if (tasks == tasks_.end()) { + LOG(FATAL) << "Impossible not find tasks of job id " << job->first; + } + std::vector<meta::cpp2::TaskDesc> tasksDesc; + for (const auto &task : tasks->second) { + meta::cpp2::TaskDesc taskDesc; + taskDesc.set_job_id(job->first); + taskDesc.set_task_id(task.iTask_); + taskDesc.set_host(task.dest_); + taskDesc.set_status(task.status_); + taskDesc.set_start_time(task.startTime_); + taskDesc.set_stop_time(task.stopTime_); + tasksDesc.emplace_back(std::move(taskDesc)); + } + result.set_task_desc(std::move(tasksDesc)); + return result; + } + case meta::cpp2::AdminJobOp::SHOW_All: { + std::vector<meta::cpp2::JobDesc> jobsDesc; + folly::RWSpinLock::ReadHolder rh(jobLock_); + for (const auto &job : jobs_) { + meta::cpp2::JobDesc jobDesc; + jobDesc.set_id(job.first); + jobDesc.set_cmd(job.second.cmd_); + jobDesc.set_status(job.second.status_); + jobDesc.set_start_time(job.second.startTime_); + jobDesc.set_stop_time(job.second.stopTime_); + jobsDesc.emplace_back(std::move(jobDesc)); + } + result.set_job_desc(std::move(jobsDesc)); + return result; + } + case meta::cpp2::AdminJobOp::STOP: { + folly::RWSpinLock::WriteHolder wh(jobLock_); + auto ret = checkJobId(req); + if (!ok(ret)) { + return error(ret); + } + auto job = value(ret); + if (job->second.status_ != meta::cpp2::JobStatus::QUEUE && + job->second.status_ != meta::cpp2::JobStatus::RUNNING) { + return meta::cpp2::ErrorCode::E_CONFLICT; + } + job->second.status_ = meta::cpp2::JobStatus::STOPPED; + return result; + } + } + return meta::cpp2::ErrorCode::E_INVALID_PARM; +} + Status MetaCache::alterColumnDefs(meta::cpp2::Schema &schema, const std::vector<meta::cpp2::AlterSchemaItem> &items) { std::vector<meta::cpp2::ColumnDef> columns = schema.columns; @@ -474,5 +586,6 @@ StatusOr<std::vector<meta::cpp2::Snapshot>> MetaCache::listSnapshots() { } return snapshots; } + } // namespace graph } // namespace nebula diff --git a/src/mock/MetaCache.h b/src/mock/MetaCache.h index b00a53e9aa06fa75d6cb22559f5aa312eb595ec3..9feeacc20d4e54f86f2f0639df377730a2b35d13 100644 --- a/src/mock/MetaCache.h +++ b/src/mock/MetaCache.h @@ -9,6 +9,7 @@ #include "common/base/Base.h" #include "common/base/StatusOr.h" +#include "common/base/ErrorOr.h" #include "common/interface/gen-cpp2/meta_types.h" namespace nebula { @@ -69,6 +70,9 @@ public: std::unordered_map<PartitionID, std::vector<HostAddr>> getParts(); + ErrorOr<meta::cpp2::ErrorCode, meta::cpp2::AdminJobResult> + runAdminJob(const meta::cpp2::AdminJobReq& req); + Status createSnapshot(); Status dropSnapshot(const meta::cpp2::DropSnapshotReq& req); @@ -78,6 +82,10 @@ public: private: MetaCache() = default; + int64_t incId() { + return ++id_; + } + Status alterColumnDefs(meta::cpp2::Schema &schema, const std::vector<meta::cpp2::AlterSchemaItem> &items); @@ -136,6 +144,49 @@ private: int64_t id_{0}; std::unordered_map<std::string, meta::cpp2::Snapshot> snapshots_; mutable folly::RWSpinLock lock_; + +////////////////////////////////////////////// Job ///////////////////////////////////////////////// + struct JobDesc { + meta::cpp2::AdminCmd cmd_; // compact, flush ... + std::vector<std::string> paras_; + meta::cpp2::JobStatus status_; + int64_t startTime_; + int64_t stopTime_; + }; + struct TaskDesc { + int32_t iTask_; + nebula::HostAddr dest_; + meta::cpp2::JobStatus status_; + int64_t startTime_; + int64_t stopTime_; + }; + + ErrorOr<meta::cpp2::ErrorCode, std::unordered_map<int64_t, JobDesc>::iterator> + checkJobId(const meta::cpp2::AdminJobReq& req) { + const auto ¶ms = req.get_paras(); + if (params.empty()) { + return meta::cpp2::ErrorCode::E_INVALID_PARM; + } + int64_t jobId; + try { + jobId = folly::to<int64_t>(params.front()); + } catch (std::exception &e) { + LOG(ERROR) << e.what(); + return meta::cpp2::ErrorCode::E_INVALID_PARM; + } + const auto job = jobs_.find(jobId); + if (job == jobs_.end()) { + return meta::cpp2::ErrorCode::E_INVALID_PARM; + } + return job; + } + + + mutable folly::RWSpinLock jobLock_; + // jobId => jobs + std::unordered_map<int64_t, JobDesc> jobs_; + // jobId => tasks + std::unordered_map<int64_t, std::vector<TaskDesc>> tasks_; }; } // namespace graph diff --git a/src/mock/MockMetaServiceHandler.cpp b/src/mock/MockMetaServiceHandler.cpp index 1cc890cc0639b7ed9f877b79b4f5328113d81b29..3fd8e1d557c84f59633ab684f9d3b9dc76bf40ae 100644 --- a/src/mock/MockMetaServiceHandler.cpp +++ b/src/mock/MockMetaServiceHandler.cpp @@ -74,13 +74,16 @@ MockMetaServiceHandler::future_listSpaces(const meta::cpp2::ListSpacesReq&) { } folly::Future<meta::cpp2::AdminJobResp> -MockMetaServiceHandler::future_runAdminJob(const meta::cpp2::AdminJobReq&) { - folly::Promise<meta::cpp2::AdminJobResp> promise; - auto future = promise.getFuture(); +MockMetaServiceHandler::future_runAdminJob(const meta::cpp2::AdminJobReq& req) { meta::cpp2::AdminJobResp resp; + auto result = MetaCache::instance().runAdminJob(req); + if (!ok(result)) { + resp.set_code(result.left()); + return resp; + } resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED); - promise.setValue(std::move(resp)); - return future; + resp.set_result(std::move(result).right()); + return resp; } folly::Future<meta::cpp2::GetSpaceResp> diff --git a/src/mock/test/AdminJobTest.cpp b/src/mock/test/AdminJobTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..35c76cd57e93592d359aa93f0ed01022e7681b97 --- /dev/null +++ b/src/mock/test/AdminJobTest.cpp @@ -0,0 +1,173 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "common/base/Status.h" +#include "common/network/NetworkUtils.h" +#include "common/interface/gen-cpp2/common_types.h" +#include "mock/test/TestEnv.h" +#include "mock/test/TestBase.h" +#include <gtest/gtest.h> + +DECLARE_int32(heartbeat_interval_secs); + +namespace nebula { +namespace graph { + +class AdminJobTest : public TestBase { +public: + void SetUp() override { + TestBase::SetUp(); + client_ = gEnv->getGraphClient(); + ASSERT_NE(nullptr, client_); + }; + + void TearDown() override { + TestBase::TearDown(); + client_.reset(); + }; + +protected: + std::unique_ptr<GraphClient> client_; +}; + +TEST_F(AdminJobTest, Error) { + { + // submit without space + cpp2::ExecutionResponse resp; + std::string query = "SUBMIT JOB COMPACT"; + client_->execute(query, resp); + // TODO(shylock) semantic error? + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_SEMANTIC_ERROR); + } + { + // show one not exists + cpp2::ExecutionResponse resp; + std::string query = "SHOW JOB 233"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_EXECUTION_ERROR); + } + { + // stop one not exists + cpp2::ExecutionResponse resp; + std::string query = "STOP JOB 233"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_EXECUTION_ERROR); + } +} + +TEST_F(AdminJobTest, Base) { + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE SPACE space_for_default(partition_num=9, replica_factor=1);"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + } + sleep(FLAGS_heartbeat_interval_secs + 1); + { + cpp2::ExecutionResponse resp; + std::string query = "USE space_for_default;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + } + { + // submit + cpp2::ExecutionResponse resp; + std::string query = "SUBMIT JOB COMPACT"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + DataSet expected({"New Job Id"}); + expected.emplace_back(Row({ + 2 + })); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // submit + cpp2::ExecutionResponse resp; + std::string query = "SUBMIT JOB FLUSH"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + DataSet expected({"New Job Id"}); + expected.emplace_back(Row({ + 3 + })); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // show all + cpp2::ExecutionResponse resp; + std::string query = "SHOW JOBS"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + nebula::DataSet expected({"Job Id", "Command", "Status", "Start Time", "Stop Time"}); + expected.emplace_back(Row({ + 2, "COMPACT", "QUEUE", 0, 0, + })); + expected.emplace_back(Row({ + 3, "FLUSH", "QUEUE", 0, 0 + })); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // show one + cpp2::ExecutionResponse resp; + std::string query = "SHOW JOB 2"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + nebula::DataSet expected( + {"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"}); + expected.emplace_back(Row({2, "COMPACT", "QUEUE", 0, 0})); + expected.emplace_back(Row({1, "127.0.0.1", "QUEUE", 0, 0})); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // stop one + cpp2::ExecutionResponse resp; + std::string query = "STOP JOB 2"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + nebula::DataSet expected({"Result"}); + expected.emplace_back(Row({"Job stopped"})); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // show all + cpp2::ExecutionResponse resp; + std::string query = "SHOW JOBS"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + nebula::DataSet expected({"Job Id", "Command", "Status", "Start Time", "Stop Time"}); + expected.emplace_back(Row({ + 2, "COMPACT", "STOPPED", 0, 0, + })); + expected.emplace_back(Row({ + 3, "FLUSH", "QUEUE", 0, 0 + })); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // recover + cpp2::ExecutionResponse resp; + std::string query = "RECOVER JOB"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + nebula::DataSet expected({"Recovered job num"}); + expected.emplace_back(Row({ + 0 + })); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/mock/test/CMakeLists.txt b/src/mock/test/CMakeLists.txt index a2f1a85601fd75af716c75c610475bfbdc676c0e..aabdf6d0c3281fc67155c2515f3a9fd95f452e6c 100644 --- a/src/mock/test/CMakeLists.txt +++ b/src/mock/test/CMakeLists.txt @@ -87,6 +87,22 @@ nebula_add_test( wangle ) +nebula_add_test( + NAME + admin_job_test + SOURCES + AdminJobTest.cpp + OBJECTS + ${GRAPH_TEST_LIB} + LIBRARIES + proxygenhttpserver + proxygenlib + ${THRIFT_LIBRARIES} + wangle + gtest + gtest_main +) + nebula_add_test( NAME snapshot_test diff --git a/src/mock/test/TestBase.h b/src/mock/test/TestBase.h index 68b5b9d8ea7f12663bb894d93a76ba1809f18d68..be1869b838d00f32ca9e244975001da65bbf9440 100644 --- a/src/mock/test/TestBase.h +++ b/src/mock/test/TestBase.h @@ -20,6 +20,29 @@ protected: void TearDown() override; + static ::testing::AssertionResult verifyDataSetWithoutOrder(cpp2::ExecutionResponse &resp, + DataSet &expected) { + if (resp.get_error_code() != cpp2::ErrorCode::SUCCEEDED) { + return ::testing::AssertionFailure() << "query failed: " + << cpp2::_ErrorCode_VALUES_TO_NAMES.at(resp.get_error_code()); + } + if (!resp.__isset.data) { + return ::testing::AssertionFailure() << "No data in response"; + } + auto &data = *resp.get_data(); + std::sort(data.rows.begin(), data.rows.end()); + std::sort(expected.rows.begin(), expected.rows.end()); + if (data != expected) { + return ::testing::AssertionFailure() << "Not match data set" << std::endl + << "Resp: " << std::endl + << data + << "Expected: " << std::endl + << expected; + } else { + return ::testing::AssertionSuccess(); + } + } + static ::testing::AssertionResult TestOK() { return ::testing::AssertionSuccess(); } @@ -103,6 +126,10 @@ protected: } }; +#define ASSERT_ERROR_CODE(resp, expected) ASSERT_EQ(resp.get_error_code(), expected) \ + << "Expect: " << cpp2::_ErrorCode_VALUES_TO_NAMES.at(expected) << ", " \ + << "In fact: " << cpp2::_ErrorCode_VALUES_TO_NAMES.at(resp.get_error_code()) + } // namespace graph } // namespace nebula diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index 8e5db4904e159654205afde2564f8b24cc51e4b0..e772a0d8ee1c404ce45dc257b0cd88f4b3cc34a9 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -151,4 +151,32 @@ std::string DropSnapshotSentence::toString() const { return folly::stringPrintf("DROP SNAPSHOT %s", name_.get()->c_str()); } +std::string AdminJobSentence::toString() const { + switch (op_) { + case meta::cpp2::AdminJobOp::ADD: + return "add job"; + case meta::cpp2::AdminJobOp::SHOW_All: + return "show jobs"; + case meta::cpp2::AdminJobOp::SHOW: + return "show job"; + case meta::cpp2::AdminJobOp::STOP: + return "stop job"; + case meta::cpp2::AdminJobOp::RECOVER: + return "recover job"; + } + LOG(FATAL) << "Unkown job operation " << static_cast<uint8_t>(op_); +} + +meta::cpp2::AdminJobOp AdminJobSentence::getType() const { + return op_; +} + +const std::vector<std::string> &AdminJobSentence::getParas() const { + return paras_; +} + +void AdminJobSentence::addPara(const std::string& para) { + paras_.emplace_back(para); +} + } // namespace nebula diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 0a2f7b1ad008bd6ff8cf30ba4a275bb98644d877..0cda54e91594bc0eddd6b5768a62dc83f864168d 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -10,6 +10,7 @@ #include "parser/Sentence.h" #include "parser/MutateSentences.h" #include "common/network/NetworkUtils.h" +#include "common/interface/gen-cpp2/meta_types.h" namespace nebula { @@ -483,6 +484,23 @@ private: std::unique_ptr<std::string> name_; }; +class AdminJobSentence final : public Sentence { +public: + explicit AdminJobSentence(meta::cpp2::AdminJobOp op) : op_(op) { + kind_ = Kind::kAdminJob; + } + + void addPara(const std::string& para); + std::string toString() const override; + meta::cpp2::AdminJobOp getType() const; + const std::vector<std::string> &getParas() const; + +private: + meta::cpp2::AdminJobOp op_; + std::vector<std::string> paras_; +}; + + } // namespace nebula #endif // PARSER_ADMINSENTENCES_H_ diff --git a/src/parser/MutateSentences.cpp b/src/parser/MutateSentences.cpp index 3f5a1b3807121f9727ab01fb31c293a92eda96c0..50e2d32c99d2c05aa6ebda30df750cd0d2fe29b2 100644 --- a/src/parser/MutateSentences.cpp +++ b/src/parser/MutateSentences.cpp @@ -282,19 +282,4 @@ std::string IngestSentence::toString() const { return "INGEST"; } -std::string AdminSentence::toString() const { - return op_; -} - -std::string AdminSentence::getType() const { - return op_; -} - -std::vector<std::string> AdminSentence::getParas() const { - return paras_; -} - -void AdminSentence::addPara(const std::string& para) { - paras_.emplace_back(para); -} } // namespace nebula diff --git a/src/parser/MutateSentences.h b/src/parser/MutateSentences.h index 48fcc79703ee7caab5568197209e94e0e5126b43..44ad27b56cf559a9582f1d22c9054c938475bff1 100644 --- a/src/parser/MutateSentences.h +++ b/src/parser/MutateSentences.h @@ -645,20 +645,5 @@ public: std::string toString() const override; }; -class AdminSentence final : public Sentence { -public: - explicit AdminSentence(const std::string& op) : op_(op) { - kind_ = Kind::kAdmin; - } - - void addPara(const std::string& para); - std::string toString() const override; - std::string getType() const; - std::vector<std::string> getParas() const; -private: - std::string op_; - std::vector<std::string> paras_; -}; - } // namespace nebula #endif // PARSER_MUTATESENTENCES_H_ diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h index 4b4a62a44c216c1f30d81dda794374bc308fb445..e91a088954f453e6dca998c95cccc84e0f90f470 100644 --- a/src/parser/Sentence.h +++ b/src/parser/Sentence.h @@ -101,7 +101,7 @@ public: kReturn, kCreateSnapshot, kDropSnapshot, - kAdmin, + kAdminJob, kGetSubgraph, }; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index da6db7c84f2923e711bb63295907584a0068addc..b90d63814a8d168be6c70a901ac0c0bc0495536b 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -17,6 +17,7 @@ #include "parser/ExplainSentence.h" #include "parser/SequentialSentences.h" #include "parser/ColumnTypeDef.h" +#include "common/interface/gen-cpp2/meta_types.h" #include "util/SchemaUtil.h" namespace nebula { @@ -158,7 +159,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL; %token <strval> STRING VARIABLE LABEL IPV4 %type <strval> name_label unreserved_keyword agg_function -%type <strval> admin_operation admin_para +%type <strval> admin_job_operation admin_job_para %type <expr> expression logic_xor_expression logic_or_expression logic_and_expression %type <expr> relational_expression multiplicative_expression additive_expression %type <expr> unary_expression constant_expression equality_expression base_expression @@ -255,7 +256,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL; %type <sentence> rebuild_tag_index_sentence rebuild_edge_index_sentence %type <sentence> create_snapshot_sentence drop_snapshot_sentence -%type <sentence> admin_sentence +%type <sentence> admin_job_sentence %type <sentence> create_user_sentence alter_user_sentence drop_user_sentence change_password_sentence %type <sentence> show_sentence @@ -1783,41 +1784,42 @@ ingest_sentence } ; -admin_sentence - : KW_SUBMIT KW_JOB admin_operation { - auto sentence = new AdminSentence("add_job"); +admin_job_sentence + : KW_SUBMIT KW_JOB admin_job_operation { + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD); sentence->addPara(*$3); + delete $3; $$ = sentence; } | KW_SHOW KW_JOBS { - auto sentence = new AdminSentence("show_jobs"); + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::SHOW_All); $$ = sentence; } | KW_SHOW KW_JOB legal_integer { - auto sentence = new AdminSentence("show_job"); + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::SHOW); sentence->addPara(std::to_string($3)); $$ = sentence; } | KW_STOP KW_JOB legal_integer { - auto sentence = new AdminSentence("stop_job"); + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::STOP); sentence->addPara(std::to_string($3)); $$ = sentence; } | KW_RECOVER KW_JOB { - auto sentence = new AdminSentence("recover_job"); + auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::RECOVER); $$ = sentence; } ; -admin_operation +admin_job_operation : KW_COMPACT { $$ = new std::string("compact"); } | KW_FLUSH { $$ = new std::string("flush"); } - | admin_operation admin_para { + | admin_job_operation admin_job_para { $$ = new std::string(*$1 + " " + *$2); } ; -admin_para +admin_job_para : name_label ASSIGN name_label { auto left = *$1; auto right = *$3; @@ -2168,7 +2170,7 @@ mutate_sentence | update_edge_sentence { $$ = $1; } | download_sentence { $$ = $1; } | ingest_sentence { $$ = $1; } - | admin_sentence { $$ = $1; } + | admin_job_sentence { $$ = $1; } ; maintain_sentence diff --git a/src/planner/Admin.h b/src/planner/Admin.h index c1dcecbcfa4b40f74874ae080ae0ea8005b10c35..a745e60d0115e8cb5db1161440777c065064f77b 100644 --- a/src/planner/Admin.h +++ b/src/planner/Admin.h @@ -268,6 +268,50 @@ private: std::vector<PartitionID> partIds_; }; +class SubmitJob final : public SingleDependencyNode { +public: + static SubmitJob* make(ExecutionPlan* plan, + PlanNode* dep, + meta::cpp2::AdminJobOp op, + const std::vector<std::string>& params) { + return new SubmitJob(plan, dep, op, params); + } + + std::unique_ptr<cpp2::PlanNodeDescription> explain() const override { + // TODO(shylock) + LOG(FATAL) << "Unimplemented"; + return nullptr; + } + +public: + meta::cpp2::AdminJobOp jobOp() const { + return op_; + } + + meta::cpp2::AdminCmd cmd() const { + return cmd_; + } + + const std::vector<std::string> ¶ms() const { + return params_; + } + +private: + SubmitJob(ExecutionPlan* plan, + PlanNode* dep, + meta::cpp2::AdminJobOp op, + const std::vector<std::string> ¶ms) + : SingleDependencyNode(plan, Kind::kSubmitJob, dep), + op_(op), + params_(params) {} + + +private: + meta::cpp2::AdminJobOp op_; + meta::cpp2::AdminCmd cmd_; + const std::vector<std::string> params_; +}; + class ShowCharset final : public SingleInputNode { public: static ShowCharset* make(ExecutionPlan* plan, PlanNode* input) { diff --git a/src/planner/PlanNode.cpp b/src/planner/PlanNode.cpp index e42f8080dc095ef95fa4023484567739c274b77c..b16e2000aba4f1ff39a1af42796f7367226928c3 100644 --- a/src/planner/PlanNode.cpp +++ b/src/planner/PlanNode.cpp @@ -105,6 +105,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "DropSnapshot"; case Kind::kShowSnapshots: return "ShowSnapshots"; + case Kind::kSubmitJob: + return "SubmitJob"; case Kind::kDataJoin: return "DataJoin"; case Kind::kDeleteVertices: @@ -123,6 +125,7 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "ShowCharset"; case Kind::kShowCollation: return "ShowCollation"; + // no default so the compiler will warning when lack } LOG(FATAL) << "Impossible kind plan node " << static_cast<int>(kind); } diff --git a/src/planner/PlanNode.h b/src/planner/PlanNode.h index fbf0641b8f480a95c38a8d58530d76e5ad21116c..bd9cc35c9475af761497a21bbae8fc8f4bf80e42 100644 --- a/src/planner/PlanNode.h +++ b/src/planner/PlanNode.h @@ -65,6 +65,7 @@ public: kDropEdge, kInsertVertices, kInsertEdges, + kSubmitJob, kShowHosts, kDataCollect, kCreateSnapshot, diff --git a/src/service/PermissionCheck.cpp b/src/service/PermissionCheck.cpp index f4c71973c69ae57c601ba71e91b3020cf5118546..11500ef3c5b09ce2f918e10a85b77ce34dcd5610 100644 --- a/src/service/PermissionCheck.cpp +++ b/src/service/PermissionCheck.cpp @@ -56,7 +56,7 @@ bool PermissionCheck::permissionCheck(Session *session, case Sentence::Kind::kCreateSnapshot : case Sentence::Kind::kDropSnapshot : case Sentence::Kind::kBalance : - case Sentence::Kind::kAdmin : + case Sentence::Kind::kAdminJob : case Sentence::Kind::kIngest : case Sentence::Kind::kConfig : case Sentence::Kind::kDownload : { diff --git a/src/validator/AdminJobValidator.cpp b/src/validator/AdminJobValidator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a6782b0c083583bb000457342a97c2221c1e0af7 --- /dev/null +++ b/src/validator/AdminJobValidator.cpp @@ -0,0 +1,26 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. +* +* This source code is licensed under Apache 2.0 License, +* attached with Common Clause Condition 1.0, found in the LICENSES directory. +*/ + +#include "validator/AdminJobValidator.h" +#include "planner/Admin.h" + +namespace nebula { +namespace graph { + +Status AdminJobValidator::validateImpl() { + return Status::OK(); +} + +Status AdminJobValidator::toPlan() { + auto* plan = qctx_->plan(); + auto *doNode = SubmitJob::make(plan, nullptr, sentence_->getType(), sentence_->getParas()); + root_ = doNode; + tail_ = root_; + return Status::OK(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/validator/AdminJobValidator.h b/src/validator/AdminJobValidator.h new file mode 100644 index 0000000000000000000000000000000000000000..3505edc4d3c6a189d1413a74f246143555b32e70 --- /dev/null +++ b/src/validator/AdminJobValidator.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef VALIDATOR_ADMIN_JOB_VALIDATOR_H_ +#define VALIDATOR_ADMIN_JOB_VALIDATOR_H_ + +#include "common/base/Base.h" +#include "validator/Validator.h" +#include "parser/AdminSentences.h" + +namespace nebula { +namespace graph { + +class AdminJobValidator final : public Validator { +public: + AdminJobValidator(Sentence* sentence, QueryContext* context) + : Validator(sentence, context) { + sentence_ = static_cast<AdminJobSentence*>(sentence); + if (sentence_->getType() != meta::cpp2::AdminJobOp::ADD) { + setNoSpaceRequired(); + } + } + +private: + Status validateImpl() override; + + Status toPlan() override; + +private: + AdminJobSentence *sentence_{nullptr}; +}; + +} // namespace graph +} // namespace nebula + +#endif // VALIDATOR_ADMIN_JOB_VALIDATOR_H_ diff --git a/src/validator/CMakeLists.txt b/src/validator/CMakeLists.txt index 2510cf36b0c58c8596d69835b0f64c7626a7dad4..56d655a3981b44249b911b745b0545027a188f89 100644 --- a/src/validator/CMakeLists.txt +++ b/src/validator/CMakeLists.txt @@ -15,6 +15,7 @@ nebula_add_library( UseValidator.cpp GetSubgraphValidator.cpp AdminValidator.cpp + AdminJobValidator.cpp MaintainValidator.cpp MutateValidator.cpp FetchEdgesValidator.cpp diff --git a/src/validator/Validator.cpp b/src/validator/Validator.cpp index 4c7064209d3e93ecd21c96bc9b9d53316fbf9fa2..428507b0386073113bedbad625735c96a1989301 100644 --- a/src/validator/Validator.cpp +++ b/src/validator/Validator.cpp @@ -26,6 +26,7 @@ #include "validator/SequentialValidator.h" #include "validator/SetValidator.h" #include "validator/UseValidator.h" +#include "validator/AdminJobValidator.h" #include "validator/YieldValidator.h" #include "validator/GroupByValidator.h" #include "common/function/FunctionManager.h" @@ -103,6 +104,8 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon return std::make_unique<InsertVerticesValidator>(sentence, context); case Sentence::Kind::kInsertEdges: return std::make_unique<InsertEdgesValidator>(sentence, context); + case Sentence::Kind::kAdminJob: + return std::make_unique<AdminJobValidator>(sentence, context); case Sentence::Kind::kFetchVertices: return std::make_unique<FetchVerticesValidator>(sentence, context); case Sentence::Kind::kFetchEdges: @@ -159,8 +162,7 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon case Sentence::Kind::kBalance: case Sentence::Kind::kConfig: case Sentence::Kind::kFindPath: - case Sentence::Kind::kReturn: - case Sentence::Kind::kAdmin: { + case Sentence::Kind::kReturn: { // nothing DLOG(FATAL) << "Unimplemented sentence " << kind; } @@ -205,6 +207,7 @@ Status Validator::appendPlan(PlanNode* node, PlanNode* appended) { case PlanNode::Kind::kShowEdges: case PlanNode::Kind::kCreateSnapshot: case PlanNode::Kind::kDropSnapshot: + case PlanNode::Kind::kSubmitJob: case PlanNode::Kind::kShowSnapshots: case PlanNode::Kind::kDeleteVertices: case PlanNode::Kind::kDeleteEdges: