Skip to content
Snippets Groups Projects
Unverified Commit 5ef41cc3 authored by Shylock Hg's avatar Shylock Hg Committed by GitHub
Browse files

Feature/balance (#66)


* Add the executor and plan node for balance.

* Add the mock of balance.

* Add the validator and testing about balance.

* Fix the QueryContext.

* Rename dels to deleteHosts.

* Get show balance tasks count directly.

* Fix the schema test.

* Fix the compile error.

* Fix the header guard.

* Fix compile error.

* Remove the unused call back.

* Add executor time stat.

* Mark explain unimplemented.

* Add call back time stat.

* Correct the header guard format.

* Fix header guard format.

* Fix the header guard.

Co-authored-by: default avatarlaura-ding <48548375+laura-ding@users.noreply.github.com>
parent d0310d68
No related branches found
No related tags found
No related merge requests found
Showing
with 814 additions and 33 deletions
......@@ -28,6 +28,10 @@ nebula_add_library(
query/DataJoinExecutor.cpp
admin/SwitchSpaceExecutor.cpp
admin/SubmitJobExecutor.cpp
admin/BalanceExecutor.cpp
admin/StopBalanceExecutor.cpp
admin/BalanceLeadersExecutor.cpp
admin/ShowBalanceExecutor.cpp
admin/ShowHostsExecutor.cpp
admin/SpaceExecutor.cpp
admin/SnapshotExecutor.cpp
......
......@@ -13,6 +13,10 @@
#include "context/ExecutionContext.h"
#include "context/QueryContext.h"
#include "exec/ExecutionError.h"
#include "exec/admin/BalanceLeadersExecutor.h"
#include "exec/admin/BalanceExecutor.h"
#include "exec/admin/StopBalanceExecutor.h"
#include "exec/admin/ShowBalanceExecutor.h"
#include "exec/admin/SubmitJobExecutor.h"
#include "exec/admin/ShowHostsExecutor.h"
#include "exec/admin/SnapshotExecutor.h"
......@@ -404,6 +408,34 @@ Executor *Executor::makeExecutor(const PlanNode *node,
exec->dependsOn(input);
break;
}
case PlanNode::Kind::kBalanceLeaders: {
auto balanceLeaders = asNode<BalanceLeaders>(node);
auto dep = makeExecutor(balanceLeaders->dep(), qctx, visited);
exec = new BalanceLeadersExecutor(balanceLeaders, qctx);
exec->dependsOn(dep);
break;
}
case PlanNode::Kind::kBalance: {
auto balance = asNode<Balance>(node);
auto dep = makeExecutor(balance->dep(), qctx, visited);
exec = new BalanceExecutor(balance, qctx);
exec->dependsOn(dep);
break;
}
case PlanNode::Kind::kStopBalance: {
auto stopBalance = asNode<Balance>(node);
auto dep = makeExecutor(stopBalance->dep(), qctx, visited);
exec = new StopBalanceExecutor(stopBalance, qctx);
exec->dependsOn(dep);
break;
}
case PlanNode::Kind::kShowBalance: {
auto showBalance = asNode<ShowBalance>(node);
auto dep = makeExecutor(showBalance->dep(), qctx, visited);
exec = new ShowBalanceExecutor(showBalance, qctx);
exec->dependsOn(dep);
break;
}
case PlanNode::Kind::kShowConfigs: {
auto showConfigs = asNode<ShowConfigs>(node);
auto input = makeExecutor(showConfigs->dep(), qctx, visited);
......
......@@ -18,6 +18,7 @@
#include "common/cpp/helpers.h"
#include "common/datatypes/Value.h"
#include "common/time/Duration.h"
#include "util/ScopedTimer.h"
#include "context/ExecutionContext.h"
namespace nebula {
......
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "exec/admin/BalanceExecutor.h"
#include "planner/Admin.h"
namespace nebula {
namespace graph {
folly::Future<Status> BalanceExecutor::execute() {
SCOPED_TIMER(&execTime_);
return balance();
}
folly::Future<Status> BalanceExecutor::balance() {
auto *bNode = asNode<Balance>(node());
return qctx()->getMetaClient()->balance(bNode->deleteHosts(), false)
.via(runner())
.then([this](StatusOr<int64_t> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return resp.status();
}
DataSet v({"ID"});
v.emplace_back(Row({resp.value()}));
return finish(std::move(v));
});
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef EXEC_ADMIN_BALANCEEXECUTOR_H_
#define EXEC_ADMIN_BALANCEEXECUTOR_H_
#include "exec/Executor.h"
#include "context/QueryContext.h"
namespace nebula {
namespace graph {
class BalanceExecutor final : public Executor {
public:
BalanceExecutor(const PlanNode *node, QueryContext *ectx)
: Executor("BalanceExecutor", node, ectx) {}
folly::Future<Status> execute() override;
private:
folly::Future<Status> balance();
};
} // namespace graph
} // namespace nebula
#endif // EXEC_ADMIN_BALANCEEXECUTOR_H_
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "exec/admin/BalanceLeadersExecutor.h"
#include "planner/Admin.h"
namespace nebula {
namespace graph {
folly::Future<Status> BalanceLeadersExecutor::execute() {
SCOPED_TIMER(&execTime_);
return balanceLeaders();
}
folly::Future<Status> BalanceLeadersExecutor::balanceLeaders() {
return qctx()->getMetaClient()->balanceLeader()
.via(runner())
.then([this](StatusOr<bool> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return resp.status();
}
if (!resp.value()) {
return Status::Error("Balance leaders failed");
}
return Status::OK();
});
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef EXEC_ADMIN_BALANCELEADERSEXECUTOR_H_
#define EXEC_ADMIN_BALANCELEADERSEXECUTOR_H_
#include "exec/Executor.h"
#include "context/QueryContext.h"
namespace nebula {
namespace graph {
class BalanceLeadersExecutor final : public Executor {
public:
BalanceLeadersExecutor(const PlanNode *node, QueryContext *ectx)
: Executor("BaanceLeadersExecutor", node, ectx) {}
folly::Future<Status> execute() override;
private:
folly::Future<Status> balanceLeaders();
};
} // namespace graph
} // namespace nebula
#endif // EXEC_ADMIN_BALANCELEADERSEXECUTOR_H_
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "exec/admin/ShowBalanceExecutor.h"
#include "planner/Admin.h"
namespace nebula {
namespace graph {
folly::Future<Status> ShowBalanceExecutor::execute() {
SCOPED_TIMER(&execTime_);
return showBalance();
}
folly::Future<Status> ShowBalanceExecutor::showBalance() {
auto *sbNode = asNode<ShowBalance>(node());
return qctx()->getMetaClient()->showBalance(sbNode->id())
.via(runner())
.then([this](StatusOr<std::vector<meta::cpp2::BalanceTask>> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return std::move(resp).status();
}
auto tasks = std::move(resp).value();
// TODO(shylock) typed items instead binary
// E.G. "balanceId", "spaceId", "partId", "from", "to"
uint32_t total = tasks.size(), succeeded = 0, failed = 0, inProgress = 0, invalid = 0;
DataSet v({"balanceId, spaceId:partId, src->dst", "status"});
for (auto &task : tasks) {
switch (task.get_result()) {
case meta::cpp2::TaskResult::FAILED:
++failed;
break;
case meta::cpp2::TaskResult::IN_PROGRESS:
++inProgress;
break;
case meta::cpp2::TaskResult::INVALID:
++invalid;
break;
case meta::cpp2::TaskResult::SUCCEEDED:
++succeeded;
break;
}
v.emplace_back(Row({
std::move(task).get_id(),
meta::cpp2::_TaskResult_VALUES_TO_NAMES.at(task.get_result())
}));
}
double percentage = total == 0 ? 0 : static_cast<double>(succeeded) / total * 100;
v.emplace_back(Row({
folly::sformat("Total:{}, Succeeded:{}, Failed:{}, In Progress:{}, Invalid:{}",
total, succeeded, failed, inProgress, invalid),
percentage
}));
return finish(std::move(v));
});
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef EXEC_ADMIN_SHOWBALANCEEXECUTOR_H_
#define EXEC_ADMIN_SHOWBALANCEEXECUTOR_H_
#include "exec/Executor.h"
#include "context/QueryContext.h"
namespace nebula {
namespace graph {
class ShowBalanceExecutor final : public Executor {
public:
ShowBalanceExecutor(const PlanNode *node, QueryContext *ectx)
: Executor("ShowBalanceExecutor", node, ectx) {}
folly::Future<Status> execute() override;
private:
folly::Future<Status> showBalance();
};
} // namespace graph
} // namespace nebula
#endif // EXEC_ADMIN_SHOWBALANCEEXECUTOR_H_
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "exec/admin/StopBalanceExecutor.h"
#include "planner/Admin.h"
namespace nebula {
namespace graph {
folly::Future<Status> StopBalanceExecutor::execute() {
SCOPED_TIMER(&execTime_);
return stopBalance();
}
folly::Future<Status> StopBalanceExecutor::stopBalance() {
return qctx()->getMetaClient()->balance({}, true)
.via(runner())
.then([this](StatusOr<int64_t> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return resp.status();
}
DataSet v({"ID"});
v.emplace_back(Row({resp.value()}));
return finish(std::move(v));
});
}
} // namespace graph
} // namespace nebula
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef EXEC_ADMIN_STOPBALANCEEXECUTOR_H_
#define EXEC_ADMIN_STOPBALANCEEXECUTOR_H_
#include "exec/Executor.h"
#include "context/QueryContext.h"
namespace nebula {
namespace graph {
class StopBalanceExecutor final : public Executor {
public:
StopBalanceExecutor(const PlanNode *node, QueryContext *ectx)
: Executor("StopBalanceExecutor", node, ectx) {}
folly::Future<Status> execute() override;
private:
folly::Future<Status> stopBalance();
};
} // namespace graph
} // namespace nebula
#endif // EXEC_ADMIN_STOPBALANCEEXECUTOR_H_
......@@ -22,19 +22,20 @@ Status MetaCache::createSpace(const meta::cpp2::CreateSpaceReq &req, GraphSpaceI
auto ifNotExists = req.get_if_not_exists();
auto properties = req.get_properties();
auto spaceName = properties.get_space_name();
auto findIter = spaces_.find(spaceName);
if (ifNotExists && findIter != spaces_.end()) {
spaceId = findIter->second.get_space_id();
const auto findIter = spaceIndex_.find(spaceName);
if (ifNotExists && findIter != spaceIndex_.end()) {
spaceId = findIter->second;
return Status::OK();
}
if (findIter != spaces_.end()) {
if (findIter != spaceIndex_.end()) {
return Status::Error("Space `%s' existed", spaceName.c_str());
}
spaceId = ++id_;
spaceIndex_.emplace(spaceName, spaceId);
meta::cpp2::SpaceItem space;
space.set_space_id(spaceId);
space.set_properties(std::move(properties));
spaces_[spaceName] = space;
spaces_[spaceId] = space;
VLOG(1) << "space name: " << space.get_properties().get_space_name()
<< ", partition_num: " << space.get_properties().get_partition_num()
<< ", replica_factor: " << space.get_properties().get_replica_factor()
......@@ -45,25 +46,26 @@ Status MetaCache::createSpace(const meta::cpp2::CreateSpaceReq &req, GraphSpaceI
StatusOr<meta::cpp2::SpaceItem> MetaCache::getSpace(const meta::cpp2::GetSpaceReq &req) {
folly::RWSpinLock::ReadHolder holder(lock_);
auto findIter = spaces_.find(req.get_space_name());
if (findIter == spaces_.end()) {
auto findIter = spaceIndex_.find(req.get_space_name());
if (findIter == spaceIndex_.end()) {
LOG(ERROR) << "Space " << req.get_space_name().c_str() << " not found";
return Status::Error("Space `%s' not found", req.get_space_name().c_str());
}
VLOG(1) << "space name: " << findIter->second.get_properties().get_space_name()
<< ", partition_num: " << findIter->second.get_properties().get_partition_num()
<< ", replica_factor: " << findIter->second.get_properties().get_replica_factor()
<< ", rvid_size: " << findIter->second.get_properties().get_vid_size();
return findIter->second;
const auto spaceInfo = spaces_.find(findIter->second);
VLOG(1) << "space name: " << spaceInfo->second.get_properties().get_space_name()
<< ", partition_num: " << spaceInfo->second.get_properties().get_partition_num()
<< ", replica_factor: " << spaceInfo->second.get_properties().get_replica_factor()
<< ", rvid_size: " << spaceInfo->second.get_properties().get_vid_size();
return spaceInfo->second;
}
StatusOr<std::vector<meta::cpp2::IdName>> MetaCache::listSpaces() {
folly::RWSpinLock::ReadHolder holder(lock_);
std::vector<meta::cpp2::IdName> spaces;
for (auto &item : spaces_) {
for (const auto &index : spaceIndex_) {
meta::cpp2::IdName idName;
idName.set_id(to(item.second.get_space_id(), EntryType::SPACE));
idName.set_name(item.first);
idName.set_id(to(index.second, EntryType::SPACE));
idName.set_name(index.first);
spaces.emplace_back(idName);
}
return spaces;
......@@ -72,19 +74,20 @@ StatusOr<std::vector<meta::cpp2::IdName>> MetaCache::listSpaces() {
Status MetaCache::dropSpace(const meta::cpp2::DropSpaceReq &req) {
folly::RWSpinLock::WriteHolder holder(lock_);
auto spaceName = req.get_space_name();
auto findIter = spaces_.find(spaceName);
auto findIter = spaceIndex_.find(spaceName);
auto ifExists = req.get_if_exists();
if (ifExists && findIter == spaces_.end()) {
if (ifExists && findIter == spaceIndex_.end()) {
Status::OK();
}
if (findIter == spaces_.end()) {
if (findIter == spaceIndex_.end()) {
return Status::Error("Space `%s' not existed", req.get_space_name().c_str());
}
auto id = findIter->second.get_space_id();
spaces_.erase(spaceName);
auto id = findIter->second;
spaces_.erase(id);
cache_.erase(id);
spaceIndex_.erase(spaceName);
return Status::OK();
}
......@@ -297,7 +300,7 @@ Status MetaCache::listUsers(const meta::cpp2::ListUsersReq&) {
std::vector<meta::cpp2::HostItem> MetaCache::listHosts() {
folly::RWSpinLock::WriteHolder holder(lock_);
std::vector<meta::cpp2::HostItem> hosts;
for (auto& spaceIdIt : spaces_) {
for (auto& spaceIdIt : spaceIndex_) {
auto spaceName = spaceIdIt.first;
for (auto &h : hostSet_) {
meta::cpp2::HostItem host;
......@@ -323,6 +326,76 @@ std::unordered_map<PartitionID, std::vector<HostAddr>> MetaCache::getParts() {
return parts;
}
ErrorOr<meta::cpp2::ErrorCode, int64_t> MetaCache::balanceSubmit(std::vector<HostAddr> dels) {
folly::RWSpinLock::ReadHolder rh(lock_);
for (const auto &job : balanceJobs_) {
if (job.second.status == meta::cpp2::TaskResult::IN_PROGRESS) {
return meta::cpp2::ErrorCode::E_BALANCER_RUNNING;
}
}
std::vector<BalanceTask> jobs;
for (const auto &spaceInfo : spaces_) {
for (PartitionID i = 1; i <= spaceInfo.second.get_properties().get_partition_num(); ++i) {
for (const auto &host : hostSet_) { // Note mock partition in each host here
if (std::find(dels.begin(), dels.end(), host) != dels.end()) {
continue;
}
jobs.emplace_back(BalanceTask{
// mock
spaceInfo.first,
i,
host,
host,
meta::cpp2::TaskResult::IN_PROGRESS,
});
}
}
}
auto jobId = incId();
balanceTasks_.emplace(jobId, std::move(jobs));
balanceJobs_.emplace(jobId, BalanceJob{
meta::cpp2::TaskResult::IN_PROGRESS,
});
return jobId;
}
ErrorOr<meta::cpp2::ErrorCode, int64_t> MetaCache::balanceStop() {
for (auto &job : balanceJobs_) {
if (job.second.status == meta::cpp2::TaskResult::IN_PROGRESS) {
job.second.status = meta::cpp2::TaskResult::FAILED;
return job.first;
}
}
return meta::cpp2::ErrorCode::E_NO_RUNNING_BALANCE_PLAN;
}
meta::cpp2::ErrorCode MetaCache::balanceLeaders() {
return meta::cpp2::ErrorCode::SUCCEEDED;
}
ErrorOr<meta::cpp2::ErrorCode, std::vector<meta::cpp2::BalanceTask>>
MetaCache::showBalance(int64_t id) {
const auto job = balanceTasks_.find(id);
if (job == balanceTasks_.end()) {
return meta::cpp2::ErrorCode::E_NOT_FOUND;
}
std::vector<meta::cpp2::BalanceTask> result;
result.reserve(job->second.size());
for (const auto &task : job->second) {
meta::cpp2::BalanceTask taskInfo;
std::stringstream idStr;
idStr << "[";
idStr << id << ", ";
idStr << task.space << ":" << task.part << ", ";
idStr << task.from << "->" << task.to;
idStr << "]";
taskInfo.set_id(idStr.str());
taskInfo.set_result(task.status);
result.emplace_back(std::move(taskInfo));
}
return result;
}
ErrorOr<meta::cpp2::ErrorCode, meta::cpp2::AdminJobResult>
MetaCache::runAdminJob(const meta::cpp2::AdminJobReq& req) {
meta::cpp2::AdminJobResult result;
......
......@@ -70,6 +70,12 @@ public:
std::unordered_map<PartitionID, std::vector<HostAddr>> getParts();
ErrorOr<meta::cpp2::ErrorCode, int64_t> balanceSubmit(std::vector<HostAddr> dels);
ErrorOr<meta::cpp2::ErrorCode, int64_t> balanceStop();
meta::cpp2::ErrorCode balanceLeaders();
ErrorOr<meta::cpp2::ErrorCode, std::vector<meta::cpp2::BalanceTask>>
showBalance(int64_t id);
ErrorOr<meta::cpp2::ErrorCode, meta::cpp2::AdminJobResult>
runAdminJob(const meta::cpp2::AdminJobReq& req);
......@@ -139,12 +145,27 @@ private:
};
std::unordered_set<HostAddr> hostSet_;
std::unordered_map<std::string, GraphSpaceID> spaceIndex_;
std::unordered_map<GraphSpaceID, SpaceInfoCache> cache_;
std::unordered_map<std::string, meta::cpp2::SpaceItem> spaces_;
std::unordered_map<GraphSpaceID, meta::cpp2::SpaceItem> spaces_;
int64_t id_{0};
std::unordered_map<std::string, meta::cpp2::Snapshot> snapshots_;
mutable folly::RWSpinLock lock_;
////////////////////////////////////////////// Balance /////////////////////////////////////////////
struct BalanceTask {
GraphSpaceID space;
PartitionID part;
HostAddr from;
HostAddr to;
meta::cpp2::TaskResult status;
};
struct BalanceJob {
meta::cpp2::TaskResult status;
};
std::unordered_map<int64_t, std::vector<BalanceTask>> balanceTasks_;
std::unordered_map<int64_t, BalanceJob> balanceJobs_;
////////////////////////////////////////////// Job /////////////////////////////////////////////////
struct JobDesc {
meta::cpp2::AdminCmd cmd_; // compact, flush ...
......
......@@ -526,18 +526,50 @@ MockMetaServiceHandler::future_getUserRoles(const meta::cpp2::GetUserRolesReq&)
}
folly::Future<meta::cpp2::BalanceResp>
MockMetaServiceHandler::future_balance(const meta::cpp2::BalanceReq&) {
folly::Promise<meta::cpp2::BalanceResp> promise;
auto future = promise.getFuture();
MockMetaServiceHandler::future_balance(const meta::cpp2::BalanceReq& req) {
meta::cpp2::BalanceResp resp;
resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED);
promise.setValue(std::move(resp));
return future;
if (req.__isset.id) {
// show
auto result = MetaCache::instance().showBalance(*req.get_id());
if (ok(result)) {
resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED);
resp.set_tasks(std::move(result).right());
} else {
resp.set_code(result.left());
}
} else if (req.__isset.stop) {
// stop
auto result = MetaCache::instance().balanceStop();
if (ok(result)) {
resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED);
resp.set_id(result.right());
} else {
resp.set_code(result.left());
}
} else {
ErrorOr<meta::cpp2::ErrorCode, int64_t> result;
// submit
if (req.__isset.host_del) {
result = MetaCache::instance().balanceSubmit(*req.get_host_del());
} else {
result = MetaCache::instance().balanceSubmit({});
}
if (ok(result)) {
resp.set_code(meta::cpp2::ErrorCode::SUCCEEDED);
resp.set_id(result.right());
} else {
resp.set_code(result.left());
}
}
return resp;
}
folly::Future<meta::cpp2::ExecResp>
MockMetaServiceHandler::future_leaderBalance(const meta::cpp2::LeaderBalanceReq&) {
RETURN_SUCCESSED();
meta::cpp2::ExecResp resp;
auto result = MetaCache::instance().balanceLeaders();
resp.set_code(result);
return resp;
}
folly::Future<meta::cpp2::ExecResp>
......
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include <gtest/gtest.h>
#include "common/base/Status.h"
#include "common/interface/gen-cpp2/common_types.h"
#include "common/network/NetworkUtils.h"
#include "mock/test/TestBase.h"
#include "mock/test/TestEnv.h"
DECLARE_int32(heartbeat_interval_secs);
namespace nebula {
namespace graph {
class BalanceTest : public TestBase {
public:
void SetUp() override {
TestBase::SetUp();
client_ = gEnv->getGraphClient();
ASSERT_NE(nullptr, client_);
};
void TearDown() override {
TestBase::TearDown();
client_.reset();
};
protected:
std::unique_ptr<GraphClient> client_;
};
TEST_F(BalanceTest, Error) {
{
// Show one not exists
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA 233;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_EXECUTION_ERROR);
}
{
// stop not exists
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA STOP;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::E_EXECUTION_ERROR);
}
}
TEST_F(BalanceTest, Simple) {
{
// balance leader
cpp2::ExecutionResponse resp;
std::string query = "BALANCE LEADER;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
}
{
// balance without space
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
DataSet expected({"ID"});
expected.emplace_back(Row({1}));
ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected));
}
{
// show one without space
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA 1;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
DataSet expected({"balanceId, spaceId:partId, src->dst", "status"});
expected.emplace_back(Row({
"Total:0, Succeeded:0, Failed:0, In Progress:0, Invalid:0",
0,
}));
ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected));
}
{
// stop
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA STOP;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
}
{
// create space
cpp2::ExecutionResponse resp;
std::string query = "CREATE SPACE space1(partition_num=3, replica_factor=1);";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
}
{
// create space
cpp2::ExecutionResponse resp;
std::string query = "CREATE SPACE space2(partition_num=9, replica_factor=3);";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
}
{
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
DataSet expected({"ID"});
expected.emplace_back(Row({4}));
ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected));
}
{
// show one
cpp2::ExecutionResponse resp;
std::string query = "BALANCE DATA 4;";
client_->execute(query, resp);
ASSERT_ERROR_CODE(resp, cpp2::ErrorCode::SUCCEEDED);
// TODO(shylock) dataset constructor
auto storagePort = gEnv->storageServerPort();
DataSet expected({"balanceId, spaceId:partId, src->dst", "status"});
expected.emplace_back(Row({
folly::sformat("[4, 3:1, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:2, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:3, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:4, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:5, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:6, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:7, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:8, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 3:9, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 2:1, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 2:2, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(Row({
folly::sformat("[4, 2:3, [127.0.0.1:{}]->[127.0.0.1:{}]]", storagePort, storagePort),
"IN_PROGRESS",
}));
expected.emplace_back(
Row({"Total:12, Succeeded:0, Failed:0, In Progress:12, Invalid:0", 0}));
ASSERT_TRUE(verifyDataSetWithoutOrder(resp, expected));
}
}
} // namespace graph
} // namespace nebula
......@@ -87,6 +87,22 @@ nebula_add_test(
wangle
)
nebula_add_test(
NAME
balance_test
SOURCES
BalanceTest.cpp
OBJECTS
${GRAPH_TEST_LIB}
LIBRARIES
proxygenhttpserver
proxygenlib
${THRIFT_LIBRARIES}
wangle
gtest
gtest_main
)
nebula_add_test(
NAME
admin_job_test
......
......@@ -269,10 +269,6 @@ private:
std::string spaceName_;
};
class Balance final : public SingleInputNode {
public:
};
class CreateSnapshot final : public SingleInputNode {
public:
static CreateSnapshot* make(ExecutionPlan* plan, PlanNode* input) {
......@@ -407,6 +403,82 @@ private:
const std::vector<std::string> params_;
};
class BalanceLeaders final : public SingleDependencyNode {
public:
static BalanceLeaders* make(ExecutionPlan* plan, PlanNode* dep) {
return new BalanceLeaders(plan, dep);
}
std::unique_ptr<cpp2::PlanNodeDescription> explain() const override {
LOG(FATAL) << "Unimplemented";
return nullptr;
}
private:
explicit BalanceLeaders(ExecutionPlan* plan, PlanNode* dep)
: SingleDependencyNode(plan, Kind::kBalanceLeaders, dep) {}
};
class Balance final : public SingleDependencyNode {
public:
static Balance* make(ExecutionPlan* plan, PlanNode* dep, std::vector<HostAddr> deleteHosts) {
return new Balance(plan, dep, std::move(deleteHosts));
}
std::unique_ptr<cpp2::PlanNodeDescription> explain() const override {
LOG(FATAL) << "Unimplemented";
return nullptr;
}
const std::vector<HostAddr> &deleteHosts() const {
return deleteHosts_;
}
private:
Balance(ExecutionPlan* plan, PlanNode* dep, std::vector<HostAddr> deleteHosts)
: SingleDependencyNode(plan, Kind::kBalance, dep), deleteHosts_(std::move(deleteHosts)) {}
std::vector<HostAddr> deleteHosts_;
};
class StopBalance final : public SingleDependencyNode {
public:
static StopBalance* make(ExecutionPlan* plan, PlanNode* dep) {
return new StopBalance(plan, dep);
}
std::unique_ptr<cpp2::PlanNodeDescription> explain() const override {
LOG(FATAL) << "Unimplemented";
return nullptr;
}
private:
explicit StopBalance(ExecutionPlan* plan, PlanNode* dep)
: SingleDependencyNode(plan, Kind::kStopBalance, dep) {}
};
class ShowBalance final : public SingleDependencyNode {
public:
static ShowBalance* make(ExecutionPlan* plan, PlanNode* dep, int64_t id) {
return new ShowBalance(plan, dep, id);
}
std::unique_ptr<cpp2::PlanNodeDescription> explain() const override {
LOG(FATAL) << "Unimplemented";
return nullptr;
}
int64_t id() const {
return id_;
}
private:
ShowBalance(ExecutionPlan* plan, PlanNode* dep, int64_t id)
: SingleDependencyNode(plan, Kind::kShowBalance, dep), id_(id) {}
int64_t id_;
};
class ShowCharset final : public SingleInputNode {
public:
static ShowCharset* make(ExecutionPlan* plan, PlanNode* input) {
......
......@@ -105,6 +105,14 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "DropSnapshot";
case Kind::kShowSnapshots:
return "ShowSnapshots";
case Kind::kBalanceLeaders:
return "BalanceLeaders";
case Kind::kBalance:
return "Balance";
case Kind::kStopBalance:
return "StopBalance";
case Kind::kShowBalance:
return "ShowBalance";
case Kind::kSubmitJob:
return "SubmitJob";
case Kind::kDataJoin:
......
......@@ -65,6 +65,10 @@ public:
kDropEdge,
kInsertVertices,
kInsertEdges,
kBalanceLeaders,
kBalance,
kStopBalance,
kShowBalance,
kSubmitJob,
kShowHosts,
kDataCollect,
......
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "common/base/Base.h"
#include "planner/Admin.h"
#include "validator/BalanceValidator.h"
namespace nebula {
namespace graph {
Status BalanceValidator::toPlan() {
auto* plan = qctx_->plan();
PlanNode *current = nullptr;
BalanceSentence *sentence = static_cast<BalanceSentence*>(sentence_);
switch (sentence->subType()) {
case BalanceSentence::SubType::kLeader:
current = BalanceLeaders::make(plan, nullptr);
break;
case BalanceSentence::SubType::kData:
current = Balance::make(plan,
nullptr,
sentence->hostDel() == nullptr
? std::vector<HostAddr>()
: sentence->hostDel()->hosts());
break;
case BalanceSentence::SubType::kDataStop:
current = StopBalance::make(plan, nullptr);
break;
case BalanceSentence::SubType::kShowBalancePlan:
current = ShowBalance::make(plan, nullptr, sentence->balanceId());
break;
case BalanceSentence::SubType::kUnknown:
// fallthrough
default:
DLOG(FATAL) << "Unknown balance kind " << sentence->kind();
return Status::NotSupported("Unknown balance kind %d", static_cast<int>(sentence->kind()));
}
root_ = current;
tail_ = root_;
return Status::OK();
}
} // namespace graph
} // namespace nebula
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment