Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43758: [C++] Compute: More comment in RowEncoder #43763

Merged
merged 15 commits into from
Sep 2, 2024
Merged
6 changes: 3 additions & 3 deletions cpp/src/arrow/compute/light_array_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ struct ARROW_EXPORT KeyColumnMetadata {
/// If this is true the column will have a validity buffer and
/// a data buffer and the third buffer will be unused.
bool is_fixed_length;
/// \brief True if this column is the null type
/// \brief True if this column is the null type(NA).
bool is_null_type;
/// \brief The number of bytes for each item
///
/// Zero has a special meaning, indicating a bit vector with one bit per value if it
/// isn't a null type column.
/// isn't a null type column. Generally, this means that the column is a boolean type.
///
/// For a varying-length binary column this represents the number of bytes per offset.
uint32_t fixed_length;
Expand Down Expand Up @@ -403,7 +403,7 @@ class ARROW_EXPORT ExecBatchBuilder {

int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }

static int num_rows_max() { return 1 << kLogNumRows; }
static constexpr int num_rows_max() { return 1 << kLogNumRows; }

private:
static constexpr int kLogNumRows = 15;
Expand Down
50 changes: 22 additions & 28 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,41 +145,35 @@ void FixedWidthKeyEncoder::AddLengthNull(int32_t* length) {

Status FixedWidthKeyEncoder::Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) {
auto handle_next_valid_value = [&](std::string_view bytes) {
pitrou marked this conversation as resolved.
Show resolved Hide resolved
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
};
auto handle_next_null_value = [&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
};
if (data.is_array()) {
ArraySpan viewed = data.array;
auto view_ty = fixed_size_binary(byte_width_);
viewed.type = view_ty.get();
VisitArraySpanInline<FixedSizeBinaryType>(
viewed,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
});
VisitArraySpanInline<FixedSizeBinaryType>(viewed, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<arrow::internal::PrimitiveScalarBase>();
if (scalar.is_valid) {
const std::string_view data = scalar.view();
DCHECK_EQ(data.size(), static_cast<size_t>(byte_width_));
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, data.data(), data.size());
encoded_ptr += byte_width_;
handle_next_valid_value(data);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -267,11 +261,11 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*

for (size_t i = 0; i < column_types.size(); ++i) {
const bool is_extension = column_types[i].id() == Type::EXTENSION;
const TypeHolder& type = is_extension
? arrow::internal::checked_pointer_cast<ExtensionType>(
column_types[i].GetSharedPtr())
->storage_type()
: column_types[i];
const TypeHolder& type =
is_extension
? arrow::internal::checked_cast<const ExtensionType*>(column_types[i].type)
->storage_type()
: column_types[i];

if (is_extension) {
extension_types_[i] = arrow::internal::checked_pointer_cast<ExtensionType>(
Expand Down Expand Up @@ -379,7 +373,7 @@ Result<ExecBatch> RowEncoder::Decode(int64_t num_rows, const int32_t* row_ids) {
ARROW_ASSIGN_OR_RAISE(out.values[i], ::arrow::internal::GetArrayView(
column_array_data, extension_types_[i]))
} else {
out.values[i] = column_array_data;
out.values[i] = std::move(column_array_data);
}
}

Expand Down
114 changes: 86 additions & 28 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,33 @@ struct ARROW_EXPORT KeyEncoder {

virtual ~KeyEncoder() = default;

// Increment the length of the encoded key for the given value to the lengths array.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
//
// Generally if Encoder is for a fixed-width type, the length of the encoded key
// would add ExtraByteForNull + byte_width.
// If Encoder is for a variable-width type, the length would add ExtraByteForNull +
// sizeof(Offset) + buffer_size.
// If Encoder is null type, the length would add 0.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
virtual void AddLength(const ExecValue& value, int64_t batch_length,
int32_t* lengths) = 0;

// Increment the length for a null value.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
// It's a special case for AddLength like `AddLength(Null-Scalar, 1, lengths)`.
virtual void AddLengthNull(int32_t* length) = 0;

// Encode the value into the encoded_bytes buffer.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
//
// If value is an array, the array-size should be batch_length.
// If value is a scalar, the value would repeat batch_length times.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
virtual Status Encode(const ExecValue&, int64_t batch_length,
uint8_t** encoded_bytes) = 0;

// Add a null byte to the encoded_bytes buffer.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
//
// It's a special case for Encode like `Encode(Null-Scalar, 1, encoded_bytes)`.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
virtual void EncodeNull(uint8_t** encoded_bytes) = 0;

// Decode the encoded key from `encoded_bytes` buffer to an ArrayData.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
virtual Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes,
int32_t length, MemoryPool*) = 0;

Expand Down Expand Up @@ -142,41 +159,33 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {

Status Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) override {
auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
};
auto handle_next_null_value = [&encoded_bytes]() {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
};
if (data.is_array()) {
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
});
VisitArraySpanInline<T>(data.array, handle_next_valid_value,
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<BaseBinaryScalar>();
if (scalar.is_valid) {
const auto& bytes = *scalar.value;
const auto bytes = std::string_view{*scalar.value};
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
handle_next_valid_value(bytes);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -250,6 +259,51 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};

/// RowEncoder encodes ExecSpan to a variable length byte sequence
/// created by concatenating the encoded form of each column. The encoding
/// for each column depends on its datatype.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
///
/// This is used to encode rows for grouping and joining operations.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
///
/// Unlike DuckDB and arrow-rs, currently this row format can not help
/// sortings because the row-format is uncomparable.
///
/// The row format is composed of the the KeyColumn encodings for each,
/// and the column is encoded as follows:
/// 1. A null byte for each column, indicating whether the column is null.
/// "1" for null, "0" for non-null.
/// 2. The "fixed width" encoding for the column, it would exists whether
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
/// the column is null or not.
/// 3. The "variable width" encoding for the column, it would exists only
/// for non-null string/binary columns.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description is misleading. If by "variable width" encoding you mean the length + var-length-bytes, then the length always occupies sizeof(Offset) bytes, even for null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to variable payload

///
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
/// ## Null Type
///
/// Null Type is a special case, it doesn't occupy any space in the encoded row.
///
/// ## Fixed Width Type
///
/// Fixed Width Type is encoded as a fixed-width byte sequence. For example:
/// ```
/// Int8: [5, null, 6]
/// ```
/// Would be encoded as [0 5 1 0 0 6].
///
/// ### Dictionary Type
///
/// Dictionary Type is encoded as a fixed-width byte sequence using dictionary
/// indices, the dictionary should be same for all rows.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
///
/// ## Variable Width Type
///
/// Variable Width Type is encoded as:
/// [null byte, variable-byte length, variable bytes]. For example:
///
/// String "abc" Would be encoded as:
/// [0 0 0 0 3 'a' 'b' 'c']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be [0 3 0 0 0 'a' 'b' 'c'] on little-endian platforms?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch
How about:

0 ( 1 byte for not null) + 3 ( 4 bytes for length ) + "abc" (payload)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work too!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

///
/// String null Would be encoded as:
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
/// [1 0 0 0 0]
class ARROW_EXPORT RowEncoder {
public:
static constexpr int kRowIdForNulls() { return -1; }
Expand All @@ -259,6 +313,7 @@ class ARROW_EXPORT RowEncoder {
Status EncodeAndAppend(const ExecSpan& batch);
Result<ExecBatch> Decode(int64_t num_rows, const int32_t* row_ids);

// Return the encoded row at the given index as a string
inline std::string encoded_row(int32_t i) const {
if (i == kRowIdForNulls()) {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
Expand All @@ -270,14 +325,17 @@ class ARROW_EXPORT RowEncoder {
}

int32_t num_rows() const {
return offsets_.size() == 0 ? 0 : static_cast<int32_t>(offsets_.size() - 1);
return offsets_.empty() ? 0 : static_cast<int32_t>(offsets_.size() - 1);
}

private:
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
// The offsets of each row in the encoded bytes.
// The size would be num_rows + 1 if there are rows.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
std::vector<int32_t> offsets_;
std::vector<uint8_t> bytes_;
// A fixed nulls buffer for all null rows(kRowIdForNulls).
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
std::vector<uint8_t> encoded_nulls_;
std::vector<std::shared_ptr<ExtensionType>> extension_types_;
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct ARROW_EXPORT RowTableMetadata {
/// For a fixed-length binary row, common size of rows in bytes,
/// rounded up to the multiple of alignment.
///
/// For a varying-length binary, size of all encoded fixed-length key columns,
/// For a varying-length binary row, size of all encoded fixed-length key columns,
/// including lengths of varying-length columns, rounded up to the multiple of string
/// alignment.
uint32_t fixed_length;
Expand Down
Loading