diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt
index f8e7ffffd8f17b019608b2b998a52d3343c47983..038b5eb12bc087d95385aa6dc1bb25648e1a59d9 100644
--- a/src/executor/CMakeLists.txt
+++ b/src/executor/CMakeLists.txt
@@ -49,6 +49,7 @@ nebula_add_library(
     MatchExecutor.cpp
     DeleteVertexExecutor.cpp
     FindPathExecutor.cpp
+    LimitExecutor.cpp
 )
 
 nebula_add_library(
diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp
index abd261c99be3e3c66bdf049bacf9c20d51e75a72..f3005c6d29a1d0691b92a0c164f21c4c546181e3 100644
--- a/src/executor/Executor.cpp
+++ b/src/executor/Executor.cpp
@@ -46,6 +46,7 @@
 #include "graph/UpdateVertexExecutor.h"
 #include "graph/UpdateEdgeExecutor.h"
 #include "graph/FindPathExecutor.h"
+#include "graph/LimitExecutor.h"
 
 namespace nebula {
 namespace graph {
@@ -159,6 +160,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
         case Sentence::Kind::kFindPath:
             executor = std::make_unique<FindPathExecutor>(sentence, ectx());
             break;
+        case Sentence::Kind::kLimit:
+            executor = std::make_unique<LimitExecutor>(sentence, ectx());
+            break;
         case Sentence::Kind::kUnknown:
             LOG(FATAL) << "Sentence kind unknown";
             break;
diff --git a/src/executor/LimitExecutor.cpp b/src/executor/LimitExecutor.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..781834d16146df45e4c25d3f3d7c582eaae9c41b
--- /dev/null
+++ b/src/executor/LimitExecutor.cpp
@@ -0,0 +1,156 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "base/Base.h"
+#include "graph/LimitExecutor.h"
+
+namespace nebula {
+namespace graph {
+
+// Binds the executor to its LIMIT sentence. The factories touched by this
+// change construct a LimitExecutor only for Sentence::Kind::kLimit.
+LimitExecutor::LimitExecutor(Sentence *sentence, ExecutionContext *ectx) : TraverseExecutor(ectx) {
+    sentence_ = static_cast<LimitSentence*>(sentence);
+}
+
+
+// Reads offset and count from the sentence and rejects negative values.
+Status LimitExecutor::prepare() {
+    offset_ = sentence_->offset();
+    if (offset_ < 0) {
+        return Status::SyntaxError("skip `%ld' is illegal", offset_);
+    }
+    count_ = sentence_->count();
+    if (count_ < 0) {
+        return Status::SyntaxError("count `%ld' is illegal", count_);
+    }
+
+    return Status::OK();
+}
+
+
+// Keeps at most `count_' input rows, starting at row `offset_', and passes
+// them on. Finishes with no rows when there is no input or count_ is 0.
+void LimitExecutor::execute() {
+    FLOG_INFO("Executing Limit: %s", sentence_->toString().c_str());
+    if (inputs_ == nullptr || count_ == 0) {
+        DCHECK(onFinish_);
+        onFinish_();
+        return;
+    }
+
+    auto ret = inputs_->getRows();
+    if (!ret.ok()) {
+        // Surface the failure rather than finishing as if the input were empty.
+        DCHECK(onError_);
+        onError_(ret.status());
+        return;
+    }
+    auto inRows = std::move(ret).value();
+
+    // prepare() has rejected negative values. The window is clamped against the
+    // row count, so `offset_ + count_' is never formed: that sum overflows
+    // int64_t for e.g. `LIMIT 1, 9223372036854775807'.
+    const int64_t total = static_cast<int64_t>(inRows.size());
+    const int64_t skip = offset_ < total ? offset_ : total;
+    const int64_t left = total - skip;
+    const int64_t take = count_ < left ? count_ : left;
+    auto first = std::make_move_iterator(inRows.begin()) + skip;
+    rows_.assign(first, first + take);
+
+    if (onResult_) {
+        auto output = setupInterimResult();
+        onResult_(std::move(output));
+    }
+
+    DCHECK(onFinish_);
+    onFinish_();
+}
+
+
+// Accepts the upstream result; a null result leaves inputs_ untouched.
+void LimitExecutor::feedResult(std::unique_ptr<InterimResult> result) {
+    if (result == nullptr) {
+        return;
+    }
+    inputs_ = std::move(result);
+}
+
+
+// Re-encodes rows_ with the input schema for the next executor in the pipe.
+// Returns nullptr when no rows were kept.
+std::unique_ptr<InterimResult> LimitExecutor::setupInterimResult() {
+    if (rows_.empty()) {
+        return nullptr;
+    }
+
+    auto rsWriter = std::make_unique<RowSetWriter>(inputs_->schema());
+    using Type = cpp2::ColumnValue::Type;
+    for (auto &row : rows_) {
+        RowWriter writer(inputs_->schema());
+        // Bind by reference: a plain `auto' would copy every row's columns.
+        const auto &columns = row.get_columns();
+        for (auto &column : columns) {
+            switch (column.getType()) {
+                case Type::id:
+                    writer << column.get_id();
+                    break;
+                case Type::integer:
+                    writer << column.get_integer();
+                    break;
+                case Type::double_precision:
+                    writer << column.get_double_precision();
+                    break;
+                case Type::bool_val:
+                    writer << column.get_bool_val();
+                    break;
+                case Type::str:
+                    writer << column.get_str();
+                    break;
+                case Type::timestamp:
+                    writer << column.get_timestamp();
+                    break;
+                default:
+                    LOG(FATAL) << "Not Support: " << column.getType();
+            }
+        }
+        rsWriter->addRow(writer);
+    }
+
+    auto result = std::make_unique<InterimResult>(getResultColumnNames());
+    result->setInterim(std::move(rsWriter));
+    return result;
+}
+
+
+// Column names are taken from the input schema; empty when nothing was fed.
+std::vector<std::string> LimitExecutor::getResultColumnNames() const {
+    std::vector<std::string> columnNames;
+    if (inputs_ == nullptr) {
+        // execute() finishes normally without input, so guard the dereference.
+        return columnNames;
+    }
+    columnNames.reserve(inputs_->schema()->getNumFields());
+    auto field = inputs_->schema()->begin();
+    while (field) {
+        columnNames.emplace_back(field->getName());
+        ++field;
+    }
+    return columnNames;
+}
+
+
+// Fills the client response: column names always, rows only when any were kept.
+void LimitExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
+    resp.set_column_names(getResultColumnNames());
+    if (rows_.empty()) {
+        return;
+    }
+
+    resp.set_rows(std::move(rows_));
+}
+}   // namespace graph
+}   // namespace nebula
diff --git a/src/executor/LimitExecutor.h b/src/executor/LimitExecutor.h
new file mode 100644
index 0000000000000000000000000000000000000000..5932b89cd27a236cfb2a999265cc95415b919d6d
--- /dev/null
+++ b/src/executor/LimitExecutor.h
@@ -0,0 +1,56 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef GRAPH_LIMITEXECUTOR_H_
+#define GRAPH_LIMITEXECUTOR_H_
+
+#include "base/Base.h"
+#include "graph/TraverseExecutor.h"
+
+namespace nebula {
+namespace graph {
+
+/**
+ * Executes `LIMIT [offset,] count': keeps at most `count' rows of the
+ * piped-in result after skipping the first `offset' rows.
+ */
+class LimitExecutor final : public TraverseExecutor {
+public:
+    LimitExecutor(Sentence *sentence, ExecutionContext *ectx);
+
+    const char* name() const override {
+        return "LimitExecutor";
+    }
+
+    // Validates the sentence's offset and count.
+    Status MUST_USE_RESULT prepare() override;
+
+    // Selects the requested window of rows from the fed input.
+    void execute() override;
+
+    // Stores the result handed over by the caller; null is ignored.
+    void feedResult(std::unique_ptr<InterimResult> result) override;
+
+    // Writes column names and the selected rows into the response.
+    void setupResponse(cpp2::ExecutionResponse &resp) override;
+
+private:
+    // Encodes rows_ into an InterimResult for onResult_.
+    std::unique_ptr<InterimResult> setupInterimResult();
+    // Field names of the input schema.
+    std::vector<std::string> getResultColumnNames() const;
+
+private:
+    LimitSentence                              *sentence_{nullptr};
+    std::unique_ptr<InterimResult>              inputs_;        // fed by feedResult()
+    std::vector<cpp2::RowValue>                 rows_;          // rows kept by execute()
+    int64_t                                     offset_{-1};    // rows to skip
+    int64_t                                     count_{-1};     // max rows to keep
+};
+}   // namespace graph
+}   // namespace nebula
+
+#endif  // GRAPH_LIMITEXECUTOR_H_
diff --git a/src/executor/TraverseExecutor.cpp b/src/executor/TraverseExecutor.cpp
index e3f8211b6c3fd91cb40762ee2311b020fc89b461..801f133d3997d6ea2d5ca0601bb2af6be33cfbb5 100644
--- a/src/executor/TraverseExecutor.cpp
+++ b/src/executor/TraverseExecutor.cpp
@@ -18,6 +18,7 @@
 #include "graph/FindExecutor.h"
 #include "graph/MatchExecutor.h"
 #include "graph/FindPathExecutor.h"
+#include "graph/LimitExecutor.h"
 
 namespace nebula {
 namespace graph {
@@ -60,6 +61,9 @@ TraverseExecutor::makeTraverseExecutor(Sentence *sentence, ExecutionContext *ect
         case Sentence::Kind::kFindPath:
             executor = std::make_unique<FindPathExecutor>(sentence, ectx);
             break;
+        case Sentence::Kind::kLimit:
+            executor = std::make_unique<LimitExecutor>(sentence, ectx);
+            break;
         case Sentence::Kind::kUnknown:
             LOG(FATAL) << "Sentence kind unknown";
             break;
diff --git a/src/executor/test/CMakeLists.txt b/src/executor/test/CMakeLists.txt
index b0feeb6a0268ae91808cc699f15368770aca268c..41d5ae9ca63a795377acea8c3250c5fe46a43d51 100644
--- 
a/src/executor/test/CMakeLists.txt
+++ b/src/executor/test/CMakeLists.txt
@@ -250,3 +250,20 @@ nebula_add_test(
         wangle
         gtest
 )
+
+nebula_add_test(
+    NAME
+        group_by_limit_test
+    SOURCES
+        GroupByLimitTest.cpp
+    OBJECTS
+        $<TARGET_OBJECTS:graph_test_common_obj>
+        $<TARGET_OBJECTS:client_cpp_obj>
+        $<TARGET_OBJECTS:adHocSchema_obj>
+        ${GRAPH_TEST_LIBS}
+    LIBRARIES
+        ${THRIFT_LIBRARIES}
+        ${ROCKSDB_LIBRARIES}
+        wangle
+        gtest
+)
diff --git a/src/executor/test/DataTest.cpp b/src/executor/test/DataTest.cpp
index b307ac4e1f9cdfe335c60a41585232bf2ac3156d..0dcb4559e91a811b34986a11af5ea67d1e641b4a 100644
--- a/src/executor/test/DataTest.cpp
+++ b/src/executor/test/DataTest.cpp
@@ -95,7 +95,7 @@ AssertionResult DataTest::prepareSchema() {
                                << " failed, error code "<< static_cast<int32_t>(code);
         }
     }
-    // Test same propName diff tyep in diff tags
+    // Test same propName diff type in diff tags
     {
         cpp2::ExecutionResponse resp;
         std::string cmd = "CREATE TAG employee(name int)";
diff --git a/src/executor/test/GroupByLimitTest.cpp b/src/executor/test/GroupByLimitTest.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..88cfdfbfcfe288929afcdd14d29337bacd087254
--- /dev/null
+++ b/src/executor/test/GroupByLimitTest.cpp
@@ -0,0 +1,164 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "base/Base.h"
+#include "graph/test/TestEnv.h"
+#include "graph/test/TestBase.h"
+#include "graph/test/TraverseTestBase.h"
+#include "meta/test/TestUtils.h"
+
+DECLARE_int32(load_data_interval_secs);
+
+namespace nebula {
+namespace graph {
+
+// Fixture for the LIMIT tests; setup and teardown are delegated to TraverseTestBase.
+class GroupByLimitTest : public TraverseTestBase {
+protected:
+    void SetUp() override {
+        TraverseTestBase::SetUp();
+        // ...
+    }
+
+    void TearDown() override {
+        // ...
+        TraverseTestBase::TearDown();
+    }
+};
+
+// A negative offset or count must be rejected with E_SYNTAX_ERROR.
+TEST_F(GroupByLimitTest, SyntaxError) {
+    {
+        cpp2::ExecutionResponse resp;
+        auto query = "GO FROM 1 OVER serve | LIMIT -1, 2";
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::E_SYNTAX_ERROR, code);
+    }
+    {
+        cpp2::ExecutionResponse resp;
+        auto query = "GO FROM 1 OVER serve | LIMIT 1, -2";
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::E_SYNTAX_ERROR, code);
+    }
+}
+
+// LIMIT applied to the output of GO: bare count, `offset,count', the OFFSET
+// keyword form, piping into another GO, zero count, and short or empty windows.
+TEST_F(GroupByLimitTest, LimitTest) {
+    // Test limit count only
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Marco Belinelli"];
+        auto *fmt = "GO FROM %ld OVER serve YIELD $$.team.name AS name"
+                    " | ORDER BY $-.name | LIMIT 5";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        std::vector<std::tuple<std::string>> expected = {
+            {"76ers"},
+            {"Bulls"},
+            {"Hawks"},
+            {"Hornets"},
+            {"Kings"},
+        };
+        ASSERT_TRUE(verifyResult(resp, expected, false));
+    }
+    // Test limit skip,count
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Marco Belinelli"];
+        auto *fmt = "GO FROM %ld OVER serve YIELD $$.team.name AS name | "
+                    "ORDER BY $-.name | LIMIT 2,2";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        std::vector<std::tuple<std::string>> expected = {
+            {"Hawks"},
+            {"Hornets"},
+        };
+        ASSERT_TRUE(verifyResult(resp, expected, false));
+
+        // use OFFSET: `LIMIT <count> OFFSET <offset>' selects the same window
+        auto *fmt1 = "GO FROM %ld OVER serve YIELD $$.team.name AS name | "
+                     "ORDER BY $-.name | LIMIT 2 OFFSET 2";
+        query = folly::stringPrintf(fmt1, player.vid());
+        code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        ASSERT_TRUE(verifyResult(resp, expected, false));
+    }
+    // Test OFFSET with count != offset, which fails if the two values are swapped
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Marco Belinelli"];
+        auto *fmt = "GO FROM %ld OVER serve YIELD $$.team.name AS name | "
+                    "ORDER BY $-.name | LIMIT 3 OFFSET 1";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        std::vector<std::tuple<std::string>> expected = {
+            {"Bulls"},
+            {"Hawks"},
+            {"Hornets"},
+        };
+        ASSERT_TRUE(verifyResult(resp, expected, false));
+    }
+    // test pipe output
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Marco Belinelli"];
+        auto *fmt = "GO FROM %ld OVER like YIELD $$.player.name AS name, like._dst AS id "
+                    "| ORDER BY $-.name | LIMIT 1 "
+                    "| GO FROM $-.id OVER like YIELD $$.player.name AS name "
+                    "| ORDER BY $-.name | LIMIT 2";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        std::vector<std::tuple<std::string>> expected = {
+            {"LeBron James"},
+            {"Marco Belinelli"},
+        };
+        ASSERT_TRUE(verifyResult(resp, expected, false));
+    }
+    // Test count is 0
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Danny Green"];
+        auto *fmt = "GO FROM %ld OVER serve YIELD $$.team.name AS name | LIMIT 0 OFFSET 1";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        ASSERT_EQ(nullptr, resp.get_rows());
+        std::vector<std::string> expectedColNames{
+            {"name"}
+        };
+        ASSERT_TRUE(verifyColNames(resp, expectedColNames));
+    }
+    // Test less limit
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Danny Green"];
+        auto *fmt = "GO FROM %ld OVER serve YIELD $$.team.name AS name | LIMIT 5";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        ASSERT_TRUE(resp.get_rows() != nullptr);
+        ASSERT_EQ(3, resp.get_rows()->size());
+    }
+    // Test empty result
+    {
+        cpp2::ExecutionResponse resp;
+        auto &player = players_["Danny Green"];
+        auto *fmt = "GO FROM %ld OVER serve YIELD $$.team.name AS name | LIMIT 3, 2";
+        auto query = folly::stringPrintf(fmt, player.vid());
+        auto code = client_->execute(query, resp);
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
+        std::vector<std::tuple<std::string>> expected = {};
+        ASSERT_TRUE(verifyResult(resp, expected));
+    }
+}
+
+}   // namespace graph
+}   // namespace nebula
diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h
index 9a6bd30cf0b8938f89d5e53cb4147e02fa24aae8..b499d49782b30d0d43c3ef30beac1ada25c75eba 100644
--- a/src/parser/Sentence.h
+++ b/src/parser/Sentence.h
@@ -60,6 +60,7 @@ public:
         kFetchEdges,
         kBalance,
         kFindPath,
