Skip to content

Commit

Permalink
cleanup `use` statements
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 10, 2025
1 parent f03d143 commit 1428792
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ fn add_global_limit(

#[cfg(test)]
mod test {
use crate::limit_pushdown::LimitPushdown;
use crate::PhysicalOptimizerRule;
use super::*;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -382,7 +381,7 @@ mod test {

#[test]
fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero(
) -> datafusion_common::Result<()> {
) -> Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(schema)?;
let global_limit = global_limit_exec(streaming_table, 0, Some(5));
Expand All @@ -407,7 +406,7 @@ mod test {

#[test]
fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_limit_when_skip_is_nonzero(
) -> datafusion_common::Result<()> {
) -> Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(schema)?;
let global_limit = global_limit_exec(streaming_table, 2, Some(5));
Expand All @@ -433,7 +432,7 @@ mod test {

#[test]
fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limit(
) -> datafusion_common::Result<()> {
) -> Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
let repartition = repartition_exec(streaming_table)?;
Expand Down Expand Up @@ -472,7 +471,7 @@ mod test {
}

#[test]
fn pushes_global_limit_exec_through_projection_exec() -> datafusion_common::Result<()>
fn pushes_global_limit_exec_through_projection_exec() -> Result<()>
{
let schema = create_schema();
let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
Expand Down Expand Up @@ -505,7 +504,7 @@ mod test {

#[test]
fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batches_exec_into_fetching_version(
) -> datafusion_common::Result<()> {
) -> Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(Arc::clone(&schema)).unwrap();
let coalesce_batches = coalesce_batches_exec(streaming_table);
Expand Down Expand Up @@ -536,7 +535,7 @@ mod test {
}

#[test]
fn pushes_global_limit_into_multiple_fetch_plans() -> datafusion_common::Result<()> {
fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(Arc::clone(&schema)).unwrap();
let coalesce_batches = coalesce_batches_exec(streaming_table);
Expand Down Expand Up @@ -584,7 +583,7 @@ mod test {

#[test]
fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
) -> datafusion_common::Result<()> {
) -> Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
let repartition = repartition_exec(streaming_table)?;
Expand Down Expand Up @@ -618,7 +617,7 @@ mod test {
}

#[test]
fn merges_local_limit_with_local_limit() -> datafusion_common::Result<()> {
fn merges_local_limit_with_local_limit() -> Result<()> {
let schema = create_schema();
let empty_exec = empty_exec(schema);
let child_local_limit = local_limit_exec(empty_exec, 10);
Expand All @@ -643,7 +642,7 @@ mod test {
}

#[test]
fn merges_global_limit_with_global_limit() -> datafusion_common::Result<()> {
fn merges_global_limit_with_global_limit() -> Result<()> {
let schema = create_schema();
let empty_exec = empty_exec(schema);
let child_global_limit = global_limit_exec(empty_exec, 10, Some(30));
Expand All @@ -668,7 +667,7 @@ mod test {
}

#[test]
fn merges_global_limit_with_local_limit() -> datafusion_common::Result<()> {
fn merges_global_limit_with_local_limit() -> Result<()> {
let schema = create_schema();
let empty_exec = empty_exec(schema);
let local_limit = local_limit_exec(empty_exec, 40);
Expand All @@ -693,7 +692,7 @@ mod test {
}

#[test]
fn merges_local_limit_with_global_limit() -> datafusion_common::Result<()> {
fn merges_local_limit_with_global_limit() -> Result<()> {
let schema = create_schema();
let empty_exec = empty_exec(schema);
let global_limit = global_limit_exec(empty_exec, 20, Some(30));
Expand Down Expand Up @@ -727,7 +726,7 @@ mod test {

fn streaming_table_exec(
schema: SchemaRef,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(StreamingTableExec::try_new(
Arc::clone(&schema),
vec![Arc::new(DummyStreamPartition { schema }) as _],
Expand Down Expand Up @@ -772,7 +771,7 @@ mod test {
fn projection_exec(
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ProjectionExec::try_new(
vec![
(col("c1", schema.as_ref()).unwrap(), "c1".to_string()),
Expand All @@ -786,7 +785,7 @@ mod test {
fn filter_exec(
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(FilterExec::try_new(
Arc::new(BinaryExpr::new(
col("c3", schema.as_ref()).unwrap(),
Expand All @@ -809,7 +808,7 @@ mod test {

fn repartition_exec(
streaming_table: Arc<dyn ExecutionPlan>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(RepartitionExec::try_new(
streaming_table,
Partitioning::RoundRobinBatch(8),
Expand Down

0 comments on commit 1428792

Please sign in to comment.