GH-41873: [Acero][C++] Reduce asof-join overhead by minimizing copies of the left hand side #41874

Open · wants to merge 4 commits into main
7 changes: 6 additions & 1 deletion cpp/src/arrow/acero/asof_join_node.cc
@@ -912,7 +912,12 @@ class CompositeTableBuilder {
}
}
}
return CompositeTable{schema, inputs.size(), dst_to_src, pool};
// NB: The left hand side of the join (index 0) is the reference table of the
// asof-join. The output is composed of contiguous slices of this table, so
// we can just take zero-copy slices of it. This is much faster than how other
// tables are treated, wherein we need to copy the data row by row.
return CompositeTable{schema, inputs.size(), dst_to_src, pool,
/*contiguous_sources=*/{0}};
Contributor comment:
Suggested change
/*contiguous_sources=*/{0}};
/*contiguous_srcs_=*/{0}};

}
};
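For context (not part of this diff), the saving hinges on `RecordBatch::Slice` being zero-copy. Below is a minimal sketch of that behavior; the function name `TakeRun` and its signature are purely illustrative.

#include <cstdint>
#include <memory>
#include "arrow/record_batch.h"

// Illustration only: RecordBatch::Slice shares the underlying buffers, so
// emitting a contiguous run of the reference (LHS) table only adjusts
// offsets/lengths rather than appending values through a builder.
std::shared_ptr<arrow::RecordBatch> TakeRun(
    const std::shared_ptr<arrow::RecordBatch>& lhs, int64_t start, int64_t length) {
  return lhs->Slice(start, length);  // zero-copy view over lhs's buffers
}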

110 changes: 101 additions & 9 deletions cpp/src/arrow/acero/unmaterialized_table.h
@@ -18,10 +18,12 @@
#pragma once

#include <optional>
#include <unordered_set>
#include <vector>
#include "arrow/array/builder_base.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/type_traits.h"
@@ -55,13 +57,27 @@ class UnmaterializedSliceBuilder;
template <size_t MAX_COMPOSITE_TABLES>
class UnmaterializedCompositeTable {
public:
/// \brief Initializes an UnmaterializedCompositeTable.
///
/// \param[in] output_schema the schema of the output table.
/// \param[in] num_composite_tables number of composite tables in the output schema.
/// \param[in] output_col_to_src_ mapping from output column to source table ID and
/// column ID. Note that src table IDs should range from 0 to num_composite_tables - 1,
/// inclusive.
/// \param[in] pool_ memory pool to use for allocations.
/// \param[in] contiguous_srcs_ set of source table IDs that are contiguous.
/// A contiguous source table is one whose slices form contiguous runs of the
/// input record batches. This allows the materializer to minimize copies by taking
/// contiguous `RecordBatch::Slice`s.
UnmaterializedCompositeTable(
const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
arrow::MemoryPool* pool_ = arrow::default_memory_pool())
arrow::MemoryPool* pool_ = arrow::default_memory_pool(),
std::unordered_set<int> contiguous_srcs_ = {})
: schema(output_schema),
num_composite_tables(num_composite_tables),
output_col_to_src(std::move(output_col_to_src_)),
contiguous_srcs(std::move(contiguous_srcs_)),
pool{pool_} {}
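A hedged usage sketch (illustration only, not part of the diff): the schema, column mapping, and values below are invented to show how the new contiguous_srcs_ argument marks the asof-join LHS (source 0) as sliceable. It assumes this header and <unordered_map> are included.

// Illustration only. Output column 0 comes from source table 0 (the LHS),
// output column 1 from source table 1; only source 0 is contiguous.
auto out_schema = arrow::schema({arrow::field("time", arrow::int64()),
                                 arrow::field("value", arrow::float64())});
std::unordered_map<int, std::pair<int, int>> col_map = {{0, {0, 0}}, {1, {1, 0}}};
UnmaterializedCompositeTable<2> table(out_schema, /*num_composite_tables=*/2,
                                      std::move(col_map),
                                      arrow::default_memory_pool(),
                                      /*contiguous_srcs_=*/{0});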

// Shallow wrappers around std::vector for performance
@@ -79,11 +95,22 @@
DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());

#define MATERIALIZE_CASE(id) \
case arrow::Type::id: { \
using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
break; \
std::optional<std::unordered_map<int, std::vector<CompositeEntry>>> contiguous_blocks;
if (contiguous_srcs.size() > 0) {
contiguous_blocks = std::unordered_map<int, std::vector<CompositeEntry>>();
contiguous_blocks.value().reserve(contiguous_srcs.size());
for (int src_table : contiguous_srcs) {
ARROW_ASSIGN_OR_RAISE(auto flattened_blocks, FlattenSlices(src_table));
Contributor Author @JerAguilon commented on May 29, 2024:
Note we do this outside of materializeColumn. Say the LHS of the asof-join has 1000 columns that we want to output. We don't want to flatten the LHS slices 1000 times, once for each column; instead, we flatten just once per contiguous table.
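(Illustrative cost comparison, not from the diff: with S slices in the LHS and C output columns drawn from it, flattening inside materializeColumn would cost on the order of C * S, while flattening once up front costs S plus a constant-time lookup per column.)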

contiguous_blocks.value()[src_table] = std::move(flattened_blocks);
}
}

#define MATERIALIZE_CASE(id) \
case arrow::Type::id: { \
using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), \
materializeColumn<T>(field_type, i_col, contiguous_blocks)); \
break; \
}

// Build the arrays column-by-column from the rows
@@ -143,6 +170,7 @@
std::shared_ptr<arrow::Schema> schema;
size_t num_composite_tables;
std::unordered_map<int, std::pair<int, int>> output_col_to_src;
std::unordered_set<int> contiguous_srcs;

arrow::MemoryPool* pool;

@@ -204,15 +232,79 @@
return builder.Append(data + offset0, offset1 - offset0);
}

arrow::Result<std::vector<CompositeEntry>> FlattenSlices(int table_index) {
std::vector<CompositeEntry> flattened_blocks;

arrow::RecordBatch* active_rb = NULL;
size_t start = -1;
size_t end = -1;

for (const auto& slice : slices) {
Contributor Author @JerAguilon commented on May 29, 2024:
If it's not self-evident, the asof-join works by creating a CompositeEntry for each output row.

Since these so-called "contiguous inputs" are sliceable, we squash these entries down as a preprocessing step. For example, suppose the LHS entries in slices look like this:

{rb_addr: 1234, start: 1, end: 2},
{rb_addr: 1234, start: 2, end: 3},
{rb_addr: 1234, start: 3, end: 4},
...
{rb_addr: 1234, start: 1000, end: 1001},
{rb_addr: 4321, start: 100001, end: 100002},
{rb_addr: 4321, start: 100002, end: 100003},
...
{rb_addr: 4321, start: 123455, end: 123456},

It would be silly to derive slices from this potentially long vector for every column we mean to output. Thus, this function squashes it down to a very compact vector:

{rb_addr: 1234, start: 1, end: 1001},
{rb_addr: 4321, start: 100001, end: 123456},

We can then quickly use this compact vector to slice the appropriate output column(s).

const auto& [batch, block_start, block_end] = slice.components[table_index];
if (active_rb == NULL) {
active_rb = batch;
start = block_start;
end = block_end;
} else if (active_rb == batch && block_start == end) {
end = block_end;
} else {
flattened_blocks.push_back({active_rb, start, end});
active_rb = batch;
start = block_start;
end = block_end;
}
DCHECK_NE(active_rb, NULL);
}
// flush the last batch
flattened_blocks.push_back({active_rb, start, end});
return flattened_blocks;
}

template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
const std::shared_ptr<arrow::DataType>& type, int i_col) {
const std::shared_ptr<arrow::DataType>& type, int i_col,
const std::optional<std::unordered_map<int, std::vector<CompositeEntry>>>&
contiguous_blocks) {
const auto& [table_index, column_index] = output_col_to_src[i_col];

if (contiguous_blocks.has_value() &&
contiguous_blocks.value().find(table_index) != contiguous_blocks.value().end()) {
return materializeContiguous<Type>(contiguous_blocks.value().at(table_index), type,
column_index);
}
return materializeRowByRow<Type>(type, table_index, column_index);
}

/// If the source table is contiguous, we can take slices of the record batch
/// directly. This is much cheaper than copying the data row-by-row into an output
/// array.
template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeContiguous(
const std::vector<CompositeEntry>& flattened_blocks,
const std::shared_ptr<arrow::DataType>& type, int column_index) {
if (Size() == 0) {
ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
return builderPtr->Finish();
}

std::vector<std::shared_ptr<arrow::Array>> col;
col.reserve(flattened_blocks.size());
for (const auto& [rb, block_start, block_end] : flattened_blocks) {
auto chunk = rb->column(column_index)
->Slice(block_start, /*length=*/(block_end - block_start + 1));
DCHECK_EQ(chunk->type()->id(), type->id());
col.push_back(std::move(chunk));
}
return arrow::Concatenate(col);
Contributor comment:
Suggested change
return arrow::Concatenate(col);
return arrow::Concatenate(col, pool);

Contributor comment:
We need to pass the specified this->pool down to the Concatenate call.

Contributor comment:
Concatenate internally copies the chunks, which has its own overhead, though it should still outperform the row-by-row copying, especially with the flattened slices.

Just mentioning this to make sure we don't expect too much from this improvement :)
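A minimal sketch of the suggestion above (illustration only; the helper name MergeRuns is invented): arrow::Concatenate performs a single buffer copy, and passing the table's pool keeps that allocation accounted to the intended MemoryPool rather than the default one.

#include <memory>
#include <vector>
#include "arrow/array/concatenate.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"

// Illustration only: thread the caller's pool through Concatenate so the
// merged buffers are allocated from it rather than the default pool.
arrow::Result<std::shared_ptr<arrow::Array>> MergeRuns(
    const std::vector<std::shared_ptr<arrow::Array>>& runs, arrow::MemoryPool* pool) {
  return arrow::Concatenate(runs, pool);  // one copy into pool-owned buffers
}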

}

template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeRowByRow(
const std::shared_ptr<arrow::DataType>& type, int table_index, int column_index) {
ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));

const auto& [table_index, column_index] = output_col_to_src[i_col];

for (const auto& unmaterialized_slice : slices) {
const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
if (batch) {