+        kLimit,
     };
 
     Kind kind() const {
diff --git a/src/parser/TraverseSentences.cpp b/src/parser/TraverseSentences.cpp
index 60be945dee4c293d1d74d3bf5ce437d1adcc9ade..b21d5d3883a1aba0e2d61d717143b400d94d9327 100644
--- a/src/parser/TraverseSentences.cpp
+++ b/src/parser/TraverseSentences.cpp
@@ -294,4 +294,13 @@ std::string FindPathSentence::toString() const {
     }
     return buf;
 }
+
+// Renders `LIMIT <count>' when the offset is 0, otherwise `LIMIT <offset>,<count>'.
+std::string LimitSentence::toString() const {
+    if (offset_ == 0) {
+        return folly::stringPrintf("LIMIT %ld", count_);
+    }
+
+    return folly::stringPrintf("LIMIT %ld,%ld", offset_, count_);
+}
 }   // namespace nebula
diff --git a/src/parser/TraverseSentences.h b/src/parser/TraverseSentences.h
index e00684da3d3b94980a70c8d321e92ee3f7d1f433..02bb0cb63c091a0e2cf9c23188c383170a7eb6ec 100644
--- a/src/parser/TraverseSentences.h
+++ b/src/parser/TraverseSentences.h
@@ -545,5 +545,32 @@ private:
     std::unique_ptr<StepClause>                 step_;
     std::unique_ptr<WhereClause>                where_;
 };
