ct: add ::ContinualTask to various adapter enums
May as well rip off the band-aid and get it over with.
danhhz committed Sep 19, 2024
1 parent 7b492cc commit 4e276b6
Showing 28 changed files with 1,219 additions and 75 deletions.
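Nearly every hunk below follows the same mechanical pattern: an adapter-side enum that classifies catalog objects (ObjectType, CatalogItemType, CommentObjectId, CatalogItem, and so on) gains a ContinualTask variant, and each exhaustive match over that enum picks up a new arm, usually shared with the storage-backed objects (tables, sources, materialized views) that continual tasks behave like. A minimal sketch of the pattern, with simplified stand-in types rather than the real Materialize definitions:

// Sketch only: stand-in enum, not the real mz_sql/mz_catalog types.
enum CatalogItemType {
    Table,
    Index,
    MaterializedView,
    // ...other existing variants...
    ContinualTask, // the variant this commit threads through the adapter
}

// Every exhaustive match then needs an arm for the new variant, typically
// grouped with objects that have the same storage-backed behavior.
fn is_storage_backed(typ: &CatalogItemType) -> bool {
    matches!(
        typ,
        CatalogItemType::Table
            | CatalogItemType::MaterializedView
            | CatalogItemType::ContinualTask
    )
}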
2 changes: 2 additions & 0 deletions src/adapter/src/catalog.rs
@@ -1318,6 +1318,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType
CommentObjectId::Schema(_) => ObjectType::Schema,
CommentObjectId::Cluster(_) => ObjectType::Cluster,
CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
}
}

@@ -1347,6 +1348,7 @@ pub(crate) fn system_object_type_to_audit_object_type(
mz_sql::catalog::ObjectType::Database => ObjectType::Database,
mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
mz_sql::catalog::ObjectType::Func => ObjectType::Func,
mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
},
SystemObjectType::System => ObjectType::System,
}
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/apply.rs
@@ -1619,7 +1619,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
let mut builtin_index_additions = Vec::new();
for (builtin_item_update, ts, diff) in builtin_item_updates {
match &builtin_item_update.description.object_type {
CatalogItemType::Index => push_update(
CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
StateUpdate {
kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
ts,
57 changes: 52 additions & 5 deletions src/adapter/src/catalog/builtin_table_updates.rs
@@ -33,8 +33,8 @@ use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, DataSourceDesc, Func,
Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, ContinualTask,
DataSourceDesc, Func, Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{
@@ -669,6 +669,9 @@ impl CatalogState {
CatalogItem::Connection(connection) => self.pack_connection_update(
id, oid, schema_id, name, owner_id, privileges, connection, diff,
),
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
id, oid, schema_id, name, owner_id, privileges, ct, diff,
),
};

if !entry.item().is_temporary() {
@@ -1253,8 +1256,6 @@ impl CatalogState {
query_string.push(';');
query_string
}
// TODO(ct): Remove.
Statement::CreateContinualTask(_) => "TODO(ct)".into(),
_ => unreachable!(),
};

@@ -1338,6 +1339,51 @@ impl CatalogState {
updates
}

fn pack_continual_task_update(
&self,
id: GlobalId,
oid: u32,
schema_id: &SchemaSpecifier,
name: &str,
owner_id: &RoleId,
privileges: Datum,
ct: &ContinualTask,
diff: Diff,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let create_stmt = mz_sql::parse::parse(&ct.create_sql)
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
ct.create_sql, e
)
})
.into_element()
.ast;
let query_string = "TODO(ct)";

let mut updates = Vec::new();

updates.push(BuiltinTableUpdate {
// TODO(ct): MZ_CONTINUAL_TASKS
id: &*MZ_MATERIALIZED_VIEWS,
row: Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Datum::String(&schema_id.to_string()),
Datum::String(name),
Datum::String(&ct.cluster_id.to_string()),
Datum::String(&query_string),
Datum::String(&owner_id.to_string()),
privileges,
Datum::String(&ct.create_sql),
Datum::String(&create_stmt.to_ast_string_redacted()),
]),
diff,
});

updates
}

fn pack_sink_update(
&self,
id: GlobalId,
@@ -2025,7 +2071,8 @@ impl CatalogState {
| CommentObjectId::Func(global_id)
| CommentObjectId::Connection(global_id)
| CommentObjectId::Secret(global_id)
| CommentObjectId::Type(global_id) => global_id.to_string(),
| CommentObjectId::Type(global_id)
| CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
CommentObjectId::Role(role_id) => role_id.to_string(),
CommentObjectId::Database(database_id) => database_id.to_string(),
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
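Note that pack_continual_task_update above writes its row into mz_materialized_views as a stopgap; the TODO points at a dedicated MZ_CONTINUAL_TASKS builtin table. A hypothetical sketch of how the push could look once such a table exists (the MZ_CONTINUAL_TASKS constant and its column set are assumptions, not part of this commit):

updates.push(BuiltinTableUpdate {
    // Hypothetical dedicated builtin table; does not exist in this commit.
    id: &*MZ_CONTINUAL_TASKS,
    row: Row::pack_slice(&[
        Datum::String(&id.to_string()),
        Datum::UInt32(oid),
        Datum::String(&schema_id.to_string()),
        Datum::String(name),
        Datum::String(&ct.cluster_id.to_string()),
        Datum::String(&owner_id.to_string()),
        privileges,
        Datum::String(&ct.create_sql),
        Datum::String(&create_stmt.to_ast_string_redacted()),
    ]),
    diff,
});

With a dedicated table, the placeholder "TODO(ct)" query string could be dropped entirely instead of being packed into the definition column.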
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/consistency.rs
@@ -237,7 +237,8 @@ impl CatalogState {
| CommentObjectId::Func(global_id)
| CommentObjectId::Connection(global_id)
| CommentObjectId::Type(global_id)
| CommentObjectId::Secret(global_id) => {
| CommentObjectId::Secret(global_id)
| CommentObjectId::ContinualTask(global_id) => {
let entry = self.entry_by_id.get(&global_id);
match entry {
None => comment_inconsistencies
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/open.rs
@@ -693,7 +693,8 @@ impl Catalog {
}
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::MaterializedView(_) => {
| CatalogItem::MaterializedView(_)
| CatalogItem::ContinualTask(_) => {
// Storage objects don't have any external objects to drop.
}
CatalogItem::Sink(_) => {
49 changes: 20 additions & 29 deletions src/adapter/src/catalog/state.rs
@@ -28,17 +28,16 @@ use mz_catalog::builtin::{
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, DataSourceDesc,
Database, DefaultPrivileges, Index, MaterializedView, Role, Schema, Secret, Sink, Source,
Table, TableDataSource, Type, View,
CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, ContinualTask,
DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView, Role, Schema, Secret,
Sink, Source, Table, TableDataSource, Type, View,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
@@ -312,7 +311,8 @@ impl CatalogState {
CatalogItem::Log(_) => out.push(id),
item @ (CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Connection(_)) => {
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_)) => {
// TODO(jkosh44) Unclear if this table wants to include all uses or only references.
for id in &item.references().0 {
self.introspection_dependencies_inner(*id, out);
@@ -951,27 +951,14 @@ impl CatalogState {
desc,
continual_task,
..
}) => {
// TODO(ct): Figure out how to make this survive restarts. The
// expr we saved still had the LocalId placeholders for the
// output, but we don't have access to the real Id here.
let optimized_expr = OptimizedMirRelationExpr::declare_optimized(
mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()),
);
// TODO(ct): CatalogItem::ContinualTask
CatalogItem::MaterializedView(MaterializedView {
create_sql: continual_task.create_sql,
raw_expr: Arc::new(continual_task.expr.clone()),
optimized_expr: Arc::new(optimized_expr),
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
non_null_assertions: continual_task.non_null_assertions,
custom_logical_compaction_window: continual_task.compaction_window,
refresh_schedule: continual_task.refresh_schedule,
initial_as_of: continual_task.as_of.map(Antichain::from_elem),
})
}
}) => CatalogItem::ContinualTask(ContinualTask {
create_sql: continual_task.create_sql,
raw_expr: Arc::new(continual_task.expr.clone()),
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
initial_as_of: continual_task.as_of.map(Antichain::from_elem),
}),
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
create_sql: index.create_sql,
on: index.on,
@@ -1335,7 +1322,8 @@ impl CatalogState {
| CatalogItemType::MaterializedView
| CatalogItemType::Index
| CatalogItemType::Secret
| CatalogItemType::Connection => schema.items[builtin.name()].clone(),
| CatalogItemType::Connection
| CatalogItemType::ContinualTask => schema.items[builtin.name()].clone(),
}
}

@@ -1745,6 +1733,7 @@ impl CatalogState {
CatalogItemType::Connection => CommentObjectId::Connection(global_id),
CatalogItemType::Type => CommentObjectId::Type(global_id),
CatalogItemType::Secret => CommentObjectId::Secret(global_id),
CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(global_id),
}
}
ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
@@ -2102,7 +2091,8 @@ impl CatalogState {
| CommentObjectId::Func(id)
| CommentObjectId::Connection(id)
| CommentObjectId::Type(id)
| CommentObjectId::Secret(id) => Some(*id),
| CommentObjectId::Secret(id)
| CommentObjectId::ContinualTask(id) => Some(*id),
CommentObjectId::Role(_)
| CommentObjectId::Database(_)
| CommentObjectId::Schema(_)
@@ -2130,7 +2120,8 @@ impl CatalogState {
| CommentObjectId::Func(id)
| CommentObjectId::Connection(id)
| CommentObjectId::Type(id)
| CommentObjectId::Secret(id) => {
| CommentObjectId::Secret(id)
| CommentObjectId::ContinualTask(id) => {
let item = self.get_entry(&id);
let name = self.resolve_full_name(item.name(), Some(conn_id));
name.to_string()
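The most substantive change in this file: a Plan::CreateContinualTask now becomes a first-class CatalogItem::ContinualTask instead of being shoehorned into a MaterializedView with a placeholder constant "optimized" expression. A sketch of the in-memory struct those constructor fields imply (field types approximated from the surrounding code; the real definition lives in mz_catalog::memory::objects and may carry more fields):

use std::sync::Arc;
use timely::progress::Antichain;
// HirRelationExpr, RelationDesc, ResolvedIds, ClusterId, and Timestamp are the
// usual mz_* types; they are assumed here rather than re-imported.

pub struct ContinualTask {
    pub create_sql: String,
    pub raw_expr: Arc<HirRelationExpr>,
    pub desc: RelationDesc,
    pub resolved_ids: ResolvedIds,
    pub cluster_id: ClusterId,
    pub initial_as_of: Option<Antichain<Timestamp>>,
}

Notably the constructor passes no optimized expression (compare the "TODO(ct): optimized_expr?" note in the timeline.rs hunk below), which is why the old restart workaround of declaring a constant MirRelationExpr as optimized can simply be deleted.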
33 changes: 33 additions & 0 deletions src/adapter/src/coord.rs
@@ -2053,6 +2053,39 @@ impl Coordinator {
);
}
}
CatalogItem::ContinualTask(ct) => {
policies_to_set
.entry(policy.expect("continual tasks have a compaction window"))
.or_insert_with(Default::default)
.storage_ids
.insert(entry.id());

let mut df_desc = self
.catalog()
.try_get_physical_plan(&entry.id())
.expect("added in `bootstrap_dataflow_plans`")
.clone();

if let Some(initial_as_of) = ct.initial_as_of.clone() {
df_desc.set_initial_as_of(initial_as_of);
}

let df_meta = self
.catalog()
.try_get_dataflow_metainfo(&entry.id())
.expect("added in `bootstrap_dataflow_plans`");

if self.catalog().state().system_config().enable_mz_notices() {
// Collect optimization hint updates.
self.catalog().state().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
1,
);
}

self.ship_dataflow(df_desc, ct.cluster_id, None).await;
}
// Nothing to do for these cases
CatalogItem::Log(_)
| CatalogItem::Type(_)
11 changes: 10 additions & 1 deletion src/adapter/src/coord/ddl.rs
@@ -1386,6 +1386,10 @@ impl Coordinator {
CatalogItem::Secret(_) => {
new_secrets += 1;
}
CatalogItem::ContinualTask(_) => {
// TODO(ct): Give CTs their own limit?
new_materialized_views += 1;
}
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
@@ -1458,6 +1462,10 @@ impl Coordinator {
CatalogItem::Secret(_) => {
new_secrets -= 1;
}
CatalogItem::ContinualTask(_) => {
// TODO(ct): Give CTs their own limit?
new_materialized_views -= 1;
}
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
@@ -1492,7 +1500,8 @@ impl Coordinator {
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_) => {}
| CatalogItem::Func(_)
| CatalogItem::ContinualTask(_) => {}
},
Op::AlterRole { .. }
| Op::AlterRetainHistory { .. }
6 changes: 4 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -2605,7 +2605,7 @@ impl Coordinator {
}
}
match entry.item().typ() {
typ @ (Func | View | MaterializedView) => {
typ @ (Func | View | MaterializedView | ContinualTask) => {
ids_to_check.extend(entry.uses());
let valid_id = id.is_user() || matches!(typ, Func);
valid_id
@@ -2942,7 +2942,9 @@ impl Coordinator {
}];
self.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
let cluster = match coord.catalog().get_entry(&plan.id).item() {
CatalogItem::Table(_) | CatalogItem::MaterializedView(_) => None,
CatalogItem::Table(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::ContinualTask(_) => None,
CatalogItem::Index(index) => Some(index.cluster_id),
CatalogItem::Source(_) => {
let read_policies = coord.catalog().source_read_policies(plan.id);
@@ -10,7 +10,7 @@
use std::sync::Arc;

use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource,
CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
};
use mz_compute_types::sinks::{
ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection,
@@ -161,19 +161,15 @@ impl Coordinator {
let ops = vec![catalog::Op::CreateItem {
id: sink_id,
name: name.clone(),
item: CatalogItem::MaterializedView(MaterializedView {
item: CatalogItem::ContinualTask(ContinualTask {
// TODO(ct): This doesn't give the `DELETE FROM` / `INSERT INTO`
// names the `[u1 AS "materialize"."public"."append_only"]`
// style expansion. Bug?
create_sql,
raw_expr: Arc::new(raw_expr),
optimized_expr: Arc::new(local_mir_plan.expr()),
desc: desc.clone(),
resolved_ids,
cluster_id,
non_null_assertions: Vec::new(),
custom_logical_compaction_window: None,
refresh_schedule,
initial_as_of: Some(as_of.clone()),
}),
owner_id: *session.current_role_id(),
11 changes: 9 additions & 2 deletions src/adapter/src/coord/timeline.rs
@@ -19,7 +19,7 @@ use chrono::{DateTime, Utc};
use futures::Future;
use itertools::Itertools;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{CatalogItem, MaterializedView, View};
use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
use mz_compute_types::ComputeInstanceId;
use mz_expr::CollectionPlan;
use mz_ore::collections::CollectionExt;
@@ -333,7 +333,8 @@ impl Coordinator {
match entry.item() {
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::MaterializedView(_) => {
| CatalogItem::MaterializedView(_)
| CatalogItem::ContinualTask(_) => {
id_bundle.storage_ids.insert(entry.id());
}
CatalogItem::Index(index) => {
@@ -460,6 +461,12 @@ impl Coordinator {
timelines.insert(TimelineContext::TimestampDependent);
ids.extend(optimized_expr.depends_on());
}
CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
// See comment in MaterializedView
timelines.insert(TimelineContext::TimestampDependent);
// TODO(ct): optimized_expr?
ids.extend(raw_expr.depends_on());
}
CatalogItem::Table(table) => {
timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
}