Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Determine ordering of file groups #9593

Merged
merged 52 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7587a07
add statistics to PartitionedFile
suremarc Nov 3, 2023
1e380b2
just dump work for now
suremarc Nov 3, 2023
263453f
working test case
suremarc Nov 3, 2023
5634bd7
fix jumbled rebase
suremarc Mar 13, 2024
7428fe0
forgot to annotate #[test]
suremarc Mar 13, 2024
4816343
more refactoring
suremarc Mar 13, 2024
c7be9e0
add a link
suremarc Mar 13, 2024
fc1a668
refactor again
suremarc Mar 13, 2024
1c42e00
whitespace
suremarc Mar 13, 2024
3446fed
format debug log
suremarc Mar 13, 2024
3fe8558
remove useless itertools
suremarc Mar 13, 2024
8ba4001
refactor test
suremarc Mar 13, 2024
9c8729a
fix bug
suremarc Mar 15, 2024
6df9832
use sort_file_groups in ListingTable
suremarc Mar 15, 2024
f855a8a
move check into a better place
suremarc Mar 15, 2024
3e5263b
refactor test a bit
suremarc Mar 15, 2024
5b7b307
more testing
suremarc Mar 15, 2024
4761096
more testing
suremarc Mar 15, 2024
a95dffa
better error message
suremarc Mar 15, 2024
1a66604
fix log msg
suremarc Mar 15, 2024
cca5f0f
fix again
suremarc Mar 15, 2024
e6e10e8
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc Mar 21, 2024
8f7a2d7
add sqllogictest and fixes
suremarc Mar 21, 2024
e9fad54
fix test
suremarc Mar 21, 2024
e982f0f
Update datafusion/core/src/datasource/listing/mod.rs
suremarc Mar 30, 2024
cc9f144
Update datafusion/core/src/datasource/physical_plan/file_scan_config.rs
suremarc Mar 30, 2024
95bb790
more unit tests
suremarc Mar 31, 2024
0e60230
rename to split_groups_by_statistics
suremarc Mar 31, 2024
9f375e8
only use groups if there's <= target_partitions
suremarc Mar 31, 2024
3d9d293
refactor a bit, no need for projected_schema
suremarc Mar 31, 2024
1366c99
fix reverse order
suremarc Mar 31, 2024
a29be69
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc Apr 9, 2024
2ef8006
save work for now
suremarc Apr 9, 2024
b112c26
Merge branch 'main' into statistics-planning
suremarc Apr 24, 2024
0153acf
lots of test cases in new slt
suremarc Apr 25, 2024
4e03528
remove output check
suremarc Apr 25, 2024
695e674
fix
suremarc Apr 25, 2024
ec4282b
fix last test
suremarc Apr 25, 2024
1030b30
comment on params
suremarc Apr 25, 2024
2f34684
clippy
suremarc Apr 25, 2024
24c0bc5
revert parquet.slt
suremarc Apr 25, 2024
61f883f
no need to pass projection separately
suremarc Apr 26, 2024
9bc29cf
Update datafusion/core/src/datasource/listing/mod.rs
suremarc Apr 30, 2024
aa89433
update comment on in
suremarc Apr 30, 2024
d7fc78a
fix test?
suremarc Apr 30, 2024
f3a69e5
un-fix?
suremarc Apr 30, 2024
1a010b7
add fix back in?
suremarc Apr 30, 2024
f41d1c9
move indices_sorted_by_min to MinMaxStatistics
suremarc May 1, 2024
15e1339
move MinMaxStatistics to its own module
suremarc May 1, 2024
a2c9b4e
fix license
suremarc May 1, 2024
d7c9af6
add feature flag
suremarc May 1, 2024
82166fd
update config
suremarc May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 135 additions & 23 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ impl FileScanConfig {
// Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
// https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html

if flattened_files.is_empty() {
return Ok(vec![]);
}

let statistics = MinMaxStatistics::new_from_files(
sort_order,
table_schema,
Expand Down Expand Up @@ -863,16 +867,17 @@ mod tests {
}

struct TestCase {
#[allow(unused)]
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<datafusion_expr::Expr>,
expected_result: Result<Vec<Vec<usize>>, &'static str>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}

use datafusion_expr::col;
let cases = vec![
TestCase {
name: "test sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
Expand All @@ -884,9 +889,44 @@ mod tests {
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec![0, 1], vec![2]]),
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
// same input but file '2' is in the middle
// test that we still order correctly
TestCase {
name: "test sort with files ordered differently",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
// FIXME: this test is broken
TestCase {
name: "reverse sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(false, true)],
expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add more tests here?

  1. The same input but "2" is now in the middle to ensure we sort and group them correctly
  2. All three non-overlapped files
  3. All three overlapped files
  4. Empty input

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other important cases are additional negative cases

  1. The files have values but no statistics (the groups shouldn't get reordered)
  2. One of the files has no statistics

// reject nullable sort columns
TestCase {
name: "no nullable sort columns",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
Expand All @@ -900,6 +940,62 @@ mod tests {
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column"),
},
TestCase {
name: "all three non-overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all these tests also always have the first file with the minimum stastistics value -- can you possibly also test what happens when it is not (aka add a test that runs this test with file ids 2, 1, 0)?

File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1", "2"]]),
},
TestCase {
name: "all three overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
},
TestCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please add a test for a single input file too?

name: "empty input",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![]),
},
TestCase {
name: "one file missing statistics",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![None]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics\ncaused by\ncollect min/max values\ncaused by\nError during planning: statistics not found"),
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice newly added tests

];

for case in cases {
Expand Down Expand Up @@ -930,31 +1026,47 @@ mod tests {

let partitioned_files =
case.files.into_iter().map(From::from).collect::<Vec<_>>();
let results = FileScanConfig::sort_file_groups(
let result = FileScanConfig::sort_file_groups(
&table_schema,
&table_schema,
&[partitioned_files.clone()],
&sort_order,
)
.map(|file_groups| {
file_groups
.into_iter()
.map(|file_group| {
file_group
.iter()
.map(|file| {
partitioned_files
.iter()
.position(|f| f.object_meta == file.object_meta)
.unwrap()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.map_err(|e| e.to_string().leak() as &'static str);
);
let results_by_name = result
.as_ref()
.map(|file_groups| {
file_groups
.iter()
.map(|file_group| {
file_group
.iter()
.map(|file| {
partitioned_files
.iter()
.find_map(|f| {
if f.object_meta == file.object_meta {
Some(
f.object_meta
.location
.as_ref()
.rsplit('/')
.next()
.unwrap()
.trim_end_matches(".parquet"),
)
} else {
None
}
})
.unwrap()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.map_err(|e| e.to_string().leak() as &'static str);

assert_eq!(results, case.expected_result);
assert_eq!(results_by_name, case.expected_result, "{}", case.name);
}

return Ok(());
Expand Down
42 changes: 24 additions & 18 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,13 +508,20 @@ fn get_projected_output_ordering(
all_orderings
}

// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please put this structure into its own module (e.g. datafusion/core/src/datasource/physical_plan/statistics.rs) so that it is easier to find

/// The min/max values are ordered by [`Self::sort_order`].
pub(crate) struct MinMaxStatistics {
min: arrow::row::Rows,
max: arrow::row::Rows,
sort_order: Vec<PhysicalSortExpr>,
}

impl MinMaxStatistics {
#[allow(unused)]
fn sort_order(&self) -> &[PhysicalSortExpr] {
&self.sort_order
}

fn new_from_files<'a>(
sort_order: &[PhysicalSortExpr],
table_schema: &SchemaRef,
Expand All @@ -535,29 +542,27 @@ impl MinMaxStatistics {
DataFusionError::Plan("Parquet file missing statistics".to_string())
})?;

let get_min_max = |i: usize| -> (Vec<ScalarValue>, Vec<ScalarValue>) {
statistics_and_partition_values
let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
Ok(statistics_and_partition_values
.iter()
.map(|(s, pv)| {
if i < s.column_statistics.len() {
(
s.column_statistics[i]
.min_value
.get_value()
.cloned()
.unwrap_or(ScalarValue::Null),
s.column_statistics[i]
.max_value
.get_value()
.cloned()
.unwrap_or(ScalarValue::Null),
)
s.column_statistics[i]
.min_value
.get_value()
.cloned()
.zip(s.column_statistics[i].max_value.get_value().cloned())
.ok_or_else(|| {
DataFusionError::Plan("statistics not found".to_string())
})
} else {
let partition_value = &pv[i - s.column_statistics.len()];
(partition_value.clone(), partition_value.clone())
Ok((partition_value.clone(), partition_value.clone()))
}
})
.unzip()
.collect::<Result<Vec<_>>>()?
.into_iter()
.unzip())
};

let (min_values, max_values): (Vec<_>, Vec<_>) = projected_schema
Expand All @@ -570,7 +575,7 @@ impl MinMaxStatistics {
e,
Some(format!("get min/max for field: '{}'", field.name())),
)
})?);
})?)?;
Ok((
ScalarValue::iter_to_array(min)?,
ScalarValue::iter_to_array(max)?,
Expand Down Expand Up @@ -662,6 +667,7 @@ impl MinMaxStatistics {
Ok(Self {
min: min.map_err(|e| e.context("build min rows"))?,
max: max.map_err(|e| e.context("build max rows"))?,
sort_order: sort_order.to_vec(),
})
}

Expand Down