diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index a0800142debbcc48a8c5d7ce45f300c945d5bfcd..84bb68fcc679b68ba95884be7e25349a9aa49f37 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -319,14 +319,14 @@ Parser::ParseItemList(const Json& body) { std::vector<ExprPtr> results; if (body.is_object()) { // only one item; - auto new_entry = ParseAnyNode(body); - results.emplace_back(std::move(new_entry)); + auto new_expr = ParseAnyNode(body); + results.emplace_back(std::move(new_expr)); } else { // item array Assert(body.is_array()); for (auto& item : body) { - auto new_entry = ParseAnyNode(item); - results.emplace_back(std::move(new_entry)); + auto new_expr = ParseAnyNode(item); + results.emplace_back(std::move(new_expr)); } } auto old_size = results.size(); diff --git a/internal/core/src/query/SearchBruteForce.cpp b/internal/core/src/query/SearchBruteForce.cpp index 13ef843679f81110704f36b39e4238d06aa594dc..65bb5bb9003cc95de04dc59c2542d6c4e2842021 100644 --- a/internal/core/src/query/SearchBruteForce.cpp +++ b/internal/core/src/query/SearchBruteForce.cpp @@ -24,7 +24,7 @@ SubQueryResult BinarySearchBruteForceFast(MetricType metric_type, int64_t dim, const uint8_t* binary_chunk, - int64_t chunk_size, + int64_t size_per_chunk, int64_t topk, int64_t num_queries, const uint8_t* query_data, @@ -34,7 +34,7 @@ BinarySearchBruteForceFast(MetricType metric_type, idx_t* result_labels = sub_result.get_labels(); int64_t code_size = dim / 8; - const idx_t block_size = chunk_size; + const idx_t block_size = size_per_chunk; bool use_heap = true; if (metric_type == faiss::METRIC_Jaccard || metric_type == faiss::METRIC_Tanimoto) { @@ -50,7 +50,7 @@ BinarySearchBruteForceFast(MetricType metric_type, result_labels + query_base_index * topk, D + query_base_index * topk}; binary_distence_knn_hc(metric_type, &res, query_data + query_base_index * code_size, binary_chunk, - chunk_size, code_size, + size_per_chunk, code_size, /* ordered = */ true, bitset); } if (metric_type == faiss::METRIC_Tanimoto) { @@ -67,7 +67,7 @@ BinarySearchBruteForceFast(MetricType metric_type, } // only match ids will be chosed, not to use heap - binary_distence_knn_mc(metric_type, query_data + s * code_size, binary_chunk, nn, chunk_size, topk, + binary_distence_knn_mc(metric_type, query_data + s * code_size, binary_chunk, nn, size_per_chunk, topk, code_size, D + s * topk, result_labels + s * topk, bitset); } } else if (metric_type == faiss::METRIC_Hamming) { @@ -82,10 +82,10 @@ BinarySearchBruteForceFast(MetricType metric_type, faiss::int_maxheap_array_t res = {size_t(nn), size_t(topk), result_labels + s * topk, int_distances.data() + s * topk}; - hammings_knn_hc(&res, query_data + s * code_size, binary_chunk, chunk_size, code_size, + hammings_knn_hc(&res, query_data + s * code_size, binary_chunk, size_per_chunk, code_size, /* ordered = */ true, bitset); } else { - hammings_knn_mc(query_data + s * code_size, binary_chunk, nn, chunk_size, topk, code_size, + hammings_knn_mc(query_data + s * code_size, binary_chunk, nn, size_per_chunk, topk, code_size, int_distances.data() + s * topk, result_labels + s * topk, bitset); } } @@ -101,7 +101,7 @@ BinarySearchBruteForceFast(MetricType metric_type, SubQueryResult FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset, const float* chunk_data, - int64_t chunk_size, + int64_t size_per_chunk, const faiss::BitsetView& bitset) { auto metric_type = query_dataset.metric_type; auto num_queries = 
query_dataset.num_queries; @@ -111,11 +111,11 @@ FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset, if (metric_type == MetricType::METRIC_L2) { faiss::float_maxheap_array_t buf{(size_t)num_queries, (size_t)topk, sub_qr.get_labels(), sub_qr.get_values()}; - faiss::knn_L2sqr(query_dataset.query_data, chunk_data, dim, num_queries, chunk_size, &buf, bitset); + faiss::knn_L2sqr(query_dataset.query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset); return sub_qr; } else { faiss::float_minheap_array_t buf{(size_t)num_queries, (size_t)topk, sub_qr.get_labels(), sub_qr.get_values()}; - faiss::knn_inner_product(query_dataset.query_data, chunk_data, dim, num_queries, chunk_size, &buf, bitset); + faiss::knn_inner_product(query_dataset.query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset); return sub_qr; } } @@ -123,10 +123,10 @@ FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset, SubQueryResult BinarySearchBruteForce(const dataset::BinaryQueryDataset& query_dataset, const uint8_t* binary_chunk, - int64_t chunk_size, + int64_t size_per_chunk, const faiss::BitsetView& bitset) { // TODO: refactor the internal function - return BinarySearchBruteForceFast(query_dataset.metric_type, query_dataset.dim, binary_chunk, chunk_size, + return BinarySearchBruteForceFast(query_dataset.metric_type, query_dataset.dim, binary_chunk, size_per_chunk, query_dataset.topk, query_dataset.num_queries, query_dataset.query_data, bitset); } } // namespace milvus::query diff --git a/internal/core/src/query/SearchBruteForce.h b/internal/core/src/query/SearchBruteForce.h index c7e166fbb102fba3827e1f2606e7935834075823..f544ed8540c267dfcd2727f3a5663e34efd5979d 100644 --- a/internal/core/src/query/SearchBruteForce.h +++ b/internal/core/src/query/SearchBruteForce.h @@ -21,13 +21,13 @@ namespace milvus::query { SubQueryResult BinarySearchBruteForce(const dataset::BinaryQueryDataset& query_dataset, const uint8_t* binary_chunk, - int64_t chunk_size, + int64_t size_per_chunk, const faiss::BitsetView& bitset); SubQueryResult FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset, const float* chunk_data, - int64_t chunk_size, + int64_t size_per_chunk, const faiss::BitsetView& bitset); } // namespace milvus::query diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index 54edd7e2f5c6da2a1ea5b97fc690e9f120fea2e7..6c0b61627437ed56ce0a2c5434624548267e2b35 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -72,46 +72,46 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, dataset::FloatQueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data}; auto max_indexed_id = indexing_record.get_finished_ack(); - const auto& indexing_entry = indexing_record.get_vec_entry(vecfield_offset); - auto search_conf = indexing_entry.get_search_conf(topK); + const auto& field_indexing = indexing_record.get_vec_field_indexing(vecfield_offset); + auto search_conf = field_indexing.get_search_conf(topK); for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) { - auto chunk_size = indexing_entry.get_chunk_size(); - auto indexing = indexing_entry.get_indexing(chunk_id); + auto size_per_chunk = field_indexing.get_size_per_chunk(); + auto indexing = field_indexing.get_chunk_indexing(chunk_id); - auto sub_view = BitsetSubView(bitset, chunk_id * chunk_size, chunk_size); + auto sub_view = BitsetSubView(bitset, chunk_id * size_per_chunk, 
size_per_chunk); auto sub_qr = SearchOnIndex(query_dataset, *indexing, search_conf, sub_view); // convert chunk uid to segment uid for (auto& x : sub_qr.mutable_labels()) { if (x != -1) { - x += chunk_id * chunk_size; + x += chunk_id * size_per_chunk; } } final_qr.merge(sub_qr); } - auto vec_ptr = record.get_entity<FloatVector>(vecfield_offset); + auto vec_ptr = record.get_field_data<FloatVector>(vecfield_offset); // step 4: brute force search where small indexing is unavailable - auto vec_chunk_size = vec_ptr->get_chunk_size(); - Assert(vec_chunk_size == indexing_entry.get_chunk_size()); - auto max_chunk = upper_div(ins_barrier, vec_chunk_size); + auto vec_size_per_chunk = vec_ptr->get_size_per_chunk(); + Assert(vec_size_per_chunk == field_indexing.get_size_per_chunk()); + auto max_chunk = upper_div(ins_barrier, vec_size_per_chunk); for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) { auto& chunk = vec_ptr->get_chunk(chunk_id); - auto element_begin = chunk_id * vec_chunk_size; - auto element_end = std::min(ins_barrier, (chunk_id + 1) * vec_chunk_size); - auto chunk_size = element_end - element_begin; + auto element_begin = chunk_id * vec_size_per_chunk; + auto element_end = std::min(ins_barrier, (chunk_id + 1) * vec_size_per_chunk); + auto size_per_chunk = element_end - element_begin; - auto sub_view = BitsetSubView(bitset, element_begin, chunk_size); - auto sub_qr = FloatSearchBruteForce(query_dataset, chunk.data(), chunk_size, sub_view); + auto sub_view = BitsetSubView(bitset, element_begin, size_per_chunk); + auto sub_qr = FloatSearchBruteForce(query_dataset, chunk.data(), size_per_chunk, sub_view); // convert chunk uid to segment uid for (auto& x : sub_qr.mutable_labels()) { if (x != -1) { - x += chunk_id * vec_chunk_size; + x += chunk_id * vec_size_per_chunk; } } final_qr.merge(sub_qr); @@ -160,18 +160,18 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, // step 3: small indexing search query::dataset::BinaryQueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data}; - auto vec_ptr = record.get_entity<BinaryVector>(vecfield_offset); + auto vec_ptr = record.get_field_data<BinaryVector>(vecfield_offset); auto max_indexed_id = 0; // step 4: brute force search where small indexing is unavailable - auto vec_chunk_size = vec_ptr->get_chunk_size(); - auto max_chunk = upper_div(ins_barrier, vec_chunk_size); + auto vec_size_per_chunk = vec_ptr->get_size_per_chunk(); + auto max_chunk = upper_div(ins_barrier, vec_size_per_chunk); SubQueryResult final_result(num_queries, topK, metric_type); for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) { auto& chunk = vec_ptr->get_chunk(chunk_id); - auto element_begin = chunk_id * vec_chunk_size; - auto element_end = std::min(ins_barrier, (chunk_id + 1) * vec_chunk_size); + auto element_begin = chunk_id * vec_size_per_chunk; + auto element_end = std::min(ins_barrier, (chunk_id + 1) * vec_size_per_chunk); auto nsize = element_end - element_begin; auto sub_view = BitsetSubView(bitset, element_begin, nsize); @@ -180,7 +180,7 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, // convert chunk uid to segment uid for (auto& x : sub_result.mutable_labels()) { if (x != -1) { - x += chunk_id * vec_chunk_size; + x += chunk_id * vec_size_per_chunk; } } final_result.merge(sub_result); diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index e7f57435d955e68e47d655007f832dcf8d34d429..b078b74552a3e619f3c31502d55fe7d0730ad52b 100644 --- 
a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -62,16 +62,16 @@ SearchOnSealed(const Schema& schema, auto dim = field.get_dim(); Assert(record.is_ready(field_offset)); - auto indexing_entry = record.get_entry(field_offset); - Assert(indexing_entry->metric_type_ == query_info.metric_type_); + auto field_indexing = record.get_field_indexing(field_offset); + Assert(field_indexing->metric_type_ == query_info.metric_type_); auto final = [&] { auto ds = knowhere::GenDataset(num_queries, dim, query_data); auto conf = query_info.search_params_; conf[milvus::knowhere::meta::TOPK] = query_info.topK_; - conf[milvus::knowhere::Metric::TYPE] = MetricTypeToName(indexing_entry->metric_type_); - return indexing_entry->indexing_->Query(ds, conf, bitset); + conf[milvus::knowhere::Metric::TYPE] = MetricTypeToName(field_indexing->metric_type_); + return field_indexing->indexing_->Query(ds, conf, bitset); }(); auto ids = final->Get<idx_t*>(knowhere::meta::IDS); diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index c8652da4ba8d92daa11f645d5dfb4d6807a4a346..058854af6a5aea731842bb1597d50a459460a1ea 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -120,41 +120,33 @@ template <typename T, typename IndexFunc, typename ElementFunc> auto ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_func, ElementFunc element_func) -> RetType { - auto data_type = expr.data_type_; auto& schema = segment_.get_schema(); auto field_offset = expr.field_offset_; auto& field_meta = schema[field_offset]; - // auto vec_ptr = records.get_entity<T>(field_offset); - // auto& vec = *vec_ptr; - // const segcore::ScalarIndexingEntry<T>& entry = indexing_record.get_scalar_entry<T>(field_offset); - - // RetType results(vec.num_chunk()); - auto indexing_barrier = segment_.num_chunk_index_safe(field_offset); - auto chunk_size = segment_.size_per_chunk(); - auto num_chunk = upper_div(row_count_, chunk_size); + auto indexing_barrier = segment_.num_chunk_index(field_offset); + auto size_per_chunk = segment_.size_per_chunk(); + auto num_chunk = upper_div(row_count_, size_per_chunk); RetType results; using Index = knowhere::scalar::StructuredIndex<T>; for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) { - // auto& result = results[chunk_id]; const Index& indexing = segment_.chunk_scalar_index<T>(field_offset, chunk_id); // NOTE: knowhere is not const-ready // This is a dirty workaround auto data = index_func(const_cast<Index*>(&indexing)); - Assert(data->size() == chunk_size); + Assert(data->size() == size_per_chunk); results.emplace_back(std::move(*data)); } for (auto chunk_id = indexing_barrier; chunk_id < num_chunk; ++chunk_id) { - boost::dynamic_bitset<> result(chunk_size); - // auto& result = results[chunk_id]; - result.resize(chunk_size); + boost::dynamic_bitset<> result(size_per_chunk); + result.resize(size_per_chunk); auto chunk = segment_.chunk_data<T>(field_offset, chunk_id); const T* data = chunk.data(); - for (int index = 0; index < chunk_size; ++index) { + for (int index = 0; index < size_per_chunk; ++index) { result[index] = element_func(data[index]); } - Assert(result.size() == chunk_size); + Assert(result.size() == size_per_chunk); results.emplace_back(std::move(result)); } return results; @@ -282,27 +274,19 @@ template <typename T> auto ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> 
RetType { auto& expr = static_cast<TermExprImpl<T>&>(expr_raw); - // auto& records = segment_.get_insert_record(); - auto data_type = expr.data_type_; auto& schema = segment_.get_schema(); auto field_offset = expr_raw.field_offset_; auto& field_meta = schema[field_offset]; - // auto vec_ptr = records.get_entity<T>(field_offset); - // auto& vec = *vec_ptr; - auto chunk_size = segment_.size_per_chunk(); - auto num_chunk = upper_div(row_count_, chunk_size); + auto size_per_chunk = segment_.size_per_chunk(); + auto num_chunk = upper_div(row_count_, size_per_chunk); RetType bitsets; - - // auto N = records.ack_responder_.GetAck(); - // TODO: enable index for term - for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) { Span<T> chunk = segment_.chunk_data<T>(field_offset, chunk_id); - auto size = chunk_id == num_chunk - 1 ? row_count_ - chunk_id * chunk_size : chunk_size; + auto size = chunk_id == num_chunk - 1 ? row_count_ - chunk_id * size_per_chunk : size_per_chunk; - boost::dynamic_bitset<> bitset(chunk_size); + boost::dynamic_bitset<> bitset(size_per_chunk); for (int i = 0; i < size; ++i) { auto value = chunk.data()[i]; bool is_in = std::binary_search(expr.terms_.begin(), expr.terms_.end(), value); diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 506cfb9b805372e7edd6d3e96fab169b54b45706..fc796361d1d8dcfa905ff5a79c5123da961ea0aa 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -6,7 +6,7 @@ set(SEGCORE_FILES SegmentGrowing.cpp SegmentGrowingImpl.cpp SegmentSealedImpl.cpp - IndexingEntry.cpp + FieldIndexing.cpp InsertRecord.cpp Reduce.cpp plan_c.cpp diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index 87cfd76ac8d903f19e2711ed08040d5a68bd4019..bb8a8545335e0ac1eef336861034d4558e530ee0 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -72,7 +72,7 @@ class ThreadSafeVector { class VectorBase { public: - explicit VectorBase(int64_t chunk_size) : chunk_size_(chunk_size) { + explicit VectorBase(int64_t size_per_chunk) : size_per_chunk_(size_per_chunk) { } virtual ~VectorBase() = default; @@ -86,12 +86,12 @@ class VectorBase { get_span_base(int64_t chunk_id) const = 0; int64_t - get_chunk_size() const { - return chunk_size_; + get_size_per_chunk() const { + return size_per_chunk_; } protected: - const int64_t chunk_size_; + const int64_t size_per_chunk_; }; template <typename Type, bool is_scalar = false> @@ -111,27 +111,28 @@ class ConcurrentVectorImpl : public VectorBase { std::conditional_t<is_scalar, Type, std::conditional_t<std::is_same_v<Type, float>, FloatVector, BinaryVector>>; public: - explicit ConcurrentVectorImpl(ssize_t dim, int64_t chunk_size) : VectorBase(chunk_size), Dim(is_scalar ? 1 : dim) { + explicit ConcurrentVectorImpl(ssize_t dim, int64_t size_per_chunk) + : VectorBase(size_per_chunk), Dim(is_scalar ? 1 : dim) { Assert(is_scalar ? 
dim == 1 : dim != 1); } void grow_to_at_least(int64_t element_count) override { - auto chunk_count = upper_div(element_count, chunk_size_); - chunks_.emplace_to_at_least(chunk_count, Dim * chunk_size_); + auto chunk_count = upper_div(element_count, size_per_chunk_); + chunks_.emplace_to_at_least(chunk_count, Dim * size_per_chunk_); } Span<TraitType> get_span(int64_t chunk_id) const { auto& chunk = get_chunk(chunk_id); if constexpr (is_scalar) { - return Span<TraitType>(chunk.data(), chunk_size_); + return Span<TraitType>(chunk.data(), size_per_chunk_); } else if constexpr (std::is_same_v<Type, int64_t> || std::is_same_v<Type, int>) { // only for testing PanicInfo("unimplemented"); } else { static_assert(std::is_same_v<typename TraitType::embedded_type, Type>); - return Span<TraitType>(chunk.data(), chunk_size_, Dim); + return Span<TraitType>(chunk.data(), size_per_chunk_, Dim); } } @@ -151,28 +152,28 @@ class ConcurrentVectorImpl : public VectorBase { return; } this->grow_to_at_least(element_offset + element_count); - auto chunk_id = element_offset / chunk_size_; - auto chunk_offset = element_offset % chunk_size_; + auto chunk_id = element_offset / size_per_chunk_; + auto chunk_offset = element_offset % size_per_chunk_; ssize_t source_offset = 0; // first partition: - if (chunk_offset + element_count <= chunk_size_) { + if (chunk_offset + element_count <= size_per_chunk_) { // only first fill_chunk(chunk_id, chunk_offset, element_count, source, source_offset); return; } - auto first_size = chunk_size_ - chunk_offset; + auto first_size = size_per_chunk_ - chunk_offset; fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset); - source_offset += chunk_size_ - chunk_offset; + source_offset += size_per_chunk_ - chunk_offset; element_count -= first_size; ++chunk_id; // the middle - while (element_count >= chunk_size_) { - fill_chunk(chunk_id, 0, chunk_size_, source, source_offset); - source_offset += chunk_size_; - element_count -= chunk_size_; + while (element_count >= size_per_chunk_) { + fill_chunk(chunk_id, 0, size_per_chunk_, source, source_offset); + source_offset += size_per_chunk_; + element_count -= size_per_chunk_; ++chunk_id; } @@ -190,16 +191,16 @@ class ConcurrentVectorImpl : public VectorBase { // just for fun, don't use it directly const Type* get_element(ssize_t element_index) const { - auto chunk_id = element_index / chunk_size_; - auto chunk_offset = element_index % chunk_size_; + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; return get_chunk(chunk_id).data() + chunk_offset * Dim; } const Type& operator[](ssize_t element_index) const { Assert(Dim == 1); - auto chunk_id = element_index / chunk_size_; - auto chunk_offset = element_index % chunk_size_; + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; return get_chunk(chunk_id)[chunk_offset]; } @@ -232,24 +233,24 @@ template <typename Type> class ConcurrentVector : public ConcurrentVectorImpl<Type, true> { public: static_assert(std::is_fundamental_v<Type>); - explicit ConcurrentVector(int64_t chunk_size) - : ConcurrentVectorImpl<Type, true>::ConcurrentVectorImpl(1, chunk_size) { + explicit ConcurrentVector(int64_t size_per_chunk) + : ConcurrentVectorImpl<Type, true>::ConcurrentVectorImpl(1, size_per_chunk) { } }; template <> class ConcurrentVector<FloatVector> : public ConcurrentVectorImpl<float, false> { public: - ConcurrentVector(int64_t dim, int64_t chunk_size) - : ConcurrentVectorImpl<float, 
false>::ConcurrentVectorImpl(dim, chunk_size) { + ConcurrentVector(int64_t dim, int64_t size_per_chunk) + : ConcurrentVectorImpl<float, false>::ConcurrentVectorImpl(dim, size_per_chunk) { } }; template <> class ConcurrentVector<BinaryVector> : public ConcurrentVectorImpl<uint8_t, false> { public: - explicit ConcurrentVector(int64_t dim, int64_t chunk_size) - : binary_dim_(dim), ConcurrentVectorImpl(dim / 8, chunk_size) { + explicit ConcurrentVector(int64_t dim, int64_t size_per_chunk) + : binary_dim_(dim), ConcurrentVectorImpl(dim / 8, size_per_chunk) { Assert(dim % 8 == 0); } diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 7dc65594f84ddcdf71938fddc5a82f58e8e82118..98b4245a2e14d1c1a40a136f2126357e4ca89f06 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -29,9 +29,11 @@ struct DeletedRecord { std::shared_ptr<TmpBitmap> clone(int64_t capacity); }; - static constexpr int64_t deprecated_chunk_size = 32 * 1024; + static constexpr int64_t deprecated_size_per_chunk = 32 * 1024; DeletedRecord() - : lru_(std::make_shared<TmpBitmap>()), timestamps_(deprecated_chunk_size), uids_(deprecated_chunk_size) { + : lru_(std::make_shared<TmpBitmap>()), + timestamps_(deprecated_size_per_chunk), + uids_(deprecated_size_per_chunk) { lru_->bitmap_ptr = std::make_shared<faiss::ConcurrentBitset>(0); } diff --git a/internal/core/src/segcore/IndexingEntry.cpp b/internal/core/src/segcore/FieldIndexing.cpp similarity index 73% rename from internal/core/src/segcore/IndexingEntry.cpp rename to internal/core/src/segcore/FieldIndexing.cpp index 1cd64059552f1b7c0d2fc8a4f520500c311690fe..1309e2eb9649a1b98925b410e5225ebaeba74421 100644 --- a/internal/core/src/segcore/IndexingEntry.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -9,14 +9,14 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. 
See the License for the specific language governing permissions and limitations under the License -#include "segcore/IndexingEntry.h" +#include "segcore/FieldIndexing.h" #include <thread> #include <knowhere/index/vector_index/IndexIVF.h> #include <knowhere/index/vector_index/adapter/VectorAdapter.h> namespace milvus::segcore { void -VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { +VectorFieldIndexing::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field_meta_.get_dim(); @@ -30,7 +30,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector const auto& chunk = source->get_chunk(chunk_id); // build index for chunk auto indexing = std::make_unique<knowhere::IVF>(); - auto dataset = knowhere::GenDataset(source->get_chunk_size(), dim, chunk.data()); + auto dataset = knowhere::GenDataset(source->get_size_per_chunk(), dim, chunk.data()); indexing->Train(dataset, conf); indexing->AddWithoutIds(dataset, conf); data_[chunk_id] = std::move(indexing); @@ -38,7 +38,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector } knowhere::Config -VecIndexingEntry::get_build_conf() const { +VectorFieldIndexing::get_build_conf() const { return knowhere::Config{{knowhere::meta::DIM, field_meta_.get_dim()}, {knowhere::IndexParams::nlist, 100}, {knowhere::IndexParams::nprobe, 4}, @@ -47,7 +47,7 @@ VecIndexingEntry::get_build_conf() const { } knowhere::Config -VecIndexingEntry::get_search_conf(int top_K) const { +VectorFieldIndexing::get_search_conf(int top_K) const { return knowhere::Config{{knowhere::meta::DIM, field_meta_.get_dim()}, {knowhere::meta::TOPK, top_K}, {knowhere::IndexParams::nlist, 100}, @@ -71,8 +71,8 @@ IndexingRecord::UpdateResourceAck(int64_t chunk_ack, const InsertRecord& record) lck.unlock(); // std::thread([this, old_ack, chunk_ack, &record] { - for (auto& [field_offset, entry] : entries_) { - auto vec_base = record.get_base_entity(field_offset); + for (auto& [field_offset, entry] : field_indexings_) { + auto vec_base = record.get_field_data_base(field_offset); entry->BuildIndexRange(old_ack, chunk_ack, vec_base); } finished_ack_.AddSegment(old_ack, chunk_ack); @@ -81,7 +81,7 @@ IndexingRecord::UpdateResourceAck(int64_t chunk_ack, const InsertRecord& record) template <typename T> void -ScalarIndexingEntry<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { +ScalarFieldIndexing<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { auto source = dynamic_cast<const ConcurrentVector<T>*>(vec_base); Assert(source); auto num_chunk = source->num_chunk(); @@ -92,16 +92,16 @@ ScalarIndexingEntry<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const // build index for chunk // TODO auto indexing = std::make_unique<knowhere::scalar::StructuredIndexSort<T>>(); - indexing->Build(vec_base->get_chunk_size(), chunk.data()); + indexing->Build(vec_base->get_size_per_chunk(), chunk.data()); data_[chunk_id] = std::move(indexing); } } -std::unique_ptr<IndexingEntry> -CreateIndex(const FieldMeta& field_meta, int64_t chunk_size) { +std::unique_ptr<FieldIndexing> +CreateIndex(const FieldMeta& field_meta, int64_t size_per_chunk) { if (field_meta.is_vector()) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - return std::make_unique<VecIndexingEntry>(field_meta, chunk_size); + return 
std::make_unique<VectorFieldIndexing>(field_meta, size_per_chunk); } else { // TODO PanicInfo("unsupported"); @@ -109,19 +109,19 @@ CreateIndex(const FieldMeta& field_meta, int64_t chunk_size) { } switch (field_meta.get_data_type()) { case DataType::BOOL: - return std::make_unique<ScalarIndexingEntry<bool>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<bool>>(field_meta, size_per_chunk); case DataType::INT8: - return std::make_unique<ScalarIndexingEntry<int8_t>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<int8_t>>(field_meta, size_per_chunk); case DataType::INT16: - return std::make_unique<ScalarIndexingEntry<int16_t>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<int16_t>>(field_meta, size_per_chunk); case DataType::INT32: - return std::make_unique<ScalarIndexingEntry<int32_t>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<int32_t>>(field_meta, size_per_chunk); case DataType::INT64: - return std::make_unique<ScalarIndexingEntry<int64_t>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<int64_t>>(field_meta, size_per_chunk); case DataType::FLOAT: - return std::make_unique<ScalarIndexingEntry<float>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<float>>(field_meta, size_per_chunk); case DataType::DOUBLE: - return std::make_unique<ScalarIndexingEntry<double>>(field_meta, chunk_size); + return std::make_unique<ScalarFieldIndexing<double>>(field_meta, size_per_chunk); default: PanicInfo("unsupported"); } diff --git a/internal/core/src/segcore/IndexingEntry.h b/internal/core/src/segcore/FieldIndexing.h similarity index 63% rename from internal/core/src/segcore/IndexingEntry.h rename to internal/core/src/segcore/FieldIndexing.h index f790f5e1a156d98c5822fb08697d13ab62c64f5f..c18bc9d04fb7242e2f00594bb919b0cddb44b30e 100644 --- a/internal/core/src/segcore/IndexingEntry.h +++ b/internal/core/src/segcore/FieldIndexing.h @@ -24,14 +24,14 @@ namespace milvus::segcore { // this should be concurrent // All concurrent -class IndexingEntry { +class FieldIndexing { public: - explicit IndexingEntry(const FieldMeta& field_meta, int64_t chunk_size) - : field_meta_(field_meta), chunk_size_(chunk_size) { + explicit FieldIndexing(const FieldMeta& field_meta, int64_t size_per_chunk) + : field_meta_(field_meta), size_per_chunk_(size_per_chunk) { } - IndexingEntry(const IndexingEntry&) = delete; - IndexingEntry& - operator=(const IndexingEntry&) = delete; + FieldIndexing(const FieldIndexing&) = delete; + FieldIndexing& + operator=(const FieldIndexing&) = delete; // Do this in parallel virtual void @@ -43,29 +43,29 @@ class IndexingEntry { } int64_t - get_chunk_size() const { - return chunk_size_; + get_size_per_chunk() const { + return size_per_chunk_; } virtual knowhere::Index* - get_indexing(int64_t chunk_id) const = 0; + get_chunk_indexing(int64_t chunk_id) const = 0; protected: // additional info const FieldMeta& field_meta_; - const int64_t chunk_size_; + const int64_t size_per_chunk_; }; template <typename T> -class ScalarIndexingEntry : public IndexingEntry { +class ScalarFieldIndexing : public FieldIndexing { public: - using IndexingEntry::IndexingEntry; + using FieldIndexing::FieldIndexing; void BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) override; // concurrent knowhere::scalar::StructuredIndex<T>* - get_indexing(int64_t chunk_id) const override { + get_chunk_indexing(int64_t chunk_id) const override { 
Assert(!field_meta_.is_vector()); return data_.at(chunk_id).get(); } @@ -74,16 +74,16 @@ class ScalarIndexingEntry : public IndexingEntry { tbb::concurrent_vector<std::unique_ptr<knowhere::scalar::StructuredIndex<T>>> data_; }; -class VecIndexingEntry : public IndexingEntry { +class VectorFieldIndexing : public FieldIndexing { public: - using IndexingEntry::IndexingEntry; + using FieldIndexing::FieldIndexing; void BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) override; // concurrent knowhere::VecIndex* - get_indexing(int64_t chunk_id) const override { + get_chunk_indexing(int64_t chunk_id) const override { Assert(field_meta_.is_vector()); return data_.at(chunk_id).get(); } @@ -97,12 +97,13 @@ class VecIndexingEntry : public IndexingEntry { tbb::concurrent_vector<std::unique_ptr<knowhere::VecIndex>> data_; }; -std::unique_ptr<IndexingEntry> -CreateIndex(const FieldMeta& field_meta, int64_t chunk_size); +std::unique_ptr<FieldIndexing> +CreateIndex(const FieldMeta& field_meta, int64_t size_per_chunk); class IndexingRecord { public: - explicit IndexingRecord(const Schema& schema, int64_t chunk_size) : schema_(schema), chunk_size_(chunk_size) { + explicit IndexingRecord(const Schema& schema, int64_t size_per_chunk) + : schema_(schema), size_per_chunk_(size_per_chunk) { Initialize(); } @@ -111,7 +112,7 @@ class IndexingRecord { int offset = 0; for (auto& field : schema_) { if (field.get_data_type() != DataType::VECTOR_BINARY) { - entries_.try_emplace(FieldOffset(offset), CreateIndex(field, chunk_size_)); + field_indexings_.try_emplace(FieldOffset(offset), CreateIndex(field, size_per_chunk_)); } ++offset; } @@ -128,24 +129,24 @@ class IndexingRecord { return finished_ack_.GetAck(); } - const IndexingEntry& - get_entry(FieldOffset field_offset) const { - assert(entries_.count(field_offset)); - return *entries_.at(field_offset); + const FieldIndexing& + get_field_indexing(FieldOffset field_offset) const { + assert(field_indexings_.count(field_offset)); + return *field_indexings_.at(field_offset); } - const VecIndexingEntry& - get_vec_entry(FieldOffset field_offset) const { - auto& entry = get_entry(field_offset); - auto ptr = dynamic_cast<const VecIndexingEntry*>(&entry); + const VectorFieldIndexing& + get_vec_field_indexing(FieldOffset field_offset) const { + auto& field_indexing = get_field_indexing(field_offset); + auto ptr = dynamic_cast<const VectorFieldIndexing*>(&field_indexing); AssertInfo(ptr, "invalid indexing"); return *ptr; } template <typename T> auto - get_scalar_entry(FieldOffset field_offset) const -> const ScalarIndexingEntry<T>& { - auto& entry = get_entry(field_offset); - auto ptr = dynamic_cast<const ScalarIndexingEntry<T>*>(&entry); + get_scalar_field_indexing(FieldOffset field_offset) const -> const ScalarFieldIndexing<T>& { + auto& entry = get_field_indexing(field_offset); + auto ptr = dynamic_cast<const ScalarFieldIndexing<T>*>(&entry); AssertInfo(ptr, "invalid indexing"); return *ptr; } @@ -159,11 +160,11 @@ class IndexingRecord { // std::atomic<int64_t> finished_ack_ = 0; AckResponder finished_ack_; std::mutex mutex_; - int64_t chunk_size_; + int64_t size_per_chunk_; private: // field_offset => indexing - std::map<FieldOffset, std::unique_ptr<IndexingEntry>> entries_; + std::map<FieldOffset, std::unique_ptr<FieldIndexing>> field_indexings_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/InsertRecord.cpp b/internal/core/src/segcore/InsertRecord.cpp index 
66bd9f8995ce88ccb314d7bf2a6018e0265f22df..460c1e6f4fe3bd5bbe96e11cd62926e8b491f655 100644 --- a/internal/core/src/segcore/InsertRecord.cpp +++ b/internal/core/src/segcore/InsertRecord.cpp @@ -13,14 +13,14 @@ namespace milvus::segcore { -InsertRecord::InsertRecord(const Schema& schema, int64_t chunk_size) : uids_(1), timestamps_(1) { +InsertRecord::InsertRecord(const Schema& schema, int64_t size_per_chunk) : uids_(1), timestamps_(1) { for (auto& field : schema) { if (field.is_vector()) { if (field.get_data_type() == DataType::VECTOR_FLOAT) { - this->insert_entity<FloatVector>(field.get_dim(), chunk_size); + this->append_field_data<FloatVector>(field.get_dim(), size_per_chunk); continue; } else if (field.get_data_type() == DataType::VECTOR_BINARY) { - this->insert_entity<BinaryVector>(field.get_dim(), chunk_size); + this->append_field_data<BinaryVector>(field.get_dim(), size_per_chunk); continue; } else { PanicInfo("unsupported"); @@ -28,34 +28,34 @@ InsertRecord::InsertRecord(const Schema& schema, int64_t chunk_size) : uids_(1), } switch (field.get_data_type()) { case DataType::BOOL: { - this->insert_entity<bool>(chunk_size); + this->append_field_data<bool>(size_per_chunk); break; } case DataType::INT8: { - this->insert_entity<int8_t>(chunk_size); + this->append_field_data<int8_t>(size_per_chunk); break; } case DataType::INT16: { - this->insert_entity<int16_t>(chunk_size); + this->append_field_data<int16_t>(size_per_chunk); break; } case DataType::INT32: { - this->insert_entity<int32_t>(chunk_size); + this->append_field_data<int32_t>(size_per_chunk); break; } case DataType::INT64: { - this->insert_entity<int64_t>(chunk_size); + this->append_field_data<int64_t>(size_per_chunk); break; } case DataType::FLOAT: { - this->insert_entity<float>(chunk_size); + this->append_field_data<float>(size_per_chunk); break; } case DataType::DOUBLE: { - this->insert_entity<double>(chunk_size); + this->append_field_data<double>(size_per_chunk); break; } default: { diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index dbb649bc67b3da87be696443982c8a99aa305f2b..c59e4a685e32e6cd3f7839028c7e5053cf317585 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -24,47 +24,53 @@ struct InsertRecord { ConcurrentVector<Timestamp> timestamps_; ConcurrentVector<idx_t> uids_; - explicit InsertRecord(const Schema& schema, int64_t chunk_size); + explicit InsertRecord(const Schema& schema, int64_t size_per_chunk); + // get field data without knowing the type + // return VectorBase type auto - get_base_entity(FieldOffset field_offset) const { - auto ptr = entity_vec_[field_offset.get()].get(); + get_field_data_base(FieldOffset field_offset) const { + auto ptr = field_datas_[field_offset.get()].get(); return ptr; } + // get field data in given type, const version template <typename Type> auto - get_entity(FieldOffset field_offset) const { - auto base_ptr = get_base_entity(field_offset); + get_field_data(FieldOffset field_offset) const { + auto base_ptr = get_field_data_base(field_offset); auto ptr = dynamic_cast<const ConcurrentVector<Type>*>(base_ptr); Assert(ptr); return ptr; } + // get field data in given type, nonconst version template <typename Type> auto - get_entity(FieldOffset field_offset) { - auto base_ptr = get_base_entity(field_offset); + get_field_data(FieldOffset field_offset) { + auto base_ptr = get_field_data_base(field_offset); auto ptr = dynamic_cast<ConcurrentVector<Type>*>(base_ptr); Assert(ptr); return ptr; } + 
// append a column of scalar type template <typename Type> void - insert_entity(int64_t chunk_size) { + append_field_data(int64_t size_per_chunk) { static_assert(std::is_fundamental_v<Type>); - entity_vec_.emplace_back(std::make_unique<ConcurrentVector<Type>>(chunk_size)); + field_datas_.emplace_back(std::make_unique<ConcurrentVector<Type>>(size_per_chunk)); } + // append a column of vector type template <typename VectorType> void - insert_entity(int64_t dim, int64_t chunk_size) { + append_field_data(int64_t dim, int64_t size_per_chunk) { static_assert(std::is_base_of_v<VectorTrait, VectorType>); - entity_vec_.emplace_back(std::make_unique<ConcurrentVector<VectorType>>(dim, chunk_size)); + field_datas_.emplace_back(std::make_unique<ConcurrentVector<VectorType>>(dim, size_per_chunk)); } private: - std::vector<std::unique_ptr<VectorBase>> entity_vec_; + std::vector<std::unique_ptr<VectorBase>> field_datas_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SealedIndexingRecord.h b/internal/core/src/segcore/SealedIndexingRecord.h index edd6d85e0bf391a0556bb16c170644c6c24845c1..07607bbd2d97317a9d3daee6f1062172a9453aca 100644 --- a/internal/core/src/segcore/SealedIndexingRecord.h +++ b/internal/core/src/segcore/SealedIndexingRecord.h @@ -31,30 +31,30 @@ using SealedIndexingEntryPtr = std::unique_ptr<SealedIndexingEntry>; struct SealedIndexingRecord { void - add_entry(FieldOffset field_offset, MetricType metric_type, knowhere::VecIndexPtr indexing) { + append_field_indexing(FieldOffset field_offset, MetricType metric_type, knowhere::VecIndexPtr indexing) { auto ptr = std::make_unique<SealedIndexingEntry>(); ptr->indexing_ = indexing; ptr->metric_type_ = metric_type; std::unique_lock lck(mutex_); - entries_[field_offset] = std::move(ptr); + field_indexings_[field_offset] = std::move(ptr); } const SealedIndexingEntry* - get_entry(FieldOffset field_offset) const { + get_field_indexing(FieldOffset field_offset) const { std::shared_lock lck(mutex_); - AssertInfo(entries_.count(field_offset), "field_offset not found"); - return entries_.at(field_offset).get(); + AssertInfo(field_indexings_.count(field_offset), "field_offset not found"); + return field_indexings_.at(field_offset).get(); } bool is_ready(FieldOffset field_offset) const { std::shared_lock lck(mutex_); - return entries_.count(field_offset); + return field_indexings_.count(field_offset); } private: // field_offset -> SealedIndexingEntry - std::map<FieldOffset, SealedIndexingEntryPtr> entries_; + std::map<FieldOffset, SealedIndexingEntryPtr> field_indexings_; mutable std::shared_mutex mutex_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowing.cpp b/internal/core/src/segcore/SegmentGrowing.cpp index 065f7464aca967711920f73d4b18bd4991276f0d..0d51d8abb26fbc69398db419ef31e721c1b33214 100644 --- a/internal/core/src/segcore/SegmentGrowing.cpp +++ b/internal/core/src/segcore/SegmentGrowing.cpp @@ -20,8 +20,8 @@ TestABI() { } std::unique_ptr<SegmentGrowing> -CreateGrowingSegment(SchemaPtr schema, int64_t chunk_size) { - auto segment = std::make_unique<SegmentGrowingImpl>(schema, chunk_size); +CreateGrowingSegment(SchemaPtr schema, int64_t size_per_chunk) { + auto segment = std::make_unique<SegmentGrowingImpl>(schema, size_per_chunk); return segment; } diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index 90ffd57d4fda7027349309cfd584556fbaf0890e..fa8307c88e391084ac55e1f7ec18b7ea582aa779 100644 --- 
a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -80,7 +80,7 @@ class SegmentGrowing : public SegmentInternalInterface { using SegmentGrowingPtr = std::unique_ptr<SegmentGrowing>; SegmentGrowingPtr -CreateGrowingSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024); +CreateGrowingSegment(SchemaPtr schema, int64_t size_per_chunk = 32 * 1024); } // namespace segcore } // namespace milvus diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index a1f965285b9de1bbe18ebabcb40604ca4db4e921..d118c8804c9ab0fa04f2ba1ec64871ccd5e8fcef 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -170,7 +170,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_begin, record_.uids_.set_data(reserved_begin, uids.data(), size); for (int fid = 0; fid < schema_->size(); ++fid) { auto field_offset = FieldOffset(fid); - record_.get_base_entity(field_offset)->set_data_raw(reserved_begin, entities[fid].data(), size); + record_.get_field_data_base(field_offset)->set_data_raw(reserved_begin, entities[fid].data(), size); } for (int i = 0; i < uids.size(); ++i) { @@ -180,7 +180,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_begin, } record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); - indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / chunk_size_, record_); + indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_); return Status::OK(); } @@ -231,9 +231,9 @@ SegmentGrowingImpl::Close() { int64_t SegmentGrowingImpl::GetMemoryUsageInBytes() const { int64_t total_bytes = 0; - int64_t ins_n = upper_align(record_.reserved, chunk_size_); + int64_t ins_n = upper_align(record_.reserved, size_per_chunk_); total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1); - int64_t del_n = upper_align(deleted_record_.reserved, chunk_size_); + int64_t del_n = upper_align(deleted_record_.reserved, size_per_chunk_); total_bytes += del_n * (16 * 2); return total_bytes; } @@ -245,20 +245,20 @@ SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) { Assert(info.index_params.count("metric_type")); auto metric_type_str = info.index_params.at("metric_type"); - sealed_indexing_record_.add_entry(field_offset, GetMetricType(metric_type_str), info.index); + sealed_indexing_record_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index); return Status::OK(); } SpanBase SegmentGrowingImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const { - auto vec = get_insert_record().get_base_entity(field_offset); + auto vec = get_insert_record().get_field_data_base(field_offset); return vec->get_span_base(chunk_id); } int64_t -SegmentGrowingImpl::num_chunk_data() const { +SegmentGrowingImpl::num_chunk() const { auto size = get_insert_record().ack_responder_.GetAck(); - return upper_div(size, chunk_size_); + return upper_div(size, size_per_chunk_); } void SegmentGrowingImpl::vector_search(int64_t vec_count, diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 6a54e05be7fac8c9ff852fb229be56c4c124f3e6..9776936c7c1088c8ae826457dd75e177695e8246 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -27,7 +27,7 @@ #include "utils/Status.h" #include "segcore/DeletedRecord.h" #include "utils/EasyAssert.h" -#include "IndexingEntry.h" +#include 
"FieldIndexing.h" #include "InsertRecord.h" #include <utility> #include <memory> @@ -89,18 +89,18 @@ class SegmentGrowingImpl : public SegmentGrowing { // return count of index that has index, i.e., [0, num_chunk_index) have built index int64_t - num_chunk_index_safe(FieldOffset field_offset) const final { + num_chunk_index(FieldOffset field_offset) const final { return indexing_record_.get_finished_ack(); } const knowhere::Index* chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const final { - return indexing_record_.get_entry(field_offset).get_indexing(chunk_id); + return indexing_record_.get_field_indexing(field_offset).get_chunk_indexing(chunk_id); } int64_t size_per_chunk() const final { - return chunk_size_; + return size_per_chunk_; } public: @@ -152,27 +152,27 @@ class SegmentGrowingImpl : public SegmentGrowing { void bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override { // TODO: support more types - auto vec_ptr = record_.get_base_entity(field_offset); + auto vec_ptr = record_.get_field_data_base(field_offset); auto data_type = schema_->operator[](field_offset).get_data_type(); Assert(data_type == DataType::INT64); bulk_subscript_impl<int64_t>(*vec_ptr, seg_offsets, count, output); } int64_t - num_chunk_data() const override; + num_chunk() const override; Status LoadIndexing(const LoadIndexInfo& info) override; public: friend std::unique_ptr<SegmentGrowing> - CreateGrowingSegment(SchemaPtr schema, int64_t chunk_size); + CreateGrowingSegment(SchemaPtr schema, int64_t size_per_chunk); - explicit SegmentGrowingImpl(SchemaPtr schema, int64_t chunk_size) - : chunk_size_(chunk_size), + explicit SegmentGrowingImpl(SchemaPtr schema, int64_t size_per_chunk) + : size_per_chunk_(size_per_chunk), schema_(std::move(schema)), - record_(*schema_, chunk_size), - indexing_record_(*schema_, chunk_size) { + record_(*schema_, size_per_chunk), + indexing_record_(*schema_, size_per_chunk) { } void @@ -192,7 +192,7 @@ class SegmentGrowingImpl : public SegmentGrowing { chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const override; private: - int64_t chunk_size_; + int64_t size_per_chunk_; SchemaPtr schema_; std::atomic<SegmentState> state_ = SegmentState::Open; diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index a6f0668e3eee8f0e93447748faf6a64470beddf2..b68a9c99352875d01cce717fefabe2029841baa0 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -14,15 +14,18 @@ #include "common/Schema.h" #include "query/Plan.h" #include "common/Span.h" -#include "IndexingEntry.h" +#include "FieldIndexing.h" #include <knowhere/index/vector_index/VecIndex.h> #include "common/SystemProperty.h" #include "query/PlanNode.h" namespace milvus::segcore { +// common interface of SegmentSealed and SegmentGrowing +// used by C API class SegmentInterface { public: + // fill results according to target_entries in plan void FillTargetEntry(const query::Plan* plan, QueryResult& results) const; @@ -44,14 +47,17 @@ class SegmentInterface { virtual ~SegmentInterface() = default; protected: + // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type virtual void bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to field_offset virtual void bulk_subscript(FieldOffset field_offset, const int64_t* 
seg_offsets, int64_t count, void* output) const = 0; }; // internal API for DSL calculation +// only for implementation class SegmentInternalInterface : public SegmentInterface { public: template <typename T> @@ -80,21 +86,24 @@ class SegmentInternalInterface : public SegmentInterface { const BitsetView& bitset, QueryResult& output) const = 0; + // count of chunk that has index available virtual int64_t - num_chunk_index_safe(FieldOffset field_offset) const = 0; + num_chunk_index(FieldOffset field_offset) const = 0; + // count of chunks virtual int64_t - num_chunk_data() const = 0; + num_chunk() const = 0; - // return chunk_size for each chunk, renaming against confusion + // element size in each chunk virtual int64_t size_per_chunk() const = 0; protected: - // blob and row_count + // internal API: return chunk_data in span virtual SpanBase chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const = 0; + // internal API: return chunk_index in span, support scalar index only virtual const knowhere::Index* chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const = 0; }; diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 6114ecbaa88700e49505214fb4c69789c65b085c..97104bdb670240ba9e7c8c8e89eb56c4c823fe06 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -27,6 +27,6 @@ class SegmentSealed : public SegmentInternalInterface { using SegmentSealedPtr = std::unique_ptr<SegmentSealed>; SegmentSealedPtr -CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024); +CreateSealedSegment(SchemaPtr schema, int64_t size_per_chunk = 32 * 1024); } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index a0ef3a7723b65807d92c6449062b67d739b0b89a..49935905d5dd2527d61326f8509c26dd931798b1 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -29,7 +29,7 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { row_count_opt_ = row_count; } Assert(!vec_indexings_.is_ready(field_offset)); - vec_indexings_.add_entry(field_offset, GetMetricType(metric_type_str), info.index); + vec_indexings_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index); ++ready_count_; } @@ -77,13 +77,13 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { } int64_t -SegmentSealedImpl::num_chunk_index_safe(FieldOffset field_offset) const { +SegmentSealedImpl::num_chunk_index(FieldOffset field_offset) const { // TODO: support scalar index return 0; } int64_t -SegmentSealedImpl::num_chunk_data() const { +SegmentSealedImpl::num_chunk() const { return 1; } @@ -142,7 +142,7 @@ SegmentSealedImpl::vector_search(int64_t vec_count, } SegmentSealedPtr -CreateSealedSegment(SchemaPtr schema, int64_t chunk_size) { +CreateSealedSegment(SchemaPtr schema, int64_t size_per_chunk) { return std::make_unique<SegmentSealedImpl>(schema); } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index f901e36679810a8cfc14e0064207ea1979a1bb33..5871fe67bce7c4b64936e1f3ec1c6fe1c6e4296d 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -37,12 +37,12 @@ class SegmentSealedImpl : public SegmentSealed { public: int64_t - num_chunk_index_safe(FieldOffset field_offset) const override; + num_chunk_index(FieldOffset field_offset) 
const override; int64_t - num_chunk_data() const override; + num_chunk() const override; - // return chunk_size for each chunk, renaming against confusion + // return size_per_chunk for each chunk, renaming against confusion int64_t size_per_chunk() const override; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index a8b3ead9806e93ba9f55bd68ab304f72ca2f69cb..713fdb6b069d56fed77153e2b795b1577a3a300c 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -277,7 +277,7 @@ TEST(Sealed, LoadFieldData) { vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; segment->LoadIndex(vec_info); } - ASSERT_EQ(segment->num_chunk_data(), 1); + ASSERT_EQ(segment->num_chunk(), 1); auto chunk_span1 = segment->chunk_data<int64_t>(FieldOffset(1), 0); auto chunk_span2 = segment->chunk_data<double>(FieldOffset(2), 0); auto ref1 = dataset.get_col<int64_t>(1); diff --git a/internal/core/unittest/test_span.cpp b/internal/core/unittest/test_span.cpp index d77c9baded9f2a69980d940a757ae2b32827c7d0..8c582f344c97e878660f3f3f3899be517d2df32f 100644 --- a/internal/core/unittest/test_span.cpp +++ b/internal/core/unittest/test_span.cpp @@ -19,38 +19,38 @@ TEST(Span, Naive) { using namespace milvus::query; using namespace milvus::segcore; int64_t N = 1000 * 1000; - constexpr int64_t chunk_size = 32 * 1024; + constexpr int64_t size_per_chunk = 32 * 1024; auto schema = std::make_shared<Schema>(); schema->AddDebugField("binaryvec", DataType::VECTOR_BINARY, 512, MetricType::METRIC_Jaccard); schema->AddDebugField("age", DataType::FLOAT); schema->AddDebugField("floatvec", DataType::VECTOR_FLOAT, 32, MetricType::METRIC_L2); auto dataset = DataGen(schema, N); - auto segment = CreateGrowingSegment(schema, chunk_size); + auto segment = CreateGrowingSegment(schema, size_per_chunk); segment->PreInsert(N); segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_); auto vec_ptr = dataset.get_col<uint8_t>(0); auto age_ptr = dataset.get_col<float>(1); auto float_ptr = dataset.get_col<float>(2); SegmentInternalInterface& interface = *segment; - auto num_chunk = interface.num_chunk_data(); - ASSERT_EQ(num_chunk, upper_div(N, chunk_size)); + auto num_chunk = interface.num_chunk(); + ASSERT_EQ(num_chunk, upper_div(N, size_per_chunk)); auto row_count = interface.get_row_count(); ASSERT_EQ(N, row_count); for (auto chunk_id = 0; chunk_id < num_chunk; ++chunk_id) { auto vec_span = interface.chunk_data<BinaryVector>(FieldOffset(0), chunk_id); auto age_span = interface.chunk_data<float>(FieldOffset(1), chunk_id); auto float_span = interface.chunk_data<FloatVector>(FieldOffset(2), chunk_id); - auto begin = chunk_id * chunk_size; - auto end = std::min((chunk_id + 1) * chunk_size, N); - auto chunk_size = end - begin; - for (int i = 0; i < chunk_size * 512 / 8; ++i) { + auto begin = chunk_id * size_per_chunk; + auto end = std::min((chunk_id + 1) * size_per_chunk, N); + auto size_per_chunk = end - begin; + for (int i = 0; i < size_per_chunk * 512 / 8; ++i) { ASSERT_EQ(vec_span.data()[i], vec_ptr[i + begin * 512 / 8]); } - for (int i = 0; i < chunk_size; ++i) { + for (int i = 0; i < size_per_chunk; ++i) { ASSERT_EQ(age_span.data()[i], age_ptr[i + begin]); } - for (int i = 0; i < chunk_size; ++i) { + for (int i = 0; i < size_per_chunk; ++i) { ASSERT_EQ(float_span.data()[i], float_ptr[i + begin * 32]); } }
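
Note on the change set above: it is almost entirely a naming refactor — `chunk_size` becomes `size_per_chunk`, `IndexingEntry` becomes `FieldIndexing`, `get_entity` becomes `get_field_data`, and `num_chunk_data()` becomes `num_chunk()`. The clarification carried by the new name is that `size_per_chunk` counts rows (elements) per chunk, not bytes, and it drives the same arithmetic everywhere in the diff: the number of chunks is `upper_div(row_count, size_per_chunk)`, the last chunk may be partial, and chunk-local search labels are converted back to segment-wide offsets by adding `chunk_id * size_per_chunk`. The following standalone C++ sketch is not Milvus code; `upper_div`, `chunk_bounds`, and `to_segment_offset` are hypothetical helpers that only illustrate the chunking arithmetic seen in `SearchOnGrowing.cpp` and `ExecExprVisitor.cpp`.

// Standalone sketch (not part of the Milvus codebase): the per-chunk iteration
// pattern that the renamed `size_per_chunk` parameter describes.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <utility>

// Round-up integer division: how many chunks are needed to hold `count` rows.
int64_t upper_div(int64_t count, int64_t size_per_chunk) {
    return (count + size_per_chunk - 1) / size_per_chunk;
}

// Row range [begin, end) covered by one chunk; the last chunk may be partial.
std::pair<int64_t, int64_t> chunk_bounds(int64_t chunk_id, int64_t size_per_chunk, int64_t row_count) {
    int64_t begin = chunk_id * size_per_chunk;
    int64_t end = std::min(row_count, begin + size_per_chunk);
    return {begin, end};
}

// Convert a chunk-local result label back to a segment-wide offset,
// mirroring the `x += chunk_id * size_per_chunk` fix-up in the search code.
int64_t to_segment_offset(int64_t chunk_local_label, int64_t chunk_id, int64_t size_per_chunk) {
    return chunk_local_label == -1 ? -1 : chunk_local_label + chunk_id * size_per_chunk;
}

int main() {
    const int64_t row_count = 100'000;         // rows inserted so far (ins_barrier)
    const int64_t size_per_chunk = 32 * 1024;  // rows per chunk, not bytes

    const int64_t num_chunk = upper_div(row_count, size_per_chunk);
    assert(num_chunk == 4);  // three full chunks plus one partial chunk

    for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) {
        auto [begin, end] = chunk_bounds(chunk_id, size_per_chunk, row_count);
        std::cout << "chunk " << chunk_id << " holds rows [" << begin << ", " << end << ")\n";
    }

    // A hit at local offset 7 inside chunk 2 maps to segment offset 2 * 32768 + 7.
    std::cout << "segment offset: " << to_segment_offset(7, 2, size_per_chunk) << "\n";
}

Keeping the row-to-chunk mapping uniform is also what makes the `Assert(vec_size_per_chunk == field_indexing.get_size_per_chunk())` check in `FloatSearch` meaningful: the insert record and the per-chunk indexing must agree on the same row count per chunk, otherwise the label fix-up would map results to the wrong segment offsets.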