+
+/**
+ * `LIMIT <count>', `LIMIT <offset>,<count>' or `LIMIT <count> OFFSET <offset>'.
+ * The values are stored as given; this class performs no range checks.
+ */
+class LimitSentence final : public Sentence {
+public:
+    explicit LimitSentence(int64_t offset, int64_t count) : offset_(offset), count_(count) {
+        kind_ = Kind::kLimit;
+    }
+
+    std::string toString() const override;
+
+    // Number of leading rows to skip.
+    int64_t offset() const {
+        return offset_;
+    }
+
+    // Maximum number of rows to keep.
+    int64_t count() const {
+        return count_;
+    }
+
+private:
+    int64_t                                     offset_{-1};
+    int64_t                                     count_{-1};
+};
 }   // namespace nebula
 #endif  // PARSER_TRAVERSESENTENCES_H_
diff --git a/src/parser/parser.yy b/src/parser/parser.yy
index e73838e01656acdb6e32d2c1fad5ad4ccd1e7032..206d5c241c103b0f82b9b5c1025abe2879770a16 100644
--- a/src/parser/parser.yy
+++ b/src/parser/parser.yy
@@ -106,7 +106,7 @@ class GraphScanner;
 %token KW_ROLES KW_BY KW_DOWNLOAD KW_HDFS
 %token KW_VARIABLES KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE
 %token KW_TTL_DURATION KW_TTL_COL
