Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/sql: Refactor subsource purification (prep for source-fed table purification) [ENG-TASK-19] #28310

Merged
merged 3 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,28 +521,28 @@ impl Coordinator {
PurifiedStatement::PurifiedCreateSource {
create_progress_subsource_stmt,
create_source_stmt,
create_subsource_stmts,
subsources,
} => {
self.plan_purified_create_source(
&ctx,
params,
create_progress_subsource_stmt,
create_source_stmt,
create_subsource_stmts,
subsources,
)
.await
}
PurifiedStatement::PurifiedAlterSourceAddSubsources {
altered_id,
source_name,
options,
create_subsource_stmts,
subsources,
} => {
self.plan_purified_alter_source_add_subsource(
ctx.session(),
params,
altered_id,
source_name,
options,
create_subsource_stmts,
subsources,
)
.await
}
Expand Down
48 changes: 32 additions & 16 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ use mz_sql::names::{
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::StatementContext;
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::mysql::{MySqlSourceDetails, ProtoMySqlSourceDetails};
use mz_storage_types::sources::postgres::{
Expand Down Expand Up @@ -382,25 +384,37 @@ impl Coordinator {
&mut self,
session: &Session,
params: Params,
id: GlobalId,
source_name: ResolvedItemName,
options: Vec<AlterSourceAddSubsourceOption<Aug>>,
create_subsource_stmts: Vec<CreateSubsourceStatement<Aug>>,
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<(Plan, ResolvedIds), AdapterError> {
let mut subsources = Vec::with_capacity(create_subsource_stmts.len());
for subsource_stmt in create_subsource_stmts {
let mut subsource_plans = Vec::with_capacity(subsources.len());

// Generate subsource statements
let conn_catalog = self.catalog().for_system_session();
let pcx = plan::PlanContext::zero();
let scx = StatementContext::new(Some(&pcx), &conn_catalog);

let source_id = *source_name.item_id();
let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

for subsource_stmt in subsource_stmts {
let s = self
.plan_subsource(session, &params, subsource_stmt)
.await?;
subsources.push(s);
subsource_plans.push(s);
}

let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
subsources,
subsources: subsource_plans,
options,
};

Ok((
Plan::AlterSource(mz_sql::plan::AlterSourcePlan { id, action }),
Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
id: source_id,
action,
}),
ResolvedIds(BTreeSet::new()),
))
}
Expand All @@ -414,9 +428,9 @@ impl Coordinator {
params: Params,
progress_stmt: CreateSubsourceStatement<Aug>,
mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
subsource_stmts: Vec<CreateSubsourceStatement<Aug>>,
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<(Plan, ResolvedIds), AdapterError> {
let mut create_source_plans = Vec::with_capacity(subsource_stmts.len() + 2);
let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);

// 1. First plan the progress subsource.
//
Expand All @@ -440,15 +454,11 @@ impl Coordinator {

create_source_plans.push(progress_plan);

// 2. Then plan the main source.
//
// The subsources need this to exist in order to set their `OF SOURCE`
// correctly.
source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));

let resolved_ids = mz_sql::names::visit_dependencies(&source_stmt);

// Plan primary source.
// 2. Then plan the main source.
let source_plan = match self.plan_statement(
ctx.session(),
Statement::CreateSource(source_stmt),
Expand All @@ -468,15 +478,21 @@ impl Coordinator {
print_id: true,
};

// Generate subsource statements
let conn_catalog = self.catalog().for_system_session();
let pcx = plan::PlanContext::zero();
let scx = StatementContext::new(Some(&pcx), &conn_catalog);

let subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;

create_source_plans.push(CreateSourcePlanBundle {
source_id,
plan: source_plan,
resolved_ids: resolved_ids.clone(),
});

// 3. Finally, plan all the subsources
for mut stmt in subsource_stmts {
stmt.of_source = Some(of_source.clone());
for stmt in subsource_stmts {
let plan = self.plan_subsource(ctx.session(), &params, stmt).await?;
create_source_plans.push(plan);
}
Expand Down
36 changes: 19 additions & 17 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ pub struct CreateSourceStatement<T: AstInfo> {
pub if_not_exists: bool,
pub key_constraint: Option<KeyConstraint>,
pub with_options: Vec<CreateSourceOption<T>>,
pub referenced_subsources: Option<ReferencedSubsources>,
pub external_references: Option<ExternalReferences>,
pub progress_subsource: Option<DeferredItemName<T>>,
}

Expand Down Expand Up @@ -1022,7 +1022,7 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
f.write_node(envelope);
}

if let Some(subsources) = &self.referenced_subsources {
if let Some(subsources) = &self.external_references {
f.write_str(" ");
f.write_node(subsources);
}
Expand All @@ -1041,40 +1041,42 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
}
impl_display_t!(CreateSourceStatement);

/// A selected subsource in a FOR TABLES (..) statement
/// A selected external reference in a FOR TABLES (..) statement
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CreateSourceSubsource {
pub struct ExternalReferenceExport {
pub reference: UnresolvedItemName,
pub subsource: Option<UnresolvedItemName>,
pub alias: Option<UnresolvedItemName>,
}

impl AstDisplay for CreateSourceSubsource {
impl AstDisplay for ExternalReferenceExport {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_node(&self.reference);
if let Some(subsource) = &self.subsource {
if let Some(alias) = &self.alias {
f.write_str(" AS ");
f.write_node(subsource);
f.write_node(alias);
}
}
}
impl_display!(CreateSourceSubsource);
impl_display!(ExternalReferenceExport);

/// Specifies which set of external references to generate a source export
/// for in a `CREATE SOURCE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferencedSubsources {
pub enum ExternalReferences {
/// A subset defined with FOR TABLES (...)
SubsetTables(Vec<CreateSourceSubsource>),
SubsetTables(Vec<ExternalReferenceExport>),
/// A subset defined with FOR SCHEMAS (...)
SubsetSchemas(Vec<Ident>),
/// FOR ALL TABLES
All,
}

impl AstDisplay for ReferencedSubsources {
impl AstDisplay for ExternalReferences {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::SubsetTables(subsources) => {
Self::SubsetTables(tables) => {
f.write_str("FOR TABLES (");
f.write_node(&display::comma_separated(subsources));
f.write_node(&display::comma_separated(tables));
f.write_str(")");
}
Self::SubsetSchemas(schemas) => {
Expand All @@ -1086,7 +1088,7 @@ impl AstDisplay for ReferencedSubsources {
}
}
}
impl_display!(ReferencedSubsources);
impl_display!(ExternalReferences);

/// An option in a `CREATE SUBSOURCE` statement.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down Expand Up @@ -2462,7 +2464,7 @@ pub enum AlterSourceAction<T: AstInfo> {
SetOptions(Vec<CreateSourceOption<T>>),
ResetOptions(Vec<CreateSourceOptionName>),
AddSubsources {
subsources: Vec<CreateSourceSubsource>,
external_references: Vec<ExternalReferenceExport>,
options: Vec<AlterSourceAddSubsourceOption<T>>,
},
DropSubsources {
Expand Down Expand Up @@ -2505,7 +2507,7 @@ impl<T: AstInfo> AstDisplay for AlterSourceAction<T> {
}
}
AlterSourceAction::AddSubsources {
subsources,
external_references: subsources,
options,
} => {
f.write_str("ADD SUBSOURCE ");
Expand Down
16 changes: 8 additions & 8 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2764,14 +2764,14 @@ impl<'a> Parser<'a> {
self.expect_token(&Token::LParen)?;
let subsources = self.parse_comma_separated(Parser::parse_subsource_references)?;
self.expect_token(&Token::RParen)?;
Some(ReferencedSubsources::SubsetTables(subsources))
Some(ExternalReferences::SubsetTables(subsources))
} else if self.parse_keywords(&[FOR, SCHEMAS]) {
self.expect_token(&Token::LParen)?;
let schemas = self.parse_comma_separated(Parser::parse_identifier)?;
self.expect_token(&Token::RParen)?;
Some(ReferencedSubsources::SubsetSchemas(schemas))
Some(ExternalReferences::SubsetSchemas(schemas))
} else if self.parse_keywords(&[FOR, ALL, TABLES]) {
Some(ReferencedSubsources::All)
Some(ExternalReferences::All)
} else {
None
};
Expand Down Expand Up @@ -2802,23 +2802,23 @@ impl<'a> Parser<'a> {
envelope,
if_not_exists,
key_constraint,
referenced_subsources,
external_references: referenced_subsources,
progress_subsource,
with_options,
}))
}

fn parse_subsource_references(&mut self) -> Result<CreateSourceSubsource, ParserError> {
fn parse_subsource_references(&mut self) -> Result<ExternalReferenceExport, ParserError> {
let reference = self.parse_item_name()?;
let subsource = if self.parse_one_of_keywords(&[AS, INTO]).is_some() {
Some(self.parse_item_name()?)
} else {
None
};

Ok(CreateSourceSubsource {
Ok(ExternalReferenceExport {
reference,
subsource,
alias: subsource,
})
}

Expand Down Expand Up @@ -4843,7 +4843,7 @@ impl<'a> Parser<'a> {
source_name,
if_exists,
action: AlterSourceAction::AddSubsources {
subsources,
external_references: subsources,
options,
},
})
Expand Down
Loading
Loading