Skip to content

Commit

Permalink
fall back to full merge on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
suremarc committed Nov 7, 2024
1 parent c138b24 commit b469b71
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,26 +252,30 @@ impl ExecutionPlan for SortPreservingMergeExec {
MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
.register(&context.runtime_env().memory_pool);

let statistics = MinMaxStatistics::new_from_statistics(
// Organize the input partitions into chains,
// where elements of each chain are input partitions that are
// non-overlapping, and each chain is ordered by their min/max statistics.
let stream_packing = match MinMaxStatistics::new_from_statistics(
&self.expr,
&self.schema(),
&self.input.statistics_by_partition()?,
)?;
) {
Ok(statistics) => statistics.first_fit(),
Err(e) => {
log::debug!("error analyzing statistics for plan: {e}\nfalling back to full sort-merge");
(0..input_partitions).map(|i| vec![i]).collect()
}
};

// Organize the input partitions into chains,
// where elements of each chain are input partitions that are
// non-overlapping, and each chain is ordered by their min/max statistics.
// Then concatenate each chain into a single stream.
let mut streams = statistics
.first_fit()
// Concatenate each chain into a single stream.
let mut streams = stream_packing
.into_iter()
.map(|chain| {
let streams = chain
.into_iter()
.map(|i| self.input.execute(i, Arc::clone(&context)))
.collect::<Result<Vec<_>>>()?;

// Concatenate the chain into a single stream
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.input.schema(),
futures::stream::iter(streams).flatten(),
Expand Down

0 comments on commit b469b71

Please sign in to comment.