diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt index 7736813d002ebe81a6945a52cfaf44cfd53f5ddd..53e3f07546b35444c81a0b5bac757f26be1b8793 100644 --- a/src/executor/CMakeLists.txt +++ b/src/executor/CMakeLists.txt @@ -58,6 +58,8 @@ nebula_add_library( admin/PartExecutor.cpp admin/CharsetExecutor.cpp admin/ShowStatsExecutor.cpp + admin/DownloadExecutor.cpp + admin/IngestExecutor.cpp maintain/TagExecutor.cpp maintain/TagIndexExecutor.cpp maintain/EdgeExecutor.cpp diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 00458c0941c53a1a05ab56c82228a0f027fd0c7e..d867a95a52cef42d4d2e8e62e0f32617d83cd48d 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -42,6 +42,8 @@ #include "executor/admin/ShowTSClientsExecutor.h" #include "executor/admin/SignInTSServiceExecutor.h" #include "executor/admin/SignOutTSServiceExecutor.h" +#include "executor/admin/DownloadExecutor.h" +#include "executor/admin/IngestExecutor.h" #include "executor/algo/BFSShortestPathExecutor.h" #include "executor/algo/ProduceSemiShortestPathExecutor.h" #include "executor/algo/ConjunctPathExecutor.h" @@ -473,6 +475,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kSignOutTSService: { return pool->add(new SignOutTSServiceExecutor(node, qctx)); } + case PlanNode::Kind::kDownload: { + return pool->add(new DownloadExecutor(node, qctx)); + } + case PlanNode::Kind::kIngest: { + return pool->add(new IngestExecutor(node, qctx)); + } case PlanNode::Kind::kUnknown: { LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind()); break; diff --git a/src/executor/admin/DownloadExecutor.cpp b/src/executor/admin/DownloadExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f4611cf27ba891a1f532506e444aa18c3312781f --- /dev/null +++ b/src/executor/admin/DownloadExecutor.cpp @@ -0,0 +1,34 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "executor/admin/DownloadExecutor.h" +#include "planner/Admin.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> DownloadExecutor::execute() { + SCOPED_TIMER(&execTime_); + auto *dNode = asNode<Download>(node()); + auto spaceId = qctx()->rctx()->session()->space().id; + return qctx()->getMetaClient()->download(dNode->getHdfsHost(), + dNode->getHdfsPort(), + dNode->getHdfsPath(), + spaceId) + .via(runner()) + .then([this](StatusOr<bool> resp) { + SCOPED_TIMER(&execTime_); + NG_RETURN_IF_ERROR(resp); + if (!resp.value()) { + return Status::Error("Download failed!"); + } + return Status::OK(); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/admin/DownloadExecutor.h b/src/executor/admin/DownloadExecutor.h new file mode 100644 index 0000000000000000000000000000000000000000..748991c7ce10eb591ce4fd00ff225e35a81b6ce6 --- /dev/null +++ b/src/executor/admin/DownloadExecutor.h @@ -0,0 +1,26 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ +#define EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ + +#include "executor/Executor.h" + +namespace nebula { +namespace graph { + +class DownloadExecutor final : public Executor { +public: + DownloadExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("DownloadExecutor", node, qctx) {} + + folly::Future<Status> execute() override; +}; + +} // namespace graph +} // namespace nebula + +#endif // EXECUTOR_ADMIN_DOWNLOADEXECUTOR_H_ diff --git a/src/executor/admin/IngestExecutor.cpp b/src/executor/admin/IngestExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2455bdb27383051bf192bd8945c0bea6e4edbdbe --- /dev/null +++ b/src/executor/admin/IngestExecutor.cpp @@ -0,0 +1,29 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "executor/admin/IngestExecutor.h" +#include "planner/Admin.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> IngestExecutor::execute() { + auto spaceId = qctx()->rctx()->session()->space().id; + return qctx()->getMetaClient()->ingest(spaceId) + .via(runner()) + .then([this](StatusOr<bool> resp) { + SCOPED_TIMER(&execTime_); + NG_RETURN_IF_ERROR(resp); + if (!resp.value()) { + return Status::Error("Ingest failed!"); + } + return Status::OK(); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/admin/IngestExecutor.h b/src/executor/admin/IngestExecutor.h new file mode 100644 index 0000000000000000000000000000000000000000..ea64276e1852f5d433ecdf278b0702f10cc74a57 --- /dev/null +++ b/src/executor/admin/IngestExecutor.h @@ -0,0 +1,26 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXECUTOR_ADMIN_INGESTEXECUTOR_H_ +#define EXECUTOR_ADMIN_INGESTEXECUTOR_H_ + +#include "executor/Executor.h" + +namespace nebula { +namespace graph { + +class IngestExecutor final : public Executor { +public: + IngestExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("IngestExecutor", node, qctx) {} + + folly::Future<Status> execute() override; +}; + +} // namespace graph +} // namespace nebula + +#endif // EXECUTOR_ADMIN_INGESTEXECUTOR_H_ diff --git a/src/planner/Admin.h b/src/planner/Admin.h index 78d912b24fb283c9169743deecafd3ecb053e1a1..e3a82155b027f52f5a678af4e0aabf87be64e426 100644 --- a/src/planner/Admin.h +++ b/src/planner/Admin.h @@ -415,12 +415,54 @@ private: : SingleInputNode(qctx, Kind::kShowListener, input) {} }; -class Download final : public SingleInputNode { +class Download final : public SingleDependencyNode { public: + static Download* make(QueryContext* qctx, + PlanNode* input, + std::string hdfsHost, + int32_t hdfsPort, + std::string hdfsPath) { + return qctx->objPool()->add(new Download(qctx, input, hdfsHost, hdfsPort, hdfsPath)); + } + + const std::string& getHdfsHost() const { + return hdfsHost_; + } + + int32_t getHdfsPort() const { + return hdfsPort_; + } + + const std::string& getHdfsPath() const { + return hdfsPath_; + } + +private: + explicit Download(QueryContext* qctx, + PlanNode* dep, + std::string hdfsHost, + int32_t hdfsPort, + std::string hdfsPath) + : SingleDependencyNode(qctx, Kind::kDownload, dep), + hdfsHost_(hdfsHost), + hdfsPort_(hdfsPort), + hdfsPath_(hdfsPath) {} + +private: + std::string hdfsHost_; + int32_t hdfsPort_; + std::string hdfsPath_; }; -class Ingest final : public SingleInputNode { +class Ingest final : public SingleDependencyNode { public: + static Ingest* make(QueryContext* qctx, PlanNode* dep) { + return qctx->objPool()->add(new Ingest(qctx, dep)); + } + +private: + explicit Ingest(QueryContext* qctx, PlanNode* dep) + : SingleDependencyNode(qctx, Kind::kIngest, dep) {} }; // User related Node diff --git a/src/planner/PlanNode.cpp b/src/planner/PlanNode.cpp index e63d21af92724a811c9b4873516067ce78888d5c..c7661710e1ac98170190354d8bc9aa721fe87fcb 100644 --- a/src/planner/PlanNode.cpp +++ b/src/planner/PlanNode.cpp @@ -247,6 +247,10 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "SignInTSService"; case Kind::kSignOutTSService: return "SignOutTSService"; + case Kind::kDownload: + return "Download"; + case Kind::kIngest: + return "Ingest"; // no default so the compiler will warning when lack } LOG(FATAL) << "Impossible kind plan node " << static_cast<int>(kind); diff --git a/src/planner/PlanNode.h b/src/planner/PlanNode.h index ba7c11da0b77bbbb503981166e7bf64779383949..05b2568d9c27eadddf28e66f2e1a8b825d5bb19f 100644 --- a/src/planner/PlanNode.h +++ b/src/planner/PlanNode.h @@ -137,6 +137,8 @@ public: kShowTSClients, kSignInTSService, kSignOutTSService, + kDownload, + kIngest, }; PlanNode(QueryContext* qctx, Kind kind); diff --git a/src/validator/BalanceValidator.h b/src/validator/BalanceValidator.h index 719dc4ef88268850763acdaf36a00243fba62d18..51b20f66db334e8eb66ca4b9ff59acf0cd25b89c 100644 --- a/src/validator/BalanceValidator.h +++ b/src/validator/BalanceValidator.h @@ -17,7 +17,7 @@ namespace graph { class BalanceValidator final : public Validator { public: BalanceValidator(Sentence* sentence, QueryContext* context) - : Validator(sentence, context) { + : Validator(sentence, context) { setNoSpaceRequired(); } diff --git a/src/validator/CMakeLists.txt b/src/validator/CMakeLists.txt index ef915f1da5fe02c2e22d8df3baa349750f4e5d1a..9138284b4420c3c31163ca4a6f6417609ecd97ab 100644 --- a/src/validator/CMakeLists.txt +++ b/src/validator/CMakeLists.txt @@ -30,6 +30,8 @@ nebula_add_library( FindPathValidator.cpp LookupValidator.cpp MatchValidator.cpp + DownloadValidator.cpp + IngestValidator.cpp ) nebula_add_subdirectory(test) diff --git a/src/validator/DownloadValidator.cpp b/src/validator/DownloadValidator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2a63deb95184e10125cd67b4675b54393641d6dc --- /dev/null +++ b/src/validator/DownloadValidator.cpp @@ -0,0 +1,29 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. +* +* This source code is licensed under Apache 2.0 License, +* attached with Common Clause Condition 1.0, found in the LICENSES directory. +*/ + +#include "common/base/Base.h" + +#include "planner/Admin.h" +#include "validator/DownloadValidator.h" +#include "parser/MutateSentences.h" + +namespace nebula { +namespace graph { + +Status DownloadValidator::toPlan() { + auto sentence = static_cast<DownloadSentence*>(sentence_); + auto *doNode = Download::make(qctx_, + nullptr, + *sentence->host(), + sentence->port(), + *sentence->path()); + root_ = doNode; + tail_ = root_; + return Status::OK(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/validator/DownloadValidator.h b/src/validator/DownloadValidator.h new file mode 100644 index 0000000000000000000000000000000000000000..34f6ca9274110ec1def0af1c0e290b0e20f74c9e --- /dev/null +++ b/src/validator/DownloadValidator.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef VALIDATOR_DOWNLOADVALIDATOR_H_ +#define VALIDATOR_DOWNLOADVALIDATOR_H_ + +#include "common/base/Base.h" +#include "validator/Validator.h" +#include "parser/AdminSentences.h" + +namespace nebula { +namespace graph { + +class DownloadValidator final : public Validator { +public: + DownloadValidator(Sentence* sentence, QueryContext* context) + : Validator(sentence, context) {} + +private: + Status validateImpl() override { + return Status::OK(); + } + + Status toPlan() override; +}; + +} // namespace graph +} // namespace nebula + +#endif // VALIDATOR_DOWNLOADVALIDATOR_H_ diff --git a/src/validator/IngestValidator.cpp b/src/validator/IngestValidator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4f615124477c6bc4b5901d36cd24c0c20ecc0cc1 --- /dev/null +++ b/src/validator/IngestValidator.cpp @@ -0,0 +1,24 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. +* +* This source code is licensed under Apache 2.0 License, +* attached with Common Clause Condition 1.0, found in the LICENSES directory. +*/ + +#include "common/base/Base.h" + +#include "planner/Admin.h" +#include "validator/IngestValidator.h" +#include "parser/MutateSentences.h" + +namespace nebula { +namespace graph { + +Status IngestValidator::toPlan() { + auto *doNode = Ingest::make(qctx_, nullptr); + root_ = doNode; + tail_ = root_; + return Status::OK(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/validator/IngestValidator.h b/src/validator/IngestValidator.h new file mode 100644 index 0000000000000000000000000000000000000000..6a6b4b3b3f85f509c6bfe07442e5e3da1a23ff39 --- /dev/null +++ b/src/validator/IngestValidator.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef VALIDATOR_INGESTVALIDATOR_H_ +#define VALIDATOR_INGESTVALIDATOR_H_ + +#include "common/base/Base.h" +#include "validator/Validator.h" +#include "parser/AdminSentences.h" + +namespace nebula { +namespace graph { + +class IngestValidator final : public Validator { +public: + IngestValidator(Sentence* sentence, QueryContext* context) + : Validator(sentence, context) {} + +private: + Status validateImpl() override { + return Status::OK(); + } + + Status toPlan() override; +}; + +} // namespace graph +} // namespace nebula + +#endif // VALIDATOR_INGESTVALIDATOR_H_ diff --git a/src/validator/Validator.cpp b/src/validator/Validator.cpp index 40fa2bc97b7e105f40433a182f1348f53357b53e..edad9953aef28c360dfd43a36a9a427ec82c6eb5 100644 --- a/src/validator/Validator.cpp +++ b/src/validator/Validator.cpp @@ -39,6 +39,8 @@ #include "validator/MatchValidator.h" #include "visitor/EvaluableExprVisitor.h" #include "validator/LookupValidator.h" +#include "validator/DownloadValidator.h" +#include "validator/IngestValidator.h" namespace nebula { namespace graph { @@ -233,11 +235,13 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon return std::make_unique<SignInTSServiceValidator>(sentence, context); case Sentence::Kind::kSignOutTSService: return std::make_unique<SignOutTSServiceValidator>(sentence, context); + case Sentence::Kind::kDownload: + return std::make_unique<DownloadValidator>(sentence, context); + case Sentence::Kind::kIngest: + return std::make_unique<IngestValidator>(sentence, context); case Sentence::Kind::kShowGroups: case Sentence::Kind::kShowZones: case Sentence::Kind::kUnknown: - case Sentence::Kind::kDownload: - case Sentence::Kind::kIngest: case Sentence::Kind::kReturn: { // nothing DLOG(FATAL) << "Unimplemented sentence " << kind;