Skip to content

Commit

Permalink
adapter/sources: Support custom timelines on source tables to handle …
Browse files Browse the repository at this point in the history
…CdcV2 envelope type correctly
  • Loading branch information
rjobanp committed Oct 17, 2024
1 parent 0f0d3e3 commit dd64a2a
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl Catalog {
}

/// For the Sources ids in `ids`, return the read policies for all `ids` and additional ids that
/// propagate from them. Specifically, `ids` contains a source, it and all of its subsources
/// propagate from them. Specifically, if `ids` contains a source, it and all of its source exports
/// will be added to the result.
pub fn source_read_policies(
&self,
Expand Down
14 changes: 9 additions & 5 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,11 @@ impl CatalogState {
let mut updates = self
.pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

if let TableDataSource::DataSource(data_source) = &table.data_source {
if let TableDataSource::DataSource {
desc: data_source,
timeline: _,
} = &table.data_source
{
updates.extend(match data_source {
DataSourceDesc::IngestionExport {
ingestion_id,
Expand Down Expand Up @@ -753,7 +757,7 @@ impl CatalogState {
// Use initial lcw so that we can tell apart default from non-existent windows.
if let Some(cw) = entry.item().initial_logical_compaction_window() {
updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
// Propagate subsource changes.
// Propagate source export changes.
for (cw, mut ids) in self.source_compaction_windows([id]) {
// Id already accounted for above.
ids.remove(&id);
Expand Down Expand Up @@ -805,10 +809,10 @@ impl CatalogState {
.ast
.to_ast_string_redacted()
});
let source_id = if let TableDataSource::DataSource(DataSourceDesc::IngestionExport {
ingestion_id,
let source_id = if let TableDataSource::DataSource {
desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
..
}) = &table.data_source
} = &table.data_source
{
Some(ingestion_id.to_string())
} else {
Expand Down
109 changes: 67 additions & 42 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,26 +832,30 @@ impl CatalogState {
mz_sql::plan::TableDataSource::TableWrites { defaults } => {
TableDataSource::TableWrites { defaults }
}
mz_sql::plan::TableDataSource::DataSource(data_source_desc) => {
match data_source_desc {
mz_sql::plan::DataSourceDesc::IngestionExport {
mz_sql::plan::TableDataSource::DataSource {
desc: data_source_desc,
timeline,
} => match data_source_desc {
mz_sql::plan::DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
} => TableDataSource::DataSource {
desc: DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
} => TableDataSource::DataSource(DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
}),
_ => {
return Err(AdapterError::Unstructured(anyhow::anyhow!(
"unsupported data source for table"
)))
}
},
timeline,
},
_ => {
return Err(AdapterError::Unstructured(anyhow::anyhow!(
"unsupported data source for table"
)))
}
}
},
},
}),
Plan::CreateSource(CreateSourcePlan {
Expand Down Expand Up @@ -2052,7 +2056,7 @@ impl CatalogState {

/// For the Sources ids in `ids`, return the compaction windows for all `ids` and additional ids
/// that propagate from them. Specifically, if `ids` contains a source, it and all of its
/// subsources will be added to the result.
/// source exports will be added to the result.
pub fn source_compaction_windows(
&self,
ids: impl IntoIterator<Item = GlobalId>,
Expand All @@ -2065,33 +2069,54 @@ impl CatalogState {
continue;
}
let entry = self.get_entry(&id);
let Some(source) = entry.source() else {
// Views could depend on sources, so ignore them if added by used_by below.
continue;
};
let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
match source.data_source {
DataSourceDesc::Ingestion { .. } => {
// For sources, look up each dependent subsource and propagate.
cws.entry(source_cw).or_default().insert(id);
ids.extend(entry.used_by());
}
DataSourceDesc::IngestionExport { ingestion_id, .. } => {
// For subsources, look up the parent source and propagate the compaction
// window.
let ingestion = self
.get_entry(&ingestion_id)
.source()
.expect("must be source");
let cw = ingestion
.custom_logical_compaction_window
.unwrap_or(source_cw);
cws.entry(cw).or_default().insert(id);
match entry.item() {
CatalogItem::Source(source) => {
let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
match source.data_source {
DataSourceDesc::Ingestion { .. } => {
// For sources, look up each dependent source export and propagate.
cws.entry(source_cw).or_default().insert(id);
ids.extend(entry.used_by());
}
DataSourceDesc::IngestionExport { ingestion_id, .. } => {
// For subsources, look up the parent source and propagate the compaction
// window.
let ingestion = self
.get_entry(&ingestion_id)
.source()
.expect("must be source");
let cw = ingestion
.custom_logical_compaction_window
.unwrap_or(source_cw);
cws.entry(cw).or_default().insert(id);
}
DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Webhook { .. } => {
cws.entry(source_cw).or_default().insert(id);
}
}
}
DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Webhook { .. } => {
cws.entry(source_cw).or_default().insert(id);
CatalogItem::Table(table) => match &table.data_source {
TableDataSource::DataSource {
desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
timeline: _,
} => {
let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
let ingestion = self
.get_entry(ingestion_id)
.source()
.expect("must be source");
let cw = ingestion
.custom_logical_compaction_window
.unwrap_or(table_cw);
cws.entry(cw).or_default().insert(id);
}
_ => {}
},
_ => {
// Views could depend on sources, so ignore them if added by used_by above.
continue;
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2495,9 +2495,10 @@ impl Coordinator {
TableDataSource::TableWrites { defaults: _ } => {
CollectionDescription::for_table(table.desc.clone())
}
TableDataSource::DataSource(data_source_desc) => {
source_desc(data_source_desc, &table.desc)
}
TableDataSource::DataSource {
desc: data_source_desc,
timeline: _,
} => source_desc(data_source_desc, &table.desc),
};
collections.push((id, collection_desc));
}
Expand Down
25 changes: 17 additions & 8 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,18 +908,24 @@ impl Coordinator {
plan::TableDataSource::TableWrites { defaults } => {
TableDataSource::TableWrites { defaults }
}
plan::TableDataSource::DataSource(data_source_plan) => match data_source_plan {
plan::TableDataSource::DataSource {
desc: data_source_plan,
timeline,
} => match data_source_plan {
plan::DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
} => TableDataSource::DataSource(DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
}),
} => TableDataSource::DataSource {
desc: DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
},
timeline,
},
o => {
unreachable!("CREATE TABLE data source got {:?}", o)
}
Expand Down Expand Up @@ -990,7 +996,10 @@ impl Coordinator {
)
.await;
}
TableDataSource::DataSource(data_source) => {
TableDataSource::DataSource {
desc: data_source,
timeline: _,
} => {
match data_source {
DataSourceDesc::IngestionExport {
ingestion_id,
Expand Down
30 changes: 20 additions & 10 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,13 @@ pub struct Table {
}

impl Table {
// The Coordinator controls insertions for tables (including system tables),
// so they are realtime.
pub fn timeline(&self) -> Timeline {
Timeline::EpochMilliseconds
match &self.data_source {
// The Coordinator controls insertions for writable tables
// (including system tables), so they are realtime.
TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
TableDataSource::DataSource { timeline, .. } => timeline.clone(),
}
}
}

Expand All @@ -666,7 +669,10 @@ pub enum TableDataSource {

/// The table receives its data from the identified `DataSourceDesc`.
/// This table type does not support INSERT/UPDATE/DELETE statements.
DataSource(DataSourceDesc),
DataSource {
desc: DataSourceDesc,
timeline: Timeline,
},
}

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -1846,12 +1852,16 @@ impl CatalogEntry {
_ => None,
},
CatalogItem::Table(table) => match &table.data_source {
TableDataSource::DataSource(DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
}) => Some((*ingestion_id, external_reference, details, data_config)),
TableDataSource::DataSource {
desc:
DataSourceDesc::IngestionExport {
ingestion_id,
external_reference,
details,
data_config,
},
timeline: _,
} => Some((*ingestion_id, external_reference, details, data_config)),
_ => None,
},
_ => None,
Expand Down
6 changes: 5 additions & 1 deletion src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,8 @@ pub enum TableFromSourceOptionName {
TextColumns,
/// Columns you want to exclude when ingesting data
ExcludeColumns,
/// The timeline to use for this table
Timeline,
/// Hex-encoded protobuf of a `ProtoSourceExportStatementDetails`
/// message, which includes details necessary for planning this
/// table as a Source Export
Expand All @@ -1586,6 +1588,7 @@ impl AstDisplay for TableFromSourceOptionName {
f.write_str(match self {
TableFromSourceOptionName::TextColumns => "TEXT COLUMNS",
TableFromSourceOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
TableFromSourceOptionName::Timeline => "TIMELINE",
TableFromSourceOptionName::Details => "DETAILS",
})
}
Expand All @@ -1602,7 +1605,8 @@ impl WithOptionName for TableFromSourceOptionName {
match self {
TableFromSourceOptionName::Details
| TableFromSourceOptionName::TextColumns
| TableFromSourceOptionName::ExcludeColumns => false,
| TableFromSourceOptionName::ExcludeColumns
| TableFromSourceOptionName::Timeline => false,
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,10 @@ pub enum TableDataSource {

/// The table receives its data from the identified `DataSourceDesc`.
/// This table type does not support INSERT/UPDATE/DELETE statements.
DataSource(DataSourceDesc),
DataSource {
desc: DataSourceDesc,
timeline: Timeline,
},
}

#[derive(Clone, Debug)]
Expand Down
28 changes: 26 additions & 2 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,7 @@ generate_extracted_config!(
TableFromSourceOption,
(TextColumns, Vec::<Ident>, Default(vec![])),
(ExcludeColumns, Vec::<Ident>, Default(vec![])),
(Timeline, String),
(Details, String)
);

Expand Down Expand Up @@ -1560,6 +1561,7 @@ pub fn plan_create_table_from_source(
text_columns,
exclude_columns,
details,
timeline,
seen: _,
} = with_options.clone().try_into()?;

Expand Down Expand Up @@ -1720,6 +1722,26 @@ pub fn plan_create_table_from_source(
source_connection,
)?;

let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

// Allow users to specify a timeline. If they do not, determine a default
// timeline for the source.
let timeline = match timeline {
None => match envelope {
SourceEnvelope::CdcV2 => {
Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
}
_ => Timeline::EpochMilliseconds,
},
// TODO(benesch): if we stabilize this, can we find a better name than
// `mz_epoch_ms`? Maybe just `mz_system`?
Some(timeline) if timeline == "mz_epoch_ms" => Timeline::EpochMilliseconds,
Some(timeline) if timeline.starts_with("mz_") => {
return Err(PlanError::UnacceptableTimelineName(timeline));
}
Some(timeline) => Timeline::User(timeline),
};

let data_source = DataSourceDesc::IngestionExport {
ingestion_id,
external_reference: external_reference
Expand All @@ -1730,7 +1752,6 @@ pub fn plan_create_table_from_source(
data_config: SourceExportDataConfig { envelope, encoding },
};

let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
let if_not_exists = *if_not_exists;

let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
Expand All @@ -1740,7 +1761,10 @@ pub fn plan_create_table_from_source(
desc,
temporary: false,
compaction_window: None,
data_source: TableDataSource::DataSource(data_source),
data_source: TableDataSource::DataSource {
desc: data_source,
timeline,
},
};

Ok(Plan::CreateTable(CreateTablePlan {
Expand Down
Loading

0 comments on commit dd64a2a

Please sign in to comment.