Skip to content

Commit

Permalink
Merge pull request #29675 from danhhz/ct_todos_adapter
Browse files Browse the repository at this point in the history
ct: add `::ContinualTask` to various adapter enums
  • Loading branch information
danhhz authored Sep 25, 2024
2 parents 316fc3e + 2b2c6ba commit 01e0078
Show file tree
Hide file tree
Showing 43 changed files with 1,499 additions and 84 deletions.
2 changes: 2 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_activity_log_thinned -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_cluster_workload_classes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_error_counts_raw_unified -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_continual_tasks -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_activity_log_redacted -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_activity_log_thinned -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_aggregates -->
Expand All @@ -1153,6 +1154,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_cluster_replicas -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_columns -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_connections -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_continual_tasks -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_databases -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_indexes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_materialized_views -->
Expand Down
7 changes: 7 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,11 @@ impl Catalog {
.filter(|role| role.is_user())
}

pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
self.entries()
.filter(|entry| entry.is_continual_task() && entry.id().is_user())
}

pub fn system_privileges(&self) -> &PrivilegeMap {
&self.state.system_privileges
}
Expand Down Expand Up @@ -1308,6 +1313,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType
CommentObjectId::Schema(_) => ObjectType::Schema,
CommentObjectId::Cluster(_) => ObjectType::Cluster,
CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
}
}

Expand Down Expand Up @@ -1337,6 +1343,7 @@ pub(crate) fn system_object_type_to_audit_object_type(
mz_sql::catalog::ObjectType::Database => ObjectType::Database,
mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
mz_sql::catalog::ObjectType::Func => ObjectType::Func,
mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
},
SystemObjectType::System => ObjectType::System,
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
let mut builtin_index_additions = Vec::new();
for (builtin_item_update, ts, diff) in builtin_item_updates {
match &builtin_item_update.description.object_type {
CatalogItemType::Index => push_update(
CatalogItemType::Index | CatalogItemType::ContinualTask => push_update(
StateUpdate {
kind: StateUpdateKind::SystemObjectMapping(builtin_item_update),
ts,
Expand Down
74 changes: 67 additions & 7 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use mz_catalog::builtin::{
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTERS, MZ_CLUSTER_REPLICAS,
MZ_CLUSTER_REPLICA_METRICS, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES,
MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS,
MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS,
MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS,
MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEXES, MZ_INDEX_COLUMNS, MZ_INTERNAL_CLUSTER_REPLICAS,
MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES, MZ_KAFKA_SOURCE_TABLES, MZ_LIST_TYPES,
MZ_MAP_TYPES, MZ_MATERIALIZED_VIEWS, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
Expand All @@ -32,8 +32,8 @@ use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, DataSourceDesc, Func,
Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, ContinualTask,
DataSourceDesc, Func, Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{
Expand All @@ -52,7 +52,7 @@ use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, ScalarType, Timestamp};
use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
TypeCategory,
Expand Down Expand Up @@ -671,6 +671,9 @@ impl CatalogState {
CatalogItem::Connection(connection) => self.pack_connection_update(
id, oid, schema_id, name, owner_id, privileges, connection, diff,
),
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
id, oid, schema_id, name, owner_id, privileges, ct, diff,
),
};

if !entry.item().is_temporary() {
Expand Down Expand Up @@ -1268,8 +1271,6 @@ impl CatalogState {
query_string.push(';');
query_string
}
// TODO(ct): Remove.
Statement::CreateContinualTask(_) => "TODO(ct)".into(),
_ => unreachable!(),
};

Expand Down Expand Up @@ -1353,6 +1354,64 @@ impl CatalogState {
updates
}

fn pack_continual_task_update(
&self,
id: GlobalId,
oid: u32,
schema_id: &SchemaSpecifier,
name: &str,
owner_id: &RoleId,
privileges: Datum,
ct: &ContinualTask,
diff: Diff,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let create_stmt = mz_sql::parse::parse(&ct.create_sql)
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
ct.create_sql, e
)
})
.into_element()
.ast;
let query_string = match &create_stmt {
Statement::CreateContinualTask(stmt) => {
let mut query_string = String::new();
for stmt in &stmt.stmts {
let s = match stmt {
ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
};
if query_string.is_empty() {
query_string = s;
} else {
query_string.push_str("; ");
query_string.push_str(&s);
}
}
query_string
}
_ => unreachable!(),
};

vec![BuiltinTableUpdate {
id: &*MZ_CONTINUAL_TASKS,
row: Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Datum::String(&schema_id.to_string()),
Datum::String(name),
Datum::String(&ct.cluster_id.to_string()),
Datum::String(&query_string),
Datum::String(&owner_id.to_string()),
privileges,
Datum::String(&ct.create_sql),
Datum::String(&create_stmt.to_ast_string_redacted()),
]),
diff,
}]
}

