diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index c397638d3dee8..80df60877f086 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -360,6 +360,11 @@ impl Catalog { } policies } + + /// WIP + pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) { + self.state.entry_by_id.insert(id, entry); + } } #[derive(Debug)] diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 0ba2e38465804..959f932ec2869 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -1242,16 +1242,19 @@ impl CatalogState { }) .into_element() .ast; - let query = match &create_stmt { - Statement::CreateMaterializedView(stmt) => &stmt.query, + let query_string = match &create_stmt { + Statement::CreateMaterializedView(stmt) => { + let mut query_string = stmt.query.to_ast_string_stable(); + // PostgreSQL appends a semicolon in `pg_matviews.definition`, we + // do the same for compatibility's sake. + query_string.push(';'); + query_string + } + // TODO(ct): Remove. + Statement::CreateContinualTask(_) => "TODO(ct)".into(), _ => unreachable!(), }; - let mut query_string = query.to_ast_string_stable(); - // PostgreSQL appends a semicolon in `pg_matviews.definition`, we - // do the same for compatibility's sake. - query_string.push(';'); - let mut updates = Vec::new(); updates.push(BuiltinTableUpdate { diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 5d27d2b8d58b2..7c11479d851c2 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -38,6 +38,7 @@ use mz_controller::clusters::{ UnmanagedReplicaLocation, }; use mz_controller_types::{ClusterId, ReplicaId}; +use mz_expr::OptimizedMirRelationExpr; use mz_ore::collections::CollectionExt; use mz_ore::now::NOW_ZERO; use mz_ore::soft_assert_no_log; @@ -942,7 +943,31 @@ impl CatalogState { initial_as_of, }) } - Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"), + Plan::CreateContinualTask(CreateContinualTaskPlan { + desc, + continual_task, + .. + }) => { + // TODO(ct): Figure out how to make this survive restarts. The + // expr we saved still had the LocalId placeholders for the + // output, but we don't have access to the real Id here. + let optimized_expr = OptimizedMirRelationExpr::declare_optimized( + mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()), + ); + // TODO(ct): CatalogItem::ContinualTask + CatalogItem::MaterializedView(MaterializedView { + create_sql: continual_task.create_sql, + raw_expr: continual_task.expr.clone(), + optimized_expr, + desc, + resolved_ids, + cluster_id: continual_task.cluster_id, + non_null_assertions: continual_task.non_null_assertions, + custom_logical_compaction_window: continual_task.compaction_window, + refresh_schedule: continual_task.refresh_schedule, + initial_as_of: continual_task.as_of.map(Antichain::from_elem), + }) + } Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { create_sql: index.create_sql, on: index.on, diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 24ac16d713346..9ccd5eb3128f5 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -255,10 +255,16 @@ impl Coordinator { indexes_to_drop.push((*cluster_id, *id)); } CatalogItem::MaterializedView(MaterializedView { + create_sql, cluster_id, .. 
            }) => {
-                    materialized_views_to_drop.push((*cluster_id, *id));
+                    if create_sql.to_lowercase().contains("continual task") {
+                        // TODO(ct): Hack no-op here because it's not actually
+                        // running under this id on the cluster.
+                    } else {
+                        materialized_views_to_drop.push((*cluster_id, *id));
+                    }
                 }
                 CatalogItem::View(_) => views_to_drop.push(*id),
                 CatalogItem::Secret(_) => {
diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
index f94fa182182aa..3c3e2d601e070 100644
--- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
@@ -7,15 +7,34 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::sync::Arc;
+
+use mz_catalog::memory::objects::{
+    CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource,
+};
+use mz_compute_types::sinks::{
+    ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection,
+};
+use mz_expr::visit::Visit;
+use mz_expr::{Id, LocalId};
 use mz_ore::instrument;
+use mz_repr::adt::mz_acl_item::PrivilegeMap;
+use mz_repr::optimize::OverrideFrom;
 use mz_sql::names::ResolvedIds;
-use mz_sql::plan;
+use mz_sql::plan::{self, HirRelationExpr};
+use mz_sql::session::metadata::SessionMetadata;
+use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};
 
+use crate::catalog;
 use crate::command::ExecuteResponse;
 use crate::coord::Coordinator;
 use crate::error::AdapterError;
+use crate::optimize::dataflows::dataflow_import_id_bundle;
+use crate::optimize::{self, Optimize};
 use crate::session::Session;
+use crate::util::ResultExt;
 
+// TODO(ct): Big oof. Dedup a bunch of this with MVs.
 impl Coordinator {
     #[instrument]
     pub(crate) async fn sequence_create_continual_task(
@@ -24,6 +43,163 @@ impl Coordinator {
         plan: plan::CreateContinualTaskPlan,
         resolved_ids: ResolvedIds,
     ) -> Result<ExecuteResponse, AdapterError> {
-        todo!("WIP {:?}", (session, plan, resolved_ids));
+        let plan::CreateContinualTaskPlan {
+            name,
+            desc,
+            input_id,
+            continual_task:
+                plan::MaterializedView {
+                    create_sql,
+                    cluster_id,
+                    mut expr,
+                    column_names,
+                    non_null_assertions,
+                    compaction_window: _,
+                    refresh_schedule,
+                    as_of: _,
+                },
+        } = plan;
+
+        // Collect optimizer parameters.
+        let compute_instance = self
+            .instance_snapshot(cluster_id)
+            .expect("compute instance does not exist");
+
+        let debug_name = self.catalog().resolve_full_name(&name, None).to_string();
+        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
+            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
+
+        let view_id = self.allocate_transient_id();
+        let bonus_id = self.allocate_transient_id();
+        let catalog_mut = self.catalog_mut();
+        let sink_id = catalog_mut.allocate_user_id().await?;
+
+        // Put a placeholder in the catalog so the optimizer can find something
+        // for the sink_id.
+ let fake_entry = CatalogEntry { + item: CatalogItem::Table(Table { + create_sql: Some(create_sql.clone()), + desc: desc.clone(), + conn_id: None, + resolved_ids: resolved_ids.clone(), + custom_logical_compaction_window: None, + is_retained_metrics_object: false, + data_source: TableDataSource::TableWrites { + defaults: Vec::new(), + }, + }), + referenced_by: Vec::new(), + used_by: Vec::new(), + id: sink_id, + oid: 0, + name: name.clone(), + owner_id: *session.current_role_id(), + privileges: PrivilegeMap::new(), + }; + catalog_mut.hack_add_ct(sink_id, fake_entry); + + // Build an optimizer for this CONTINUAL TASK. + let mut optimizer = optimize::materialized_view::Optimizer::new( + Arc::new(catalog_mut.clone()), + compute_instance, + sink_id, + view_id, + column_names, + non_null_assertions, + refresh_schedule.clone(), + debug_name, + optimizer_config, + self.optimizer_metrics(), + ); + + // Replace our placeholder fake ctes with the real output id, now that + // we have it. + expr.visit_mut_post(&mut |expr| match expr { + HirRelationExpr::Get { id, .. } if *id == Id::Local(LocalId::new(0)) => { + *id = Id::Global(sink_id); + } + _ => {} + })?; + + // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global) + let raw_expr = expr.clone(); + let local_mir_plan = optimizer.catch_unwind_optimize(expr)?; + let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?; + // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global) + let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?; + let (mut df_desc, _df_meta) = global_lir_plan.unapply(); + + // Timestamp selection + let mut id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id.clone()); + // TODO(ct): Can't acquire a read hold on ourselves because we don't + // exist yet. + id_bundle.storage_ids.remove(&sink_id); + let read_holds = self.acquire_read_holds(&id_bundle); + let dataflow_as_of = read_holds.least_valid_read(); + let storage_as_of = read_holds.least_valid_read(); + df_desc.set_as_of(dataflow_as_of.clone()); + + // TODO(ct): HACKs + let (_id, sink) = df_desc.sink_exports.pop_first().expect("WIP"); + df_desc.sink_exports.insert(bonus_id, sink); + for sink in df_desc.sink_exports.values_mut() { + match &mut sink.connection { + ComputeSinkConnection::Persist(PersistSinkConnection { + storage_metadata, .. 
+ }) => { + sink.connection = + ComputeSinkConnection::ContinualTask(ContinualTaskConnection { + input_id, + output_metadata: *storage_metadata, + output_id: sink_id, + }) + } + _ => unreachable!("MV should produce persist sink connection"), + } + } + + let ops = vec![catalog::Op::CreateItem { + id: sink_id, + name: name.clone(), + item: CatalogItem::MaterializedView(MaterializedView { + create_sql, + raw_expr, + optimized_expr: local_mir_plan.expr(), + desc: desc.clone(), + resolved_ids, + cluster_id, + non_null_assertions: Vec::new(), + custom_logical_compaction_window: None, + refresh_schedule, + initial_as_of: Some(storage_as_of.clone()), + }), + owner_id: *session.current_role_id(), + }]; + + let () = self + .catalog_transact_with_side_effects(Some(session), ops, |coord| async { + coord + .controller + .storage + .create_collections( + coord.catalog.state().storage_metadata(), + None, + vec![( + sink_id, + CollectionDescription { + desc, + data_source: DataSource::Other(DataSourceOther::Compute), + since: Some(storage_as_of), + status_collection_id: None, + }, + )], + ) + .await + .unwrap_or_terminate("cannot fail to append"); + + coord.ship_dataflow(df_desc, cluster_id.clone()).await; + }) + .await?; + Ok(ExecuteResponse::CreatedContinualTask) } } diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 583ff66903643..259627aa0745c 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -24,7 +24,7 @@ use mz_repr::{Datum, GlobalId, RowArena, Timestamp}; use mz_sql::ast::{ExplainStage, Statement}; use mz_sql::catalog::CatalogCluster; // Import `plan` module, but only import select elements to avoid merge conflicts on use statements. -use mz_catalog::memory::objects::CatalogItem; +use mz_catalog::memory::objects::{CatalogItem, MaterializedView}; use mz_sql::plan::QueryWhen; use mz_sql::plan::{self, HirScalarExpr}; use mz_sql::session::metadata::SessionMetadata; @@ -869,9 +869,17 @@ impl Coordinator { CatalogItem::Table(_) | CatalogItem::Source(_) => { transitive_storage_deps.insert(id); } - CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => { + CatalogItem::Index(_) => { transitive_compute_deps.insert(id); } + CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => { + if create_sql.to_lowercase().contains("continual task") { + // TODO(ct): Hack no-op here because it's not actually + // running under this id on the cluster. 
+ } else { + transitive_compute_deps.insert(id); + } + } _ => {} } } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 625cc3c8d371c..69e7672bc4d46 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -22,7 +22,9 @@ use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig}; use mz_compute_types::dataflows::{BuildDesc, DataflowDescription}; use mz_compute_types::plan::flat_plan::FlatPlan; use mz_compute_types::plan::LirId; -use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection}; +use mz_compute_types::sinks::{ + ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, PersistSinkConnection, +}; use mz_compute_types::sources::SourceInstanceDesc; use mz_dyncfg::ConfigSet; use mz_expr::RowSetFinishing; @@ -1243,7 +1245,17 @@ where ComputeSinkConnection::Persist(conn) } ComputeSinkConnection::ContinualTask(conn) => { - todo!("WIP {:?}", conn); + let output_metadata = self + .storage_collections + .collection_metadata(conn.output_id) + .map_err(|_| DataflowCreationError::CollectionMissing(id))? + .clone(); + let conn = ContinualTaskConnection { + input_id: conn.input_id, + output_id: conn.output_id, + output_metadata, + }; + ComputeSinkConnection::ContinualTask(conn) } ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn), ComputeSinkConnection::CopyToS3Oneshot(conn) => { diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto index e55398971c505..db9f146cbef1e 100644 --- a/src/compute-types/src/sinks.proto +++ b/src/compute-types/src/sinks.proto @@ -48,6 +48,9 @@ message ProtoPersistSinkConnection { } message ProtoContinualTaskConnection { + mz_repr.global_id.ProtoGlobalId input_id = 1; + mz_storage_types.controller.ProtoCollectionMetadata output_metadata = 2; + mz_repr.global_id.ProtoGlobalId output_id = 3; } message ProtoCopyToS3OneshotSinkConnection { diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs index 9a3e2335c8e52..ab856e0670641 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -256,15 +256,33 @@ impl RustType for PersistSinkConnection { - _phantom: std::marker::PhantomData, + pub input_id: GlobalId, + pub output_metadata: S, + // TODO(ct): This can be removed once we actually render this under the + // correct id. 
+ pub output_id: GlobalId, } impl RustType for ContinualTaskConnection { fn into_proto(&self) -> ProtoContinualTaskConnection { - todo!("WIP"); + ProtoContinualTaskConnection { + input_id: Some(self.input_id.into_proto()), + output_metadata: Some(self.output_metadata.into_proto()), + output_id: Some(self.output_id.into_proto()), + } } fn from_proto(proto: ProtoContinualTaskConnection) -> Result { - todo!("WIP {:?}", proto); + Ok(ContinualTaskConnection { + input_id: proto + .input_id + .into_rust_if_some("ProtoContinualTaskConnection::input_id")?, + output_metadata: proto + .output_metadata + .into_rust_if_some("ProtoContinualTaskConnection::output_metadata")?, + output_id: proto + .output_id + .into_rust_if_some("ProtoContinualTaskConnection::output_id")?, + }) } } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index d85d8d3f05222..f67703a2f19b3 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -150,6 +150,7 @@ use crate::logging::compute::LogDataflowErrors; use crate::render::context::{ ArrangementFlavor, Context, MzArrangement, MzArrangementImport, ShutdownToken, }; +use crate::sink::continual_task::ContinualTaskCtx; use crate::typedefs::{ErrSpine, KeyBatcher}; pub mod context; @@ -202,6 +203,10 @@ pub fn build_compute_dataflow( let build_name = format!("BuildRegion: {}", &dataflow.debug_name); timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| { + // TODO(ct): This should be a config of the source instead, but at least try + // to contain the hacks. + let mut ct_ctx = ContinualTaskCtx::new(&dataflow); + // The scope.clone() occurs to allow import in the region. // We build a region here to establish a pattern of a scope inside the dataflow, // so that other similar uses (e.g. with iterative scopes) do not require weird @@ -217,23 +222,40 @@ pub fn build_compute_dataflow( .expect("Linear operators should always be valid") }); - // Note: For correctness, we require that sources only emit times advanced by - // `dataflow.as_of`. `persist_source` is documented to provide this guarantee. - let (mut ok_stream, err_stream, token) = persist_source::persist_source( + let maybe_ct_source = ct_ctx.render_source( inner, *source_id, - Arc::clone(&compute_state.persist_clients), - &compute_state.txns_ctx, - &compute_state.worker_config, - source.storage_metadata.clone(), - dataflow.as_of.clone(), - SnapshotMode::Include, - dataflow.until.clone(), + &source.storage_metadata, + &compute_state, mfp.as_mut(), - compute_state.dataflow_max_inflight_bytes(), - start_signal.clone(), - |error| panic!("compute_import: {error}"), + &start_signal, ); + let maybe_ct_source = maybe_ct_source.map(|(ct_times, ok, err, tokens)| { + // TODO(ct): Ideally this would be encapsulated by ContinualTaskCtx, but the + // types are tricky. + ct_ctx.ct_times.push(ct_times.leave_region().leave_region()); + (ok, err, tokens) + }); + + // Note: For correctness, we require that sources only emit times advanced by + // `dataflow.as_of`. `persist_source` is documented to provide this guarantee. 
+ let (mut ok_stream, err_stream, token) = maybe_ct_source.unwrap_or_else(|| { + persist_source::persist_source( + inner, + *source_id, + Arc::clone(&compute_state.persist_clients), + &compute_state.txns_ctx, + &compute_state.worker_config, + source.storage_metadata.clone(), + dataflow.as_of.clone(), + SnapshotMode::Include, + dataflow.until.clone(), + mfp.as_mut(), + compute_state.dataflow_max_inflight_bytes(), + start_signal.clone(), + |error| panic!("compute_import: {error}"), + ) + }); // If `mfp` is non-identity, we need to apply what remains. // For the moment, assert that it is either trivial or `None`. @@ -333,6 +355,7 @@ pub fn build_compute_dataflow( sink_id, &sink, start_signal.clone(), + ct_ctx.input_times(), ); } }); @@ -396,6 +419,7 @@ pub fn build_compute_dataflow( sink_id, &sink, start_signal.clone(), + ct_ctx.input_times(), ); } }); diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index f6a72eaf20bfe..1127d352da1a8 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -47,6 +47,7 @@ where sink_id: GlobalId, sink: &ComputeSinkDesc, start_signal: StartSignal, + ct_times: Option>, ) { soft_assert_or_log!( sink.non_null_assertions.is_strictly_sorted(), @@ -143,6 +144,7 @@ where start_signal, ok_collection.enter_region(inner), err_collection.enter_region(inner), + ct_times.map(|x| x.enter_region(inner)), ); if let Some(sink_token) = sink_token { @@ -169,6 +171,8 @@ where start_signal: StartSignal, sinked_collection: Collection, err_collection: Collection, + // WIP figure out a better way to smuggle this in + ct_times: Option>, ) -> Option>; } diff --git a/src/compute/src/sink/continual_task.rs b/src/compute/src/sink/continual_task.rs index 836dfb2332670..d94308d0dbafe 100644 --- a/src/compute/src/sink/continual_task.rs +++ b/src/compute/src/sink/continual_task.rs @@ -8,20 +8,140 @@ // by the Apache License, Version 2.0. 
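+//! A compute sink implementation for `CREATE CONTINUAL TASK`.
+//!
+//! A rough sketch of the timestamp dance, as reconstructed from the prototype
+//! below (details are still WIP): a continual task reads its own output, so an
+//! input diff arriving at time `T` must see the output as of *before* `T`. To
+//! get that, `step_backward` moves each input update from `T` to `T-1` on the
+//! way into the dataflow, and the sink's `SinkState` moves the computed
+//! results forward again (`step_forward`) so that they are written to the
+//! output shard at `T`, with a `compare_and_append` advancing the upper to
+//! `T+1`. For example, for the task used in the sqllogictests added by this
+//! change:
+//!
+//! ```sql
+//! CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
+//!     DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
+//!     INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
+//! )
+//! ```
+//!
+//! an insert into `append_only` at `T` sees `upsert` as of `T-1`, and the
+//! resulting deletions and insertions all land in `upsert` at `T` itself.
+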
 use std::any::Any;
+use std::cell::RefCell;
+use std::collections::BTreeSet;
 use std::rc::Rc;
+use std::sync::Arc;
 
-use differential_dataflow::Collection;
-use mz_compute_types::sinks::{ComputeSinkDesc, ContinualTaskConnection};
+use differential_dataflow::consolidation::consolidate_updates;
+use differential_dataflow::difference::Semigroup;
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::{AsCollection, Collection, Hashable};
+use futures::{Future, FutureExt, StreamExt};
+use mz_compute_types::dataflows::DataflowDescription;
+use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection};
+use mz_expr::MfpPlan;
+use mz_ore::cast::CastFrom;
+use mz_persist_client::operators::shard_source::SnapshotMode;
+use mz_persist_client::write::WriteHandle;
+use mz_persist_client::Diagnostics;
+use mz_persist_types::codec_impls::UnitSchema;
 use mz_repr::{Diff, GlobalId, Row, Timestamp};
+use mz_storage_operators::persist_source;
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
-use timely::dataflow::Scope;
-use timely::progress::Antichain;
+use mz_storage_types::sources::SourceData;
+use mz_timely_util::builder_async::{
+    Button, Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
+};
+use timely::dataflow::channels::pact::{Exchange, Pipeline};
+use timely::dataflow::operators::{Filter, Map};
+use timely::dataflow::{Scope, Stream};
+use timely::progress::frontier::AntichainRef;
+use timely::progress::{Antichain, Timestamp as _};
 
 use crate::compute_state::ComputeState;
 use crate::render::sinks::SinkRender;
 use crate::render::StartSignal;
 
+pub(crate) struct ContinualTaskCtx<G: Scope<Timestamp = Timestamp>> {
+    dataflow_as_of: Option<Antichain<Timestamp>>,
+    ct_inputs: BTreeSet<GlobalId>,
+    pub ct_times: Vec<Collection<G, (), Diff>>,
+}
+
+impl<G: Scope<Timestamp = Timestamp>> ContinualTaskCtx<G> {
+    pub fn new<P, S>(dataflow: &DataflowDescription<P, S>) -> Self {
+        let mut ct_inputs = BTreeSet::new();
+        for sink in dataflow.sink_exports.values() {
+            match &sink.connection {
+                ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
+                    input_id, ..
+                }) => {
+                    ct_inputs.insert(*input_id);
+                }
+                _ => continue,
+            }
+        }
+        let mut ret = ContinualTaskCtx {
+            dataflow_as_of: None,
+            ct_inputs,
+            ct_times: Vec::new(),
+        };
+        // Only clone the as_of if we're in a CT dataflow.
+        if !ret.ct_inputs.is_empty() {
+            ret.dataflow_as_of = dataflow.as_of.clone();
+        }
+        ret
+    }
+
+    pub fn render_source<S: Scope<Timestamp = Timestamp>>(
+        &mut self,
+        scope: &mut S,
+        source_id: GlobalId,
+        source_meta: &CollectionMetadata,
+        compute_state: &ComputeState,
+        map_filter_project: Option<&mut MfpPlan>,
+        start_signal: &StartSignal,
+    ) -> Option<(
+        Collection<S, (), Diff>,
+        Stream<S, (Row, Timestamp, Diff)>,
+        Stream<S, (DataflowError, Timestamp, Diff)>,
+        Vec<PressOnDropButton>,
+    )> {
+        if !self.ct_inputs.contains(&source_id) {
+            return None;
+        }
+
+        let (ok_stream, err_stream, mut tokens) = persist_source::persist_source(
+            scope,
+            source_id,
+            Arc::clone(&compute_state.persist_clients),
+            &compute_state.txns_ctx,
+            &compute_state.worker_config,
+            source_meta.clone(),
+            self.dataflow_as_of.clone(),
+            SnapshotMode::Exclude,
+            Antichain::new(),
+            map_filter_project,
+            compute_state.dataflow_max_inflight_bytes(),
+            start_signal.clone(),
+            |error| panic!("compute_import: {error}"),
+        );
+        let ok_stream = ok_stream.filter(|(data, ts, diff)| {
+            if *diff < 0 {
+                tracing::info!("WIP dropping {:?}", (data, ts, diff));
+                false
+            } else {
+                true
+            }
+        });
+        // WIP reduce this down to one update per timestamp before exchanging.
+        let ct_times = ok_stream.map(|(_row, ts, _diff)| ((), ts, 1));
+        let (token, ok_stream) = crate::sink::continual_task::step_backward(
+            &source_id.to_string(),
+            Collection::new(ok_stream),
+        );
+        tokens.push(token);
+        Some((
+            ct_times.as_collection(),
+            ok_stream.inner,
+            err_stream,
+            tokens,
+        ))
+    }
+
+    pub fn input_times(&self) -> Option<Collection<G, (), Diff>> {
+        let Some(first) = self.ct_times.first() else {
+            return None;
+        };
+        let mut scope = first.scope();
+        let diff_times =
+            differential_dataflow::collection::concatenate(&mut scope, self.ct_times.to_vec());
+        Some(diff_times)
+    }
+}
+
 impl<G> SinkRender<G> for ContinualTaskConnection<CollectionMetadata>
 where
     G: Scope<Timestamp = Timestamp>,
@@ -29,24 +149,321 @@ where
 {
     fn render_sink(
         &self,
         compute_state: &mut ComputeState,
-        sink: &ComputeSinkDesc<CollectionMetadata>,
+        _sink: &ComputeSinkDesc<CollectionMetadata>,
         sink_id: GlobalId,
-        as_of: Antichain<Timestamp>,
+        _as_of: Antichain<Timestamp>,
         start_signal: StartSignal,
         oks: Collection<G, Row, Diff>,
         errs: Collection<G, DataflowError, Diff>,
+        append_times: Option<Collection<G, (), Diff>>,
     ) -> Option<Rc<dyn Any>> {
-        todo!(
-            "WIP {:?}",
-            (
-                std::any::type_name_of_val(&compute_state),
-                sink,
-                sink_id,
-                as_of,
-                std::any::type_name_of_val(&start_signal),
-                std::any::type_name_of_val(&oks),
-                std::any::type_name_of_val(&errs),
-            )
+        let to_append = oks
+            .map(|x| SourceData(Ok(x)))
+            .concat(&errs.map(|x| SourceData(Err(x))));
+        let append_times = append_times.expect("should be provided by ContinualTaskCtx");
+
+        // WIP do something with this
+        let active_worker = true;
+        let shared_frontier = Rc::new(RefCell::new(if active_worker {
+            Antichain::from_elem(Timestamp::minimum())
+        } else {
+            Antichain::new()
+        }));
+        let collection = compute_state.expect_collection_mut(sink_id);
+        collection.sink_write_frontier = Some(Rc::clone(&shared_frontier));
+
+        let write_handle = {
+            let clients = Arc::clone(&compute_state.persist_clients);
+            let metadata = self.output_metadata.clone();
+            async move {
+                let client = clients
+                    .open(metadata.persist_location)
+                    .await
+                    .expect("valid location");
+                client
+                    .open_writer(
+                        metadata.data_shard,
+                        metadata.relation_desc.into(),
+                        UnitSchema.into(),
+                        Diagnostics {
+                            shard_name: sink_id.to_string(),
+                            handle_purpose: "WIP".into(),
+                        },
+                    )
+                    .await
+                    .expect("codecs should match")
+            }
+        };
+
+        let button = continual_task_sink(
+            &sink_id.to_string(),
+            to_append,
+            append_times,
+            write_handle,
+            start_signal,
         );
+        Some(Rc::new(button.press_on_drop()))
+    }
+}
+
+fn continual_task_sink<G: Scope<Timestamp = Timestamp>>(
+    name: &str,
+    to_append: Collection<G, SourceData, Diff>,
+    append_times: Collection<G, (), Diff>,
+    write_handle: impl Future<Output = WriteHandle<SourceData, (), Timestamp, Diff>> + Send + 'static,
+    start_signal: StartSignal,
+) -> Button {
+    let scope = to_append.scope();
+    let operator_name = format!("ContinualTask({})", name);
+    let mut op = AsyncOperatorBuilder::new(operator_name, scope.clone());
+
+    // TODO(ct): This all works perfectly well data parallel (assuming we
+    // broadcast the append_times). We just need to hook it up to the
+    // multi-worker persist-sink, but that requires some refactoring. This would
+    // also remove the need for this to be an async timely operator.
+    let active_worker = name.hashed();
+    let to_append_input =
+        op.new_disconnected_input(&to_append.inner, Exchange::new(move |_| active_worker));
+    let append_times_input =
+        op.new_disconnected_input(&append_times.inner, Exchange::new(move |_| active_worker));
+
+    let active_worker = usize::cast_from(active_worker) % scope.peers() == scope.index();
+    let button = op.build(move |_capabilities| async move {
+        if !active_worker {
+            return;
+        }
+        let () = start_signal.await;
+        let mut write_handle = write_handle.await;
+
+        #[derive(Debug)]
+        enum OpEvent<C> {
+            ToAppend(Event<Timestamp, C, Vec<(SourceData, Timestamp, Diff)>>),
+            AppendTimes(Event<Timestamp, C, Vec<((), Timestamp, Diff)>>),
+        }
+
+        impl<C: std::fmt::Debug> OpEvent<C> {
+            fn apply(self, state: &mut SinkState<SourceData, (), Timestamp, Diff>) {
+                tracing::info!("WIP event {:?}", self);
+                match self {
+                    // WIP big comment on the step_forward calls
+                    OpEvent::ToAppend(Event::Data(_cap, x)) => state.to_append.extend(
+                        x.into_iter()
+                            .map(|(k, t, d)| ((k, ()), t.step_forward(), d)),
+                    ),
+                    OpEvent::ToAppend(Event::Progress(x)) => {
+                        state.to_append_progress = x.into_option().expect("WIP").step_forward()
+                    }
+                    OpEvent::AppendTimes(Event::Data(_cap, x)) => state
+                        .append_times
+                        .extend(x.into_iter().map(|((), t, _d)| t)),
+                    OpEvent::AppendTimes(Event::Progress(x)) => {
+                        state.append_times_progress = x.into_option().expect("WIP")
+                    }
+                }
+            }
+        }
+
+        let to_insert_input = to_append_input.map(OpEvent::ToAppend);
+        let append_times_input = append_times_input.map(OpEvent::AppendTimes);
+        let mut op_inputs = futures::stream::select(to_insert_input, append_times_input);
+
+        let mut state = SinkState::new();
+        loop {
+            let Some(event) = op_inputs.next().await else {
+                // Inputs exhausted, shutting down.
+                return;
+            };
+            event.apply(&mut state);
+            // Also drain any other events that may be ready.
+            while let Some(Some(event)) = op_inputs.next().now_or_never() {
+                event.apply(&mut state);
+            }
+            tracing::info!("WIP about to process {:?}", state);
+            let Some((new_upper, to_append)) = state.process() else {
+                continue;
+            };
+            tracing::info!("WIP got write {:?}: {:?}", new_upper, to_append);
+
+            let mut expected_upper = write_handle.shared_upper();
+            while expected_upper.less_than(&new_upper) {
+                let res = write_handle
+                    .compare_and_append(
+                        &to_append,
+                        expected_upper.clone(),
+                        Antichain::from_elem(new_upper),
+                    )
+                    .await
+                    .expect("usage was valid");
+                match res {
+                    Ok(()) => {
+                        state.output_progress = new_upper;
+                        break;
+                    }
+                    Err(err) => {
+                        expected_upper = err.current;
+                        continue;
+                    }
+                }
+            }
+        }
+    });
+
+    button
+}
+
+#[derive(Debug)]
+struct SinkState<K, V, T, D> {
+    append_times: BTreeSet<T>,
+    append_times_progress: T,
+    to_append: Vec<((K, V), T, D)>,
+    to_append_progress: T,
+    output_progress: T,
+}
+
+impl<K: Ord, V: Ord, D: Semigroup> SinkState<K, V, Timestamp, D> {
+    fn new() -> Self {
+        SinkState {
+            // The known times at which we're going to write data to the output.
+            // This is guaranteed to include all times < append_times_progress,
+            // except that ones < output_progress may have been truncated.
+            append_times: BTreeSet::new(),
+            append_times_progress: Timestamp::minimum(),
+
+            // The data we've collected to append to the output. This is often
+            // compacted to advancing times and is expected to be ~empty in the
+            // steady state.
+            to_append: Vec::new(),
+            to_append_progress: Timestamp::minimum(),
+
+            // A lower bound on the upper of the output.
+            output_progress: Timestamp::minimum(),
+        }
+    }
+
+    /// Returns data to write to the output, if any, and the new upper to use.
+    ///
+    /// TODO(ct): Remove the Vec allocation here.
+    fn process(&mut self) -> Option<(Timestamp, Vec<((&K, &V), &Timestamp, &D)>)> {
+        // We can only append at times >= the output_progress, so pop off
+        // anything unnecessary.
+        while let Some(x) = self.append_times.first() {
+            // WIP regression test for >= vs >
+            if *x >= self.output_progress {
+                break;
+            }
+            self.append_times.pop_first();
+        }
+
+        // WIP bug: not allowed to return anything from append_times until we
+        // have progress info, because they could arrive out of order
+        let write_ts = match self.append_times.first() {
+            Some(x) if self.output_progress < *x => {
+                // WIP regression test
+                return Some((x.clone(), Vec::new()));
+            }
+            Some(x) => x,
+            // The CT sink's contract is that it only writes data at times we
+            // received an input diff. There are none in `[output_progress,
+            // append_times_progress)`, so we can go ahead and advance the upper
+            // of the output.
+            //
+            // We could instead ensure liveness by basing this off of to_append,
+            // but for any CTs reading the output (expected to be a common case)
+            // we'd end up looping each timestamp through persist one-by-one.
+            None if self.output_progress < self.append_times_progress => {
+                return Some((self.append_times_progress, Vec::new()));
+            }
+            // Nothing to do!
+            None => return None,
+        };
+
+        if self.to_append_progress <= *write_ts {
+            // Don't have all the necessary data yet.
+            return None;
+        }
+
+        // Time to write some data! Produce the collection as of write_ts by
+        // advancing timestamps, consolidating, and filtering out anything at
+        // future timestamps.
+        let as_of = &[write_ts.clone()];
+        for (_, t, _) in &mut self.to_append {
+            t.advance_by(AntichainRef::new(as_of))
+        }
+        consolidate_updates(&mut self.to_append);
+        // WIP resize the vec down if cap >> len?
+
+        let append_data = self
+            .to_append
+            .iter()
+            .filter_map(|((k, v), t, d)| (t <= write_ts).then_some(((k, v), t, d)))
+            .collect();
+        Some((write_ts.step_forward(), append_data))
+    }
+}
+
+pub(crate) fn step_backward<G>(
+    name: &str,
+    input: Collection<G, Row, Diff>,
+) -> (PressOnDropButton, Collection<G, Row, Diff>)
+where
+    G: Scope<Timestamp = Timestamp>,
+{
+    let name_orig = name.to_string();
+    let name = format!("StepBackward({})", name);
+    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.scope());
+    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);
+    let (mut output, output_stream) = builder.new_output();
+    let button = builder.build(move |caps| async move {
+        let [mut cap]: [_; 1] = caps.try_into().expect("one capability per output");
+        loop {
+            let Some(event) = input.next().await else {
+                tracing::info!("WIP StepBackward exhausted, shutting down");
+                return;
+            };
+            match event {
+                Event::Data(_data_cap, mut data) => {
+                    // tracing::info!("{} input data {:?}", name, data);
+                    let mut retractions = Vec::new();
+                    for (row, ts, diff) in &mut data {
+                        let orig_ts = *ts;
+                        *ts = ts.step_back().expect("WIP");
+                        let mut negated = *diff;
+                        differential_dataflow::Diff::negate(&mut negated);
+                        tracing::info!(
+                            "WIP {} read ct input\n  {:?} read {} {:?} {}\n  {:?} diff {} {:?} {}\n  {:?} diff {} {:?} {}",
+                            name_orig,
+                            orig_ts,
+                            name_orig,
+                            row,
+                            diff,
+                            ts,
+                            name_orig,
+                            row,
+                            diff,
+                            orig_ts,
+                            name_orig,
+                            row,
+                            negated,
+                        );
+                        retractions.push((row.clone(), orig_ts, negated));
+                    }
+                    tracing::debug!("{} emitting data {:?}", name, data);
+                    output.give_container(&cap, &mut data);
+                    output.give_container(&cap, &mut retractions);
+                }
+                Event::Progress(new_progress) => {
+                    let Some(new_progress) = new_progress.into_option() else {
+                        return;
+                    };
+                    let new_progress = new_progress.step_back().expect("WIP");
+                    if cap.time() < &new_progress {
+                        tracing::debug!("{} downgrading cap to {:?}", name, new_progress);
+                        cap.downgrade(&new_progress);
+                    }
+                }
+            }
+        }
+    });
+
+    (button.press_on_drop(), Collection::new(output_stream))
+}
diff --git a/src/compute/src/sink/copy_to_s3_oneshot.rs b/src/compute/src/sink/copy_to_s3_oneshot.rs
index 9c0a9cebe5eaa..c95a9c75ee18b 100644
--- a/src/compute/src/sink/copy_to_s3_oneshot.rs
+++ b/src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -45,6 +45,7 @@ where
         _start_signal: StartSignal,
         sinked_collection: Collection<G, Row, Diff>,
         err_collection: Collection<G, DataflowError, Diff>,
+        _: Option<Collection<G, (), Diff>>,
     ) -> Option<Rc<dyn Any>> {
         // An encapsulation of the copy to response protocol.
         // Used to send rows and errors if this fails.
diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs
index a2ee9846e4349..e6cad135e751d 100644
--- a/src/compute/src/sink/persist_sink.rs
+++ b/src/compute/src/sink/persist_sink.rs
@@ -60,6 +60,7 @@ where
         start_signal: StartSignal,
         mut ok_collection: Collection<G, Row, Diff>,
         mut err_collection: Collection<G, DataflowError, Diff>,
+        _: Option<Collection<G, (), Diff>>,
     ) -> Option<Rc<dyn Any>> {
         // Attach a probe reporting the compute frontier.
         // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
diff --git a/src/compute/src/sink/subscribe.rs b/src/compute/src/sink/subscribe.rs
index 9768a09ba976b..6f4ff72deced1 100644
--- a/src/compute/src/sink/subscribe.rs
+++ b/src/compute/src/sink/subscribe.rs
@@ -42,6 +42,7 @@ where
         _start_signal: StartSignal,
         sinked_collection: Collection<G, Row, Diff>,
         err_collection: Collection<G, DataflowError, Diff>,
+        _: Option<Collection<G, (), Diff>>,
     ) -> Option<Rc<dyn Any>> {
         // An encapsulation of the Subscribe response protocol.
         // Used to send rows and progress messages,
diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs
index 75800afe84e13..17b2a612b2466 100644
--- a/src/sql/src/names.rs
+++ b/src/sql/src/names.rs
@@ -22,7 +22,7 @@ use mz_ore::str::StrExt;
 use mz_repr::role_id::RoleId;
 use mz_repr::ColumnName;
 use mz_repr::GlobalId;
-use mz_sql_parser::ast::Expr;
+use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr};
 use mz_sql_parser::ident;
 use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
@@ -1569,6 +1569,17 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
         result
     }
 
+    fn fold_create_continual_task_statement(
+        &mut self,
+        stmt: CreateContinualTaskStatement<Raw>,
+    ) -> CreateContinualTaskStatement<Aug> {
+        // TODO(ct): Insert a fake CTE so that using the name of the continual
+        // task in the inserts and deletes resolves.
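+        //
+        // For example, for a statement of the shape used in the sqllogictests
+        // added below,
+        //
+        //     CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
+        //         DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
+        //         INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
+        //     )
+        //
+        // the references to `upsert` inside the DELETE and INSERT resolve to
+        // this fake CTE (LocalId 0), which planning later rebinds to the
+        // task's real GlobalId once the sink id has been allocated.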
+ let cte_name = normalize::ident(stmt.name.0.last().expect("WIP").clone()); + self.ctes.insert(cte_name, LocalId::new(0)); + mz_sql_parser::ast::fold::fold_create_continual_task_statement(self, stmt) + } + fn fold_cte_id(&mut self, _id: ::CteId) -> ::CteId { panic!("this should have been handled when walking the CTE"); } diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index abb15ef31e3e2..bf74618a378f2 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -21,12 +21,13 @@ use mz_repr::{ColumnName, GlobalId}; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::visit_mut::{self, VisitMut}; use mz_sql_parser::ast::{ - CreateConnectionStatement, CreateContinualTaskStatement, CreateIndexStatement, - CreateMaterializedViewStatement, CreateSecretStatement, CreateSinkStatement, - CreateSourceStatement, CreateSubsourceStatement, CreateTableFromSourceStatement, - CreateTableStatement, CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, - CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, MutRecBlock, Op, Query, Statement, - TableFactor, UnresolvedItemName, UnresolvedSchemaName, Value, ViewDefinition, + ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement, + CreateIndexStatement, CreateMaterializedViewStatement, CreateSecretStatement, + CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement, + CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement, + CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, + MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName, + Value, ViewDefinition, }; use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier}; @@ -414,12 +415,23 @@ pub fn create_statement( Statement::CreateContinualTask(CreateContinualTaskStatement { name, - columns, - input, + // WIP do we need to normalize columns and input? + columns: _, + input: _, stmts, - in_cluster, + in_cluster: _, }) => { - todo!("WIP {:?}", (name, columns, input, stmts, in_cluster)); + *name = allocate_name(name)?; + for stmt in stmts { + let mut normalizer = QueryNormalizer::new(); + match stmt { + ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt), + ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt), + } + if let Some(err) = normalizer.err { + return Err(err); + } + } } Statement::CreateIndex(CreateIndexStatement { diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index dbaa76c5d4c66..4ab3f51d2c2bc 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -700,7 +700,14 @@ pub struct CreateMaterializedViewPlan { } #[derive(Debug, Clone)] -pub struct CreateContinualTaskPlan {} +pub struct CreateContinualTaskPlan { + pub name: QualifiedItemName, + pub desc: RelationDesc, + // TODO(ct): Multiple inputs. + pub input_id: GlobalId, + pub continual_task: MaterializedView, + // TODO(ct): replace, drop_ids, if_not_exists +} #[derive(Debug, Clone)] pub struct CreateIndexPlan { diff --git a/src/sql/src/plan/query.rs b/src/sql/src/plan/query.rs index 1dad7b41c41ea..5303002e40a11 100644 --- a/src/sql/src/plan/query.rs +++ b/src/sql/src/plan/query.rs @@ -165,6 +165,57 @@ pub fn plan_root_query( }) } +/// TODO(ct): Dedup this with [plan_root_query]. 
+#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")] +pub fn plan_ct_query( + mut qcx: &mut QueryContext, + mut query: Query, +) -> Result, PlanError> { + transform_ast::transform(qcx.scx, &mut query)?; + let PlannedQuery { + mut expr, + scope, + order_by, + limit, + offset, + project, + group_size_hints, + } = plan_query(&mut qcx, &query)?; + + let mut finishing = RowSetFinishing { + limit, + offset, + project, + order_by, + }; + + // Attempt to push the finishing's ordering past its projection. This allows + // data to be projected down on the workers rather than the coordinator. It + // also improves the optimizer's demand analysis, as the optimizer can only + // reason about demand information in `expr` (i.e., it can't see + // `finishing.project`). + try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by); + + expr.finish_maintained(&mut finishing, group_size_hints); + + let typ = qcx.relation_type(&expr); + let typ = RelationType::new( + finishing + .project + .iter() + .map(|i| typ.column_types[*i].clone()) + .collect(), + ); + let desc = RelationDesc::new(typ, scope.column_names()); + + Ok(PlannedRootQuery { + expr, + desc, + finishing, + scope, + }) +} + /// Attempts to push a projection through an order by. /// /// The returned bool indicates whether the pushdown was successful or not. @@ -6083,8 +6134,8 @@ impl QueryLifetime { /// Description of a CTE sufficient for query planning. #[derive(Debug, Clone)] pub struct CteDesc { - name: String, - desc: RelationDesc, + pub name: String, + pub desc: RelationDesc, } /// The state required when planning a `Query`. diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index fbd799f92b72c..f5601e4436350 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -22,7 +22,7 @@ use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_ use mz_controller_types::{ is_cluster_size_v2, ClusterId, ReplicaId, DEFAULT_REPLICA_LOGGING_INTERVAL, }; -use mz_expr::{CollectionPlan, UnmaterializableFunc}; +use mz_expr::{CollectionPlan, LocalId, UnmaterializableFunc}; use mz_interchange::avro::{AvroSchemaGenerator, DocTarget}; use mz_ore::cast::{CastFrom, TryCastFrom}; use mz_ore::collections::{CollectionExt, HashSet}; @@ -122,7 +122,9 @@ use crate::names::{ }; use crate::normalize::{self, ident}; use crate::plan::error::PlanError; -use crate::plan::query::{plan_expr, scalar_type_from_catalog, ExprContext, QueryLifetime}; +use crate::plan::query::{ + plan_expr, scalar_type_from_catalog, scalar_type_from_sql, CteDesc, ExprContext, QueryLifetime, +}; use crate::plan::scope::Scope; use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS}; use crate::plan::statement::{scl, StatementContext, StatementDesc}; @@ -137,10 +139,10 @@ use crate::plan::{ AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, - CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan, - CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, - CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, DropObjectsPlan, - DropOwnedPlan, FullItemName, Index, Ingestion, MaterializedView, Params, Plan, + CreateConnectionPlan, 
CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, + CreateMaterializedViewPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, + CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, + DropObjectsPlan, DropOwnedPlan, FullItemName, Index, Ingestion, MaterializedView, Params, Plan, PlanClusterOption, PlanNotice, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, WebhookValidation, @@ -2530,10 +2532,175 @@ generate_extracted_config!( pub fn plan_create_continual_task( scx: &StatementContext, - stmt: CreateContinualTaskStatement, + mut stmt: CreateContinualTaskStatement, params: &Params, ) -> Result { - todo!("WIP {:?}", (scx, stmt, params)); + let cluster_id = match &stmt.in_cluster { + None => scx.catalog.resolve_cluster(None)?.id(), + Some(in_cluster) => in_cluster.id, + }; + stmt.in_cluster = Some(ResolvedClusterName { + id: cluster_id, + print_name: None, + }); + + let create_sql = + normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?; + + let fake_cte_name = normalize::ident(stmt.name.0.last().expect("WIP").clone()); + let partial_name = normalize::unresolved_item_name(stmt.name)?; + let name = scx.allocate_qualified_name(partial_name.clone())?; + let desc = { + let mut desc_columns = Vec::with_capacity(stmt.columns.capacity()); + for col in stmt.columns.iter() { + desc_columns.push(( + normalize::column_name(col.name.clone()), + ColumnType { + scalar_type: scalar_type_from_sql(scx, &col.data_type)?, + nullable: true, + }, + )); + } + RelationDesc::from_names_and_types(desc_columns) + }; + let input = scx.get_item_by_resolved_name(&stmt.input)?; + + let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView); + qcx.ctes.insert( + LocalId::new(0), + CteDesc { + name: fake_cte_name.clone(), + desc: desc.clone(), + }, + ); + + let mut exprs = Vec::new(); + let mut has_write = BTreeSet::new(); + for stmt in &stmt.stmts { + let query = continual_task_query(&fake_cte_name, &mut has_write, stmt) + .ok_or_else(|| sql_err!("TODO(ct)"))?; + let query::PlannedRootQuery { + mut expr, + desc: _, + finishing, + scope: _, + } = query::plan_ct_query(&mut qcx, query)?; + // We get back a trivial finishing, see comment in `plan_view`. + assert!(finishing.is_trivial(expr.arity())); + // TODO(ct): Is this right? + expr.bind_parameters(params)?; + // TODO(ct): Validate the planned desc. + match stmt { + ast::ContinualTaskStmt::Insert(_) => exprs.push(expr), + ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()), + } + } + // TODO(ct): Collect things by output and assert that there is only one (or + // support multiple outputs). + let expr = exprs + .into_iter() + .reduce(|acc, expr| acc.union(expr)) + .ok_or_else(|| sql_err!("TODO(ct)"))?; + + let column_names: Vec = desc.iter_names().cloned().collect(); + if let Some(dup) = column_names.iter().duplicates().next() { + sql_bail!("column {} specified more than once", dup.as_str().quoted()); + } + + // Check for an object in the catalog with this same name + let full_name = scx.catalog.resolve_full_name(&name); + let partial_name = PartialItemName::from(full_name.clone()); + // For PostgreSQL compatibility, we need to prevent creating this when there + // is an existing object *or* type of the same name. 
+ if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) { + return Err(PlanError::ItemAlreadyExists { + name: full_name.to_string(), + item_type: item.item_type(), + }); + } + + Ok(Plan::CreateContinualTask(CreateContinualTaskPlan { + name, + desc, + input_id: input.id(), + continual_task: MaterializedView { + create_sql, + expr, + column_names, + cluster_id, + non_null_assertions: Vec::new(), + compaction_window: None, + refresh_schedule: None, + as_of: None, + }, + })) +} + +fn continual_task_query<'a>( + fake_cte_name: &str, + has_write: &mut BTreeSet<&'a ResolvedItemName>, + stmt: &'a ast::ContinualTaskStmt, +) -> Option> { + match stmt { + ast::ContinualTaskStmt::Insert(ast::InsertStatement { + table_name, + columns, + source, + returning, + }) => { + // Inserts are blind writes so don't care if we already have a + // write. + has_write.insert(table_name); + if !columns.is_empty() || !returning.is_empty() { + return None; + } + match source { + ast::InsertSource::Query(query) => Some(query.clone()), + ast::InsertSource::DefaultValues => None, + } + } + ast::ContinualTaskStmt::Delete(ast::DeleteStatement { + table_name, + alias, + using, + selection, + }) => { + if has_write.contains(table_name) || !using.is_empty() { + return None; + } + has_write.insert(table_name); + // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`. + let from = ast::TableWithJoins { + relation: ast::TableFactor::Table { + // TODO(ct): Huge hack. + name: ResolvedItemName::Cte { + id: LocalId::new(0), + name: fake_cte_name.to_owned(), + }, + alias: alias.clone(), + }, + joins: Vec::new(), + }; + let select = ast::Select { + from: vec![from], + selection: selection.clone(), + distinct: None, + projection: vec![ast::SelectItem::Wildcard], + group_by: Vec::new(), + having: None, + options: Vec::new(), + }; + let query = ast::Query { + ctes: ast::CteBlock::Simple(Vec::new()), + body: ast::SetExpr::Select(Box::new(select)), + order_by: Vec::new(), + limit: None, + offset: None, + }; + // Then negate it to turn it into retractions (after planning it). + Some(query) + } + } } pub fn describe_create_sink( diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 2381d57088518..a813c3474be75 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -601,7 +601,27 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, - Plan::CreateContinualTask(plan::CreateContinualTaskPlan {}) => todo!("WIP"), + Plan::CreateContinualTask(plan::CreateContinualTaskPlan { + name, + desc: _, + input_id: _, + continual_task, + }) => RbacRequirements { + privileges: vec![ + ( + SystemObjectId::Object(name.qualifiers.clone().into()), + AclMode::CREATE, + role_id, + ), + ( + SystemObjectId::Object(continual_task.cluster_id.into()), + AclMode::CREATE, + role_id, + ), + ], + item_usage: &CREATE_ITEM_USAGE, + ..Default::default() + }, Plan::CreateIndex(plan::CreateIndexPlan { name, index, diff --git a/test/sqllogictest/ct_audit_log.slt b/test/sqllogictest/ct_audit_log.slt new file mode 100644 index 0000000000000..064ad6ea65cb5 --- /dev/null +++ b/test/sqllogictest/ct_audit_log.slt @@ -0,0 +1,48 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
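+#
+# This exercises the "audit log" pattern: an INSERT-only CONTINUAL TASK that
+# appends the current contents of `anomalies` each time it changes. Note that
+# the two-row INSERT below moves the count from 1 to 3 at a single timestamp,
+# so the log is expected to contain {1, 3} rather than {1, 2, 3}.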
+
+mode cockroach
+
+statement ok
+CREATE TABLE mv_input (key INT8)
+
+statement ok
+INSERT INTO mv_input VALUES (1);
+
+statement ok
+CREATE MATERIALIZED VIEW anomalies AS SELECT count(*)::INT8 FROM mv_input;
+
+query I
+SELECT * FROM anomalies
+----
+1
+
+statement ok
+CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
+    INSERT INTO audit_log SELECT * FROM anomalies;
+)
+
+query I
+SELECT * FROM audit_log
+----
+1
+
+statement ok
+INSERT INTO mv_input VALUES (2), (3)
+
+query I
+SELECT * FROM anomalies
+----
+3
+
+query I
+SELECT * FROM audit_log
+----
+1
+3
diff --git a/test/sqllogictest/ct_stream_table_join.slt b/test/sqllogictest/ct_stream_table_join.slt
new file mode 100644
index 0000000000000..17db3d53b2b63
--- /dev/null
+++ b/test/sqllogictest/ct_stream_table_join.slt
@@ -0,0 +1,44 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+mode cockroach
+
+statement ok
+CREATE TABLE big (key INT)
+
+statement ok
+CREATE TABLE small (key INT, val STRING)
+
+statement ok
+INSERT INTO small VALUES (1, 'v0')
+
+statement ok
+CREATE CONTINUAL TASK stj (key INT, val STRING) ON INPUT big AS (
+    INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key;
+)
+
+statement ok
+INSERT INTO big VALUES (1)
+
+query IT
+SELECT * FROM stj
+----
+1 v0
+
+statement ok
+UPDATE small SET val = 'v1'
+
+statement ok
+INSERT INTO big VALUES (1)
+
+query IT
+SELECT * FROM stj
+----
+1 v0
+1 v1
diff --git a/test/sqllogictest/ct_upsert.slt b/test/sqllogictest/ct_upsert.slt
new file mode 100644
index 0000000000000..3535d918c6260
--- /dev/null
+++ b/test/sqllogictest/ct_upsert.slt
@@ -0,0 +1,36 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+mode cockroach
+
+statement ok
+CREATE TABLE append_only (key INT, val INT)
+
+statement ok
+CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
+    DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
+    INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
+)
+
+statement ok
+INSERT INTO append_only VALUES (1, 2), (1, 1)
+
+query II
+SELECT * FROM upsert
+----
+1 2
+
+statement ok
+INSERT INTO append_only VALUES (1, 3), (2, 4)
+
+query II
+SELECT * FROM upsert
+----
+1 3
+2 4
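+
+# Note: Within a single input timestamp, the DELETE sees the task's output as
+# of *before* that timestamp while the INSERT sees all of `append_only` at it,
+# so (1, 2) and (1, 1) above net out to the single row (1, 2), and the later
+# batch replaces it with the new max (1, 3) instead of accumulating rows.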