GH-41873: [Acero][C++] Reduce asof-join overhead by minimizing copies of the left hand side #41874
Conversation
Force-pushed 361756f to f105892
cc'ing very helpful reviewers from the past: @westonpace @icexelloss @bkietz
size_t start = -1;
size_t end = -1;

for (const auto& slice : slices) {
If it's not self-evident, the asof-join works by creating a CompositeEntry for each output row. Since these so-called "contiguous inputs" are Slice-able, we squash these entries down as a preprocessing step. For example, suppose slices has entries for an LHS table that look like this:

{rb_addr: 1234, start: 1, end: 2},
{rb_addr: 1234, start: 2, end: 3},
{rb_addr: 1234, start: 3, end: 4},
...
{rb_addr: 1234, start: 1000, end: 1001},
{rb_addr: 4321, start: 100001, end: 100002},
{rb_addr: 4321, start: 100002, end: 100003},
...
{rb_addr: 4321, start: 123455, end: 123456},

It would be silly to derive slices from this potentially long vector for every column we mean to output. Thus, this function squashes it down to a very compact vector:

{rb_addr: 1234, start: 1, end: 1001},
{rb_addr: 4321, start: 100001, end: 123456},

which we can quickly use to slice the appropriate output column(s).
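For illustration, here is a minimal standalone sketch of that squashing pass. The struct and function names below are illustrative stand-ins, not Acero's actual CompositeEntry or the PR's code:

#include <cstdint>
#include <vector>

// Illustrative stand-in for an entry: which record batch a run of rows comes
// from, plus its [start, end) row range.
struct Entry {
  uint64_t rb_addr;  // identity/address of the source record batch
  uint64_t start;
  uint64_t end;
};

// Merge adjacent entries that point at the same record batch and whose row
// ranges are back-to-back, so each output column is sliced once per run
// instead of once per row.
std::vector<Entry> SquashContiguous(const std::vector<Entry>& slices) {
  std::vector<Entry> out;
  for (const Entry& e : slices) {
    if (!out.empty() && out.back().rb_addr == e.rb_addr && out.back().end == e.start) {
      out.back().end = e.end;  // extend the current run
    } else {
      out.push_back(e);  // start a new run
    }
  }
  return out;
}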
contiguous_blocks = std::unordered_map<int, std::vector<CompositeEntry>>();
contiguous_blocks.value().reserve(contiguous_srcs.size());
for (int src_table : contiguous_srcs) {
  ARROW_ASSIGN_OR_RAISE(auto flattened_blocks, FlattenSlices(src_table));
Note that we do this outside of materializeColumn. Say the LHS of the asof join has 1000 columns that we want to output: we don't want to flatten the LHS slices once for each of those 1000 columns - instead, we flatten once per contiguous table.
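A self-contained sketch of that "flatten once, reuse per column" structure; the names here (Block, FlattenSlices) are purely illustrative stand-ins, not the PR's actual types:

#include <cstdint>
#include <unordered_map>
#include <vector>

struct Block {
  uint64_t rb_addr;
  uint64_t start;
  uint64_t end;
};

// Stand-in for the flattening/squashing pass; its cost is O(#slices) per table.
std::vector<Block> FlattenSlices(int /*src_table*/) { return {}; }

int main() {
  const std::vector<int> contiguous_srcs = {0};  // e.g. only the LHS table
  const int num_output_columns = 1000;

  // Pay the flattening cost once per contiguous source table...
  std::unordered_map<int, std::vector<Block>> contiguous_blocks;
  contiguous_blocks.reserve(contiguous_srcs.size());
  for (int src : contiguous_srcs) {
    contiguous_blocks[src] = FlattenSlices(src);
  }

  // ...then every output column only reads the precomputed blocks when it is
  // materialized, instead of re-flattening the slice list per column.
  for (int col = 0; col < num_output_columns; ++col) {
    const std::vector<Block>& blocks = contiguous_blocks.at(0);
    (void)blocks;  // slice column `col` according to `blocks` here
  }
  return 0;
}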
Force-pushed 6b19840 to 51bc610
// we can just take zero-copy slices of it. This is much faster than how other
// tables are treated, wherein we need to copy
return CompositeTable{schema, inputs.size(), dst_to_src, pool,
                      /*contiguous_sources=*/{0}};
Suggested change:
-                      /*contiguous_sources=*/{0}};
+                      /*contiguous_srcs_=*/{0}};
Some nits.
  DCHECK_EQ(chunk->type()->id(), type->id());
  col.push_back(std::move(chunk));
}
return arrow::Concatenate(col);
Suggested change:
- return arrow::Concatenate(col);
+ return arrow::Concatenate(col, pool);
We need to pass the specified this->pool down to the Concatenate call.
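For context, a sketch of what the suggested change does, assuming col is a vector of same-typed arrays and pool stands in for this->pool:

#include <memory>
#include <vector>

#include "arrow/array.h"
#include "arrow/array/concatenate.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::Array>> ConcatChunks(
    const std::vector<std::shared_ptr<arrow::Array>>& col, arrow::MemoryPool* pool) {
  // Concatenate allocates the merged buffers from the pool it is given;
  // omitting the argument falls back to arrow::default_memory_pool().
  return arrow::Concatenate(col, pool);
}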
I think I understand the idea and the overall logic looks correct to me (I'll need some more time to look into more details). Though the existing tests should cover this well, could you add dedicated UTs for the change (e.g., for UnmaterializedCompositeTable operations with a meaningful contiguous_srcs)? I think that helps in terms of both quality and readability. Thanks.
  DCHECK_EQ(chunk->type()->id(), type->id());
  col.push_back(std::move(chunk));
}
return arrow::Concatenate(col);
Concatenate internally copies the chunks, which has its own overhead as well, though it should still outperform the row-by-row copying, esp. with the flattened slices. Just mentioning this to make sure we don't expect too much from this improvement :)
Hi @JerAguilon, I put some comments on this PR a while ago. I wonder if you are still willing to move forward with it? Or is there anything I can do to help? Thanks.
Rationale for this change
This PR is a 30-65% performance optimization on the asof-join benchmarks; it is purely a performance change, so there are no visible behavioral/test changes.

Please read #41873, where I explain exactly why this optimization works. The idea is that for the left hand side of the join, rather than copying data to the output arrays cell-by-cell, we can take Array::Slices, which are zero-copy and have minimal overhead. This results in large speedups that scale with the number of LHS columns we are emitting.
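To illustrate the zero-copy claim, a small standalone example (not taken from the PR) showing that Array::Slice only records a new offset/length over the parent's buffers:

#include <memory>

#include "arrow/api.h"

arrow::Result<std::shared_ptr<arrow::Array>> SliceExample() {
  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3, 4, 5}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> arr, builder.Finish());

  // Zero-copy: the slice shares the parent's buffers and only stores a new
  // offset/length, so its cost is independent of the cell type or slice size.
  std::shared_ptr<arrow::Array> middle = arr->Slice(/*offset=*/1, /*length=*/3);
  return middle;
}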
What changes are included in this PR?
Note: To reduce merge conflict headaches, I had rebased this PR on top of #41125 since I was aware it had just been accepted. (This is now rebased on origin/main since that PR was merged.)

Aside from the changes in the parent PR, the changes are mostly localized to unmaterialized_table.h. We add a new field, contiguous_srcs, which contains the set of table IDs that can simply be Sliced. asof_join_node.cc then initializes an UnmaterializedCompositeTable with this new field (see the snippet below), which indicates that table ID 0 (i.e., the left hand side) can be quickly sliced.
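The initialization in question, as quoted in the review comments above:

// In asof_join_node.cc: table ID 0 (the left hand side) is marked as a
// contiguous source, so its output columns can be zero-copy sliced rather
// than copied row by row.
return CompositeTable{schema, inputs.size(), dst_to_src, pool,
                      /*contiguous_sources=*/{0}};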
Are these changes tested?
Yes - here are some results from running arrow-acero-asof-join-benchmark: https://gist.github.com/JerAguilon/68568525f3818f60dc2ffcfe5eb6aba2

This was run on a 32GB Apple M1 MacBook Pro.
Generally, we see a 30-65% improvement in rows/sec with no discernible changes in peak memory. The scale of improvement depends on the number of columns on the LHS.
Anecdotally, there are peak memory improvements at larger scales than the benchmarks cover. I've personally asof-joined 50GB+ Parquet files. At that size, you can accumulate a large backlog of work on the producer thread; if you emit rows faster, the backlog can be kept lower. Furthermore, the benefit is even larger if the left hand table has variable-length data types like strings or arrays - copying these cells is very expensive!
Are there any user-facing changes?