Skip to content

Commit

Permalink
Merge pull request #28310 from rjobanp/subsource-schema-planning
Browse files Browse the repository at this point in the history
storage/sql: Refactor subsource purification (prep for source-fed table purification)
  • Loading branch information
rjobanp authored Jul 21, 2024
2 parents 65bbe8c + e48c44d commit 343710c
Show file tree
Hide file tree
Showing 19 changed files with 628 additions and 540 deletions.
12 changes: 6 additions & 6 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,28 +524,28 @@ impl Coordinator {
PurifiedStatement::PurifiedCreateSource {
create_progress_subsource_stmt,
create_source_stmt,
create_subsource_stmts,
subsources,
} => {
self.plan_purified_create_source(
&ctx,
params,
create_progress_subsource_stmt,
create_source_stmt,
create_subsource_stmts,
subsources,
)
.await
}
PurifiedStatement::PurifiedAlterSourceAddSubsources {
altered_id,
source_name,
options,
create_subsource_stmts,
subsources,
} => {
self.plan_purified_alter_source_add_subsource(
ctx.session(),
params,
altered_id,
source_name,
options,
create_subsource_stmts,
subsources,
)
.await
}
Expand Down
48 changes: 32 additions & 16 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ use mz_sql::names::{
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::StatementContext;
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::mysql::{MySqlSourceDetails, ProtoMySqlSourceDetails};
use mz_storage_types::sources::postgres::{
Expand Down Expand Up @@ -382,25 +384,37 @@ impl Coordinator {
&mut self,
session: &Session,
params: Params,
id: GlobalId,
source_name: ResolvedItemName,
options: Vec<AlterSourceAddSubsourceOption<Aug>>,
create_subsource_stmts: Vec<CreateSubsourceStatement<Aug>>,
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<(Plan, ResolvedIds), AdapterError> {
let mut subsources = Vec::with_capacity(create_subsource_stmts.len());
for subsource_stmt in create_subsource_stmts {
let mut subsource_plans = Vec::with_capacity(subsources.len());

// Generate subsource statements
let conn_catalog = self.catalog().for_system_session();
let pcx = plan::PlanContext::zero();
let scx = StatementContext::new(Some(&pcx), &conn_catalog);

let source_id = *source_name.item_id();
let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

for subsource_stmt in subsource_stmts {
let s = self
.plan_subsource(session, &params, subsource_stmt)
.await?;
subsources.push(s);
subsource_plans.push(s);
}

let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
subsources,
subsources: subsource_plans,
options,
};

Ok((
Plan::AlterSource(mz_sql::plan::AlterSourcePlan { id, action }),
Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
id: source_id,
action,
}),
ResolvedIds(BTreeSet::new()),
))
}
Expand All @@ -414,9 +428,9 @@ impl Coordinator {
params: Params,
progress_stmt: CreateSubsourceStatement<Aug>,
mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
subsource_stmts: Vec<CreateSubsourceStatement<Aug>>,
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<(Plan, ResolvedIds), AdapterError> {
let mut create_source_plans = Vec::with_capacity(subsource_stmts.len() + 2);
let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

// 1. First plan the progress subsource.
//
Expand All @@ -440,15 +454,11 @@ impl Coordinator {

create_source_plans.push(progress_plan);

// 2. Then plan the main source.
//
// The subsources need this to exist in order to set their `OF SOURCE`
// correctly.
source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));

let resolved_ids = mz_sql::names::visit_dependencies(&source_stmt);

// Plan primary source.
// 2. Then plan the main source.
let source_plan = match self.plan_statement(
ctx.session(),
Statement::CreateSource(source_stmt),
Expand All @@ -468,15 +478,21 @@ impl Coordinator {
print_id: true,
};

// Generate subsource statements
let conn_catalog = self.catalog().for_system_session();
let pcx = plan::PlanContext::zero();
let scx = StatementContext::new(Some(&pcx), &conn_catalog);

let subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

create_source_plans.push(CreateSourcePlanBundle {
source_id,
plan: source_plan,
resolved_ids: resolved_ids.clone(),
});

// 3. Finally, plan all the subsources
for mut stmt in subsource_stmts {
stmt.of_source = Some(of_source.clone());
for stmt in subsource_stmts {
let plan = self.plan_subsource(ctx.session(), &params, stmt).await?;
create_source_plans.push(plan);
}
Expand Down
36 changes: 19 additions & 17 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ pub struct CreateSourceStatement<T: AstInfo> {
pub if_not_exists: bool,
pub key_constraint: Option<KeyConstraint>,
pub with_options: Vec<CreateSourceOption<T>>,
pub referenced_subsources: Option<ReferencedSubsources>,
pub external_references: Option<ExternalReferences>,
pub progress_subsource: Option<DeferredItemName<T>>,
}

Expand Down Expand Up @@ -1022,7 +1022,7 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
f.write_node(envelope);
}

if let Some(subsources) = &self.referenced_subsources {
if let Some(subsources) = &self.external_references {
f.write_str(" ");
f.write_node(subsources);
}
Expand All @@ -1041,40 +1041,42 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
}
impl_display_t!(CreateSourceStatement);

/// A selected subsource in a FOR TABLES (..) statement
/// A selected external reference in a FOR TABLES (..) statement
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CreateSourceSubsource {
pub struct ExternalReferenceExport {
pub reference: UnresolvedItemName,
pub subsource: Option<UnresolvedItemName>,
pub alias: Option<UnresolvedItemName>,
}

impl AstDisplay for CreateSourceSubsource {
impl AstDisplay for ExternalReferenceExport {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_node(&self.reference);
if let Some(subsource) = &self.subsource {
if let Some(alias) = &self.alias {
f.write_str(" AS ");
f.write_node(subsource);
f.write_node(alias);
}
}
}
impl_display!(CreateSourceSubsource);
impl_display!(ExternalReferenceExport);

/// Specifies which set of external references to generate a source export
/// for in a `CREATE SOURCE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferencedSubsources {
pub enum ExternalReferences {
/// A subset defined with FOR TABLES (...)
SubsetTables(Vec<CreateSourceSubsource>),
SubsetTables(Vec<ExternalReferenceExport>),
/// A subset defined with FOR SCHEMAS (...)
SubsetSchemas(Vec<Ident>),
/// FOR ALL TABLES
All,
}

impl AstDisplay for ReferencedSubsources {
impl AstDisplay for ExternalReferences {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::SubsetTables(subsources) => {
Self::SubsetTables(tables) => {
f.write_str("FOR TABLES (");
f.write_node(&display::comma_separated(subsources));
f.write_node(&display::comma_separated(tables));
f.write_str(")");
}
Self::SubsetSchemas(schemas) => {
Expand All @@ -1086,7 +1088,7 @@ impl AstDisplay for ReferencedSubsources {
}
}
}
impl_display!(ReferencedSubsources);
impl_display!(ExternalReferences);

/// An option in a `CREATE SUBSOURCE` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down Expand Up @@ -2462,7 +2464,7 @@ pub enum AlterSourceAction<T: AstInfo> {
SetOptions(Vec<CreateSourceOption<T>>),
ResetOptions(Vec<CreateSourceOptionName>),
AddSubsources {
subsources: Vec<CreateSourceSubsource>,
external_references: Vec<ExternalReferenceExport>,
options: Vec<AlterSourceAddSubsourceOption<T>>,
},
DropSubsources {
Expand Down Expand Up @@ -2505,7 +2507,7 @@ impl<T: AstInfo> AstDisplay for AlterSourceAction<T> {
}
}
AlterSourceAction::AddSubsources {
subsources,
external_references: subsources,
options,
} => {
f.write_str("ADD SUBSOURCE ");
Expand Down
16 changes: 8 additions & 8 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2764,14 +2764,14 @@ impl<'a> Parser<'a> {
self.expect_token(&Token::LParen)?;
let subsources = self.parse_comma_separated(Parser::parse_subsource_references)?;
self.expect_token(&Token::RParen)?;
Some(ReferencedSubsources::SubsetTables(subsources))
Some(ExternalReferences::SubsetTables(subsources))
} else if self.parse_keywords(&[FOR, SCHEMAS]) {
self.expect_token(&Token::LParen)?;
let schemas = self.parse_comma_separated(Parser::parse_identifier)?;
self.expect_token(&Token::RParen)?;
Some(ReferencedSubsources::SubsetSchemas(schemas))
Some(ExternalReferences::SubsetSchemas(schemas))
} else if self.parse_keywords(&[FOR, ALL, TABLES]) {
Some(ReferencedSubsources::All)
Some(ExternalReferences::All)
} else {
None
};
Expand Down Expand Up @@ -2802,23 +2802,23 @@ impl<'a> Parser<'a> {
envelope,
if_not_exists,
key_constraint,
referenced_subsources,
external_references: referenced_subsources,
progress_subsource,
with_options,
}))
}

fn parse_subsource_references(&mut self) -> Result<CreateSourceSubsource, ParserError> {
fn parse_subsource_references(&mut self) -> Result<ExternalReferenceExport, ParserError> {
let reference = self.parse_item_name()?;
let subsource = if self.parse_one_of_keywords(&[AS, INTO]).is_some() {
Some(self.parse_item_name()?)
} else {
None
};

Ok(CreateSourceSubsource {
Ok(ExternalReferenceExport {
reference,
subsource,
alias: subsource,
})
}

Expand Down Expand Up @@ -4843,7 +4843,7 @@ impl<'a> Parser<'a> {
source_name,
if_exists,
action: AlterSourceAction::AddSubsources {
subsources,
external_references: subsources,
options,
},
})
Expand Down
Loading

0 comments on commit 343710c

Please sign in to comment.