From 5aa1c98ec14176c60b747989a16e20108e1e3b4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20E=2E=20G=C3=A9vay?= Date: Mon, 11 Nov 2024 11:08:05 +0100 Subject: [PATCH] Merge pull request #30408 from ggevay/refresh_compaction Fix Persist compaction for `REFRESH` MVs --- src/adapter/src/catalog/transact.rs | 10 +- src/adapter/src/coord/cluster_scheduling.rs | 164 ++- src/audit-log/src/lib.rs | 56 + src/buf.yaml | 2 + src/catalog/build.rs | 1 + src/catalog/protos/hashes.json | 6 +- src/catalog/protos/objects.proto | 38 + src/catalog/protos/objects_v70.proto | 1061 +++++++++++++++++ .../src/durable/objects/serialization.rs | 160 ++- src/catalog/src/durable/upgrade.rs | 6 +- .../durable/upgrade/snapshots/objects_v70.txt | 0 src/catalog/src/durable/upgrade/v69_to_v70.rs | 19 + src/repr/src/refresh_schedule.rs | 7 + src/sql/src/session/vars.rs | 5 + src/sql/src/session/vars/definitions.rs | 13 + test/sqllogictest/materialized_views.slt | 110 +- .../materialized-view-refresh-options.td | 1 + .../materialized-view-refresh-options.td | 1 + 18 files changed, 1586 insertions(+), 74 deletions(-) create mode 100644 src/catalog/protos/objects_v70.proto create mode 100644 src/catalog/src/durable/upgrade/snapshots/objects_v70.txt create mode 100644 src/catalog/src/durable/upgrade/v69_to_v70.rs diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index debd7312506b0..2c6479331b7d9 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -19,7 +19,7 @@ use mz_adapter_types::dyncfgs::{ }; use mz_audit_log::{ CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1, - ObjectType, SchedulingDecisionsWithReasonsV1, VersionedEvent, VersionedStorageUsage, + ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage, }; use mz_catalog::builtin::BuiltinLog; use mz_catalog::durable::{NetworkPolicy, Transaction}; @@ -294,7 +294,7 @@ impl ReplicaCreateDropReason { self, ) -> ( CreateOrDropClusterReplicaReasonV1, - Option, + Option, ) { let (reason, scheduling_policies) = match self { ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None), @@ -952,8 +952,8 @@ impl Catalog { }) = &config.location { let (reason, scheduling_policies) = reason.into_audit_log(); - let details = EventDetails::CreateClusterReplicaV2( - mz_audit_log::CreateClusterReplicaV2 { + let details = EventDetails::CreateClusterReplicaV3( + mz_audit_log::CreateClusterReplicaV3 { cluster_id: cluster_id.to_string(), cluster_name: cluster.name.clone(), replica_id: Some(id.to_string()), @@ -1443,7 +1443,7 @@ impl Catalog { let (reason, scheduling_policies) = reason.into_audit_log(); let details = - EventDetails::DropClusterReplicaV2(mz_audit_log::DropClusterReplicaV2 { + EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 { cluster_id: cluster_id.to_string(), cluster_name: cluster.name.clone(), replica_id: Some(replica_id.to_string()), diff --git a/src/adapter/src/coord/cluster_scheduling.rs b/src/adapter/src/coord/cluster_scheduling.rs index a32a8c6b42917..11db8d2bfd167 100644 --- a/src/adapter/src/coord/cluster_scheduling.rs +++ b/src/adapter/src/coord/cluster_scheduling.rs @@ -9,13 +9,13 @@ use crate::coord::{Coordinator, Message}; use itertools::Itertools; -use mz_audit_log::SchedulingDecisionsWithReasonsV1; +use mz_audit_log::SchedulingDecisionsWithReasonsV2; use mz_catalog::memory::objects::{CatalogItem, ClusterVariant, ClusterVariantManaged}; use 
mz_controller_types::ClusterId; use mz_ore::collections::CollectionExt; -use mz_ore::soft_panic_or_log; +use mz_ore::{soft_assert_or_log, soft_panic_or_log}; use mz_repr::adt::interval::Interval; -use mz_repr::GlobalId; +use mz_repr::{GlobalId, TimestampManipulation}; use mz_sql::catalog::CatalogCluster; use mz_sql::plan::{AlterClusterPlanStrategy, ClusterSchedule}; use std::time::{Duration, Instant}; @@ -48,26 +48,36 @@ pub struct RefreshDecision { /// Whether the ON REFRESH policy wants a certain cluster to be On. cluster_on: bool, /// Objects that currently need a refresh on the cluster (taking into account the rehydration - /// time estimate). + /// time estimate), and therefore should keep the cluster On. objects_needing_refresh: Vec, + /// Objects for which we estimate that they currently need Persist compaction, and therefore + /// should keep the cluster On. + objects_needing_compaction: Vec, /// The HYDRATION TIME ESTIMATE setting of the cluster. hydration_time_estimate: Duration, } impl SchedulingDecision { - pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV1 + pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV2 where I: IntoIterator, { - SchedulingDecisionsWithReasonsV1 { + SchedulingDecisionsWithReasonsV2 { on_refresh: reasons .into_iter() .filter_map(|r| match r { SchedulingDecision::Refresh(RefreshDecision { cluster_on, - objects_needing_refresh: mvs_needing_refresh, + objects_needing_refresh, + objects_needing_compaction, hydration_time_estimate, }) => { + soft_assert_or_log!( + !cluster_on + || !objects_needing_refresh.is_empty() + || !objects_needing_compaction.is_empty(), + "`cluster_on = true` should have an explanation" + ); let mut hydration_time_estimate_str = String::new(); mz_repr::strconv::format_interval( &mut hydration_time_estimate_str, @@ -75,9 +85,13 @@ impl SchedulingDecision { "planning ensured that this is convertible back to Interval", ), ); - Some(mz_audit_log::RefreshDecisionWithReasonV1 { + Some(mz_audit_log::RefreshDecisionWithReasonV2 { decision: (*cluster_on).into(), - objects_needing_refresh: mvs_needing_refresh + objects_needing_refresh: objects_needing_refresh + .iter() + .map(|id| id.to_string()) + .collect(), + objects_needing_compaction: objects_needing_compaction .iter() .map(|id| id.to_string()) .collect(), @@ -105,8 +119,13 @@ impl Coordinator { fn check_refresh_policy(&self) { let start_time = Instant::now(); - // Collect the smallest REFRESH MV write frontiers per cluster. 
- let mut refresh_mv_write_frontiers = Vec::new(); + // Collect information about REFRESH MVs: + // - cluster + // - hydration_time_estimate of the cluster + // - MV's id + // - MV's write frontier + // - MV's refresh schedule + let mut refresh_mv_infos = Vec::new(); for cluster in self.catalog().clusters() { if let ClusterVariant::Managed(ref config) = cluster.config.variant { match config.schedule { @@ -123,23 +142,21 @@ impl Coordinator { if let CatalogItem::MaterializedView(mv) = self.catalog().get_entry(id).item() { - if mv.refresh_schedule.is_some() { + mv.refresh_schedule.clone().map(|refresh_schedule| { let (_since, write_frontier) = self .controller .storage - .collection_frontiers(*id) + .collection_frontiers(*id) .expect("the storage controller should know about MVs that exist in the catalog"); - Some((*id, write_frontier)) - } else { - None - } + (*id, write_frontier, refresh_schedule) + }) } else { None } }) .collect_vec(); - debug!(%cluster.id, ?refresh_mv_write_frontiers, "check_refresh_policy"); - refresh_mv_write_frontiers.push((cluster.id, hydration_time_estimate, mvs)); + debug!(%cluster.id, ?refresh_mv_infos, "check_refresh_policy"); + refresh_mv_infos.push((cluster.id, hydration_time_estimate, mvs)); } } } @@ -152,42 +169,87 @@ impl Coordinator { let internal_cmd_tx = self.internal_cmd_tx.clone(); let check_scheduling_policies_seconds_cloned = self.metrics.check_scheduling_policies_seconds.clone(); + let compaction_estimate = self + .catalog() + .system_config() + .cluster_refresh_mv_compaction_estimate() + .try_into() + .expect("should be configured to a reasonable value"); mz_ore::task::spawn(|| "refresh policy get ts and make decisions", async move { let task_start_time = Instant::now(); let local_read_ts = ts_oracle.read_ts().await; - debug!(%local_read_ts, ?refresh_mv_write_frontiers, "check_refresh_policy background task"); - let decisions = refresh_mv_write_frontiers + debug!(%local_read_ts, ?refresh_mv_infos, "check_refresh_policy background task"); + let decisions = refresh_mv_infos .into_iter() - .map( - |(cluster_id, hydration_time_estimate, refresh_mv_write_frontiers)| { - // We are just checking that - // write_frontier < local_read_ts + hydration_time_estimate - let hydration_estimate = &hydration_time_estimate - .try_into() - .expect("checked during planning"); - let local_read_ts_adjusted = - local_read_ts.step_forward_by(hydration_estimate); - let mvs_needing_refresh = refresh_mv_write_frontiers - .into_iter() - .filter_map(|(id, frontier)| { - if frontier.less_than(&local_read_ts_adjusted) { - Some(id) - } else { - None - } - }) - .collect_vec(); - let cluster_on = !mvs_needing_refresh.is_empty(); - ( - cluster_id, - SchedulingDecision::Refresh(RefreshDecision { - cluster_on, - objects_needing_refresh: mvs_needing_refresh, - hydration_time_estimate, - }), - ) - }, - ) + .map(|(cluster_id, hydration_time_estimate, refresh_mv_info)| { + // 1. check that + // write_frontier < local_read_ts + hydration_time_estimate + let hydration_estimate = &hydration_time_estimate + .try_into() + .expect("checked during planning"); + let local_read_ts_adjusted = local_read_ts.step_forward_by(hydration_estimate); + let mvs_needing_refresh = refresh_mv_info + .iter() + .cloned() + .filter_map(|(id, frontier, _refresh_schedule)| { + if frontier.less_than(&local_read_ts_adjusted) { + Some(id) + } else { + None + } + }) + .collect_vec(); + + // 2. 
check that + // prev_refresh + compaction_estimate > local_read_ts + let mvs_needing_compaction = refresh_mv_info + .into_iter() + .filter_map(|(id, frontier, refresh_schedule)| { + let frontier = frontier.as_option(); + // `prev_refresh` will be None in two cases: + // 1. When there is no previous refresh, because we haven't yet had + // the first refresh. In this case, there is no need to schedule + // time now for compaction. + // 2. In the niche case where a `REFRESH EVERY` MV's write frontier + // is empty. In this case, it's not impossible that there would be a + // need for compaction. But I can't see any easy way to correctly + // handle this case, because we don't have any info handy about when + // the last refresh happened in wall clock time, because the + // frontiers have no relation to wall clock time. So, we'll not + // schedule any compaction time. + // (Note that `REFRESH AT` MVs with empty frontiers, which is a more + // common case, are fine, because `last_refresh` will return + // Some(...) for them.) + let prev_refresh = match frontier { + Some(frontier) => frontier.round_down_minus_1(&refresh_schedule), + None => refresh_schedule.last_refresh(), + }; + prev_refresh + .map(|prev_refresh| { + if prev_refresh.step_forward_by(&compaction_estimate) + > local_read_ts + { + Some(id) + } else { + None + } + }) + .flatten() + }) + .collect_vec(); + + let cluster_on = + !mvs_needing_refresh.is_empty() || !mvs_needing_compaction.is_empty(); + ( + cluster_id, + SchedulingDecision::Refresh(RefreshDecision { + cluster_on, + objects_needing_refresh: mvs_needing_refresh, + objects_needing_compaction: mvs_needing_compaction, + hydration_time_estimate, + }), + ) + }) .collect(); if let Err(e) = internal_cmd_tx.send(Message::SchedulingDecisions(vec![( REFRESH_POLICY_NAME, diff --git a/src/audit-log/src/lib.rs b/src/audit-log/src/lib.rs index c9262fa3c150d..2013f4184ad68 100644 --- a/src/audit-log/src/lib.rs +++ b/src/audit-log/src/lib.rs @@ -157,9 +157,11 @@ pub enum EventDetails { #[serde(rename = "CreateComputeReplicaV1")] // historical name CreateClusterReplicaV1(CreateClusterReplicaV1), CreateClusterReplicaV2(CreateClusterReplicaV2), + CreateClusterReplicaV3(CreateClusterReplicaV3), #[serde(rename = "DropComputeReplicaV1")] // historical name DropClusterReplicaV1(DropClusterReplicaV1), DropClusterReplicaV2(DropClusterReplicaV2), + DropClusterReplicaV3(DropClusterReplicaV3), CreateSourceSinkV1(CreateSourceSinkV1), CreateSourceSinkV2(CreateSourceSinkV2), CreateSourceSinkV3(CreateSourceSinkV3), @@ -267,6 +269,17 @@ pub struct DropClusterReplicaV2 { pub scheduling_policies: Option, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] +pub struct DropClusterReplicaV3 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub reason: CreateOrDropClusterReplicaReasonV1, + #[serde(skip_serializing_if = "Option::is_none")] + pub scheduling_policies: Option, +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] pub struct CreateClusterReplicaV1 { pub cluster_id: String, @@ -296,6 +309,21 @@ pub struct CreateClusterReplicaV2 { pub scheduling_policies: Option, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] +pub struct CreateClusterReplicaV3 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub logical_size: String, + pub disk: bool, 
+ pub billed_as: Option, + pub internal: bool, + pub reason: CreateOrDropClusterReplicaReasonV1, + #[serde(skip_serializing_if = "Option::is_none")] + pub scheduling_policies: Option, +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] #[serde(rename_all = "kebab-case")] pub enum CreateOrDropClusterReplicaReasonV1 { @@ -313,6 +341,15 @@ pub struct SchedulingDecisionsWithReasonsV1 { pub on_refresh: RefreshDecisionWithReasonV1, } +/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing +/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there +/// can be settings of the policy as well as other information about the state of the system.) +#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] +pub struct SchedulingDecisionsWithReasonsV2 { + /// The reason for the refresh policy for wanting to turn a cluster On or Off. + pub on_refresh: RefreshDecisionWithReasonV2, +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] pub struct RefreshDecisionWithReasonV1 { pub decision: SchedulingDecisionV1, @@ -323,6 +360,19 @@ pub struct RefreshDecisionWithReasonV1 { pub hydration_time_estimate: String, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] +pub struct RefreshDecisionWithReasonV2 { + pub decision: SchedulingDecisionV1, + /// Objects that currently need a refresh on the cluster (taking into account the rehydration + /// time estimate), and therefore should keep the cluster On. + pub objects_needing_refresh: Vec, + /// Objects for which we estimate that they currently need Persist compaction, and therefore + /// should keep the cluster On. + pub objects_needing_compaction: Vec, + /// The HYDRATION TIME ESTIMATE setting of the cluster. 
+ pub hydration_time_estimate: String, +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)] #[serde(rename_all = "kebab-case")] pub enum SchedulingDecisionV1 { @@ -520,12 +570,18 @@ impl EventDetails { EventDetails::CreateClusterReplicaV2(v) => { serde_json::to_value(v).expect("must serialize") } + EventDetails::CreateClusterReplicaV3(v) => { + serde_json::to_value(v).expect("must serialize") + } EventDetails::DropClusterReplicaV1(v) => { serde_json::to_value(v).expect("must serialize") } EventDetails::DropClusterReplicaV2(v) => { serde_json::to_value(v).expect("must serialize") } + EventDetails::DropClusterReplicaV3(v) => { + serde_json::to_value(v).expect("must serialize") + } EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"), EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"), EventDetails::RenameClusterReplicaV1(v) => { diff --git a/src/buf.yaml b/src/buf.yaml index d96f783a193df..d9c627a68001c 100644 --- a/src/buf.yaml +++ b/src/buf.yaml @@ -24,6 +24,8 @@ breaking: # reason: does currently not require backward-compatibility - catalog/protos/objects_v69.proto # reason: does currently not require backward-compatibility + - catalog/protos/objects_v70.proto + # reason: does currently not require backward-compatibility - cluster-client/src/client.proto # reason: does currently not require backward-compatibility - compute-client/src/logging.proto diff --git a/src/catalog/build.rs b/src/catalog/build.rs index 8cab970339015..16cce303e0dec 100644 --- a/src/catalog/build.rs +++ b/src/catalog/build.rs @@ -172,6 +172,7 @@ fn main() -> anyhow::Result<()> { .enum_attribute("ClusterSchedule.value", ATTR) .enum_attribute("CreateOrDropClusterReplicaReasonV1.reason", ATTR) .enum_attribute("RefreshDecisionWithReasonV1.decision", ATTR) + .enum_attribute("RefreshDecisionWithReasonV2.decision", ATTR) // Serialize/deserialize the top-level enum in the persist-backed // catalog as "internally tagged"[^1] to set up persist pushdown // statistics for success. 
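For reference while reading the audit-log and proto changes above and below: `RefreshDecisionWithReasonV2` extends the V1 reason with an `objects_needing_compaction` list, and it is what ends up JSON-serialized under `scheduling_policies.on_refresh` in the new `CreateClusterReplicaV3`/`DropClusterReplicaV3` events. A minimal standalone sketch of that serialization, using simplified stand-in structs rather than the real `mz_audit_log` types and assuming the `serde` (with the derive feature) and `serde_json` crates:

use serde::Serialize;

// Simplified stand-ins for the types added in src/audit-log/src/lib.rs; field names and
// the kebab-case variant renaming mirror the real definitions.
#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
enum SchedulingDecisionV1 {
    On,
    Off,
}

#[derive(Serialize)]
struct RefreshDecisionWithReasonV2 {
    decision: SchedulingDecisionV1,
    objects_needing_refresh: Vec<String>,
    objects_needing_compaction: Vec<String>,
    hydration_time_estimate: String,
}

fn main() {
    let on_refresh = RefreshDecisionWithReasonV2 {
        decision: SchedulingDecisionV1::On,
        objects_needing_refresh: vec![],
        objects_needing_compaction: vec!["u123".to_string()],
        hydration_time_estimate: "00:00:00".to_string(),
    };
    // Prints a JSON object with the same fields that the updated materialized_views.slt
    // expectations assert on (the slt output shows jsonb key ordering, so key order may
    // differ from the declaration order used here):
    // {"decision":"on","objects_needing_refresh":[],"objects_needing_compaction":["u123"],"hydration_time_estimate":"00:00:00"}
    println!("{}", serde_json::to_string(&on_refresh).unwrap());
}

As in V1, `decision` serializes as the bare string "on"/"off" because `SchedulingDecisionV1` is a unit-variant enum with kebab-case renaming, which is why the slt expectations below show `"decision":"on"`.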
diff --git a/src/catalog/protos/hashes.json b/src/catalog/protos/hashes.json index 5a17d54058293..b1927137ccf18 100644 --- a/src/catalog/protos/hashes.json +++ b/src/catalog/protos/hashes.json @@ -1,7 +1,7 @@ [ { "name": "objects.proto", - "md5": "fcf9955ad176500fb86cd43d65dc77f6" + "md5": "2d781c72c4a56b13dfb1b4215f3614f0" }, { "name": "objects_v67.proto", @@ -14,5 +14,9 @@ { "name": "objects_v69.proto", "md5": "638e206754da134b10a0712d63bdd8dc" + }, + { + "name": "objects_v70.proto", + "md5": "a43660c9160c900f00d62d3031e2fad0" } ] diff --git a/src/catalog/protos/objects.proto b/src/catalog/protos/objects.proto index cb70286eae740..42a9cfcbdbf07 100644 --- a/src/catalog/protos/objects.proto +++ b/src/catalog/protos/objects.proto @@ -646,6 +646,19 @@ message AuditLogEventV1 { SchedulingDecisionsWithReasonsV1 scheduling_policies = 10; } + message CreateClusterReplicaV3 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + string logical_size = 5; + bool disk = 6; + optional string billed_as = 7; + bool internal = 8; + CreateOrDropClusterReplicaReasonV1 reason = 9; + SchedulingDecisionsWithReasonsV2 scheduling_policies = 10; + } + message DropClusterReplicaV1 { string cluster_id = 1; string cluster_name = 2; @@ -662,6 +675,15 @@ message AuditLogEventV1 { SchedulingDecisionsWithReasonsV1 scheduling_policies = 6; } + message DropClusterReplicaV3 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + CreateOrDropClusterReplicaReasonV1 reason = 5; + SchedulingDecisionsWithReasonsV2 scheduling_policies = 6; + } + message CreateOrDropClusterReplicaReasonV1 { oneof reason { Empty Manual = 1; @@ -674,6 +696,10 @@ message AuditLogEventV1 { RefreshDecisionWithReasonV1 on_refresh = 1; } + message SchedulingDecisionsWithReasonsV2 { + RefreshDecisionWithReasonV2 on_refresh = 1; + } + message RefreshDecisionWithReasonV1 { oneof decision { Empty On = 1; @@ -683,6 +709,16 @@ message AuditLogEventV1 { string rehydration_time_estimate = 4; } + message RefreshDecisionWithReasonV2 { + oneof decision { + Empty On = 1; + Empty Off = 2; + } + repeated string objects_needing_refresh = 3; + repeated string objects_needing_compaction = 5; + string rehydration_time_estimate = 4; + } + message CreateSourceSinkV1 { string id = 1; FullNameV1 name = 2; @@ -841,8 +877,10 @@ message AuditLogEventV1 { oneof details { CreateClusterReplicaV1 create_cluster_replica_v1 = 6; CreateClusterReplicaV2 create_cluster_replica_v2 = 33; + CreateClusterReplicaV3 create_cluster_replica_v3 = 41; DropClusterReplicaV1 drop_cluster_replica_v1 = 7; DropClusterReplicaV2 drop_cluster_replica_v2 = 34; + DropClusterReplicaV3 drop_cluster_replica_v3 = 42; CreateSourceSinkV1 create_source_sink_v1 = 8; CreateSourceSinkV2 create_source_sink_v2 = 9; AlterSourceSinkV1 alter_source_sink_v1 = 10; diff --git a/src/catalog/protos/objects_v70.proto b/src/catalog/protos/objects_v70.proto new file mode 100644 index 0000000000000..da5cdba889a6b --- /dev/null +++ b/src/catalog/protos/objects_v70.proto @@ -0,0 +1,1061 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// This protobuf file defines the types we store in the Stash. 
+// +// Before and after modifying this file, make sure you have a snapshot of the before version, +// e.g. a copy of this file named 'objects_v{CATALOG_VERSION}.proto', and a snapshot of the file +// after your modifications, e.g. 'objects_v{CATALOG_VERSION + 1}.proto'. Then you can write a +// migration using these two files, and no matter how the types change in the future, we'll always +// have these snapshots to facilitate the migration. + +// buf breaking: ignore (does currently not require backward-compatibility) + +syntax = "proto3"; + +package objects_v70; + +message ConfigKey { + string key = 1; +} + +message ConfigValue { + uint64 value = 1; +} + +message SettingKey { + string name = 1; +} + +message SettingValue { + string value = 1; +} + +message IdAllocKey { + string name = 1; +} + +message IdAllocValue { + uint64 next_id = 1; +} + +message GidMappingKey { + string schema_name = 1; + CatalogItemType object_type = 2; + string object_name = 3; +} + +message GidMappingValue { + // TODO(parkmycar): Ideally this is a SystemCatalogItemId but making this change panics 0dt + // upgrades if there were new builtin objects added since the older version of Materialize + // doesn't know how to read the new SystemCatalogItemId type. + uint64 id = 1; + string fingerprint = 2; + SystemGlobalId global_id = 3; +} + +message ClusterKey { + ClusterId id = 1; +} + +message ClusterValue { + reserved 2; + string name = 1; + RoleId owner_id = 3; + repeated MzAclItem privileges = 4; + ClusterConfig config = 5; +} + +message ClusterIntrospectionSourceIndexKey { + ClusterId cluster_id = 1; + string name = 2; +} + +message ClusterIntrospectionSourceIndexValue { + // TODO(parkmycar): Ideally this is a SystemCatalogItemId but making this change panics 0dt + // upgrades if there were new builtin objects added since the older version of Materialize + // doesn't know how to read the new SystemCatalogItemId type. 
+ uint64 index_id = 1; + uint32 oid = 2; + SystemGlobalId global_id = 3; +} + +message ClusterReplicaKey { + ReplicaId id = 1; +} + +message ClusterReplicaValue { + ClusterId cluster_id = 1; + string name = 2; + ReplicaConfig config = 3; + RoleId owner_id = 4; +} + +message DatabaseKey { + DatabaseId id = 1; +} + +message DatabaseValue { + string name = 1; + RoleId owner_id = 2; + repeated MzAclItem privileges = 3; + uint32 oid = 4; +} + +message SchemaKey { + SchemaId id = 1; +} + +message SchemaValue { + DatabaseId database_id = 1; + string name = 2; + RoleId owner_id = 3; + repeated MzAclItem privileges = 4; + uint32 oid = 5; +} + +message ItemKey { + CatalogItemId gid = 1; +} + +message ItemValue { + SchemaId schema_id = 1; + string name = 2; + CatalogItem definition = 3; + RoleId owner_id = 4; + repeated MzAclItem privileges = 5; + uint32 oid = 6; + GlobalId global_id = 7; + repeated ItemVersion extra_versions = 8; +} + +message ItemVersion { + GlobalId global_id = 1; + Version version = 2; +} + +message RoleKey { + RoleId id = 1; +} + +message RoleValue { + string name = 1; + RoleAttributes attributes = 2; + RoleMembership membership = 3; + RoleVars vars = 4; + uint32 oid = 5; +} + +message NetworkPolicyKey { + NetworkPolicyId id = 1; +} + +message NetworkPolicyValue { + string name = 1; + repeated NetworkPolicyRule rules = 2; + RoleId owner_id = 3; + repeated MzAclItem privileges = 4; + uint32 oid = 5; +} + +message ServerConfigurationKey { + string name = 1; +} + +message ServerConfigurationValue { + string value = 1; +} + +message AuditLogKey { + oneof event { + AuditLogEventV1 v1 = 1; + } +} + +message CommentKey { + oneof object { + CatalogItemId table = 1; + CatalogItemId view = 2; + CatalogItemId materialized_view = 4; + CatalogItemId source = 5; + CatalogItemId sink = 6; + CatalogItemId index = 7; + CatalogItemId func = 8; + CatalogItemId connection = 9; + CatalogItemId type = 10; + CatalogItemId secret = 11; + CatalogItemId continual_task = 17; + RoleId role = 12; + DatabaseId database = 13; + ResolvedSchema schema = 14; + ClusterId cluster = 15; + ClusterReplicaId cluster_replica = 16; + NetworkPolicyId network_policy = 18; + } + oneof sub_component { + uint64 column_pos = 3; + } +} + +message CommentValue { + string comment = 1; +} + +message SourceReferencesKey { + CatalogItemId source = 1; +} + +message SourceReferencesValue { + repeated SourceReference references = 1; + EpochMillis updated_at = 2; +} + +message SourceReference { + string name = 1; + optional string namespace = 2; + repeated string columns = 3; +} + +message StorageCollectionMetadataKey { + GlobalId id = 1; +} + +// This value is stored transparently, however, it should only ever be +// manipulated by the storage controller. +message StorageCollectionMetadataValue { + string shard = 1; +} + +// This value is stored transparently, however, it should only ever be +// manipulated by the storage controller. +message UnfinalizedShardKey { + string shard = 1; +} + +// This value is stored transparently, however, it should only ever be +// manipulated by the storage controller. +message TxnWalShardValue { + string shard = 1; +} + +// ---- Common Types +// +// Note: Normally types like this would go in some sort of `common.proto` file, but we want to keep +// our proto definitions in a single file to make snapshotting easier, hence them living here. + +message Empty { + /* purposefully empty */ +} + +// In protobuf a "None" string is the same thing as an empty string. 
To get the same semantics of +// an `Option` from Rust, we need to wrap a string in a message. +message StringWrapper { + string inner = 1; +} + +message Duration { + uint64 secs = 1; + uint32 nanos = 2; +} + +message EpochMillis { + uint64 millis = 1; +} + +// Opaque timestamp type that is specific to Materialize. +message Timestamp { + uint64 internal = 1; +} + +message Version { + uint64 value = 2; +} + +enum CatalogItemType { + CATALOG_ITEM_TYPE_UNKNOWN = 0; + CATALOG_ITEM_TYPE_TABLE = 1; + CATALOG_ITEM_TYPE_SOURCE = 2; + CATALOG_ITEM_TYPE_SINK = 3; + CATALOG_ITEM_TYPE_VIEW = 4; + CATALOG_ITEM_TYPE_MATERIALIZED_VIEW = 5; + CATALOG_ITEM_TYPE_INDEX = 6; + CATALOG_ITEM_TYPE_TYPE = 7; + CATALOG_ITEM_TYPE_FUNC = 8; + CATALOG_ITEM_TYPE_SECRET = 9; + CATALOG_ITEM_TYPE_CONNECTION = 10; + CATALOG_ITEM_TYPE_CONTINUAL_TASK = 11; +} + +message CatalogItem { + message V1 { + string create_sql = 1; + } + + oneof value { + V1 v1 = 1; + } +} + +message CatalogItemId { + oneof value { + uint64 system = 1; + uint64 user = 2; + uint64 transient = 3; + } +} + +/// A newtype wrapper for a `CatalogItemId` that is always in the "system" namespace. +message SystemCatalogItemId { + uint64 value = 1; +} + +message GlobalId { + oneof value { + uint64 system = 1; + uint64 user = 2; + uint64 transient = 3; + Empty explain = 4; + } +} + +/// A newtype wrapper for a `GlobalId` that is always in the "system" namespace. +message SystemGlobalId { + uint64 value = 1; +} + +message ClusterId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message DatabaseId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message ResolvedDatabaseSpecifier { + oneof spec { + Empty ambient = 1; + DatabaseId id = 2; + } +} + +message SchemaId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message SchemaSpecifier { + oneof spec { + Empty temporary = 1; + SchemaId id = 2; + } +} + +message ResolvedSchema { + ResolvedDatabaseSpecifier database = 1; + SchemaSpecifier schema = 2; +} + +message ReplicaId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message ClusterReplicaId { + ClusterId cluster_id = 1; + ReplicaId replica_id = 2; +} + +message NetworkPolicyId { + oneof value { + uint64 system = 1; + uint64 user = 2; + } +} + +message ReplicaLogging { + bool log_logging = 1; + Duration interval = 2; +} + +message OptimizerFeatureOverride { + string name = 1; + string value = 2; +} + +message ClusterScheduleRefreshOptions { + Duration rehydration_time_estimate = 1; +} + +message ClusterSchedule { + oneof value { + Empty manual = 1; + ClusterScheduleRefreshOptions refresh = 2; + } +} + +message ClusterConfig { + message ManagedCluster { + string size = 1; + uint32 replication_factor = 2; + repeated string availability_zones = 3; + ReplicaLogging logging = 4; + bool disk = 6; + repeated OptimizerFeatureOverride optimizer_feature_overrides = 7; + ClusterSchedule schedule = 8; + } + + oneof variant { + Empty unmanaged = 1; + ManagedCluster managed = 2; + } + optional string workload_class = 3; +} + +message ReplicaConfig { + message UnmanagedLocation { + repeated string storagectl_addrs = 1; + repeated string storage_addrs = 2; + repeated string computectl_addrs = 3; + repeated string compute_addrs = 4; + uint64 workers = 5; + } + + message ManagedLocation { + string size = 1; + optional string availability_zone = 2; + bool disk = 4; + bool internal = 5; + optional string billed_as = 6; + bool pending = 7; + } + + oneof location { + UnmanagedLocation unmanaged = 1; + 
ManagedLocation managed = 2; + } + ReplicaLogging logging = 3; +} + +message RoleId { + oneof value { + uint64 system = 1; + uint64 user = 2; + Empty public = 3; + uint64 predefined = 4; + } +} + +message RoleAttributes { + bool inherit = 1; +} + +message RoleMembership { + message Entry { + RoleId key = 1; + RoleId value = 2; + } + + repeated Entry map = 1; +} + +message RoleVars { + message SqlSet { + repeated string entries = 1; + } + + message Entry { + string key = 1; + oneof val { + string flat = 2; + SqlSet sql_set = 3; + } + } + + repeated Entry entries = 1; +} + +message NetworkPolicyRule { + string name = 1; + oneof action { + Empty allow = 2; + } + oneof direction { + Empty ingress = 3; + } + string address = 4; +} + +message AclMode { + // A bit flag representing all the privileges that can be granted to a role. + uint64 bitflags = 1; +} + +message MzAclItem { + RoleId grantee = 1; + RoleId grantor = 2; + AclMode acl_mode = 3; +} + +enum ObjectType { + OBJECT_TYPE_UNKNOWN = 0; + OBJECT_TYPE_TABLE = 1; + OBJECT_TYPE_VIEW = 2; + OBJECT_TYPE_MATERIALIZED_VIEW = 3; + OBJECT_TYPE_SOURCE = 4; + OBJECT_TYPE_SINK = 5; + OBJECT_TYPE_INDEX = 6; + OBJECT_TYPE_TYPE = 7; + OBJECT_TYPE_ROLE = 8; + OBJECT_TYPE_CLUSTER = 9; + OBJECT_TYPE_CLUSTER_REPLICA = 10; + OBJECT_TYPE_SECRET = 11; + OBJECT_TYPE_CONNECTION = 12; + OBJECT_TYPE_DATABASE = 13; + OBJECT_TYPE_SCHEMA = 14; + OBJECT_TYPE_FUNC = 15; + OBJECT_TYPE_CONTINUAL_TASK = 16; + OBJECT_TYPE_NETWORK_POLICY = 17; +} + +message DefaultPrivilegesKey { + RoleId role_id = 1; + DatabaseId database_id = 2; + SchemaId schema_id = 3; + ObjectType object_type = 4; + RoleId grantee = 5; +} + +message DefaultPrivilegesValue { + AclMode privileges = 1; +} + +message SystemPrivilegesKey { + RoleId grantee = 1; + RoleId grantor = 2; +} + +message SystemPrivilegesValue { + AclMode acl_mode = 1; +} + +message AuditLogEventV1 { + enum EventType { + EVENT_TYPE_UNKNOWN = 0; + EVENT_TYPE_CREATE = 1; + EVENT_TYPE_DROP = 2; + EVENT_TYPE_ALTER = 3; + EVENT_TYPE_GRANT = 4; + EVENT_TYPE_REVOKE = 5; + EVENT_TYPE_COMMENT = 6; + } + + enum ObjectType { + OBJECT_TYPE_UNKNOWN = 0; + OBJECT_TYPE_CLUSTER = 1; + OBJECT_TYPE_CLUSTER_REPLICA = 2; + OBJECT_TYPE_CONNECTION = 3; + OBJECT_TYPE_DATABASE = 4; + OBJECT_TYPE_FUNC = 5; + OBJECT_TYPE_INDEX = 6; + OBJECT_TYPE_MATERIALIZED_VIEW = 7; + OBJECT_TYPE_ROLE = 8; + OBJECT_TYPE_SECRET = 9; + OBJECT_TYPE_SCHEMA = 10; + OBJECT_TYPE_SINK = 11; + OBJECT_TYPE_SOURCE = 12; + OBJECT_TYPE_TABLE = 13; + OBJECT_TYPE_TYPE = 14; + OBJECT_TYPE_VIEW = 15; + OBJECT_TYPE_SYSTEM = 16; + OBJECT_TYPE_CONTINUAL_TASK = 17; + OBJECT_TYPE_NETWORK_POLICY = 18; + } + + message IdFullNameV1 { + string id = 1; + FullNameV1 name = 2; + } + + message FullNameV1 { + string database = 1; + string schema = 2; + string item = 3; + } + + message IdNameV1 { + string id = 1; + string name = 2; + } + + message RenameClusterV1 { + string id = 1; + string old_name = 2; + string new_name = 3; + } + + message RenameClusterReplicaV1 { + string cluster_id = 1; + string replica_id = 2; + string old_name = 3; + string new_name = 4; + } + + message RenameItemV1 { + string id = 1; + FullNameV1 old_name = 2; + FullNameV1 new_name = 3; + } + + message CreateClusterReplicaV1 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + string logical_size = 5; + bool disk = 6; + optional string billed_as = 7; + bool internal = 8; + } + + message CreateClusterReplicaV2 { + string cluster_id = 1; + string cluster_name = 2; + 
StringWrapper replica_id = 3; + string replica_name = 4; + string logical_size = 5; + bool disk = 6; + optional string billed_as = 7; + bool internal = 8; + CreateOrDropClusterReplicaReasonV1 reason = 9; + SchedulingDecisionsWithReasonsV1 scheduling_policies = 10; + } + + message CreateClusterReplicaV3 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + string logical_size = 5; + bool disk = 6; + optional string billed_as = 7; + bool internal = 8; + CreateOrDropClusterReplicaReasonV1 reason = 9; + SchedulingDecisionsWithReasonsV2 scheduling_policies = 10; + } + + message DropClusterReplicaV1 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + } + + message DropClusterReplicaV2 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + CreateOrDropClusterReplicaReasonV1 reason = 5; + SchedulingDecisionsWithReasonsV1 scheduling_policies = 6; + } + + message DropClusterReplicaV3 { + string cluster_id = 1; + string cluster_name = 2; + StringWrapper replica_id = 3; + string replica_name = 4; + CreateOrDropClusterReplicaReasonV1 reason = 5; + SchedulingDecisionsWithReasonsV2 scheduling_policies = 6; + } + + message CreateOrDropClusterReplicaReasonV1 { + oneof reason { + Empty Manual = 1; + Empty Schedule = 2; + Empty System = 3; + } + } + + message SchedulingDecisionsWithReasonsV1 { + RefreshDecisionWithReasonV1 on_refresh = 1; + } + + message SchedulingDecisionsWithReasonsV2 { + RefreshDecisionWithReasonV2 on_refresh = 1; + } + + message RefreshDecisionWithReasonV1 { + oneof decision { + Empty On = 1; + Empty Off = 2; + } + repeated string objects_needing_refresh = 3; + string rehydration_time_estimate = 4; + } + + message RefreshDecisionWithReasonV2 { + oneof decision { + Empty On = 1; + Empty Off = 2; + } + repeated string objects_needing_refresh = 3; + repeated string objects_needing_compaction = 5; + string rehydration_time_estimate = 4; + } + + message CreateSourceSinkV1 { + string id = 1; + FullNameV1 name = 2; + StringWrapper size = 3; + } + + message CreateSourceSinkV2 { + string id = 1; + FullNameV1 name = 2; + StringWrapper size = 3; + string external_type = 4; + } + + message CreateSourceSinkV3 { + string id = 1; + FullNameV1 name = 2; + string external_type = 3; + } + + message CreateSourceSinkV4 { + string id = 1; + StringWrapper cluster_id = 2; + FullNameV1 name = 3; + string external_type = 4; + } + + message CreateIndexV1 { + string id = 1; + string cluster_id = 2; + FullNameV1 name = 3; + } + + message CreateMaterializedViewV1 { + string id = 1; + string cluster_id = 2; + FullNameV1 name = 3; + } + + message AlterSourceSinkV1 { + string id = 1; + FullNameV1 name = 2; + StringWrapper old_size = 3; + StringWrapper new_size = 4; + } + + message AlterSetClusterV1 { + string id = 1; + FullNameV1 name = 2; + StringWrapper old_cluster = 3; + StringWrapper new_cluster = 4; + } + + message GrantRoleV1 { + string role_id = 1; + string member_id = 2; + string grantor_id = 3; + } + + message GrantRoleV2 { + string role_id = 1; + string member_id = 2; + string grantor_id = 3; + string executed_by = 4; + } + + message RevokeRoleV1 { + string role_id = 1; + string member_id = 2; + } + + message RevokeRoleV2 { + string role_id = 1; + string member_id = 2; + string grantor_id = 3; + string executed_by = 4; + } + + message UpdatePrivilegeV1 { + string object_id = 1; + string grantee_id = 2; + string grantor_id = 3; + 
string privileges = 4; + } + + message AlterDefaultPrivilegeV1 { + string role_id = 1; + StringWrapper database_id = 2; + StringWrapper schema_id = 3; + string grantee_id = 4; + string privileges = 5; + } + + message UpdateOwnerV1 { + string object_id = 1; + string old_owner_id = 2; + string new_owner_id = 3; + } + + message SchemaV1 { + string id = 1; + string name = 2; + string database_name = 3; + } + + message SchemaV2 { + string id = 1; + string name = 2; + StringWrapper database_name = 3; + } + + message RenameSchemaV1 { + string id = 1; + optional string database_name = 2; + string old_name = 3; + string new_name = 4; + } + + message UpdateItemV1 { + string id = 1; + FullNameV1 name = 2; + } + + message AlterRetainHistoryV1 { + string id = 1; + optional string old_history = 2; + optional string new_history = 3; + } + + message ToNewIdV1 { + string id = 1; + string new_id = 2; + } + + message FromPreviousIdV1 { + string id = 1; + string previous_id = 2; + } + + message SetV1 { + string name = 1; + optional string value = 2; + } + + message RotateKeysV1 { + string id = 1; + string name = 2; + } + + uint64 id = 1; + EventType event_type = 2; + ObjectType object_type = 3; + StringWrapper user = 4; + EpochMillis occurred_at = 5; + + // next-id: 40 + oneof details { + CreateClusterReplicaV1 create_cluster_replica_v1 = 6; + CreateClusterReplicaV2 create_cluster_replica_v2 = 33; + CreateClusterReplicaV3 create_cluster_replica_v3 = 41; + DropClusterReplicaV1 drop_cluster_replica_v1 = 7; + DropClusterReplicaV2 drop_cluster_replica_v2 = 34; + DropClusterReplicaV3 drop_cluster_replica_v3 = 42; + CreateSourceSinkV1 create_source_sink_v1 = 8; + CreateSourceSinkV2 create_source_sink_v2 = 9; + AlterSourceSinkV1 alter_source_sink_v1 = 10; + AlterSetClusterV1 alter_set_cluster_v1 = 25; + GrantRoleV1 grant_role_v1 = 11; + GrantRoleV2 grant_role_v2 = 12; + RevokeRoleV1 revoke_role_v1 = 13; + RevokeRoleV2 revoke_role_v2 = 14; + UpdatePrivilegeV1 update_privilege_v1 = 22; + AlterDefaultPrivilegeV1 alter_default_privilege_v1 = 23; + UpdateOwnerV1 update_owner_v1 = 24; + IdFullNameV1 id_full_name_v1 = 15; + RenameClusterV1 rename_cluster_v1 = 20; + RenameClusterReplicaV1 rename_cluster_replica_v1 = 21; + RenameItemV1 rename_item_v1 = 16; + IdNameV1 id_name_v1 = 17; + SchemaV1 schema_v1 = 18; + SchemaV2 schema_v2 = 19; + RenameSchemaV1 rename_schema_v1 = 27; + UpdateItemV1 update_item_v1 = 26; + CreateSourceSinkV3 create_source_sink_v3 = 29; + AlterRetainHistoryV1 alter_retain_history_v1 = 30; + ToNewIdV1 to_new_id_v1 = 31; + FromPreviousIdV1 from_previous_id_v1 = 32; + SetV1 set_v1 = 35; + Empty reset_all_v1 = 36; + RotateKeysV1 rotate_keys_v1 = 37; + CreateSourceSinkV4 create_source_sink_v4 = 38; + CreateIndexV1 create_index_v1 = 39; + CreateMaterializedViewV1 create_materialized_view_v1 = 40; + } +} + +// Wrapper of key-values used by the persist implementation to serialize the catalog. 
+message StateUpdateKind { + reserved "Epoch"; + + message AuditLog { + AuditLogKey key = 1; + } + + message Cluster { + ClusterKey key = 1; + ClusterValue value = 2; + } + + message ClusterReplica { + ClusterReplicaKey key = 1; + ClusterReplicaValue value = 2; + } + + message Comment { + CommentKey key = 1; + CommentValue value = 2; + } + + message Config { + ConfigKey key = 1; + ConfigValue value = 2; + } + + message Database { + DatabaseKey key = 1; + DatabaseValue value = 2; + } + + message DefaultPrivileges { + DefaultPrivilegesKey key = 1; + DefaultPrivilegesValue value = 2; + } + + message FenceToken { + uint64 deploy_generation = 1; + int64 epoch = 2; + } + + message IdAlloc { + IdAllocKey key = 1; + IdAllocValue value = 2; + } + + message ClusterIntrospectionSourceIndex { + ClusterIntrospectionSourceIndexKey key = 1; + ClusterIntrospectionSourceIndexValue value = 2; + } + + message Item { + ItemKey key = 1; + ItemValue value = 2; + } + + message Role { + RoleKey key = 1; + RoleValue value = 2; + } + + message NetworkPolicy { + NetworkPolicyKey key = 1; + NetworkPolicyValue value = 2; + } + + message Schema { + SchemaKey key = 1; + SchemaValue value = 2; + } + + message Setting { + SettingKey key = 1; + SettingValue value = 2; + } + + message ServerConfiguration { + ServerConfigurationKey key = 1; + ServerConfigurationValue value = 2; + } + + message SourceReferences { + SourceReferencesKey key = 1; + SourceReferencesValue value = 2; + } + + message GidMapping { + GidMappingKey key = 1; + GidMappingValue value = 2; + } + + message SystemPrivileges { + SystemPrivilegesKey key = 1; + SystemPrivilegesValue value = 2; + } + + message StorageCollectionMetadata { + StorageCollectionMetadataKey key = 1; + StorageCollectionMetadataValue value = 2; + } + + message UnfinalizedShard { + UnfinalizedShardKey key = 1; + } + + message TxnWalShard { + TxnWalShardValue value = 1; + } + + reserved 15; + reserved "storage_usage"; + reserved 19; + reserved "timestamp"; + reserved 22; + reserved "persist_txn_shard"; + reserved 8; + reserved "epoch"; + + oneof kind { + AuditLog audit_log = 1; + Cluster cluster = 2; + ClusterReplica cluster_replica = 3; + Comment comment = 4; + Config config = 5; + Database database = 6; + DefaultPrivileges default_privileges = 7; + IdAlloc id_alloc = 9; + ClusterIntrospectionSourceIndex cluster_introspection_source_index = 10; + Item item = 11; + Role role = 12; + Schema schema = 13; + Setting setting = 14; + ServerConfiguration server_configuration = 16; + GidMapping gid_mapping = 17; + SystemPrivileges system_privileges = 18; + StorageCollectionMetadata storage_collection_metadata = 20; + UnfinalizedShard unfinalized_shard = 21; + TxnWalShard txn_wal_shard = 23; + SourceReferences source_references = 24; + FenceToken fence_token = 25; + NetworkPolicy network_policy = 26; + } +} diff --git a/src/catalog/src/durable/objects/serialization.rs b/src/catalog/src/durable/objects/serialization.rs index 979a6e2f1be78..323c33e05ba3d 100644 --- a/src/catalog/src/durable/objects/serialization.rs +++ b/src/catalog/src/durable/objects/serialization.rs @@ -13,14 +13,15 @@ use std::time::Duration; use mz_audit_log::{ AlterDefaultPrivilegeV1, AlterRetainHistoryV1, AlterSetClusterV1, AlterSourceSinkV1, - CreateClusterReplicaV1, CreateClusterReplicaV2, CreateIndexV1, CreateMaterializedViewV1, - CreateOrDropClusterReplicaReasonV1, CreateSourceSinkV1, CreateSourceSinkV2, CreateSourceSinkV3, - CreateSourceSinkV4, DropClusterReplicaV1, DropClusterReplicaV2, EventDetails, EventType, - 
EventV1, FromPreviousIdV1, FullNameV1, GrantRoleV1, GrantRoleV2, IdFullNameV1, IdNameV1, - RefreshDecisionWithReasonV1, RenameClusterReplicaV1, RenameClusterV1, RenameItemV1, + CreateClusterReplicaV1, CreateClusterReplicaV2, CreateClusterReplicaV3, CreateIndexV1, + CreateMaterializedViewV1, CreateOrDropClusterReplicaReasonV1, CreateSourceSinkV1, + CreateSourceSinkV2, CreateSourceSinkV3, CreateSourceSinkV4, DropClusterReplicaV1, + DropClusterReplicaV2, DropClusterReplicaV3, EventDetails, EventType, EventV1, FromPreviousIdV1, + FullNameV1, GrantRoleV1, GrantRoleV2, IdFullNameV1, IdNameV1, RefreshDecisionWithReasonV1, + RefreshDecisionWithReasonV2, RenameClusterReplicaV1, RenameClusterV1, RenameItemV1, RenameSchemaV1, RevokeRoleV1, RevokeRoleV2, RotateKeysV1, SchedulingDecisionV1, - SchedulingDecisionsWithReasonsV1, SchemaV1, SchemaV2, SetV1, ToNewIdV1, UpdateItemV1, - UpdateOwnerV1, UpdatePrivilegeV1, VersionedEvent, + SchedulingDecisionsWithReasonsV1, SchedulingDecisionsWithReasonsV2, SchemaV1, SchemaV2, SetV1, + ToNewIdV1, UpdateItemV1, UpdateOwnerV1, UpdatePrivilegeV1, VersionedEvent, }; use mz_compute_client::controller::ComputeReplicaLogging; use mz_controller_types::ReplicaId; @@ -1973,6 +1974,36 @@ impl RustType for DropClusterRe } } +impl RustType for DropClusterReplicaV3 { + fn into_proto(&self) -> proto::audit_log_event_v1::DropClusterReplicaV3 { + proto::audit_log_event_v1::DropClusterReplicaV3 { + cluster_id: self.cluster_id.to_string(), + cluster_name: self.cluster_name.to_string(), + replica_id: self.replica_id.as_ref().map(|id| proto::StringWrapper { + inner: id.to_string(), + }), + replica_name: self.replica_name.to_string(), + reason: Some(self.reason.into_proto()), + scheduling_policies: self.scheduling_policies.into_proto(), + } + } + + fn from_proto( + proto: proto::audit_log_event_v1::DropClusterReplicaV3, + ) -> Result { + Ok(DropClusterReplicaV3 { + cluster_id: proto.cluster_id, + cluster_name: proto.cluster_name, + replica_id: proto.replica_id.map(|s| s.inner), + replica_name: proto.replica_name, + reason: proto + .reason + .into_rust_if_some("DropClusterReplicaV3::reason")?, + scheduling_policies: proto.scheduling_policies.into_rust()?, + }) + } +} + impl RustType for CreateClusterReplicaV1 { fn into_proto(&self) -> proto::audit_log_event_v1::CreateClusterReplicaV1 { proto::audit_log_event_v1::CreateClusterReplicaV1 { @@ -2043,6 +2074,44 @@ impl RustType for CreateClust } } +impl RustType for CreateClusterReplicaV3 { + fn into_proto(&self) -> proto::audit_log_event_v1::CreateClusterReplicaV3 { + proto::audit_log_event_v1::CreateClusterReplicaV3 { + cluster_id: self.cluster_id.to_string(), + cluster_name: self.cluster_name.to_string(), + replica_id: self.replica_id.as_ref().map(|id| proto::StringWrapper { + inner: id.to_string(), + }), + replica_name: self.replica_name.to_string(), + logical_size: self.logical_size.to_string(), + disk: self.disk, + billed_as: self.billed_as.clone(), + internal: self.internal, + reason: Some(self.reason.into_proto()), + scheduling_policies: self.scheduling_policies.into_proto(), + } + } + + fn from_proto( + proto: proto::audit_log_event_v1::CreateClusterReplicaV3, + ) -> Result { + Ok(CreateClusterReplicaV3 { + cluster_id: proto.cluster_id, + cluster_name: proto.cluster_name, + replica_id: proto.replica_id.map(|id| id.inner), + replica_name: proto.replica_name, + logical_size: proto.logical_size, + disk: proto.disk, + billed_as: proto.billed_as, + internal: proto.internal, + reason: proto + .reason + 
.into_rust_if_some("DropClusterReplicaV3::reason")?, + scheduling_policies: proto.scheduling_policies.into_rust()?, + }) + } +} + impl RustType for CreateOrDropClusterReplicaReasonV1 { @@ -2092,6 +2161,26 @@ impl RustType } } +impl RustType + for SchedulingDecisionsWithReasonsV2 +{ + fn into_proto(&self) -> proto::audit_log_event_v1::SchedulingDecisionsWithReasonsV2 { + proto::audit_log_event_v1::SchedulingDecisionsWithReasonsV2 { + on_refresh: Some(self.on_refresh.into_proto()), + } + } + + fn from_proto( + proto: proto::audit_log_event_v1::SchedulingDecisionsWithReasonsV2, + ) -> Result { + Ok(SchedulingDecisionsWithReasonsV2 { + on_refresh: proto + .on_refresh + .into_rust_if_some("SchedulingDecisionsWithReasonsV2::on_refresh")?, + }) + } +} + impl RustType for RefreshDecisionWithReasonV1 { @@ -2135,6 +2224,51 @@ impl RustType } } +impl RustType + for RefreshDecisionWithReasonV2 +{ + fn into_proto(&self) -> proto::audit_log_event_v1::RefreshDecisionWithReasonV2 { + let decision = match &self.decision { + SchedulingDecisionV1::On => { + proto::audit_log_event_v1::refresh_decision_with_reason_v2::Decision::On(Empty {}) + } + SchedulingDecisionV1::Off => { + proto::audit_log_event_v1::refresh_decision_with_reason_v2::Decision::Off(Empty {}) + } + }; + proto::audit_log_event_v1::RefreshDecisionWithReasonV2 { + decision: Some(decision), + objects_needing_refresh: self.objects_needing_refresh.clone(), + objects_needing_compaction: self.objects_needing_compaction.clone(), + rehydration_time_estimate: self.hydration_time_estimate.clone(), + } + } + + fn from_proto( + proto: proto::audit_log_event_v1::RefreshDecisionWithReasonV2, + ) -> Result { + let decision = match proto.decision { + None => { + return Err(TryFromProtoError::missing_field( + "CreateOrDropClusterReplicaReasonV2::reason", + )); + } + Some(proto::audit_log_event_v1::refresh_decision_with_reason_v2::Decision::On( + Empty {}, + )) => SchedulingDecisionV1::On, + Some(proto::audit_log_event_v1::refresh_decision_with_reason_v2::Decision::Off( + Empty {}, + )) => SchedulingDecisionV1::Off, + }; + Ok(RefreshDecisionWithReasonV2 { + decision, + objects_needing_refresh: proto.objects_needing_refresh, + objects_needing_compaction: proto.objects_needing_compaction, + hydration_time_estimate: proto.rehydration_time_estimate, + }) + } +} + impl RustType for CreateSourceSinkV1 { fn into_proto(&self) -> proto::audit_log_event_v1::CreateSourceSinkV1 { proto::audit_log_event_v1::CreateSourceSinkV1 { @@ -2648,12 +2782,18 @@ impl RustType for EventDetails { EventDetails::CreateClusterReplicaV2(details) => { CreateClusterReplicaV2(details.into_proto()) } + EventDetails::CreateClusterReplicaV3(details) => { + CreateClusterReplicaV3(details.into_proto()) + } EventDetails::DropClusterReplicaV1(details) => { DropClusterReplicaV1(details.into_proto()) } EventDetails::DropClusterReplicaV2(details) => { DropClusterReplicaV2(details.into_proto()) } + EventDetails::DropClusterReplicaV3(details) => { + DropClusterReplicaV3(details.into_proto()) + } EventDetails::CreateSourceSinkV1(details) => CreateSourceSinkV1(details.into_proto()), EventDetails::CreateSourceSinkV2(details) => CreateSourceSinkV2(details.into_proto()), EventDetails::CreateSourceSinkV3(details) => CreateSourceSinkV3(details.into_proto()), @@ -2705,12 +2845,18 @@ impl RustType for EventDetails { CreateClusterReplicaV2(details) => { Ok(EventDetails::CreateClusterReplicaV2(details.into_rust()?)) } + CreateClusterReplicaV3(details) => { + 
Ok(EventDetails::CreateClusterReplicaV3(details.into_rust()?)) + } DropClusterReplicaV1(details) => { Ok(EventDetails::DropClusterReplicaV1(details.into_rust()?)) } DropClusterReplicaV2(details) => { Ok(EventDetails::DropClusterReplicaV2(details.into_rust()?)) } + DropClusterReplicaV3(details) => { + Ok(EventDetails::DropClusterReplicaV3(details.into_rust()?)) + } CreateSourceSinkV1(details) => { Ok(EventDetails::CreateSourceSinkV1(details.into_rust()?)) } diff --git a/src/catalog/src/durable/upgrade.rs b/src/catalog/src/durable/upgrade.rs index 2baafff5c60b5..763844bf772d7 100644 --- a/src/catalog/src/durable/upgrade.rs +++ b/src/catalog/src/durable/upgrade.rs @@ -180,14 +180,14 @@ macro_rules! objects { } } -objects!(v67, v68, v69); +objects!(v67, v68, v69, v70); /// The current version of the `Catalog`. /// /// We will initialize new `Catalog`es with this version, and migrate existing `Catalog`es to this /// version. Whenever the `Catalog` changes, e.g. the protobufs we serialize in the `Catalog` /// change, we need to bump this version. -pub const CATALOG_VERSION: u64 = 69; +pub const CATALOG_VERSION: u64 = 70; /// The minimum `Catalog` version number that we support migrating from. /// @@ -201,6 +201,7 @@ const FUTURE_VERSION: u64 = CATALOG_VERSION + 1; mod v67_to_v68; mod v68_to_v69; +mod v69_to_v70; /// Describes a single action to take during a migration from `V1` to `V2`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -281,6 +282,7 @@ async fn run_upgrade( 67 => run_versioned_upgrade(unopened_catalog_state, version, v67_to_v68::upgrade).await, 68 => run_versioned_upgrade(unopened_catalog_state, version, v68_to_v69::upgrade).await, + 69 => run_versioned_upgrade(unopened_catalog_state, version, v69_to_v70::upgrade).await, // Up-to-date, no migration needed! CATALOG_VERSION => Ok(CATALOG_VERSION), diff --git a/src/catalog/src/durable/upgrade/snapshots/objects_v70.txt b/src/catalog/src/durable/upgrade/snapshots/objects_v70.txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/catalog/src/durable/upgrade/v69_to_v70.rs b/src/catalog/src/durable/upgrade/v69_to_v70.rs new file mode 100644 index 0000000000000..f538456ec8c8c --- /dev/null +++ b/src/catalog/src/durable/upgrade/v69_to_v70.rs @@ -0,0 +1,19 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::durable::upgrade::MigrationAction; +use crate::durable::upgrade::{objects_v69 as v69, objects_v70 as v70}; + +/// In v70, we updated the audit log entries when creating/dropping replicas due to refresh +/// schedules. +pub fn upgrade( + _snapshot: Vec, +) -> Vec> { + Vec::new() +} diff --git a/src/repr/src/refresh_schedule.rs b/src/repr/src/refresh_schedule.rs index 1d4c049786edd..5a65187c076fc 100644 --- a/src/repr/src/refresh_schedule.rs +++ b/src/repr/src/refresh_schedule.rs @@ -56,6 +56,13 @@ impl RefreshSchedule { /// Returns the time of the last refresh. Returns None if there is no last refresh (e.g., for a /// periodic refresh). + /// + /// (If there is no last refresh, then we have a `REFRESH EVERY`, in which case the saturating + /// roundup puts a refresh at the maximum possible timestamp. 
This means that it would make + /// some sense to return the maximum possible timestamp instead of None. Indeed, some of our + /// callers handle our None return value in exactly this way. However, some other callers do + /// something else with None, and therefore we don't want to hardcode this behavior in this + /// function.) pub fn last_refresh(&self) -> Option { if self.everies.is_empty() { self.ats.iter().max().cloned() diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 7cac39770dbaf..71c37164fcaf4 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1307,6 +1307,7 @@ impl SystemVars { &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL, &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL, &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED, + &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE, &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT, &STATEMENT_LOGGING_MAX_SAMPLE_RATE, &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE, @@ -2218,6 +2219,10 @@ impl SystemVars { *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED) } + pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration { + *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE) + } + /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter. pub fn privatelink_status_update_quota_per_minute(&self) -> u32 { *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE) diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 506f0cb4ee488..7e77333cbe6aa 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1653,6 +1653,19 @@ pub mod cluster_scheduling { "Enables SecurityContext for clusterd instances, restricting capabilities to improve security.", false, ); + + const DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: Duration = Duration::from_secs(60); + + pub static CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE: VarDefinition = VarDefinition::new( + "cluster_refresh_mv_compaction_estimate", + value!(Duration; DEFAULT_CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE), + "How much time to wait for compaction after a REFRESH MV completes a refresh \ + before turning off the refresh cluster. This is needed because Persist does compaction \ + only after a write, but refresh MVs do writes only at their refresh times. \ + (In the long term, we'd like to remove this configuration and instead wait exactly \ + until compaction has settled. We'd need some new Persist API for this.)", + false, + ); } /// Macro to simplify creating feature flags, i.e. boolean flags that we use to toggle the diff --git a/test/sqllogictest/materialized_views.slt b/test/sqllogictest/materialized_views.slt index 2ace586b8b81f..af6d5ae482a8e 100644 --- a/test/sqllogictest/materialized_views.slt +++ b/test/sqllogictest/materialized_views.slt @@ -1281,6 +1281,12 @@ ALTER SYSTEM SET enable_cluster_schedule_refresh = true ---- COMPLETE 0 +# Let's not complicate things with `cluster_refresh_mv_compaction_estimate` at first. 
+simple conn=mz_system,user=mz_system +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 0 +---- +COMPLETE 0 + statement error db error: ERROR: Expected one of MANUAL or ON, found identifier "aaaaaaaa" CREATE CLUSTER c_schedule_0 (SIZE = '1', SCHEDULE = AAAAAAAA); @@ -1580,14 +1586,102 @@ drop cluster-replica "c_schedule_4" "manual" true NULL create cluster-replica "c_schedule_1" "manual" true NULL create cluster-replica "c_schedule_2" "manual" true NULL create cluster-replica "c_schedule_5" "manual" true NULL -drop cluster-replica "c_schedule_1" "schedule" false {"decision":"off","hydration_time_estimate":"00:00:00","objects_needing_refresh":[]} -drop cluster-replica "c_schedule_3" "schedule" false {"decision":"off","hydration_time_estimate":"00:00:00","objects_needing_refresh":[]} -create cluster-replica "c_schedule_1" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_refresh":["uXXX"]} -create cluster-replica "c_schedule_3" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_refresh":["uXXX"]} -create cluster-replica "c_schedule_4" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_refresh":["uXXX"]} -create cluster-replica "c_schedule_hydration_time_estimate" "schedule" false {"decision":"on","hydration_time_estimate":"00:16:35","objects_needing_refresh":["uXXX"]} - -# Materialized views in this file can be explained +drop cluster-replica "c_schedule_1" "schedule" false {"decision":"off","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":[]} +drop cluster-replica "c_schedule_3" "schedule" false {"decision":"off","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":[]} +create cluster-replica "c_schedule_1" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":["uXXX"]} +create cluster-replica "c_schedule_3" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":["uXXX"]} +create cluster-replica "c_schedule_4" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":["uXXX"]} +create cluster-replica "c_schedule_hydration_time_estimate" "schedule" false {"decision":"on","hydration_time_estimate":"00:16:35","objects_needing_compaction":[],"objects_needing_refresh":["uXXX"]} + +## Now test `cluster_refresh_mv_compaction_estimate`. +## (This would make the above audit test flaky, so it should be after that.) +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 1200000 +---- +COMPLETE 0 + +statement ok +CREATE CLUSTER c_schedule_6 (SIZE = '1', SCHEDULE = ON REFRESH); + +query I +SELECT replication_factor FROM mz_catalog.mz_clusters WHERE name = 'c_schedule_6'; +---- +0 + +statement ok +CREATE MATERIALIZED VIEW mv13 +IN CLUSTER c_schedule_6 +WITH (REFRESH AT CREATION) +AS SELECT sum(x*y) - count(*) AS r FROM t2; + +# Wait until the first refresh is complete. 
+query I +SELECT r+r FROM mv13; +---- +31916 + +# We'd turn it off at the next scheduling decision if it were not for `cluster_refresh_mv_compaction_estimate` +statement ok +SELECT mz_unsafe.mz_sleep(3+1+1); + +query I +SELECT replication_factor FROM mz_catalog.mz_clusters WHERE name = 'c_schedule_6'; +---- +1 + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 0 +---- +COMPLETE 0 + +# Should turn off at the next scheduling decision. +statement ok +SELECT mz_unsafe.mz_sleep(3+1+1); + +query I +SELECT replication_factor FROM mz_catalog.mz_clusters WHERE name = 'c_schedule_6'; +---- +0 + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 120000 +---- +COMPLETE 0 + +# Should turn on at the next scheduling decision. +statement ok +SELECT mz_unsafe.mz_sleep(3+1+1); + +query I +SELECT replication_factor FROM mz_catalog.mz_clusters WHERE name = 'c_schedule_6'; +---- +1 + +# The audit events should now have a row that has a non-empty `objects_needing_compaction`. +query TTTTBT +SELECT DISTINCT + event_type, + object_type, + (details->'cluster_name')::text, + (details->'reason')::text, + (details->'scheduling_policies') IS NULL, + regexp_replace((details->'scheduling_policies'->'on_refresh')::text, '\["u.*"\]', '["uXXX"]') +FROM mz_audit_events +WHERE + event_type IN ('create', 'drop') AND + object_type = 'cluster-replica' AND + (details->'cluster_name')::text = '"c_schedule_6"'; +---- +drop cluster-replica "c_schedule_6" "schedule" false {"decision":"off","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":[]} +create cluster-replica "c_schedule_6" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_compaction":["uXXX"],"objects_needing_refresh":[]} +create cluster-replica "c_schedule_6" "schedule" false {"decision":"on","hydration_time_estimate":"00:00:00","objects_needing_compaction":[],"objects_needing_refresh":["uXXX"]} + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 0 +---- +COMPLETE 0 + +## EXPLAIN FILTER PUSHDOWN can be run on materialized views in this file simple conn=mz_system,user=mz_system ALTER SYSTEM SET enable_explain_pushdown = true diff --git a/test/testdrive-old-kafka-src-syntax/materialized-view-refresh-options.td b/test/testdrive-old-kafka-src-syntax/materialized-view-refresh-options.td index 9aa7c0a1fb1f8..e3bf07ad402ca 100644 --- a/test/testdrive-old-kafka-src-syntax/materialized-view-refresh-options.td +++ b/test/testdrive-old-kafka-src-syntax/materialized-view-refresh-options.td @@ -11,6 +11,7 @@ $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-inter ALTER SYSTEM SET enable_refresh_every_mvs = true ALTER SYSTEM SET enable_cluster_schedule_refresh = true ALTER SYSTEM SET enable_unstable_dependencies = true +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 0 > CREATE DATABASE materialized_view_refresh_options; > SET DATABASE = materialized_view_refresh_options; diff --git a/test/testdrive/materialized-view-refresh-options.td b/test/testdrive/materialized-view-refresh-options.td index 9aa7c0a1fb1f8..e3bf07ad402ca 100644 --- a/test/testdrive/materialized-view-refresh-options.td +++ b/test/testdrive/materialized-view-refresh-options.td @@ -11,6 +11,7 @@ $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-inter ALTER SYSTEM SET enable_refresh_every_mvs = true ALTER SYSTEM SET 
enable_cluster_schedule_refresh = true ALTER SYSTEM SET enable_unstable_dependencies = true +ALTER SYSTEM SET cluster_refresh_mv_compaction_estimate = 0 > CREATE DATABASE materialized_view_refresh_options; > SET DATABASE = materialized_view_refresh_options;
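Taken together, the `cluster_scheduling.rs` change turns the per-cluster decision into two per-MV checks: keep the cluster on while a refresh is due (the MV's write frontier is behind the oracle read timestamp plus the hydration time estimate), or while the previous refresh is recent enough that Persist may still need the replica around to compact what that refresh wrote. Below is a minimal sketch of how the two checks combine; it uses plain u64 millisecond timestamps instead of the real `Timestamp`/`Antichain` types, and a precomputed `prev_refresh` in place of `frontier.round_down_minus_1(&refresh_schedule)` (or `refresh_schedule.last_refresh()` when the write frontier is empty), so it is an illustration of the logic rather than the actual Coordinator code.

fn cluster_should_stay_on(
    local_read_ts: u64,
    hydration_time_estimate: u64,
    compaction_estimate: u64,
    // One entry per REFRESH MV on the cluster: (write frontier, previous refresh).
    mvs: &[(Option<u64>, Option<u64>)],
) -> bool {
    // Check 1: some MV's write frontier is behind local_read_ts + hydration_time_estimate,
    // i.e. a refresh is due (or due soon enough that hydration should start now).
    let needs_refresh = mvs.iter().any(|(write_frontier, _)| match write_frontier {
        Some(f) => *f < local_read_ts + hydration_time_estimate,
        // An empty frontier means every scheduled refresh has already been written.
        None => false,
    });
    // Check 2: some MV refreshed recently, so Persist compaction of that write may still be
    // pending; keep the cluster on for `compaction_estimate` after the previous refresh.
    let needs_compaction = mvs.iter().any(|(_, prev_refresh)| match prev_refresh {
        Some(prev) => prev + compaction_estimate > local_read_ts,
        // No previous refresh yet, so there is nothing to compact.
        None => false,
    });
    needs_refresh || needs_compaction
}

fn main() {
    // Example: the MV refreshed at t=100_000 and its frontier already points past "now",
    // so no refresh is due, but with a 60s compaction estimate the cluster stays on.
    let stay_on = cluster_should_stay_on(110_000, 0, 60_000, &[(Some(200_000), Some(100_000))]);
    assert!(stay_on);
}

Setting `cluster_refresh_mv_compaction_estimate` to 0, as the testdrive files and the first part of the slt changes do, effectively disables the second check, which is why the pre-existing scheduling tests keep their original on/off timing.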