diff --git a/internal/core/CMakeLists.txt b/internal/core/CMakeLists.txt index e8ae1fc32bd895e9bdb96a6aa39f85b7452bf683..8afcc8ef68e583013750002f00a5078799862580 100644 --- a/internal/core/CMakeLists.txt +++ b/internal/core/CMakeLists.txt @@ -185,7 +185,6 @@ if ( BUILD_UNIT_TEST STREQUAL "ON" ) append_flags( CMAKE_CXX_FLAGS FLAGS "-DELPP_DISABLE_LOGS") add_subdirectory(unittest) - add_subdirectory(bench) endif () add_custom_target( Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean ) diff --git a/internal/core/bench/CMakeLists.txt b/internal/core/bench/CMakeLists.txt deleted file mode 100644 index 8b1b0a13e6e92419cb8fb4df7e8618c4cd7e4fbd..0000000000000000000000000000000000000000 --- a/internal/core/bench/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -include_directories(${CMAKE_HOME_DIRECTORY}/src) -include_directories(${CMAKE_HOME_DIRECTORY}/unittest) -include_directories(${CMAKE_HOME_DIRECTORY}/src/index/knowhere) - -set(bench_srcs - bench_naive.cpp - bench_search.cpp -) - -add_executable(all_bench ${bench_srcs}) -target_link_libraries(all_bench - milvus_segcore - milvus_indexbuilder - log - pthread - ) - -target_link_libraries(all_bench benchmark::benchmark_main) diff --git a/internal/core/bench/bench_naive.cpp b/internal/core/bench/bench_naive.cpp deleted file mode 100644 index 4d1c0152734293a0b8c1b167d23f6ca31fd1d60a..0000000000000000000000000000000000000000 --- a/internal/core/bench/bench_naive.cpp +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#include <benchmark/benchmark.h> -#include <string> - -static void -BN_Naive_StringCreation(benchmark::State& state) { - for (auto _ : state) std::string empty_string; -} -// Register the function as a benchmark -BENCHMARK(BN_Naive_StringCreation); - -// Define another benchmark -static void -BN_Naive_StringCopy(benchmark::State& state) { - std::string x = "hello"; - for (auto _ : state) std::string copy(x); -} -BENCHMARK(BN_Naive_StringCopy); diff --git a/internal/core/bench/bench_search.cpp b/internal/core/bench/bench_search.cpp deleted file mode 100644 index cd581e07fc9f3a70d6b6b3d7a5a90bb0618c1768..0000000000000000000000000000000000000000 --- a/internal/core/bench/bench_search.cpp +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#include <cstdint> -#include <benchmark/benchmark.h> -#include <string> -#include "segcore/SegmentGrowing.h" -#include "segcore/SegmentSealed.h" -#include "test_utils/DataGen.h" - -using namespace milvus; -using namespace milvus::query; -using namespace milvus::segcore; - -static int dim = 128; -static int64_t N = 1024 * 1024 * 1; - -const auto schema = []() { - auto schema = std::make_shared<Schema>(); - schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, MetricType::METRIC_L2); - return schema; -}(); - -const auto dataset_ = [] { - auto dataset_ = DataGen(schema, N); - return dataset_; -}(); - -const auto plan = [] { - std::string dsl = R"({ - "bool": { - "must": [ - { - "vector": { - "fakevec": { - "metric_type": "L2", - "params": { - "nprobe": 4 - }, - "query": "$0", - "topk": 5 - } - } - } - ] - } - })"; - auto plan = CreatePlan(*schema, dsl); - return plan; -}(); -auto ph_group = [] { - auto num_queries = 5; - auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, 1024); - auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); - return ph_group; -}(); - -static void -Search_SmallIndex(benchmark::State& state) { - // schema->AddDebugField("age", DataType::FLOAT); - - auto is_small_index = state.range(0); - auto chunk_size = state.range(1) * 1024; - auto segment = CreateGrowingSegment(schema, chunk_size); - if (!is_small_index) { - segment->debug_disable_small_index(); - } - segment->PreInsert(N); - ColumnBasedRawData raw_data; - raw_data.columns_ = dataset_.cols_; - raw_data.count = N; - segment->Insert(0, N, dataset_.row_ids_.data(), dataset_.timestamps_.data(), raw_data); - - Timestamp time = 10000000; - std::vector<const PlaceholderGroup*> ph_group_arr = {ph_group.get()}; - - for (auto _ : state) { - auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); - } -} - -BENCHMARK(Search_SmallIndex)->MinTime(5)->ArgsProduct({{true, false}, {8, 16, 32, 64, 128}}); - -static void -Search_Sealed(benchmark::State& state) { - auto segment = CreateSealedSegment(schema); - SealedLoader(dataset_, *segment); - auto choice = state.range(0); - if (choice == 0) { - // Brute Force - } else if (choice == 1) { - // ivf - auto vec = (const float*)dataset_.cols_[0].data(); - auto indexing = GenIndexing(N, dim, vec); - LoadIndexInfo info; - info.index = indexing; - info.index_params["index_type"] = "IVF"; - info.index_params["index_mode"] = "CPU"; - info.index_params["metric_type"] = MetricTypeToName(MetricType::METRIC_L2); - segment->LoadIndex(info); - } - Timestamp time = 10000000; - std::vector<const PlaceholderGroup*> ph_group_arr = {ph_group.get()}; - - for (auto _ : state) { - auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); - } -} - -BENCHMARK(Search_Sealed)->MinTime(5)->Arg(1)->Arg(0); diff --git a/internal/core/build-support/lint_exclusions.txt b/internal/core/build-support/lint_exclusions.txt index 1b0e402f97712ec3209e90ae8fc01f43a3827f52..5514c3eeb745b11e1838a7057e4f2d91be85591c 100644 --- a/internal/core/build-support/lint_exclusions.txt +++ b/internal/core/build-support/lint_exclusions.txt @@ -5,5 +5,4 @@ *src/grpc* *output* *unittest* -*bench* *src/pb* diff --git a/internal/core/run_clang_format.sh b/internal/core/run_clang_format.sh index 8f475584db733d90a1041979f9bc00c83b316394..b5a682639c7541575fbc181c825f13b367b06be7 100755 --- a/internal/core/run_clang_format.sh +++ b/internal/core/run_clang_format.sh @@ -12,6 +12,5 @@ formatThis() { formatThis "${CorePath}/src" formatThis "${CorePath}/unittest" -formatThis "${CorePath}/bench" ${CorePath}/build-support/add_license.sh ${CorePath}/build-support/cpp_license.txt ${CorePath} diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index 6cc5f11b11c0954fa1c154547bdb6fa75962cbf8..02701ed938c4ce51c97b07dea3b5842f54c2abfb 100644 --- a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -50,9 +50,6 @@ class SegmentGrowing : public SegmentInternalInterface { }; public: - virtual void - debug_disable_small_index() = 0; - virtual int64_t PreInsert(int64_t size) = 0; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index b6dd81795718c304e3451ae85cdb24dc4bcb6cc7..e701e461d4fe58d0fa57dc9f67eca29b3d465ef0 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -188,10 +188,9 @@ SegmentGrowingImpl::do_insert(int64_t reserved_begin, // NOTE: this must be the last step, cannot be put above uid2offset_.insert(std::make_pair(row_id, reserved_begin + i)); } + record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); - if (!debug_disable_small_index_) { - indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_); - } + indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_); } Status diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 7a62a8edee6e2e44d9debca35666f4865b207dc7..bf75d6bb08ed45bc05ada888bb1f1fb3705257ec 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -112,11 +112,6 @@ class SegmentGrowingImpl : public SegmentGrowing { } public: - void - debug_disable_small_index() override { - debug_disable_small_index_ = true; - } - ssize_t get_row_count() const override { return record_.ack_responder_.GetAck(); @@ -209,9 +204,6 @@ class SegmentGrowingImpl : public SegmentGrowing { SealedIndexingRecord sealed_indexing_record_; tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_; - - private: - bool debug_disable_small_index_ = false; }; } // namespace milvus::segcore diff --git a/internal/core/thirdparty/CMakeLists.txt b/internal/core/thirdparty/CMakeLists.txt index 787d7e6535271096c6cd532ae70455d23aa8a607..4dfe837381ee089f0f84a06fd0f16dbf19ae31fb 100644 --- a/internal/core/thirdparty/CMakeLists.txt +++ b/internal/core/thirdparty/CMakeLists.txt @@ -41,7 +41,6 @@ find_package( Threads REQUIRED ) # ****************************** Thirdparty googletest *************************************** if ( MILVUS_BUILD_TESTS ) add_subdirectory( gtest ) - add_subdirectory( google_benchmark) endif() diff --git a/internal/core/thirdparty/google_benchmark/CMakeLists.txt b/internal/core/thirdparty/google_benchmark/CMakeLists.txt deleted file mode 100644 index e5406dce8bee94669133f9ead0260b8c3220f96a..0000000000000000000000000000000000000000 --- a/internal/core/thirdparty/google_benchmark/CMakeLists.txt +++ /dev/null @@ -1,21 +0,0 @@ -include(FetchContent) -FetchContent_Declare(google_benchmark - URL https://github.com/google/benchmark/archive/v1.5.2.tar.gz - URL_MD5 084b34aceaeac11a6607d35220ca2efa - DOWNLOAD_DIR ${THIRDPARTY_DOWNLOAD_PATH} - SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/google_benchmark - BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/google_benchmark - ) - -FetchContent_GetProperties( google_benchmark ) -if ( NOT google_benchmark_POPULATED ) - - FetchContent_Populate( google_benchmark ) - - # Adding the following targets: - # gtest, gtest_main, gmock, gmock_main - message("gb=${google_benchmark_SOURCE_DIR}") - add_subdirectory( ${google_benchmark_SOURCE_DIR} - ${google_benchmark_BINARY_DIR} - EXCLUDE_FROM_ALL ) -endif() diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index 9ebaa690d3a7db00a32e7fb6059ca2911a62c066..c74274d5b5f99aef63a9303e7fbe8c78ffc91c9e 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -9,12 +9,12 @@ import ( "sync" "time" - "github.com/zilliztech/milvus-distributed/internal/log" - "go.uber.org/zap" - "github.com/apache/pulsar-client-go/pulsar" "github.com/golang/protobuf/proto" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -475,6 +475,7 @@ type PulsarTtMsgStream struct { unsolvedBuf map[Consumer][]TsMsg unsolvedMutex *sync.Mutex lastTimeStamp Timestamp + syncConsumer chan int } func newPulsarTtMsgStream(ctx context.Context, @@ -487,11 +488,13 @@ func newPulsarTtMsgStream(ctx context.Context, return nil, err } unsolvedBuf := make(map[Consumer][]TsMsg) + syncConsumer := make(chan int, 1) return &PulsarTtMsgStream{ PulsarMsgStream: *pulsarMsgStream, unsolvedBuf: unsolvedBuf, unsolvedMutex: &sync.Mutex{}, + syncConsumer: syncConsumer, }, nil } @@ -515,6 +518,9 @@ func (ms *PulsarTtMsgStream) AsConsumer(channels []string, } ms.consumerLock.Lock() + if len(ms.consumers) == 0 { + ms.syncConsumer <- 1 + } ms.consumers = append(ms.consumers, pc) ms.unsolvedBuf[pc] = make([]TsMsg, 0) ms.consumerChannels = append(ms.consumerChannels, channels[i]) @@ -536,12 +542,37 @@ func (ms *PulsarTtMsgStream) Start() { } } +func (ms *PulsarTtMsgStream) Close() { + ms.streamCancel() + close(ms.syncConsumer) + ms.wait.Wait() + + for _, producer := range ms.producers { + if producer != nil { + producer.Close() + } + } + for _, consumer := range ms.consumers { + if consumer != nil { + consumer.Close() + } + } + if ms.client != nil { + ms.client.Close() + } +} + func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() ms.unsolvedBuf = make(map[Consumer][]TsMsg) isChannelReady := make(map[Consumer]bool) eofMsgTimeStamp := make(map[Consumer]Timestamp) + if _, ok := <-ms.syncConsumer; !ok { + log.Debug("consumer closed!") + return + } + for { select { case <-ms.ctx.Done():