From ec36f40aa80c16fec3e3c1c3c07bf6f3da525931 Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 7 Jan 2025 16:54:03 +0800 Subject: [PATCH] fix(iceberg): only convert iceberg table to iceberg source for batch dql (#20045) --- src/frontend/src/handler/query.rs | 6 +++++- src/frontend/src/planner/mod.rs | 11 +++++++++++ src/frontend/src/planner/relation.rs | 8 +++++--- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index ce7cef55c0b85..4f4b1f2187aa0 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -247,7 +247,11 @@ fn gen_batch_query_plan( .. } = bind_result; - let mut planner = Planner::new_for_batch(context); + let mut planner = if matches!(bound, BoundStatement::Query(_)) { + Planner::new_for_batch_dql(context) + } else { + Planner::new_for_batch(context) + }; let mut logical = planner.plan(bound)?; let schema = logical.schema(); diff --git a/src/frontend/src/planner/mod.rs b/src/frontend/src/planner/mod.rs index 4044488fe6c70..4a899c5ffd53b 100644 --- a/src/frontend/src/planner/mod.rs +++ b/src/frontend/src/planner/mod.rs @@ -48,9 +48,20 @@ pub struct Planner { pub enum PlanFor { Stream, Batch, + /// `BatchDql` is a special mode for batch. + /// Iceberg engine table will be converted to iceberg source based on this mode. + BatchDql, } impl Planner { + pub fn new_for_batch_dql(ctx: OptimizerContextRef) -> Planner { + Planner { + ctx, + share_cache: Default::default(), + plan_for: PlanFor::BatchDql, + } + } + pub fn new_for_batch(ctx: OptimizerContextRef) -> Planner { Planner { ctx, diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 2bcaa21f7316a..4faf016a5bfe9 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -92,7 +92,9 @@ impl Planner { ); match (base_table.table_catalog.engine, self.plan_for()) { - (Engine::Hummock, PlanFor::Stream) | (Engine::Hummock, PlanFor::Batch) => { + (Engine::Hummock, PlanFor::Stream) + | (Engine::Hummock, PlanFor::Batch) + | (Engine::Hummock, PlanFor::BatchDql) => { match as_of { None | Some(AsOf::ProcessTime) @@ -105,7 +107,7 @@ impl Planner { }; Ok(scan.into()) } - (Engine::Iceberg, PlanFor::Stream) => { + (Engine::Iceberg, PlanFor::Stream) | (Engine::Iceberg, PlanFor::Batch) => { match as_of { None | Some(AsOf::VersionNum(_)) @@ -120,7 +122,7 @@ impl Planner { } Ok(scan.into()) } - (Engine::Iceberg, PlanFor::Batch) => { + (Engine::Iceberg, PlanFor::BatchDql) => { match as_of { None | Some(AsOf::VersionNum(_))