Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43926: [C++] Compute: RowEncoder eliminates offsets when all columns are fixed-sized #43931

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/hash_join_dict.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ Status HashJoinDictBuild::Init(ExecContext* ctx, std::shared_ptr<Array> dictiona

auto iter = hash_table_.find(str);
if (iter == hash_table_.end()) {
hash_table_.insert(std::make_pair(str, num_entries));
hash_table_.insert(std::make_pair(std::move(str), num_entries));
ids[i] = num_entries;
entries_to_take.push_back(static_cast<int32_t>(i));
++num_entries;
Expand Down
60 changes: 57 additions & 3 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,18 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*
ARROW_DCHECK(false);
}

// Decide if the row is fixed width
int32_t fixed_length_accum = 0;
for (size_t i = 0; i < column_types.size(); ++i) {
auto encoder_info = encoders_[i]->GetEncoderInfo();
if (!encoder_info.is_fixed_width) {
fixed_length_accum = kInvalidFixedWidthOffset;
break;
}
fixed_length_accum += encoder_info.fixed_width;
}
this->fixed_width_length_ = fixed_length_accum;

int32_t total_length = 0;
for (size_t i = 0; i < column_types.size(); ++i) {
encoders_[i]->AddLengthNull(&total_length);
Expand All @@ -319,9 +331,42 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*
// Discards every row encoded so far, for both storage layouts: the
// offset-based buffers and the fixed-width row counter are reset. The layout
// decision itself (fixed_width_length_) is not modified here.
void RowEncoder::Clear() {
  fixed_width_row_count_ = 0;
  bytes_.clear();
  offsets_.clear();
}

// Appends all rows of `batch` to bytes_ using the offset-free layout.
//
// Every row occupies exactly fixed_width_length_ bytes and rows are stored
// back to back, so row `r` starts at `r * fixed_width_length_`; offsets_ is
// not touched. EncodeAndAppend() dispatches here when IsFixedWidth() is true.
//
// Returns the first non-OK Status reported by a column encoder, otherwise OK.
// bytes_ and fixed_width_row_count_ are grown before the columns are encoded.
Status RowEncoder::EncodeAndAppendForFixedWidth(const ExecSpan& batch) {
  const size_t row_width = static_cast<size_t>(fixed_width_length_);
  const size_t length_before = row_width * static_cast<size_t>(fixed_width_row_count_);
  // The dense layout implies the buffer size is fully determined by the
  // number of rows appended so far.
  ARROW_CHECK_EQ(length_before, bytes_.size());
#ifndef NDEBUG
  {
    // Debug-only cross check: the per-row lengths reported by the encoders
    // must add up to exactly `fixed_width_length_` bytes per row. The sum is
    // kept in 64 bits so it cannot wrap for large batches.
    std::vector<int32_t> lengths(batch.length, 0);
    for (int i = 0; i < batch.num_values(); ++i) {
      encoders_[i]->AddLength(batch[i], batch.length, lengths.data());
    }
    int64_t accum_length = 0;
    for (const int32_t row_length : lengths) {
      accum_length += row_length;
    }
    ARROW_CHECK_EQ(accum_length,
                   static_cast<int64_t>(fixed_width_length_) * batch.length);
  }
#endif
  bytes_.resize(length_before + static_cast<size_t>(batch.length) * row_width);
  // One write cursor per row; each column encoder writes its value at the
  // cursor it is handed for that row.
  std::vector<uint8_t*> buf_ptrs(batch.length);
  uint8_t* row_start = bytes_.data() + length_before;
  for (int64_t i = 0; i < batch.length; ++i) {
    buf_ptrs[i] = row_start;
    row_start += row_width;
  }
  fixed_width_row_count_ += static_cast<int32_t>(batch.length);
  for (int i = 0; i < batch.num_values(); ++i) {
    RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, buf_ptrs.data()));
  }
  return Status::OK();
}

