diff --git a/src/executor/BalanceExecutor.cpp b/src/executor/BalanceExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4602bbb2a89a70590a6f6d86f377317603dbf9dd --- /dev/null +++ b/src/executor/BalanceExecutor.cpp @@ -0,0 +1,64 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "graph/BalanceExecutor.h" + +namespace nebula { +namespace graph { + +BalanceExecutor::BalanceExecutor(Sentence *sentence, + ExecutionContext *ectx) : Executor(ectx) { + sentence_ = static_cast<BalanceSentence*>(sentence); +} + +Status BalanceExecutor::prepare() { + return Status::OK(); +} + +void BalanceExecutor::execute() { + auto showType = sentence_->subType(); + switch (showType) { + case BalanceSentence::SubType::kLeader: + balanceLeader(); + break; + case BalanceSentence::SubType::kUnknown: + onError_(Status::Error("Type unknown")); + break; + } +} + +void BalanceExecutor::balanceLeader() { + auto future = ectx()->getMetaClient()->balanceLeader(); + auto *runner = ectx()->rctx()->runner(); + + auto cb = [this] (auto &&resp) { + if (!resp.ok()) { + DCHECK(onError_); + onError_(std::move(resp).status()); + return; + } + auto ret = std::move(resp).value(); + if (!ret) { + DCHECK(onError_); + onError_(Status::Error("Balance leader failed")); + return; + } + DCHECK(onFinish_); + onFinish_(); + }; + + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + DCHECK(onError_); + onError_(Status::Error("Internal error")); + return; + }; + + std::move(future).via(runner).thenValue(cb).thenError(error); +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/BalanceExecutor.h b/src/executor/BalanceExecutor.h new file mode 100644 index 0000000000000000000000000000000000000000..4b10bc6d9dd540c6f1e941e24e774f36bc8a4c5b --- /dev/null +++ b/src/executor/BalanceExecutor.h @@ -0,0 +1,38 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef GRAPH_BALANCEEXECUTOR_H_ +#define GRAPH_BALANCEEXECUTOR_H_ + +#include "base/Base.h" +#include "graph/Executor.h" + +namespace nebula { +namespace graph { + +class BalanceExecutor final : public Executor { +public: + BalanceExecutor(Sentence *sentence, ExecutionContext *ectx); + + const char* name() const override { + return "BalanceExecutor"; + } + + Status MUST_USE_RESULT prepare() override; + + void execute() override; + + void balanceLeader(); + +private: + BalanceSentence *sentence_{nullptr}; + std::unique_ptr<cpp2::ExecutionResponse> resp_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_BALANCEEXECUTOR_H_ diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt index 7d20bb11161834f78188dc9bf018739f99f98f0a..9aa9e3ebfc6c1a25aaac977e7a80614e076b9028 100644 --- a/src/executor/CMakeLists.txt +++ b/src/executor/CMakeLists.txt @@ -37,6 +37,7 @@ add_library( OrderByExecutor.cpp IngestExecutor.cpp ConfigExecutor.cpp + BalanceExecutor.cpp SchemaHelper.cpp FetchVerticesExecutor.cpp FetchEdgesExecutor.cpp diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 683d0633c4d6e7fee8cdd6b5310d0499db0ac38d..65b0c26ce056704d2a88e3cc6ce8ad5ed61ee322 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -41,6 +41,7 @@ #include "graph/SetExecutor.h" #include "graph/FindExecutor.h" #include "graph/MatchExecutor.h" +#include "graph/BalanceExecutor.h" namespace nebula { namespace graph { @@ -139,6 +140,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) { case Sentence::Kind::kFind: executor = std::make_unique<FindExecutor>(sentence, ectx()); break; + case Sentence::Kind::kBalance: + executor = std::make_unique<BalanceExecutor>(sentence, ectx()); + break; case Sentence::Kind::kUnknown: LOG(FATAL) << "Sentence kind unknown"; break; diff --git a/src/executor/ShowExecutor.cpp b/src/executor/ShowExecutor.cpp index 3504529567967cb1370aa80065aba565c7928e72..a3fdf0465ffb41160e1bd5b00774ff75c7d504cb 100644 --- a/src/executor/ShowExecutor.cpp +++ b/src/executor/ShowExecutor.cpp @@ -82,18 +82,53 @@ void ShowExecutor::showHosts() { return; } - auto retShowHosts = std::move(resp).value(); + auto hostItems = std::move(resp).value(); std::vector<cpp2::RowValue> rows; - std::vector<std::string> header{"Ip", "Port", "Status"}; + std::vector<std::string> header{"Ip", "Port", "Status", "Leader count", + "Leader distribution", "Partition distribution"}; resp_ = std::make_unique<cpp2::ExecutionResponse>(); resp_->set_column_names(std::move(header)); - for (auto &status : retShowHosts) { + for (auto& item : hostItems) { std::vector<cpp2::ColumnValue> row; - row.resize(3); - row[0].set_str(NetworkUtils::ipFromHostAddr(status.first)); - row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(status.first))); - row[2].set_str(status.second); + row.resize(6); + auto hostAddr = HostAddr(item.hostAddr.ip, item.hostAddr.port); + row[0].set_str(NetworkUtils::ipFromHostAddr(hostAddr)); + row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(hostAddr))); + switch (item.get_status()) { + case meta::cpp2::HostStatus::ONLINE: + row[2].set_str("online"); + break; + case meta::cpp2::HostStatus::OFFLINE: + case meta::cpp2::HostStatus::UNKNOWN: + row[2].set_str("offline"); + break; + } + + int32_t leaderCount = 0; + std::string leaders; + for (auto& spaceEntry : item.get_leader_parts()) { + leaderCount += spaceEntry.second.size(); + leaders += "space " + folly::to<std::string>(spaceEntry.first) + ": " + + folly::to<std::string>(spaceEntry.second.size()) + ", "; + } + if (!leaders.empty()) { + leaders.resize(leaders.size() - 2); + } + + row[3].set_integer(leaderCount); + row[4].set_str(leaders); + + std::string parts; + for (auto& spaceEntry : item.get_all_parts()) { + parts += "space " + folly::to<std::string>(spaceEntry.first) + ": " + + folly::to<std::string>(spaceEntry.second.size()) + ", "; + } + if (!parts.empty()) { + parts.resize(parts.size() - 2); + } + row[5].set_str(parts); + rows.emplace_back(); rows.back().set_columns(std::move(row)); } diff --git a/src/executor/test/SchemaTest.cpp b/src/executor/test/SchemaTest.cpp index 5af4c465a8e2a8c772ccb7b00d01dd98f434fbf4..339bc7f42b001b11dd17807dbf8c2210ee54c954 100644 --- a/src/executor/test/SchemaTest.cpp +++ b/src/executor/test/SchemaTest.cpp @@ -52,8 +52,9 @@ TEST_F(SchemaTest, metaCommunication) { cpp2::ExecutionResponse resp; std::string query = "SHOW HOSTS"; client->execute(query, resp); - std::vector<uniform_tuple_t<std::string, 3>> expected{ - {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online"}, + std::vector<std::tuple<std::string, std::string, std::string, + int, std::string, std::string>> expected { + {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online", 0, "", ""}, }; ASSERT_TRUE(verifyResult(resp, expected)); } @@ -254,7 +255,7 @@ TEST_F(SchemaTest, metaCommunication) { // Test existent tag { cpp2::ExecutionResponse resp; - std::string query = "CREATE TAG person(id int, balance double)"; + std::string query = "CREATE TAG person(id int)"; auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); } @@ -640,6 +641,7 @@ TEST_F(SchemaTest, metaCommunication) { client->execute(query, resp); ASSERT_EQ(1, (*(resp.get_rows())).size()); } + sleep(FLAGS_load_data_interval_secs + 1); } @@ -650,8 +652,9 @@ TEST_F(SchemaTest, TTLtest) { cpp2::ExecutionResponse resp; std::string query = "SHOW HOSTS"; client->execute(query, resp); - std::vector<uniform_tuple_t<std::string, 3>> expected{ - {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online"}, + std::vector<std::tuple<std::string, std::string, std::string, + int, std::string, std::string>> expected { + {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online", 0, "", ""}, }; ASSERT_TRUE(verifyResult(resp, expected)); } diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index 41a86961fd1593d178a5dcf2e94073f312bce024..dd10040ca438e9fa2c98b0029e1618938d039086 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -145,4 +145,14 @@ std::string ConfigSentence::toString() const { return "Unknown"; } +std::string BalanceSentence::toString() const { + switch (subType_) { + case SubType::kLeader: + return std::string("BALANCE LEADER"); + default: + FLOG_FATAL("Type illegal"); + } + return "Unknown"; +} + } // namespace nebula diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index e742474fac3197e5c980c562e4f0ef9291df20af..689b608df3ea84d11f9dc50c10ae48b4d9c5e8da 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -354,6 +354,29 @@ private: std::unique_ptr<ConfigRowItem> configItem_; }; +class BalanceSentence final : public Sentence { +public: + enum class SubType : uint32_t { + kUnknown, + kLeader, + }; + + // TODO: add more subtype for balance + explicit BalanceSentence(SubType subType) { + kind_ = Kind::kBalance; + subType_ = std::move(subType); + } + + std::string toString() const override; + + SubType subType() const { + return subType_; + } + +private: + SubType subType_{SubType::kUnknown}; +}; + } // namespace nebula #endif // PARSER_ADMINSENTENCES_H_ diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h index 7aa22cfda108548173f87a87349b357dacd7c95f..fe118c965f765c1f7c449a127c6bc160343cfed3 100644 --- a/src/parser/Sentence.h +++ b/src/parser/Sentence.h @@ -56,6 +56,7 @@ public: kConfig, kFetchVertices, kFetchEdges, + kBalance, }; Kind kind() const { diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 6d9d8efd9c073a97883362137673b9620703a260..5d3947cb19a45f08b9c545f55524d95d4557a379 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -104,6 +104,7 @@ class GraphScanner; %token KW_ORDER KW_ASC %token KW_FETCH KW_PROP %token KW_DISTINCT KW_ALL +%token KW_BALANCE KW_LEADER /* symbols */ %token L_PAREN R_PAREN L_BRACKET R_BRACKET L_BRACE R_BRACE COMMA %token PIPE OR AND LT LE GT GE EQ NE PLUS MINUS MUL DIV MOD NOT NEG ASSIGN @@ -195,7 +196,7 @@ class GraphScanner; %type <sentence> create_user_sentence alter_user_sentence drop_user_sentence change_password_sentence %type <sentence> grant_sentence revoke_sentence %type <sentence> download_sentence -%type <sentence> set_config_sentence get_config_sentence +%type <sentence> set_config_sentence get_config_sentence balance_sentence %type <sentence> sentence %type <sentences> sentences @@ -1483,6 +1484,12 @@ set_config_sentence } ; +balance_sentence + : KW_BALANCE KW_LEADER { + $$ = new BalanceSentence(BalanceSentence::SubType::kLeader); + } + ; + mutate_sentence : insert_vertex_sentence { $$ = $1; } | insert_edge_sentence { $$ = $1; } @@ -1523,6 +1530,7 @@ maintain_sentence | revoke_sentence { $$ = $1; } | get_config_sentence { $$ = $1; } | set_config_sentence { $$ = $1; } + | balance_sentence { $$ = $1; } ; sentence diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index 372b669ae8f2546eabecfa3464da28011d6f70b2..eb25f3527bdd6cd7f1e21d2225832e674fc11ccc 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -113,6 +113,8 @@ STORAGE ([Ss][Tt][Oo][Rr][Aa][Gg][Ee]) FETCH ([Ff][Ee][Tt][Cc][Hh]) PROP ([Pp][Rr][Oo][Pp]) ALL ([Aa][Ll][Ll]) +BALANCE ([Bb][Aa][Ll][Aa][Nn][Cc][Ee]) +LEADER ([Ll][Ee][Aa][Dd][Ee][Rr]) LABEL ([a-zA-Z][_a-zA-Z0-9]*) DEC ([0-9]) @@ -215,6 +217,8 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) {FETCH} { return TokenType::KW_FETCH; } {PROP} { return TokenType::KW_PROP; } {ALL} { return TokenType::KW_ALL; } +{BALANCE} { return TokenType::KW_BALANCE; } +{LEADER} { return TokenType::KW_LEADER; } "." { return TokenType::DOT; } "," { return TokenType::COMMA; } diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index c796ca447e1cc6f68134ff506b60c03c164959b2..efbf5f6a00653b414ab3fe43cb4214f250b5d30f 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -1146,4 +1146,13 @@ TEST(Parser, ConfigOperation) { } } +TEST(Parser, BalanceOperation) { + { + GQLParser parser; + std::string query = "BALANCE LEADER"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } +} + } // namespace nebula diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index 1110e684a7fdde310b59fd491389941874ed7137..36f49c6ee65ac325d8aad08e929109ffcbb004f4 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -360,6 +360,12 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("Variables", TokenType::KW_VARIABLES), CHECK_SEMANTIC_TYPE("ALL", TokenType::KW_ALL), CHECK_SEMANTIC_TYPE("all", TokenType::KW_ALL), + CHECK_SEMANTIC_TYPE("BALANCE", TokenType::KW_BALANCE), + CHECK_SEMANTIC_TYPE("Balance", TokenType::KW_BALANCE), + CHECK_SEMANTIC_TYPE("balance", TokenType::KW_BALANCE), + CHECK_SEMANTIC_TYPE("LEADER", TokenType::KW_LEADER), + CHECK_SEMANTIC_TYPE("Leader", TokenType::KW_LEADER), + CHECK_SEMANTIC_TYPE("leader", TokenType::KW_LEADER), CHECK_SEMANTIC_TYPE("_type", TokenType::TYPE_PROP), CHECK_SEMANTIC_TYPE("_id", TokenType::ID_PROP),