[DNM] ct: add strawman impl of CREATE CONTINUAL TASK
danhhz committed Sep 6, 2024
1 parent 74a4630 commit 5847137
Showing 24 changed files with 1,171 additions and 71 deletions.
5 changes: 5 additions & 0 deletions src/adapter/src/catalog.rs
@@ -360,6 +360,11 @@ impl Catalog {
}
policies
}

/// WIP: hack that inserts `entry` directly into the in-memory catalog state so
/// the continual task sequencer can make its output id resolvable during
/// optimization.
pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) {
self.state.entry_by_id.insert(id, entry);
}
}

#[derive(Debug)]
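
Its only caller is the continual task sequencer later in this commit, which needs the yet-to-be-created output id to be resolvable while the materialized view optimizer runs; it writes straight into entry_by_id, bypassing the normal catalog transaction machinery. A sketch of that usage, with identifiers as they appear in the sequencer below:

    // Make sink_id resolvable during optimization; the real catalog entry is
    // created afterwards via catalog::Op::CreateItem.
    let catalog_mut = self.catalog_mut();
    catalog_mut.hack_add_ct(sink_id, fake_entry);
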
17 changes: 10 additions & 7 deletions src/adapter/src/catalog/builtin_table_updates.rs
@@ -1242,16 +1242,19 @@ impl CatalogState {
})
.into_element()
.ast;
let query = match &create_stmt {
Statement::CreateMaterializedView(stmt) => &stmt.query,
let query_string = match &create_stmt {
Statement::CreateMaterializedView(stmt) => {
let mut query_string = stmt.query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');
query_string
}
// TODO(ct): Remove.
Statement::CreateContinualTask(_) => "TODO(ct)".into(),
_ => unreachable!(),
};

let mut query_string = query.to_ast_string_stable();
// PostgreSQL appends a semicolon in `pg_matviews.definition`, we
// do the same for compatibility's sake.
query_string.push(';');

let mut updates = Vec::new();

updates.push(BuiltinTableUpdate {
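
The semicolon handling in the materialized view arm exists only to match PostgreSQL's pg_matviews.definition format, and the new CreateContinualTask arm is a stopgap so this builtin table update has something to emit before a proper definition string exists. As a sketch (not code from this commit), the definition produced for a plain materialized view is just:

    // Stable AST rendering of the view's query, plus PostgreSQL's trailing ';'.
    let definition = format!("{};", stmt.query.to_ast_string_stable());
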
27 changes: 26 additions & 1 deletion src/adapter/src/catalog/state.rs
@@ -38,6 +38,7 @@ use mz_controller::clusters::{
UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
@@ -942,7 +943,31 @@ impl CatalogState {
initial_as_of,
})
}
Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"),
Plan::CreateContinualTask(CreateContinualTaskPlan {
desc,
continual_task,
..
}) => {
// TODO(ct): Figure out how to make this survive restarts. The
// expr we saved still had the LocalId placeholders for the
// output, but we don't have access to the real Id here.
let optimized_expr = OptimizedMirRelationExpr::declare_optimized(
mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()),
);
// TODO(ct): CatalogItem::ContinualTask
CatalogItem::MaterializedView(MaterializedView {
create_sql: continual_task.create_sql,
raw_expr: continual_task.expr.clone(),
optimized_expr,
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
non_null_assertions: continual_task.non_null_assertions,
custom_logical_compaction_window: continual_task.compaction_window,
refresh_schedule: continual_task.refresh_schedule,
initial_as_of: continual_task.as_of.map(Antichain::from_elem),
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
create_sql: index.create_sql,
on: index.on,
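
The restart caveat in the TODO follows from the sequencer later in this commit: the saved HIR still refers to the task's own output through a LocalId placeholder, and only the sequencer knows the real GlobalId to substitute, so at catalog load time the optimized expr is stubbed out with an empty constant of the declared type. Isolated, the stand-in is (same calls as above):

    // Empty constant relation with the task's declared schema, marked as
    // already optimized so it can fill MaterializedView::optimized_expr until
    // a dedicated CatalogItem::ContinualTask exists.
    let placeholder = OptimizedMirRelationExpr::declare_optimized(
        mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()),
    );
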
8 changes: 7 additions & 1 deletion src/adapter/src/coord/ddl.rs
@@ -255,10 +255,16 @@ impl Coordinator {
indexes_to_drop.push((*cluster_id, *id));
}
CatalogItem::MaterializedView(MaterializedView {
create_sql,
cluster_id,
..
}) => {
materialized_views_to_drop.push((*cluster_id, *id));
if create_sql.to_lowercase().contains("continual task") {
// TODO(ct): Hack no-op here because it's not actually
// running under this id on the cluster.
} else {
materialized_views_to_drop.push((*cluster_id, *id));
}
}
CatalogItem::View(_) => views_to_drop.push(*id),
CatalogItem::Secret(_) => {
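
This drop path and the peek path later in the diff key off the raw SQL text because continual tasks are still stored as MaterializedView catalog entries. A sketch of the shared check, assuming (as the hack does) that a CREATE CONTINUAL TASK statement always contains that phrase and a regular view's definition never does; the helper name is hypothetical:

    // Heuristic stand-in until there is a dedicated CatalogItem::ContinualTask
    // variant: detect a continual task by its create_sql text.
    fn is_continual_task_hack(create_sql: &str) -> bool {
        create_sql.to_lowercase().contains("continual task")
    }
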
180 changes: 178 additions & 2 deletions src/adapter/src/coord/sequencer/inner/create_continual_task.rs
@@ -7,15 +7,34 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource,
};
use mz_compute_types::sinks::{
ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection,
};
use mz_expr::visit::Visit;
use mz_expr::{Id, LocalId};
use mz_ore::instrument;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::optimize::OverrideFrom;
use mz_sql::names::ResolvedIds;
use mz_sql::plan;
use mz_sql::plan::{self, HirRelationExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};

use crate::catalog;
use crate::command::ExecuteResponse;
use crate::coord::Coordinator;
use crate::error::AdapterError;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize};
use crate::session::Session;
use crate::util::ResultExt;

// TODO(ct): Big oof. Dedup a bunch of this with MVs.
impl Coordinator {
#[instrument]
pub(crate) async fn sequence_create_continual_task(
@@ -24,6 +43,163 @@ impl Coordinator {
plan: plan::CreateContinualTaskPlan,
resolved_ids: ResolvedIds,
) -> Result<ExecuteResponse, AdapterError> {
todo!("WIP {:?}", (session, plan, resolved_ids));
let plan::CreateContinualTaskPlan {
name,
desc,
input_id,
continual_task:
plan::MaterializedView {
create_sql,
cluster_id,
mut expr,
column_names,
non_null_assertions,
compaction_window: _,
refresh_schedule,
as_of: _,
},
} = plan;

// Collect optimizer parameters.
let compute_instance = self
.instance_snapshot(cluster_id)
.expect("compute instance does not exist");

let debug_name = self.catalog().resolve_full_name(&name, None).to_string();
let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
.override_from(&self.catalog.get_cluster(cluster_id).config.features());

let view_id = self.allocate_transient_id();
let bonus_id = self.allocate_transient_id();
let catalog_mut = self.catalog_mut();
let sink_id = catalog_mut.allocate_user_id().await?;

// Put a placeholder in the catalog so the optimizer can find something
// for the sink_id.
let fake_entry = CatalogEntry {
item: CatalogItem::Table(Table {
create_sql: Some(create_sql.clone()),
desc: desc.clone(),
conn_id: None,
resolved_ids: resolved_ids.clone(),
custom_logical_compaction_window: None,
is_retained_metrics_object: false,
data_source: TableDataSource::TableWrites {
defaults: Vec::new(),
},
}),
referenced_by: Vec::new(),
used_by: Vec::new(),
id: sink_id,
oid: 0,
name: name.clone(),
owner_id: *session.current_role_id(),
privileges: PrivilegeMap::new(),
};
catalog_mut.hack_add_ct(sink_id, fake_entry);

// Build an optimizer for this CONTINUAL TASK.
let mut optimizer = optimize::materialized_view::Optimizer::new(
Arc::new(catalog_mut.clone()),
compute_instance,
sink_id,
view_id,
column_names,
non_null_assertions,
refresh_schedule.clone(),
debug_name,
optimizer_config,
self.optimizer_metrics(),
);

// Replace our placeholder fake ctes with the real output id, now that
// we have it.
expr.visit_mut_post(&mut |expr| match expr {
HirRelationExpr::Get { id, .. } if *id == Id::Local(LocalId::new(0)) => {
*id = Id::Global(sink_id);
}
_ => {}
})?;

// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
let raw_expr = expr.clone();
let local_mir_plan = optimizer.catch_unwind_optimize(expr)?;
let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
// MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
let (mut df_desc, _df_meta) = global_lir_plan.unapply();

// Timestamp selection
let mut id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id.clone());
// TODO(ct): Can't acquire a read hold on ourselves because we don't
// exist yet.
id_bundle.storage_ids.remove(&sink_id);
let read_holds = self.acquire_read_holds(&id_bundle);
let dataflow_as_of = read_holds.least_valid_read();
let storage_as_of = read_holds.least_valid_read();
df_desc.set_as_of(dataflow_as_of.clone());

// TODO(ct): HACKs
let (_id, sink) = df_desc.sink_exports.pop_first().expect("WIP");
df_desc.sink_exports.insert(bonus_id, sink);
for sink in df_desc.sink_exports.values_mut() {
match &mut sink.connection {
ComputeSinkConnection::Persist(PersistSinkConnection {
storage_metadata, ..
}) => {
sink.connection =
ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
input_id,
output_metadata: *storage_metadata,
output_id: sink_id,
})
}
_ => unreachable!("MV should produce persist sink connection"),
}
}

let ops = vec![catalog::Op::CreateItem {
id: sink_id,
name: name.clone(),
item: CatalogItem::MaterializedView(MaterializedView {
create_sql,
raw_expr,
optimized_expr: local_mir_plan.expr(),
desc: desc.clone(),
resolved_ids,
cluster_id,
non_null_assertions: Vec::new(),
custom_logical_compaction_window: None,
refresh_schedule,
initial_as_of: Some(storage_as_of.clone()),
}),
owner_id: *session.current_role_id(),
}];

let () = self
.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
coord
.controller
.storage
.create_collections(
coord.catalog.state().storage_metadata(),
None,
vec![(
sink_id,
CollectionDescription {
desc,
data_source: DataSource::Other(DataSourceOther::Compute),
since: Some(storage_as_of),
status_collection_id: None,
},
)],
)
.await
.unwrap_or_terminate("cannot fail to append");

coord.ship_dataflow(df_desc, cluster_id.clone()).await;
})
.await?;
Ok(ExecuteResponse::CreatedContinualTask)
}
}
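
The "HACKs" block above is the crux of the strawman: the materialized view optimizer emits an ordinary persist sink keyed by sink_id, the export is re-keyed under the transient bonus_id (nothing actually runs under sink_id on the cluster, which is what the no-op drop and peek paths elsewhere in this diff account for), and the connection is rebranded as a continual task sink. The output metadata carried here is later replaced by the controller (see instance.rs below), so at this stage it is effectively a placeholder. Isolated from the loop above, the rebranding is roughly:

    // Rebrand the MV's persist sink as a continual task sink: record which
    // collection drives the task and which collection it writes; the real
    // output metadata is filled in by the controller at dataflow creation.
    match &mut sink.connection {
        ComputeSinkConnection::Persist(PersistSinkConnection { storage_metadata, .. }) => {
            sink.connection = ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
                input_id,
                output_metadata: *storage_metadata,
                output_id: sink_id,
            });
        }
        _ => unreachable!("MV should produce a persist sink connection"),
    }
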
12 changes: 10 additions & 2 deletions src/adapter/src/coord/sequencer/inner/peek.rs
@@ -24,7 +24,7 @@ use mz_repr::{Datum, GlobalId, RowArena, Timestamp};
use mz_sql::ast::{ExplainStage, Statement};
use mz_sql::catalog::CatalogCluster;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_catalog::memory::objects::CatalogItem;
use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
use mz_sql::plan::QueryWhen;
use mz_sql::plan::{self, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
@@ -869,9 +869,17 @@ impl Coordinator {
CatalogItem::Table(_) | CatalogItem::Source(_) => {
transitive_storage_deps.insert(id);
}
CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
CatalogItem::Index(_) => {
transitive_compute_deps.insert(id);
}
CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => {
if create_sql.to_lowercase().contains("continual task") {
// TODO(ct): Hack no-op here because it's not actually
// running under this id on the cluster.
} else {
transitive_compute_deps.insert(id);
}
}
_ => {}
}
}
16 changes: 14 additions & 2 deletions src/compute-client/src/controller/instance.rs
@@ -22,7 +22,9 @@ use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig};
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::flat_plan::FlatPlan;
use mz_compute_types::plan::LirId;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection};
use mz_compute_types::sinks::{
ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, PersistSinkConnection,
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
@@ -1243,7 +1245,17 @@
ComputeSinkConnection::Persist(conn)
}
ComputeSinkConnection::ContinualTask(conn) => {
todo!("WIP {:?}", conn);
let output_metadata = self
.storage_collections
.collection_metadata(conn.output_id)
.map_err(|_| DataflowCreationError::CollectionMissing(id))?
.clone();
let conn = ContinualTaskConnection {
input_id: conn.input_id,
output_id: conn.output_id,
output_metadata,
};
ComputeSinkConnection::ContinualTask(conn)
}
ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
ComputeSinkConnection::CopyToS3Oneshot(conn) => {
3 changes: 3 additions & 0 deletions src/compute-types/src/sinks.proto
@@ -48,6 +48,9 @@ message ProtoPersistSinkConnection {
}

message ProtoContinualTaskConnection {
mz_repr.global_id.ProtoGlobalId input_id = 1;
mz_storage_types.controller.ProtoCollectionMetadata output_metadata = 2;
mz_repr.global_id.ProtoGlobalId output_id = 3;
}

message ProtoCopyToS3OneshotSinkConnection {
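
The new proto fields mirror the in-memory connection that instance.rs fills in above. Its Rust shape, inferred from the fields used in this diff rather than taken from the actual definition, is roughly:

    use mz_repr::GlobalId;

    // Compute sink connection for a continual task: the collection the task
    // reads from, the collection it writes to, and the output's storage
    // metadata (generic, like PersistSinkConnection, so the controller can
    // fill it in when the dataflow is created).
    pub struct ContinualTaskConnection<S> {
        pub input_id: GlobalId,
        pub output_id: GlobalId,
        pub output_metadata: S,
    }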