diff --git a/internal/core/src/indexbuilder/IndexWrapper.cpp b/internal/core/src/indexbuilder/IndexWrapper.cpp index cb9c6584dcfc3c28c76112346ea12072a0086940..40f2f686d3db7549a1a9d29eac5faa6898e6915e 100644 --- a/internal/core/src/indexbuilder/IndexWrapper.cpp +++ b/internal/core/src/indexbuilder/IndexWrapper.cpp @@ -214,8 +214,9 @@ IndexWrapper::StoreRawData(const knowhere::DatasetPtr& dataset) { /* * brief Return serialized binary set + * TODO: use a more efficient method to manage memory, consider std::vector later */ -milvus::indexbuilder::IndexWrapper::Binary +std::unique_ptr<IndexWrapper::Binary> IndexWrapper::Serialize() { auto binarySet = index_->Serialize(config_); auto index_type = get_index_type(); @@ -238,10 +239,11 @@ IndexWrapper::Serialize() { auto ok = ret.SerializeToString(&serialized_data); Assert(ok); - auto data = new char[serialized_data.length()]; - memcpy(data, serialized_data.c_str(), serialized_data.length()); + auto binary = std::make_unique<IndexWrapper::Binary>(); + binary->data.resize(serialized_data.length()); + memcpy(binary->data.data(), serialized_data.c_str(), serialized_data.length()); - return {data, static_cast<int32_t>(serialized_data.length())}; + return binary; } void diff --git a/internal/core/src/indexbuilder/IndexWrapper.h b/internal/core/src/indexbuilder/IndexWrapper.h index 8bf2ed881c8a7e2ee4f86cdb88da7200e47ebfbe..90e1486fef84a6c8e5d5ee1689514ee725a33d92 100644 --- a/internal/core/src/indexbuilder/IndexWrapper.h +++ b/internal/core/src/indexbuilder/IndexWrapper.h @@ -29,11 +29,10 @@ class IndexWrapper { BuildWithoutIds(const knowhere::DatasetPtr& dataset); struct Binary { - char* data; - int32_t size; + std::vector<char> data; }; - Binary + std::unique_ptr<Binary> Serialize(); void diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 388aeca36bb4eb5e0535bef7b1a718b15fcfab47..43cb596273c6e03ab2d5f34661c820cd29cdffae 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -85,13 +85,12 @@ BuildBinaryVecIndexWithoutIds(CIndex index, int64_t data_size, const uint8_t* ve } CStatus -SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer) { +SerializeToSlicedBuffer(CIndex index, CBinary* c_binary) { auto status = CStatus(); try { auto cIndex = (milvus::indexbuilder::IndexWrapper*)index; auto binary = cIndex->Serialize(); - *buffer_size = binary.size; - *res_buffer = binary.data; + *c_binary = binary.release(); status.error_code = Success; status.error_msg = ""; } catch (std::exception& e) { @@ -101,6 +100,25 @@ SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer) { return status; } +int64_t +GetCBinarySize(CBinary c_binary) { + auto cBinary = (milvus::indexbuilder::IndexWrapper::Binary*)c_binary; + return cBinary->data.size(); +} + +// Note: the memory of data is allocated outside +void +GetCBinaryData(CBinary c_binary, void* data) { + auto cBinary = (milvus::indexbuilder::IndexWrapper::Binary*)c_binary; + memcpy(data, cBinary->data.data(), cBinary->data.size()); +} + +void +DeleteCBinary(CBinary c_binary) { + auto cBinary = (milvus::indexbuilder::IndexWrapper::Binary*)c_binary; + delete cBinary; +} + CStatus LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, int32_t size) { auto status = CStatus(); @@ -265,3 +283,8 @@ DeleteIndexQueryResult(CIndexQueryResult res) { } return status; } + +void +DeleteByteArray(const char* array) { + delete[] array; +} diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h index 18ec83389fd28ff07a88ff8d19c0c6373b84d90a..c028378a52299a99aaeeafc0b9b222d414cd2411 100644 --- a/internal/core/src/indexbuilder/index_c.h +++ b/internal/core/src/indexbuilder/index_c.h @@ -32,6 +32,7 @@ extern "C" { typedef void* CIndex; typedef void* CIndexQueryResult; +typedef void* CBinary; // TODO: how could we pass map between go and c++ more efficiently? // Solution: using protobuf instead of json, this way significantly increase programming efficiency @@ -49,7 +50,17 @@ CStatus BuildBinaryVecIndexWithoutIds(CIndex index, int64_t data_size, const uint8_t* vectors); CStatus -SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer); +SerializeToSlicedBuffer(CIndex index, CBinary* c_binary); + +int64_t +GetCBinarySize(CBinary c_binary); + +// Note: the memory of data is allocated outside +void +GetCBinaryData(CBinary c_binary, void* data); + +void +DeleteCBinary(CBinary c_binary); CStatus LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, int32_t size); @@ -92,6 +103,9 @@ GetDistancesOfQueryResult(CIndexQueryResult res, float* distances); CStatus DeleteIndexQueryResult(CIndexQueryResult res); +void +DeleteByteArray(const char* array); + #ifdef __cplusplus }; #endif diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 20b6fafadb3a5991b485700eaf862a733f4e4661..8229c30e4b039ec491d925c0400eb22fda86efd4 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -24,19 +24,20 @@ add_executable(all_tests ${MILVUS_TEST_FILES} ) -set(INDEX_BUILDER_TEST_FILES - test_index_wrapper.cpp) -add_executable(index_builder_test - ${INDEX_BUILDER_TEST_FILES} - ) -target_link_libraries(index_builder_test - gtest - gtest_main - milvus_segcore - milvus_indexbuilder - log - pthread - ) +# check if memory leak exists in index builder +# set(INDEX_BUILDER_TEST_FILES +# test_index_wrapper.cpp) +# add_executable(index_builder_test +# ${INDEX_BUILDER_TEST_FILES} +# ) +# target_link_libraries(index_builder_test +# gtest +# gtest_main +# milvus_segcore +# milvus_indexbuilder +# log +# pthread +# ) target_link_libraries(all_tests gtest diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp index fad216093fb64c931d71c12a9970c92407bb2b66..99daa7d419fde3b1789eaf3f8bfe1e03399efc66 100644 --- a/internal/core/unittest/test_index_wrapper.cpp +++ b/internal/core/unittest/test_index_wrapper.cpp @@ -556,7 +556,7 @@ TEST(IVFFLATNMWrapper, Codec) { auto binary = index->Serialize(); auto copy_index = std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str()); - ASSERT_NO_THROW(copy_index->Load(binary.data, binary.size)); + ASSERT_NO_THROW(copy_index->Load(binary->data.data(), binary->data.size())); ASSERT_EQ(copy_index->dim(), copy_index->dim()); auto copy_binary = copy_index->Serialize(); } @@ -657,13 +657,13 @@ TEST_P(IndexWrapperTest, Codec) { auto binary = index->Serialize(); auto copy_index = std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str()); - ASSERT_NO_THROW(copy_index->Load(binary.data, binary.size)); + ASSERT_NO_THROW(copy_index->Load(binary->data.data(), binary->data.size())); ASSERT_EQ(copy_index->dim(), copy_index->dim()); auto copy_binary = copy_index->Serialize(); if (!milvus::indexbuilder::is_in_nm_list(index_type)) { // binary may be not same due to uncertain internal map order - ASSERT_EQ(binary.size, copy_binary.size); - ASSERT_EQ(strcmp(binary.data, copy_binary.data), 0); + ASSERT_EQ(binary->data.size(), copy_binary->data.size()); + ASSERT_EQ(binary->data, copy_binary->data); } } diff --git a/internal/indexnode/index.go b/internal/indexnode/index.go index a703db9cbb36391295f429c2b993ca22d00da1fd..c70f05d024e23ebc5b2353446ae6bcc453027788 100644 --- a/internal/indexnode/index.go +++ b/internal/indexnode/index.go @@ -130,14 +130,10 @@ type CIndex struct { } func (index *CIndex) Serialize() ([]*Blob, error) { - /* - CStatus - SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer); - */ + var cBinary C.CBinary + defer C.DeleteCBinary(cBinary) - var cDumpedSlicedBuffer *C.char - var bufferSize int32 - status := C.SerializeToSlicedBuffer(index.indexPtr, (*C.int32_t)(unsafe.Pointer(&bufferSize)), &cDumpedSlicedBuffer) + status := C.SerializeToSlicedBuffer(index.indexPtr, &cBinary) errorCode := status.error_code if errorCode != 0 { errorMsg := C.GoString(status.error_msg) @@ -145,11 +141,12 @@ func (index *CIndex) Serialize() ([]*Blob, error) { return nil, errors.New("SerializeToSlicedBuffer failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } - defer C.free(unsafe.Pointer(cDumpedSlicedBuffer)) + binarySize := C.GetCBinarySize(cBinary) + binaryData := make([]byte, binarySize) + C.GetCBinaryData(cBinary, unsafe.Pointer(&binaryData[0])) - dumpedSlicedBuffer := C.GoBytes(unsafe.Pointer(cDumpedSlicedBuffer), (C.int32_t)(bufferSize)) var blobs indexcgopb.BinarySet - err := proto.Unmarshal(dumpedSlicedBuffer, &blobs) + err := proto.Unmarshal(binaryData, &blobs) if err != nil { return nil, err } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 258213f796d86f5624680c8cca74132e257f1232..ac19c3a67ca80d51d14f362bfd11842e01a35e6d 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -195,6 +195,12 @@ func (it *IndexBuildTask) Execute() error { fmt.Println("NewCIndex err:", err.Error()) return err } + defer func() { + err = it.index.Delete() + if err != nil { + log.Print("CIndexDelete Failed") + } + }() getKeyByPathNaive := func(path string) string { // splitElements := strings.Split(path, "/") @@ -301,9 +307,9 @@ func (it *IndexBuildTask) Execute() error { it.savePaths = append(it.savePaths, savePath) } } - err = it.index.Delete() - if err != nil { - log.Print("CIndexDelete Failed") - } + // err = it.index.Delete() + // if err != nil { + // log.Print("CIndexDelete Failed") + // } return nil }