Skip to content

Commit

Permalink
Asof join - minimize copies on the left hand side of the join
Browse files Browse the repository at this point in the history
  • Loading branch information
JerAguilon committed May 29, 2024
1 parent f7b45df commit f105892
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 10 deletions.
7 changes: 6 additions & 1 deletion cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,12 @@ class CompositeTableBuilder {
}
}
}
return CompositeTable{schema, inputs.size(), dst_to_src, pool};
// NB: The left hand side of the join (index 0) is the reference table of the
// asof-join. The output is comprised of contiguous slices of this table, so
// we can just take zero-copy slices of it. This is much faster than how other
// tables are treated, wherein we need to copy the data row by row.
return CompositeTable{schema, inputs.size(), dst_to_src, pool,
/*contiguous_sources=*/{0}};
}
};

Expand Down
109 changes: 100 additions & 9 deletions cpp/src/arrow/acero/unmaterialized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#pragma once

#include <optional>
#include <unordered_set>
#include <vector>
#include "arrow/array/builder_base.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/type_traits.h"
Expand Down Expand Up @@ -55,13 +57,26 @@ class UnmaterializedSliceBuilder;
template <size_t MAX_COMPOSITE_TABLES>
class UnmaterializedCompositeTable {
public:
/// \brief Initializes an UnmaterializedCompositeTable.
///
/// \param[in] output_schema the schema of the output table.
/// \param[in] num_composite_tables number of composite tables in the output schema.
/// \param[in] output_col_to_src_ mapping from output column to source table ID and
/// column ID. Note that src table IDs should range from 0 to num_composite_tables - 1,
/// inclusive.
/// \param[in] pool_ memory pool stored on the table and handed to the array builders
/// created during materialization. Defaults to arrow::default_memory_pool().
/// \param[in] contiguous_srcs_ set of source table IDs that are contiguous.
/// A contiguous source table is one where all slices are taken from contiguous ranges
/// of the input record batches. This allows the materializer to minimize copies by
/// slicing the source batches directly instead of copying row by row. Defaults to the
/// empty set, in which case no source table is treated as contiguous.
UnmaterializedCompositeTable(
const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
arrow::MemoryPool* pool_ = arrow::default_memory_pool())
arrow::MemoryPool* pool_ = arrow::default_memory_pool(),
std::unordered_set<int> contiguous_srcs_ = {})
: schema(output_schema),
num_composite_tables(num_composite_tables),
// Both containers are taken by value and moved into the members.
output_col_to_src(std::move(output_col_to_src_)),
contiguous_srcs(std::move(contiguous_srcs_)),
pool{pool_} {}

// Shallow wrappers around std::vector for performance
Expand All @@ -79,11 +94,22 @@ class UnmaterializedCompositeTable {
DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());

#define MATERIALIZE_CASE(id) \
case arrow::Type::id: { \
using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
break; \
std::optional<std::unordered_map<int, std::vector<CompositeEntry>>> contiguous_blocks;
if (contiguous_srcs.size() > 0) {
contiguous_blocks = std::unordered_map<int, std::vector<CompositeEntry>>();
contiguous_blocks.value().reserve(contiguous_srcs.size());
for (int src_table : contiguous_srcs) {
ARROW_ASSIGN_OR_RAISE(auto flattened_blocks, FlattenSlices(src_table));
contiguous_blocks.value()[src_table] = std::move(flattened_blocks);
}
}

#define MATERIALIZE_CASE(id) \
case arrow::Type::id: { \
using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), \
materializeColumn<T>(field_type, i_col, contiguous_blocks)); \
break; \
}

