From 9618bd9b42ccf3c04fd41a5744f397f75aa06749 Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 26 Jul 2023 17:35:01 +0800 Subject: [PATCH] Set channel capacity before consuming it (#25895) Signed-off-by: yah01 --- internal/core/src/index/VectorMemIndex.cpp | 16 ++++++++++- .../core/src/segcore/SegmentSealedImpl.cpp | 4 +++ internal/core/src/segcore/Utils.cpp | 4 --- .../core/src/storage/MemFileManagerImpl.cpp | 3 -- internal/core/unittest/test_indexing.cpp | 28 ++++++++----------- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 2f65babecef1f..6f20d916dd2e4 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -33,6 +33,7 @@ #include "common/Consts.h" #include "common/RangeSearchHelper.h" #include "common/Utils.h" +#include "log/Log.h" #include "storage/FieldData.h" #include "storage/MemFileManagerImpl.h" #include "storage/ThreadPool.h" @@ -103,12 +104,17 @@ VectorMemIndex::Load(const Config& config) { AssertInfo(index_files.has_value(), "index file paths is empty when load index"); + auto parallel_degree = + static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + std::map channels; for (const auto& file : index_files.value()) { auto key = file.substr(file.find_last_of('/') + 1); + LOG_SEGCORE_INFO_ << "loading index file " << key; if (channels.find(key) == channels.end()) { channels.emplace(std::move(key), - std::make_shared()); + std::make_shared( + parallel_degree * 2)); } } @@ -116,17 +122,25 @@ VectorMemIndex::Load(const Config& config) { auto future = pool.Submit( [&] { file_manager_->LoadFileStream(index_files.value(), channels); }); + LOG_SEGCORE_INFO_ << "assemble index data..."; std::unordered_map result; AssembleIndexDatas(channels, result); + LOG_SEGCORE_INFO_ << "assemble index data done"; + + LOG_SEGCORE_INFO_ << "construct binary set..."; BinarySet binary_set; for (auto& [key, data] : result) { + LOG_SEGCORE_INFO_ << "add index data to binary set: " << key; auto size = data->Size(); auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction auto buf = std::shared_ptr( (uint8_t*)const_cast(data->Data()), deleter); binary_set.Append(key, buf, size); } + + LOG_SEGCORE_INFO_ << "load index into Knowhere..."; LoadWithoutAssemble(binary_set, config); + LOG_SEGCORE_INFO_ << "load vector index done"; } void diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 94789843ce9be..cefa942946c4c 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -182,6 +182,10 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { auto field_data_info = FieldDataInfo(field_id.get(), num_rows, load_info.mmap_dir_path); + auto parallel_degree = static_cast( + DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + field_data_info.channel->set_capacity(parallel_degree * 2); + auto& pool = ThreadPool::GetInstance(); auto load_future = pool.Submit( LoadFieldDatasFromRemote, insert_files, field_data_info.channel); diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 03fed4a88db7d..2efd1ba4537af 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -555,10 +555,6 @@ LoadFieldDatasFromRemote(std::vector& remote_files, auto parallel_degree = static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); - // set the capacity to 2x parallel_degree, so the memory usage will not be greater than 2x DEFAULT_FIELD_MAX_MEMORY_LIMIT, - // which is 128 MiB - channel->set_capacity(parallel_degree * 2); - auto rcm = storage::RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); std::sort(remote_files.begin(), diff --git a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp index 9a028780444de..ddb4dfe2e781c 100644 --- a/internal/core/src/storage/MemFileManagerImpl.cpp +++ b/internal/core/src/storage/MemFileManagerImpl.cpp @@ -125,9 +125,6 @@ MemFileManagerImpl::LoadFileStream( std::map& channels) { auto parallel_degree = static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); - for (auto& [_, channel] : channels) { - channel->set_capacity(parallel_degree * 2); - } std::vector batch_files; auto LoadBatchIndexFiles = [&]() { diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index 0ada8378c3263..0adf4ace142f0 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -386,27 +386,21 @@ TEST_P(IndexTest, BuildAndQuery) { milvus::index::IndexBasePtr new_index; milvus::index::VectorIndex* vec_index = nullptr; - if (index_type == knowhere::IndexEnum::INDEX_DISKANN) { - // TODO ::diskann.query need load first, ugly - auto binary_set = index->Serialize(milvus::Config{}); - index.reset(); + auto binary_set = index->Upload(); + index.reset(); - new_index = milvus::index::IndexFactory::GetInstance().CreateIndex( - create_index_info, file_manager); - vec_index = dynamic_cast(new_index.get()); + new_index = milvus::index::IndexFactory::GetInstance().CreateIndex( + create_index_info, file_manager); + vec_index = dynamic_cast(new_index.get()); - std::vector index_files; - for (auto& binary : binary_set.binary_map_) { - index_files.emplace_back(binary.first); - } - load_conf["index_files"] = index_files; - ASSERT_NO_THROW(vec_index->Load(binary_set, load_conf)); - EXPECT_EQ(vec_index->Count(), NB); - } else { - vec_index = dynamic_cast(index.get()); + std::vector index_files; + for (auto& binary : binary_set.binary_map_) { + index_files.emplace_back(binary.first); } - EXPECT_EQ(vec_index->GetDim(), DIM); + load_conf["index_files"] = index_files; + ASSERT_NO_THROW(vec_index->Load(load_conf)); EXPECT_EQ(vec_index->Count(), NB); + EXPECT_EQ(vec_index->GetDim(), DIM); milvus::SearchInfo search_info; search_info.topk_ = K;