fn pack_sink_update(
&self,
id: GlobalId,
Expand Down Expand Up @@ -2045,7 +2104,8 @@ impl CatalogState {
| CommentObjectId::Func(global_id)
| CommentObjectId::Connection(global_id)
| CommentObjectId::Secret(global_id)
| CommentObjectId::Type(global_id) => global_id.to_string(),
| CommentObjectId::Type(global_id)
| CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
CommentObjectId::Role(role_id) => role_id.to_string(),
CommentObjectId::Database(database_id) => database_id.to_string(),
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ impl CatalogState {
| CommentObjectId::Func(global_id)
| CommentObjectId::Connection(global_id)
| CommentObjectId::Type(global_id)
| CommentObjectId::Secret(global_id) => {
| CommentObjectId::Secret(global_id)
| CommentObjectId::ContinualTask(global_id) => {
let entry = self.entry_by_id.get(&global_id);
match entry {
None => comment_inconsistencies
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,8 @@ impl Catalog {
}
CatalogItem::Table(_)
| CatalogItem::Source(_)
| CatalogItem::MaterializedView(_) => {
| CatalogItem::MaterializedView(_)
| CatalogItem::ContinualTask(_) => {
// Storage objects don't have any external objects to drop.
}
CatalogItem::Sink(_) => {
Expand Down
49 changes: 20 additions & 29 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@ use mz_catalog::builtin::{
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, DataSourceDesc,
Database, DefaultPrivileges, Index, MaterializedView, Role, Schema, Secret, Sink, Source,
Table, TableDataSource, Type, View,
CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, ContinualTask,
DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView, Role, Schema, Secret,
Sink, Source, Table, TableDataSource, Type, View,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
Expand Down Expand Up @@ -315,7 +314,8 @@ impl CatalogState {
CatalogItem::Log(_) => out.push(id),
item @ (CatalogItem::View(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Connection(_)) => {
| CatalogItem::Connection(_)
| CatalogItem::ContinualTask(_)) => {
// TODO(jkosh44) Unclear if this table wants to include all uses or only references.
for id in &item.references().0 {
self.introspection_dependencies_inner(*id, out);
Expand Down Expand Up @@ -954,27 +954,14 @@ impl CatalogState {
desc,
continual_task,
..
}) => {
// TODO(ct): Figure out how to make this survive restarts. The
// expr we saved still had the LocalId placeholders for the
// output, but we don't have access to the real Id here.
let optimized_expr = OptimizedMirRelationExpr::declare_optimized(
mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()),
);
// TODO(ct): CatalogItem::ContinualTask
CatalogItem::MaterializedView(MaterializedView {
create_sql: continual_task.create_sql,
raw_expr: Arc::new(continual_task.expr.clone()),
optimized_expr: Arc::new(optimized_expr),
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
non_null_assertions: continual_task.non_null_assertions,
custom_logical_compaction_window: continual_task.compaction_window,
refresh_schedule: continual_task.refresh_schedule,
initial_as_of: continual_task.as_of.map(Antichain::from_elem),
})
}
}) => CatalogItem::ContinualTask(ContinualTask {
create_sql: continual_task.create_sql,
raw_expr: Arc::new(continual_task.expr.clone()),
desc,
resolved_ids,
cluster_id: continual_task.cluster_id,
initial_as_of: continual_task.as_of.map(Antichain::from_elem),
}),
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
create_sql: index.create_sql,
on: index.on,
Expand Down Expand Up @@ -1338,7 +1325,8 @@ impl CatalogState {
| CatalogItemType::MaterializedView
| CatalogItemType::Index
| CatalogItemType::Secret
| CatalogItemType::Connection => schema.items[builtin.name()].clone(),
| CatalogItemType::Connection
| CatalogItemType::ContinualTask => schema.items[builtin.name()].clone(),
}
}

Expand Down Expand Up @@ -1748,6 +1736,7 @@ impl CatalogState {
CatalogItemType::Connection => CommentObjectId::Connection(global_id),
CatalogItemType::Type => CommentObjectId::Type(global_id),
CatalogItemType::Secret => CommentObjectId::Secret(global_id),
CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(global_id),
}
}
ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
Expand Down Expand Up @@ -2105,7 +2094,8 @@ impl CatalogState {
| CommentObjectId::Func(id)
| CommentObjectId::Connection(id)
| CommentObjectId::Type(id)
| CommentObjectId::Secret(id) => Some(*id),
| CommentObjectId::Secret(id)
| CommentObjectId::ContinualTask(id) => Some(*id),
CommentObjectId::Role(_)
| CommentObjectId::Database(_)
| CommentObjectId::Schema(_)
Expand Down Expand Up @@ -2133,7 +2123,8 @@ impl CatalogState {
| CommentObjectId::Func(id)
| CommentObjectId::Connection(id)
| CommentObjectId::Type(id)
| CommentObjectId::Secret(id) => {
| CommentObjectId::Secret(id)
| CommentObjectId::ContinualTask(id) => {
let item = self.get_entry(&id);
let name = self.resolve_full_name(item.name(), Some(conn_id));
name.to_string()
Expand Down
33 changes: 33 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,39 @@ impl Coordinator {
);
}
}
CatalogItem::ContinualTask(ct) => {
policies_to_set
.entry(policy.expect("continual tasks have a compaction window"))
.or_insert_with(Default::default)
.storage_ids
.insert(entry.id());

let mut df_desc = self
.catalog()
.try_get_physical_plan(&entry.id())
.expect("added in `bootstrap_dataflow_plans`")
.clone();

if let Some(initial_as_of) = ct.initial_as_of.clone() {
df_desc.set_initial_as_of(initial_as_of);
}

let df_meta = self
.catalog()
.try_get_dataflow_metainfo(&entry.id())
.expect("added in `bootstrap_dataflow_plans`");

if self.catalog().state().system_config().enable_mz_notices() {
// Collect optimization hint updates.
self.catalog().state().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
1,
);
}

self.ship_dataflow(df_desc, ct.cluster_id, None).await;
}
// Nothing to do for these cases
CatalogItem::Log(_)
| CatalogItem::Type(_)
Expand Down
Loading

0 comments on commit 01e0078

Please sign in to comment.