Skip to content

Commit

Permalink
[DNM] ct: add strawman impl of CREATE CONTINUAL TASK
Browse files Browse the repository at this point in the history
  • Loading branch information
danhhz committed Sep 6, 2024
1 parent 74a4630 commit 2712c06
Show file tree
Hide file tree
Showing 28 changed files with 1,173 additions and 102 deletions.
17 changes: 11 additions & 6 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ impl Catalog {
}
policies
}

/// WIP
pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) {
self.state.entry_by_id.insert(id, entry);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -2342,7 +2347,7 @@ mod tests {
.expect("unable to open debug catalog");
let item = catalog
.state()
.deserialize_item(&create_sql)
.deserialize_item(id, &create_sql)
.expect("unable to parse view");
catalog
.transact(
Expand Down Expand Up @@ -3227,16 +3232,16 @@ mod tests {
let schema_spec = schema.id().clone();
let schema_name = &schema.name().schema;
let database_spec = ResolvedDatabaseSpecifier::Id(database_id);
let mv_id = catalog
.allocate_user_id()
.await
.expect("unable to allocate id");
let mv = catalog
.state()
.deserialize_item(&format!(
.deserialize_item(mv_id, &format!(
"CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"
))
.expect("unable to deserialize item");
let mv_id = catalog
.allocate_user_id()
.await
.expect("unable to allocate id");
catalog
.transact(
None,
Expand Down
20 changes: 13 additions & 7 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ impl CatalogState {
Builtin::Index(index) => {
let mut item = self
.parse_item(
Some(id),
&index.create_sql(),
None,
index.is_retained_metrics_object,
Expand Down Expand Up @@ -736,6 +737,7 @@ impl CatalogState {
Builtin::Connection(connection) => {
let mut item = self
.parse_item(
Some(id),
connection.sql,
None,
false,
Expand Down Expand Up @@ -855,9 +857,11 @@ impl CatalogState {
// This makes it difficult to use the `UpdateFrom` trait, but the structure
// is still the same as the trait.
if retraction.create_sql() != create_sql {
let item = self.deserialize_item(&create_sql).unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
let item =
self.deserialize_item(item.id, &create_sql)
.unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
retraction.item = item;
}
retraction.id = id;
Expand All @@ -869,9 +873,11 @@ impl CatalogState {
retraction
}
None => {
let catalog_item = self.deserialize_item(&create_sql).unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
let catalog_item = self
.deserialize_item(item.id, &create_sql)
.unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
CatalogEntry {
item: catalog_item,
referenced_by: Vec::new(),
Expand Down Expand Up @@ -1182,7 +1188,7 @@ impl CatalogState {
let handle = mz_ore::task::spawn(
|| "parse view",
async move {
let res = task_state.parse_item(&create_sql, None, false, None);
let res = task_state.parse_item(None, &create_sql, None, false, None);
(id, res)
}
.instrument(span),
Expand Down
17 changes: 10 additions & 7 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1242,16 +1242,19 @@ impl CatalogState {
})
.into_element()
.ast;
let query = match &create_stmt {
Statement::CreateMaterializedView(stmt) => &stmt.query,
let query_string = match &create_stmt {
Statement::CreateMaterializedView(stmt) => {
let mut query_string = stmt.query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');
query_string
}
// TODO(ct): Remove.
Statement::CreateContinualTask(_) => "TODO(ct)".into(),
_ => unreachable!(),
};

let mut query_string = query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');

let mut updates = Vec::new();

updates.push(BuiltinTableUpdate {
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl CatalogItemRebuilder {
custom_logical_compaction_window,
} => state
.parse_item(
None,
&sql,
None,
is_retained_metrics_object,
Expand Down
55 changes: 51 additions & 4 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use mz_controller::clusters::{
UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::visit::Visit;
use mz_expr::{Id, LocalId};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
Expand Down Expand Up @@ -67,7 +69,7 @@ use mz_sql::names::{
use mz_sql::plan::{
CreateConnectionPlan, CreateContinualTaskPlan, CreateIndexPlan, CreateMaterializedViewPlan,
CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
CreateViewPlan, Params, Plan, PlanContext,
CreateViewPlan, HirRelationExpr, Params, Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
Expand Down Expand Up @@ -787,14 +789,19 @@ impl CatalogState {
}

/// Parses the given SQL string into a pair of [`CatalogItem`].
pub(crate) fn deserialize_item(&self, create_sql: &str) -> Result<CatalogItem, AdapterError> {
self.parse_item(create_sql, None, false, None)
pub(crate) fn deserialize_item(
&self,
id: GlobalId,
create_sql: &str,
) -> Result<CatalogItem, AdapterError> {
self.parse_item(Some(id), create_sql, None, false, None)
}

/// Parses the given SQL string into a `CatalogItem`.
#[mz_ore::instrument]
pub(crate) fn parse_item(
&self,
item_id: Option<GlobalId>,
create_sql: &str,
pcx: Option<&PlanContext>,
is_retained_metrics_object: bool,
Expand Down Expand Up @@ -942,7 +949,47 @@ impl CatalogState {
initial_as_of,
})
}
Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"),
Plan::CreateContinualTask(CreateContinualTaskPlan { continual_task, .. }) => {
// Collect optimizer parameters.
let optimizer_config =
optimize::OptimizerConfig::from(session_catalog.system_vars());
// Build an optimizer for this VIEW.
// TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

let mut raw_expr = continual_task.expr;
// Replace our placeholder fake ctes with the real output id, now that
// we have it.
raw_expr.visit_mut_post(&mut |expr| match expr {
HirRelationExpr::Get { id, .. } if *id == Id::Local(LocalId::new(0)) => {
*id = Id::Global(item_id.expect("WIP hacks"));
}
_ => {}
})?;

let optimized_expr = optimizer.optimize(raw_expr.clone())?;
let mut typ = optimized_expr.typ();
for &i in &continual_task.non_null_assertions {
typ.column_types[i].nullable = false;
}
let desc = RelationDesc::new(typ, continual_task.column_names);

let initial_as_of = continual_task.as_of.map(Antichain::from_elem);

// TODO(ct): CatalogItem::ContinualTask
CatalogItem::MaterializedView(MaterializedView {
create_sql: continual_task.create_sql,
raw_expr,
optimized_expr,
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
non_null_assertions: continual_task.non_null_assertions,
custom_logical_compaction_window: continual_task.compaction_window,
refresh_schedule: continual_task.refresh_schedule,
initial_as_of,
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
create_sql: index.create_sql,
on: index.on,
Expand Down
8 changes: 5 additions & 3 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
WithOptionValue,
Expand Down Expand Up @@ -501,7 +500,7 @@ impl Coordinator {
self.handle_execute_inner(stmt, params, ctx).await
}

#[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
#[instrument(name = "coord::handle_execute_inner")]
pub(crate) async fn handle_execute_inner(
&mut self,
stmt: Arc<Statement<Raw>>,
Expand Down Expand Up @@ -1060,7 +1059,10 @@ impl Coordinator {
.any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
{
let catalog = self.catalog().for_session(session);
let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(
&catalog,
cmvs.in_cluster.as_ref(),
)?;
let ids = self
.index_oracle(cluster)
.sufficient_collections(resolved_ids.0.iter());
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,15 @@ impl Coordinator {
indexes_to_drop.push((*cluster_id, *id));
}
CatalogItem::MaterializedView(MaterializedView {
create_sql,
cluster_id,
..
}) => {
materialized_views_to_drop.push((*cluster_id, *id));
if create_sql.to_lowercase().contains("continual task") {
// WIP noop because it's not actually running under this id
} else {
materialized_views_to_drop.push((*cluster_id, *id));
}
}
CatalogItem::View(_) => views_to_drop.push(*id),
CatalogItem::Secret(_) => {
Expand Down
Loading

0 comments on commit 2712c06

Please sign in to comment.