diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 114dbf3bb03a6..8c577b41a4299 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -27,7 +27,8 @@ use mz_catalog::builtin::{ BUILTIN_PREFIXES, MZ_INTROSPECTION_CLUSTER, }; use mz_catalog::{ - BootstrapArgs, DurableCatalogState, OpenableDurableCatalogState, StashConfig, Transaction, + debug_bootstrap_args, DurableCatalogState, OpenableDurableCatalogState, StashConfig, + Transaction, }; use mz_compute_types::dataflows::DataflowDescription; use mz_controller::clusters::{ @@ -456,7 +457,7 @@ impl Catalog { let mut openable_storage = mz_catalog::debug_stash_backed_catalog_state(debug_stash_factory); let storage = openable_storage - .open(now.clone(), &Self::debug_bootstrap_args(), None) + .open(now.clone(), &debug_bootstrap_args(), None) .await?; Self::open_debug_stash_catalog(storage, now).await } @@ -480,7 +481,7 @@ impl Catalog { }; let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); let storage = openable_storage - .open(now.clone(), &Self::debug_bootstrap_args(), None) + .open(now.clone(), &debug_bootstrap_args(), None) .await?; Self::open_debug_stash_catalog(storage, now).await } @@ -494,7 +495,7 @@ impl Catalog { ) -> Result { let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); let storage = openable_storage - .open_read_only(now.clone(), &Self::debug_bootstrap_args()) + .open_read_only(now.clone(), &debug_bootstrap_args()) .await?; Self::open_debug_stash_catalog(storage, now).await } @@ -546,14 +547,6 @@ impl Catalog { Ok(catalog) } - fn debug_bootstrap_args() -> BootstrapArgs { - BootstrapArgs { - default_cluster_replica_size: "1".into(), - builtin_cluster_replica_size: "1".into(), - bootstrap_role: None, - } - } - pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> { Self::for_session_state(&self.state, session) } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index edbfbbf4b8f59..3086fa3387c05 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -122,13 +122,15 @@ mod error; pub mod initialize; pub mod objects; -const DATABASE_ID_ALLOC_KEY: &str = "database"; -const SCHEMA_ID_ALLOC_KEY: &str = "schema"; -const USER_ROLE_ID_ALLOC_KEY: &str = "user_role"; -const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute"; -const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute"; -const USER_REPLICA_ID_ALLOC_KEY: &str = "replica"; -const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica"; +pub const DATABASE_ID_ALLOC_KEY: &str = "database"; +pub const SCHEMA_ID_ALLOC_KEY: &str = "schema"; +pub const USER_ITEM_ALLOC_KEY: &str = "user"; +pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system"; +pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role"; +pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute"; +pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute"; +pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica"; +pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica"; pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog"; pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage"; @@ -353,14 +355,14 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState { /// Allocates and returns `amount` system [`GlobalId`]s. async fn allocate_system_ids(&mut self, amount: u64) -> Result, CatalogError> { - let id = self.allocate_id("system", amount).await?; + let id = self.allocate_id(SYSTEM_ITEM_ALLOC_KEY, amount).await?; Ok(id.into_iter().map(GlobalId::System).collect()) } /// Allocates and returns a user [`GlobalId`]. async fn allocate_user_id(&mut self) -> Result { - let id = self.allocate_id("user", 1).await?; + let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1).await?; let id = id.into_element(); Ok(GlobalId::User(id)) } @@ -402,3 +404,11 @@ pub fn debug_stash_backed_catalog_state( ) -> impl OpenableDurableCatalogState + '_ { DebugOpenableConnection::new(debug_stash_factory) } + +pub fn debug_bootstrap_args() -> BootstrapArgs { + BootstrapArgs { + default_cluster_replica_size: "1".into(), + builtin_cluster_replica_size: "1".into(), + bootstrap_role: None, + } +} diff --git a/src/catalog/src/objects.rs b/src/catalog/src/objects.rs index c89127dff21a8..ec385deb0a00c 100644 --- a/src/catalog/src/objects.rs +++ b/src/catalog/src/objects.rs @@ -24,7 +24,6 @@ use mz_stash_types::objects::{proto, RustType, TryFromProtoError}; use mz_storage_types::sources::Timeline; use proptest_derive::Arbitrary; use std::collections::BTreeMap; -use std::time::Duration; // Structs used to pass information to outside modules. @@ -38,7 +37,7 @@ use std::time::Duration; /// This trait is based on [`RustType`], however it is meant to /// convert the types used in [`RustType`] to a more consumable and /// condensed type. -pub(crate) trait DurableType: Sized { +pub trait DurableType: Sized { /// Consume and convert `Self` into a `(K, V)` key-value pair. fn into_key_value(self) -> (K, V); @@ -47,7 +46,7 @@ pub(crate) trait DurableType: Sized { fn from_key_value(key: K, value: V) -> Self; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Database { pub id: DatabaseId, pub name: String, @@ -77,7 +76,7 @@ impl DurableType for Database { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Schema { pub id: SchemaId, pub name: String, @@ -110,7 +109,7 @@ impl DurableType for Schema { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Role { pub id: RoleId, pub name: String, @@ -143,7 +142,7 @@ impl DurableType for Role { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Cluster { pub id: ClusterId, pub name: String, @@ -258,7 +257,7 @@ pub struct ClusterVariantManaged { pub disk: bool, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct IntrospectionSourceIndex { pub cluster_id: ClusterId, pub name: String, @@ -307,7 +306,7 @@ impl DurableType for ReplicaLocation { } } -#[derive(Clone, Debug)] -pub struct ComputeReplicaLogging { - pub log_logging: bool, - pub interval: Option, -} - -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Item { pub id: GlobalId, pub schema_id: SchemaId, @@ -554,7 +547,7 @@ pub struct SystemObjectDescription { pub object_name: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SystemObjectUniqueIdentifier { pub id: GlobalId, pub fingerprint: String, @@ -567,7 +560,7 @@ pub struct SystemObjectUniqueIdentifier { /// As such, system objects are keyed in the catalog storage by the /// tuple (schema_name, object_type, object_name), which is guaranteed /// to be unique. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SystemObjectMapping { pub description: SystemObjectDescription, pub unique_identifier: SystemObjectUniqueIdentifier, @@ -608,7 +601,7 @@ impl DurableType for SystemObjectMapping { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct DefaultPrivilege { pub object: DefaultPrivilegeObject, pub acl_item: DefaultPrivilegeAclItem, @@ -646,7 +639,7 @@ impl DurableType for DefaultPrivil } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Comment { pub object_id: CommentObjectId, pub sub_component: Option, @@ -675,7 +668,7 @@ impl DurableType for Comment { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct IdAlloc { pub name: String, pub next_id: u64, @@ -699,7 +692,7 @@ impl DurableType for IdAlloc { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Config { pub key: String, pub value: u64, @@ -743,7 +736,7 @@ impl DurableType for Setting { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct TimelineTimestamp { pub timeline: Timeline, pub ts: mz_repr::Timestamp, @@ -767,7 +760,7 @@ impl DurableType for TimelineTimestamp { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SystemConfiguration { pub name: String, pub value: String, diff --git a/src/catalog/tests/open.rs b/src/catalog/tests/open.rs index 53207607f2358..08fc10f347624 100644 --- a/src/catalog/tests/open.rs +++ b/src/catalog/tests/open.rs @@ -87,8 +87,8 @@ // END LINT CONFIG use mz_catalog::{ - debug_stash_backed_catalog_state, stash_backed_catalog_state, BootstrapArgs, CatalogError, - DurableCatalogState, OpenableDurableCatalogState, StashConfig, + debug_bootstrap_args, debug_stash_backed_catalog_state, stash_backed_catalog_state, + CatalogError, DurableCatalogState, OpenableDurableCatalogState, StashConfig, }; use mz_ore::now::{NOW_ZERO, SYSTEM_TIME}; use mz_repr::role_id::RoleId; @@ -97,23 +97,23 @@ use std::num::NonZeroI64; #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_is_initialized() { +async fn test_stash_is_initialized() { let (debug_factory, stash_config) = stash_config().await; let openable_state = stash_backed_catalog_state(stash_config); - is_initialized(openable_state).await; + test_is_initialized(openable_state).await; debug_factory.drop().await; } #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_debug_is_initialized() { +async fn test_debug_stash_is_initialized() { let debug_factory = DebugStashFactory::new().await; let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - is_initialized(debug_openable_state).await; + test_is_initialized(debug_openable_state).await; debug_factory.drop().await; } -async fn is_initialized( +async fn test_is_initialized( mut openable_state: impl OpenableDurableCatalogState, ) { assert!( @@ -122,7 +122,7 @@ async fn is_initialized( ); let _ = openable_state - .open(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); @@ -139,23 +139,23 @@ async fn is_initialized( #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_get_deployment_generation() { +async fn test_stash_get_deployment_generation() { let (debug_factory, stash_config) = stash_config().await; let openable_state = stash_backed_catalog_state(stash_config); - get_deployment_generation(openable_state).await; + test_get_deployment_generation(openable_state).await; debug_factory.drop().await; } #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_debug_get_deployment_generation() { +async fn test_debug_stash_get_deployment_generation() { let debug_factory = DebugStashFactory::new().await; let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - get_deployment_generation(debug_openable_state).await; + test_get_deployment_generation(debug_openable_state).await; debug_factory.drop().await; } -async fn get_deployment_generation( +async fn test_get_deployment_generation( mut openable_state: impl OpenableDurableCatalogState, ) { assert_eq!( @@ -165,7 +165,7 @@ async fn get_deployment_generation( ); let _ = openable_state - .open(SYSTEM_TIME.clone(), &bootstrap_args(), Some(42)) + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), Some(42)) .await .unwrap(); @@ -184,29 +184,29 @@ async fn get_deployment_generation( #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_open_savepoint() { +async fn test_stash_open_savepoint() { let (debug_factory, stash_config) = stash_config().await; let openable_state = stash_backed_catalog_state(stash_config); - open_savepoint(openable_state).await; + test_open_savepoint(openable_state).await; debug_factory.drop().await; } #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_debug_open_savepoint() { +async fn test_debug_stash_open_savepoint() { let debug_factory = DebugStashFactory::new().await; let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - open_savepoint(debug_openable_state).await; + test_open_savepoint(debug_openable_state).await; debug_factory.drop().await; } -async fn open_savepoint( +async fn test_open_savepoint( mut openable_state: impl OpenableDurableCatalogState, ) { { // Can't open a savepoint catalog until it's been initialized. let err = openable_state - .open_savepoint(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open_savepoint(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap_err(); match err { @@ -217,7 +217,7 @@ async fn open_savepoint( // Initialize the stash. { let mut state = openable_state - .open(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); assert_eq!( @@ -228,7 +228,7 @@ async fn open_savepoint( // Open catalog in check mode. let mut state = openable_state - .open_savepoint(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open_savepoint(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); // Savepoint catalogs do not increment the epoch. @@ -255,7 +255,7 @@ async fn open_savepoint( { // Open catalog normally. let mut state = openable_state - .open(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); // Write should not have persisted. @@ -271,28 +271,28 @@ async fn open_savepoint( #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_open_read_only() { +async fn test_stash_open_read_only() { let (debug_factory, stash_config) = stash_config().await; let openable_state = stash_backed_catalog_state(stash_config); - open_read_only(openable_state).await; + test_open_read_only(openable_state).await; debug_factory.drop().await; } #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_debug_open_read_only() { +async fn test_debug_stash_open_read_only() { let debug_factory = DebugStashFactory::new().await; let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - open_read_only(debug_openable_state).await; + test_open_read_only(debug_openable_state).await; debug_factory.drop().await; } -async fn open_read_only( +async fn test_open_read_only( mut openable_state: impl OpenableDurableCatalogState, ) { // Can't open a read-only stash until it's been initialized. let err = openable_state - .open_read_only(SYSTEM_TIME.clone(), &bootstrap_args()) + .open_read_only(SYSTEM_TIME.clone(), &debug_bootstrap_args()) .await .unwrap_err(); match err { @@ -303,7 +303,7 @@ async fn open_read_only( // Initialize the stash. { let mut state = openable_state - .open(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); assert_eq!( @@ -313,7 +313,7 @@ async fn open_read_only( } let mut state = openable_state - .open_read_only(SYSTEM_TIME.clone(), &bootstrap_args()) + .open_read_only(SYSTEM_TIME.clone(), &debug_bootstrap_args()) .await .unwrap(); // Read-only catalogs do not increment the epoch. @@ -329,27 +329,29 @@ async fn open_read_only( #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_open() { +async fn test_stash_open() { let (debug_factory, stash_config) = stash_config().await; let openable_state = stash_backed_catalog_state(stash_config); - open(openable_state).await; + test_open(openable_state).await; debug_factory.drop().await; } #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` -async fn test_debug_open() { +async fn test_debug_stash_open() { let debug_factory = DebugStashFactory::new().await; let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - open(debug_openable_state).await; + test_open(debug_openable_state).await; debug_factory.drop().await; } -async fn open(mut openable_state: impl OpenableDurableCatalogState) { +async fn test_open( + mut openable_state: impl OpenableDurableCatalogState, +) { let (snapshot, audit_log) = { let mut state = openable_state // Use `NOW_ZERO` for consistent timestamps in the snapshots. - .open(NOW_ZERO.clone(), &bootstrap_args(), None) + .open(NOW_ZERO.clone(), &debug_bootstrap_args(), None) .await .unwrap(); @@ -367,7 +369,7 @@ async fn open(mut openable_state: impl OpenableDurableCa // Reopening the catalog will increment the epoch, but shouldn't change the initial snapshot. { let mut state = openable_state - .open(SYSTEM_TIME.clone(), &bootstrap_args(), None) + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); @@ -392,11 +394,3 @@ async fn stash_config() -> (DebugStashFactory, StashConfig) { }; (debug_stash_factory, config) } - -fn bootstrap_args() -> BootstrapArgs { - BootstrapArgs { - default_cluster_replica_size: "1".into(), - builtin_cluster_replica_size: "1".into(), - bootstrap_role: None, - } -} diff --git a/src/catalog/tests/read-write.rs b/src/catalog/tests/read-write.rs new file mode 100644 index 0000000000000..133e710396be8 --- /dev/null +++ b/src/catalog/tests/read-write.rs @@ -0,0 +1,1192 @@ +// Copyright 2018 sqlparser-rs contributors. All rights reserved. +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// This file is derived from the sqlparser-rs project, available at +// https://github.com/andygrove/sqlparser-rs. It was incorporated +// directly into Materialize on December 21, 2019. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// BEGIN LINT CONFIG +// DO NOT EDIT. Automatically generated by bin/gen-lints. +// Have complaints about the noise? See the note in misc/python/materialize/cli/gen-lints.py first. +#![allow(unknown_lints)] +#![allow(clippy::style)] +#![allow(clippy::complexity)] +#![allow(clippy::large_enum_variant)] +#![allow(clippy::mutable_key_type)] +#![allow(clippy::stable_sort_primitive)] +#![allow(clippy::map_entry)] +#![allow(clippy::box_default)] +#![allow(clippy::drain_collect)] +#![warn(clippy::bool_comparison)] +#![warn(clippy::clone_on_ref_ptr)] +#![warn(clippy::no_effect)] +#![warn(clippy::unnecessary_unwrap)] +#![warn(clippy::dbg_macro)] +#![warn(clippy::todo)] +#![warn(clippy::wildcard_dependencies)] +#![warn(clippy::zero_prefixed_literal)] +#![warn(clippy::borrowed_box)] +#![warn(clippy::deref_addrof)] +#![warn(clippy::double_must_use)] +#![warn(clippy::double_parens)] +#![warn(clippy::extra_unused_lifetimes)] +#![warn(clippy::needless_borrow)] +#![warn(clippy::needless_question_mark)] +#![warn(clippy::needless_return)] +#![warn(clippy::redundant_pattern)] +#![warn(clippy::redundant_slicing)] +#![warn(clippy::redundant_static_lifetimes)] +#![warn(clippy::single_component_path_imports)] +#![warn(clippy::unnecessary_cast)] +#![warn(clippy::useless_asref)] +#![warn(clippy::useless_conversion)] +#![warn(clippy::builtin_type_shadow)] +#![warn(clippy::duplicate_underscore_argument)] +#![warn(clippy::double_neg)] +#![warn(clippy::unnecessary_mut_passed)] +#![warn(clippy::wildcard_in_or_patterns)] +#![warn(clippy::crosspointer_transmute)] +#![warn(clippy::excessive_precision)] +#![warn(clippy::overflow_check_conditional)] +#![warn(clippy::as_conversions)] +#![warn(clippy::match_overlapping_arm)] +#![warn(clippy::zero_divided_by_zero)] +#![warn(clippy::must_use_unit)] +#![warn(clippy::suspicious_assignment_formatting)] +#![warn(clippy::suspicious_else_formatting)] +#![warn(clippy::suspicious_unary_op_formatting)] +#![warn(clippy::mut_mutex_lock)] +#![warn(clippy::print_literal)] +#![warn(clippy::same_item_push)] +#![warn(clippy::useless_format)] +#![warn(clippy::write_literal)] +#![warn(clippy::redundant_closure)] +#![warn(clippy::redundant_closure_call)] +#![warn(clippy::unnecessary_lazy_evaluations)] +#![warn(clippy::partialeq_ne_impl)] +#![warn(clippy::redundant_field_names)] +#![warn(clippy::transmutes_expressible_as_ptr_casts)] +#![warn(clippy::unused_async)] +#![warn(clippy::disallowed_methods)] +#![warn(clippy::disallowed_macros)] +#![warn(clippy::disallowed_types)] +#![warn(clippy::from_over_into)] +// END LINT CONFIG + +use itertools::Itertools; +use mz_audit_log::{ + CreateClusterReplicaV1, EventDetails, EventType, EventV1, IdNameV1, StorageUsageV1, + VersionedEvent, VersionedStorageUsage, +}; +use mz_catalog::objects::{Config, DurableType, IdAlloc, IntrospectionSourceIndex}; +use mz_catalog::objects::{SystemObjectDescription, SystemObjectUniqueIdentifier}; +use mz_catalog::{ + debug_bootstrap_args, debug_stash_backed_catalog_state, Cluster, ClusterConfig, ClusterReplica, + ClusterVariant, Comment, Database, DefaultPrivilege, DurableCatalogState, Item, + OpenableDurableCatalogState, ReplicaConfig, ReplicaLocation, Role, Schema, SystemConfiguration, + SystemObjectMapping, TimelineTimestamp, DATABASE_ID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, + USER_ITEM_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY, +}; +use mz_controller::clusters::ReplicaLogging; +use mz_controller_types::{ClusterId, ReplicaId}; +use mz_ore::collections::CollectionExt; +use mz_ore::now::SYSTEM_TIME; +use mz_proto::RustType; +use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; +use mz_repr::role_id::RoleId; +use mz_repr::GlobalId; +use mz_sql::catalog::{ + CatalogItemType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributes, + RoleMembership, +}; +use mz_sql::names::{CommentObjectId, DatabaseId, SchemaId}; +use mz_stash::DebugStashFactory; +use mz_stash_types::objects::proto; +use mz_storage_types::sources::Timeline; +use std::time::Duration; + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_confirm_leadership() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_confirm_leadership(openable_state).await; + debug_factory.drop().await; +} +async fn test_confirm_leadership( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mut state1 = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + assert!(state1.confirm_leadership().await.is_ok()); + + let mut state2 = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + assert!(state2.confirm_leadership().await.is_ok()); + assert!(state1.confirm_leadership().await.is_err()); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_catalog_content_version() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_catalog_content_version(openable_state).await; + debug_factory.drop().await; +} + +async fn test_catalog_content_version( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + assert!(state.get_catalog_content_version().await.unwrap().is_none()); + state.set_catalog_content_version("foo").await.unwrap(); + assert_eq!( + state.get_catalog_content_version().await.unwrap().unwrap(), + "foo" + ); + assert_eq!( + state + .snapshot() + .await + .unwrap() + .settings + .get(&proto::SettingKey { + name: "catalog_content_version".to_string() + }) + .unwrap() + .value, + "foo".to_string() + ); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_get_and_prune_storage_usage() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_get_and_prune_storage_usage(openable_state).await; + debug_factory.drop().await; +} + +async fn test_get_and_prune_storage_usage( + mut openable_state: impl OpenableDurableCatalogState, +) { + let old_event = VersionedStorageUsage::V1(StorageUsageV1 { + id: 1, + shard_id: Some("recent".to_string()), + size_bytes: 42, + collection_timestamp: 10, + }); + let recent_event = VersionedStorageUsage::V1(StorageUsageV1 { + id: 1, + shard_id: Some("recent".to_string()), + size_bytes: 42, + collection_timestamp: 20, + }); + let boot_ts = mz_repr::Timestamp::new(23); + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + txn.insert_storage_usage_event(old_event.clone()); + txn.insert_storage_usage_event(recent_event.clone()); + txn.commit().await.unwrap(); + + // Test with no retention period. + let events = state + .get_and_prune_storage_usage(None, boot_ts) + .await + .unwrap(); + assert_eq!(events.len(), 2); + assert!(events.contains(&old_event)); + assert!(events.contains(&recent_event)); + + // Test with some retention period. + let events = state + .get_and_prune_storage_usage(Some(Duration::from_millis(10)), boot_ts) + .await + .unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events.into_element(), recent_event); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_system_items() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_system_items(openable_state).await; + debug_factory.drop().await; +} + +async fn test_system_items( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mappings = [ + SystemObjectMapping { + description: SystemObjectDescription { + schema_name: "foo".to_string(), + object_type: CatalogItemType::Table, + object_name: "bar".to_string(), + }, + unique_identifier: SystemObjectUniqueIdentifier { + id: GlobalId::System(42), + fingerprint: "abcd".to_string(), + }, + }, + SystemObjectMapping { + description: SystemObjectDescription { + schema_name: "public".to_string(), + object_type: CatalogItemType::View, + object_name: "v".to_string(), + }, + unique_identifier: SystemObjectUniqueIdentifier { + id: GlobalId::System(43), + fingerprint: "hsokj".to_string(), + }, + }, + ]; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + state.set_system_items(mappings.to_vec()).await.unwrap(); + + assert_eq!(state.get_system_items().await.unwrap(), mappings.to_vec()); + let snapshot_mappings: Vec<_> = state + .snapshot() + .await + .unwrap() + .system_object_mappings + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| SystemObjectMapping::from_key_value(k, v)) + .collect::>() + .unwrap(); + assert_eq!(snapshot_mappings, mappings.to_vec()) +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_introspection_source_indexes() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_introspection_source_indexes(openable_state).await; + debug_factory.drop().await; +} + +async fn test_introspection_source_indexes( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mappings = [ + IntrospectionSourceIndex { + cluster_id: ClusterId::User(1), + name: "foo".to_string(), + index_id: GlobalId::System(1), + }, + IntrospectionSourceIndex { + cluster_id: ClusterId::User(2), + name: "bar".to_string(), + index_id: GlobalId::System(2), + }, + ]; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + state + .set_introspection_source_indexes(mappings.to_vec()) + .await + .unwrap(); + + assert_eq!( + state + .get_introspection_source_indexes(mappings[0].cluster_id) + .await + .unwrap() + .get(&mappings[0].name) + .unwrap(), + &mappings[0].index_id + ); + assert_eq!( + state + .get_introspection_source_indexes(mappings[1].cluster_id) + .await + .unwrap() + .get(&mappings[1].name) + .unwrap(), + &mappings[1].index_id + ); + let snapshot_mappings: Vec<_> = state + .snapshot() + .await + .unwrap() + .introspection_sources + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| IntrospectionSourceIndex::from_key_value(k, v)) + .collect::>() + .unwrap(); + assert_eq!(snapshot_mappings, mappings.to_vec()) +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_replicas() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_replicas(openable_state).await; + debug_factory.drop().await; +} + +async fn test_replicas( + mut openable_state: impl OpenableDurableCatalogState, +) { + let replica = ClusterReplica { + cluster_id: ClusterId::User(1), + replica_id: ReplicaId::User(1), + name: "foo".to_string(), + config: ReplicaConfig { + location: ReplicaLocation::Managed { + size: "1".to_string(), + availability_zone: None, + disk: false, + internal: false, + billed_as: None, + }, + logging: ReplicaLogging::default(), + idle_arrangement_merge_effort: None, + }, + owner_id: RoleId::User(1), + }; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + state + .set_replica_config( + replica.replica_id.clone(), + replica.cluster_id.clone(), + replica.name.clone(), + replica.config.clone(), + replica.owner_id.clone(), + ) + .await + .unwrap(); + + let persisted_replicas = state.get_cluster_replicas().await.unwrap(); + assert!(persisted_replicas.contains(&replica)); + let snapshot_replicas: Vec<_> = state + .snapshot() + .await + .unwrap() + .cluster_replicas + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| ClusterReplica::from_key_value(k, v)) + .collect::>() + .unwrap(); + assert!(snapshot_replicas.contains(&replica)); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_timestamps() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_timestamps(openable_state).await; + debug_factory.drop().await; +} + +async fn test_timestamps( + mut openable_state: impl OpenableDurableCatalogState, +) { + let timeline_timestamp = TimelineTimestamp { + timeline: Timeline::User("Mars".to_string()), + ts: mz_repr::Timestamp::new(42), + }; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + state + .set_timestamp(&timeline_timestamp.timeline, timeline_timestamp.ts) + .await + .unwrap(); + + assert_eq!( + state + .get_timestamp(&timeline_timestamp.timeline) + .await + .unwrap() + .unwrap(), + timeline_timestamp.ts + ); + let persisted_timestamps = state.get_timestamps().await.unwrap(); + assert!(persisted_timestamps.contains(&timeline_timestamp),); + let snapshot_timeline_timestamps: Vec<_> = state + .snapshot() + .await + .unwrap() + .timestamps + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| TimelineTimestamp::from_key_value(k, v)) + .collect::>() + .unwrap(); + assert!(snapshot_timeline_timestamps.contains(&timeline_timestamp),); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_deploy_generation() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_deploy_generation(openable_state).await; + debug_factory.drop().await; +} + +async fn test_deploy_generation( + mut openable_state: impl OpenableDurableCatalogState, +) { + let deploy_generation = 42; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + state + .set_deploy_generation(deploy_generation) + .await + .unwrap(); + + let snapshot_configs: Vec<_> = state + .snapshot() + .await + .unwrap() + .configs + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Config::from_key_value(k, v)) + .collect::>() + .unwrap(); + assert!(snapshot_configs.contains(&Config { + key: "deploy_generation".to_string(), + value: deploy_generation + })); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_allocate_id() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_allocate_id(openable_state).await; + debug_factory.drop().await; +} + +async fn test_allocate_id( + mut openable_state: impl OpenableDurableCatalogState, +) { + let id_type = USER_ITEM_ALLOC_KEY; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + let start_id = state.get_next_id(id_type).await.unwrap(); + let ids = state.allocate_id(id_type, 3).await.unwrap(); + assert_eq!(ids, (start_id..(start_id + 3)).collect::>()); + + let snapshot_id_allocs: Vec<_> = state + .snapshot() + .await + .unwrap() + .id_allocator + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| IdAlloc::from_key_value(k, v)) + .collect::>() + .unwrap(); + assert!(snapshot_id_allocs.contains(&IdAlloc { + name: id_type.to_string(), + next_id: start_id + 3, + })); +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_clusters() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_clusters(openable_state).await; + debug_factory.drop().await; +} + +async fn test_clusters( + mut openable_state: impl OpenableDurableCatalogState, +) { + let clusters = [ + Cluster { + id: ClusterId::User(10), + name: "foo".to_string(), + linked_object_id: None, + owner_id: RoleId::User(1), + privileges: Vec::new(), + config: ClusterConfig { + variant: ClusterVariant::Unmanaged, + }, + }, + Cluster { + id: ClusterId::User(20), + name: "bar".to_string(), + linked_object_id: None, + owner_id: RoleId::User(2), + privileges: Vec::new(), + config: ClusterConfig { + variant: ClusterVariant::Unmanaged, + }, + }, + ]; + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + + let mut txn = state.transaction().await.unwrap(); + for cluster in &clusters { + txn.insert_user_cluster( + cluster.id, + &cluster.name, + cluster.linked_object_id, + Vec::new(), + cluster.owner_id, + cluster.privileges.clone(), + cluster.config.clone(), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_clusters = state.get_clusters().await.unwrap(); + for cluster in &clusters { + assert!(persisted_clusters.contains(cluster)); + } + + let snapshot_clusters: Vec<_> = state + .snapshot() + .await + .unwrap() + .clusters + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Cluster::from_key_value(k, v)) + .collect::>() + .unwrap(); + for cluster in &clusters { + assert!(snapshot_clusters.contains(cluster)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_databases() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_databases(openable_state).await; + debug_factory.drop().await; +} + +async fn test_databases( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let database_id = state.get_next_id(DATABASE_ID_ALLOC_KEY).await.unwrap(); + let databases = [ + Database { + id: DatabaseId::User(database_id), + name: "foo".to_string(), + owner_id: RoleId::User(1), + privileges: Vec::new(), + }, + Database { + id: DatabaseId::User(database_id + 1), + name: "bar".to_string(), + owner_id: RoleId::User(1), + privileges: Vec::new(), + }, + ]; + + let mut txn = state.transaction().await.unwrap(); + for database in &databases { + txn.insert_user_database( + &database.name, + database.owner_id, + database.privileges.clone(), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_databases = state.get_databases().await.unwrap(); + for database in &databases { + assert!(persisted_databases.contains(database)); + } + + let snapshot_databases: Vec<_> = state + .snapshot() + .await + .unwrap() + .databases + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Database::from_key_value(k, v)) + .collect::>() + .unwrap(); + for database in &databases { + assert!(snapshot_databases.contains(database)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_schemas() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_schemas(openable_state).await; + debug_factory.drop().await; +} + +async fn test_schemas( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let schema_id = state.get_next_id(SCHEMA_ID_ALLOC_KEY).await.unwrap(); + let schemas = [ + Schema { + id: SchemaId::User(schema_id), + name: "foo".to_string(), + database_id: Some(DatabaseId::User(1)), + owner_id: RoleId::User(1), + privileges: Vec::new(), + }, + Schema { + id: SchemaId::User(schema_id + 1), + name: "bar".to_string(), + database_id: Some(DatabaseId::User(1)), + owner_id: RoleId::User(1), + privileges: Vec::new(), + }, + ]; + + let mut txn = state.transaction().await.unwrap(); + for schema in &schemas { + txn.insert_user_schema( + schema.database_id.clone().expect("set above"), + &schema.name, + schema.owner_id, + schema.privileges.clone(), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_schemas = state.get_schemas().await.unwrap(); + for schema in &schemas { + assert!(persisted_schemas.contains(schema)); + } + + let snapshot_schemas: Vec<_> = state + .snapshot() + .await + .unwrap() + .schemas + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Schema::from_key_value(k, v)) + .collect::>() + .unwrap(); + for schema in &schemas { + assert!(snapshot_schemas.contains(schema)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_roles() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_roles(openable_state).await; + debug_factory.drop().await; +} + +async fn test_roles( + mut openable_state: impl OpenableDurableCatalogState, +) { + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let role_id = state.get_next_id(USER_ROLE_ID_ALLOC_KEY).await.unwrap(); + let roles = [ + Role { + id: RoleId::User(role_id), + name: "foo".to_string(), + attributes: RoleAttributes::new(), + membership: RoleMembership { + map: Default::default(), + }, + vars: Default::default(), + }, + Role { + id: RoleId::User(role_id + 1), + name: "bar".to_string(), + attributes: RoleAttributes::new(), + membership: RoleMembership { + map: Default::default(), + }, + vars: Default::default(), + }, + ]; + + let mut txn = state.transaction().await.unwrap(); + for role in &roles { + txn.insert_user_role( + role.name.clone(), + role.attributes.clone(), + role.membership.clone(), + role.vars.clone(), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_roles = state.get_roles().await.unwrap(); + for role in &roles { + assert!(persisted_roles.contains(role)); + } + + let snapshot_roles: Vec<_> = state + .snapshot() + .await + .unwrap() + .roles + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Role::from_key_value(k, v)) + .collect::>() + .unwrap(); + for role in &roles { + assert!(snapshot_roles.contains(role)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_default_privileges() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_default_privileges(openable_state).await; + debug_factory.drop().await; +} + +async fn test_default_privileges( + mut openable_state: impl OpenableDurableCatalogState, +) { + let default_privileges = [ + DefaultPrivilege { + object: DefaultPrivilegeObject { + role_id: RoleId::User(1), + database_id: None, + schema_id: None, + object_type: ObjectType::Table, + }, + acl_item: DefaultPrivilegeAclItem { + grantee: RoleId::User(2), + acl_mode: AclMode::all(), + }, + }, + DefaultPrivilege { + object: DefaultPrivilegeObject { + role_id: RoleId::User(3), + database_id: None, + schema_id: None, + object_type: ObjectType::View, + }, + acl_item: DefaultPrivilegeAclItem { + grantee: RoleId::User(4), + acl_mode: AclMode::CREATE, + }, + }, + ]; + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + for default_privilege in &default_privileges { + txn.set_default_privilege( + default_privilege.object.role_id, + default_privilege.object.database_id, + default_privilege.object.schema_id, + default_privilege.object.object_type, + default_privilege.acl_item.grantee, + Some(default_privilege.acl_item.acl_mode), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_default_privileges = state.get_default_privileges().await.unwrap(); + for default_privilege in &default_privileges { + assert!(persisted_default_privileges.contains(default_privilege)); + } + + let snapshot_default_privileges: Vec<_> = state + .snapshot() + .await + .unwrap() + .default_privileges + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| DefaultPrivilege::from_key_value(k, v)) + .collect::>() + .unwrap(); + for default_privilege in &default_privileges { + assert!(snapshot_default_privileges.contains(default_privilege)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_system_privileges() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_system_privileges(openable_state).await; + debug_factory.drop().await; +} + +async fn test_system_privileges( + mut openable_state: impl OpenableDurableCatalogState, +) { + let system_privileges = [ + MzAclItem { + grantee: RoleId::User(1), + grantor: RoleId::User(2), + acl_mode: AclMode::CREATE_ROLE, + }, + MzAclItem { + grantee: RoleId::User(3), + grantor: RoleId::User(4), + acl_mode: AclMode::CREATE_CLUSTER, + }, + ]; + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + for system_privilege in &system_privileges { + txn.set_system_privilege( + system_privilege.grantee, + system_privilege.grantor, + Some(system_privilege.acl_mode), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_system_privileges = state.get_system_privileges().await.unwrap(); + for system_privilege in &system_privileges { + assert!(persisted_system_privileges.contains(system_privilege)); + } + + let snapshot_system_privileges: Vec<_> = state + .snapshot() + .await + .unwrap() + .system_privileges + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| MzAclItem::from_key_value(k, v)) + .collect::>() + .unwrap(); + for system_privilege in &system_privileges { + assert!(snapshot_system_privileges.contains(system_privilege)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_system_configurations() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_system_configurations(openable_state).await; + debug_factory.drop().await; +} + +async fn test_system_configurations( + mut openable_state: impl OpenableDurableCatalogState, +) { + let system_configurations = [ + SystemConfiguration { + name: "foo".to_string(), + value: "bar".to_string(), + }, + SystemConfiguration { + name: "vewasdewd".to_string(), + value: "asiewvw".to_string(), + }, + ]; + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + for system_configuration in &system_configurations { + txn.upsert_system_config( + &system_configuration.name, + system_configuration.value.clone(), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_system_configurations = state.get_system_configurations().await.unwrap(); + for system_configuration in &system_configurations { + assert!(persisted_system_configurations.contains(system_configuration)); + } + + let snapshot_system_configurations: Vec<_> = state + .snapshot() + .await + .unwrap() + .system_configurations + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| SystemConfiguration::from_key_value(k, v)) + .collect::>() + .unwrap(); + for system_configuration in &system_configurations { + assert!(snapshot_system_configurations.contains(system_configuration)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_comments() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_comments(openable_state).await; + debug_factory.drop().await; +} + +async fn test_comments( + mut openable_state: impl OpenableDurableCatalogState, +) { + let comments = [ + Comment { + object_id: CommentObjectId::Database(DatabaseId::User(1)), + sub_component: None, + comment: "cool".to_string(), + }, + Comment { + object_id: CommentObjectId::View(GlobalId::User(1)), + sub_component: Some(1), + comment: "nice".to_string(), + }, + ]; + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + for comment in &comments { + txn.update_comment( + comment.object_id.clone(), + comment.sub_component.clone(), + Some(comment.comment.clone()), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let persisted_comments = state.get_comments().await.unwrap(); + for comment in &comments { + assert!(persisted_comments.contains(comment)); + } + + let snapshot_comments: Vec<_> = state + .snapshot() + .await + .unwrap() + .comments + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Comment::from_key_value(k, v)) + .collect::>() + .unwrap(); + for comment in &comments { + assert!(snapshot_comments.contains(comment)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_audit_logs() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_audit_logs(openable_state).await; + debug_factory.drop().await; +} + +async fn test_audit_logs( + mut openable_state: impl OpenableDurableCatalogState, +) { + let audit_logs = [ + VersionedEvent::V1(EventV1 { + id: 100, + event_type: EventType::Create, + object_type: mz_audit_log::ObjectType::ClusterReplica, + details: EventDetails::CreateClusterReplicaV1(CreateClusterReplicaV1 { + cluster_id: "1".to_string(), + cluster_name: "foo".to_string(), + replica_id: Some("1".to_string()), + replica_name: "bar".to_string(), + logical_size: "1".to_string(), + disk: false, + billed_as: None, + internal: false, + }), + user: Some("joe".to_string()), + occurred_at: 100, + }), + VersionedEvent::V1(EventV1 { + id: 200, + event_type: EventType::Drop, + object_type: mz_audit_log::ObjectType::View, + details: EventDetails::IdNameV1(IdNameV1 { + id: "2".to_string(), + name: "v".to_string(), + }), + user: Some("mike".to_string()), + occurred_at: 200, + }), + ]; + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + for audit_log in &audit_logs { + txn.insert_audit_log_event(audit_log.clone()); + } + txn.commit().await.unwrap(); + + let persisted_audit_logs = state.get_audit_logs().await.unwrap(); + for audit_log in &audit_logs { + assert!(persisted_audit_logs.contains(audit_log)); + } +} + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_stash_items() { + let debug_factory = DebugStashFactory::new().await; + let openable_state = debug_stash_backed_catalog_state(&debug_factory); + test_items(openable_state).await; + debug_factory.drop().await; +} + +async fn test_items( + mut openable_state: impl OpenableDurableCatalogState, +) { + let items = [ + Item { + id: GlobalId::User(100), + schema_id: SchemaId::User(1), + name: "foo".to_string(), + create_sql: "CREATE VIEW v AS SELECT 1".to_string(), + owner_id: RoleId::User(1), + privileges: vec![], + }, + Item { + id: GlobalId::User(200), + schema_id: SchemaId::User(1), + name: "bar".to_string(), + create_sql: "CREATE MATERIALIZED VIEW mv AS SELECT 2".to_string(), + owner_id: RoleId::User(2), + privileges: vec![], + }, + ]; + + let mut state = openable_state + .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) + .await + .unwrap(); + let mut txn = state.transaction().await.unwrap(); + for item in &items { + txn.insert_item( + item.id, + item.schema_id, + &item.name, + item.create_sql.clone(), + item.owner_id, + item.privileges.clone(), + ) + .unwrap(); + } + txn.commit().await.unwrap(); + + let snapshot_items: Vec<_> = state + .snapshot() + .await + .unwrap() + .items + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Item::from_key_value(k, v)) + .collect::>() + .unwrap(); + for item in &items { + assert!(snapshot_items.contains(item)); + } +}