From 4e276b6a34ef84dd33ca95fc12c0a76b85d3402b Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:38:27 -0700 Subject: [PATCH] ct: add `::ContinualTask` various adapter enums May as well rip off the band-aid and get it over with. --- src/adapter/src/catalog.rs | 2 + src/adapter/src/catalog/apply.rs | 2 +- .../src/catalog/builtin_table_updates.rs | 57 +- src/adapter/src/catalog/consistency.rs | 3 +- src/adapter/src/catalog/open.rs | 3 +- src/adapter/src/catalog/state.rs | 49 +- src/adapter/src/coord.rs | 33 + src/adapter/src/coord/ddl.rs | 11 +- src/adapter/src/coord/sequencer/inner.rs | 6 +- .../sequencer/inner/create_continual_task.rs | 8 +- src/adapter/src/coord/timeline.rs | 11 +- src/adapter/src/optimize/dataflows.rs | 6 +- src/audit-log/src/lib.rs | 2 + src/catalog/protos/hashes.json | 6 +- src/catalog/protos/objects.proto | 4 + src/catalog/protos/objects_v66.proto | 946 ++++++++++++++++++ src/catalog/src/durable/initialize.rs | 1 + .../src/durable/objects/serialization.rs | 16 + src/catalog/src/memory/objects.rs | 61 +- src/sql-parser/src/ast/defs/statement.rs | 5 +- src/sql-parser/src/parser.rs | 23 +- src/sql/src/catalog.rs | 13 +- src/sql/src/names.rs | 4 +- src/sql/src/plan.rs | 3 + src/sql/src/plan/statement/acl.rs | 5 +- src/sql/src/plan/statement/ddl.rs | 3 +- src/sql/src/plan/statement/show.rs | 3 +- src/sql/src/rbac.rs | 8 +- 28 files changed, 1219 insertions(+), 75 deletions(-) create mode 100644 src/catalog/protos/objects_v66.proto diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 5961b104e9df0..db7c69c9a0c17 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -1318,6 +1318,7 @@ pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType CommentObjectId::Schema(_) => ObjectType::Schema, CommentObjectId::Cluster(_) => ObjectType::Cluster, CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica, + CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask, } } @@ -1347,6 +1348,7 @@ pub(crate) fn system_object_type_to_audit_object_type( mz_sql::catalog::ObjectType::Database => ObjectType::Database, mz_sql::catalog::ObjectType::Schema => ObjectType::Schema, mz_sql::catalog::ObjectType::Func => ObjectType::Func, + mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask, }, SystemObjectType::System => ObjectType::System, } diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 746a47a2030f6..ba0ccd86cc5ad 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -1619,7 +1619,7 @@ fn sort_updates_inner(updates: Vec) -> Vec { let mut builtin_index_additions = Vec::new(); for (builtin_item_update, ts, diff) in builtin_item_updates { match &builtin_item_update.description.object_type { - CatalogItemType::Index => push_update( + CatalogItemType::Index | CatalogItemType::ContinualTask => push_update( StateUpdate { kind: StateUpdateKind::SystemObjectMapping(builtin_item_update), ts, diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 659e3b11033ba..3603dabc937cf 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -33,8 +33,8 @@ use mz_catalog::config::AwsPrincipalContext; use mz_catalog::durable::SourceReferences; use mz_catalog::memory::error::{Error, ErrorKind}; use mz_catalog::memory::objects::{ - CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, DataSourceDesc, Func, - Index, MaterializedView, Sink, Table, TableDataSource, Type, View, + CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, ContinualTask, + DataSourceDesc, Func, Index, MaterializedView, Sink, Table, TableDataSource, Type, View, }; use mz_catalog::SYSTEM_CONN_ID; use mz_controller::clusters::{ @@ -669,6 +669,9 @@ impl CatalogState { CatalogItem::Connection(connection) => self.pack_connection_update( id, oid, schema_id, name, owner_id, privileges, connection, diff, ), + CatalogItem::ContinualTask(ct) => self.pack_continual_task_update( + id, oid, schema_id, name, owner_id, privileges, ct, diff, + ), }; if !entry.item().is_temporary() { @@ -1253,8 +1256,6 @@ impl CatalogState { query_string.push(';'); query_string } - // TODO(ct): Remove. - Statement::CreateContinualTask(_) => "TODO(ct)".into(), _ => unreachable!(), }; @@ -1338,6 +1339,51 @@ impl CatalogState { updates } + fn pack_continual_task_update( + &self, + id: GlobalId, + oid: u32, + schema_id: &SchemaSpecifier, + name: &str, + owner_id: &RoleId, + privileges: Datum, + ct: &ContinualTask, + diff: Diff, + ) -> Vec> { + let create_stmt = mz_sql::parse::parse(&ct.create_sql) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + ct.create_sql, e + ) + }) + .into_element() + .ast; + let query_string = "TODO(ct)"; + + let mut updates = Vec::new(); + + updates.push(BuiltinTableUpdate { + // TODO(ct): MZ_CONTINUAL_TASKS + id: &*MZ_MATERIALIZED_VIEWS, + row: Row::pack_slice(&[ + Datum::String(&id.to_string()), + Datum::UInt32(oid), + Datum::String(&schema_id.to_string()), + Datum::String(name), + Datum::String(&ct.cluster_id.to_string()), + Datum::String(&query_string), + Datum::String(&owner_id.to_string()), + privileges, + Datum::String(&ct.create_sql), + Datum::String(&create_stmt.to_ast_string_redacted()), + ]), + diff, + }); + + updates + } + fn pack_sink_update( &self, id: GlobalId, @@ -2025,7 +2071,8 @@ impl CatalogState { | CommentObjectId::Func(global_id) | CommentObjectId::Connection(global_id) | CommentObjectId::Secret(global_id) - | CommentObjectId::Type(global_id) => global_id.to_string(), + | CommentObjectId::Type(global_id) + | CommentObjectId::ContinualTask(global_id) => global_id.to_string(), CommentObjectId::Role(role_id) => role_id.to_string(), CommentObjectId::Database(database_id) => database_id.to_string(), CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(), diff --git a/src/adapter/src/catalog/consistency.rs b/src/adapter/src/catalog/consistency.rs index e631cdcdab04e..be92ae1e5b7e5 100644 --- a/src/adapter/src/catalog/consistency.rs +++ b/src/adapter/src/catalog/consistency.rs @@ -237,7 +237,8 @@ impl CatalogState { | CommentObjectId::Func(global_id) | CommentObjectId::Connection(global_id) | CommentObjectId::Type(global_id) - | CommentObjectId::Secret(global_id) => { + | CommentObjectId::Secret(global_id) + | CommentObjectId::ContinualTask(global_id) => { let entry = self.entry_by_id.get(&global_id); match entry { None => comment_inconsistencies diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 8d6bc7f9e53ae..cf66c4c249b80 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -693,7 +693,8 @@ impl Catalog { } CatalogItem::Table(_) | CatalogItem::Source(_) - | CatalogItem::MaterializedView(_) => { + | CatalogItem::MaterializedView(_) + | CatalogItem::ContinualTask(_) => { // Storage objects don't have any external objects to drop. } CatalogItem::Sink(_) => { diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 9e00c89024400..72ca4d172f653 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -28,9 +28,9 @@ use mz_catalog::builtin::{ use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap}; use mz_catalog::memory::error::{Error, ErrorKind}; use mz_catalog::memory::objects::{ - CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, DataSourceDesc, - Database, DefaultPrivileges, Index, MaterializedView, Role, Schema, Secret, Sink, Source, - Table, TableDataSource, Type, View, + CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, ContinualTask, + DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView, Role, Schema, Secret, + Sink, Source, Table, TableDataSource, Type, View, }; use mz_catalog::SYSTEM_CONN_ID; use mz_controller::clusters::{ @@ -38,7 +38,6 @@ use mz_controller::clusters::{ UnmanagedReplicaLocation, }; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_expr::OptimizedMirRelationExpr; use mz_ore::collections::CollectionExt; use mz_ore::now::NOW_ZERO; use mz_ore::soft_assert_no_log; @@ -312,7 +311,8 @@ impl CatalogState { CatalogItem::Log(_) => out.push(id), item @ (CatalogItem::View(_) | CatalogItem::MaterializedView(_) - | CatalogItem::Connection(_)) => { + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_)) => { // TODO(jkosh44) Unclear if this table wants to include all uses or only references. for id in &item.references().0 { self.introspection_dependencies_inner(*id, out); @@ -951,27 +951,14 @@ impl CatalogState { desc, continual_task, .. - }) => { - // TODO(ct): Figure out how to make this survive restarts. The - // expr we saved still had the LocalId placeholders for the - // output, but we don't have access to the real Id here. - let optimized_expr = OptimizedMirRelationExpr::declare_optimized( - mz_expr::MirRelationExpr::constant(Vec::new(), desc.typ().clone()), - ); - // TODO(ct): CatalogItem::ContinualTask - CatalogItem::MaterializedView(MaterializedView { - create_sql: continual_task.create_sql, - raw_expr: Arc::new(continual_task.expr.clone()), - optimized_expr: Arc::new(optimized_expr), - desc, - resolved_ids, - cluster_id: continual_task.cluster_id, - non_null_assertions: continual_task.non_null_assertions, - custom_logical_compaction_window: continual_task.compaction_window, - refresh_schedule: continual_task.refresh_schedule, - initial_as_of: continual_task.as_of.map(Antichain::from_elem), - }) - } + }) => CatalogItem::ContinualTask(ContinualTask { + create_sql: continual_task.create_sql, + raw_expr: Arc::new(continual_task.expr.clone()), + desc, + resolved_ids, + cluster_id: continual_task.cluster_id, + initial_as_of: continual_task.as_of.map(Antichain::from_elem), + }), Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { create_sql: index.create_sql, on: index.on, @@ -1335,7 +1322,8 @@ impl CatalogState { | CatalogItemType::MaterializedView | CatalogItemType::Index | CatalogItemType::Secret - | CatalogItemType::Connection => schema.items[builtin.name()].clone(), + | CatalogItemType::Connection + | CatalogItemType::ContinualTask => schema.items[builtin.name()].clone(), } } @@ -1745,6 +1733,7 @@ impl CatalogState { CatalogItemType::Connection => CommentObjectId::Connection(global_id), CatalogItemType::Type => CommentObjectId::Type(global_id), CatalogItemType::Secret => CommentObjectId::Secret(global_id), + CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(global_id), } } ObjectId::Role(role_id) => CommentObjectId::Role(role_id), @@ -2102,7 +2091,8 @@ impl CatalogState { | CommentObjectId::Func(id) | CommentObjectId::Connection(id) | CommentObjectId::Type(id) - | CommentObjectId::Secret(id) => Some(*id), + | CommentObjectId::Secret(id) + | CommentObjectId::ContinualTask(id) => Some(*id), CommentObjectId::Role(_) | CommentObjectId::Database(_) | CommentObjectId::Schema(_) @@ -2130,7 +2120,8 @@ impl CatalogState { | CommentObjectId::Func(id) | CommentObjectId::Connection(id) | CommentObjectId::Type(id) - | CommentObjectId::Secret(id) => { + | CommentObjectId::Secret(id) + | CommentObjectId::ContinualTask(id) => { let item = self.get_entry(&id); let name = self.resolve_full_name(item.name(), Some(conn_id)); name.to_string() diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index b498718a9239b..cfa1749a294b7 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -2053,6 +2053,39 @@ impl Coordinator { ); } } + CatalogItem::ContinualTask(ct) => { + policies_to_set + .entry(policy.expect("continual tasks have a compaction window")) + .or_insert_with(Default::default) + .storage_ids + .insert(entry.id()); + + let mut df_desc = self + .catalog() + .try_get_physical_plan(&entry.id()) + .expect("added in `bootstrap_dataflow_plans`") + .clone(); + + if let Some(initial_as_of) = ct.initial_as_of.clone() { + df_desc.set_initial_as_of(initial_as_of); + } + + let df_meta = self + .catalog() + .try_get_dataflow_metainfo(&entry.id()) + .expect("added in `bootstrap_dataflow_plans`"); + + if self.catalog().state().system_config().enable_mz_notices() { + // Collect optimization hint updates. + self.catalog().state().pack_optimizer_notices( + &mut builtin_table_updates, + df_meta.optimizer_notices.iter(), + 1, + ); + } + + self.ship_dataflow(df_desc, ct.cluster_id, None).await; + } // Nothing to do for these cases CatalogItem::Log(_) | CatalogItem::Type(_) diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 26054187d6587..f98cdf93e833c 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1386,6 +1386,10 @@ impl Coordinator { CatalogItem::Secret(_) => { new_secrets += 1; } + CatalogItem::ContinualTask(_) => { + // TODO(ct): Give CTs their own limit? + new_materialized_views += 1; + } CatalogItem::Log(_) | CatalogItem::View(_) | CatalogItem::Index(_) @@ -1458,6 +1462,10 @@ impl Coordinator { CatalogItem::Secret(_) => { new_secrets -= 1; } + CatalogItem::ContinualTask(_) => { + // TODO(ct): Give CTs their own limit? + new_materialized_views -= 1; + } CatalogItem::Log(_) | CatalogItem::View(_) | CatalogItem::Index(_) @@ -1492,7 +1500,8 @@ impl Coordinator { | CatalogItem::View(_) | CatalogItem::Index(_) | CatalogItem::Type(_) - | CatalogItem::Func(_) => {} + | CatalogItem::Func(_) + | CatalogItem::ContinualTask(_) => {} }, Op::AlterRole { .. } | Op::AlterRetainHistory { .. } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index e957bd362a09c..187b0dedcd351 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -2605,7 +2605,7 @@ impl Coordinator { } } match entry.item().typ() { - typ @ (Func | View | MaterializedView) => { + typ @ (Func | View | MaterializedView | ContinualTask) => { ids_to_check.extend(entry.uses()); let valid_id = id.is_user() || matches!(typ, Func); valid_id @@ -2942,7 +2942,9 @@ impl Coordinator { }]; self.catalog_transact_with_side_effects(Some(session), ops, |coord| async { let cluster = match coord.catalog().get_entry(&plan.id).item() { - CatalogItem::Table(_) | CatalogItem::MaterializedView(_) => None, + CatalogItem::Table(_) + | CatalogItem::MaterializedView(_) + | CatalogItem::ContinualTask(_) => None, CatalogItem::Index(index) => Some(index.cluster_id), CatalogItem::Source(_) => { let read_policies = coord.catalog().source_read_policies(plan.id); diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs index edf249274d4da..8ca9dda713f43 100644 --- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs +++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use mz_catalog::memory::objects::{ - CatalogEntry, CatalogItem, MaterializedView, Table, TableDataSource, + CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource, }; use mz_compute_types::sinks::{ ComputeSinkConnection, ContinualTaskConnection, PersistSinkConnection, @@ -161,19 +161,15 @@ impl Coordinator { let ops = vec![catalog::Op::CreateItem { id: sink_id, name: name.clone(), - item: CatalogItem::MaterializedView(MaterializedView { + item: CatalogItem::ContinualTask(ContinualTask { // TODO(ct): This doesn't give the `DELETE FROM` / `INSERT INTO` // names the `[u1 AS "materialize"."public"."append_only"]` // style expansion. Bug? create_sql, raw_expr: Arc::new(raw_expr), - optimized_expr: Arc::new(local_mir_plan.expr()), desc: desc.clone(), resolved_ids, cluster_id, - non_null_assertions: Vec::new(), - custom_logical_compaction_window: None, - refresh_schedule, initial_as_of: Some(as_of.clone()), }), owner_id: *session.current_role_id(), diff --git a/src/adapter/src/coord/timeline.rs b/src/adapter/src/coord/timeline.rs index 2dcca1ae328a2..b1fda7612646d 100644 --- a/src/adapter/src/coord/timeline.rs +++ b/src/adapter/src/coord/timeline.rs @@ -19,7 +19,7 @@ use chrono::{DateTime, Utc}; use futures::Future; use itertools::Itertools; use mz_adapter_types::connection::ConnectionId; -use mz_catalog::memory::objects::{CatalogItem, MaterializedView, View}; +use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View}; use mz_compute_types::ComputeInstanceId; use mz_expr::CollectionPlan; use mz_ore::collections::CollectionExt; @@ -333,7 +333,8 @@ impl Coordinator { match entry.item() { CatalogItem::Table(_) | CatalogItem::Source(_) - | CatalogItem::MaterializedView(_) => { + | CatalogItem::MaterializedView(_) + | CatalogItem::ContinualTask(_) => { id_bundle.storage_ids.insert(entry.id()); } CatalogItem::Index(index) => { @@ -460,6 +461,12 @@ impl Coordinator { timelines.insert(TimelineContext::TimestampDependent); ids.extend(optimized_expr.depends_on()); } + CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => { + // See comment in MaterializedView + timelines.insert(TimelineContext::TimestampDependent); + // TODO(ct): optimized_expr? + ids.extend(raw_expr.depends_on()); + } CatalogItem::Table(table) => { timelines.insert(TimelineContext::TimelineDependent(table.timeline())); } diff --git a/src/adapter/src/optimize/dataflows.rs b/src/adapter/src/optimize/dataflows.rs index 4f491f6b053bd..5644ce57e1233 100644 --- a/src/adapter/src/optimize/dataflows.rs +++ b/src/adapter/src/optimize/dataflows.rs @@ -226,6 +226,9 @@ impl<'a> DataflowBuilder<'a> { CatalogItem::Log(log) => { dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic); } + CatalogItem::ContinualTask(ct) => { + dataflow.import_source(*id, ct.desc.typ().clone(), monotonic); + } _ => unreachable!(), } } @@ -396,7 +399,8 @@ impl<'a> DataflowBuilder<'a> { | CatalogItem::Log(_) | CatalogItem::MaterializedView(_) | CatalogItem::Sink(_) - | CatalogItem::Func(_) => Ok(false), + | CatalogItem::Func(_) + | CatalogItem::ContinualTask(_) => Ok(false), } })?; diff --git a/src/audit-log/src/lib.rs b/src/audit-log/src/lib.rs index a6f0b26edfda6..5cf48a0b1d932 100644 --- a/src/audit-log/src/lib.rs +++ b/src/audit-log/src/lib.rs @@ -108,6 +108,7 @@ pub enum ObjectType { Cluster, ClusterReplica, Connection, + ContinualTask, Database, Func, Index, @@ -129,6 +130,7 @@ impl ObjectType { ObjectType::Cluster => "Cluster", ObjectType::ClusterReplica => "Cluster Replica", ObjectType::Connection => "Connection", + ObjectType::ContinualTask => "Continual Task", ObjectType::Database => "Database", ObjectType::Func => "Function", ObjectType::Index => "Index", diff --git a/src/catalog/protos/hashes.json b/src/catalog/protos/hashes.json index 04ffd720c4ca5..bf79e7d65043f 100644 --- a/src/catalog/protos/hashes.json +++ b/src/catalog/protos/hashes.json @@ -1,7 +1,7 @@ [ { "name": "objects.proto", - "md5": "497b292990f570bc2f75586d35dbdada" + "md5": "f8cf9f2a09591ea4fe5efa94611c14d2" }, { "name": "objects_v60.proto", @@ -26,5 +26,9 @@ { "name": "objects_v65.proto", "md5": "541dd7ebf7d37ec0c2aa480ca48a84cf" + }, + { + "name": "objects_v66.proto", + "md5": "d1cf53e138c35d68c13181b75305be57" } ] diff --git a/src/catalog/protos/objects.proto b/src/catalog/protos/objects.proto index 54b4f0f9c61fa..121c667feb343 100644 --- a/src/catalog/protos/objects.proto +++ b/src/catalog/protos/objects.proto @@ -163,6 +163,7 @@ message CommentKey { GlobalId connection = 9; GlobalId type = 10; GlobalId secret = 11; + GlobalId continual_task = 17; RoleId role = 12; DatabaseId database = 13; ResolvedSchema schema = 14; @@ -254,6 +255,7 @@ enum CatalogItemType { CATALOG_ITEM_TYPE_FUNC = 8; CATALOG_ITEM_TYPE_SECRET = 9; CATALOG_ITEM_TYPE_CONNECTION = 10; + CATALOG_ITEM_TYPE_CONTINUAL_TASK = 11; } message CatalogItem { @@ -457,6 +459,7 @@ enum ObjectType { OBJECT_TYPE_DATABASE = 13; OBJECT_TYPE_SCHEMA = 14; OBJECT_TYPE_FUNC = 15; + OBJECT_TYPE_CONTINUAL_TASK = 16; } message DefaultPrivilegesKey { @@ -509,6 +512,7 @@ message AuditLogEventV1 { OBJECT_TYPE_TYPE = 14; OBJECT_TYPE_VIEW = 15; OBJECT_TYPE_SYSTEM = 16; + OBJECT_TYPE_CONTINUAL_TASK = 17; } message IdFullNameV1 { diff --git a/src/catalog/protos/objects_v66.proto b/src/catalog/protos/objects_v66.proto new file mode 100644 index 0000000000000..593f249e37a10 --- /dev/null +++ b/src/catalog/protos/objects_v66.proto @@ -0,0 +1,946 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// This protobuf file defines the types we store in the Stash. +// +// Before and after modifying this file, make sure you have a snapshot of the before version, +// e.g. a copy of this file named 'objects_v{CATALOG_VERSION}.proto', and a snapshot of the file +// after your modifications, e.g. 'objects_v{CATALOG_VERSION + 1}.proto'. Then you can write a +// migration using these two files, and no matter how the types change in the future, we'll always +// have these snapshots to facilitate the migration. + +// buf breaking: ignore (does currently not require backward-compatibility) + +syntax = "proto3"; + +package objects_v66; + +message ConfigKey { + string key = 1; +} + +message ConfigValue { + uint64 value = 1; +} + +message SettingKey { + string name = 1; +} + +message SettingValue { + string value = 1; +} + +message IdAllocKey { + string name = 1; +} + +message IdAllocValue { + uint64 next_id = 1; +} + +message GidMappingKey { + string schema_name = 1; + CatalogItemType object_type = 2; + string object_name = 3; +} + +message GidMappingValue { + uint64 id = 1; + string fingerprint = 2; +} + +message ClusterKey { + ClusterId id = 1; +} + +message ClusterValue { + reserved 2; + string name = 1; + RoleId owner_id = 3; + repeated MzAclItem privileges = 4; + ClusterConfig config = 5; +} + +message ClusterIntrospectionSourceIndexKey { + ClusterId cluster_id = 1; + string name = 2; +} + +message ClusterIntrospectionSourceIndexValue { + uint64 index_id = 1; + uint32 oid = 2; +} + +message ClusterReplicaKey { + ReplicaId id = 1; +} + +message ClusterReplicaValue { + ClusterId cluster_id = 1; + string name = 2; + ReplicaConfig config = 3; + RoleId owner_id = 4; +} + +message DatabaseKey { + DatabaseId id = 1; +} + +message DatabaseValue { + string name = 1; + RoleId owner_id = 2; + repeated MzAclItem privileges = 3; + uint32 oid = 4; +} + +message SchemaKey { + SchemaId id = 1; +} + +message SchemaValue { + DatabaseId database_id = 1; + string name = 2; + RoleId owner_id = 3; + repeated MzAclItem privileges = 4; + uint32 oid = 5; +} + +message ItemKey { + GlobalId gid = 1; +} + +message ItemValue { + SchemaId schema_id = 1; + string name = 2; + CatalogItem definition = 3; + RoleId owner_id = 4; + repeated MzAclItem privileges = 5; + uint32 oid = 6; +} + +message RoleKey { + RoleId id = 1; +} + +message RoleValue { + string name = 1; + RoleAttributes attributes = 2; + RoleMembership membership = 3; + RoleVars vars = 4; + uint32 oid = 5; +} + +message ServerConfigurationKey { + string name = 1; +} + +message ServerConfigurationValue { + string value = 1; +} + +message AuditLogKey { + oneof event { + AuditLogEventV1 v1 = 1; + } +} + +message CommentKey { + oneof object { + GlobalId table = 1; + GlobalId view = 2; + GlobalId materialized_view = 4; + GlobalId source = 5; + GlobalId sink = 6; + GlobalId index = 7; + GlobalId func = 8; + GlobalId connection = 9; + GlobalId type = 10; + GlobalId secret = 11; + GlobalId continual_task = 17; + RoleId role = 12; + DatabaseId database = 13; + ResolvedSchema schema = 14; + ClusterId cluster = 15; + ClusterReplicaId cluster_replica = 16; + } + oneof sub_component { + uint64 column_pos = 3; + } +} + +message CommentValue { + string comment = 1; +} + +message SourceReferencesKey { + GlobalId source = 1; +} + +message SourceReferencesValue { + repeated SourceReference references = 1; + EpochMillis updated_at = 2; +} + +message SourceReference { + string name = 1; + optional string namespace = 2; + repeated string columns = 3; +} + +message StorageCollectionMetadataKey { + GlobalId id = 1; +} + +// This value is stored transparently, however, it should only ever be +// manipulated by the storage controller. +message StorageCollectionMetadataValue { + string shard = 1; +} + +// This value is stored transparently, however, it should only ever be +// manipulated by the storage controller. +message UnfinalizedShardKey { + string shard = 1; +} + +// This value is stored transparently, however, it should only ever be +// manipulated by the storage controller. +message TxnWalShardValue { + string shard = 1; +} + +// ---- Common Types +// +// Note: Normally types like this would go in some sort of `common.proto` file, but we want to keep +// our proto definitions in a single file to make snapshotting easier, hence them living here. + +message Empty {/* purposefully empty */} + +// In protobuf a "None" string is the same thing as an empty string. To get the same semantics of +// an `Option` from Rust, we need to wrap a string in a message. +message StringWrapper { + string inner = 1; +} + +message Duration { + uint64 secs = 1; + uint32 nanos = 2; +} + +message EpochMillis { + uint64 millis = 1; +} + +// Opaque timestamp type that is specific to Materialize. +message Timestamp { + uint64 internal = 1; +} + +enum CatalogItemType { + CATALOG_ITEM_TYPE_UNKNOWN = 0; + CATALOG_ITEM_TYPE_TABLE = 1; + CATALOG_ITEM_TYPE_SOURCE = 2; + CATALOG_ITEM_TYPE_SINK = 3; + CATALOG_ITEM_TYPE_VIEW = 4; + CATALOG_ITEM_TYPE_MATERIALIZED_VIEW = 5; + CATALOG_ITEM_TYPE_INDEX = 6; + CATALOG_ITEM_TYPE_TYPE = 7; + CATALOG_ITEM_TYPE_FUNC = 8; + CATALOG_ITEM_TYPE_SECRET = 9; + CATALOG_ITEM_TYPE_CONNECTION = 10; + CATALOG_ITEM_TYPE_CONTINUAL_TASK = 11; +} + +message CatalogItem { + message V1 { + string create_sql = 1; + } + + oneof value { + V1 v1 = 1; + } +} + +message GlobalId { + oneof value { + uint64 system = 1; + uint64 user = 2; + uint64 transient = 3; + Empty explain = 4; + } +} + +message ClusterId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message DatabaseId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message ResolvedDatabaseSpecifier { + oneof spec { + Empty ambient = 1; + DatabaseId id = 2; + } +} + +message SchemaId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message SchemaSpecifier { + oneof spec { + Empty temporary = 1; + SchemaId id = 2; + } +} + +message ResolvedSchema { + ResolvedDatabaseSpecifier database = 1; + SchemaSpecifier schema = 2; +} + +message ReplicaId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message ClusterReplicaId { + ClusterId cluster_id = 1; + ReplicaId replica_id = 2; +} + +message ReplicaLogging { + bool log_logging = 1; + Duration interval = 2; +} + +message OptimizerFeatureOverride { + string name = 1; + string value = 2; +} + +message ClusterScheduleRefreshOptions { + Duration rehydration_time_estimate = 1; +} + +message ClusterSchedule { + oneof value { + Empty manual = 1; + ClusterScheduleRefreshOptions refresh = 2; + } +} + +message ClusterConfig { + message ManagedCluster { + string size = 1; + uint32 replication_factor = 2; + repeated string availability_zones = 3; + ReplicaLogging logging = 4; + bool disk = 6; + repeated OptimizerFeatureOverride optimizer_feature_overrides = 7; + ClusterSchedule schedule = 8; + } + + oneof variant { + Empty unmanaged = 1; + ManagedCluster managed = 2; + } + optional string workload_class = 3; +} + +message ReplicaConfig { + message UnmanagedLocation { + repeated string storagectl_addrs = 1; + repeated string storage_addrs = 2; + repeated string computectl_addrs = 3; + repeated string compute_addrs = 4; + uint64 workers = 5; + } + + message ManagedLocation { + string size = 1; + optional string availability_zone = 2; + bool disk = 4; + bool internal = 5; + optional string billed_as = 6; + bool pending = 7; + } + + oneof location { + UnmanagedLocation unmanaged = 1; + ManagedLocation managed = 2; + } + ReplicaLogging logging = 3; +} + +message RoleId { + oneof value { + uint64 system = 1; + uint64 user = 2; + Empty public = 3; + uint64 predefined = 4; + } +} + +message RoleAttributes { + bool inherit = 1; +} + +message RoleMembership { + message Entry { + RoleId key = 1; + RoleId value = 2; + } + + repeated Entry map = 1; +} + +message RoleVars { + message SqlSet { + repeated string entries = 1; + } + + message Entry { + string key = 1; + oneof val { + string flat = 2; + SqlSet sql_set = 3; + } + } + + repeated Entry entries = 1; +} + +message AclMode { + // A bit flag representing all the privileges that can be granted to a role. + uint64 bitflags = 1; +} + +message MzAclItem { + RoleId grantee = 1; + RoleId grantor = 2; + AclMode acl_mode = 3; +} + +enum ObjectType { + OBJECT_TYPE_UNKNOWN = 0; + OBJECT_TYPE_TABLE = 1; + OBJECT_TYPE_VIEW = 2; + OBJECT_TYPE_MATERIALIZED_VIEW = 3; + OBJECT_TYPE_SOURCE = 4; + OBJECT_TYPE_SINK = 5; + OBJECT_TYPE_INDEX = 6; + OBJECT_TYPE_TYPE = 7; + OBJECT_TYPE_ROLE = 8; + OBJECT_TYPE_CLUSTER = 9; + OBJECT_TYPE_CLUSTER_REPLICA = 10; + OBJECT_TYPE_SECRET = 11; + OBJECT_TYPE_CONNECTION = 12; + OBJECT_TYPE_DATABASE = 13; + OBJECT_TYPE_SCHEMA = 14; + OBJECT_TYPE_FUNC = 15; + OBJECT_TYPE_CONTINUAL_TASK = 16; +} + +message DefaultPrivilegesKey { + RoleId role_id = 1; + DatabaseId database_id = 2; + SchemaId schema_id = 3; + ObjectType object_type = 4; + RoleId grantee = 5; +} + +message DefaultPrivilegesValue { + AclMode privileges = 1; +} + +message SystemPrivilegesKey { + RoleId grantee = 1; + RoleId grantor = 2; +} + +message SystemPrivilegesValue { + AclMode acl_mode = 1; +} + +message AuditLogEventV1 { + enum EventType { + EVENT_TYPE_UNKNOWN = 0; + EVENT_TYPE_CREATE = 1; + EVENT_TYPE_DROP = 2; + EVENT_TYPE_ALTER = 3; + EVENT_TYPE_GRANT = 4; + EVENT_TYPE_REVOKE = 5; + EVENT_TYPE_COMMENT = 6; + } + + enum ObjectType { + OBJECT_TYPE_UNKNOWN = 0; + OBJECT_TYPE_CLUSTER = 1; + OBJECT_TYPE_CLUSTER_REPLICA = 2; + OBJECT_TYPE_CONNECTION = 3; + OBJECT_TYPE_DATABASE = 4; + OBJECT_TYPE_FUNC = 5; + OBJECT_TYPE_INDEX = 6; + OBJECT_TYPE_MATERIALIZED_VIEW = 7; + OBJECT_TYPE_ROLE = 8; + OBJECT_TYPE_SECRET = 9; + OBJECT_TYPE_SCHEMA = 10; + OBJECT_TYPE_SINK = 11; + OBJECT_TYPE_SOURCE = 12; + OBJECT_TYPE_TABLE = 13; + OBJECT_TYPE_TYPE = 14; + OBJECT_TYPE_VIEW = 15; + OBJECT_TYPE_SYSTEM = 16; + OBJECT_TYPE_CONTINUAL_TASK = 17; + } + + message IdFullNameV1 { + string id = 1; + FullNameV1 name = 2; + } + + message FullNameV1 { + string database = 1; + string schema = 2; + string item = 3; + } + + message IdNameV1 { + string id = 1; + string name = 2; + } + + message RenameClusterV1 { + string id = 1; + string old_name = 2; + string new_name = 3; + } + + message RenameClusterReplicaV1 { + string cluster_id = 1; + string replica_id = 2; + string old_name = 3; + string new_name = 4; + } + + message RenameItemV1 { + string id = 1; + FullNameV1 old_name = 2; + FullNameV1 new_name = 3; + } + + message CreateClusterReplicaV1 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + string logical_size = 5; + bool disk = 6; + optional string billed_as = 7; + bool internal = 8; + } + + message CreateClusterReplicaV2 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + string logical_size = 5; + bool disk = 6; + optional string billed_as = 7; + bool internal = 8; + CreateOrDropClusterReplicaReasonV1 reason = 9; + SchedulingDecisionsWithReasonsV1 scheduling_policies = 10; + } + + message DropClusterReplicaV1 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + } + + message DropClusterReplicaV2 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + CreateOrDropClusterReplicaReasonV1 reason = 5; + SchedulingDecisionsWithReasonsV1 scheduling_policies = 6; + } + + message CreateOrDropClusterReplicaReasonV1 { + oneof reason { + Empty Manual = 1; + Empty Schedule = 2; + Empty System = 3; + } + } + + message SchedulingDecisionsWithReasonsV1 { + RefreshDecisionWithReasonV1 on_refresh = 1; + } + + message RefreshDecisionWithReasonV1 { + oneof decision { + Empty On = 1; + Empty Off = 2; + } + repeated string objects_needing_refresh = 3; + string rehydration_time_estimate = 4; + } + + message CreateSourceSinkV1 { + string id = 1; + FullNameV1 name = 2; + StringWrapper size = 3; + } + + message CreateSourceSinkV2 { + string id = 1; + FullNameV1 name = 2; + StringWrapper size = 3; + string external_type = 4; + } + + message CreateSourceSinkV3 { + string id = 1; + FullNameV1 name = 2; + string external_type = 3; + } + + message CreateSourceSinkV4 { + string id = 1; + StringWrapper cluster_id = 2; + FullNameV1 name = 3; + string external_type = 4; + } + + message CreateIndexV1 { + string id = 1; + string cluster_id = 2; + FullNameV1 name = 3; + } + + message CreateMaterializedViewV1 { + string id = 1; + string cluster_id = 2; + FullNameV1 name = 3; + } + + message AlterSourceSinkV1 { + string id = 1; + FullNameV1 name = 2; + StringWrapper old_size = 3; + StringWrapper new_size = 4; + } + + message AlterSetClusterV1 { + string id = 1; + FullNameV1 name = 2; + StringWrapper old_cluster = 3; + StringWrapper new_cluster = 4; + } + + message GrantRoleV1 { + string role_id = 1; + string member_id = 2; + string grantor_id = 3; + } + + message GrantRoleV2 { + string role_id = 1; + string member_id = 2; + string grantor_id = 3; + string executed_by = 4; + } + + message RevokeRoleV1 { + string role_id = 1; + string member_id = 2; + } + + message RevokeRoleV2 { + string role_id = 1; + string member_id = 2; + string grantor_id = 3; + string executed_by = 4; + } + + message UpdatePrivilegeV1 { + string object_id = 1; + string grantee_id = 2; + string grantor_id = 3; + string privileges = 4; + } + + message AlterDefaultPrivilegeV1 { + string role_id = 1; + StringWrapper database_id = 2; + StringWrapper schema_id = 3; + string grantee_id = 4; + string privileges = 5; + } + + message UpdateOwnerV1 { + string object_id = 1; + string old_owner_id = 2; + string new_owner_id = 3; + } + + message SchemaV1 { + string id = 1; + string name = 2; + string database_name = 3; + } + + message SchemaV2 { + string id = 1; + string name = 2; + StringWrapper database_name = 3; + } + + message RenameSchemaV1 { + string id = 1; + optional string database_name = 2; + string old_name = 3; + string new_name = 4; + } + + message UpdateItemV1 { + string id = 1; + FullNameV1 name = 2; + } + + message AlterRetainHistoryV1 { + string id = 1; + optional string old_history = 2; + optional string new_history = 3; + } + + message ToNewIdV1 { + string id = 1; + string new_id = 2; + } + + message FromPreviousIdV1 { + string id = 1; + string previous_id = 2; + } + + message SetV1 { + string name = 1; + optional string value = 2; + } + + message RotateKeysV1 { + string id = 1; + string name = 2; + } + + uint64 id = 1; + EventType event_type = 2; + ObjectType object_type = 3; + StringWrapper user = 4; + EpochMillis occurred_at = 5; + + // next-id: 40 + oneof details { + CreateClusterReplicaV1 create_cluster_replica_v1 = 6; + CreateClusterReplicaV2 create_cluster_replica_v2 = 33; + DropClusterReplicaV1 drop_cluster_replica_v1 = 7; + DropClusterReplicaV2 drop_cluster_replica_v2 = 34; + CreateSourceSinkV1 create_source_sink_v1 = 8; + CreateSourceSinkV2 create_source_sink_v2 = 9; + AlterSourceSinkV1 alter_source_sink_v1 = 10; + AlterSetClusterV1 alter_set_cluster_v1 = 25; + GrantRoleV1 grant_role_v1 = 11; + GrantRoleV2 grant_role_v2 = 12; + RevokeRoleV1 revoke_role_v1 = 13; + RevokeRoleV2 revoke_role_v2 = 14; + UpdatePrivilegeV1 update_privilege_v1 = 22; + AlterDefaultPrivilegeV1 alter_default_privilege_v1 = 23; + UpdateOwnerV1 update_owner_v1 = 24; + IdFullNameV1 id_full_name_v1 = 15; + RenameClusterV1 rename_cluster_v1 = 20; + RenameClusterReplicaV1 rename_cluster_replica_v1 = 21; + RenameItemV1 rename_item_v1 = 16; + IdNameV1 id_name_v1 = 17; + SchemaV1 schema_v1 = 18; + SchemaV2 schema_v2 = 19; + RenameSchemaV1 rename_schema_v1 = 27; + UpdateItemV1 update_item_v1 = 26; + CreateSourceSinkV3 create_source_sink_v3 = 29; + AlterRetainHistoryV1 alter_retain_history_v1 = 30; + ToNewIdV1 to_new_id_v1 = 31; + FromPreviousIdV1 from_previous_id_v1 = 32; + SetV1 set_v1 = 35; + Empty reset_all_v1 = 36; + RotateKeysV1 rotate_keys_v1 = 37; + CreateSourceSinkV4 create_source_sink_v4 = 38; + CreateIndexV1 create_index_v1 = 39; + CreateMaterializedViewV1 create_materialized_view_v1 = 40; + } +} + +// Wrapper of key-values used by the persist implementation to serialize the catalog. +message StateUpdateKind { + message AuditLog { + AuditLogKey key = 1; + } + + message Cluster { + ClusterKey key = 1; + ClusterValue value = 2; + } + + message ClusterReplica { + ClusterReplicaKey key = 1; + ClusterReplicaValue value = 2; + } + + message Comment { + CommentKey key = 1; + CommentValue value = 2; + } + + message Config { + ConfigKey key = 1; + ConfigValue value = 2; + } + + message Database { + DatabaseKey key = 1; + DatabaseValue value = 2; + } + + message DefaultPrivileges { + DefaultPrivilegesKey key = 1; + DefaultPrivilegesValue value = 2; + } + + message FenceToken { + uint64 deploy_generation = 1; + int64 epoch = 2; + } + + message Epoch { + int64 epoch = 1; + } + + message IdAlloc { + IdAllocKey key = 1; + IdAllocValue value = 2; + } + + message ClusterIntrospectionSourceIndex { + ClusterIntrospectionSourceIndexKey key = 1; + ClusterIntrospectionSourceIndexValue value = 2; + } + + message Item { + ItemKey key = 1; + ItemValue value = 2; + } + + message Role { + RoleKey key = 1; + RoleValue value = 2; + } + + message Schema { + SchemaKey key = 1; + SchemaValue value = 2; + } + + message Setting { + SettingKey key = 1; + SettingValue value = 2; + } + + message ServerConfiguration { + ServerConfigurationKey key = 1; + ServerConfigurationValue value = 2; + } + + message SourceReferences { + SourceReferencesKey key = 1; + SourceReferencesValue value = 2; + } + + message GidMapping { + GidMappingKey key = 1; + GidMappingValue value = 2; + } + + message SystemPrivileges { + SystemPrivilegesKey key = 1; + SystemPrivilegesValue value = 2; + } + + message StorageCollectionMetadata { + StorageCollectionMetadataKey key = 1; + StorageCollectionMetadataValue value = 2; + } + + message UnfinalizedShard { + UnfinalizedShardKey key = 1; + } + + message TxnWalShard { + TxnWalShardValue value = 1; + } + + reserved 15; + reserved "storage_usage"; + reserved 19; + reserved "timestamp"; + reserved 22; + reserved "persist_txn_shard"; + + oneof kind { + AuditLog audit_log = 1; + Cluster cluster = 2; + ClusterReplica cluster_replica = 3; + Comment comment = 4; + Config config = 5; + Database database = 6; + DefaultPrivileges default_privileges = 7; + Epoch epoch = 8; + IdAlloc id_alloc = 9; + ClusterIntrospectionSourceIndex cluster_introspection_source_index = 10; + Item item = 11; + Role role = 12; + Schema schema = 13; + Setting setting = 14; + ServerConfiguration server_configuration = 16; + GidMapping gid_mapping = 17; + SystemPrivileges system_privileges = 18; + StorageCollectionMetadata storage_collection_metadata = 20; + UnfinalizedShard unfinalized_shard = 21; + TxnWalShard txn_wal_shard = 23; + SourceReferences source_references = 24; + FenceToken fence_token = 25; + } +} diff --git a/src/catalog/src/durable/initialize.rs b/src/catalog/src/durable/initialize.rs index 9fe5a58fb23b4..3f84aa1c6d46a 100644 --- a/src/catalog/src/durable/initialize.rs +++ b/src/catalog/src/durable/initialize.rs @@ -362,6 +362,7 @@ pub(crate) async fn initialize( ObjectType::Database => mz_audit_log::ObjectType::Database, ObjectType::Schema => mz_audit_log::ObjectType::Schema, ObjectType::Func => mz_audit_log::ObjectType::Func, + ObjectType::ContinualTask => mz_audit_log::ObjectType::ContinualTask, }; audit_events.push(( mz_audit_log::EventType::Grant, diff --git a/src/catalog/src/durable/objects/serialization.rs b/src/catalog/src/durable/objects/serialization.rs index 49b4ae3c3e759..991810857e6e5 100644 --- a/src/catalog/src/durable/objects/serialization.rs +++ b/src/catalog/src/durable/objects/serialization.rs @@ -1111,6 +1111,7 @@ impl RustType for CatalogItemType { CatalogItemType::Func => proto::CatalogItemType::Func, CatalogItemType::Secret => proto::CatalogItemType::Secret, CatalogItemType::Connection => proto::CatalogItemType::Connection, + CatalogItemType::ContinualTask => proto::CatalogItemType::ContinualTask, } } @@ -1126,6 +1127,7 @@ impl RustType for CatalogItemType { proto::CatalogItemType::Func => CatalogItemType::Func, proto::CatalogItemType::Secret => CatalogItemType::Secret, proto::CatalogItemType::Connection => CatalogItemType::Connection, + proto::CatalogItemType::ContinualTask => CatalogItemType::ContinualTask, proto::CatalogItemType::Unknown => { return Err(TryFromProtoError::unknown_enum_variant("CatalogItemType")); } @@ -1152,6 +1154,7 @@ impl RustType for ObjectType { ObjectType::Database => proto::ObjectType::Database, ObjectType::Schema => proto::ObjectType::Schema, ObjectType::Func => proto::ObjectType::Func, + ObjectType::ContinualTask => proto::ObjectType::ContinualTask, } } @@ -1172,6 +1175,7 @@ impl RustType for ObjectType { proto::ObjectType::Database => Ok(ObjectType::Database), proto::ObjectType::Schema => Ok(ObjectType::Schema), proto::ObjectType::Func => Ok(ObjectType::Func), + proto::ObjectType::ContinualTask => Ok(ObjectType::ContinualTask), proto::ObjectType::Unknown => Err(TryFromProtoError::unknown_enum_variant( "ObjectType::Unknown", )), @@ -1345,6 +1349,9 @@ impl RustType for CommentObjectId { CommentObjectId::Database(database_id) => { proto::comment_key::Object::Database(database_id.into_proto()) } + CommentObjectId::ContinualTask(global_id) => { + proto::comment_key::Object::ContinualTask(global_id.into_proto()) + } CommentObjectId::Schema((database, schema)) => { proto::comment_key::Object::Schema(proto::ResolvedSchema { database: Some(database.into_proto()), @@ -1396,6 +1403,9 @@ impl RustType for CommentObjectId { proto::comment_key::Object::Secret(global_id) => { CommentObjectId::Secret(global_id.into_rust()?) } + proto::comment_key::Object::ContinualTask(global_id) => { + CommentObjectId::ContinualTask(global_id.into_rust()?) + } proto::comment_key::Object::Role(role_id) => { CommentObjectId::Role(role_id.into_rust()?) } @@ -1537,6 +1547,9 @@ impl RustType for mz_audit_log::ObjectTyp mz_audit_log::ObjectType::Connection => { proto::audit_log_event_v1::ObjectType::Connection } + mz_audit_log::ObjectType::ContinualTask => { + proto::audit_log_event_v1::ObjectType::ContinualTask + } mz_audit_log::ObjectType::Database => proto::audit_log_event_v1::ObjectType::Database, mz_audit_log::ObjectType::Func => proto::audit_log_event_v1::ObjectType::Func, mz_audit_log::ObjectType::Index => proto::audit_log_event_v1::ObjectType::Index, @@ -1564,6 +1577,9 @@ impl RustType for mz_audit_log::ObjectTyp proto::audit_log_event_v1::ObjectType::Connection => { Ok(mz_audit_log::ObjectType::Connection) } + proto::audit_log_event_v1::ObjectType::ContinualTask => { + Ok(mz_audit_log::ObjectType::ContinualTask) + } proto::audit_log_event_v1::ObjectType::Database => { Ok(mz_audit_log::ObjectType::Database) } diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index e58ce68b1504c..158acf590f452 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -515,6 +515,7 @@ pub enum CatalogItem { Func(Func), Secret(Secret), Connection(Connection), + ContinualTask(ContinualTask), } impl From for durable::Item { @@ -954,6 +955,17 @@ pub struct Connection { pub resolved_ids: ResolvedIds, } +#[derive(Debug, Clone, Serialize)] +pub struct ContinualTask { + pub create_sql: String, + pub raw_expr: Arc, + pub desc: RelationDesc, + pub resolved_ids: ResolvedIds, + pub cluster_id: ClusterId, + /// See the comment on [MaterializedView::initial_as_of]. + pub initial_as_of: Option>, +} + impl CatalogItem { /// Returns a string indicating the type of this catalog entry. pub fn typ(&self) -> mz_sql::catalog::CatalogItemType { @@ -969,15 +981,17 @@ impl CatalogItem { CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func, CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret, CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection, + CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask, } } /// Whether this item represents a storage collection. pub fn is_storage_collection(&self) -> bool { match self { - CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::MaterializedView(_) => { - true - } + CatalogItem::Table(_) + | CatalogItem::Source(_) + | CatalogItem::MaterializedView(_) + | CatalogItem::ContinualTask(_) => true, CatalogItem::Log(_) | CatalogItem::Sink(_) | CatalogItem::View(_) @@ -1004,6 +1018,7 @@ impl CatalogItem { CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)), CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)), CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed), + CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)), CatalogItem::Func(_) | CatalogItem::Index(_) | CatalogItem::Sink(_) @@ -1073,6 +1088,7 @@ impl CatalogItem { CatalogItem::MaterializedView(mview) => &mview.resolved_ids, CatalogItem::Secret(_) => &*EMPTY, CatalogItem::Connection(connection) => &connection.resolved_ids, + CatalogItem::ContinualTask(ct) => &ct.resolved_ids, } } @@ -1095,6 +1111,7 @@ impl CatalogItem { CatalogItem::Type(_) => {} CatalogItem::View(view) => uses.extend(view.raw_expr.depends_on()), CatalogItem::MaterializedView(mview) => uses.extend(mview.raw_expr.depends_on()), + CatalogItem::ContinualTask(ct) => uses.extend(ct.raw_expr.depends_on()), CatalogItem::Secret(_) => {} CatalogItem::Connection(_) => {} } @@ -1115,7 +1132,8 @@ impl CatalogItem { | CatalogItem::Secret(_) | CatalogItem::Type(_) | CatalogItem::Func(_) - | CatalogItem::Connection(_) => None, + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_) => None, } } @@ -1195,6 +1213,11 @@ impl CatalogItem { Ok(CatalogItem::Type(i)) } CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())), + CatalogItem::ContinualTask(i) => { + let mut i = i.clone(); + i.create_sql = do_rewrite(i.create_sql)?; + Ok(CatalogItem::ContinualTask(i)) + } } } @@ -1265,6 +1288,11 @@ impl CatalogItem { i.create_sql = do_rewrite(i.create_sql)?; Ok(CatalogItem::Connection(i)) } + CatalogItem::ContinualTask(i) => { + let mut i = i.clone(); + i.create_sql = do_rewrite(i.create_sql)?; + Ok(CatalogItem::ContinualTask(i)) + } } } @@ -1350,7 +1378,8 @@ impl CatalogItem { | CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) | CatalogItem::Index(Index { create_sql, .. }) | CatalogItem::Secret(Secret { create_sql, .. }) - | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql), + | CatalogItem::Connection(Connection { create_sql, .. }) + | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql), CatalogItem::Func(_) | CatalogItem::Log(_) => None, }; let Some(create_sql) = create_sql else { @@ -1385,7 +1414,8 @@ impl CatalogItem { | CatalogItem::Type(_) | CatalogItem::Func(_) | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => None, + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_) => None, } } @@ -1403,6 +1433,7 @@ impl CatalogItem { DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None, }, CatalogItem::Sink(sink) => Some(sink.cluster_id), + CatalogItem::ContinualTask(ct) => Some(ct.cluster_id), CatalogItem::Table(_) | CatalogItem::Log(_) | CatalogItem::View(_) @@ -1427,7 +1458,8 @@ impl CatalogItem { | CatalogItem::Type(_) | CatalogItem::Func(_) | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => None, + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_) => None, } } @@ -1448,7 +1480,8 @@ impl CatalogItem { | CatalogItem::Type(_) | CatalogItem::Func(_) | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => return None, + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_) => return None, }; Some(cw) } @@ -1465,7 +1498,8 @@ impl CatalogItem { CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::Index(_) - | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(), + | CatalogItem::MaterializedView(_) + | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(), CatalogItem::Log(_) | CatalogItem::View(_) | CatalogItem::Sink(_) @@ -1492,7 +1526,8 @@ impl CatalogItem { | CatalogItem::Type(_) | CatalogItem::Func(_) | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => false, + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_) => false, } } @@ -1527,6 +1562,7 @@ impl CatalogItem { CatalogItem::Secret(secret) => secret.create_sql.clone(), CatalogItem::Connection(connection) => connection.create_sql.clone(), CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"), + CatalogItem::ContinualTask(ct) => ct.create_sql.clone(), } } @@ -1553,6 +1589,7 @@ impl CatalogItem { CatalogItem::Secret(secret) => secret.create_sql, CatalogItem::Connection(connection) => connection.create_sql, CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"), + CatalogItem::ContinualTask(ct) => ct.create_sql, } } } @@ -1727,7 +1764,8 @@ impl CatalogEntry { | CatalogItem::Type(_) | CatalogItem::Func(_) | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => None, + | CatalogItem::Connection(_) + | CatalogItem::ContinualTask(_) => None, } } @@ -2421,6 +2459,7 @@ impl mz_sql::catalog::CatalogItem for CatalogEntry { CatalogItem::Connection(Connection { create_sql, .. }) => create_sql, CatalogItem::Func(_) => "", CatalogItem::Log(_) => "", + CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql, } } diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 3c914f6d5dca5..cb27c85a3252e 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -3800,6 +3800,7 @@ pub enum ObjectType { Schema, Func, Subsource, + ContinualTask, } impl ObjectType { @@ -3815,7 +3816,8 @@ impl ObjectType { | ObjectType::Secret | ObjectType::Connection | ObjectType::Func - | ObjectType::Subsource => true, + | ObjectType::Subsource + | ObjectType::ContinualTask => true, ObjectType::Database | ObjectType::Schema | ObjectType::Cluster @@ -3844,6 +3846,7 @@ impl AstDisplay for ObjectType { ObjectType::Schema => "SCHEMA", ObjectType::Func => "FUNCTION", ObjectType::Subsource => "SUBSOURCE", + ObjectType::ContinualTask => "CONTINUAL TASK", }) } } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 979f5f56d2a0c..e04dab0a3edf6 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -4359,7 +4359,8 @@ impl<'a> Parser<'a> { | ObjectType::Index | ObjectType::Type | ObjectType::Secret - | ObjectType::Connection => { + | ObjectType::Connection + | ObjectType::ContinualTask => { let names = self.parse_comma_separated(|parser| { Ok(UnresolvedObjectName::Item(parser.parse_item_name()?)) })?; @@ -4874,9 +4875,10 @@ impl<'a> Parser<'a> { ObjectType::Index => self.parse_alter_index(), ObjectType::Secret => self.parse_alter_secret(), ObjectType::Connection => self.parse_alter_connection(), - ObjectType::View | ObjectType::MaterializedView | ObjectType::Table => { - self.parse_alter_views(object_type) - } + ObjectType::View + | ObjectType::MaterializedView + | ObjectType::Table + | ObjectType::ContinualTask => self.parse_alter_views(object_type), ObjectType::Type => { let if_exists = self .parse_if_exists() @@ -6463,7 +6465,8 @@ impl<'a> Parser<'a> { | ObjectType::Type | ObjectType::Secret | ObjectType::Connection - | ObjectType::Func => UnresolvedObjectName::Item(self.parse_item_name()?), + | ObjectType::Func + | ObjectType::ContinualTask => UnresolvedObjectName::Item(self.parse_item_name()?), ObjectType::Role => UnresolvedObjectName::Role(self.parse_identifier()?), ObjectType::Cluster => UnresolvedObjectName::Cluster(self.parse_identifier()?), ObjectType::ClusterReplica => { @@ -7236,6 +7239,11 @@ impl<'a> Parser<'a> { let in_cluster = self.parse_optional_in_cluster()?; ShowObjectType::MaterializedView { in_cluster } } + ObjectType::ContinualTask => { + let in_cluster = self.parse_optional_in_cluster()?; + // TODO(ct): ShowObjectType::ContinualTask + ShowObjectType::MaterializedView { in_cluster } + } ObjectType::Index => { let on_object = if self.parse_one_of_keywords(&[ON]).is_some() { Some(self.parse_raw_name()?) @@ -8564,7 +8572,10 @@ impl<'a> Parser<'a> { object_type: ObjectType, ) -> Result { match object_type { - ObjectType::View | ObjectType::MaterializedView | ObjectType::Source => { + ObjectType::View + | ObjectType::MaterializedView + | ObjectType::Source + | ObjectType::ContinualTask => { parser_err!( self, self.peek_prev_pos(), diff --git a/src/sql/src/catalog.rs b/src/sql/src/catalog.rs index 556d88178a688..dcc0d3f271870 100644 --- a/src/sql/src/catalog.rs +++ b/src/sql/src/catalog.rs @@ -674,6 +674,8 @@ pub enum CatalogItemType { Secret, /// A connection. Connection, + /// A continual task. + ContinualTask, } impl CatalogItemType { @@ -707,6 +709,7 @@ impl CatalogItemType { CatalogItemType::Func => false, CatalogItemType::Secret => false, CatalogItemType::Connection => false, + CatalogItemType::ContinualTask => true, } } } @@ -724,6 +727,7 @@ impl fmt::Display for CatalogItemType { CatalogItemType::Func => f.write_str("func"), CatalogItemType::Secret => f.write_str("secret"), CatalogItemType::Connection => f.write_str("connection"), + CatalogItemType::ContinualTask => f.write_str("continual task"), } } } @@ -741,6 +745,7 @@ impl From for ObjectType { CatalogItemType::Func => ObjectType::Func, CatalogItemType::Secret => ObjectType::Secret, CatalogItemType::Connection => ObjectType::Connection, + CatalogItemType::ContinualTask => ObjectType::ContinualTask, } } } @@ -758,6 +763,7 @@ impl From for mz_audit_log::ObjectType { CatalogItemType::Func => mz_audit_log::ObjectType::Func, CatalogItemType::Secret => mz_audit_log::ObjectType::Secret, CatalogItemType::Connection => mz_audit_log::ObjectType::Connection, + CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask, } } } @@ -1320,6 +1326,7 @@ pub enum ObjectType { Database, Schema, Func, + ContinualTask, } impl ObjectType { @@ -1329,7 +1336,8 @@ impl ObjectType { ObjectType::Table | ObjectType::View | ObjectType::MaterializedView - | ObjectType::Source => true, + | ObjectType::Source + | ObjectType::ContinualTask => true, ObjectType::Sink | ObjectType::Index | ObjectType::Type @@ -1364,6 +1372,7 @@ impl From for ObjectType { mz_sql_parser::ast::ObjectType::Database => ObjectType::Database, mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema, mz_sql_parser::ast::ObjectType::Func => ObjectType::Func, + mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask, } } } @@ -1386,6 +1395,7 @@ impl From for ObjectType { CommentObjectId::Schema(_) => ObjectType::Schema, CommentObjectId::Cluster(_) => ObjectType::Cluster, CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica, + CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask, } } } @@ -1408,6 +1418,7 @@ impl Display for ObjectType { ObjectType::Database => "DATABASE", ObjectType::Schema => "SCHEMA", ObjectType::Func => "FUNCTION", + ObjectType::ContinualTask => "CONTINUAL TASK", }) } } diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 576cf40aeb6ea..0d07e4543f3e6 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -1125,7 +1125,8 @@ impl From for ObjectId { | CommentObjectId::Func(global_id) | CommentObjectId::Connection(global_id) | CommentObjectId::Type(global_id) - | CommentObjectId::Secret(global_id) => ObjectId::Item(global_id), + | CommentObjectId::Secret(global_id) + | CommentObjectId::ContinualTask(global_id) => ObjectId::Item(global_id), CommentObjectId::Role(id) => ObjectId::Role(id), CommentObjectId::Database(id) => ObjectId::Database(id), CommentObjectId::Schema(id) => ObjectId::Schema(id), @@ -1178,6 +1179,7 @@ pub enum CommentObjectId { Connection(GlobalId), Type(GlobalId), Secret(GlobalId), + ContinualTask(GlobalId), Role(RoleId), Database(DatabaseId), Schema((ResolvedDatabaseSpecifier, SchemaSpecifier)), diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 897069e61a332..fce723d743455 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -351,6 +351,7 @@ impl Plan { ObjectType::Database => "drop database", ObjectType::Schema => "drop schema", ObjectType::Func => "drop function", + ObjectType::ContinualTask => "drop continual task", }, Plan::DropOwned(_) => "drop owned", Plan::EmptyQuery => "do nothing", @@ -390,6 +391,7 @@ impl Plan { ObjectType::Database => "alter database", ObjectType::Schema => "alter schema", ObjectType::Func => "alter function", + ObjectType::ContinualTask => "alter continual task", }, Plan::AlterCluster(_) => "alter cluster", Plan::AlterClusterRename(_) => "alter cluster rename", @@ -424,6 +426,7 @@ impl Plan { ObjectType::Database => "alter database owner", ObjectType::Schema => "alter schema owner", ObjectType::Func => "alter function owner", + ObjectType::ContinualTask => "alter continual task owner", }, Plan::AlterTableAddColumn(_) => "alter table add column", Plan::Declare(_) => "declare", diff --git a/src/sql/src/plan/statement/acl.rs b/src/sql/src/plan/statement/acl.rs index 05ed13e906aba..f7030c8dee9ff 100644 --- a/src/sql/src/plan/statement/acl.rs +++ b/src/sql/src/plan/statement/acl.rs @@ -601,7 +601,10 @@ pub fn plan_alter_default_privileges( ) -> Result { let object_type: ObjectType = (*grant_or_revoke.object_type()).into(); match object_type { - ObjectType::View | ObjectType::MaterializedView | ObjectType::Source => sql_bail!( + ObjectType::View + | ObjectType::MaterializedView + | ObjectType::Source + | ObjectType::ContinualTask => sql_bail!( "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead" ), ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => { diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 2f92a306fb6c8..6b555baffc97a 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -4981,7 +4981,8 @@ fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> b | CatalogItemType::Sink | CatalogItemType::Type | CatalogItemType::Secret - | CatalogItemType::Connection => true, + | CatalogItemType::Connection + | CatalogItemType::ContinualTask => true, CatalogItemType::Index => false, }, } diff --git a/src/sql/src/plan/statement/show.rs b/src/sql/src/plan/statement/show.rs index 179b6299c478c..95c19baeb2b0d 100644 --- a/src/sql/src/plan/statement/show.rs +++ b/src/sql/src/plan/statement/show.rs @@ -614,7 +614,8 @@ pub fn show_columns<'a>( CatalogItemType::Source | CatalogItemType::Table | CatalogItemType::View - | CatalogItemType::MaterializedView => (), + | CatalogItemType::MaterializedView + | CatalogItemType::ContinualTask => (), ty @ CatalogItemType::Connection | ty @ CatalogItemType::Index | ty @ CatalogItemType::Func diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 4c008aa40549e..32758ba41d678 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -1582,7 +1582,9 @@ fn generate_read_privileges_inner( privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)) } match item.item_type() { - CatalogItemType::View | CatalogItemType::MaterializedView => { + CatalogItemType::View + | CatalogItemType::MaterializedView + | CatalogItemType::ContinualTask => { privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id)); views.push((item.references().0.clone().into_iter(), item.owner_id())); } @@ -1711,6 +1713,7 @@ pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode { SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE, SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE, SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE, + SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT, SystemObjectType::System => ALL_SYSTEM_PRIVILEGES, } } @@ -1728,7 +1731,8 @@ const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode { ObjectType::Table | ObjectType::View | ObjectType::MaterializedView - | ObjectType::Source => AclMode::SELECT, + | ObjectType::Source + | ObjectType::ContinualTask => AclMode::SELECT, ObjectType::Type | ObjectType::Schema => AclMode::USAGE, ObjectType::Sink | ObjectType::Index