-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Apply projection to Statistics
in FilterExec
#13187
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -258,6 +258,26 @@ impl Statistics { | |
self | ||
} | ||
|
||
/// Project the statistics to the given column indices. | ||
/// | ||
/// For example, if we had statistics for columns `{"a", "b", "c"}`, | ||
/// projecting to `vec![2, 1]` would return statistics for columns `{"c", | ||
/// "b"}`. | ||
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self { | ||
let Some(projection) = projection else { | ||
return self; | ||
}; | ||
|
||
// todo: it would be nice to avoid cloning column statistics if | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A follow-on PR to improve the performance: #13225 |
||
// possible (e.g. if the projection did not contain duplicates) | ||
self.column_statistics = projection | ||
.iter() | ||
.map(|&i| self.column_statistics[i].clone()) | ||
.collect(); | ||
|
||
self | ||
} | ||
|
||
/// Calculates the statistics after `fetch` and `skip` operations apply. | ||
/// Here, `self` denotes per-partition statistics. Use the `n_partitions` | ||
/// parameter to compute global statistics in a multi-partition setting. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec { | |
/// The output statistics of a filtering operation can be estimated if the | ||
/// predicate's selectivity value can be determined for the incoming data. | ||
fn statistics(&self) -> Result<Statistics> { | ||
Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) | ||
let stats = Self::statistics_helper( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the global stats ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree the statistics calculation should be more sophisticated and I filed #13224 to track the idea However, I am worried about trying to change how the statistics calculations work in this PR (I outlined some challenges I see in #13224) Thus, I would like avoid doing a more substantial change in this PR (which fixes a functional bug) and we can sort out how to improve the statistics calculations as a subsequent PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense |
||
&self.input, | ||
self.predicate(), | ||
self.default_selectivity, | ||
)?; | ||
Ok(stats.project(self.projection.as_ref())) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the bug fix. | ||
||
} | ||
|
||
fn cardinality_effect(&self) -> CardinalityEffect { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -780,24 +780,15 @@ impl ExecutionPlan for HashJoinExec { | |
// TODO stats: it is not possible in general to know the output size of joins | ||
// There are some special cases though, for example: | ||
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` | ||
let mut stats = estimate_join_statistics( | ||
let stats = estimate_join_statistics( | ||
Arc::clone(&self.left), | ||
Arc::clone(&self.right), | ||
self.on.clone(), | ||
&self.join_type, | ||
&self.join_schema, | ||
)?; | ||
// Project statistics if there is a projection | ||
if let Some(projection) = &self.projection { | ||
stats.column_statistics = stats | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this code also appears to assume projections never contain repeated values 🤔 I can try and improve the statistics calculation in a future PR to avoid cloning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm yeah, although that assumption probably holds at least in DF codebase (still not good to assume). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made a PR that projects the column statistics without copying as well as handling repetitions: #13225 |
||
.column_statistics | ||
.into_iter() | ||
.enumerate() | ||
.filter(|(i, _)| projection.contains(i)) | ||
.map(|(_, s)| s) | ||
.collect(); | ||
} | ||
Ok(stats) | ||
Ok(stats.project(self.projection.as_ref())) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91 | |
|
||
statement ok | ||
DROP TABLE test_non_utf8_binary; | ||
|
||
|
||
## Tests for https://github.com/apache/datafusion/issues/13186 | ||
statement ok | ||
create table cpu (time timestamp, usage_idle float, usage_user float, cpu int); | ||
|
||
statement ok | ||
insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3); | ||
|
||
# must put it into a parquet file to get statistics | ||
statement ok | ||
copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet'; | ||
|
||
# Run queries against parquet files | ||
statement ok | ||
create external table cpu_parquet | ||
stored as parquet | ||
location 'test_files/scratch/parquet/cpu.parquet'; | ||
|
||
# Double filtering | ||
# | ||
# Expect 1 row for both queries | ||
query PI | ||
select time, rn | ||
from ( | ||
select time, row_number() OVER (ORDER BY usage_idle, time) as rn | ||
from cpu | ||
where cpu = 3 | ||
) where rn > 0; | ||
---- | ||
1970-01-01T00:00:00 1 | ||
|
||
query PI | ||
select time, rn | ||
from ( | ||
select time, row_number() OVER (ORDER BY usage_idle, time) as rn | ||
from cpu_parquet | ||
where cpu = 3 | ||
) where rn > 0; | ||
---- | ||
1970-01-01T00:00:00 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This query errors without the fix. | ||
||
|
||
|
||
# Clean up | ||
statement ok | ||
drop table cpu; | ||
|
||
statement ok | ||
drop table cpu_parquet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this method also be used when we project the statistics in hash_join?
https://github.com/alamb/datafusion/blob/86690fda27ddf7b983cf5597f5bde26ec70a725d/datafusion/physical-plan/src/joins/hash_join.rs#L790
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, also this implementation seems a bit more efficient than the one in hash join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 90aa0bd