Skip to content

Commit

Permalink
fix: Specify row count in sort_batch for empty batch
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 16, 2024
1 parent 6ca9d10 commit fbd4092
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl PartialSortStream {
fn sort_in_mem_batches(self: &mut Pin<&mut Self>) -> Result<RecordBatch> {
let input_batch = concat_batches(&self.schema(), &self.in_mem_batches)?;
self.in_mem_batches.clear();
println!("input_batch: {:?}", input_batch);
let result = sort_batch(&input_batch, &self.expr, self.fetch)?;
if let Some(remaining_fetch) = self.fetch {
// remaining_fetch - result.num_rows() is always be >= 0
Expand Down
27 changes: 25 additions & 2 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, UInt32Array};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
Expand Down Expand Up @@ -617,7 +617,12 @@ pub(crate) fn sort_batch(
.map(|c| take(c.as_ref(), &indices, None))
.collect::<Result<_, _>>()?;

Ok(RecordBatch::try_new(batch.schema(), columns)?)
let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
Ok(RecordBatch::try_new_with_options(
batch.schema(),
columns,
&options,
)?)
}

#[inline]
Expand Down Expand Up @@ -1008,6 +1013,8 @@ mod tests {
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeConfig;

use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::Literal;
use futures::FutureExt;

#[tokio::test]
Expand Down Expand Up @@ -1415,4 +1422,20 @@ mod tests {

Ok(())
}

#[test]
fn test_empty_sort_batch() {
let schema = Arc::new(Schema::empty());
let options = RecordBatchOptions::new().with_row_count(Some(1));
let batch =
RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();

let expressions = vec![PhysicalSortExpr {
expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
options: SortOptions::default(),
}];

let result = sort_batch(&batch, &expressions, None).unwrap();
assert_eq!(result.num_rows(), 1);
}
}

0 comments on commit fbd4092

Please sign in to comment.