From 2712c06be494ec847cec8cbd1a5f73f393ac17a0 Mon Sep 17 00:00:00 2001
From: Daniel Harrison <52528+danhhz@users.noreply.github.com>
Date: Fri, 6 Sep 2024 08:55:58 -0700
Subject: [PATCH] [DNM] ct: add strawman impl of CREATE CONTINUAL TASK

---
 src/adapter/src/catalog.rs                    |  17 +-
 src/adapter/src/catalog/apply.rs              |  20 +-
 .../src/catalog/builtin_table_updates.rs      |  17 +-
 src/adapter/src/catalog/open.rs               |   1 +
 src/adapter/src/catalog/state.rs              |  55 ++-
 src/adapter/src/coord/command_handler.rs      |   8 +-
 src/adapter/src/coord/ddl.rs                  |   7 +-
 .../sequencer/inner/create_continual_task.rs  | 183 +++++++++-
 .../inner/create_materialized_view.rs         |   2 +-
 src/compute-client/src/controller/instance.rs |  17 +-
 src/compute-types/src/sinks.proto             |   4 +
 src/compute-types/src/sinks.rs                |  19 +-
 src/compute/src/render.rs                     | 111 +++++-
 src/compute/src/render/sinks.rs               |   4 +
 src/compute/src/sink/continual_task.rs        | 344 +++++++++++++++++-
 src/compute/src/sink/copy_to_s3_oneshot.rs    |   1 +
 src/compute/src/sink/persist_sink.rs          |   1 +
 src/compute/src/sink/subscribe.rs             |   1 +
 src/sql/src/names.rs                          |  13 +-
 src/sql/src/normalize.rs                      |  32 +-
 src/sql/src/plan.rs                           |   9 +-
 src/sql/src/plan/query.rs                     |  55 ++-
 src/sql/src/plan/statement.rs                 |  15 +-
 src/sql/src/plan/statement/ddl.rs             | 189 +++++++++-
 src/sql/src/rbac.rs                           |  22 +-
 test/sqllogictest/ct_audit_log.slt            |  48 +++
 test/sqllogictest/ct_stream_table_join.slt    |  44 +++
 test/sqllogictest/ct_upsert.slt               |  36 ++
 28 files changed, 1173 insertions(+), 102 deletions(-)
 create mode 100644 test/sqllogictest/ct_audit_log.slt
 create mode 100644 test/sqllogictest/ct_stream_table_join.slt
 create mode 100644 test/sqllogictest/ct_upsert.slt

diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs
index c397638d3dee8..8dd138fe75f44 100644
--- a/src/adapter/src/catalog.rs
+++ b/src/adapter/src/catalog.rs
@@ -360,6 +360,11 @@ impl Catalog {
         }
         policies
     }
+
+    /// WIP
+    pub fn hack_add_ct(&mut self, id: GlobalId, entry: CatalogEntry) {
+        self.state.entry_by_id.insert(id, entry);
+    }
 }
 
 #[derive(Debug)]