-%token KW_ORDER KW_ASC
+%token KW_ORDER KW_ASC KW_LIMIT KW_OFFSET
 %token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN
 %token KW_DISTINCT KW_ALL KW_OF
 %token KW_BALANCE KW_LEADER KW_DATA
@@ -193,7 +193,7 @@ class GraphScanner;
 %type <acl_item_clause> acl_item_clause
 
 %type <sentence> go_sentence match_sentence use_sentence find_sentence find_path_sentence
-%type <sentence> order_by_sentence
+%type <sentence> order_by_sentence limit_sentence
 %type <sentence> fetch_vertices_sentence fetch_edges_sentence
 %type <sentence> create_tag_sentence create_edge_sentence
 %type <sentence> alter_tag_sentence alter_edge_sentence
@@ -809,6 +809,14 @@ to_clause
     }
     ;
 
+/* LimitSentence takes (offset, count). The OFFSET form is written
+ * `LIMIT <count> OFFSET <offset>', so its two integers are passed swapped. */
+limit_sentence
+    : KW_LIMIT INTEGER { $$ = new LimitSentence(0, $2); }
+    | KW_LIMIT INTEGER COMMA INTEGER { $$ = new LimitSentence($2, $4); }
+    | KW_LIMIT INTEGER KW_OFFSET INTEGER { $$ = new LimitSentence($4, $2); }
+    ;
+
 use_sentence
     : KW_USE name_label { $$ = new UseSentence($2); }
     ;
