Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43758: [C++] Compute: More comment in RowEncoder #43763

Merged
merged 15 commits into from
Sep 2, 2024
Merged
6 changes: 3 additions & 3 deletions cpp/src/arrow/compute/light_array_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ struct ARROW_EXPORT KeyColumnMetadata {
/// If this is true the column will have a validity buffer and
/// a data buffer and the third buffer will be unused.
bool is_fixed_length;
/// \brief True if this column is the null type
/// \brief True if this column is the null type(NA).
bool is_null_type;
/// \brief The number of bytes for each item
///
/// Zero has a special meaning, indicating a bit vector with one bit per value if it
/// isn't a null type column.
/// isn't a null type column. Generally, this means that the column is a boolean type.
///
/// For a varying-length binary column this represents the number of bytes per offset.
uint32_t fixed_length;
Expand Down Expand Up @@ -403,7 +403,7 @@ class ARROW_EXPORT ExecBatchBuilder {

int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }

static int num_rows_max() { return 1 << kLogNumRows; }
static constexpr int num_rows_max() { return 1 << kLogNumRows; }

private:
static constexpr int kLogNumRows = 15;
Expand Down
56 changes: 26 additions & 30 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,41 +145,37 @@ void FixedWidthKeyEncoder::AddLengthNull(int32_t* length) {

Status FixedWidthKeyEncoder::Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) {
auto handle_next_valid_value = [&](std::string_view bytes) {
pitrou marked this conversation as resolved.
Show resolved Hide resolved
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
};
auto handle_next_null_value = [&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
};
if (data.is_array()) {
ArraySpan viewed = data.array;
// The original type might not be FixedSizeBinaryType, but it would
// treat the input as binary data.
auto view_ty = fixed_size_binary(byte_width_);
viewed.type = view_ty.get();
VisitArraySpanInline<FixedSizeBinaryType>(
viewed,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
});
VisitArraySpanInline<FixedSizeBinaryType>(viewed, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<arrow::internal::PrimitiveScalarBase>();
if (scalar.is_valid) {
const std::string_view data = scalar.view();
DCHECK_EQ(data.size(), static_cast<size_t>(byte_width_));
const std::string_view scalar_data = scalar.view();
DCHECK_EQ(scalar_data.size(), static_cast<size_t>(byte_width_));
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, data.data(), data.size());
encoded_ptr += byte_width_;
handle_next_valid_value(scalar_data);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -267,11 +263,11 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*

for (size_t i = 0; i < column_types.size(); ++i) {
const bool is_extension = column_types[i].id() == Type::EXTENSION;
const TypeHolder& type = is_extension
? arrow::internal::checked_pointer_cast<ExtensionType>(
column_types[i].GetSharedPtr())
->storage_type()
: column_types[i];
const TypeHolder& type =
is_extension
? arrow::internal::checked_cast<const ExtensionType*>(column_types[i].type)
->storage_type()
: column_types[i];

if (is_extension) {
extension_types_[i] = arrow::internal::checked_pointer_cast<ExtensionType>(
Expand Down Expand Up @@ -379,7 +375,7 @@ Result<ExecBatch> RowEncoder::Decode(int64_t num_rows, const int32_t* row_ids) {
ARROW_ASSIGN_OR_RAISE(out.values[i], ::arrow::internal::GetArrayView(
column_array_data, extension_types_[i]))
} else {
out.values[i] = column_array_data;
out.values[i] = std::move(column_array_data);
}
}

Expand Down
154 changes: 125 additions & 29 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,41 @@ struct ARROW_EXPORT KeyEncoder {

virtual ~KeyEncoder() = default;

// Increment the values in the lengths array by the length of the encoded key for the
// corresponding value in the given column.
//
// Generally if Encoder is for a fixed-width type, the length of the encoded key
// would add ExtraByteForNull + byte_width.
// If Encoder is for a variable-width type, the length would add ExtraByteForNull +
// sizeof(Offset) + buffer_size.
// If Encoder is for null type, the length would add 0.
virtual void AddLength(const ExecValue& value, int64_t batch_length,
int32_t* lengths) = 0;

// Increment the length by the length of an encoded null value.
// It's a special case for AddLength like `AddLength(Null-Scalar, 1, lengths)`.
virtual void AddLengthNull(int32_t* length) = 0;

// Encode the column into the encoded_bytes, which is an array of pointers to each row
// buffer.
//
// If value is an array, the array-size should be batch_length.
// If value is a scalar, the value would repeat batch_length times.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
// NB: The pointers in the encoded_bytes will be advanced as values being encoded into.
virtual Status Encode(const ExecValue&, int64_t batch_length,
uint8_t** encoded_bytes) = 0;

// Encode a null value into the encoded_bytes, which is an array of pointers to each row
// buffer.
//
// It's a special case for Encode like `Encode(Null-Scalar, 1, encoded_bytes)`.
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
// NB: The pointers in the encoded_bytes will be advanced as values being encoded into.
virtual void EncodeNull(uint8_t** encoded_bytes) = 0;

// Decode the encoded key from the encoded_bytes, which is an array of pointers to each
// row buffer, into an ArrayData.
//
// NB: The pointers in the encoded_bytes will be advanced as values being decoded from.
virtual Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes,
int32_t length, MemoryPool*) = 0;

Expand Down Expand Up @@ -94,7 +119,7 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
MemoryPool* pool) override;

std::shared_ptr<DataType> type_;
int byte_width_;
const int byte_width_;
};

struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder {
Expand All @@ -118,6 +143,7 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {
void AddLength(const ExecValue& data, int64_t batch_length, int32_t* lengths) override {
if (data.is_array()) {
int64_t i = 0;
ARROW_DCHECK_EQ(data.array.length, batch_length);
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
Expand All @@ -142,41 +168,34 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {

Status Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) override {
auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
};
auto handle_next_null_value = [&encoded_bytes]() {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
};
if (data.is_array()) {
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
});
DCHECK_EQ(data.length(), batch_length);
VisitArraySpanInline<T>(data.array, handle_next_valid_value,
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<BaseBinaryScalar>();
if (scalar.is_valid) {
const auto& bytes = *scalar.value;
const auto bytes = std::string_view{*scalar.value};
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
handle_next_valid_value(bytes);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -250,6 +269,68 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
}
};

/// RowEncoder encodes ExecSpan to a variable length byte sequence
/// created by concatenating the encoded form of each column. The encoding
/// for each column depends on its data type.
///
/// This is used to encode columns into row-major format, which will be
/// beneficial for grouping and joining operations.
///
/// Unlike DuckDB and arrow-rs, currently this row format can not help
/// sortings because the row-format is uncomparable.
///
/// # Key Column Encoding
///
/// The row format is composed of the the KeyColumn encodings for each,
/// and the column is encoded as follows:
/// 1. A null byte for each column, indicating whether the column is null.
/// "1" for null, "0" for non-null.
/// 2. The "fixed width" encoding for the column, it would exist whether
/// the column is null or not.
/// 3. The "variable payload" encoding for the column, it would exists only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it count into the "length" part for the var-length column (which will exist no matter the column is null or not)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add some descr below

/// for non-null string/binary columns.
/// For string/binary columns, the length of the payload is in
/// "fixed width" part, and the binary contents are in the
/// "variable payload" part.
/// 4. Specially, if all columns in a row are null, the caller may decide
/// to refer to kRowIdForNulls instead of actually encoding/decoding
/// it using any KeyEncoder. See the comment for encoded_nulls_.
///
/// The endianness of the encoded bytes is platform-dependent.
///
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
/// ## Null Type
///
/// Null Type is a special case, it doesn't occupy any space in the
/// encoded row.
///
/// ## Fixed Width Type
///
/// Fixed Width Type is encoded as a fixed-width byte sequence. For example:
/// ```
/// Int8: 5, null, 6
/// ```
/// Would be encoded as [0 5], [1 0], [0 6].
///
/// ### Dictionary Type
///
/// Dictionary Type is encoded as a fixed-width byte sequence using
/// dictionary indices, the dictionary should be identical for all
/// rows.
///
/// ## Variable Width Type
///
/// Variable Width Type is encoded as:
/// [null byte, variable-byte length, variable bytes]. For example:
///
/// String "abc" Would be encoded as:
/// [0 0 0 0 3 'a' 'b' 'c']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be [0 3 0 0 0 'a' 'b' 'c'] on little-endian platforms?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch
How about:

0 ( 1 byte for not null) + 3 ( 4 bytes for length ) + "abc" (payload)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work too!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

///
/// String null Would be encoded as:
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
/// [1 0 0 0 0]
///
/// # Row Encoding
///
/// The row format is the concatenation of the encodings of each column.
class ARROW_EXPORT RowEncoder {
public:
static constexpr int kRowIdForNulls() { return -1; }
Expand All @@ -259,6 +340,9 @@ class ARROW_EXPORT RowEncoder {
Status EncodeAndAppend(const ExecSpan& batch);
Result<ExecBatch> Decode(int64_t num_rows, const int32_t* row_ids);

// Returns the encoded representation of the row at index i.
// If i is kRowIdForNulls, it returns the pre-encoded all-nulls
// row.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another optimization might be std::string_view unsafe_encoded_row(int32_t i), which not copying the row. When the std::string cannot applying SSO, it would be benifit from less heap allocation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you open a separate PR for that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you open a separate PR for that?

Would do after this merged

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline std::string encoded_row(int32_t i) const {
if (i == kRowIdForNulls()) {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
Expand All @@ -270,14 +354,26 @@ class ARROW_EXPORT RowEncoder {
}

int32_t num_rows() const {
return offsets_.size() == 0 ? 0 : static_cast<int32_t>(offsets_.size() - 1);
return offsets_.empty() ? 0 : static_cast<int32_t>(offsets_.size() - 1);
}

private:
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
// offsets_ vector stores the starting position (offset) of each encoded row
// within the bytes_ vector. This allows for quick access to individual rows.
//
// The size would be num_rows + 1 if not empty, the last element is the total
// length of the bytes_ vector.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this issue: I'm thinking of an optimization here. We can define a flag to indicate that all the columns are fixed-sized or null. If it's, we can not maintain the offsets, just static compute a fixed-row-size, and using fixed-row-size to seek for the row.

std::vector<int32_t> offsets_;
// The encoded bytes of all non "kRowIdForNulls" rows.
std::vector<uint8_t> bytes_;
// A pre-encoded constant row with all its columns being null. Useful when
// the caller is certain that an entire row is null and then uses kRowIdForNulls
// to refer to it.
//
// EncodeAndAppend would never append this row, but encoded_row and Decode would
// return this row when kRowIdForNulls is passed.
std::vector<uint8_t> encoded_nulls_;
std::vector<std::shared_ptr<ExtensionType>> extension_types_;
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/row/row_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct ARROW_EXPORT RowTableMetadata {
/// For a fixed-length binary row, common size of rows in bytes,
/// rounded up to the multiple of alignment.
///
/// For a varying-length binary, size of all encoded fixed-length key columns,
/// For a varying-length binary row, size of all encoded fixed-length key columns,
/// including lengths of varying-length columns, rounded up to the multiple of string
/// alignment.
uint32_t fixed_length;
Expand Down
Loading