Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix bugs of vector index #55123

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ CONF_mBool(enable_vector_index_block_cache, "true");
CONF_mInt32(config_vector_index_build_concurrency, "8");

// default not to build the empty index
CONF_mInt32(config_vector_index_default_build_threshold, "0");
CONF_mInt32(config_vector_index_default_build_threshold, "100");

// When upgrade thrift to 0.20.0, the MaxMessageSize member defines the maximum size of a (received) message, in bytes.
// The default value is represented by a constant named DEFAULT_MAX_MESSAGE_SIZE, whose value is 100 * 1024 * 1024 bytes.
Expand Down
22 changes: 8 additions & 14 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Status OlapChunkSource::prepare(RuntimeState* state) {
if (_use_vector_index) {
_use_ivfpq = vector_search_options.use_ivfpq;
_vector_distance_column_name = vector_search_options.vector_distance_column_name;
_vector_slot_id = vector_search_options.vector_slot_id;
_params.vector_search_option = std::make_shared<VectorSearchOption>();
}
const TupleDescriptor* tuple_desc = state->desc_tbl().get_tuple_descriptor(thrift_olap_scan_node.tuple_id);
Expand Down Expand Up @@ -320,12 +321,10 @@ Status OlapChunkSource::_init_scanner_columns(std::vector<uint32_t>& scanner_col
for (auto slot : *_slots) {
DCHECK(slot->is_materialized());
int32_t index;
if (_use_vector_index && !_use_ivfpq) {
index = _tablet_schema->field_index(slot->col_name(), _vector_distance_column_name);
if (slot->col_name() == _vector_distance_column_name) {
_params.vector_search_option->vector_column_id = index;
_params.vector_search_option->vector_slot_id = slot->id();
}
if (_use_vector_index && !_use_ivfpq && slot->id() == _vector_slot_id) {
index = _tablet_schema->num_columns();
_params.vector_search_option->vector_column_id = index;
_params.vector_search_option->vector_slot_id = slot->id();
} else {
index = _tablet_schema->field_index(slot->col_name());
}
Expand All @@ -352,12 +351,7 @@ Status OlapChunkSource::_init_scanner_columns(std::vector<uint32_t>& scanner_col

Status OlapChunkSource::_init_unused_output_columns(const std::vector<std::string>& unused_output_columns) {
for (const auto& col_name : unused_output_columns) {
int32_t index;
if (_use_vector_index && !_use_ivfpq) {
index = _tablet_schema->field_index(col_name, _vector_distance_column_name);
} else {
index = _tablet_schema->field_index(col_name);
}
int32_t index = _tablet_schema->field_index(col_name);
if (index < 0) {
std::stringstream ss;
ss << "invalid field name: " << col_name;
Expand Down Expand Up @@ -562,8 +556,8 @@ Status OlapChunkSource::_init_global_dicts(TabletReaderParams* params) {
if (iter != global_dict_map.end()) {
auto& dict_map = iter->second.first;
int32_t index;
if (_use_vector_index && !_use_ivfpq) {
index = _tablet_schema->field_index(slot->col_name(), _vector_distance_column_name);
if (_use_vector_index && !_use_ivfpq && slot->id() == _vector_slot_id) {
index = _tablet_schema->num_columns();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possiable have more than one vector index slot id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, one table can only have at most one vector index for now. Many places use this assumption.

Status SegmentIterator::_init_ann_reader() {
    std::unordered_map<int32_t, TabletIndex> col_map_index;
    for (const auto& index : *_segment->tablet_schema().indexes()) {
        if (index.index_type() == VECTOR) {
            col_map_index.emplace(index.col_unique_ids()[0], index);
        }
    }
    
    std::vector<TabletIndex> hit_indexes;
    for (auto& field : _schema.fields()) {
        if (col_map_index.count(field->uid()) > 0) {
            hit_indexes.emplace_back(col_map_index.at(field->uid()));
        }
    }
    // TODO: Support more index in one segment iterator, only support one index for now
    DCHECK(hit_indexes.size() <= 1) << "Only support query no more than one index now";
    
    if (hit_indexes.empty()) {
        return Status::OK();
    }
    
    auto tablet_index_meta = std::make_shared<TabletIndex>(hit_indexes[0]);
    std::string index_path = IndexDescriptor::vector_index_file_path(_opts.rowset_path, _opts.rowsetid.to_string(),
                                                                     segment_id(), tablet_index_meta->index_id());
    return _init_reader_from_file(index_path, tablet_index_meta, _query_params);
}

} else {
index = _tablet_schema->field_index(slot->col_name());
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,9 @@ class OlapChunkSource final : public ChunkSource {
std::vector<ColumnAccessPathPtr> _column_access_paths;

bool _use_vector_index = false;

bool _use_ivfpq = false;

std::string _vector_distance_column_name;
SlotId _vector_slot_id;

// The following are profile meatures
int64_t _num_rows_read = 0;
Expand Down
11 changes: 7 additions & 4 deletions be/src/storage/index/vector/tenann/del_id_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef WITH_TENANN
#include "storage/del_vector.h"
#include "storage/range.h"
Expand All @@ -39,10 +42,10 @@

namespace starrocks {

class DelIdFilter : public tenann::IdFilter {
class DelIdFilter final : public tenann::IdFilter {
public:
DelIdFilter(const SparseRange<>& scan_range);
~DelIdFilter() = default;
explicit DelIdFilter(const SparseRange<>& scan_range);
~DelIdFilter() override = default;

bool IsMember(tenann::idx_t id) const override;

Expand All @@ -51,4 +54,4 @@ class DelIdFilter : public tenann::IdFilter {
};

} // namespace starrocks
#endif
#endif
143 changes: 60 additions & 83 deletions be/src/storage/index/vector/tenann/tenann_index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "tenann/factory/index_factory.h"

namespace starrocks {

// =============== TenAnnIndexBuilderProxy =============

Status TenAnnIndexBuilderProxy::init() {
Expand All @@ -35,10 +36,19 @@ Status TenAnnIndexBuilderProxy::init() {
return Status::OK();
}).status());

if (!meta.common_params().contains("dim")) {
return Status::InvalidArgument("Dim is needed because it's a critical common param");
const auto& params = meta.common_params();

if (!params.contains(index::vector::DIM)) {
return Status::InvalidArgument("dim is needed because it's a critical common param");
}
_dim = params[index::vector::DIM];

if (!params.contains(index::vector::METRIC_TYPE)) {
return Status::InvalidArgument("metric_type is needed because it's a critical common param");
}
_dim = meta.common_params()["dim"];
_is_input_normalized = params.contains(index::vector::IS_VECTOR_NORMED) &&
params[index::vector::IS_VECTOR_NORMED] &&
params[index::vector::METRIC_TYPE] == tenann::MetricType::kCosineSimilarity;

auto meta_copy = meta;
if (meta.index_type() == tenann::IndexType::kFaissIvfPq && config::enable_vector_index_block_cache) {
Expand All @@ -51,7 +61,7 @@ Status TenAnnIndexBuilderProxy::init() {
// build and write index
_index_builder = tenann::IndexFactory::CreateBuilderFromMeta(meta_copy);
_index_builder->index_writer()->SetIndexCache(tenann::IndexCache::GetGlobalInstance());
if (_src_is_nullable) {
if (_is_element_nullable) {
_index_builder->EnableCustomRowId();
}
_index_builder->Open(_index_path);
Expand All @@ -66,98 +76,64 @@ Status TenAnnIndexBuilderProxy::init() {
return Status::OK();
}

Status TenAnnIndexBuilderProxy::add(const Column& data) {
try {
auto vector_view = tenann::ArraySeqView{.data = const_cast<uint8_t*>(data.raw_data()),
.dim = _dim,
.size = static_cast<uint32_t>(data.size()),
.elem_type = tenann::kFloatType};
if (data.is_array() && data.size() != 0) {
const auto& cur_array = down_cast<const ArrayColumn&>(data);
auto offsets = cur_array.offsets();
size_t last_offset = 0;
auto* offsets_data = reinterpret_cast<uint32_t*>(offsets.mutable_raw_data());
for (size_t i = 1; i < offsets.size(); i++) {
size_t dim = offsets_data[i] - last_offset;
if (dim > 0 && _dim != dim) {
LOG(WARNING) << "index dim: " << _dim << ", written dim: " << dim;
return Status::InvalidArgument(
strings::Substitute("The dimensions of the vector written are inconsistent, index dim is "
"$0 but data dim is $1, vector data is ",
_dim, dim));
}
last_offset = offsets_data[i];
}
}

_index_builder->Add({vector_view});
} catch (tenann::Error& e) {
LOG(WARNING) << e.what();
return Status::InternalError(e.what());
template <bool is_input_normalized>
static Status valid_input_vector(const ArrayColumn& input_column, const size_t index_dim) {
if (input_column.empty()) {
return Status::OK();
}
return Status::OK();
}

Status TenAnnIndexBuilderProxy::add(const Column& data, const Column& null_map, const size_t offset) {
try {
auto vector_view = tenann::ArraySeqView{.data = const_cast<uint8_t*>(data.raw_data()),
.dim = _dim,
.size = static_cast<uint32_t>(data.size()),
.elem_type = tenann::kFloatType};
if (data.is_array() && data.size() != 0) {
const auto& cur_array = down_cast<const ArrayColumn&>(data);
auto offsets = cur_array.offsets();
size_t last_offset = 0;
auto* offsets_data = reinterpret_cast<uint32_t*>(offsets.mutable_raw_data());
for (size_t i = 1; i < offsets.size(); i++) {
size_t dim = offsets_data[i] - last_offset;
if (dim > 0 && _dim != dim) {
LOG(WARNING) << "index dim: " << _dim << ", written dim: " << dim;
return Status::InvalidArgument(
strings::Substitute("The dimensions of the vector written are inconsistent, index dim is "
"$0 but data dim is $1, vector data is ",
_dim, dim));
}
last_offset = offsets_data[i];
}
const size_t num_rows = input_column.size();
const auto* offsets = reinterpret_cast<const uint32_t*>(input_column.offsets().raw_data());
const auto* nums = reinterpret_cast<const float*>(input_column.elements().raw_data());

for (size_t i = 0; i < num_rows; i++) {
const size_t input_dim = offsets[i + 1] - offsets[i];

if (input_dim != index_dim) {
return Status::InvalidArgument(
strings::Substitute("The dimensions of the vector written are inconsistent, index dim is "
"$0 but data dim is $1",
index_dim, input_dim));
}
std::vector<int64_t> row_ids(data.size());
std::iota(row_ids.begin(), row_ids.end(), offset);
_index_builder->Add({vector_view}, row_ids.data(), null_map.raw_data());

} catch (tenann::Error& e) {
LOG(WARNING) << e.what();
return Status::InternalError(e.what());
if constexpr (is_input_normalized) {
double sum = 0;
for (int j = 0; j < input_dim; j++) {
sum += nums[offsets[i] + j] * nums[offsets[i] + j];
}
if (std::abs(sum - 1) > 1e-6) {
return Status::InvalidArgument(
"The input vector is not normalized but `metric_type` is cosine_similarity and "
"`is_vector_normed` is true");
}
}
}

return Status::OK();
}

Status TenAnnIndexBuilderProxy::write(const Column& data) {
try {
auto vector_view = tenann::ArraySeqView{.data = const_cast<uint8_t*>(data.raw_data()),
.dim = _dim,
.size = static_cast<uint32_t>(data.size()),
.elem_type = tenann::kFloatType};
Status TenAnnIndexBuilderProxy::add(const Column& array_column, const size_t offset) {
DCHECK(array_column.is_array());
const auto& array_col = down_cast<const ArrayColumn&>(array_column);

_index_builder->Add({vector_view});
} catch (tenann::Error& e) {
LOG(WARNING) << e.what();
return Status::InternalError(e.what());
DCHECK(array_col.elements_column()->is_nullable());
const auto& nullable_elements = down_cast<const NullableColumn&>(array_col.elements());
const auto& is_element_nulls = nullable_elements.null_column_ref();

if (_is_input_normalized) {
RETURN_IF_ERROR(valid_input_vector<true>(array_col, _dim));
} else {
RETURN_IF_ERROR(valid_input_vector<false>(array_col, _dim));
}
return Status::OK();
}

Status TenAnnIndexBuilderProxy::write(const Column& data, const Column& null_map) {
try {
auto vector_view = tenann::ArraySeqView{.data = const_cast<uint8_t*>(data.raw_data()),
auto vector_view = tenann::ArraySeqView{.data = const_cast<uint8_t*>(array_col.raw_data()),
.dim = _dim,
.size = static_cast<uint32_t>(data.size()),
.size = static_cast<uint32_t>(array_col.size()),
.elem_type = tenann::kFloatType};

std::vector<int64_t> row_ids(data.size());
std::iota(row_ids.begin(), row_ids.end(), 0);
_index_builder->Add({vector_view}, row_ids.data(), null_map.raw_data());

std::vector<int64_t> row_ids(array_col.size());
std::iota(row_ids.begin(), row_ids.end(), offset);
_index_builder->Add({vector_view}, row_ids.data(), is_element_nulls.raw_data());
} catch (tenann::Error& e) {
LOG(WARNING) << e.what();
return Status::InternalError(e.what());
Expand All @@ -175,10 +151,11 @@ Status TenAnnIndexBuilderProxy::flush() {
return Status::OK();
}

void TenAnnIndexBuilderProxy::close() {
void TenAnnIndexBuilderProxy::close() const {
if (_index_builder && !_index_builder->is_closed()) {
_index_builder->Close();
}
}

} // namespace starrocks
#endif
29 changes: 15 additions & 14 deletions be/src/storage/index/vector/tenann/tenann_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef WITH_TENANN

#include <memory>
Expand All @@ -24,35 +26,34 @@
namespace starrocks {

// A proxy to real Ten ANN index builder
class TenAnnIndexBuilderProxy : public VectorIndexBuilder {
class TenAnnIndexBuilderProxy final : public VectorIndexBuilder {
public:
TenAnnIndexBuilderProxy(std::shared_ptr<TabletIndex> tablet_index, std::string segment_index_path,
bool src_is_nullable)
bool is_element_nullable)
: VectorIndexBuilder(std::move(tablet_index), std::move(segment_index_path)),
_src_is_nullable(src_is_nullable){};
_is_element_nullable(is_element_nullable) {}

// proxy should not clean index builder resource
~TenAnnIndexBuilderProxy() override { close(); };

Status init() override;

Status add(const Column& data) override;

Status add(const Column& data, const Column& null_map, const size_t offset) override;

Status write(const Column& data) override;

Status write(const Column& data, const Column& null_map) override;
Status add(const Column& array_column, const size_t offset) override;

Status flush() override;

void close();
void close() const;

private:
std::shared_ptr<tenann::IndexBuilder> _index_builder;
uint32_t _dim = 0;
OnceFlag _init_once;
bool _src_is_nullable;
std::shared_ptr<tenann::IndexBuilder> _index_builder = nullptr;
uint32_t _dim = 0;
// This will be true when `metric_type` is cosine_similarity and `is_vector_normed` is true.
// When it is true, the vector (a row of the array column) is either null or the sum of the squares of all elements
// equals 1.
bool _is_input_normalized = false;

const bool _is_element_nullable;
};

} // namespace starrocks
Expand Down
7 changes: 3 additions & 4 deletions be/src/storage/index/vector/tenann/tenann_index_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,16 @@ StatusOr<tenann::IndexMeta> get_vector_meta(const std::shared_ptr<TabletIndex>&
if (meta.index_type() == tenann::IndexType::kFaissIvfPq) {
meta.index_params()[starrocks::index::vector::NLIST] = int(4 * sqrt(starrocks::index::vector::nb_));

CRITICAL_CHECK_AND_GET(tablet_index, index_properties, M, param_value)
meta.index_params()[starrocks::index::vector::M] = std::atoi(param_value.c_str());

CRITICAL_CHECK_AND_GET(tablet_index, index_properties, nbits, param_value)
meta.index_params()[starrocks::index::vector::NBITS] = std::atoi(param_value.c_str());

CRITICAL_CHECK_AND_GET(tablet_index, index_properties, m_ivfpq, param_value)
meta.index_params()[starrocks::index::vector::M] = std::atoi(param_value.c_str());
} else if (meta.index_type() == tenann::IndexType::kFaissHnsw) {
CRITICAL_CHECK_AND_GET(tablet_index, index_properties, efconstruction, param_value)
meta.index_params()[starrocks::index::vector::EF_CONSTRUCTION] = std::atoi(param_value.c_str());

CRITICAL_CHECK_AND_GET(tablet_index, index_properties, M, param_value)
CRITICAL_CHECK_AND_GET(tablet_index, index_properties, m, param_value)
meta.index_params()[starrocks::index::vector::M] = std::atoi(param_value.c_str());

GET_OR_DEFAULT(tablet_index, search_properties, efsearch, param_value, "40")
Expand Down
Loading
Loading