Skip to content

Commit

Permalink
fix: fix incorrect schema in window expr in subquery with lazy_materi…
Browse files Browse the repository at this point in the history
…alization (databendlabs#14895)

fix incorrect schema in window expr in subquery with lazy_materialization
  • Loading branch information
ariesdevil authored Mar 10, 2024
1 parent e54cd2f commit 4cc7064
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 15 deletions.
40 changes: 28 additions & 12 deletions src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,30 @@ impl Binder {
return Ok(());
}

let mut metadata = self.metadata.write();
if metadata.tables().len() != 1 {
// Only support single table query.
return Ok(());
}

// As we don't know whether this is a subquery, we need to add the required cols to the
// metadata's non_lazy_columns, so that if the inner query does not match lazy
// materialization but the outer query does, the cols required by the inner query are
// not pruned when the outer query is analyzed.
{
let mut non_lazy_cols = HashSet::new();
for s in select_list.items.iter() {
// The TableScan's schema uses name_mapping to prune columns;
// all lazy columns are skipped when being added to name_mapping in TableScan.
// When building the physical window plan, if the window's order by or partition by is
// provided, we need to create an `EvalScalar` for the physical window's inputs, so we
// should keep the columns used by the window from being pruned.
if let ScalarExpr::WindowFunction(_) = &s.scalar {
non_lazy_cols.extend(s.scalar.used_columns())
}
}
metadata.add_non_lazy_columns(non_lazy_cols);
}

let limit_threadhold = self.ctx.get_settings().get_lazy_read_threshold()? as usize;

let where_cols = where_scalar
Expand All @@ -838,12 +862,6 @@ impl Binder {
return Ok(());
}

let mut metadata = self.metadata.write();
if metadata.tables().len() != 1 {
// Only support single table query.
return Ok(());
}

if !metadata
.table(0)
.table()
Expand Down Expand Up @@ -881,13 +899,8 @@ impl Binder {

let mut select_cols = HashSet::with_capacity(select_list.items.len());
for s in select_list.items.iter() {
// The TableScan's schema uses name_mapping to prune columns,
// all lazy columns will be skipped to add to name_mapping in TableScan.
// When build physical window plan, if window's order by or partition by proviede,
// we need create a `EvalScalar` for physical window inputs, so we should keep the window
// used cols not be pruned.
if let ScalarExpr::WindowFunction(_) = &s.scalar {
non_lazy_cols.extend(s.scalar.used_columns())
continue;
} else {
select_cols.extend(s.scalar.used_columns())
}
Expand All @@ -911,6 +924,9 @@ impl Binder {
// add internal_cols to non_lazy_cols
non_lazy_cols.extend(internal_cols);

// add the non_lazy_columns previously stored (by the subquery) to non_lazy_cols
non_lazy_cols.extend(metadata.non_lazy_columns());

let lazy_cols = select_cols.difference(&non_lazy_cols).copied().collect();
metadata.add_lazy_columns(lazy_cols);

Expand Down
14 changes: 14 additions & 0 deletions src/query/sql/src/planner/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ pub struct Metadata {
columns: Vec<ColumnEntry>,
/// Columns that are lazy materialized.
lazy_columns: HashSet<IndexType>,
/// Columns that are used to compute lazy materialization.
/// If the outer query matches the lazy materialization rule but the inner query doesn't,
/// we need to add the cols required by the inner query to non_lazy_columns
/// to prevent these cols from being pruned.
non_lazy_columns: HashSet<IndexType>,
/// Mappings from table index to _row_id column index.
table_row_id_index: HashMap<IndexType, IndexType>,
agg_indexes: HashMap<String, Vec<(u64, String, SExpr)>>,
Expand Down Expand Up @@ -129,10 +134,19 @@ impl Metadata {
self.lazy_columns.extend(indices);
}

/// Mark the given column indices as non-lazy, i.e. they must be kept and
/// never pruned by lazy materialization.
///
/// Debug builds assert that every index refers to a registered column.
pub fn add_non_lazy_columns(&mut self, indices: HashSet<usize>) {
    let column_count = self.columns.len();
    debug_assert!(indices.iter().all(|&idx| idx < column_count));
    self.non_lazy_columns.extend(indices);
}

/// Returns the set of column indices that are lazy materialized.
pub fn lazy_columns(&self) -> &HashSet<usize> {
    &self.lazy_columns
}

/// Returns the set of column indices that must not be lazy materialized,
/// i.e. columns recorded so they are not pruned (e.g. window-function
/// columns required by an inner subquery).
pub fn non_lazy_columns(&self) -> &HashSet<usize> {
    &self.non_lazy_columns
}

/// Records the mapping from a table's index to the index of its `_row_id` column.
pub fn set_table_row_id_index(&mut self, table_index: IndexType, row_id_index: IndexType) {
    self.table_row_id_index.insert(table_index, row_id_index);
}
Expand Down
27 changes: 24 additions & 3 deletions tests/sqllogictests/suites/mode/standalone/explain/window.test
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ DROP TABLE IF EXISTS empsalary
statement ok
CREATE TABLE empsalary (depname string, empno bigint, salary int, enroll_date date)

query
query T
explain SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno
----
Sort
Expand All @@ -36,7 +36,7 @@ Sort
statement ok
set max_threads=4;

query
query T
explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno;
----
CompoundBlockOperator(Project) × 1 processor
Expand Down Expand Up @@ -390,7 +390,7 @@ CompoundBlockOperator(Project) × 1 processor
SyncReadParquetDataSource × 1 processor

# row fetch with window function(plan explain)
query
query T
explain select *, sum(a) over (partition by a order by a desc rows between unbounded preceding and current row) from t where a > 1 order by b limit 3
----
RowFetch
Expand Down Expand Up @@ -426,5 +426,26 @@ RowFetch
├── push downs: [filters: [is_true(t.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

statement ok
drop table if exists table43764_orc

statement ok
CREATE TABLE table43764_orc (rowkey VARCHAR NOT NULL, time TIMESTAMP NULL, sirc_action VARCHAR NULL, sirc_operation_count DECIMAL(36, 16) NULL, akc087 VARCHAR NULL, aae035 VARCHAR)

# window in subquery
query T
explain pipeline select time, rowkey from (select *, row_number() over(partition by rowkey order by time desc) as rn from table43764_orc) a where rn = 1 limit 4
----
CompoundBlockOperator(Project) × 1 processor
LimitTransform × 1 processor
TransformFilter × 1 processor
Transform Window × 1 processor
Merge (TransformSortMerge × 4 processors) to (Transform Window × 1)
TransformSortMerge × 4 processors
SortPartialTransform × 4 processors
Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4)
DeserializeDataTransform × 1 processor
SyncReadParquetDataSource × 1 processor

statement ok
DROP DATABASE test_explain_window;

0 comments on commit 4cc7064

Please sign in to comment.