@@ -2342,7 +2347,7 @@ mod tests {
                     .expect("unable to open debug catalog");
             let item = catalog
                 .state()
-                .deserialize_item(&create_sql)
+                .deserialize_item(id, &create_sql)
                 .expect("unable to parse view");
             catalog
                 .transact(
@@ -3227,16 +3232,16 @@ mod tests {
             let schema_spec = schema.id().clone();
             let schema_name = &schema.name().schema;
             let database_spec = ResolvedDatabaseSpecifier::Id(database_id);
+            let mv_id = catalog
+                .allocate_user_id()
+                .await
+                .expect("unable to allocate id");
             let mv = catalog
                 .state()
-                .deserialize_item(&format!(
+                .deserialize_item(mv_id, &format!(
                     "CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"
                 ))
                 .expect("unable to deserialize item");
-            let mv_id = catalog
-                .allocate_user_id()
-                .await
-                .expect("unable to allocate id");
             catalog
                 .transact(
                     None,
diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs
index 746a47a2030f6..604831bd54d54 100644
--- a/src/adapter/src/catalog/apply.rs
+++ b/src/adapter/src/catalog/apply.rs
@@ -611,6 +611,7 @@ impl CatalogState {
             Builtin::Index(index) => {
                 let mut item = self
                     .parse_item(
+                        Some(id),
                         &index.create_sql(),
                         None,
                         index.is_retained_metrics_object,
@@ -736,6 +737,7 @@ impl CatalogState {
             Builtin::Connection(connection) => {
                 let mut item = self
                     .parse_item(
+                        Some(id),
                         connection.sql,
                         None,
                         false,
@@ -855,9 +857,11 @@ impl CatalogState {
                         // This makes it difficult to use the `UpdateFrom` trait, but the structure
                         // is still the same as the trait.
                         if retraction.create_sql() != create_sql {
-                            let item = self.deserialize_item(&create_sql).unwrap_or_else(|e| {
-                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
-                            });
+                            let item =
+                                self.deserialize_item(item.id, &create_sql)
+                                    .unwrap_or_else(|e| {
+                                        panic!("{e:?}: invalid persisted SQL: {create_sql}")
+                                    });
                             retraction.item = item;
                         }
                         retraction.id = id;
@@ -869,9 +873,11 @@ impl CatalogState {
                         retraction
                     }
                     None => {
-                        let catalog_item = self.deserialize_item(&create_sql).unwrap_or_else(|e| {
-                            panic!("{e:?}: invalid persisted SQL: {create_sql}")
-                        });
+                        let catalog_item = self
+                            .deserialize_item(item.id, &create_sql)
+                            .unwrap_or_else(|e| {
+                                panic!("{e:?}: invalid persisted SQL: {create_sql}")
+                            });
                         CatalogEntry {
                             item: catalog_item,
                             referenced_by: Vec::new(),
@@ -1182,7 +1188,7 @@ impl CatalogState {
                     let handle = mz_ore::task::spawn(
                         || "parse view",
                         async move {
-                            let res = task_state.parse_item(&create_sql, None, false, None);
+                            let res = task_state.parse_item(None, &create_sql, None, false, None);
                             (id, res)
                         }
                         .instrument(span),
diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs
index 0ba2e38465804..959f932ec2869 100644
--- a/src/adapter/src/catalog/builtin_table_updates.rs
+++ b/src/adapter/src/catalog/builtin_table_updates.rs
@@ -1242,16 +1242,19 @@ impl CatalogState {
             })
             .into_element()
             .ast;
-        let query = match &create_stmt {
-            Statement::CreateMaterializedView(stmt) => &stmt.query,
+        let query_string = match &create_stmt {
+            Statement::CreateMaterializedView(stmt) => {
+                let mut query_string = stmt.query.to_ast_string_stable();
+                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
+                // do the same for compatibility's sake.
+                query_string.push(';');
+                query_string
+            }
+            // TODO(ct): Remove.
+            Statement::CreateContinualTask(_) => "TODO(ct)".into(),
             _ => unreachable!(),
         };
 
-        let mut query_string = query.to_ast_string_stable();
-        // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
-        // do the same for compatibility's sake.
-        query_string.push(';');
-
         let mut updates = Vec::new();
 
         updates.push(BuiltinTableUpdate {
diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs
index ebaef9d075ccd..6cd31a63f9030 100644
--- a/src/adapter/src/catalog/open.rs
+++ b/src/adapter/src/catalog/open.rs
@@ -152,6 +152,7 @@ impl CatalogItemRebuilder {
                 custom_logical_compaction_window,
             } => state
                 .parse_item(
+                    None,
                     &sql,
                     None,
                     is_retained_metrics_object,
diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs
index 5d27d2b8d58b2..42ea2c73c34af 100644
--- a/src/adapter/src/catalog/state.rs
+++ b/src/adapter/src/catalog/state.rs
@@ -38,6 +38,8 @@ use mz_controller::clusters::{
     UnmanagedReplicaLocation,
 };
 use mz_controller_types::{ClusterId, ReplicaId};
+use mz_expr::visit::Visit;
+use mz_expr::{Id, LocalId};
 use mz_ore::collections::CollectionExt;
 use mz_ore::now::NOW_ZERO;
 use mz_ore::soft_assert_no_log;
@@ -67,7 +69,7 @@ use mz_sql::names::{
 use mz_sql::plan::{
     CreateConnectionPlan, CreateContinualTaskPlan, CreateIndexPlan, CreateMaterializedViewPlan,
     CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
-    CreateViewPlan, Params, Plan, PlanContext,
+    CreateViewPlan, HirRelationExpr, Params, Plan, PlanContext,
 };
 use mz_sql::rbac;
 use mz_sql::session::metadata::SessionMetadata;
@@ -787,14 +789,19 @@ impl CatalogState {
     }
 
     /// Parses the given SQL string into a pair of [`CatalogItem`].
-    pub(crate) fn deserialize_item(&self, create_sql: &str) -> Result<CatalogItem, AdapterError> {
-        self.parse_item(create_sql, None, false, None)
+    pub(crate) fn deserialize_item(
+        &self,
+        id: GlobalId,
+        create_sql: &str,
+    ) -> Result<CatalogItem, AdapterError> {
+        self.parse_item(Some(id), create_sql, None, false, None)
     }
 
     /// Parses the given SQL string into a `CatalogItem`.
     #[mz_ore::instrument]
     pub(crate) fn parse_item(
         &self,
+        item_id: Option<GlobalId>,
         create_sql: &str,
         pcx: Option<&PlanContext>,
         is_retained_metrics_object: bool,
@@ -942,7 +949,47 @@ impl CatalogState {
                     initial_as_of,
                 })
             }
-            Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"),
+            Plan::CreateContinualTask(CreateContinualTaskPlan { continual_task, .. }) => {
+                // Collect optimizer parameters.
+                let optimizer_config =
+                    optimize::OptimizerConfig::from(session_catalog.system_vars());
+                // Build an optimizer for this VIEW.
+                // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
+                let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
+
+                let mut raw_expr = continual_task.expr;
+                // Replace our placeholder fake ctes with the real output id, now that
+                // we have it.
+                raw_expr.visit_mut_post(&mut |expr| match expr {
+                    HirRelationExpr::Get { id, .. } if *id == Id::Local(LocalId::new(0)) => {
+                        *id = Id::Global(item_id.expect("WIP hacks"));
+                    }
+                    _ => {}
+                })?;
+
+                let optimized_expr = optimizer.optimize(raw_expr.clone())?;
+                let mut typ = optimized_expr.typ();
+                for &i in &continual_task.non_null_assertions {
+                    typ.column_types[i].nullable = false;
+                }
+                let desc = RelationDesc::new(typ, continual_task.column_names);
+
+                let initial_as_of = continual_task.as_of.map(Antichain::from_elem);
+
+                // TODO(ct): CatalogItem::ContinualTask
+                CatalogItem::MaterializedView(MaterializedView {
+                    create_sql: continual_task.create_sql,
+                    raw_expr,
+                    optimized_expr,
+                    desc,
+                    resolved_ids,
+                    cluster_id: continual_task.cluster_id,
+                    non_null_assertions: continual_task.non_null_assertions,
+                    custom_logical_compaction_window: continual_task.compaction_window,
+                    refresh_schedule: continual_task.refresh_schedule,
+                    initial_as_of,
+                })
+            }
             Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                 create_sql: index.create_sql,
                 on: index.on,
diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs
index e7e5f8c2c0689..53f0be6a11a58 100644
--- a/src/adapter/src/coord/command_handler.rs
+++ b/src/adapter/src/coord/command_handler.rs
@@ -45,7 +45,6 @@ use mz_sql::session::user::User;
 use mz_sql::session::vars::{
     EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
 };
-use mz_sql_parser::ast::display::AstDisplay;
 use mz_sql_parser::ast::{
     CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
     WithOptionValue,
@@ -501,7 +500,7 @@ impl Coordinator {
         self.handle_execute_inner(stmt, params, ctx).await
     }
 
-    #[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
+    #[instrument(name = "coord::handle_execute_inner")]
     pub(crate) async fn handle_execute_inner(
         &mut self,
         stmt: Arc<Statement<Raw>>,
@@ -1060,7 +1059,10 @@ impl Coordinator {
             .any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
         {
             let catalog = self.catalog().for_session(session);
-            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
+            let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(
+                &catalog,
+                cmvs.in_cluster.as_ref(),
+            )?;
             let ids = self
                 .index_oracle(cluster)
                 .sufficient_collections(resolved_ids.0.iter());
diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 24ac16d713346..db2d3b49854c4 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -255,10 +255,15 @@ impl Coordinator {
                                         indexes_to_drop.push((*cluster_id, *id));
                                     }
                                     CatalogItem::MaterializedView(MaterializedView {
+                                        create_sql,
                                         cluster_id,
                                         ..
                                     }) => {
-                                        materialized_views_to_drop.push((*cluster_id, *id));
+                                        if create_sql.to_lowercase().contains("continual task") {
+                                            // WIP noop because it's not actually running under this id
+                                        } else {
+                                            materialized_views_to_drop.push((*cluster_id, *id));
+                                        }
                                     }
                                     CatalogItem::View(_) => views_to_drop.push(*id),
                                     CatalogItem::Secret(_) => {
diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
index f94fa182182aa..1afce0277ce62 100644
--- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
@@ -7,15 +7,34 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::sync::Arc;
+
+use mz_catalog::memory::objects::{
+    CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource,
+};
+use mz_compute_types::sinks::{
+    ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection,
+};
+use mz_expr::visit::Visit;
+use mz_expr::{Id, LocalId};
 use mz_ore::instrument;
+use mz_repr::adt::mz_acl_item::PrivilegeMap;
+use mz_repr::optimize::OverrideFrom;
 use mz_sql::names::ResolvedIds;
-use mz_sql::plan;
+use mz_sql::plan::{self, HirRelationExpr};
+use mz_sql::session::metadata::SessionMetadata;
+use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};
 
+use crate::catalog;
 use crate::command::ExecuteResponse;
 use crate::coord::Coordinator;
 use crate::error::AdapterError;
+use crate::optimize::dataflows::dataflow_import_id_bundle;
+use crate::optimize::{self, Optimize};
 use crate::session::Session;
+use crate::util::ResultExt;
 
+// TODO(ct): Big oof. Dedup a bunch of this with MVs.
 impl Coordinator {
     #[instrument]
     pub(crate) async fn sequence_create_continual_task(
@@ -24,6 +43,166 @@ impl Coordinator {
         plan: plan::CreateContinualTaskPlan,
         resolved_ids: ResolvedIds,
     ) -> Result<ExecuteResponse, AdapterError> {
-        todo!("WIP {:?}", (session, plan, resolved_ids));
+        let plan::CreateContinualTaskPlan {
+            name,
+            desc,
+            input_id,
+            continual_task:
+                plan::MaterializedView {
+                    create_sql,
+                    cluster_id,
+                    mut expr,
+                    column_names,
+                    non_null_assertions,
+                    compaction_window: _,
+                    refresh_schedule,
+                    as_of: _,
+                },
+        } = plan;
+
+        // Collect optimizer parameters.
+        let compute_instance = self
+            .instance_snapshot(cluster_id)
+            .expect("compute instance does not exist");
+
+        let debug_name = self.catalog().resolve_full_name(&name, None).to_string();
+        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
+            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
+
+        let view_id = self.allocate_transient_id();
+        let bonus_id = self.allocate_transient_id();
+        let catalog_mut = self.catalog_mut();
+        let sink_id = catalog_mut.allocate_user_id().await?;
+
+        // Put a placeholder in the table so the optimizer can find something
+        // for the sink_id.
+        let fake_entry = CatalogEntry {
+            item: CatalogItem::Table(Table {
+                create_sql: Some(create_sql.clone()),
+                desc: desc.clone(),
+                conn_id: None,
+                resolved_ids: resolved_ids.clone(),
+                custom_logical_compaction_window: None,
+                is_retained_metrics_object: false,
+                data_source: TableDataSource::TableWrites {
+                    defaults: Vec::new(),
+                },
+            }),
+            referenced_by: Vec::new(),
+            used_by: Vec::new(),
+            id: sink_id,
+            oid: 0,
+            name: name.clone(),
+            owner_id: *session.current_role_id(),
+            privileges: PrivilegeMap::new(),
+        };
+        catalog_mut.hack_add_ct(sink_id, fake_entry);
+
+        // Build an optimizer for this CONTINUAL TASK.
+        let mut optimizer = optimize::materialized_view::Optimizer::new(
+            Arc::new(catalog_mut.clone()),
+            compute_instance,
+            sink_id,
+            view_id,
+            column_names,
+            non_null_assertions,
+            refresh_schedule.clone(),
+            debug_name,
+            optimizer_config,
+            self.optimizer_metrics(),
+        );
+
+        // Replace our placeholder fake ctes with the real output id, now that
+        // we have it.
+        expr.visit_mut_post(&mut |expr| match expr {
+            HirRelationExpr::Get { id, .. } if *id == Id::Local(LocalId::new(0)) => {
+                *id = Id::Global(sink_id);
+            }
+            _ => {}
+        })?;
+
+        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
+        let raw_expr = expr.clone();
+        let local_mir_plan = optimizer.catch_unwind_optimize(expr)?;
+        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
+        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
+        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
+
+        // Timestamp selection
+        let mut id_bundle =
+            dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id.clone());
+        // WIP yay more hacks
+        id_bundle.storage_ids.remove(&sink_id);
+        let read_holds = self.acquire_read_holds(&id_bundle);
+        let (dataflow_as_of, storage_as_of, _until) =
+            self.select_timestamps(id_bundle, refresh_schedule.as_ref(), &read_holds)?;
+        let output_desc = global_lir_plan.desc().clone();
+        let (mut df_desc, _df_meta) = global_lir_plan.unapply();
+        df_desc.set_as_of(dataflow_as_of.clone());
+
+        // TODO(ct): HACKs
+        let (_id, sink) = df_desc.sink_exports.pop_first().expect("WIP");
+        df_desc.sink_exports.insert(bonus_id, sink);
+        for sink in df_desc.sink_exports.values_mut() {
+            match &mut sink.connection {
+                ComputeSinkConnection::Persist(PersistSinkConnection {
+                    value_desc,
+                    storage_metadata,
+                }) => {
+                    sink.connection =
+                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
+                            input_id,
+                            output_id: sink_id,
+                            output_desc: value_desc.clone(),
+                            output_metadata: *storage_metadata,
+                        })
+                }
+                _ => unreachable!("MV should produce persist sink connection"),
+            }
+        }
+
+        let ops = vec![catalog::Op::CreateItem {
+            id: sink_id,
+            name: name.clone(),
+            item: CatalogItem::MaterializedView(MaterializedView {
+                create_sql,
+                raw_expr,
+                optimized_expr: local_mir_plan.expr(),
+                desc: output_desc.clone(),
+                resolved_ids,
+                cluster_id,
+                non_null_assertions: Vec::new(),
+                custom_logical_compaction_window: None,
+                refresh_schedule,
+                initial_as_of: Some(storage_as_of.clone()),
+            }),
+            owner_id: *session.current_role_id(),
+        }];
+
+        let () = self
+            .catalog_transact_with_side_effects(Some(session), ops, |coord| async {
+                coord
+                    .controller
+                    .storage
+                    .create_collections(
+                        coord.catalog.state().storage_metadata(),
+                        None,
+                        vec![(
+                            sink_id,
+                            CollectionDescription {
+                                desc: output_desc,
+                                data_source: DataSource::Other(DataSourceOther::Compute),
+                                since: Some(storage_as_of),
+                                status_collection_id: None,
+                            },
+                        )],
+                    )
+                    .await
+                    .unwrap_or_terminate("cannot fail to append");
+
+                coord.ship_dataflow(df_desc, cluster_id.clone()).await;
+            })
+            .await?;
+        Ok(ExecuteResponse::CreatedContinualTask)
     }
 }
diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
index 1143c9ef535cd..c249a74a481a3 100644
--- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -722,7 +722,7 @@ impl Coordinator {
 
     /// Select the initial `dataflow_as_of`, `storage_as_of`, and `until` frontiers for a
     /// materialized view.
-    fn select_timestamps(
+    pub(crate) fn select_timestamps(
         &self,
         id_bundle: CollectionIdBundle,
         refresh_schedule: Option<&RefreshSchedule>,
diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs
index 625cc3c8d371c..d9132910a5a9f 100644
--- a/src/compute-client/src/controller/instance.rs
+++ b/src/compute-client/src/controller/instance.rs
@@ -22,7 +22,9 @@ use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig};
 use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
 use mz_compute_types::plan::flat_plan::FlatPlan;
 use mz_compute_types::plan::LirId;
-use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection};
+use mz_compute_types::sinks::{
+    ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection, PersistSinkConnection,
+};
 use mz_compute_types::sources::SourceInstanceDesc;
 use mz_dyncfg::ConfigSet;
 use mz_expr::RowSetFinishing;
@@ -1243,7 +1245,18 @@ where
                     ComputeSinkConnection::Persist(conn)
                 }
                 ComputeSinkConnection::ContinualTask(conn) => {
-                    todo!("WIP {:?}", conn);
+                    let output_metadata = self
+                        .storage_collections
+                        .collection_metadata(conn.output_id)
+                        .map_err(|_| DataflowCreationError::CollectionMissing(id))?
+                        .clone();
+                    let conn = ContinualTaskConnection {
+                        input_id: conn.input_id,
+                        output_id: conn.output_id,
+                        output_desc: conn.output_desc,
+                        output_metadata,
+                    };
+                    ComputeSinkConnection::ContinualTask(conn)
                 }
                 ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
                 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto
index e55398971c505..5aeec45cef571 100644
--- a/src/compute-types/src/sinks.proto
+++ b/src/compute-types/src/sinks.proto
@@ -48,6 +48,10 @@ message ProtoPersistSinkConnection {
 }
 
 message ProtoContinualTaskConnection {
+    mz_repr.global_id.ProtoGlobalId input_id = 4;
+    mz_repr.global_id.ProtoGlobalId output_id = 1;
+    mz_repr.relation_and_scalar.ProtoRelationDesc output_desc = 2;
+    mz_storage_types.controller.ProtoCollectionMetadata output_metadata = 3;
 }
 
 message ProtoCopyToS3OneshotSinkConnection {
diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs
index 9a3e2335c8e52..ab765de536040 100644
--- a/src/compute-types/src/sinks.rs
+++ b/src/compute-types/src/sinks.rs
@@ -256,15 +256,28 @@ impl RustType<ProtoPersistSinkConnection> for PersistSinkConnection<CollectionMe
 #[allow(missing_docs)] // WIP
 #[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
 pub struct ContinualTaskConnection<S> {
-    _phantom: std::marker::PhantomData<S>,
+    pub input_id: GlobalId,
+    pub output_id: GlobalId,
+    pub output_desc: RelationDesc,
+    pub output_metadata: S,
 }
 
 impl RustType<ProtoContinualTaskConnection> for ContinualTaskConnection<CollectionMetadata> {
     fn into_proto(&self) -> ProtoContinualTaskConnection {
-        todo!("WIP");
+        ProtoContinualTaskConnection {
+            input_id: Some(self.input_id.into_proto()),
+            output_id: Some(self.output_id.into_proto()),
+            output_desc: Some(self.output_desc.into_proto()),
+            output_metadata: Some(self.output_metadata.into_proto()),
+        }
     }
 
     fn from_proto(proto: ProtoContinualTaskConnection) -> Result<Self, TryFromProtoError> {
-        todo!("WIP {:?}", proto);
+        Ok(ContinualTaskConnection {
+            input_id: proto.input_id.into_rust_if_some("WIP")?,
+            output_id: proto.output_id.into_rust_if_some("WIP")?,
+            output_desc: proto.output_desc.into_rust_if_some("WIP")?,
+            output_metadata: proto.output_metadata.into_rust_if_some("WIP")?,
+        })
     }
 }
diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs
index d85d8d3f05222..3b4f6083a095f 100644
--- a/src/compute/src/render.rs
+++ b/src/compute/src/render.rs
@@ -121,6 +121,7 @@ use futures::FutureExt;
 use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
 use mz_compute_types::plan::flat_plan::{FlatPlan, FlatPlanNode};
 use mz_compute_types::plan::LirId;
+use mz_compute_types::sinks::ComputeSinkConnection;
 use mz_expr::{EvalError, Id};
 use mz_persist_client::operators::shard_source::SnapshotMode;
 use mz_repr::{Datum, GlobalId, Row, SharedRow};
@@ -132,7 +133,7 @@ use timely::communication::Allocate;
 use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::to_stream::ToStream;
-use timely::dataflow::operators::{probe, BranchWhen, Operator, Probe};
+use timely::dataflow::operators::{probe, BranchWhen, Filter, Inspect, Map, Operator, Probe};
 use timely::dataflow::scopes::Child;
 use timely::dataflow::{Scope, Stream};
 use timely::order::Product;
@@ -175,6 +176,18 @@ pub fn build_compute_dataflow<A: Allocate>(
     dataflow: DataflowDescription<FlatPlan, CollectionMetadata>,
     start_signal: StartSignal,
 ) {
+    // WIP HACKS this should be a config of the source
+    let mut ct_inputs = BTreeSet::new();
+    for (_, x) in &dataflow.sink_exports {
+        match &x.connection {
+            ComputeSinkConnection::ContinualTask(x) => {
+                ct_inputs.insert(x.input_id.clone());
+            }
+            _ => continue,
+        }
+    }
+    // tracing::info!("WIP ct_inputs {:?}\n{:?}", ct_inputs, dataflow);
+
     // Mutually recursive view definitions require special handling.
     let recursive = dataflow
         .objects_to_build
@@ -208,6 +221,7 @@ pub fn build_compute_dataflow<A: Allocate>(
         // alternate type signatures.
         let mut imported_sources = Vec::new();
         let mut tokens = BTreeMap::new();
+        let mut ct_input_times = Vec::new();
         scope.clone().region_named(&input_name, |region| {
             // Import declared sources into the rendering context.
             for (source_id, (source, _monotonic)) in dataflow.source_imports.iter() {
@@ -217,23 +231,78 @@ pub fn build_compute_dataflow<A: Allocate>(
                             .expect("Linear operators should always be valid")
                     });
 
-                    // Note: For correctness, we require that sources only emit times advanced by
-                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
-                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
-                        inner,
-                        *source_id,
-                        Arc::clone(&compute_state.persist_clients),
-                        &compute_state.txns_ctx,
-                        &compute_state.worker_config,
-                        source.storage_metadata.clone(),
-                        dataflow.as_of.clone(),
-                        SnapshotMode::Include,
-                        dataflow.until.clone(),
-                        mfp.as_mut(),
-                        compute_state.dataflow_max_inflight_bytes(),
-                        start_signal.clone(),
-                        |error| panic!("compute_import: {error}"),
-                    );
+                    let ct_source = ct_inputs.contains(source_id);
+                    let ct_dataflow = !ct_inputs.is_empty();
+                    let (mut ok_stream, err_stream, token) = if !ct_source {
+                        // Note: For correctness, we require that sources only emit times advanced by
+                        // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
+                        let (ok_stream, err_stream, tokens) = persist_source::persist_source(
+                            inner,
+                            *source_id,
+                            Arc::clone(&compute_state.persist_clients),
+                            &compute_state.txns_ctx,
+                            &compute_state.worker_config,
+                            source.storage_metadata.clone(),
+                            dataflow.as_of.clone(),
+                            SnapshotMode::Include,
+                            dataflow.until.clone(),
+                            mfp.as_mut(),
+                            compute_state.dataflow_max_inflight_bytes(),
+                            start_signal.clone(),
+                            |error| panic!("compute_import: {error}"),
+                        );
+                        let source_id = source_id.to_string();
+                        let ok_stream = ok_stream.inspect(move |(row, ts, diff)| {
+                            if !ct_dataflow {
+                                return;
+                            }
+                            tracing::info!(
+                                "WIP {} read normal\n    {:?} read  {} {:?} {}",
+                                source_id,
+                                ts,
+                                source_id,
+                                row,
+                                diff
+                            )
+                        });
+                        (ok_stream, err_stream, tokens)
+                    } else {
+                        let (ok_stream, err_stream, mut tokens) = persist_source::persist_source(
+                            inner,
+                            *source_id,
+                            Arc::clone(&compute_state.persist_clients),
+                            &compute_state.txns_ctx,
+                            &compute_state.worker_config,
+                            source.storage_metadata.clone(),
+                            dataflow.as_of.clone(),
+                            SnapshotMode::Exclude,
+                            dataflow.until.clone(),
+                            mfp.as_mut(),
+                            compute_state.dataflow_max_inflight_bytes(),
+                            start_signal.clone(),
+                            |error| panic!("compute_import: {error}"),
+                        );
+                        let ok_stream = ok_stream.filter(|(data, ts, diff)| {
+                            if *diff < 0 {
+                                tracing::info!("WIP dropping {:?}", (data, ts, diff));
+                                false
+                            } else {
+                                true
+                            }
+                        });
+                        // WIP reduce this down to one update per timestamp before exchanging.
+                        //
+                        // WIP we have to skip this (or panic?) if the input is an output
+                        let ct_input_time = ok_stream.map(|(_row, ts, _diff)| ((), ts, 1));
+                        ct_input_times
+                            .push(ct_input_time.as_collection().leave_region().leave_region());
+                        let (token, ok_stream) = crate::sink::continual_task::step_backward(
+                            &source_id.to_string(),
+                            Collection::new(ok_stream),
+                        );
+                        tokens.push(token);
+                        (ok_stream.inner, err_stream, tokens)
+                    };
 
                     // If `mfp` is non-identity, we need to apply what remains.
                     // For the moment, assert that it is either trivial or `None`.
@@ -325,6 +394,8 @@ pub fn build_compute_dataflow<A: Allocate>(
                 }
 
                 // Export declared sinks.
+                let ct_input_times =
+                    differential_dataflow::collection::concatenate(scope, ct_input_times);
                 for (sink_id, dependencies, sink) in sinks {
                     context.export_sink(
                         compute_state,
@@ -333,6 +404,7 @@ pub fn build_compute_dataflow<A: Allocate>(
                         sink_id,
                         &sink,
                         start_signal.clone(),
+                        ct_input_times.clone(),
                     );
                 }
             });
@@ -388,6 +460,8 @@ pub fn build_compute_dataflow<A: Allocate>(
                 }
 
                 // Export declared sinks.
+                let ct_input_times =
+                    differential_dataflow::collection::concatenate(scope, ct_input_times);
                 for (sink_id, dependencies, sink) in sinks {
                     context.export_sink(
                         compute_state,
@@ -396,6 +470,7 @@ pub fn build_compute_dataflow<A: Allocate>(
                         sink_id,
                         &sink,
                         start_signal.clone(),
+                        ct_input_times.clone(),
                     );
                 }
             });
diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs
index f6a72eaf20bfe..5667aa12d83b6 100644
--- a/src/compute/src/render/sinks.rs
+++ b/src/compute/src/render/sinks.rs
@@ -47,6 +47,7 @@ where
         sink_id: GlobalId,
         sink: &ComputeSinkDesc<CollectionMetadata>,
         start_signal: StartSignal,
+        ct_input_times: Collection<G, (), Diff>,
     ) {
         soft_assert_or_log!(
             sink.non_null_assertions.is_strictly_sorted(),
@@ -143,6 +144,7 @@ where
                     start_signal,
                     ok_collection.enter_region(inner),
                     err_collection.enter_region(inner),
+                    ct_input_times.enter_region(inner),
                 );
 
                 if let Some(sink_token) = sink_token {
@@ -169,6 +171,8 @@ where
         start_signal: StartSignal,
         sinked_collection: Collection<G, Row, Diff>,
         err_collection: Collection<G, DataflowError, Diff>,
+        // WIP figure out a better way to smuggle this in
+        ct_input_times: Collection<G, (), Diff>,
     ) -> Option<Rc<dyn Any>>;
 }
 
diff --git a/src/compute/src/sink/continual_task.rs b/src/compute/src/sink/continual_task.rs
index 836dfb2332670..642930ce17561 100644
--- a/src/compute/src/sink/continual_task.rs
+++ b/src/compute/src/sink/continual_task.rs
@@ -8,15 +8,32 @@
 // by the Apache License, Version 2.0.
 
 use std::any::Any;
+use std::cell::RefCell;
+use std::collections::BTreeSet;
 use std::rc::Rc;
+use std::sync::Arc;
 
-use differential_dataflow::Collection;
+use differential_dataflow::consolidation::consolidate_updates;
+use differential_dataflow::difference::Semigroup;
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::{Collection, Hashable};
+use futures::{Future, FutureExt, StreamExt};
 use mz_compute_types::sinks::{ComputeSinkDesc, ContinualTaskConnection};
+use mz_ore::cast::CastFrom;
+use mz_persist_client::write::WriteHandle;
+use mz_persist_client::Diagnostics;
+use mz_persist_types::codec_impls::UnitSchema;
 use mz_repr::{Diff, GlobalId, Row, Timestamp};
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
+use mz_storage_types::sources::SourceData;
+use mz_timely_util::builder_async::{
+    Button, Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
+};
+use timely::dataflow::channels::pact::{Exchange, Pipeline};
 use timely::dataflow::Scope;
-use timely::progress::Antichain;
+use timely::progress::frontier::AntichainRef;
+use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
 
 use crate::compute_state::ComputeState;
 use crate::render::sinks::SinkRender;
@@ -29,24 +46,321 @@ where
     fn render_sink(
         &self,
         compute_state: &mut ComputeState,
-        sink: &ComputeSinkDesc<CollectionMetadata>,
+        _sink: &ComputeSinkDesc<CollectionMetadata>,
         sink_id: GlobalId,
-        as_of: Antichain<Timestamp>,
+        _as_of: Antichain<Timestamp>,
         start_signal: StartSignal,
         oks: Collection<G, Row, Diff>,
         errs: Collection<G, DataflowError, Diff>,
+        ct_input_times: Collection<G, (), Diff>,
     ) -> Option<Rc<dyn Any>> {
-        todo!(
-            "WIP {:?}",
-            (
-                std::any::type_name_of_val(&compute_state),
-                sink,
-                sink_id,
-                as_of,
-                std::any::type_name_of_val(&start_signal),
-                std::any::type_name_of_val(&oks),
-                std::any::type_name_of_val(&errs),
-            )
+        let to_append = oks
+            .map(|x| SourceData(Ok(x)))
+            .concat(&errs.map(|x| SourceData(Err(x))));
+
+        // WIP do something with this
+        let active_worker = true;
+        let shared_frontier = Rc::new(RefCell::new(if active_worker {
+            Antichain::from_elem(TimelyTimestamp::minimum())
+        } else {
+            Antichain::new()
+        }));
+        let collection = compute_state.expect_collection_mut(sink_id);
+        collection.sink_write_frontier = Some(Rc::clone(&shared_frontier));
+        // return None;
+
+        let write_handle = {
+            let clients = Arc::clone(&compute_state.persist_clients);
+            let metadata = self.output_metadata.clone();
+            async move {
+                let client = clients
+                    .open(metadata.persist_location)
+                    .await
+                    .expect("valid location");
+                client
+                    .open_writer(
+                        metadata.data_shard,
+                        metadata.relation_desc.into(),
+                        UnitSchema.into(),
+                        Diagnostics {
+                            shard_name: sink_id.to_string(),
+                            handle_purpose: "WIP".into(),
+                        },
+                    )
+                    .await
+                    .expect("codecs should match")
+            }
+        };
+
+        let button = continual_task_sink(
+            &sink_id.to_string(),
+            to_append,
+            ct_input_times,
+            write_handle,
+            start_signal,
         );
+        Some(Rc::new(button.press_on_drop()))
+    }
+}
+
+fn continual_task_sink<G: Scope<Timestamp = Timestamp>>(
+    name: &str,
+    to_append: Collection<G, SourceData, Diff>,
+    append_times: Collection<G, (), Diff>,
+    write_handle: impl Future<Output = WriteHandle<SourceData, (), Timestamp, Diff>> + Send + 'static,
+    start_signal: StartSignal,
+) -> Button {
+    let scope = to_append.scope();
+    let operator_name = format!("ContinualTask({})", name);
+    let mut op = AsyncOperatorBuilder::new(operator_name, scope.clone());
+
+    // TODO(ct): This all works perfectly well data parallel (assuming we
+    // broadcast the append_times). We just need to hook it up to the
+    // multi-worker persist-sink, but that requires some refactoring. This would
+    // also remove the need for this to be an async timely operator.
+    let active_worker = name.hashed();
+    let to_append_input =
+        op.new_disconnected_input(&to_append.inner, Exchange::new(move |_| active_worker));
+    let append_times_input =
+        op.new_disconnected_input(&append_times.inner, Exchange::new(move |_| active_worker));
+
+    let active_worker = usize::cast_from(active_worker) % scope.peers() == scope.index();
+    let button = op.build(move |_capabilities| async move {
+        if !active_worker {
+            return;
+        }
+        let () = start_signal.await;
+        let mut write_handle = write_handle.await;
+
+        #[derive(Debug)]
+        enum OpEvent<C> {
+            ToAppend(Event<Timestamp, C, Vec<(SourceData, Timestamp, Diff)>>),
+            AppendTimes(Event<Timestamp, C, Vec<((), Timestamp, Diff)>>),
+        }
+
+        impl<C: std::fmt::Debug> OpEvent<C> {
+            fn apply(self, state: &mut SinkState<SourceData, (), Timestamp, Diff>) {
+                tracing::info!("WIP event {:?}", self);
+                match self {
+                    // WIP big comment on the step_forward calls
+                    OpEvent::ToAppend(Event::Data(_cap, x)) => state.to_append.extend(
+                        x.into_iter()
+                            .map(|(k, t, d)| ((k, ()), t.step_forward(), d)),
+                    ),
+                    OpEvent::ToAppend(Event::Progress(x)) => {
+                        state.to_append_progress = x.into_option().expect("WIP").step_forward()
+                    }
+                    OpEvent::AppendTimes(Event::Data(_cap, x)) => state
+                        .append_times
+                        .extend(x.into_iter().map(|((), t, _d)| t)),
+                    OpEvent::AppendTimes(Event::Progress(x)) => {
+                        state.append_times_progress = x.into_option().expect("WIP")
+                    }
+                }
+            }
+        }
+
+        let to_insert_input = to_append_input.map(OpEvent::ToAppend);
+        let append_times_input = append_times_input.map(OpEvent::AppendTimes);
+        let mut op_inputs = futures::stream::select(to_insert_input, append_times_input);
+
+        let mut state = SinkState::new();
+        loop {
+            let Some(event) = op_inputs.next().await else {
+                // Inputs exhausted, shutting down.
+                return;
+            };
+            event.apply(&mut state);
+            // Also drain any other events that may be ready.
+            while let Some(Some(event)) = op_inputs.next().now_or_never() {
+                event.apply(&mut state);
+            }
+            tracing::info!("WIP about to process {:?}", state);
+            let Some((new_upper, to_append)) = state.process() else {
+                continue;
+            };
+            tracing::info!("WIP got write {:?}: {:?}", new_upper, to_append);
+
+            let mut expected_upper = write_handle.shared_upper();
+            while expected_upper.less_than(&new_upper) {
+                let res = write_handle
+                    .compare_and_append(
+                        &to_append,
+                        expected_upper.clone(),
+                        Antichain::from_elem(new_upper),
+                    )
+                    .await
+                    .expect("usage was valid");
+                match res {
+                    Ok(()) => {
+                        state.output_progress = new_upper;
+                        break;
+                    }
+                    Err(err) => {
+                        expected_upper = err.current;
+                        continue;
+                    }
+                }
+            }
+        }
+    });
+
+    button
+}
+
+#[derive(Debug)]
+struct SinkState<K, V, T, D> {
+    append_times: BTreeSet<T>,
+    append_times_progress: T,
+    to_append: Vec<((K, V), T, D)>,
+    to_append_progress: T,
+    output_progress: T,
+}
+
+impl<K: Ord, V: Ord, D: Semigroup> SinkState<K, V, Timestamp, D> {
+    fn new() -> Self {
+        SinkState {
+            // The known times at which we're going to write data to the output.
+            // This is guaranteed to include all times < append_times_progress,
+            // except that ones < output_progress may have been truncated.
+            append_times: BTreeSet::new(),
+            append_times_progress: Timestamp::minimum(),
+
+            // The data we've collected to append to the output. This is often
+            // compacted to advancing times and is expected to be ~empty in the
+            // steady state.
+            to_append: Vec::new(),
+            to_append_progress: Timestamp::minimum(),
+
+            // A lower bound on the upper of the output.
+            output_progress: Timestamp::minimum(),
+        }
+    }
+
+    /// Returns data to write to the output, if any, and the new upper to use.
+    ///
+    /// TODO(ct): Remove the Vec allocation here.
+    fn process(&mut self) -> Option<(Timestamp, Vec<((&K, &V), &Timestamp, &D)>)> {
+        // We can only append at times >= the output_progress, so pop off
+        // anything unnecessary.
+        while let Some(x) = self.append_times.first() {
+            // WIP regression test for >= vs >
+            if *x >= self.output_progress {
+                break;
+            }
+            self.append_times.pop_first();
+        }
+
+        // WIP bug: not allowed to return anything from append_times until we
+        // have progress info, because they could arrive out of order
+        let write_ts = match self.append_times.first() {
+            Some(x) if self.output_progress < *x => {
+                // WIP regression test
+                return Some((x.clone(), Vec::new()));
+            }
+            Some(x) => x,
+            // The CT sink's contract is that it only writes data at times we
+            // received an input diff. There are none in `[output_progress,
+            // append_times_progress)`, so we can go ahead and advance the upper
+            // of the output.
+            //
+            // We could instead ensure liveness by basing this off of to_append,
+            // but for any CTs reading the output (expected to be a common case)
+            // we'd end up looping each timestamp through persist one-by-one.
+            None if self.output_progress < self.append_times_progress => {
+                return Some((self.append_times_progress, Vec::new()));
+            }
+            // Nothing to do!
+            None => return None,
+        };
+
+        if self.to_append_progress <= *write_ts {
+            // Don't have all the necessary data yet.
+            return None;
+        }
+
+        // Time to write some data! Produce the collection as of write_ts by
+        // advancing timestamps, consolidating, and filtering out anything at
+        // future timestamps.
+        let as_of = &[write_ts.clone()];
+        for (_, t, _) in &mut self.to_append {
+            t.advance_by(AntichainRef::new(as_of))
+        }
+        consolidate_updates(&mut self.to_append);
+        // WIP resize the vec down if cap >> len?
+
+        let append_data = self
+            .to_append
+            .iter()
+            .filter_map(|((k, v), t, d)| (t <= write_ts).then_some(((k, v), t, d)))
+            .collect();
+        Some((write_ts.step_forward(), append_data))
     }
 }
+
+pub(crate) fn step_backward<G>(
+    name: &str,
+    input: Collection<G, Row, Diff>,
+) -> (PressOnDropButton, Collection<G, Row, Diff>)
+where
+    G: Scope<Timestamp = Timestamp>,
+{
+    let name_orig = name.to_string();
+    let name = format!("StepBackward({})", name);
+    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.scope());
+    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);
+    let (mut output, output_stream) = builder.new_output();
+    let button = builder.build(move |caps| async move {
+        let [mut cap]: [_; 1] = caps.try_into().expect("one capability per output");
+        loop {
+            let Some(event) = input.next().await else {
+                tracing::info!("WIP StepBackward exhausted, shutting down");
+                return;
+            };
+            match event {
+                Event::Data(_data_cap, mut data) => {
+                    // tracing::info!("{} input data {:?}", name, data);
+                    let mut retractions = Vec::new();
+                    for (row, ts, diff) in &mut data {
+                        let orig_ts = *ts;
+                        *ts = ts.step_back().expect("WIP");
+                        let mut negated = *diff;
+                        differential_dataflow::Diff::negate(&mut negated);
+                        tracing::info!(
+                            "WIP {} read ct input\n    {:?} read  {} {:?} {}\n    {:?} diff  {} {:?} {}\n    {:?} diff  {} {:?} {}",
+                            name_orig,
+                            orig_ts,
+                            name_orig,
+                            row,
+                            diff,
+                            ts,
+                            name_orig,
+                            row,
+                            diff,
+                            orig_ts,
+                            name_orig,
+                            row,
+                            negated,
+                        );
+                        retractions.push((row.clone(), orig_ts, negated));
+                    }
+                    tracing::debug!("{} emitting data {:?}", name, data);
+                    output.give_container(&cap, &mut data);
+                    output.give_container(&cap, &mut retractions);
+                }
+                Event::Progress(new_progress) => {
+                    let Some(new_progress) = new_progress.into_option() else {
+                        return;
+                    };
+                    let new_progress = new_progress.step_back().expect("WIP");
+                    if cap.time() < &new_progress {
+                        tracing::debug!("{} downgrading cap to {:?}", name, new_progress);
+                        cap.downgrade(&new_progress);
+                    }
+                }
+            }
+        }
+    });
+
+    (button.press_on_drop(), Collection::new(output_stream))
+}
diff --git a/src/compute/src/sink/copy_to_s3_oneshot.rs b/src/compute/src/sink/copy_to_s3_oneshot.rs
index 9c0a9cebe5eaa..dfc6917970c03 100644
--- a/src/compute/src/sink/copy_to_s3_oneshot.rs
+++ b/src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -45,6 +45,7 @@ where
         _start_signal: StartSignal,
         sinked_collection: Collection<G, Row, Diff>,
         err_collection: Collection<G, DataflowError, Diff>,
+        _: Collection<G, (), Diff>,
     ) -> Option<Rc<dyn Any>> {
         // An encapsulation of the copy to response protocol.
         // Used to send rows and errors if this fails.
diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs
index a2ee9846e4349..64f5a58a59a96 100644
--- a/src/compute/src/sink/persist_sink.rs
+++ b/src/compute/src/sink/persist_sink.rs
@@ -60,6 +60,7 @@ where
         start_signal: StartSignal,
         mut ok_collection: Collection<G, Row, Diff>,
         mut err_collection: Collection<G, DataflowError, Diff>,
+        _: Collection<G, (), Diff>,
     ) -> Option<Rc<dyn Any>> {
         // Attach a probe reporting the compute frontier.
         // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
diff --git a/src/compute/src/sink/subscribe.rs b/src/compute/src/sink/subscribe.rs
index 9768a09ba976b..7f72888780231 100644
--- a/src/compute/src/sink/subscribe.rs
+++ b/src/compute/src/sink/subscribe.rs
@@ -42,6 +42,7 @@ where
         _start_signal: StartSignal,
         sinked_collection: Collection<G, Row, Diff>,
         err_collection: Collection<G, DataflowError, Diff>,
+        _: Collection<G, (), Diff>,
     ) -> Option<Rc<dyn Any>> {
         // An encapsulation of the Subscribe response protocol.
         // Used to send rows and progress messages,
diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs
index 75800afe84e13..17b2a612b2466 100644
--- a/src/sql/src/names.rs
+++ b/src/sql/src/names.rs
@@ -22,7 +22,7 @@ use mz_ore::str::StrExt;
 use mz_repr::role_id::RoleId;
 use mz_repr::ColumnName;
 use mz_repr::GlobalId;
-use mz_sql_parser::ast::Expr;
+use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr};
 use mz_sql_parser::ident;
 use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
@@ -1569,6 +1569,17 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
         result
     }
 
+    fn fold_create_continual_task_statement(
+        &mut self,
+        stmt: CreateContinualTaskStatement<Raw>,
+    ) -> CreateContinualTaskStatement<Aug> {
+        // TODO(ct): Insert a fake CTE so that using the name of the continual
+        // task in the inserts and deletes resolves.
+        let cte_name = normalize::ident(stmt.name.0.last().expect("WIP").clone());
+        self.ctes.insert(cte_name, LocalId::new(0));
+        mz_sql_parser::ast::fold::fold_create_continual_task_statement(self, stmt)
+    }
+
     fn fold_cte_id(&mut self, _id: <Raw as AstInfo>::CteId) -> <Aug as AstInfo>::CteId {
         panic!("this should have been handled when walking the CTE");
     }
diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs
index abb15ef31e3e2..bf74618a378f2 100644
--- a/src/sql/src/normalize.rs
+++ b/src/sql/src/normalize.rs
@@ -21,12 +21,13 @@ use mz_repr::{ColumnName, GlobalId};
 use mz_sql_parser::ast::display::AstDisplay;
 use mz_sql_parser::ast::visit_mut::{self, VisitMut};
 use mz_sql_parser::ast::{
-    CreateConnectionStatement, CreateContinualTaskStatement, CreateIndexStatement,
-    CreateMaterializedViewStatement, CreateSecretStatement, CreateSinkStatement,
-    CreateSourceStatement, CreateSubsourceStatement, CreateTableFromSourceStatement,
-    CreateTableStatement, CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement,
-    CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, MutRecBlock, Op, Query, Statement,
-    TableFactor, UnresolvedItemName, UnresolvedSchemaName, Value, ViewDefinition,
+    ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
+    CreateIndexStatement, CreateMaterializedViewStatement, CreateSecretStatement,
+    CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
+    CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
+    CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
+    MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName,
+    Value, ViewDefinition,
 };
 
 use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
@@ -414,12 +415,23 @@ pub fn create_statement(
 
         Statement::CreateContinualTask(CreateContinualTaskStatement {
             name,
-            columns,
-            input,
+            // WIP do we need to normalize columns and input?
+            columns: _,
+            input: _,
             stmts,
-            in_cluster,
+            in_cluster: _,
         }) => {
-            todo!("WIP {:?}", (name, columns, input, stmts, in_cluster));
+            *name = allocate_name(name)?;
+            for stmt in stmts {
+                let mut normalizer = QueryNormalizer::new();
+                match stmt {
+                    ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
+                    ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
+                }
+                if let Some(err) = normalizer.err {
+                    return Err(err);
+                }
+            }
         }
 
         Statement::CreateIndex(CreateIndexStatement {
diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs
index dbaa76c5d4c66..4ab3f51d2c2bc 100644
--- a/src/sql/src/plan.rs
+++ b/src/sql/src/plan.rs
@@ -700,7 +700,14 @@ pub struct CreateMaterializedViewPlan {
 }
 
 #[derive(Debug, Clone)]
-pub struct CreateContinualTaskPlan {}
+pub struct CreateContinualTaskPlan {
+    pub name: QualifiedItemName,
+    pub desc: RelationDesc,
+    // TODO(ct): Multiple inputs.
+    pub input_id: GlobalId,
+    pub continual_task: MaterializedView,
+    // TODO(ct): replace, drop_ids, if_not_exists
+}
 
 #[derive(Debug, Clone)]
 pub struct CreateIndexPlan {
diff --git a/src/sql/src/plan/query.rs b/src/sql/src/plan/query.rs
index 1dad7b41c41ea..5303002e40a11 100644
--- a/src/sql/src/plan/query.rs
+++ b/src/sql/src/plan/query.rs
@@ -165,6 +165,57 @@ pub fn plan_root_query(
     })
 }
 
+/// TODO(ct): Dedup this with [plan_root_query].
+#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
+pub fn plan_ct_query(
+    mut qcx: &mut QueryContext,
+    mut query: Query<Aug>,
+) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
+    transform_ast::transform(qcx.scx, &mut query)?;
+    let PlannedQuery {
+        mut expr,
+        scope,
+        order_by,
+        limit,
+        offset,
+        project,
+        group_size_hints,
+    } = plan_query(&mut qcx, &query)?;
+
+    let mut finishing = RowSetFinishing {
+        limit,
+        offset,
+        project,
+        order_by,
+    };
+
+    // Attempt to push the finishing's ordering past its projection. This allows
+    // data to be projected down on the workers rather than the coordinator. It
+    // also improves the optimizer's demand analysis, as the optimizer can only
+    // reason about demand information in `expr` (i.e., it can't see
+    // `finishing.project`).
+    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
+
+    expr.finish_maintained(&mut finishing, group_size_hints);
+
+    let typ = qcx.relation_type(&expr);
+    let typ = RelationType::new(
+        finishing
+            .project
+            .iter()
+            .map(|i| typ.column_types[*i].clone())
+            .collect(),
+    );
+    let desc = RelationDesc::new(typ, scope.column_names());
+
+    Ok(PlannedRootQuery {
+        expr,
+        desc,
+        finishing,
+        scope,
+    })
+}
+
 /// Attempts to push a projection through an order by.
 ///
 /// The returned bool indicates whether the pushdown was successful or not.
@@ -6083,8 +6134,8 @@ impl QueryLifetime {
 /// Description of a CTE sufficient for query planning.
 #[derive(Debug, Clone)]
 pub struct CteDesc {
-    name: String,
-    desc: RelationDesc,
+    pub name: String,
+    pub desc: RelationDesc,
 }
 
 /// The state required when planning a `Query`.
diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs
index 029f19ba26035..a309ace92badf 100644
--- a/src/sql/src/plan/statement.rs
+++ b/src/sql/src/plan/statement.rs
@@ -17,9 +17,8 @@ use std::collections::{BTreeMap, BTreeSet};
 use mz_repr::namespaces::is_system_schema;
 use mz_repr::{ColumnType, GlobalId, RelationDesc, ScalarType};
 use mz_sql_parser::ast::{
-    ColumnDef, ColumnName, ConnectionDefaultAwsPrivatelink, CreateMaterializedViewStatement,
-    RawItemName, ShowStatement, StatementKind, TableConstraint, UnresolvedDatabaseName,
-    UnresolvedSchemaName,
+    ColumnDef, ColumnName, ConnectionDefaultAwsPrivatelink, RawItemName, ShowStatement,
+    StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName,
 };
 use mz_storage_types::connections::inline::ReferencedConnection;
 use mz_storage_types::connections::{AwsPrivatelink, Connection, SshTunnel, Tunnel};
@@ -31,9 +30,9 @@ use crate::catalog::{
 };
 use crate::names::{
     self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
-    QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
-    ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
-    SystemObjectId,
+    QualifiedItemName, RawDatabaseSpecifier, ResolvedClusterName, ResolvedColumnReference,
+    ResolvedDataType, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName,
+    SchemaSpecifier, SystemObjectId,
 };
 use crate::normalize;
 use crate::plan::error::PlanError;
@@ -988,9 +987,9 @@ impl<'a> StatementContext<'a> {
 
 pub fn resolve_cluster_for_materialized_view<'a>(
     catalog: &'a dyn SessionCatalog,
-    stmt: &CreateMaterializedViewStatement<Aug>,
+    in_cluster: Option<&ResolvedClusterName>,
 ) -> Result<ClusterId, PlanError> {
-    Ok(match &stmt.in_cluster {
+    Ok(match in_cluster {
         None => catalog.resolve_cluster(None)?.id(),
         Some(in_cluster) => in_cluster.id,
     })
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index fbd799f92b72c..c361ac4f64f9c 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -22,7 +22,7 @@ use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_
 use mz_controller_types::{
     is_cluster_size_v2, ClusterId, ReplicaId, DEFAULT_REPLICA_LOGGING_INTERVAL,
 };
-use mz_expr::{CollectionPlan, UnmaterializableFunc};
+use mz_expr::{CollectionPlan, LocalId, UnmaterializableFunc};
 use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
 use mz_ore::cast::{CastFrom, TryCastFrom};
 use mz_ore::collections::{CollectionExt, HashSet};
@@ -122,7 +122,9 @@ use crate::names::{
 };
 use crate::normalize::{self, ident};
 use crate::plan::error::PlanError;
-use crate::plan::query::{plan_expr, scalar_type_from_catalog, ExprContext, QueryLifetime};
+use crate::plan::query::{
+    plan_expr, scalar_type_from_catalog, scalar_type_from_sql, CteDesc, ExprContext, QueryLifetime,
+};
 use crate::plan::scope::Scope;
 use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
 use crate::plan::statement::{scl, StatementContext, StatementDesc};
@@ -137,10 +139,10 @@ use crate::plan::{
     AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan,
     ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan,
     CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
-    CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
-    CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan,
-    CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, DropObjectsPlan,
-    DropOwnedPlan, FullItemName, Index, Ingestion, MaterializedView, Params, Plan,
+    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
+    CreateMaterializedViewPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
+    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
+    DropObjectsPlan, DropOwnedPlan, FullItemName, Index, Ingestion, MaterializedView, Params, Plan,
     PlanClusterOption, PlanNotice, QueryContext, ReplicaConfig, Secret, Sink, Source, Table,
     TableDataSource, Type, VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters,
     WebhookHeaders, WebhookValidation,
@@ -2261,8 +2263,10 @@ pub fn plan_create_materialized_view(
     mut stmt: CreateMaterializedViewStatement<Aug>,
     params: &Params,
 ) -> Result<Plan, PlanError> {
-    let cluster_id =
-        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
+    let cluster_id = crate::plan::statement::resolve_cluster_for_materialized_view(
+        scx.catalog,
+        stmt.in_cluster.as_ref(),
+    )?;
     stmt.in_cluster = Some(ResolvedClusterName {
         id: cluster_id,
         print_name: None,
@@ -2530,10 +2534,175 @@ generate_extracted_config!(
 
 pub fn plan_create_continual_task(
     scx: &StatementContext,
-    stmt: CreateContinualTaskStatement<Aug>,
+    mut stmt: CreateContinualTaskStatement<Aug>,
     params: &Params,
 ) -> Result<Plan, PlanError> {
-    todo!("WIP {:?}", (scx, stmt, params));
+    let cluster_id = crate::plan::statement::resolve_cluster_for_materialized_view(
+        scx.catalog,
+        stmt.in_cluster.as_ref(),
+    )?;
+    stmt.in_cluster = Some(ResolvedClusterName {
+        id: cluster_id,
+        print_name: None,
+    });
+
+    let create_sql =
+        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
+
+    let fake_cte_name = normalize::ident(stmt.name.0.last().expect("WIP").clone());
+    let partial_name = normalize::unresolved_item_name(stmt.name)?;
+    let name = scx.allocate_qualified_name(partial_name.clone())?;
+    let desc = {
+        let mut desc_columns = Vec::with_capacity(stmt.columns.capacity());
+        for col in stmt.columns.iter() {
+            desc_columns.push((
+                normalize::column_name(col.name.clone()),
+                ColumnType {
+                    scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
+                    nullable: true,
+                },
+            ));
+        }
+        RelationDesc::from_names_and_types(desc_columns)
+    };
+    let input = scx.get_item_by_resolved_name(&stmt.input)?;
+
+    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
+    qcx.ctes.insert(
+        LocalId::new(0),
+        CteDesc {
+            name: fake_cte_name.clone(),
+            desc: desc.clone(),
+        },
+    );
+
+    let mut exprs = Vec::new();
+    let mut has_write = BTreeSet::new();
+    for stmt in &stmt.stmts {
+        let query = continual_task_query(&fake_cte_name, &mut has_write, stmt)
+            .ok_or_else(|| sql_err!("TODO(ct)"))?;
+        let query::PlannedRootQuery {
+            mut expr,
+            desc: _,
+            finishing,
+            scope: _,
+        } = query::plan_ct_query(&mut qcx, query)?;
+        // We get back a trivial finishing, see comment in `plan_view`.
+        assert!(finishing.is_trivial(expr.arity()));
+        // TODO(ct): Is this right?
+        expr.bind_parameters(params)?;
+        // TODO(ct): Validate the planned desc.
+        match stmt {
+            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
+            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
+        }
+    }
+    // TODO(ct): Collect things by output and assert that there is only one (or
+    // support multiple outputs).
+    let expr = exprs
+        .into_iter()
+        .reduce(|acc, expr| acc.union(expr))
+        .ok_or_else(|| sql_err!("TODO(ct)"))?;
+
+    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
+    if let Some(dup) = column_names.iter().duplicates().next() {
+        sql_bail!("column {} specified more than once", dup.as_str().quoted());
+    }
+
+    // Check for an object in the catalog with this same name
+    let full_name = scx.catalog.resolve_full_name(&name);
+    let partial_name = PartialItemName::from(full_name.clone());
+    // For PostgreSQL compatibility, we need to prevent creating this when there
+    // is an existing object *or* type of the same name.
+    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
+        return Err(PlanError::ItemAlreadyExists {
+            name: full_name.to_string(),
+            item_type: item.item_type(),
+        });
+    }
+
+    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
+        name,
+        desc,
+        input_id: input.id(),
+        continual_task: MaterializedView {
+            create_sql,
+            expr,
+            column_names,
+            cluster_id,
+            non_null_assertions: Vec::new(),
+            compaction_window: None,
+            refresh_schedule: None,
+            as_of: None,
+        },
+    }))
+}
+
+fn continual_task_query<'a>(
+    fake_cte_name: &str,
+    has_write: &mut BTreeSet<&'a ResolvedItemName>,
+    stmt: &'a ast::ContinualTaskStmt<Aug>,
+) -> Option<ast::Query<Aug>> {
+    match stmt {
+        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
+            table_name,
+            columns,
+            source,
+            returning,
+        }) => {
+            // Inserts are blind writes so don't care if we already have a
+            // write.
+            has_write.insert(table_name);
+            if !columns.is_empty() || !returning.is_empty() {
+                return None;
+            }
+            match source {
+                ast::InsertSource::Query(query) => Some(query.clone()),
+                ast::InsertSource::DefaultValues => None,
+            }
+        }
+        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
+            table_name,
+            alias,
+            using,
+            selection,
+        }) => {
+            if has_write.contains(table_name) || !using.is_empty() {
+                return None;
+            }
+            has_write.insert(table_name);
+            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
+            let from = ast::TableWithJoins {
+                relation: ast::TableFactor::Table {
+                    // TODO(ct): Huge hack.
+                    name: ResolvedItemName::Cte {
+                        id: LocalId::new(0),
+                        name: fake_cte_name.to_owned(),
+                    },
+                    alias: alias.clone(),
+                },
+                joins: Vec::new(),
+            };
+            let select = ast::Select {
+                from: vec![from],
+                selection: selection.clone(),
+                distinct: None,
+                projection: vec![ast::SelectItem::Wildcard],
+                group_by: Vec::new(),
+                having: None,
+                options: Vec::new(),
+            };
+            let query = ast::Query {
+                ctes: ast::CteBlock::Simple(Vec::new()),
+                body: ast::SetExpr::Select(Box::new(select)),
+                order_by: Vec::new(),
+                limit: None,
+                offset: None,
+            };
+            // Then negate it to turn it into retractions (after planning it).
+            Some(query)
+        }
+    }
 }
 
 pub fn describe_create_sink(
diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs
index 2381d57088518..a813c3474be75 100644
--- a/src/sql/src/rbac.rs
+++ b/src/sql/src/rbac.rs
@@ -601,7 +601,27 @@ fn generate_rbac_requirements(
             item_usage: &CREATE_ITEM_USAGE,
             ..Default::default()
         },
-        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {}) => todo!("WIP"),
+        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
+            name,
+            desc: _,
+            input_id: _,
+            continual_task,
+        }) => RbacRequirements {
+            privileges: vec![
+                (
+                    SystemObjectId::Object(name.qualifiers.clone().into()),
+                    AclMode::CREATE,
+                    role_id,
+                ),
+                (
+                    SystemObjectId::Object(continual_task.cluster_id.into()),
+                    AclMode::CREATE,
+                    role_id,
+                ),
+            ],
+            item_usage: &CREATE_ITEM_USAGE,
+            ..Default::default()
+        },
         Plan::CreateIndex(plan::CreateIndexPlan {
             name,
             index,
diff --git a/test/sqllogictest/ct_audit_log.slt b/test/sqllogictest/ct_audit_log.slt
new file mode 100644
index 0000000000000..b247263375fd7
--- /dev/null
+++ b/test/sqllogictest/ct_audit_log.slt
@@ -0,0 +1,48 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+mode cockroach
+
+statement ok
+create table mv_input (key int8)
+
+statement ok
+insert into mv_input values (1);
+
+statement ok
+create materialized view anomalies as select count(*)::int8 from mv_input;
+
+query I
+select * from anomalies
+----
+1
+
+statement ok
+CREATE CONTINUAL TASK audit_log (count int8) ON INPUT anomalies AS (
+    INSERT INTO audit_log SELECT * from anomalies;
+)
+
+query I
+select * from audit_log
+----
+1
+
+statement ok
+insert into mv_input values (2), (3)
+
+query I
+select * from anomalies
+----
+3
+
+query I
+select * from audit_log
+----
+1
+3
diff --git a/test/sqllogictest/ct_stream_table_join.slt b/test/sqllogictest/ct_stream_table_join.slt
new file mode 100644
index 0000000000000..4f14f4f58c957
--- /dev/null
+++ b/test/sqllogictest/ct_stream_table_join.slt
@@ -0,0 +1,44 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+mode cockroach
+
+statement ok
+create table big (key int)
+
+statement ok
+create table small (key int, val string)
+
+statement ok
+insert into small values (1, 'v0')
+
+statement ok
+CREATE CONTINUAL TASK stj (key int, val int) ON INPUT big AS (
+    INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key;
+)
+
+statement ok
+insert into big values (1)
+
+query IT
+select * from stj
+----
+1 v0
+
+statement ok
+update small set val = 'v1'
+
+statement ok
+insert into big values (1)
+
+query IT
+select * from stj
+----
+1 v0
+1 v1
diff --git a/test/sqllogictest/ct_upsert.slt b/test/sqllogictest/ct_upsert.slt
new file mode 100644
index 0000000000000..80ba7cc4567a2
--- /dev/null
+++ b/test/sqllogictest/ct_upsert.slt
@@ -0,0 +1,36 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+mode cockroach
+
+statement ok
+create table append_only (key int, val int)
+
+statement ok
+CREATE CONTINUAL TASK upsert (key int, val int) ON INPUT append_only AS (
+    DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
+    INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
+)
+
+statement ok
+insert into append_only values (1, 2), (1, 1)
+
+query II
+select * from upsert
+----
+1 2
+
+statement ok
+insert into append_only values (1, 3), (2, 4)
+
+query IT
+select * from upsert
+----
+1 3
+2 4