Status RowEncoder::EncodeAndAppend(const ExecSpan& batch) {
if (IsFixedWidth()) {
return EncodeAndAppendForFixedWidth(batch);
}
if (offsets_.empty()) {
offsets_.resize(1);
offsets_[0] = 0;
Expand Down Expand Up @@ -359,9 +404,18 @@ Result<ExecBatch> RowEncoder::Decode(int64_t num_rows, const int32_t* row_ids) {
ExecBatch out({}, num_rows);

std::vector<uint8_t*> buf_ptrs(num_rows);
for (int64_t i = 0; i < num_rows; ++i) {
buf_ptrs[i] = (row_ids[i] == kRowIdForNulls()) ? encoded_nulls_.data()
: bytes_.data() + offsets_[row_ids[i]];
if (IsFixedWidth()) {
for (int64_t i = 0; i < num_rows; ++i) {
buf_ptrs[i] = (row_ids[i] == kRowIdForNulls())
? encoded_nulls_.data()
: bytes_.data() + fixed_width_length_ * row_ids[i];
}
} else {
for (int64_t i = 0; i < num_rows; ++i) {
buf_ptrs[i] = (row_ids[i] == kRowIdForNulls())
? encoded_nulls_.data()
: bytes_.data() + offsets_[row_ids[i]];
}
}

out.values.resize(encoders_.size());
Expand Down
55 changes: 53 additions & 2 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ struct ARROW_EXPORT KeyEncoder {
// Tells whether an encoded value is null by inspecting its first byte,
// which is compared against kNullByte.
static bool IsNull(const uint8_t* encoded_bytes) {
  const uint8_t leading_byte = *encoded_bytes;
  return leading_byte == kNullByte;
}

// Width description reported by a KeyEncoder through GetEncoderInfo().
// RowEncoder::Init inspects it for every column to decide whether the rows
// can be stored without offsets.
struct EncoderInfo {
Copy link
Contributor

@zanmato1984 zanmato1984 Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using a more descriptive name - XxInfo generally doesn't give much information about Xx. Maybe something like EncodeColumnMeta?

// False makes RowEncoder::Init treat the whole row as variable-width.
bool is_fixed_width;
// Bytes this column contributes to a row; RowEncoder::Init sums it over all
// columns while every encoder reports is_fixed_width == true.
int32_t fixed_width;
};

// Reports whether this encoder is fixed-width and, if so, how many bytes it
// contributes per row. Queried once per column by RowEncoder::Init.
virtual EncoderInfo GetEncoderInfo() const = 0;
};

struct ARROW_EXPORT BooleanKeyEncoder : KeyEncoder {
Expand All @@ -99,6 +106,10 @@ struct ARROW_EXPORT BooleanKeyEncoder : KeyEncoder {

Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

// Boolean keys are fixed-width and contribute 2 bytes per row.
// NOTE(review): presumably one null-marker byte plus one value byte - confirm.
EncoderInfo GetEncoderInfo() const override {
  EncoderInfo info{};
  info.is_fixed_width = true;
  info.fixed_width = 2;
  return info;
}
};

struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
Expand All @@ -118,6 +129,10 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

// Fixed-width keys contribute byte_width_ + 1 bytes per row.
// NOTE(review): the extra byte is presumably the leading null marker - confirm.
EncoderInfo GetEncoderInfo() const override {
  EncoderInfo info{};
  info.is_fixed_width = true;
  info.fixed_width = byte_width_ + 1;
  return info;
}

std::shared_ptr<DataType> type_;
const int byte_width_;
};
Expand All @@ -132,6 +147,8 @@ struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder {
Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

// `GetEncoderInfo` is intentionally not overridden here: the implementation
// inherited from `FixedWidthKeyEncoder` is used as-is.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this line for?


MemoryPool* pool_;
std::shared_ptr<Array> dictionary_;
};
Expand Down Expand Up @@ -248,6 +265,10 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {

explicit VarLengthKeyEncoder(std::shared_ptr<DataType> type) : type_(std::move(type)) {}

// Variable-length keys are never fixed-width. RowEncoder::Init stops
// accumulating as soon as is_fixed_width is false, so fixed_width is not
// added to the row width there.
// NOTE(review): 5 presumably stands for the null byte plus a 4-byte length
// prefix - confirm.
EncoderInfo GetEncoderInfo() const override {
  EncoderInfo info{};
  info.is_fixed_width = false;
  info.fixed_width = 5;
  return info;
}

std::shared_ptr<DataType> type_;
};

Expand All @@ -267,6 +288,10 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
MemoryPool* pool) override {
return ArrayData::Make(null(), length, {NULLPTR}, length);
}

// Null-typed keys are reported as fixed-width with zero bytes per row.
EncoderInfo GetEncoderInfo() const override {
  EncoderInfo info{};
  info.is_fixed_width = true;
  info.fixed_width = 0;
  return info;
}
};

/// RowEncoder encodes ExecSpan to a variable length byte sequence
Expand Down Expand Up @@ -348,23 +373,49 @@ class ARROW_EXPORT RowEncoder {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
encoded_nulls_.size());
}
int32_t row_length = offsets_[i + 1] - offsets_[i];
return std::string(reinterpret_cast<const char*>(bytes_.data() + offsets_[i]),
int32_t row_length = 0;
int32_t row_offset = 0;
if (IsFixedWidth()) {
row_length = fixed_width_length_;
row_offset = i * fixed_width_length_;
} else {
row_length = offsets_[i + 1] - offsets_[i];
row_offset = offsets_[i];
}
return std::string(reinterpret_cast<const char*>(bytes_.data() + row_offset),
row_length);
}

// Number of rows encoded so far. The fixed-width layout keeps an explicit
// counter; the offset-based layout derives the count from offsets_, which
// holds one entry more than there are rows whenever it is non-empty.
int32_t num_rows() const {
  if (!IsFixedWidth()) {
    if (offsets_.empty()) {
      return 0;
    }
    return static_cast<int32_t>(offsets_.size() - 1);
  }
  return fixed_width_row_count_;
}

private:
// Fixed-width path of EncodeAndAppend(): appends the rows of `batch` to
// bytes_ without recording per-row offsets.
Status EncodeAndAppendForFixedWidth(const ExecSpan& batch);
// True when rows are stored in the offset-free fixed-width layout, i.e.
// fixed_width_length_ holds a real row width rather than the
// kInvalidFixedWidthOffset sentinel.
bool IsFixedWidth() const noexcept {
  const bool uses_row_offsets = (fixed_width_length_ == kInvalidFixedWidthOffset);
  return !uses_row_offsets;
}

private:
// Sentinel stored in fixed_width_length_ when the row is not fixed-width.
// It must be negative: a real row width is a sum of non-negative encoder
// widths, so a positive sentinel could equal a legitimate width (e.g. a row
// whose only column reports a width of 1) and misclassify that row.
static constexpr int32_t kInvalidFixedWidthOffset = -1;
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
// When all columns in a row are fixed-width or NA, every encoded row has
// the same length (stored in fixed_width_length_), so row offsets are not
// needed and offsets_ stays empty.
int32_t fixed_width_length_{kInvalidFixedWidthOffset};
int32_t fixed_width_row_count_{0};
// offsets_ vector stores the starting position (offset) of each encoded row
// within the bytes_ vector. This allows for quick access to individual rows.
//
// The size would be num_rows + 1 if not empty, the last element is the total
// length of the bytes_ vector.
//
// When all columns in a row are fixed-width or NA, the offsets_ can be
// eliminated, and the encoded row can be accessed by fixed_width_length_.
std::vector<int32_t> offsets_;
// The encoded bytes of all non "kRowIdForNulls" rows.
std::vector<uint8_t> bytes_;
Expand Down
Loading