From 7709b385262b40dac01213ec3afeca15d13e98ed Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Wed, 29 May 2024 23:42:55 +0900 Subject: [PATCH 1/4] Asof join - minimize copies on the left hand side of the join --- cpp/src/arrow/acero/asof_join_node.cc | 7 +- cpp/src/arrow/acero/unmaterialized_table.h | 109 +++++++++++++++++++-- 2 files changed, 106 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 848cbdf7506ad..86ca6e7f8179c 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -912,7 +912,12 @@ class CompositeTableBuilder { } } } - return CompositeTable{schema, inputs.size(), dst_to_src, pool}; + // NB: The left hand side of the join (index 0) is the reference table of the + // asof-join. The output is comprised of contiguous slices of this table, so + // we can just take zero-copy slices of it. This is much faster than how other + // tables are treated, wherein we need to copy + return CompositeTable{schema, inputs.size(), dst_to_src, pool, + /*contiguous_srcs_=*/{0}}; } }; diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h index 05d6c866936e0..6444df6d463a2 100644 --- a/cpp/src/arrow/acero/unmaterialized_table.h +++ b/cpp/src/arrow/acero/unmaterialized_table.h @@ -18,10 +18,12 @@ #pragma once #include +#include #include #include "arrow/array/builder_base.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_primitive.h" +#include "arrow/array/concatenate.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/type_traits.h" @@ -55,13 +57,26 @@ class UnmaterializedSliceBuilder; template class UnmaterializedCompositeTable { public: + /// \brief Initializes an UnmaterializedCompositeTable. + /// + /// \param[in] output_schema the schema of the output table. + /// \param[in] num_composite_tables number of composite tables in the output schema.
+ /// \param[in] output_col_to_src_ mapping from output column to source table ID and + /// column ID. Note that src table IDs should range from 0 to num_composite_tables - 1, + /// inclusive. + /// \param[in] contiguous_srcs_ set of source table IDs that are contiguous. + /// A contiguous source table is one where all slices are from contiguous slices of the + /// input record batches. This allows the materializer to minimize copies by taking + /// contiguous `RecordBatch::Slice`'s. UnmaterializedCompositeTable( const std::shared_ptr& output_schema, size_t num_composite_tables, std::unordered_map> output_col_to_src_, - arrow::MemoryPool* pool_ = arrow::default_memory_pool()) + arrow::MemoryPool* pool_ = arrow::default_memory_pool(), + std::unordered_set contiguous_srcs_ = {}) : schema(output_schema), num_composite_tables(num_composite_tables), output_col_to_src(std::move(output_col_to_src_)), + contiguous_srcs(std::move(contiguous_srcs_)), pool{pool_} {} // Shallow wrappers around std::vector for performance @@ -79,11 +94,22 @@ class UnmaterializedCompositeTable { DCHECK_LE(Size(), (uint64_t)std::numeric_limits::max()); std::vector> arrays(schema->num_fields()); -#define MATERIALIZE_CASE(id) \ - case arrow::Type::id: { \ - using T = typename arrow::TypeIdTraits::Type; \ - ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn(field_type, i_col)); \ - break; \ + std::optional>> contiguous_blocks; + if (contiguous_srcs.size() > 0) { + contiguous_blocks = std::unordered_map>(); + contiguous_blocks.value().reserve(contiguous_srcs.size()); + for (int src_table : contiguous_srcs) { + ARROW_ASSIGN_OR_RAISE(auto flattened_blocks, FlattenSlices(src_table)); + contiguous_blocks.value()[src_table] = std::move(flattened_blocks); + } + } + +#define MATERIALIZE_CASE(id) \ + case arrow::Type::id: { \ + using T = typename arrow::TypeIdTraits::Type; \ + ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), \ + materializeColumn(field_type, i_col, contiguous_blocks)); \ + break; \ } // Build 
the arrays column-by-column from the rows @@ -143,6 +169,7 @@ class UnmaterializedCompositeTable { std::shared_ptr schema; size_t num_composite_tables; std::unordered_map> output_col_to_src; + std::unordered_set contiguous_srcs; arrow::MemoryPool* pool; @@ -204,15 +231,79 @@ class UnmaterializedCompositeTable { return builder.Append(data + offset0, offset1 - offset0); } + arrow::Result> FlattenSlices(int table_index) { + std::vector flattened_blocks; + + arrow::RecordBatch* active_rb = nullptr; + size_t start = -1; + size_t end = -1; + + for (const auto& slice : slices) { + const auto& [batch, block_start, block_end] = slice.components[table_index]; + if (active_rb == nullptr) { + active_rb = batch; + start = block_start; + end = block_end; + } else if (active_rb == batch && block_start == end) { + end = block_end; + } else { + flattened_blocks.push_back({active_rb, start, end}); + active_rb = batch; + start = block_start; + end = block_end; + } + DCHECK_NE(active_rb, nullptr); + } + // flush the last batch + flattened_blocks.push_back({active_rb, start, end}); + return flattened_blocks; + } + template ::BuilderType> arrow::Result> materializeColumn( - const std::shared_ptr& type, int i_col) { + const std::shared_ptr& type, int i_col, + const std::optional>>& + contiguous_blocks) { + const auto& [table_index, column_index] = output_col_to_src[i_col]; + + if (contiguous_blocks.has_value() && + contiguous_blocks.value().find(table_index) != contiguous_blocks.value().end()) { + return materializeContiguous(contiguous_blocks.value().at(table_index), type, + column_index); + } + return materializeRowByRow(type, table_index, column_index); + } + + /// If the source table is contiguous, we can take slices of the record batch + /// directly. This is much cheaper than copying the data row-by-row into an output + /// array. 
+ template ::BuilderType> + arrow::Result> materializeContiguous( + const std::vector& flattened_blocks, + const std::shared_ptr& type, int column_index) { + if (Size() == 0) { + ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool)); + return builderPtr->Finish(); + } + + std::vector> col; + col.reserve(flattened_blocks.size()); + for (const auto& [rb, block_start, block_end] : flattened_blocks) { + auto chunk = rb->column(column_index) + ->Slice(block_start, /*length=*/(block_end - block_start + 1)); + DCHECK_EQ(chunk->type()->id(), type->id()); + col.push_back(std::move(chunk)); + } + return arrow::Concatenate(col); + } + + template ::BuilderType> + arrow::Result> materializeRowByRow( + const std::shared_ptr& type, int table_index, int column_index) { ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool)); Builder& builder = *arrow::internal::checked_cast(builderPtr.get()); ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - const auto& [table_index, column_index] = output_col_to_src[i_col]; - for (const auto& unmaterialized_slice : slices) { const auto& [batch, start, end] = unmaterialized_slice.components[table_index]; if (batch) { From ffe38b0286be8e0cbc48aed091fb119a95f0366c Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Thu, 30 May 2024 00:47:01 +0900 Subject: [PATCH 2/4] lint --- cpp/src/arrow/acero/unmaterialized_table.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h index 6444df6d463a2..d2d97a2e0afb9 100644 --- a/cpp/src/arrow/acero/unmaterialized_table.h +++ b/cpp/src/arrow/acero/unmaterialized_table.h @@ -234,13 +234,13 @@ class UnmaterializedCompositeTable { arrow::Result> FlattenSlices(int table_index) { std::vector flattened_blocks; - arrow::RecordBatch* active_rb = nullptr; + arrow::RecordBatch* active_rb = NULL; size_t start = -1; size_t end = -1; for (const auto& slice : slices) { const auto& [batch, 
block_start, block_end] = slice.components[table_index]; - if (active_rb == nullptr) { + if (active_rb == NULL) { active_rb = batch; start = block_start; end = block_end; @@ -252,7 +252,7 @@ class UnmaterializedCompositeTable { start = block_start; end = block_end; } - DCHECK_NE(active_rb, nullptr); + DCHECK_NE(active_rb, NULL); } // flush the last batch flattened_blocks.push_back({active_rb, start, end}); From 51bc610e656249304fa78731196aa27cdae809d7 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Thu, 30 May 2024 12:05:19 +0900 Subject: [PATCH 3/4] rebase onto origin/main now that https://github.com/apache/arrow/pull/41125 is merged From 500e568ea9ab297cf2e0ca2bbe67cfe8e03183d3 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Thu, 30 May 2024 12:22:22 +0900 Subject: [PATCH 4/4] Document a param --- cpp/src/arrow/acero/unmaterialized_table.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h index d2d97a2e0afb9..f8802f0d4eeb9 100644 --- a/cpp/src/arrow/acero/unmaterialized_table.h +++ b/cpp/src/arrow/acero/unmaterialized_table.h @@ -64,6 +64,7 @@ class UnmaterializedCompositeTable { /// \param[in] output_col_to_src_ mapping from output column to source table ID and /// column ID. Note that src table IDs should range from 0 to num_composite_tables - 1, /// inclusive. + /// \param[in] pool_ memory pool to use for allocations. /// \param[in] contiguous_srcs_ set of source table IDs that are contiguous. /// A contiguous source table is one where all slices are from contiguous slices of the /// input record batches. This allows the materializer to minimize copies by taking