diff --git a/docs/developer_guides/chap01_system_overview.md b/docs/developer_guides/chap01_system_overview.md index e5d648253821576cf03ac9dada6040c6568c6970..e0e2923a16c3915e408f32051c6134c4b9776dcc 100644 --- a/docs/developer_guides/chap01_system_overview.md +++ b/docs/developer_guides/chap01_system_overview.md @@ -78,9 +78,9 @@ In order to boost throughput, we model Milvus as a stream-driven system. ```go type Component interface { - Init() error - Start() error - Stop() error + Init() + Start() + Stop() GetComponentStates() (ComponentStates, error) GetTimeTickChannel() (string, error) GetStatisticsChannel() (string, error) diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 9d0434d013cfaa337790c76802a9b9d993208daf..d999eb7179ba9aec511a3cb4e5715ac75144006b 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -209,7 +209,7 @@ Parser::ParseVecNode(const Json& out_body) { } }(); vec_node->query_info_.topK_ = topK; - vec_node->query_info_.metric_type_ = vec_info.at("metric_type"); + vec_node->query_info_.metric_type_ = GetMetricType(vec_info.at("metric_type")); vec_node->query_info_.search_params_ = vec_info.at("params"); vec_node->query_info_.field_offset_ = field_offset; vec_node->placeholder_tag_ = vec_info.at("query"); diff --git a/internal/core/src/query/PlanNode.h b/internal/core/src/query/PlanNode.h index bbe5e9d35d6c32626f4ff3647c42ffa2324c0f00..3b1e068b5552ddc2bce9a5d1dfe393410bf11dc6 100644 --- a/internal/core/src/query/PlanNode.h +++ b/internal/core/src/query/PlanNode.h @@ -41,7 +41,8 @@ using PlanNodePtr = std::unique_ptr<PlanNode>; struct QueryInfo { int64_t topK_; FieldOffset field_offset_; - std::string metric_type_; // TODO: use enum + MetricType metric_type_; + std::string deprecated_metric_type_; // TODO: use enum nlohmann::json search_params_; }; diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index 1389b71fd01c3d8ca07b80b81667baae3e1f6d60..54edd7e2f5c6da2a1ea5b97fc690e9f120fea2e7 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -63,7 +63,7 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, auto dim = field.get_dim(); auto topK = info.topK_; auto total_count = topK * num_queries; - auto metric_type = GetMetricType(info.metric_type_); + auto metric_type = info.metric_type_; // step 3: small indexing search // std::vector<int64_t> final_uids(total_count, -1); @@ -138,7 +138,7 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, auto& record = segment.get_insert_record(); // step 1: binary search to find the barrier of the snapshot // auto ins_barrier = get_barrier(record, timestamp); - auto metric_type = GetMetricType(info.metric_type_); + auto metric_type = info.metric_type_; // auto del_barrier = get_barrier(deleted_record_, timestamp); #if 0 @@ -195,37 +195,24 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, } // TODO: refactor and merge this into one -template <typename VectorType> void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, int64_t ins_barrier, const query::QueryInfo& info, - const EmbeddedType<VectorType>* query_data, + const void* query_data, int64_t num_queries, const faiss::BitsetView& bitset, QueryResult& results) { - static_assert(IsVector<VectorType>); - if constexpr (std::is_same_v<VectorType, FloatVector>) { - FloatSearch(segment, info, query_data, num_queries, ins_barrier, bitset, results); + // TODO: add data_type to info + auto data_type = segment.get_schema()[info.field_offset_].get_data_type(); + Assert(datatype_is_vector(data_type)); + if (data_type == DataType::VECTOR_FLOAT) { + auto typed_data = reinterpret_cast<const float*>(query_data); + FloatSearch(segment, info, typed_data, num_queries, ins_barrier, bitset, results); } else { - BinarySearch(segment, info, query_data, num_queries, ins_barrier, bitset, results); + auto typed_data = reinterpret_cast<const uint8_t*>(query_data); + BinarySearch(segment, info, typed_data, num_queries, ins_barrier, bitset, results); } } -template void -SearchOnGrowing<FloatVector>(const segcore::SegmentGrowingImpl& segment, - int64_t ins_barrier, - const query::QueryInfo& info, - const EmbeddedType<FloatVector>* query_data, - int64_t num_queries, - const faiss::BitsetView& bitset, - QueryResult& results); -template void -SearchOnGrowing<BinaryVector>(const segcore::SegmentGrowingImpl& segment, - int64_t ins_barrier, - const query::QueryInfo& info, - const EmbeddedType<BinaryVector>* query_data, - int64_t num_queries, - const faiss::BitsetView& bitset, - QueryResult& results); } // namespace milvus::query diff --git a/internal/core/src/query/SearchOnGrowing.h b/internal/core/src/query/SearchOnGrowing.h index f1251d512443867c73bd5c03f52482badb037219..2f7869d8c3e6df51a21933f77e59176372929dde 100644 --- a/internal/core/src/query/SearchOnGrowing.h +++ b/internal/core/src/query/SearchOnGrowing.h @@ -20,12 +20,11 @@ namespace milvus::query { using BitmapChunk = boost::dynamic_bitset<>; using BitmapSimple = std::deque<BitmapChunk>; -template <typename VectorType> void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, int64_t ins_barrier, const query::QueryInfo& info, - const EmbeddedType<VectorType>* query_data, + const void* query_data, int64_t num_queries, const faiss::BitsetView& bitset, QueryResult& results); diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index 2aec8b41e8a373af1d26d884f642534b684119d6..e7f57435d955e68e47d655007f832dcf8d34d429 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -63,9 +63,7 @@ SearchOnSealed(const Schema& schema, Assert(record.is_ready(field_offset)); auto indexing_entry = record.get_entry(field_offset); - std::cout << " SearchOnSealed, indexing_entry->metric:" << indexing_entry->metric_type_ << std::endl; - std::cout << " SearchOnSealed, query_info.metric_type_:" << query_info.metric_type_ << std::endl; - Assert(indexing_entry->metric_type_ == GetMetricType(query_info.metric_type_)); + Assert(indexing_entry->metric_type_ == query_info.metric_type_); auto final = [&] { auto ds = knowhere::GenDataset(num_queries, dim, query_data); diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h index 96e9439ed26849f5d7db3294e7a30b3dafa4e6a5..bf432ded44461333969db72ee3603629671acbd9 100644 --- a/internal/core/src/query/generated/ExecExprVisitor.h +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -37,7 +37,7 @@ class ExecExprVisitor : public ExprVisitor { public: using RetType = std::deque<boost::dynamic_bitset<>>; - ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count) + ExecExprVisitor(const segcore::SegmentInternalInterface& segment, int64_t row_count) : segment_(segment), row_count_(row_count) { } RetType diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.h b/internal/core/src/query/generated/ExecPlanNodeVisitor.h index 08ac2f694e9e46cd59f37d0909f173a56dce71b3..882c66fe1499c3459faedb58e092a4fef8043e98 100644 --- a/internal/core/src/query/generated/ExecPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ExecPlanNodeVisitor.h @@ -29,7 +29,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { public: using RetType = QueryResult; - ExecPlanNodeVisitor(const segcore::SegmentGrowing& segment, + ExecPlanNodeVisitor(const segcore::SegmentInterface& segment, Timestamp timestamp, const PlaceholderGroup& placeholder_group) : segment_(segment), timestamp_(timestamp), placeholder_group_(placeholder_group) { @@ -53,7 +53,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { private: // std::optional<RetType> ret_; - const segcore::SegmentGrowing& segment_; + const segcore::SegmentInterface& segment_; Timestamp timestamp_; const PlaceholderGroup& placeholder_group_; diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 0c3e85e4a263812ab8c7f98efccfcaf0b4d86a9d..c8652da4ba8d92daa11f645d5dfb4d6807a4a346 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -25,7 +25,7 @@ namespace impl { class ExecExprVisitor : ExprVisitor { public: using RetType = std::deque<boost::dynamic_bitset<>>; - ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count) + ExecExprVisitor(const segcore::SegmentInternalInterface& segment, int64_t row_count) : segment_(segment), row_count_(row_count) { } RetType diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index 62cfeb56e30205e7688b1ae1ccc962222c6712e5..d558978ee311d22313c98d4a8c6d1326260d38fe 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -28,7 +28,7 @@ namespace impl { class ExecPlanNodeVisitor : PlanNodeVisitor { public: using RetType = QueryResult; - ExecPlanNodeVisitor(const segcore::SegmentGrowing& segment, + ExecPlanNodeVisitor(const segcore::SegmentInterface& segment, Timestamp timestamp, const PlaceholderGroup& placeholder_group) : segment_(segment), timestamp_(timestamp), placeholder_group_(placeholder_group) { @@ -52,7 +52,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor { private: // std::optional<RetType> ret_; - const segcore::SegmentGrowing& segment_; + const segcore::SegmentInterface& segment_; Timestamp timestamp_; const PlaceholderGroup& placeholder_group_; @@ -66,7 +66,7 @@ void ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { // TODO: optimize here, remove the dynamic cast assert(!ret_.has_value()); - auto segment = dynamic_cast<const segcore::SegmentGrowingImpl*>(&segment_); + auto segment = dynamic_cast<const segcore::SegmentInternalInterface*>(&segment_); AssertInfo(segment, "support SegmentSmallIndex Only"); RetType ret; auto& ph = placeholder_group_.at(0); @@ -76,7 +76,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { aligned_vector<uint8_t> bitset_holder; BitsetView view; // TODO: add API to unify row_count - auto row_count = segcore::get_barrier(segment->get_insert_record(), timestamp_); + auto row_count = segment->get_row_count(); if (node.predicate_.has_value()) { ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment, row_count).call_child(*node.predicate_.value()); @@ -84,12 +84,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8); } - auto& sealed_indexing = segment->get_sealed_indexing_record(); - if (sealed_indexing.is_ready(node.query_info_.field_offset_)) { - SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, view, ret); - } else { - SearchOnGrowing<VectorType>(*segment, row_count, node.query_info_, src_data, num_queries, view, ret); - } + segment->vector_search(row_count, node.query_info_, src_data, num_queries, view, ret); ret_ = ret; } diff --git a/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp index 6154dee4c16645f9ef3d88dcf2a11ac54b4ab9c0..bbbd7df45bbac60147fb29cb41fd7ccbca45e64d 100644 --- a/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp @@ -54,12 +54,12 @@ ShowPlanNodeVisitor::visit(FloatVectorANNS& node) { assert(!ret_); auto& info = node.query_info_; Json json_body{ - {"node_type", "FloatVectorANNS"}, // - {"metric_type", info.metric_type_}, // - {"field_offset_", info.field_offset_.get()}, // - {"topK", info.topK_}, // - {"search_params", info.search_params_}, // - {"placeholder_tag", node.placeholder_tag_}, // + {"node_type", "FloatVectorANNS"}, // + {"metric_type", MetricTypeToName(info.metric_type_)}, // + {"field_offset_", info.field_offset_.get()}, // + {"topK", info.topK_}, // + {"search_params", info.search_params_}, // + {"placeholder_tag", node.placeholder_tag_}, // }; if (node.predicate_.has_value()) { ShowExprVisitor expr_show; @@ -76,12 +76,12 @@ ShowPlanNodeVisitor::visit(BinaryVectorANNS& node) { assert(!ret_); auto& info = node.query_info_; Json json_body{ - {"node_type", "BinaryVectorANNS"}, // - {"metric_type", info.metric_type_}, // - {"field_offset_", info.field_offset_.get()}, // - {"topK", info.topK_}, // - {"search_params", info.search_params_}, // - {"placeholder_tag", node.placeholder_tag_}, // + {"node_type", "BinaryVectorANNS"}, // + {"metric_type", MetricTypeToName(info.metric_type_)}, // + {"field_offset_", info.field_offset_.get()}, // + {"topK", info.topK_}, // + {"search_params", info.search_params_}, // + {"placeholder_tag", node.placeholder_tag_}, // }; if (node.predicate_.has_value()) { ShowExprVisitor expr_show; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 3044de2142c23ba49bfe8f95acced1916a22cf8a..b356dc208b05a267a8e105a037c9b238f9ce2299 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -19,6 +19,7 @@ #include <knowhere/index/vector_index/adapter/VectorAdapter.h> #include <knowhere/index/vector_index/VecIndexFactory.h> #include <faiss/utils/distances.h> +#include <query/SearchOnSealed.h> #include "query/generated/ExecPlanNodeVisitor.h" #include "segcore/SegmentGrowingImpl.h" #include "query/PlanNode.h" @@ -237,17 +238,6 @@ SegmentGrowingImpl::GetMemoryUsageInBytes() const { return total_bytes; } -QueryResult -SegmentGrowingImpl::Search(const query::Plan* plan, - const query::PlaceholderGroup** placeholder_groups, - const Timestamp* timestamps, - int64_t num_groups) const { - Assert(num_groups == 1); - query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); - auto results = visitor.get_moved_result(*plan->plan_node_); - return results; -} - Status SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) { auto field_offset = schema_->get_offset(FieldName(info.field_name)); @@ -270,5 +260,19 @@ SegmentGrowingImpl::num_chunk_data() const { auto size = get_insert_record().ack_responder_.GetAck(); return upper_div(size, chunk_size_); } +void +SegmentGrowingImpl::vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const { + auto& sealed_indexing = this->get_sealed_indexing_record(); + if (sealed_indexing.is_ready(query_info.field_offset_)) { + query::SearchOnSealed(this->get_schema(), sealed_indexing, query_info, query_data, query_count, bitset, output); + } else { + SearchOnGrowing(*this, vec_count, query_info, query_data, query_count, bitset, output); + } +} } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index e779cac177b7734adf3fd69999d95ef7a283f222..6a54e05be7fac8c9ff852fb229be56c4c124f3e6 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -53,12 +53,6 @@ class SegmentGrowingImpl : public SegmentGrowing { Status Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override; - QueryResult - Search(const query::Plan* Plan, - const query::PlaceholderGroup* placeholder_groups[], - const Timestamp timestamps[], - int64_t num_groups) const override; - // stop receive insert requests // will move data to immutable vector or something Status @@ -181,6 +175,14 @@ class SegmentGrowingImpl : public SegmentGrowing { indexing_record_(*schema_, chunk_size) { } + void + vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const override; + public: std::shared_ptr<DeletedRecord::TmpBitmap> get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false); diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index e6d4303b5f1f8649d1ac5795008c9dc0073beec7..0066ee360f4d25ad1ddf7ca8a1e71674fef4877d 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/SegmentInterface.h" +#include "query/generated/ExecPlanNodeVisitor.h" namespace milvus::segcore { class Naive; @@ -39,4 +40,16 @@ SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) results.row_data_.emplace_back(std::move(blob)); } } + +QueryResult +SegmentInterface::Search(const query::Plan* plan, + const query::PlaceholderGroup** placeholder_groups, + const Timestamp* timestamps, + int64_t num_groups) const { + Assert(num_groups == 1); + query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); + auto results = visitor.get_moved_result(*plan->plan_node_); + return results; +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 08f18b4b45b8a283ea4c894f99509558bab2cfc6..a6f0668e3eee8f0e93447748faf6a64470beddf2 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -17,6 +17,7 @@ #include "IndexingEntry.h" #include <knowhere/index/vector_index/VecIndex.h> #include "common/SystemProperty.h" +#include "query/PlanNode.h" namespace milvus::segcore { @@ -25,11 +26,11 @@ class SegmentInterface { void FillTargetEntry(const query::Plan* plan, QueryResult& results) const; - virtual QueryResult + QueryResult Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], const Timestamp timestamps[], - int64_t num_groups) const = 0; + int64_t num_groups) const; virtual int64_t GetMemoryUsageInBytes() const = 0; @@ -71,6 +72,14 @@ class SegmentInternalInterface : public SegmentInterface { } public: + virtual void + vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const = 0; + virtual int64_t num_chunk_index_safe(FieldOffset field_offset) const = 0; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index caa1670763dc08d3e7811016335885a2d6b6b57f..a0ef3a7723b65807d92c6449062b67d739b0b89a 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/SegmentSealedImpl.h" +#include "query/SearchOnSealed.h" namespace milvus::segcore { void SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { @@ -107,14 +108,6 @@ SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) return nullptr; } -QueryResult -SegmentSealedImpl::Search(const query::Plan* Plan, - const query::PlaceholderGroup** placeholder_groups, - const Timestamp* timestamps, - int64_t num_groups) const { - PanicInfo("unimplemented"); -} - int64_t SegmentSealedImpl::GetMemoryUsageInBytes() const { // TODO: add estimate for index @@ -134,6 +127,20 @@ SegmentSealedImpl::get_schema() const { return *schema_; } +void +SegmentSealedImpl::vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const { + auto field_offset = query_info.field_offset_; + auto& field_meta = schema_->operator[](field_offset); + Assert(field_meta.is_vector()); + Assert(vec_indexings_.is_ready(field_offset)); + query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output); +} + SegmentSealedPtr CreateSealedSegment(SchemaPtr schema, int64_t chunk_size) { return std::make_unique<SegmentSealedImpl>(schema); diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index ce7989f18a6fc7e368be65606a559d84c08855ee..f901e36679810a8cfc14e0064207ea1979a1bb33 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -26,12 +26,6 @@ class SegmentSealedImpl : public SegmentSealed { LoadFieldData(const LoadFieldDataInfo& info) override; public: - QueryResult - Search(const query::Plan* Plan, - const query::PlaceholderGroup* placeholder_groups[], - const Timestamp timestamps[], - int64_t num_groups) const override; - int64_t GetMemoryUsageInBytes() const override; @@ -104,6 +98,14 @@ class SegmentSealedImpl : public SegmentSealed { } } + void + vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const override; + bool is_all_ready() const { // TODO: optimize here diff --git a/internal/core/src/segcore/plan_c.cpp b/internal/core/src/segcore/plan_c.cpp index b3cb53543aac8a03dc1e26b9b8d4b3741d5b02de..16455ea4b5a09d55dcd4127778b90655edc2fc46 100644 --- a/internal/core/src/segcore/plan_c.cpp +++ b/internal/core/src/segcore/plan_c.cpp @@ -78,7 +78,8 @@ GetTopK(CPlan plan) { const char* GetMetricType(CPlan plan) { auto query_plan = static_cast<milvus::query::Plan*>(plan); - return strdup(query_plan->plan_node_->query_info_.metric_type_.c_str()); + auto metric_str = milvus::MetricTypeToName(query_plan->plan_node_->query_info_.metric_type_); + return strdup(metric_str.c_str()); } void diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 0f00d324f1299533754a99db69e24b925fe5a9cc..84d10709cc3581e701a44be70f285218136d3914 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -20,6 +20,7 @@ #include "common/type_c.h" #include <knowhere/index/vector_index/VecIndex.h> #include <knowhere/index/vector_index/adapter/VectorAdapter.h> +#include "common/Types.h" ////////////////////////////// common interfaces ////////////////////////////// CSegmentInterface @@ -79,7 +80,7 @@ Search(CSegmentInterface c_segment, auto status = CStatus(); try { *query_result = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups); - if (plan->plan_node_->query_info_.metric_type_ != "IP") { + if (plan->plan_node_->query_info_.metric_type_ != milvus::MetricType::METRIC_INNER_PRODUCT) { for (auto& dis : query_result->result_distances_) { dis *= -1; } diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 7d104400fd840b4d98809d739ddb217fd9d93f58..8a85e2eadd72ad08cbe2a1b545204e14484d3657 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -235,7 +235,8 @@ TEST(Expr, ShowExecutor) { int64_t num_queries = 100L; auto raw_data = DataGen(schema, num_queries); auto& info = node->query_info_; - info.metric_type_ = "L2"; + + info.metric_type_ = MetricType::METRIC_L2; info.topK_ = 20; info.field_offset_ = FieldOffset(0); node->predicate_ = std::nullopt; diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index e8124ac7466d5697d762f9ede2610570debb50d2..3470e274a21c3e4b2d859c271026ba894594de33 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -20,51 +20,12 @@ #include "query/generated/ExecPlanNodeVisitor.h" #include "query/PlanImpl.h" #include "segcore/SegmentGrowingImpl.h" +#include "segcore/SegmentSealed.h" #include "pb/schema.pb.h" using namespace milvus; using namespace milvus::query; using namespace milvus::segcore; -TEST(Query, Naive) { - SUCCEED(); - using namespace milvus::wtf; - std::string dsl_string = R"( -{ - "bool": { - "must": [ - { - "term": { - "A": [ - 1, - 2, - 5 - ] - } - }, - { - "range": { - "B": { - "GT": 1, - "LT": 100 - } - } - }, - { - "vector": { - "Vec": { - "metric_type": "L2", - "params": { - "nprobe": 10 - }, - "query": "$0", - "topk": 10 - } - } - } - ] - } -})"; -} TEST(Query, ShowExecutor) { using namespace milvus::query; @@ -76,7 +37,7 @@ TEST(Query, ShowExecutor) { int64_t num_queries = 100L; auto raw_data = DataGen(schema, num_queries); auto& info = node->query_info_; - info.metric_type_ = "L2"; + info.metric_type_ = MetricType::METRIC_L2; info.topK_ = 20; info.field_offset_ = FieldOffset(1000); node->predicate_ = std::nullopt; @@ -258,6 +219,7 @@ TEST(Query, ExecWithPredicate) { ])"); ASSERT_EQ(json.dump(2), ref.dump(2)); } + TEST(Query, ExecTerm) { using namespace milvus::query; using namespace milvus::segcore; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 671deb9084e6853cc75a924334ed18838b53ad8d..022916395e724e470c18fbc6251ee8dc8587b48c 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -22,7 +22,7 @@ using namespace milvus; using namespace milvus::segcore; -using namespace milvus; +using namespace milvus::query; TEST(Sealed, without_predicate) { using namespace milvus::query; @@ -286,4 +286,41 @@ TEST(Sealed, LoadFieldData) { ASSERT_EQ(chunk_span1[i], ref1[i]); ASSERT_EQ(chunk_span2[i], ref2[i]); } -} \ No newline at end of file + std::string dsl = R"({ + "bool": { + "must": [ + { + "range": { + "double": { + "GE": -1, + "LT": 1 + } + } + }, + { + "vector": { + "fakevec": { + "metric_type": "L2", + "params": { + "nprobe": 10 + }, + "query": "$0", + "topk": 5 + } + } + } + ] + } + })"; + + auto plan = CreatePlan(*schema, dsl); + auto num_queries = 5; + auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + Timestamp time = 1000000; + std::vector<const PlaceholderGroup*> ph_group_arr = {ph_group.get()}; + + auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); + auto json = QueryResultToJson(qr); + std::cout << json.dump(1); +} diff --git a/internal/core/unittest/test_utils/DataGen.cpp b/internal/core/unittest/test_utils/DataGen.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f9f29239c07424b9228df7dcb6cf41275e874afd --- /dev/null +++ b/internal/core/unittest/test_utils/DataGen.cpp @@ -0,0 +1,14 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +// +// Created by mike on 1/21/21. +// diff --git a/internal/distributed/datanode/client.go b/internal/distributed/datanode/client.go index 1bdfcc492c5699e4e674f0f1f58fcd95ad4f0a8e..a8480b6cbe93771a573e81429d785afdc469f73a 100644 --- a/internal/distributed/datanode/client.go +++ b/internal/distributed/datanode/client.go @@ -11,15 +11,15 @@ type Client struct { // GOOSE TODO: add DataNodeClient } -func (c *Client) Init() error { +func (c *Client) Init() { panic("implement me") } -func (c *Client) Start() error { +func (c *Client) Start() { panic("implement me") } -func (c *Client) Stop() error { +func (c *Client) Stop() { panic("implement me") } diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 5451595a380eda6dea4e538b2f71dcf7c9e227c6..7bf384d867ad27dbbf8da20a32e51b6d18220d03 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -28,9 +28,8 @@ type Server struct { loopWg sync.WaitGroup } -func (s *Server) Init() error { +func (s *Server) Init() { indexservice.Params.Init() - return nil } func (s *Server) Start() error { @@ -38,9 +37,8 @@ func (s *Server) Start() error { return s.startIndexServer() } -func (s *Server) Stop() error { +func (s *Server) Stop() { s.loopWg.Wait() - return nil } func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) { diff --git a/internal/distributed/querynode/client.go b/internal/distributed/querynode/client.go index 0b0e0bb13ee2b10b9680b97828537ee613ae5309..6a0672f8a35ace09a15f1c58c0491f597280e60c 100644 --- a/internal/distributed/querynode/client.go +++ b/internal/distributed/querynode/client.go @@ -13,15 +13,15 @@ type Client struct { grpcClient querypb.QueryNodeClient } -func (c *Client) Init() error { +func (c *Client) Init() { panic("implement me") } -func (c *Client) Start() error { +func (c *Client) Start() { panic("implement me") } -func (c *Client) Stop() error { +func (c *Client) Stop() { panic("implement me") } diff --git a/internal/distributed/queryservice/client.go b/internal/distributed/queryservice/client.go index 64238f4bf649cab6d49a85beba468734aa0fb236..3609e5670ada7269046cada1ccfa031fd50ca2b6 100644 --- a/internal/distributed/queryservice/client.go +++ b/internal/distributed/queryservice/client.go @@ -9,15 +9,15 @@ type Client struct { grpcClient querypb.QueryServiceClient } -func (c *Client) Init() error { +func (c *Client) Init() { panic("implement me") } -func (c *Client) Start() error { +func (c *Client) Start() { panic("implement me") } -func (c *Client) Stop() error { +func (c *Client) Stop() { panic("implement me") } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 0c3c258061ced46359a8dfb06d37701d35d6ad2a..9551886bfc5ac4891367e37193c1bac0e6171837 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -13,15 +13,15 @@ type Server struct { queryService queryServiceImpl.Interface } -func (s *Server) Init() error { +func (s *Server) Init() { panic("implement me") } -func (s *Server) Start() error { +func (s *Server) Start() { panic("implement me") } -func (s *Server) Stop() error { +func (s *Server) Stop() { panic("implement me") } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index b3e9a189961f17722fec47e56a5059c6682e4862..01644c8cad64abfb7226bc8f1a918a711fc3427c 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -45,15 +45,15 @@ type IndexNode struct { //serviceClient indexservice.Interface // method factory } -func (i *IndexNode) Init() error { +func (i *IndexNode) Init() { panic("implement me") } -func (i *IndexNode) Start() error { +func (i *IndexNode) Start() { panic("implement me") } -func (i *IndexNode) Stop() error { +func (i *IndexNode) Stop() { panic("implement me") } diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 7c0f8363671ec5c33b89c45403b1ca68755fc6a6..862de4f678e03c0478ae318807176910a69a3be8 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -46,15 +46,15 @@ type IndexService struct { type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp -func (i *IndexService) Init() error { +func (i *IndexService) Init() { panic("implement me") } -func (i *IndexService) Start() error { +func (i *IndexService) Start() { panic("implement me") } -func (i *IndexService) Stop() error { +func (i *IndexService) Stop() { panic("implement me") } diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go index 50890e47e41effaf76742c8c8bf389c67626e0ac..3dfb349964cdc2ecd78f0f226138f3a3d0e980bf 100644 --- a/internal/proxyservice/proxyservice.go +++ b/internal/proxyservice/proxyservice.go @@ -13,15 +13,15 @@ type ProxyService struct { } -func (s ProxyService) Init() error { +func (s ProxyService) Init() { panic("implement me") } -func (s ProxyService) Start() error { +func (s ProxyService) Start() { panic("implement me") } -func (s ProxyService) Stop() error { +func (s ProxyService) Stop() { panic("implement me") } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index d30699359d2c4dc785b2825f7229bd045829f9bc..6b0fbfb9de46d6676fe48c4011bb1014a8e599c9 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -16,13 +16,10 @@ import ( "context" "errors" "fmt" + queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice" "io" "sync/atomic" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - - queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice" - "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go/config" @@ -34,7 +31,9 @@ import ( ) type Node interface { - typeutil.Service + Init() + Start() + Stop() GetComponentStates() (*internalpb2.ComponentStates, error) GetTimeTickChannel() (string, error) @@ -124,7 +123,7 @@ func Init() { Params.Init() } -func (node *QueryNode) Init() error { +func (node *QueryNode) Init() { registerReq := queryPb.RegisterNodeRequest{ Address: &commonpb.Address{ Ip: Params.QueryNodeIP, @@ -142,10 +141,9 @@ func (node *QueryNode) Init() error { // TODO: use response.initParams Params.Init() - return nil } -func (node *QueryNode) Start() error { +func (node *QueryNode) Start() { // todo add connectMaster logic // init services and manager node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) @@ -164,10 +162,9 @@ func (node *QueryNode) Start() error { node.stateCode.Store(internalpb2.StateCode_HEALTHY) <-node.queryNodeLoopCtx.Done() - return nil } -func (node *QueryNode) Stop() error { +func (node *QueryNode) Stop() { node.stateCode.Store(internalpb2.StateCode_ABNORMAL) node.queryNodeLoopCancel() @@ -190,7 +187,6 @@ func (node *QueryNode) Stop() error { if node.closer != nil { node.closer.Close() } - return nil } func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) { diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 5edf64c37c17e12bdf514640cab6f038f3e89406..fe904da260412204a1d92c800a492abecbbf387c 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -9,15 +9,15 @@ type QueryService struct { } //serverBase interface -func (qs *QueryService) Init() error { +func (qs *QueryService) Init() { panic("implement me") } -func (qs *QueryService) Start() error { +func (qs *QueryService) Start() { panic("implement me") } -func (qs *QueryService) Stop() error { +func (qs *QueryService) Stop() { panic("implement me") } diff --git a/internal/util/typeutil/service.go b/internal/util/typeutil/service.go index e04c7e35a064bef1ecdc85e09203000d15318ef6..9fe320b54ab1b03b4b00912f983579d3c3b06a7a 100644 --- a/internal/util/typeutil/service.go +++ b/internal/util/typeutil/service.go @@ -5,9 +5,9 @@ import ( ) type Service interface { - Init() error - Start() error - Stop() error + Init() + Start() + Stop() } type Component interface {