@@ -1026,6 +1034,7 @@ traverse_sentence
     | order_by_sentence { $$ = $1; }
     | fetch_sentence { $$ = $1; }
     | find_path_sentence { $$ = $1; }
+    | limit_sentence { $$ = $1; }
     ;
 
 set_sentence
diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex
index 4c5165882586534e7d343060f8436c322a0e85b5..0c011afa23a4de15ddc2e5b6e6ce18d773f19055 100644
--- a/src/parser/scanner.lex
+++ b/src/parser/scanner.lex
@@ -124,6 +124,8 @@ OF                          ([Oo][Ff])
 DATA                        ([Dd][Aa][Tt][Aa])
 SHORTEST                    ([Ss][Hh][Oo][Rr][Tt][Ee][Ss][Tt])
 PATH                        ([Pp][Aa][Tt][Hh])
+LIMIT                       ([Ll][Ii][Mm][Ii][Tt])
+OFFSET                      ([Oo][Ff][Ff][Ss][Ee][Tt])
 
 LABEL                       ([a-zA-Z][_a-zA-Z0-9]*)
 DEC                         ([0-9])
@@ -237,6 +239,8 @@ IP_OCTET                    ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
 {DATA}                      { return TokenType::KW_DATA; }
 {SHORTEST}                  { return TokenType::KW_SHORTEST; }
 {PATH}                      { return TokenType::KW_PATH; }