// Build the arrays column-by-column from the rows
Expand Down Expand Up @@ -143,6 +169,7 @@ class UnmaterializedCompositeTable {
std::shared_ptr<arrow::Schema> schema;
size_t num_composite_tables;
std::unordered_map<int, std::pair<int, int>> output_col_to_src;
std::unordered_set<int> contiguous_srcs;

arrow::MemoryPool* pool;

Expand Down Expand Up @@ -204,15 +231,79 @@ class UnmaterializedCompositeTable {
return builder.Append(data + offset0, offset1 - offset0);
}

/// \brief Merges the per-slice entries of one source table into maximal contiguous runs.
///
/// Walks `slices` in order and reads each slice's component for `table_index`. An entry
/// extends the current run when it refers to the same record batch and starts exactly
/// where the run ends (`block_start == end`, i.e. `end` is treated as exclusive).
/// Otherwise the current run is emitted and a new one begins.
///
/// \param[in] table_index ID of the source table whose entries are flattened.
/// \return the merged runs in slice order; empty when there are no slices. Returns
/// Invalid when `table_index` is out of range or when a slice carries no record batch
/// for this table (a contiguous source must always reference a batch).
arrow::Result<std::vector<CompositeEntry>> FlattenSlices(int table_index) {
  if (table_index < 0 || static_cast<size_t>(table_index) >= num_composite_tables) {
    return arrow::Status::Invalid("Source table index ", table_index,
                                  " is out of range for ", num_composite_tables,
                                  " composite tables");
  }

  std::vector<CompositeEntry> flattened_blocks;

  // `active_rb == nullptr` means "no run in progress"; start/end are only meaningful
  // while a run is active, so they need no sentinel values.
  arrow::RecordBatch* active_rb = nullptr;
  size_t start = 0;
  size_t end = 0;

  for (const auto& slice : slices) {
    const auto& [batch, block_start, block_end] = slice.components[table_index];
    if (batch == nullptr) {
      // Report this in release builds too: a null batch would otherwise be
      // indistinguishable from "no run in progress" and the entry would be dropped.
      return arrow::Status::Invalid("Contiguous source table ", table_index,
                                    " has a slice without a record batch");
    }

    if (active_rb == batch && block_start == end) {
      // Adjacent range of the same batch: grow the current run.
      end = block_end;
      continue;
    }

    if (active_rb != nullptr) {
      flattened_blocks.push_back({active_rb, start, end});
    }
    active_rb = batch;
    start = block_start;
    end = block_end;
  }

  // Flush the last run, if any. With no slices there is nothing to flush, so no
  // placeholder entry is produced.
  if (active_rb != nullptr) {
    flattened_blocks.push_back({active_rb, start, end});
  }
  return flattened_blocks;
}

/// \brief Materializes output column `i_col` into a single array.
///
/// Looks up the (source table ID, source column ID) pair for `i_col` in
/// `output_col_to_src`. When `contiguous_blocks` is set and contains an entry for that
/// source table, the column is built by materializeContiguous from those pre-flattened
/// blocks; otherwise it is built by materializeRowByRow.
///
/// \param[in] type data type of the output column.
/// \param[in] i_col index of the output column.
/// \param[in] contiguous_blocks optional map from source table ID to its flattened
/// blocks; std::nullopt or a missing key selects the row-by-row path.
template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
const std::shared_ptr<arrow::DataType>& type, int i_col) {
const std::shared_ptr<arrow::DataType>& type, int i_col,
const std::optional<std::unordered_map<int, std::vector<CompositeEntry>>>&
contiguous_blocks) {
// NOTE(review): operator[] on the unordered_map default-inserts a (0, 0) pair when
// `i_col` is absent -- presumably every output column is always mapped; confirm.
const auto& [table_index, column_index] = output_col_to_src[i_col];

// Take the slice-based path only for source tables that were flattened up front.
if (contiguous_blocks.has_value() &&
contiguous_blocks.value().find(table_index) != contiguous_blocks.value().end()) {
return materializeContiguous<Type>(contiguous_blocks.value().at(table_index), type,
column_index);
}
return materializeRowByRow<Type>(type, table_index, column_index);
}

/// \brief Materializes one column of a contiguous source table from slices.
///
/// If the source table is contiguous, we can take slices of the record batch
/// directly. This is much cheaper than copying the data row-by-row into an output
/// array.
///
/// \param[in] flattened_blocks merged (batch, start, end) runs for the source table.
/// `end` is exclusive: FlattenSlices merges adjacent runs when the next start equals
/// the current end, so a run covers `end - start` rows.
/// \param[in] type data type of the output column.
/// \param[in] column_index index of the column within the source record batches.
/// \return an empty array of `type` when the table has no rows; the slice itself when
/// there is exactly one run (no copy); otherwise the concatenation of all slices.
/// Returns Invalid when a run has no record batch.
template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeContiguous(
    const std::vector<CompositeEntry>& flattened_blocks,
    const std::shared_ptr<arrow::DataType>& type, int column_index) {
  if (Size() == 0) {
    // Nothing to slice: produce an empty array of the requested type.
    ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
    return builderPtr->Finish();
  }

  std::vector<std::shared_ptr<arrow::Array>> col;
  col.reserve(flattened_blocks.size());
  for (const auto& [rb, block_start, block_end] : flattened_blocks) {
    if (rb == nullptr) {
      return arrow::Status::Invalid(
          "Contiguous source block has no record batch to slice");
    }
    DCHECK_LE(block_start, block_end);
    // Half-open range [block_start, block_end): the length is the plain difference.
    // Adding one would pull in an extra row whenever the run ends before the end of
    // the batch (Slice only clamps at the batch boundary).
    auto chunk = rb->column(column_index)
                     ->Slice(static_cast<int64_t>(block_start),
                             /*length=*/static_cast<int64_t>(block_end - block_start));
    DCHECK_EQ(chunk->type()->id(), type->id());
    col.push_back(std::move(chunk));
  }

  if (col.size() == 1) {
    // A single run is already the final column; returning the slice avoids the copy
    // that concatenation would make.
    return std::move(col.front());
  }
  // Allocate the concatenated output from the table's pool, like the builders do.
  return arrow::Concatenate(col, pool);
}

template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
arrow::Result<std::shared_ptr<arrow::Array>> materializeRowByRow(
const std::shared_ptr<arrow::DataType>& type, int table_index, int column_index) {
ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));

const auto& [table_index, column_index] = output_col_to_src[i_col];

for (const auto& unmaterialized_slice : slices) {
const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
if (batch) {
Expand Down

0 comments on commit f105892

Please sign in to comment.