Apply projection to Statistics in FilterExec #13187

Changes from 1 commit
@@ -258,6 +258,26 @@ impl Statistics {
         self
     }
 
+    /// Project the statistics to the given column indices.
+    ///
+    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
+    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
+    /// "b"}`.
+    pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
+        let Some(projection) = projection else {
+            return self;
+        };
+
+        // todo: it would be nice to avoid cloning column statistics if
+        // possible (e.g. if the projection did not contain duplicates)
+        self.column_statistics = projection
+            .iter()
+            .map(|&i| self.column_statistics[i].clone())
+            .collect();
+
+        self
+    }
+
     /// Calculates the statistics after `fetch` and `skip` operations apply.
     /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
     /// parameter to compute global statistics in a multi-partition setting.

Review comment on the `// todo` above: follow on PR to improve the performance: #13225
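To illustrate how the new method behaves, here is a minimal usage sketch (not part of the PR). It assumes the `Statistics`, `ColumnStatistics`, and `Precision` types from `datafusion_common`; the construction is only illustrative.

```rust
use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};

// Illustrative only (not from the PR): build statistics for three columns and
// project them down to two.
fn project_example() {
    // Statistics describing three columns: "a", "b", "c"
    let stats = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::new_unknown(), // "a"
            ColumnStatistics::new_unknown(), // "b"
            ColumnStatistics::new_unknown(), // "c"
        ],
    };

    // Keep only columns "c" and "b", in that order
    let projected = stats.project(Some(&vec![2, 1]));
    assert_eq!(projected.column_statistics.len(), 2);

    // Passing `None` leaves the statistics untouched
    assert_eq!(projected.project(None).column_statistics.len(), 2);
}
```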
@@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec {
     /// The output statistics of a filtering operation can be estimated if the
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Result<Statistics> {
-        Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
+        let stats = Self::statistics_helper(
+            &self.input,
+            self.predicate(),
+            self.default_selectivity,
+        )?;
+        Ok(stats.project(self.projection.as_ref()))
     }
 
     fn cardinality_effect(&self) -> CardinalityEffect {

Review comments on this change:

I think the global stats (…

I agree the statistics calculation should be more sophisticated and I filed #13224 to track the idea. However, I am worried about trying to change how the statistics calculations work in this PR (I outlined some challenges I see in #13224). Thus, I would like to avoid doing a more substantial change in this PR (which fixes a functional bug), and we can sort out how to improve the statistics calculations as a subsequent PR.

Makes sense

On the `Ok(stats.project(self.projection.as_ref()))` line: this is the bug fix.
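To make the effect of the fix concrete, here is a hypothetical sanity check (not part of the PR; the helper name and its use of the `ExecutionPlan` API are assumptions for illustration). The invariant being restored is that a plan's statistics describe exactly the columns of its output schema; before this change, a `FilterExec` carrying a projection reported one `ColumnStatistics` entry per input column instead.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical sanity check (assumed helper, not DataFusion code): a plan's
/// statistics must have one `ColumnStatistics` entry per field of its output
/// schema.
fn check_stats_match_schema(plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
    let stats = plan.statistics()?;
    let n_fields = plan.schema().fields().len();
    assert_eq!(
        stats.column_statistics.len(),
        n_fields,
        "expected one ColumnStatistics entry per output column"
    );
    Ok(())
}
```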
@@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91
 
 statement ok
 DROP TABLE test_non_utf8_binary;
+
+
+## Tests for https://github.com/apache/datafusion/issues/13186
+statement ok
+create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);
+
+statement ok
+insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);
+
+# must put it into a parquet file to get statistics
+statement ok
+copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';
+
+# Run queries against parquet files
+statement ok
+create external table cpu_parquet
+stored as parquet
+location 'test_files/scratch/parquet/cpu.parquet';
+
+# Double filtering
+#
+# Expect 1 row for both queries
+query PI
+select time, rn
+from (
+select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+from cpu
+where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+query PI
+select time, rn
+from (
+select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+from cpu_parquet
+where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+
+# Clean up
+statement ok
+drop table cpu;
+
+statement ok
+drop table cpu_parquet;

Review comment on the `cpu_parquet` query above: this query errors without the fix.
Should this method also be used when we project the statistics in hash_join?
https://github.com/alamb/datafusion/blob/86690fda27ddf7b983cf5597f5bde26ec70a725d/datafusion/physical-plan/src/joins/hash_join.rs#L790

Makes sense, also this implementation seems a bit more efficient than the one in hash join.

Done in 90aa0bd
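As a follow-up to the suggestion above, here is a rough sketch of the reuse being discussed (an assumption for illustration, not the actual change in 90aa0bd): a join operator can delegate its output projection of statistics to `Statistics::project` instead of cloning `ColumnStatistics` by hand.

```rust
use datafusion_common::{Result, Statistics};

/// Hypothetical helper (name and call site assumed) showing how a join's
/// statistics can reuse `Statistics::project` for its output projection.
fn projected_join_statistics(
    join_stats: Statistics,
    projection: Option<&Vec<usize>>,
) -> Result<Statistics> {
    // `project` returns the statistics unchanged when `projection` is None,
    // so no separate code path is needed for the unprojected case.
    Ok(join_stats.project(projection))
}
```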