From 5ef41cc3161e6403d0ac3ef9144f7e6b5a9a8f87 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Mon, 17 Aug 2020 16:46:03 +0800 Subject: [PATCH] Feature/balance (#66) * Add the executor and plan node for balance. * Add the mock of balance. * Add the validator and testing about balance. * Fix the QueryContext. * Rename dels to deleteHosts. * Get show balance tasks count directly. * Fix the schema test. * Fix the compile error. * Fix the header guard. * Fix compile error. * Remove the unused call back. * Add executor time stat. * Mark explain unimplemented. * Add call back time stat. * Correct the header guard format. * Fix header guard format. * Fix the header guard. Co-authored-by: laura-ding <48548375+laura-ding@users.noreply.github.com> --- src/exec/CMakeLists.txt | 4 + src/exec/Executor.cpp | 32 ++++ src/exec/Executor.h | 1 + src/exec/admin/BalanceExecutor.cpp | 35 +++++ src/exec/admin/BalanceExecutor.h | 30 ++++ src/exec/admin/BalanceLeadersExecutor.cpp | 35 +++++ src/exec/admin/BalanceLeadersExecutor.h | 30 ++++ src/exec/admin/ShowBalanceExecutor.cpp | 64 ++++++++ src/exec/admin/ShowBalanceExecutor.h | 30 ++++ src/exec/admin/StopBalanceExecutor.cpp | 34 ++++ src/exec/admin/StopBalanceExecutor.h | 30 ++++ src/mock/MetaCache.cpp | 115 +++++++++++--- src/mock/MetaCache.h | 23 ++- src/mock/MockMetaServiceHandler.cpp | 46 +++++- src/mock/test/BalanceTest.cpp | 182 ++++++++++++++++++++++ src/mock/test/CMakeLists.txt | 16 ++ src/planner/Admin.h | 80 +++++++++- src/planner/PlanNode.cpp | 8 + src/planner/PlanNode.h | 4 + src/validator/BalanceValidator.cpp | 48 ++++++ src/validator/BalanceValidator.h | 35 +++++ src/validator/CMakeLists.txt | 1 + src/validator/Validator.cpp | 8 +- 23 files changed, 857 insertions(+), 34 deletions(-) create mode 100644 src/exec/admin/BalanceExecutor.cpp create mode 100644 src/exec/admin/BalanceExecutor.h create mode 100644 src/exec/admin/BalanceLeadersExecutor.cpp create mode 100644 src/exec/admin/BalanceLeadersExecutor.h create mode 100644 src/exec/admin/ShowBalanceExecutor.cpp create mode 100644 src/exec/admin/ShowBalanceExecutor.h create mode 100644 src/exec/admin/StopBalanceExecutor.cpp create mode 100644 src/exec/admin/StopBalanceExecutor.h create mode 100644 src/mock/test/BalanceTest.cpp create mode 100644 src/validator/BalanceValidator.cpp create mode 100644 src/validator/BalanceValidator.h diff --git a/src/exec/CMakeLists.txt b/src/exec/CMakeLists.txt index ed334744..f0552041 100644 --- a/src/exec/CMakeLists.txt +++ b/src/exec/CMakeLists.txt @@ -28,6 +28,10 @@ nebula_add_library( query/DataJoinExecutor.cpp admin/SwitchSpaceExecutor.cpp admin/SubmitJobExecutor.cpp + admin/BalanceExecutor.cpp + admin/StopBalanceExecutor.cpp + admin/BalanceLeadersExecutor.cpp + admin/ShowBalanceExecutor.cpp admin/ShowHostsExecutor.cpp admin/SpaceExecutor.cpp admin/SnapshotExecutor.cpp diff --git a/src/exec/Executor.cpp b/src/exec/Executor.cpp index 407ecf5d..9b42c319 100644 --- a/src/exec/Executor.cpp +++ b/src/exec/Executor.cpp @@ -13,6 +13,10 @@ #include "context/ExecutionContext.h" #include "context/QueryContext.h" #include "exec/ExecutionError.h" +#include "exec/admin/BalanceLeadersExecutor.h" +#include "exec/admin/BalanceExecutor.h" +#include "exec/admin/StopBalanceExecutor.h" +#include "exec/admin/ShowBalanceExecutor.h" #include "exec/admin/SubmitJobExecutor.h" #include "exec/admin/ShowHostsExecutor.h" #include "exec/admin/SnapshotExecutor.h" @@ -404,6 +408,34 @@ Executor *Executor::makeExecutor(const PlanNode *node, exec->dependsOn(input); break; } + case PlanNode::Kind::kBalanceLeaders: { + auto balanceLeaders = asNode<BalanceLeaders>(node); + auto dep = makeExecutor(balanceLeaders->dep(), qctx, visited); + exec = new BalanceLeadersExecutor(balanceLeaders, qctx); + exec->dependsOn(dep); + break; + } + case PlanNode::Kind::kBalance: { + auto balance = asNode<Balance>(node); + auto dep = makeExecutor(balance->dep(), qctx, visited); + exec = new BalanceExecutor(balance, qctx); + exec->dependsOn(dep); + break; + } + case PlanNode::Kind::kStopBalance: { + auto stopBalance = asNode<Balance>(node); + auto dep = makeExecutor(stopBalance->dep(), qctx, visited); + exec = new StopBalanceExecutor(stopBalance, qctx); + exec->dependsOn(dep); + break; + } + case PlanNode::Kind::kShowBalance: { + auto showBalance = asNode<ShowBalance>(node); + auto dep = makeExecutor(showBalance->dep(), qctx, visited); + exec = new ShowBalanceExecutor(showBalance, qctx); + exec->dependsOn(dep); + break; + } case PlanNode::Kind::kShowConfigs: { auto showConfigs = asNode<ShowConfigs>(node); auto input = makeExecutor(showConfigs->dep(), qctx, visited); diff --git a/src/exec/Executor.h b/src/exec/Executor.h index 86045d29..16811b76 100644 --- a/src/exec/Executor.h +++ b/src/exec/Executor.h @@ -18,6 +18,7 @@ #include "common/cpp/helpers.h" #include "common/datatypes/Value.h" #include "common/time/Duration.h" +#include "util/ScopedTimer.h" #include "context/ExecutionContext.h" namespace nebula { diff --git a/src/exec/admin/BalanceExecutor.cpp b/src/exec/admin/BalanceExecutor.cpp new file mode 100644 index 00000000..8f0e67d4 --- /dev/null +++ b/src/exec/admin/BalanceExecutor.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "exec/admin/BalanceExecutor.h" +#include "planner/Admin.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> BalanceExecutor::execute() { + SCOPED_TIMER(&execTime_); + return balance(); +} + +folly::Future<Status> BalanceExecutor::balance() { + auto *bNode = asNode<Balance>(node()); + return qctx()->getMetaClient()->balance(bNode->deleteHosts(), false) + .via(runner()) + .then([this](StatusOr<int64_t> resp) { + SCOPED_TIMER(&execTime_); + if (!resp.ok()) { + LOG(ERROR) << resp.status(); + return resp.status(); + } + DataSet v({"ID"}); + v.emplace_back(Row({resp.value()})); + return finish(std::move(v)); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/exec/admin/BalanceExecutor.h b/src/exec/admin/BalanceExecutor.h new file mode 100644 index 00000000..00c630da --- /dev/null +++ b/src/exec/admin/BalanceExecutor.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXEC_ADMIN_BALANCEEXECUTOR_H_ +#define EXEC_ADMIN_BALANCEEXECUTOR_H_ + +#include "exec/Executor.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +class BalanceExecutor final : public Executor { +public: + BalanceExecutor(const PlanNode *node, QueryContext *ectx) + : Executor("BalanceExecutor", node, ectx) {} + + folly::Future<Status> execute() override; + +private: + folly::Future<Status> balance(); +}; + +} // namespace graph +} // namespace nebula + +#endif // EXEC_ADMIN_BALANCEEXECUTOR_H_ diff --git a/src/exec/admin/BalanceLeadersExecutor.cpp b/src/exec/admin/BalanceLeadersExecutor.cpp new file mode 100644 index 00000000..fa29cf93 --- /dev/null +++ b/src/exec/admin/BalanceLeadersExecutor.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "exec/admin/BalanceLeadersExecutor.h" +#include "planner/Admin.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> BalanceLeadersExecutor::execute() { + SCOPED_TIMER(&execTime_); + return balanceLeaders(); +} + +folly::Future<Status> BalanceLeadersExecutor::balanceLeaders() { + return qctx()->getMetaClient()->balanceLeader() + .via(runner()) + .then([this](StatusOr<bool> resp) { + SCOPED_TIMER(&execTime_); + if (!resp.ok()) { + LOG(ERROR) << resp.status(); + return resp.status(); + } + if (!resp.value()) { + return Status::Error("Balance leaders failed"); + } + return Status::OK(); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/exec/admin/BalanceLeadersExecutor.h b/src/exec/admin/BalanceLeadersExecutor.h new file mode 100644 index 00000000..db68d1ac --- /dev/null +++ b/src/exec/admin/BalanceLeadersExecutor.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXEC_ADMIN_BALANCELEADERSEXECUTOR_H_ +#define EXEC_ADMIN_BALANCELEADERSEXECUTOR_H_ + +#include "exec/Executor.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +class BalanceLeadersExecutor final : public Executor { +public: + BalanceLeadersExecutor(const PlanNode *node, QueryContext *ectx) + : Executor("BaanceLeadersExecutor", node, ectx) {} + + folly::Future<Status> execute() override; + +private: + folly::Future<Status> balanceLeaders(); +}; + +} // namespace graph +} // namespace nebula + +#endif // EXEC_ADMIN_BALANCELEADERSEXECUTOR_H_ diff --git a/src/exec/admin/ShowBalanceExecutor.cpp b/src/exec/admin/ShowBalanceExecutor.cpp new file mode 100644 index 00000000..f165f9ee --- /dev/null +++ b/src/exec/admin/ShowBalanceExecutor.cpp @@ -0,0 +1,64 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "exec/admin/ShowBalanceExecutor.h" +#include "planner/Admin.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> ShowBalanceExecutor::execute() { + SCOPED_TIMER(&execTime_); + return showBalance(); +} + +folly::Future<Status> ShowBalanceExecutor::showBalance() { + auto *sbNode = asNode<ShowBalance>(node()); + return qctx()->getMetaClient()->showBalance(sbNode->id()) + .via(runner()) + .then([this](StatusOr<std::vector<meta::cpp2::BalanceTask>> resp) { + SCOPED_TIMER(&execTime_); + if (!resp.ok()) { + LOG(ERROR) << resp.status(); + return std::move(resp).status(); + } + auto tasks = std::move(resp).value(); + // TODO(shylock) typed items instead binary + // E.G. "balanceId", "spaceId", "partId", "from", "to" + uint32_t total = tasks.size(), succeeded = 0, failed = 0, inProgress = 0, invalid = 0; + DataSet v({"balanceId, spaceId:partId, src->dst", "status"}); + for (auto &task : tasks) { + switch (task.get_result()) { + case meta::cpp2::TaskResult::FAILED: + ++failed; + break; + case meta::cpp2::TaskResult::IN_PROGRESS: + ++inProgress; + break; + case meta::cpp2::TaskResult::INVALID: + ++invalid; + break; + case meta::cpp2::TaskResult::SUCCEEDED: + ++succeeded; + break; + } + v.emplace_back(Row({ + std::move(task).get_id(), + meta::cpp2::_TaskResult_VALUES_TO_NAMES.at(task.get_result()) + })); + } + double percentage = total == 0 ? 0 : static_cast<double>(succeeded) / total * 100; + v.emplace_back(Row({ + folly::sformat("Total:{}, Succeeded:{}, Failed:{}, In Progress:{}, Invalid:{}", + total, succeeded, failed, inProgress, invalid), + percentage + })); + return finish(std::move(v)); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/exec/admin/ShowBalanceExecutor.h b/src/exec/admin/ShowBalanceExecutor.h new file mode 100644 index 00000000..7fc03ed2 --- /dev/null +++ b/src/exec/admin/ShowBalanceExecutor.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXEC_ADMIN_SHOWBALANCEEXECUTOR_H_ +#define EXEC_ADMIN_SHOWBALANCEEXECUTOR_H_ + +#include "exec/Executor.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +class ShowBalanceExecutor final : public Executor { +public: + ShowBalanceExecutor(const PlanNode *node, QueryContext *ectx) + : Executor("ShowBalanceExecutor", node, ectx) {} + + folly::Future<Status> execute() override; + +private: + folly::Future<Status> showBalance(); +}; + +} // namespace graph +} // namespace nebula + +#endif // EXEC_ADMIN_SHOWBALANCEEXECUTOR_H_ diff --git a/src/exec/admin/StopBalanceExecutor.cpp b/src/exec/admin/StopBalanceExecutor.cpp new file mode 100644 index 00000000..ccd94460 --- /dev/null +++ b/src/exec/admin/StopBalanceExecutor.cpp @@ -0,0 +1,34 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "exec/admin/StopBalanceExecutor.h" +#include "planner/Admin.h" + +namespace nebula { +namespace graph { + +folly::Future<Status> StopBalanceExecutor::execute() { + SCOPED_TIMER(&execTime_); + return stopBalance(); +} + +folly::Future<Status> StopBalanceExecutor::stopBalance() { + return qctx()->getMetaClient()->balance({}, true) + .via(runner()) + .then([this](StatusOr<int64_t> resp) { + SCOPED_TIMER(&execTime_); + if (!resp.ok()) { + LOG(ERROR) << resp.status(); + return resp.status(); + } + DataSet v({"ID"}); + v.emplace_back(Row({resp.value()})); + return finish(std::move(v)); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/exec/admin/StopBalanceExecutor.h b/src/exec/admin/StopBalanceExecutor.h new file mode 100644 index 00000000..857b90da --- /dev/null +++ b/src/exec/admin/StopBalanceExecutor.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXEC_ADMIN_STOPBALANCEEXECUTOR_H_ +#define EXEC_ADMIN_STOPBALANCEEXECUTOR_H_ + +#include "exec/Executor.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +class StopBalanceExecutor final : public Executor { +public: + StopBalanceExecutor(const PlanNode *node, QueryContext *ectx) + : Executor("StopBalanceExecutor", node, ectx) {} + + folly::Future<Status> execute() override; + +private: + folly::Future<Status> stopBalance(); +}; + +} // namespace graph +} // namespace nebula + +#endif // EXEC_ADMIN_STOPBALANCEEXECUTOR_H_ diff --git a/src/mock/MetaCache.cpp b/src/mock/MetaCache.cpp index 9b6a3909..0a74d98d 100644 --- a/src/mock/MetaCache.cpp +++ b/src/mock/MetaCache.cpp @@ -22,19 +22,20 @@ Status MetaCache::createSpace(const meta::cpp2::CreateSpaceReq &req, GraphSpaceI auto ifNotExists = req.get_if_not_exists(); auto properties = req.get_properties(); auto spaceName = properties.get_space_name(); - auto findIter = spaces_.find(spaceName); - if (ifNotExists && findIter != spaces_.end()) { - spaceId = findIter->second.get_space_id(); + const auto findIter = spaceIndex_.find(spaceName); + if (ifNotExists && findIter != spaceIndex_.end()) { + spaceId = findIter->second; return Status::OK(); } - if (findIter != spaces_.end()) { + if (findIter != spaceIndex_.end()) { return Status::Error("Space `%s' existed", spaceName.c_str()); } spaceId = ++id_; + spaceIndex_.emplace(spaceName, spaceId); meta::cpp2::SpaceItem space; space.set_space_id(spaceId); space.set_properties(std::move(properties)); - spaces_[spaceName] = space; + spaces_[spaceId] = space; VLOG(1) << "space name: " << space.get_properties().get_space_name() << ", partition_num: " << space.get_properties().get_partition_num() << ", replica_factor: " << space.get_properties().get_replica_factor() @@ -45,25 +46,26 @@ Status MetaCache::createSpace(const meta::cpp2::CreateSpaceReq &req, GraphSpaceI StatusOr<meta::cpp2::SpaceItem> MetaCache::getSpace(const meta::cpp2::GetSpaceReq &req) { folly::RWSpinLock::ReadHolder holder(lock_); - auto findIter = spaces_.find(req.get_space_name()); - if (findIter == spaces_.end()) { + auto findIter = spaceIndex_.find(req.get_space_name()); + if (findIter == spaceIndex_.end()) { LOG(ERROR) << "Space " << req.get_space_name().c_str() << " not found"; return Status::Error("Space `%s' not found", req.get_space_name().c_str()); } - VLOG(1) << "space name: " << findIter->second.get_properties().get_space_name() - << ", partition_num: " << findIter->second.get_properties().get_partition_num() - << ", replica_factor: " << findIter->second.get_properties().get_replica_factor() - << ", rvid_size: " << findIter->second.get_properties().get_vid_size(); - return findIter->second; + const auto spaceInfo = spaces_.find(findIter->second); + VLOG(1) << "space name: " << spaceInfo->second.get_properties().get_space_name() + << ", partition_num: " << spaceInfo->second.get_properties().get_partition_num() + << ", replica_factor: " << spaceInfo->second.get_properties().get_replica_factor() + << ", rvid_size: " << spaceInfo->second.get_properties().get_vid_size(); + return spaceInfo->second; } StatusOr<std::vector<meta::cpp2::IdName>> MetaCache::listSpaces() { folly::RWSpinLock::ReadHolder holder(lock_); std::vector<meta::cpp2::IdName> spaces; - for (auto &item : spaces_) { + for (const auto &index : spaceIndex_) { meta::cpp2::IdName idName; - idName.set_id(to(item.second.get_space_id(), EntryType::SPACE)); - idName.set_name(item.first); + idName.set_id(to(index.second, EntryType::SPACE)); + idName.set_name(index.first); spaces.emplace_back(idName); } return spaces; @@ -72,19 +74,20 @@ StatusOr<std::vector<meta::cpp2::IdName>> MetaCache::listSpaces() { Status MetaCache::dropSpace(const meta::cpp2::DropSpaceReq &req) { folly::RWSpinLock::WriteHolder holder(lock_); auto spaceName = req.get_space_name(); - auto findIter = spaces_.find(spaceName); + auto findIter = spaceIndex_.find(spaceName); auto ifExists = req.get_if_exists(); - if (ifExists && findIter == spaces_.end()) { + if (ifExists && findIter == spaceIndex_.end()) { Status::OK(); } - if (findIter == spaces_.end()) { + if (findIter == spaceIndex_.end()) { return Status::Error("Space `%s' not existed", req.get_space_name().c_str()); } - auto id = findIter->second.get_space_id(); - spaces_.erase(spaceName); + auto id = findIter->second; + spaces_.erase(id); cache_.erase(id); + spaceIndex_.erase(spaceName); return Status::OK(); } @@ -297,7 +300,7 @@ Status MetaCache::listUsers(const meta::cpp2::ListUsersReq&) { std::vector<meta::cpp2::HostItem> MetaCache::listHosts() { folly::RWSpinLock::WriteHolder holder(lock_); std::vector<meta::cpp2::HostItem> hosts; - for (auto& spaceIdIt : spaces_) { + for (auto& spaceIdIt : spaceIndex_) { auto spaceName = spaceIdIt.first; for (auto &h : hostSet_) { meta::cpp2::HostItem host; @@ -323,6 +326,76 @@ std::unordered_map<PartitionID, std::vector<HostAddr>> MetaCache::getParts() { return parts; } +ErrorOr<meta::cpp2::ErrorCode, int64_t> MetaCache::balanceSubmit(std::vector<HostAddr> dels) { + folly::RWSpinLock::ReadHolder rh(lock_); + for (const auto &job : balanceJobs_) { + if (job.second.status == meta::cpp2::TaskResult::IN_PROGRESS) { + return meta::cpp2::ErrorCode::E_BALANCER_RUNNING; + } + } + std::vector<BalanceTask> jobs; + for (const auto &spaceInfo : spaces_) { + for (PartitionID i = 1; i <= spaceInfo.second.get_properties().get_partition_num(); ++i) { + for (const auto &host : hostSet_) { // Note mock partition in each host here + if (std::find(dels.begin(), dels.end(), host) != dels.end()) { + continue; + } + jobs.emplace_back(BalanceTask{ + // mock + spaceInfo.first, + i, + host, + host, + meta::cpp2::TaskResult::IN_PROGRESS, + }); + } + } + } + auto jobId = incId(); + balanceTasks_.emplace(jobId, std::move(jobs)); + balanceJobs_.emplace(jobId, BalanceJob{ + meta::cpp2::TaskResult::IN_PROGRESS, + }); + return jobId; +} + +ErrorOr<meta::cpp2::ErrorCode, int64_t> MetaCache::balanceStop() { + for (auto &job : balanceJobs_) { + if (job.second.status == meta::cpp2::TaskResult::IN_PROGRESS) { + job.second.status = meta::cpp2::TaskResult::FAILED; + return job.first; + } + } + return meta::cpp2::ErrorCode::E_NO_RUNNING_BALANCE_PLAN; +} + +meta::cpp2::ErrorCode MetaCache::balanceLeaders() { + return meta::cpp2::ErrorCode::SUCCEEDED; +} + +ErrorOr<meta::cpp2::ErrorCode, std::vector<meta::cpp2::BalanceTask>> +MetaCache::showBalance(int64_t id) { + const auto job = balanceTasks_.find(id); + if (job == balanceTasks_.end()) { + return meta::cpp2::ErrorCode::E_NOT_FOUND; + } + std::vector<meta::cpp2::BalanceTask> result; + result.reserve(job->second.size()); + for (const auto &task : job->second) { + meta::cpp2::BalanceTask taskInfo; + std::stringstream idStr; + idStr << "["; + idStr << id << ", "; + idStr << task.space << ":" << task.part << ", "; + idStr << task.from << "->" << task.to; + idStr << "]"; + taskInfo.set_id(idStr.str()); + taskInfo.set_result(task.status); + result.emplace_back(std::move(taskInfo)); + } + return result; +} + ErrorOr<meta::cpp2::ErrorCode, meta::cpp2::AdminJobResult> MetaCache::runAdminJob(const meta::cpp2::AdminJobReq& req) { meta::cpp2::AdminJobResult result; diff --git a/src/mock/MetaCache.h b/src/mock/MetaCache.h index 9feeacc2..d109f3d0 100644 --- a/src/mock/MetaCache.h +++ b/src/mock/MetaCache.h @@ -70,6 +70,12 @@ public: std::unordered_map<PartitionID, std::vector<HostAddr>> getParts(); + ErrorOr<meta::cpp2::ErrorCode, int64_t> balanceSubmit(std::vector<HostAddr> dels); + ErrorOr<meta::cpp2::ErrorCode, int64_t> balanceStop(); + meta::cpp2::ErrorCode balanceLeaders(); + ErrorOr<meta::cpp2::ErrorCode, std::vector<meta::cpp2::BalanceTask>> + showBalance(int64_t id); + ErrorOr<meta::cpp2::ErrorCode, meta::cpp2::AdminJobResult> runAdminJob(const meta::cpp2::AdminJobReq& req); @@ -139,12 +145,27 @@ private: }; std::unordered_set<HostAddr> hostSet_; + std::unordered_map<std::string, GraphSpaceID> spaceIndex_; std::unordered_map<GraphSpaceID, SpaceInfoCache> cache_; - std::unordered_map<std::string, meta::cpp2::SpaceItem> spaces_; + std::unordered_map<GraphSpaceID, meta::cpp2::SpaceItem> spaces_; int64_t id_{0}; std::unordered_map<std::string, meta::cpp2::Snapshot> snapshots_; mutable folly::RWSpinLock lock_; +////////////////////////////////////////////// Balance ///////////////////////////////////////////// + struct BalanceTask { + GraphSpaceID space; + PartitionID part; + HostAddr from; + HostAddr to; + meta::cpp2::TaskResult status; + }; + struct BalanceJob { + meta::cpp2::TaskResult status; + }; + std::unordered_map<int64_t, std::vector<BalanceTask>> balanceTasks_; + std::unordered_map<int64_t, BalanceJob> balanceJobs_; + ////////////////////////////////////////////// Job ///////////////////////////////////////////////// struct JobDesc { meta::cpp2::AdminCmd cmd_; // compact, flush ... diff --git a/src/mock/MockMetaServiceHandler.cpp b/src/mock/MockMetaServiceHandler.cpp index 3fd8e1d5..63830a57 100644 --- a/src/mock/MockMetaServiceHandler.cpp +++ b/src/mock/MockMetaServiceHandler.cpp @@ -526,18 +526,50 @@ MockMetaServiceHandler::future_getUserRoles(const meta::cpp2::GetUserRolesReq&) } folly::Future<meta::cpp2::BalanceResp> -MockMetaServiceHandler::future_balance(const meta::cpp2::BalanceReq&) { - folly::Promise<meta::cpp2::BalanceResp> promise; - auto future = promise.getFuture(); +MockMetaServiceHandler::future_balance(const meta::cpp2::BalanceReq& req) { meta::cpp2::BalanceResp resp; - resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED); - promise.setValue(std::move(resp)); - return future; + if (req.__isset.id) { + // show + auto result = MetaCache::instance().showBalance(*req.get_id()); + if (ok(result)) { + resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED); + resp.set_tasks(std::move(result).right()); + } else { + resp.set_code(result.left()); + } + } else if (req.__isset.stop) { + // stop + auto result = MetaCache::instance().balanceStop(); + if (ok(result)) { + resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED); + resp.set_id(result.right()); + } else { + resp.set_code(result.left()); + } + } else { + ErrorOr<meta::cpp2::ErrorCode, int64_t> result; + // submit + if (req.__isset.host_del) { + result = MetaCache::instance().balanceSubmit(*req.get_host_del()); + } else { + result = MetaCache::instance().balanceSubmit({}); + } + if (ok(result)) { + resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED); + resp.set_id(result.right()); + } else { + resp.set_code(result.left()); + } + } + return resp; } folly::Future<meta::cpp2::ExecResp> MockMetaServiceHandler::future_leaderBalance(const meta::cpp2::LeaderBalanceReq&) { - RETURN_SUCCESSED(); + meta::cpp2::ExecResp resp; + auto result = MetaCache::instance().balanceLeaders(); + resp.set_code(result); + return resp; } folly::Future<meta::cpp2::ExecResp> diff --git a/src/mock/test/BalanceTest.cpp b/src/mock/test/BalanceTest.cpp new file mode 100644 index 00000000..92f8f870 --- /dev/null +++ b/src/mock/test/BalanceTest.cpp @@ -0,0 +1,182 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include <gtest/gtest.h> +#include "common/base/Status.h" +#include "common/interface/gen-cpp2/common_types.h" +#include "common/network/NetworkUtils.h" +#include "mock/test/TestBase.h" +#include "mock/test/TestEnv.h" + +DECLARE_int32(heartbeat_interval_secs); + +namespace nebula { +namespace graph { + +class BalanceTest : public TestBase { +public: + void SetUp() override { + TestBase::SetUp(); + client_ = gEnv->getGraphClient(); + ASSERT_NE(nullptr, client_); + }; + + void TearDown() override { + TestBase::TearDown(); + client_.reset(); + }; + +protected: + std::unique_ptr<GraphClient> client_; +}; + +TEST_F(BalanceTest, Error) { + { + // Show one not exists + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA 233;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_EXECUTION_ERROR); + } + { + // stop not exists + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA STOP;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_EXECUTION_ERROR); + } +} + +TEST_F(BalanceTest, Simple) { + { + // balance leader + cpp2::ExecutionResponse resp; + std::string query = "BALANCE LEADER;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + } + { + // balance without space + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + DataSet expected({"ID"}); + expected.emplace_back(Row({1})); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // show one without space + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA 1;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + DataSet expected({"balanceId, spaceId:partId, src->dst", "status"}); + expected.emplace_back(Row({ + "Total:0, Succeeded:0, Failed:0, In Progress:0, Invalid:0", + 0, + })); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // stop + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA STOP;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + } + { + // create space + cpp2::ExecutionResponse resp; + std::string query = "CREATE SPACE space1(partition_num=3, replica_factor=1);"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + } + { + // create space + cpp2::ExecutionResponse resp; + std::string query = "CREATE SPACE space2(partition_num=9, replica_factor=3);"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + } + { + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + DataSet expected({"ID"}); + expected.emplace_back(Row({4})); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } + { + // show one + cpp2::ExecutionResponse resp; + std::string query = "BALANCE DATA 4;"; + client_->execute(query, resp); + ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED); + + // TODO(shylock) dataset constructor + auto storagePort = gEnv->storageServerPort(); + DataSet expected({"balanceId, spaceId:partId, src->dst", "status"}); + expected.emplace_back(Row({ + folly::sformat("[4, 3:1, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:2, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:3, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:4, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:5, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:6, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:7, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:8, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 3:9, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 2:1, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 2:2, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back(Row({ + folly::sformat("[4, 2:3, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort), + "IN_PROGRESS", + })); + expected.emplace_back( + Row({"Total:12, Succeeded:0, Failed:0, In Progress:12, Invalid:0", 0})); + ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected)); + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/mock/test/CMakeLists.txt b/src/mock/test/CMakeLists.txt index aabdf6d0..b977ac10 100644 --- a/src/mock/test/CMakeLists.txt +++ b/src/mock/test/CMakeLists.txt @@ -87,6 +87,22 @@ nebula_add_test( wangle ) +nebula_add_test( + NAME + balance_test + SOURCES + BalanceTest.cpp + OBJECTS + ${GRAPH_TEST_LIB} + LIBRARIES + proxygenhttpserver + proxygenlib + ${THRIFT_LIBRARIES} + wangle + gtest + gtest_main +) + nebula_add_test( NAME admin_job_test diff --git a/src/planner/Admin.h b/src/planner/Admin.h index 4e45230a..1dabd9dc 100644 --- a/src/planner/Admin.h +++ b/src/planner/Admin.h @@ -269,10 +269,6 @@ private: std::string spaceName_; }; -class Balance final : public SingleInputNode { -public: -}; - class CreateSnapshot final : public SingleInputNode { public: static CreateSnapshot* make(ExecutionPlan* plan, PlanNode* input) { @@ -407,6 +403,82 @@ private: const std::vector<std::string> params_; }; +class BalanceLeaders final : public SingleDependencyNode { +public: + static BalanceLeaders* make(ExecutionPlan* plan, PlanNode* dep) { + return new BalanceLeaders(plan, dep); + } + + std::unique_ptr<cpp2::PlanNodeDescription> explain() const override { + LOG(FATAL) << "Unimplemented"; + return nullptr; + } + +private: + explicit BalanceLeaders(ExecutionPlan* plan, PlanNode* dep) + : SingleDependencyNode(plan, Kind::kBalanceLeaders, dep) {} +}; + +class Balance final : public SingleDependencyNode { +public: + static Balance* make(ExecutionPlan* plan, PlanNode* dep, std::vector<HostAddr> deleteHosts) { + return new Balance(plan, dep, std::move(deleteHosts)); + } + + std::unique_ptr<cpp2::PlanNodeDescription> explain() const override { + LOG(FATAL) << "Unimplemented"; + return nullptr; + } + + const std::vector<HostAddr> &deleteHosts() const { + return deleteHosts_; + } + +private: + Balance(ExecutionPlan* plan, PlanNode* dep, std::vector<HostAddr> deleteHosts) + : SingleDependencyNode(plan, Kind::kBalance, dep), deleteHosts_(std::move(deleteHosts)) {} + + std::vector<HostAddr> deleteHosts_; +}; + +class StopBalance final : public SingleDependencyNode { +public: + static StopBalance* make(ExecutionPlan* plan, PlanNode* dep) { + return new StopBalance(plan, dep); + } + + std::unique_ptr<cpp2::PlanNodeDescription> explain() const override { + LOG(FATAL) << "Unimplemented"; + return nullptr; + } + +private: + explicit StopBalance(ExecutionPlan* plan, PlanNode* dep) + : SingleDependencyNode(plan, Kind::kStopBalance, dep) {} +}; + +class ShowBalance final : public SingleDependencyNode { +public: + static ShowBalance* make(ExecutionPlan* plan, PlanNode* dep, int64_t id) { + return new ShowBalance(plan, dep, id); + } + + std::unique_ptr<cpp2::PlanNodeDescription> explain() const override { + LOG(FATAL) << "Unimplemented"; + return nullptr; + } + + int64_t id() const { + return id_; + } + +private: + ShowBalance(ExecutionPlan* plan, PlanNode* dep, int64_t id) + : SingleDependencyNode(plan, Kind::kShowBalance, dep), id_(id) {} + + int64_t id_; +}; + class ShowCharset final : public SingleInputNode { public: static ShowCharset* make(ExecutionPlan* plan, PlanNode* input) { diff --git a/src/planner/PlanNode.cpp b/src/planner/PlanNode.cpp index e354fa5d..a6ee52ba 100644 --- a/src/planner/PlanNode.cpp +++ b/src/planner/PlanNode.cpp @@ -105,6 +105,14 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "DropSnapshot"; case Kind::kShowSnapshots: return "ShowSnapshots"; + case Kind::kBalanceLeaders: + return "BalanceLeaders"; + case Kind::kBalance: + return "Balance"; + case Kind::kStopBalance: + return "StopBalance"; + case Kind::kShowBalance: + return "ShowBalance"; case Kind::kSubmitJob: return "SubmitJob"; case Kind::kDataJoin: diff --git a/src/planner/PlanNode.h b/src/planner/PlanNode.h index 1baa6935..4db31fb5 100644 --- a/src/planner/PlanNode.h +++ b/src/planner/PlanNode.h @@ -65,6 +65,10 @@ public: kDropEdge, kInsertVertices, kInsertEdges, + kBalanceLeaders, + kBalance, + kStopBalance, + kShowBalance, kSubmitJob, kShowHosts, kDataCollect, diff --git a/src/validator/BalanceValidator.cpp b/src/validator/BalanceValidator.cpp new file mode 100644 index 00000000..b63f0102 --- /dev/null +++ b/src/validator/BalanceValidator.cpp @@ -0,0 +1,48 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. +* +* This source code is licensed under Apache 2.0 License, +* attached with Common Clause Condition 1.0, found in the LICENSES directory. +*/ + +#include "common/base/Base.h" + +#include "planner/Admin.h" +#include "validator/BalanceValidator.h" + +namespace nebula { +namespace graph { + +Status BalanceValidator::toPlan() { + auto* plan = qctx_->plan(); + PlanNode *current = nullptr; + BalanceSentence *sentence = static_cast<BalanceSentence*>(sentence_); + switch (sentence->subType()) { + case BalanceSentence::SubType::kLeader: + current = BalanceLeaders::make(plan, nullptr); + break; + case BalanceSentence::SubType::kData: + current = Balance::make(plan, + nullptr, + sentence->hostDel() == nullptr + ? std::vector<HostAddr>() + : sentence->hostDel()->hosts()); + break; + case BalanceSentence::SubType::kDataStop: + current = StopBalance::make(plan, nullptr); + break; + case BalanceSentence::SubType::kShowBalancePlan: + current = ShowBalance::make(plan, nullptr, sentence->balanceId()); + break; + case BalanceSentence::SubType::kUnknown: + // fallthrough + default: + DLOG(FATAL) << "Unknown balance kind " << sentence->kind(); + return Status::NotSupported("Unknown balance kind %d", static_cast<int>(sentence->kind())); + } + root_ = current; + tail_ = root_; + return Status::OK(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/validator/BalanceValidator.h b/src/validator/BalanceValidator.h new file mode 100644 index 00000000..719dc4ef --- /dev/null +++ b/src/validator/BalanceValidator.h @@ -0,0 +1,35 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef VALIDATOR_BALANCEVALIDATOR_H_ +#define VALIDATOR_BALANCEVALIDATOR_H_ + +#include "common/base/Base.h" +#include "validator/Validator.h" +#include "parser/AdminSentences.h" + +namespace nebula { +namespace graph { + +class BalanceValidator final : public Validator { +public: + BalanceValidator(Sentence* sentence, QueryContext* context) + : Validator(sentence, context) { + setNoSpaceRequired(); + } + +private: + Status validateImpl() override { + return Status::OK(); + } + + Status toPlan() override; +}; + +} // namespace graph +} // namespace nebula + +#endif // VALIDATOR_BALANCEVALIDATOR_H_ diff --git a/src/validator/CMakeLists.txt b/src/validator/CMakeLists.txt index ee8a078c..bb6e9544 100644 --- a/src/validator/CMakeLists.txt +++ b/src/validator/CMakeLists.txt @@ -15,6 +15,7 @@ nebula_add_library( UseValidator.cpp GetSubgraphValidator.cpp AdminValidator.cpp + BalanceValidator.cpp AdminJobValidator.cpp MaintainValidator.cpp MutateValidator.cpp diff --git a/src/validator/Validator.cpp b/src/validator/Validator.cpp index 195285a8..3ede023c 100644 --- a/src/validator/Validator.cpp +++ b/src/validator/Validator.cpp @@ -26,6 +26,7 @@ #include "validator/SequentialValidator.h" #include "validator/SetValidator.h" #include "validator/UseValidator.h" +#include "validator/BalanceValidator.h" #include "validator/AdminJobValidator.h" #include "validator/YieldValidator.h" #include "validator/GroupByValidator.h" @@ -104,6 +105,8 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon return std::make_unique<InsertVerticesValidator>(sentence, context); case Sentence::Kind::kInsertEdges: return std::make_unique<InsertEdgesValidator>(sentence, context); + case Sentence::Kind::kBalance: + return std::make_unique<BalanceValidator>(sentence, context); case Sentence::Kind::kAdminJob: return std::make_unique<AdminJobValidator>(sentence, context); case Sentence::Kind::kFetchVertices: @@ -165,7 +168,6 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon case Sentence::Kind::kLookup: case Sentence::Kind::kDownload: case Sentence::Kind::kIngest: - case Sentence::Kind::kBalance: case Sentence::Kind::kFindPath: case Sentence::Kind::kReturn: { // nothing @@ -214,6 +216,10 @@ Status Validator::appendPlan(PlanNode* node, PlanNode* appended) { case PlanNode::Kind::kDropSnapshot: case PlanNode::Kind::kSubmitJob: case PlanNode::Kind::kShowSnapshots: + case PlanNode::Kind::kBalanceLeaders: + case PlanNode::Kind::kBalance: + case PlanNode::Kind::kStopBalance: + case PlanNode::Kind::kShowBalance: case PlanNode::Kind::kDeleteVertices: case PlanNode::Kind::kDeleteEdges: case PlanNode::Kind::kUpdateVertex: -- GitLab