Skip to content

Commit

Permalink
Set channel capacity before consuming it (milvus-io#25895)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Jul 26, 2023
1 parent 6f18587 commit 9618bd9
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 25 deletions.
16 changes: 15 additions & 1 deletion internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "common/Consts.h"
#include "common/RangeSearchHelper.h"
#include "common/Utils.h"
#include "log/Log.h"
#include "storage/FieldData.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/ThreadPool.h"
Expand Down Expand Up @@ -103,30 +104,43 @@ VectorMemIndex::Load(const Config& config) {
AssertInfo(index_files.has_value(),
"index file paths is empty when load index");

auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

std::map<std::string, storage::FieldDataChannelPtr> channels;
for (const auto& file : index_files.value()) {
auto key = file.substr(file.find_last_of('/') + 1);
LOG_SEGCORE_INFO_ << "loading index file " << key;
if (channels.find(key) == channels.end()) {
channels.emplace(std::move(key),
std::make_shared<storage::FieldDataChannel>());
std::make_shared<storage::FieldDataChannel>(
parallel_degree * 2));
}
}

auto& pool = ThreadPool::GetInstance();
auto future = pool.Submit(
[&] { file_manager_->LoadFileStream(index_files.value(), channels); });

LOG_SEGCORE_INFO_ << "assemble index data...";
std::unordered_map<std::string, storage::FieldDataPtr> result;
AssembleIndexDatas(channels, result);
LOG_SEGCORE_INFO_ << "assemble index data done";

LOG_SEGCORE_INFO_ << "construct binary set...";
BinarySet binary_set;
for (auto& [key, data] : result) {
LOG_SEGCORE_INFO_ << "add index data to binary set: " << key;
auto size = data->Size();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}

LOG_SEGCORE_INFO_ << "load index into Knowhere...";
LoadWithoutAssemble(binary_set, config);
LOG_SEGCORE_INFO_ << "load vector index done";
}

void
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
auto field_data_info =
FieldDataInfo(field_id.get(), num_rows, load_info.mmap_dir_path);

auto parallel_degree = static_cast<uint64_t>(
DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
field_data_info.channel->set_capacity(parallel_degree * 2);

auto& pool = ThreadPool::GetInstance();
auto load_future = pool.Submit(
LoadFieldDatasFromRemote, insert_files, field_data_info.channel);
Expand Down
4 changes: 0 additions & 4 deletions internal/core/src/segcore/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,6 @@ LoadFieldDatasFromRemote(std::vector<std::string>& remote_files,
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

// set the capacity to 2x parallel_degree, so the memory usage will not be greater than 2x DEFAULT_FIELD_MAX_MEMORY_LIMIT,
// which is 128 MiB
channel->set_capacity(parallel_degree * 2);

auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
std::sort(remote_files.begin(),
Expand Down
3 changes: 0 additions & 3 deletions internal/core/src/storage/MemFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ MemFileManagerImpl::LoadFileStream(
std::map<std::string, storage::FieldDataChannelPtr>& channels) {
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
for (auto& [_, channel] : channels) {
channel->set_capacity(parallel_degree * 2);
}

std::vector<std::string> batch_files;
auto LoadBatchIndexFiles = [&]() {
Expand Down
28 changes: 11 additions & 17 deletions internal/core/unittest/test_indexing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,27 +386,21 @@ TEST_P(IndexTest, BuildAndQuery) {
milvus::index::IndexBasePtr new_index;
milvus::index::VectorIndex* vec_index = nullptr;

if (index_type == knowhere::IndexEnum::INDEX_DISKANN) {
// TODO ::diskann.query need load first, ugly
auto binary_set = index->Serialize(milvus::Config{});
index.reset();
auto binary_set = index->Upload();
index.reset();

new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(
create_index_info, file_manager);
vec_index = dynamic_cast<milvus::index::VectorIndex*>(new_index.get());
new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(
create_index_info, file_manager);
vec_index = dynamic_cast<milvus::index::VectorIndex*>(new_index.get());

std::vector<std::string> index_files;
for (auto& binary : binary_set.binary_map_) {
index_files.emplace_back(binary.first);
}
load_conf["index_files"] = index_files;
ASSERT_NO_THROW(vec_index->Load(binary_set, load_conf));
EXPECT_EQ(vec_index->Count(), NB);
} else {
vec_index = dynamic_cast<milvus::index::VectorIndex*>(index.get());
std::vector<std::string> index_files;
for (auto& binary : binary_set.binary_map_) {
index_files.emplace_back(binary.first);
}
EXPECT_EQ(vec_index->GetDim(), DIM);
load_conf["index_files"] = index_files;
ASSERT_NO_THROW(vec_index->Load(load_conf));
EXPECT_EQ(vec_index->Count(), NB);
EXPECT_EQ(vec_index->GetDim(), DIM);

milvus::SearchInfo search_info;
search_info.topk_ = K;
Expand Down

0 comments on commit 9618bd9

Please sign in to comment.