+{LIMIT}                     { return TokenType::KW_LIMIT; }
+{OFFSET}                    { return TokenType::KW_OFFSET; }
 
 "."
{ return TokenType::DOT; }
 ","                         { return TokenType::COMMA; }
diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp
index 13b689021df25a6991a9460c37461d1972b9fa6a..e675703c41f6f86fa1a0214b50170a6044e4a3f3 100644
--- a/src/parser/test/ParserTest.cpp
+++ b/src/parser/test/ParserTest.cpp
@@ -1376,4 +1376,32 @@ TEST(Parser, FindPath) {
         ASSERT_TRUE(result.ok()) << result.status();
     }
 }
+
+TEST(Parser, Limit) {   // checks only whether each LIMIT form parses, not the parsed values
+    {   // count only
+        GQLParser parser;
+        std::string query = "GO FROM 1 OVER work | LIMIT 1";
+        auto result = parser.parse(query);
+        ASSERT_TRUE(result.ok()) << result.status();
+    }
+    {   // two integers separated by a comma
+        GQLParser parser;
+        std::string query = "GO FROM 1 OVER work | LIMIT 1,2";
+        auto result = parser.parse(query);
+        ASSERT_TRUE(result.ok()) << result.status();
+    }
+    {   // two integers separated by the OFFSET keyword
+        GQLParser parser;
+        std::string query = "GO FROM 1 OVER work | LIMIT 1 OFFSET 2";
+        auto result = parser.parse(query);
+        ASSERT_TRUE(result.ok()) << result.status();
+    }
+    // ERROR: a quoted string where the grammar requires an INTEGER must not parse
+    {
+        GQLParser parser;
+        std::string query = "GO FROM 1 OVER work | LIMIT \"1\"";
+        auto result = parser.parse(query);
+        ASSERT_FALSE(result.ok());
+    }
+}
 }   // namespace nebula
diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp
index aad3e44c2d9bda527224b70cd98eca3b74f225be..54e8ec5110d5885fcd7bb9295b0932c5f7153b3e 100644
--- a/src/parser/test/ScannerTest.cpp
+++ b/src/parser/test/ScannerTest.cpp
@@ -388,6 +388,10 @@ TEST(Scanner, Basic) {
         CHECK_SEMANTIC_TYPE("OF", TokenType::KW_OF),
         CHECK_SEMANTIC_TYPE("Of", TokenType::KW_OF),
         CHECK_SEMANTIC_TYPE("of", TokenType::KW_OF),
+        CHECK_SEMANTIC_TYPE("LIMIT", TokenType::KW_LIMIT),
+        CHECK_SEMANTIC_TYPE("limit", TokenType::KW_LIMIT),
+        CHECK_SEMANTIC_TYPE("OFFSET", TokenType::KW_OFFSET),
+        CHECK_SEMANTIC_TYPE("offset", TokenType::KW_OFFSET),
         CHECK_SEMANTIC_TYPE("_type", TokenType::TYPE_PROP),
         CHECK_SEMANTIC_TYPE("_id", TokenType::ID_PROP),