diff --git a/Cargo.lock b/Cargo.lock index 895d8d3ebfed0..dba2f5cb37aa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3687,7 +3687,9 @@ name = "mz-catalog" version = "0.0.0" dependencies = [ "async-trait", + "bytes", "derivative", + "differential-dataflow", "futures", "insta", "itertools", @@ -3696,6 +3698,8 @@ dependencies = [ "mz-controller", "mz-controller-types", "mz-ore", + "mz-persist-client", + "mz-persist-types", "mz-pgrepr", "mz-postgres-util", "mz-proto", @@ -3714,9 +3718,11 @@ dependencies = [ "serde", "serde_json", "thiserror", + "timely", "tokio", "tokio-postgres", "tracing", + "uuid", "workspace-hack", ] diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index e57f5bcf77bf8..f96486dac9b4d 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -454,8 +454,7 @@ impl Catalog { debug_stash_factory: &DebugStashFactory, now: NowFn, ) -> Result { - let mut openable_storage = - mz_catalog::debug_stash_backed_catalog_state(debug_stash_factory); + let openable_storage = mz_catalog::debug_stash_backed_catalog_state(debug_stash_factory); let storage = openable_storage .open(now.clone(), &debug_bootstrap_args(), None) .await?; @@ -479,7 +478,7 @@ impl Catalog { schema: Some(schema), tls, }; - let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); + let openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); let storage = openable_storage .open(now.clone(), &debug_bootstrap_args(), None) .await?; @@ -493,7 +492,7 @@ impl Catalog { stash_config: StashConfig, now: NowFn, ) -> Result { - let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); + let openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); let storage = openable_storage .open_read_only(now.clone(), &debug_bootstrap_args()) .await?; diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 766d34b607342..f1a3e8af101d9 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -8,7 +8,9 @@ publish = false [dependencies] async-trait = "0.1.68" +bytes = { version = "1.3.0", features = ["serde"] } derivative = "2.2.0" +differential-dataflow = "0.12.0" futures = "0.3.25" itertools = "0.10.5" once_cell = "1.16.0" @@ -17,6 +19,8 @@ mz-compute-client = { path = "../compute-client" } mz-controller = { path = "../controller" } mz-controller-types = { path = "../controller-types" } mz-ore = { path = "../ore", features = ["chrono", "async", "tracing_"] } +mz-persist-client = { path = "../persist-client" } +mz-persist-types = { path = "../persist-types" } mz-pgrepr = { path = "../pgrepr" } mz-proto = { path = "../proto" } mz-repr = { path = "../repr", features = ["tracing_"] } @@ -31,8 +35,10 @@ proptest-derive = { version = "0.3.0", features = ["boxed_union"] } postgres-openssl = { version = "0.5.0" } serde = "1.0.152" serde_json = "1.0.89" +timely = { version = "0.12.0", default-features = false } tracing = "0.1.37" thiserror = "1.0.37" +uuid = "1.2.2" workspace-hack = { version = "0.0.0", path = "../workspace-hack" } [dev-dependencies] diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 3086fa3387c05..2e2e701d3db3a 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -84,6 +84,7 @@ use std::collections::BTreeMap; use std::fmt::Debug; use std::num::NonZeroI64; use std::time::Duration; +use uuid::Uuid; use mz_stash::DebugStashFactory; @@ -94,6 +95,7 @@ pub use crate::objects::{ SystemConfiguration, SystemObjectMapping, TimelineTimestamp, }; use crate::objects::{IntrospectionSourceIndex, Snapshot}; +use crate::persist::{PersistCatalogState, PersistHandle}; use crate::stash::{Connection, DebugOpenableConnection, OpenableConnection}; pub use crate::stash::{ StashConfig, ALL_COLLECTIONS, AUDIT_LOG_COLLECTION, CLUSTER_COLLECTION, @@ -109,6 +111,7 @@ use mz_audit_log::{VersionedEvent, VersionedStorageUsage}; use mz_controller_types::{ClusterId, ReplicaId}; use mz_ore::collections::CollectionExt; use mz_ore::now::NowFn; +use mz_persist_client::PersistClient; use mz_repr::adt::mz_acl_item::MzAclItem; use mz_repr::role_id::RoleId; use mz_repr::GlobalId; @@ -121,6 +124,7 @@ pub mod builtin; mod error; pub mod initialize; pub mod objects; +mod persist; pub const DATABASE_ID_ALLOC_KEY: &str = "database"; pub const SCHEMA_ID_ALLOC_KEY: &str = "schema"; @@ -141,6 +145,8 @@ pub struct BootstrapArgs { pub bootstrap_role: Option, } +pub type Epoch = NonZeroI64; + /// An API for opening a durable catalog state. #[async_trait] pub trait OpenableDurableCatalogState: Debug + Send { @@ -153,7 +159,7 @@ pub trait OpenableDurableCatalogState: Debug + Send { /// - Catalog initialization fails. /// - Catalog migrations fail. async fn open_savepoint( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, deploy_generation: Option, @@ -165,7 +171,7 @@ pub trait OpenableDurableCatalogState: Debug + Send { /// If the catalog is uninitialized or requires a migrations, then /// it will fail to open in read only mode. async fn open_read_only( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, ) -> Result; @@ -174,7 +180,7 @@ pub trait OpenableDurableCatalogState: Debug + Send { /// catalog, if it has not been initialized, and perform any migrations /// needed. async fn open( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, deploy_generation: Option, @@ -200,7 +206,7 @@ pub trait ReadOnlyDurableCatalogState: Debug + Send { /// for their epoch. /// /// NB: We may remove this in later iterations of Pv2. - fn epoch(&mut self) -> NonZeroI64; + fn epoch(&mut self) -> Epoch; /// Returns the version of Materialize that last wrote to the catalog. /// @@ -389,22 +395,29 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState { } } -/// Creates a durable catalog state implemented using the stash. The catalog status is unopened, -/// and must be opened before use. +/// Creates a openable durable catalog state implemented using the stash. pub fn stash_backed_catalog_state( config: StashConfig, ) -> impl OpenableDurableCatalogState { OpenableConnection::new(config) } -/// Creates a debug durable catalog state implemented using the stash that is meant to be used in -/// tests. The catalog status is unopened, and must be opened before use. +/// Creates an openable debug durable catalog state implemented using the stash that is meant to be +/// used in tests. pub fn debug_stash_backed_catalog_state( debug_stash_factory: &DebugStashFactory, ) -> impl OpenableDurableCatalogState + '_ { DebugOpenableConnection::new(debug_stash_factory) } +/// Creates an openable durable catalog state implemented using persist. +pub async fn persist_backed_catalog_state( + persist_client: PersistClient, + environment_id: Uuid, +) -> impl OpenableDurableCatalogState { + PersistHandle::new(persist_client, environment_id).await +} + pub fn debug_bootstrap_args() -> BootstrapArgs { BootstrapArgs { default_cluster_replica_size: "1".into(), diff --git a/src/catalog/src/objects.rs b/src/catalog/src/objects.rs index ec385deb0a00c..407b3e50127d2 100644 --- a/src/catalog/src/objects.rs +++ b/src/catalog/src/objects.rs @@ -924,7 +924,7 @@ impl RustType for IdAllocKey { } } -#[derive(Clone, PartialOrd, PartialEq, Eq, Ord)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)] pub struct IdAllocValue { pub(crate) next_id: u64, } diff --git a/src/catalog/src/persist.rs b/src/catalog/src/persist.rs new file mode 100644 index 0000000000000..0ffe69f6a037f --- /dev/null +++ b/src/catalog/src/persist.rs @@ -0,0 +1,1122 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::str::FromStr; + +use async_trait::async_trait; +use differential_dataflow::lattice::Lattice; +use itertools::Itertools; +use mz_audit_log::{VersionedEvent, VersionedStorageUsage}; +use mz_controller_types::{ClusterId, ReplicaId}; +use mz_ore::now::NowFn; +use mz_ore::{soft_assert, soft_assert_eq}; +use mz_persist_client::read::ReadHandle; +use mz_persist_client::write::WriteHandle; +use mz_persist_client::{Diagnostics, PersistClient, ShardId}; +use mz_persist_types::codec_impls::{ + SimpleDecoder, SimpleEncoder, SimpleSchema, UnitSchema, VecU8Schema, +}; +use mz_persist_types::dyn_struct::{ColumnsMut, ColumnsRef, DynStructCfg}; +use mz_persist_types::Codec; +use mz_proto::RustType; +use mz_repr::adt::mz_acl_item::MzAclItem; +use mz_repr::role_id::RoleId; +use mz_repr::{Diff, GlobalId}; +use mz_stash::USER_VERSION_KEY; +use mz_stash_types::objects::proto; +use mz_stash_types::STASH_VERSION; +use mz_storage_types::sources::Timeline; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; +use timely::progress::Antichain; +use tracing::debug; +use uuid::Uuid; + +use crate::initialize::DEPLOY_GENERATION; +use crate::objects::{ + AuditLogKey, ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue, + DurableType, IntrospectionSourceIndex, Snapshot, StorageUsageKey, TimestampValue, +}; +use crate::transaction::{ + add_new_builtin_cluster_replicas_migration, add_new_builtin_clusters_migration, + TransactionBatch, +}; +use crate::{ + initialize, BootstrapArgs, CatalogError, Cluster, ClusterReplica, Comment, Database, + DefaultPrivilege, DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState, + ReadOnlyDurableCatalogState, ReplicaConfig, Role, Schema, SystemConfiguration, + SystemObjectMapping, TimelineTimestamp, Transaction, +}; + +/// New-type used to represent timestamps in persist. +type Timestamp = mz_repr::Timestamp; + +/// Durable catalog mode that dictates the effect of mutable operations. +#[derive(Debug)] +enum Mode { + /// Mutable operations are prohibited. + Readonly, + /// Mutable operations have an effect in-memory, but aren't persisted durably. + Savepoint, + /// Mutable operations have an effect in-memory and durably. + Writable, +} + +/// A single update to the catalog state. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct StateUpdate { + /// They kind and contents of the state update. + kind: StateUpdateKind, + /// The timestamp at which the update occurred. + ts: Timestamp, + /// Record count difference for the update. + diff: Diff, +} + +impl StateUpdate { + /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s at timestamp `ts`. + fn from_txn_batch(txn_batch: TransactionBatch, ts: Timestamp) -> Vec { + fn from_batch( + batch: Vec<(K, V, Diff)>, + ts: Timestamp, + kind: fn(K, V) -> StateUpdateKind, + ) -> impl Iterator { + batch.into_iter().map(move |(k, v, diff)| StateUpdate { + kind: kind(k, v), + ts, + diff, + }) + } + let TransactionBatch { + databases, + schemas, + items, + comments, + roles, + clusters, + cluster_replicas, + introspection_sources, + id_allocator, + configs, + settings, + timestamps, + system_gid_mapping, + system_configurations, + default_privileges, + system_privileges, + audit_log_updates, + storage_usage_updates, + } = txn_batch; + let databases = from_batch(databases, ts, StateUpdateKind::Database); + let schemas = from_batch(schemas, ts, StateUpdateKind::Schema); + let items = from_batch(items, ts, StateUpdateKind::Item); + let comments = from_batch(comments, ts, StateUpdateKind::Comment); + let roles = from_batch(roles, ts, StateUpdateKind::Role); + let clusters = from_batch(clusters, ts, StateUpdateKind::Cluster); + let cluster_replicas = from_batch(cluster_replicas, ts, StateUpdateKind::ClusterReplica); + let introspection_sources = from_batch( + introspection_sources, + ts, + StateUpdateKind::IntrospectionSourceIndex, + ); + let id_allocators = from_batch(id_allocator, ts, StateUpdateKind::IdAllocator); + let configs = from_batch(configs, ts, StateUpdateKind::Config); + let settings = from_batch(settings, ts, StateUpdateKind::Setting); + let timestamps = from_batch(timestamps, ts, StateUpdateKind::Timestamp); + let system_object_mappings = + from_batch(system_gid_mapping, ts, StateUpdateKind::SystemObjectMapping); + let system_configurations = from_batch( + system_configurations, + ts, + StateUpdateKind::SystemConfiguration, + ); + let default_privileges = + from_batch(default_privileges, ts, StateUpdateKind::DefaultPrivilege); + let system_privileges = from_batch(system_privileges, ts, StateUpdateKind::SystemPrivilege); + let audit_logs = from_batch(audit_log_updates, ts, StateUpdateKind::AuditLog); + let storage_usage_updates = + from_batch(storage_usage_updates, ts, StateUpdateKind::StorageUsage); + + databases + .chain(schemas) + .chain(items) + .chain(comments) + .chain(roles) + .chain(clusters) + .chain(cluster_replicas) + .chain(introspection_sources) + .chain(id_allocators) + .chain(configs) + .chain(settings) + .chain(timestamps) + .chain(system_object_mappings) + .chain(system_configurations) + .chain(default_privileges) + .chain(system_privileges) + .chain(audit_logs) + .chain(storage_usage_updates) + .collect() + } +} + +/// The contents of a single state update. +// TODO(jkosh44) Remove Serde when we switch to proto fully. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum StateUpdateKind { + AuditLog(proto::AuditLogKey, ()), + Cluster(proto::ClusterKey, proto::ClusterValue), + ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue), + Comment(proto::CommentKey, proto::CommentValue), + Config(proto::ConfigKey, proto::ConfigValue), + Database(proto::DatabaseKey, proto::DatabaseValue), + DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue), + Epoch(Epoch), + IdAllocator(proto::IdAllocKey, proto::IdAllocValue), + IntrospectionSourceIndex( + proto::ClusterIntrospectionSourceIndexKey, + proto::ClusterIntrospectionSourceIndexValue, + ), + Item(proto::ItemKey, proto::ItemValue), + Role(proto::RoleKey, proto::RoleValue), + Schema(proto::SchemaKey, proto::SchemaValue), + Setting(proto::SettingKey, proto::SettingValue), + StorageUsage(proto::StorageUsageKey, ()), + SystemConfiguration( + proto::ServerConfigurationKey, + proto::ServerConfigurationValue, + ), + SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue), + SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue), + Timestamp(proto::TimestampKey, proto::TimestampValue), +} + +// TODO(jkosh44) As an initial implementation we simply serialize and deserialize everything as a +// JSON. Before enabling in production, we'd like to utilize Protobuf to serialize and deserialize +// everything. +impl Codec for StateUpdateKind { + type Schema = VecU8Schema; + + fn codec_name() -> String { + "StateUpdateJson".to_string() + } + + fn encode(&self, buf: &mut B) + where + B: bytes::BufMut, + { + let bytes = serde_json::to_vec(&self).expect("failed to encode StateUpdate"); + buf.put(bytes.as_slice()); + } + + fn decode<'a>(buf: &'a [u8]) -> Result { + serde_json::from_slice(buf).map_err(|err| err.to_string()) + } +} + +impl mz_persist_types::columnar::Schema for VecU8Schema { + type Encoder<'a> = SimpleEncoder<'a, StateUpdateKind, Vec>; + + type Decoder<'a> = SimpleDecoder<'a, StateUpdateKind, Vec>; + + fn columns(&self) -> DynStructCfg { + SimpleSchema::>::columns(&()) + } + + fn decoder<'a>(&self, cols: ColumnsRef<'a>) -> Result, String> { + SimpleSchema::>::decoder(cols, |val, ret| { + *ret = StateUpdateKind::decode(val).expect("should be valid StateUpdateKind") + }) + } + + fn encoder<'a>(&self, cols: ColumnsMut<'a>) -> Result, String> { + SimpleSchema::>::push_encoder(cols, |col, val| { + let mut buf = Vec::new(); + StateUpdateKind::encode(val, &mut buf); + mz_persist_types::columnar::ColumnPush::>::push(col, &buf) + }) + } +} + +/// Handles and metadata needed to interact with persist. +/// +/// Production users should call [`Self::_expire`] before dropping a [`PersistHandle`] so that it +/// can expire its leases. If/when rust gets AsyncDrop, this will be done automatically. +#[derive(Debug)] +pub struct PersistHandle { + /// Write handle to persist. + write_handle: WriteHandle, + /// Read handle to persist. + read_handle: ReadHandle, +} + +impl PersistHandle { + /// Create a new [`PersistHandle`] to the catalog state associated with `environment_id`. + pub(crate) async fn new(persist_client: PersistClient, environment_id: Uuid) -> PersistHandle { + // TODO(jkosh44) Using the environment ID directly is sufficient for correctness purposes. + // However, for observability reasons, it would be much more readble if the shard ID was + // constructed as `format!("s{}CATALOG", hash(environment_id))`. + let shard_id = ShardId::from_str(&format!("s{environment_id}")).expect("known to be valid"); + let (write_handle, read_handle) = persist_client + .open( + shard_id, + Arc::new(VecU8Schema::default()), + Arc::new(UnitSchema), + Diagnostics { + shard_name: "catalog".to_string(), + handle_purpose: "durable catalog state".to_string(), + }, + ) + .await + .expect("invalid usage"); + PersistHandle { + write_handle, + read_handle, + } + } + + async fn open_inner( + mut self, + mode: Mode, + now: NowFn, + bootstrap_args: &BootstrapArgs, + deploy_generation: Option, + ) -> Result { + let (is_initialized, upper) = self.is_initialized_inner().await; + + if !matches!(mode, Mode::Writable) && !is_initialized { + return Err(CatalogError::Durable(DurableCatalogError::NotWritable( + format!("catalog tables do not exist; will not create in {mode:?} mode"), + ))); + } + + let version = self.get_user_version(upper).await; + let read_only = matches!(mode, Mode::Readonly); + + // Grab the current catalog contents from persist. + let mut initial_snapshot: Vec<_> = self.snapshot(upper).await.collect(); + // Sniff out the most recent epoch. + let mut epoch = if is_initialized { + let epoch_idx = initial_snapshot + .iter() + .rposition(|update| { + matches!( + update, + StateUpdate { + kind: StateUpdateKind::Epoch(_), + .. + } + ) + }) + .expect("initialized catalog must have an epoch"); + match initial_snapshot.remove(epoch_idx) { + StateUpdate { + kind: StateUpdateKind::Epoch(epoch), + diff, + .. + } => { + soft_assert_eq!(diff, 1); + epoch + } + _ => unreachable!("checked above"), + } + } else { + Epoch::new(1).expect("known to be non-zero") + }; + // Note only writable catalogs attempt to increment the epoch. + if matches!(mode, Mode::Writable) { + epoch = Epoch::new(epoch.get() + 1).expect("known to be non-zero"); + } + + let mut catalog = PersistCatalogState { + mode, + persist_handle: self, + upper, + epoch, + // Initialize empty in-memory state. + snapshot: Snapshot::empty(), + fenced: false, + }; + + let mut txn = if is_initialized { + if !read_only { + let version = + version.ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))?; + if version != STASH_VERSION { + // TODO(jkosh44) Implement migrations. + panic!("the persist catalog does not know how to perform migrations yet"); + } + } + + // Update in-memory contents with with persist snapshot. + catalog.apply_updates(initial_snapshot.into_iter())?; + + let mut txn = catalog.transaction().await?; + if let Some(deploy_generation) = deploy_generation { + txn.set_config(DEPLOY_GENERATION.into(), deploy_generation)?; + } + txn + } else { + // Get the current timestamp so we can record when we booted. We don't have to worry + // about `boot_ts` being less than a previously used timestamp because the catalog is + // uninitialized and there are no previous timestamps. + let boot_ts = now(); + + let mut txn = catalog.transaction().await?; + initialize::initialize(&mut txn, bootstrap_args, boot_ts, deploy_generation).await?; + txn + }; + + if !read_only { + add_new_builtin_clusters_migration(&mut txn)?; + add_new_builtin_cluster_replicas_migration(&mut txn, bootstrap_args)?; + } + + if read_only { + let (txn_batch, _) = txn.into_parts(); + // The upper here doesn't matter because we are only apply the updates in memory. + let updates = StateUpdate::from_txn_batch(txn_batch, catalog.upper); + catalog.apply_updates(updates.into_iter())?; + } else { + txn.commit().await?; + } + + Ok(catalog) + } + + /// Fetch the current upper of the catalog state. + // TODO(jkosh44) This isn't actually guaranteed to be linearizable. Before enabling this in + // production we need a new linearizable solution. + async fn current_upper(&mut self) -> Timestamp { + self.write_handle + .fetch_recent_upper() + .await + .as_option() + .cloned() + .expect("we use a totally ordered time and never finalize the shard") + } + + /// Reports if the catalog state has been initialized, and the current upper. + async fn is_initialized_inner(&mut self) -> (bool, Timestamp) { + let upper = self.current_upper().await; + (upper > 0.into(), upper) + } + + /// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog + /// state up to, but not including, `upper`. + /// + /// The output is consolidated and sorted by timestamp in ascending order. + async fn snapshot( + &mut self, + upper: Timestamp, + ) -> impl Iterator + DoubleEndedIterator { + let snapshot = if upper > 0.into() { + let since = self.read_handle.since().clone(); + let mut as_of = upper.saturating_sub(1); + as_of.advance_by(since.borrow()); + self.read_handle + .snapshot_and_fetch(Antichain::from_elem(as_of)) + .await + .expect("we have advanced the restart_as_of by the since") + } else { + Vec::new() + }; + soft_assert!( + snapshot.iter().all(|(_, _, diff)| *diff == 1), + "snapshot_and_fetch guarantees a consolidated result" + ); + snapshot + .into_iter() + .map(|((key, _unit), ts, diff)| StateUpdate { + kind: key.expect("key decoding error"), + ts, + diff, + }) + .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts)) + } + + /// Get value of config `key`. + /// + /// Some configs need to be read before the catalog is opened for bootstrapping. + async fn get_config(&mut self, key: &str, upper: Timestamp) -> Option { + let mut configs: Vec<_> = self + .snapshot(upper) + .await + .rev() + .filter_map( + |StateUpdate { + kind, + ts: _, + diff: _, + }| match kind { + StateUpdateKind::Config(k, v) if k.key == key => Some(v.value), + _ => None, + }, + ) + .collect(); + soft_assert!( + configs.len() <= 1, + "multiple configs should not share the same key: {configs:?}" + ); + configs.pop() + } + + /// Get the user version of this instance. + /// + /// The user version is used to determine if a migration is needed. + async fn get_user_version(&mut self, upper: Timestamp) -> Option { + self.get_config(USER_VERSION_KEY, upper).await + } + + /// Politely expires all handles to persist, releasing their leases. + // TODO(jkosh44) Before enabling this in production we need to expose this through the trait. + async fn _expire(self) { + self.read_handle.expire().await; + self.write_handle.expire().await; + } +} + +#[async_trait] +impl OpenableDurableCatalogState for PersistHandle { + async fn open_savepoint( + mut self, + now: NowFn, + bootstrap_args: &BootstrapArgs, + deploy_generation: Option, + ) -> Result { + self.open_inner(Mode::Savepoint, now, bootstrap_args, deploy_generation) + .await + } + + async fn open_read_only( + mut self, + now: NowFn, + bootstrap_args: &BootstrapArgs, + ) -> Result { + self.open_inner(Mode::Readonly, now, bootstrap_args, None) + .await + } + + async fn open( + mut self, + now: NowFn, + bootstrap_args: &BootstrapArgs, + deploy_generation: Option, + ) -> Result { + self.open_inner(Mode::Writable, now, bootstrap_args, deploy_generation) + .await + } + + async fn is_initialized(&mut self) -> Result { + Ok(self.is_initialized_inner().await.0) + } + + async fn get_deployment_generation(&mut self) -> Result, CatalogError> { + let upper = self.current_upper().await; + Ok(self.get_config(DEPLOY_GENERATION, upper).await) + } +} + +/// A durable store of the catalog state using Persist as an implementation. +#[derive(Debug)] +pub struct PersistCatalogState { + /// The [`Mode`] that this catalog was opened in. + mode: Mode, + /// Handles to the persist shard containing the catalog. + persist_handle: PersistHandle, + /// The current upper of the persist shard. + upper: Timestamp, + /// The epoch of this catalog. + epoch: Epoch, + /// A cache of the entire catalogs state. + snapshot: Snapshot, + /// True if this catalog has fenced out older catalogs, false otherwise. + fenced: bool, +} + +impl PersistCatalogState { + /// Applies [`StateUpdate`]s to the in memory catalog cache. + fn apply_updates( + &mut self, + updates: impl Iterator, + ) -> Result<(), DurableCatalogError> { + fn apply(map: &mut BTreeMap, key: K, value: V, diff: Diff) { + if diff == 1 { + map.insert(key, value); + } else if diff == -1 { + map.remove(&key); + } + } + + for update in updates { + debug!("applying catalog update: {update:?}"); + match update { + StateUpdate { kind, ts: _, diff } if diff == 1 || diff == -1 => match kind { + StateUpdateKind::AuditLog(_, _) => { + // We can ignore audit log updates since it's not cached in memory. + } + StateUpdateKind::Cluster(key, value) => { + apply(&mut self.snapshot.clusters, key, value, diff); + } + StateUpdateKind::ClusterReplica(key, value) => { + apply(&mut self.snapshot.cluster_replicas, key, value, diff); + } + StateUpdateKind::Comment(key, value) => { + apply(&mut self.snapshot.comments, key, value, diff); + } + StateUpdateKind::Config(key, value) => { + apply(&mut self.snapshot.configs, key, value, diff); + } + StateUpdateKind::Database(key, value) => { + apply(&mut self.snapshot.databases, key, value, diff); + } + StateUpdateKind::DefaultPrivilege(key, value) => { + apply(&mut self.snapshot.default_privileges, key, value, diff); + } + StateUpdateKind::Epoch(epoch) => { + return Err(DurableCatalogError::Fence(format!( + "current catalog epoch {} fenced by new catalog epoch {}", + self.epoch, epoch + ))); + } + StateUpdateKind::IdAllocator(key, value) => { + apply(&mut self.snapshot.id_allocator, key, value, diff); + } + StateUpdateKind::IntrospectionSourceIndex(key, value) => { + apply(&mut self.snapshot.introspection_sources, key, value, diff); + } + StateUpdateKind::Item(key, value) => { + apply(&mut self.snapshot.items, key, value, diff); + } + StateUpdateKind::Role(key, value) => { + apply(&mut self.snapshot.roles, key, value, diff); + } + StateUpdateKind::Schema(key, value) => { + apply(&mut self.snapshot.schemas, key, value, diff); + } + StateUpdateKind::Setting(key, value) => { + apply(&mut self.snapshot.settings, key, value, diff); + } + StateUpdateKind::StorageUsage(_, _) => { + // We can ignore storage usage since it's not cached in memory. + } + StateUpdateKind::SystemConfiguration(key, value) => { + apply(&mut self.snapshot.system_configurations, key, value, diff); + } + StateUpdateKind::SystemObjectMapping(key, value) => { + apply(&mut self.snapshot.system_object_mappings, key, value, diff); + } + StateUpdateKind::SystemPrivilege(key, value) => { + apply(&mut self.snapshot.system_privileges, key, value, diff); + } + StateUpdateKind::Timestamp(key, value) => { + apply(&mut self.snapshot.timestamps, key, value, diff); + } + }, + invalid_update => { + panic!("invalid update in consolidated trace: {:?}", invalid_update); + } + } + } + + Ok(()) + } + + /// Execute and return the results of `f` on the current catalog snapshot. + /// + /// Will return an error if the catalog has been fenced out. + async fn with_snapshot( + &mut self, + f: impl FnOnce(&Snapshot) -> Result, + ) -> Result { + self.confirm_leadership().await?; + f(&self.snapshot) + } +} + +#[async_trait] +impl ReadOnlyDurableCatalogState for PersistCatalogState { + fn epoch(&mut self) -> Epoch { + self.epoch + } + + async fn get_catalog_content_version(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .settings + .get(&proto::SettingKey { + name: "catalog_content_version".to_string(), + }) + .map(|value| value.value.clone())) + }) + .await + } + + async fn get_clusters(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .clusters + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Cluster::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_cluster_replicas(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .cluster_replicas + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| ClusterReplica::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_databases(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .databases + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Database::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_schemas(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .schemas + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Schema::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_system_items(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .system_object_mappings + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| SystemObjectMapping::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_introspection_source_indexes( + &mut self, + cluster_id: ClusterId, + ) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .introspection_sources + .clone() + .into_iter() + .map(RustType::from_proto) + .filter_map_ok( + |(k, v): ( + ClusterIntrospectionSourceIndexKey, + ClusterIntrospectionSourceIndexValue, + )| { + if k.cluster_id == cluster_id { + Some((k.name, GlobalId::System(v.index_id))) + } else { + None + } + }, + ) + .collect::>()?) + }) + .await + } + + async fn get_roles(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .roles + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Role::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_default_privileges(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .default_privileges + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| DefaultPrivilege::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_system_privileges(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .system_privileges + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| MzAclItem::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_system_configurations( + &mut self, + ) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .system_configurations + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| SystemConfiguration::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_comments(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .comments + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| Comment::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_timestamps(&mut self) -> Result, CatalogError> { + self.with_snapshot(|snapshot| { + Ok(snapshot + .timestamps + .clone() + .into_iter() + .map(RustType::from_proto) + .map_ok(|(k, v)| TimelineTimestamp::from_key_value(k, v)) + .collect::>()?) + }) + .await + } + + async fn get_timestamp( + &mut self, + timeline: &Timeline, + ) -> Result, CatalogError> { + let key = proto::TimestampKey { + id: timeline.to_string(), + }; + self.with_snapshot(|snapshot| { + let val: Option = snapshot + .timestamps + .get(&key) + .cloned() + .map(RustType::from_proto) + .transpose()?; + Ok(val.map(|v| v.ts)) + }) + .await + } + + async fn get_audit_logs(&mut self) -> Result, CatalogError> { + self.confirm_leadership().await?; + // This is only called during bootstrapping and we don't want to cache all + // audit logs in memory because they can grow quite large. Therefore, we + // go back to persist and grab everything again. + Ok(self + .persist_handle + .snapshot(self.upper) + .await + .filter_map( + |StateUpdate { + kind, + ts: _, + diff: _, + }| match kind { + StateUpdateKind::AuditLog(key, ()) => Some(key), + _ => None, + }, + ) + .map(RustType::from_proto) + .map_ok(|key: AuditLogKey| key.event) + .collect::>()?) + } + + async fn get_next_id(&mut self, id_type: &str) -> Result { + let key = proto::IdAllocKey { + name: id_type.to_string(), + }; + self.with_snapshot(|snapshot| { + Ok(snapshot.id_allocator.get(&key).expect("must exist").next_id) + }) + .await + } + + async fn snapshot(&mut self) -> Result { + self.with_snapshot(|snapshot| Ok(snapshot.clone())).await + } +} + +#[async_trait] +impl DurableCatalogState for PersistCatalogState { + fn is_read_only(&self) -> bool { + matches!(self.mode, Mode::Readonly) + } + + async fn transaction(&mut self) -> Result { + let snapshot = self.snapshot().await?; + Transaction::new(self, snapshot) + } + + async fn commit_transaction( + &mut self, + txn_batch: TransactionBatch, + ) -> Result<(), CatalogError> { + if self.is_read_only() { + return Err(DurableCatalogError::NotWritable( + "cannot commit a transaction in a read-only catalog".to_string(), + ) + .into()); + } + + let current_upper = self.upper.clone(); + let next_upper = current_upper.step_forward(); + + let updates = StateUpdate::from_txn_batch(txn_batch, current_upper); + self.apply_updates(updates.clone().into_iter())?; + + if matches!(self.mode, Mode::Writable) { + let mut batch_builder = self + .persist_handle + .write_handle + .builder(Antichain::from_elem(current_upper)); + + for StateUpdate { kind, ts, diff } in updates { + batch_builder + .add(&kind, &(), &ts, &diff) + .await + .expect("invalid usage"); + } + // If we haven't fenced the previous catalog state, do that now. + if !self.fenced { + batch_builder + .add(&StateUpdateKind::Epoch(self.epoch), &(), ¤t_upper, &1) + .await + .expect("invalid usage"); + } + let mut batch = batch_builder + .finish(Antichain::from_elem(next_upper)) + .await + .expect("invalid usage"); + match self + .persist_handle + .write_handle + .compare_and_append_batch( + &mut [&mut batch], + Antichain::from_elem(current_upper), + Antichain::from_elem(next_upper), + ) + .await + .expect("invalid usage") + { + Ok(()) => { + self.persist_handle + .read_handle + .downgrade_since(&Antichain::from_elem(current_upper)) + .await; + self.upper = next_upper; + } + Err(upper_mismatch) => { + return Err(DurableCatalogError::Fence(format!( + "current catalog upper {:?} fenced by new catalog upper {:?}", + upper_mismatch.expected, upper_mismatch.current + )) + .into()) + } + } + } + + self.fenced = true; + Ok(()) + } + + async fn confirm_leadership(&mut self) -> Result<(), CatalogError> { + let upper = self.persist_handle.current_upper().await; + if upper == self.upper { + Ok(()) + } else { + Err(DurableCatalogError::Fence(format!( + "current catalog upper {:?} fenced by new catalog upper {:?}", + self.upper, upper + )) + .into()) + } + } + + async fn set_connect_timeout(&mut self, _connect_timeout: Duration) { + // Persist is able to set this timeout internally so this is a no-op. The connect timeout + // passed to this function in production and the timeout Persist uses are both derived + // from the "crdb_connect_timeout" system variable. + } + + // TODO(jkosh44) For most modifications we delegate to transactions to avoid duplicate code. + // This is slightly inefficient because we have to clone an entire snapshot when we usually + // only need one part of the snapshot. This is mostly OK because most non-transaction + // modifications are only called once during bootstrap. The exceptions are `allocate_id` and + // `set_timestamp` Potential mitigations against these performance hits are: + // + // - Perform all modifications during bootstrap in a single transaction. + // - Utilize `CoW`s in `Transaction`s to avoid cloning unnecessary state. + + async fn set_catalog_content_version(&mut self, new_version: &str) -> Result<(), CatalogError> { + let mut txn = self.transaction().await?; + txn.set_setting( + "catalog_content_version".to_string(), + Some(new_version.to_string()), + )?; + txn.commit().await + } + + async fn get_and_prune_storage_usage( + &mut self, + retention_period: Option, + boot_ts: mz_repr::Timestamp, + ) -> Result, CatalogError> { + // If no usage retention period is set, set the cutoff to MIN so nothing + // is removed. + let cutoff_ts = match retention_period { + None => u128::MIN, + Some(period) => u128::from(boot_ts).saturating_sub(period.as_millis()), + }; + let storage_usage = self + .persist_handle + .snapshot(self.upper) + .await + .filter_map( + |StateUpdate { + kind, + ts: _, + diff: _, + }| match kind { + StateUpdateKind::StorageUsage(key, ()) => Some(key), + _ => None, + }, + ) + .map(RustType::from_proto) + .map_ok(|key: StorageUsageKey| key.metric); + let mut events = Vec::new(); + let mut expired = Vec::new(); + + for event in storage_usage { + let event = event?; + if u128::from(event.timestamp()) >= cutoff_ts { + events.push(event); + } else if retention_period.is_some() { + expired.push(event); + } + } + + if !self.is_read_only() { + let mut txn = self.transaction().await?; + txn.remove_storage_usage_events(expired); + txn.commit().await?; + } else { + self.confirm_leadership().await?; + } + + Ok(events) + } + + async fn set_system_items( + &mut self, + mappings: Vec, + ) -> Result<(), CatalogError> { + let mut txn = self.transaction().await?; + for mapping in mappings { + txn.set_system_object_mapping(mapping)?; + } + txn.commit().await + } + + async fn set_introspection_source_indexes( + &mut self, + mappings: Vec, + ) -> Result<(), CatalogError> { + let mut txn = self.transaction().await?; + for mapping in mappings { + txn.set_introspection_source_index(mapping)?; + } + txn.commit().await + } + + async fn set_replica_config( + &mut self, + replica_id: ReplicaId, + cluster_id: ClusterId, + name: String, + config: ReplicaConfig, + owner_id: RoleId, + ) -> Result<(), CatalogError> { + let mut txn = self.transaction().await?; + txn.set_replica(replica_id, cluster_id, name, config, owner_id)?; + txn.commit().await + } + + async fn set_timestamp( + &mut self, + timeline: &Timeline, + timestamp: mz_repr::Timestamp, + ) -> Result<(), CatalogError> { + let mut txn = self.transaction().await?; + txn.set_timestamp(timeline.clone(), timestamp)?; + txn.commit().await + } + + async fn set_deploy_generation(&mut self, deploy_generation: u64) -> Result<(), CatalogError> { + let mut txn = self.transaction().await?; + txn.set_config(DEPLOY_GENERATION.into(), deploy_generation)?; + txn.commit().await + } + + async fn allocate_id(&mut self, id_type: &str, amount: u64) -> Result, CatalogError> { + if amount == 0 { + return Ok(Vec::new()); + } + let mut txn = self.transaction().await?; + let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?; + txn.commit().await?; + Ok(ids) + } +} diff --git a/src/catalog/src/stash.rs b/src/catalog/src/stash.rs index 22ba2fdfda684..32e7a8e6afaf6 100644 --- a/src/catalog/src/stash.rs +++ b/src/catalog/src/stash.rs @@ -11,7 +11,6 @@ use async_trait::async_trait; use derivative::Derivative; use std::collections::BTreeMap; use std::iter::once; -use std::num::NonZeroI64; use std::pin; use std::sync::Arc; use std::time::Duration; @@ -49,8 +48,8 @@ use crate::transaction::{ TransactionBatch, }; use crate::{ - initialize, BootstrapArgs, CatalogError, DurableCatalogState, OpenableDurableCatalogState, - ReadOnlyDurableCatalogState, + initialize, BootstrapArgs, CatalogError, DurableCatalogState, Epoch, + OpenableDurableCatalogState, ReadOnlyDurableCatalogState, }; pub const SETTING_COLLECTION: TypedCollection = @@ -106,7 +105,7 @@ pub const SYSTEM_PRIVILEGES_COLLECTION: TypedCollection< // [`mz_stash::upgrade::v17_to_v18`] as an example. /// Configuration needed to connect to the stash. -#[derive(Derivative)] +#[derive(Derivative, Clone)] #[derivative(Debug)] pub struct StashConfig { pub stash_factory: StashFactory, @@ -187,7 +186,7 @@ impl OpenableConnection { impl OpenableDurableCatalogState for OpenableConnection { #[tracing::instrument(name = "storage::open_check", level = "info", skip_all)] async fn open_savepoint( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, deploy_generation: Option, @@ -199,7 +198,7 @@ impl OpenableDurableCatalogState for OpenableConnection { #[tracing::instrument(name = "storage::open_read_only", level = "info", skip_all)] async fn open_read_only( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, ) -> Result { @@ -210,7 +209,7 @@ impl OpenableDurableCatalogState for OpenableConnection { #[tracing::instrument(name = "storage::open", level = "info", skip_all)] async fn open( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, deploy_generation: Option, @@ -420,10 +419,10 @@ impl Connection { #[async_trait] impl ReadOnlyDurableCatalogState for Connection { - fn epoch(&mut self) -> NonZeroI64 { + fn epoch(&mut self) -> Epoch { self.stash .epoch() - .expect("a opened stash should always have an epoch number") + .expect("an opened stash should always have an epoch number") } async fn get_catalog_content_version(&mut self) -> Result, CatalogError> { @@ -1169,7 +1168,7 @@ impl DebugOpenableConnection<'_> { #[async_trait] impl OpenableDurableCatalogState for DebugOpenableConnection<'_> { async fn open_savepoint( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, deploy_generation: Option, @@ -1180,7 +1179,7 @@ impl OpenableDurableCatalogState for DebugOpenableConnection<'_> { } async fn open_read_only( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, ) -> Result { @@ -1190,7 +1189,7 @@ impl OpenableDurableCatalogState for DebugOpenableConnection<'_> { } async fn open( - &mut self, + mut self, now: NowFn, bootstrap_args: &BootstrapArgs, deploy_generation: Option, diff --git a/src/catalog/src/transaction.rs b/src/catalog/src/transaction.rs index b50a20bfe0ccc..6c04aaa84298b 100644 --- a/src/catalog/src/transaction.rs +++ b/src/catalog/src/transaction.rs @@ -11,7 +11,7 @@ use crate::builtin::{BuiltinLog, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS}; use crate::objects::{ AuditLogKey, Cluster, ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey, ClusterReplicaValue, ClusterValue, CommentKey, - CommentValue, ConfigKey, ConfigValue, Database, DatabaseKey, DatabaseValue, + CommentValue, Config, ConfigKey, ConfigValue, Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue, DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue, IntrospectionSourceIndex, Item, ItemKey, ItemValue, ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue, ServerConfigurationKey, @@ -20,7 +20,7 @@ use crate::objects::{ }; use crate::objects::{ClusterConfig, ClusterVariant}; use crate::{ - BootstrapArgs, CatalogError, DurableCatalogState, ReplicaLocation, Snapshot, + BootstrapArgs, CatalogError, DurableCatalogState, ReplicaLocation, Snapshot, TimelineTimestamp, DATABASE_ID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY, }; @@ -28,6 +28,7 @@ use itertools::Itertools; use mz_audit_log::{VersionedEvent, VersionedStorageUsage}; use mz_controller::clusters::ReplicaLogging; use mz_controller_types::{ClusterId, ReplicaId}; +use mz_ore::collections::CollectionExt; use mz_proto::RustType; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; use mz_repr::role_id::RoleId; @@ -595,18 +596,33 @@ impl<'a> Transaction<'a> { } pub fn get_and_increment_id(&mut self, key: String) -> Result { - let id = self + Ok(self.get_and_increment_id_by(key, 1)?.into_element()) + } + + pub fn get_and_increment_id_by( + &mut self, + key: String, + amount: u64, + ) -> Result, CatalogError> { + let current_id = self .id_allocator .items() .get(&IdAllocKey { name: key.clone() }) .unwrap_or_else(|| panic!("{key} id allocator missing")) .next_id; - let next_id = id.checked_add(1).ok_or(SqlCatalogError::IdExhaustion)?; + let next_id = current_id + .checked_add(amount) + .ok_or(SqlCatalogError::IdExhaustion)?; let prev = self .id_allocator .set(IdAllocKey { name: key }, Some(IdAllocValue { next_id }))?; - assert!(prev.is_some()); - Ok(id) + assert_eq!( + prev, + Some(IdAllocValue { + next_id: current_id + }) + ); + Ok((current_id..next_id).collect()) } pub(crate) fn insert_id_allocator( @@ -692,6 +708,14 @@ impl<'a> Transaction<'a> { } } + /// Removes all storage usage events in `events` from the transaction. + pub(crate) fn remove_storage_usage_events(&mut self, events: Vec) { + let events = events + .into_iter() + .map(|event| (StorageUsageKey { metric: event }.into_proto(), (), -1)); + self.storage_usage_updates.extend(events); + } + /// Removes item `id` from the transaction. /// /// Returns an error if `id` is not found. @@ -971,6 +995,80 @@ impl<'a> Transaction<'a> { Ok(()) } + /// Set persisted setting. + pub(crate) fn set_setting( + &mut self, + name: String, + value: Option, + ) -> Result<(), CatalogError> { + self.settings.set( + SettingKey { name }, + value.map(|value| SettingValue { value }), + )?; + Ok(()) + } + + /// Set persisted introspection source index. + pub(crate) fn set_introspection_source_index( + &mut self, + introspection_source_index: IntrospectionSourceIndex, + ) -> Result<(), CatalogError> { + let (key, value) = introspection_source_index.into_key_value(); + self.introspection_sources.set(key, Some(value))?; + Ok(()) + } + + /// Set persisted system object mappings. + pub(crate) fn set_system_object_mapping( + &mut self, + mapping: SystemObjectMapping, + ) -> Result<(), CatalogError> { + let (key, value) = mapping.into_key_value(); + self.system_gid_mapping.set(key, Some(value))?; + Ok(()) + } + + /// Set persisted timestamp. + pub(crate) fn set_timestamp( + &mut self, + timeline: Timeline, + ts: mz_repr::Timestamp, + ) -> Result<(), CatalogError> { + let timeline_timestamp = TimelineTimestamp { timeline, ts }; + let (key, value) = timeline_timestamp.into_key_value(); + self.timestamps.set(key, Some(value))?; + Ok(()) + } + + /// Set persisted replica. + pub(crate) fn set_replica( + &mut self, + replica_id: ReplicaId, + cluster_id: ClusterId, + name: String, + config: ReplicaConfig, + owner_id: RoleId, + ) -> Result<(), CatalogError> { + let replica = ClusterReplica { + cluster_id, + replica_id, + name, + config, + owner_id, + }; + let (key, value) = replica.into_key_value(); + self.cluster_replicas.set(key, Some(value))?; + Ok(()) + } + + /// Set persisted configuration. + pub(crate) fn set_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> { + let config = Config { key, value }; + let (key, value) = config.into_key_value(); + self.configs.set(key, Some(value))?; + Ok(()) + } + pub fn update_comment( &mut self, object_id: CommentObjectId, @@ -1043,12 +1141,7 @@ impl<'a> Transaction<'a> { } } - /// Commits the storage transaction to the stash. Any error returned indicates the stash may be - /// in an indeterminate state and needs to be fully re-read before proceeding. In general, this - /// must be fatal to the calling process. We do not panic/halt inside this function itself so - /// that errors can bubble up during initialization. - #[tracing::instrument(level = "debug", skip_all)] - pub async fn commit(self) -> Result<(), CatalogError> { + pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) { let txn_batch = TransactionBatch { databases: self.databases.pending(), schemas: self.schemas.pending(), @@ -1069,7 +1162,17 @@ impl<'a> Transaction<'a> { audit_log_updates: self.audit_log_updates, storage_usage_updates: self.storage_usage_updates, }; - self.durable_catalog.commit_transaction(txn_batch).await + (txn_batch, self.durable_catalog) + } + + /// Commits the storage transaction to the stash. Any error returned indicates the stash may be + /// in an indeterminate state and needs to be fully re-read before proceeding. In general, this + /// must be fatal to the calling process. We do not panic/halt inside this function itself so + /// that errors can bubble up during initialization. + #[tracing::instrument(level = "debug", skip_all)] + pub async fn commit(self) -> Result<(), CatalogError> { + let (txn_batch, durable_catalog) = self.into_parts(); + durable_catalog.commit_transaction(txn_batch).await } } diff --git a/src/catalog/tests/open.rs b/src/catalog/tests/open.rs index 08fc10f347624..4d529e0966542 100644 --- a/src/catalog/tests/open.rs +++ b/src/catalog/tests/open.rs @@ -87,20 +87,23 @@ // END LINT CONFIG use mz_catalog::{ - debug_bootstrap_args, debug_stash_backed_catalog_state, stash_backed_catalog_state, - CatalogError, DurableCatalogState, OpenableDurableCatalogState, StashConfig, + debug_bootstrap_args, debug_stash_backed_catalog_state, persist_backed_catalog_state, + stash_backed_catalog_state, CatalogError, DurableCatalogState, Epoch, + OpenableDurableCatalogState, StashConfig, }; use mz_ore::now::{NOW_ZERO, SYSTEM_TIME}; +use mz_persist_client::PersistClient; use mz_repr::role_id::RoleId; use mz_stash::DebugStashFactory; -use std::num::NonZeroI64; +use uuid::Uuid; #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_stash_is_initialized() { let (debug_factory, stash_config) = stash_config().await; - let openable_state = stash_backed_catalog_state(stash_config); - test_is_initialized(openable_state).await; + let openable_state1 = stash_backed_catalog_state(stash_config.clone()); + let openable_state2 = stash_backed_catalog_state(stash_config); + test_is_initialized(openable_state1, openable_state2).await; debug_factory.drop().await; } @@ -108,31 +111,45 @@ async fn test_stash_is_initialized() { #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_debug_stash_is_initialized() { let debug_factory = DebugStashFactory::new().await; - let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - test_is_initialized(debug_openable_state).await; + let debug_openable_state1 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state2 = debug_stash_backed_catalog_state(&debug_factory); + test_is_initialized(debug_openable_state1, debug_openable_state2).await; debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_is_initialized() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let persist_openable_state1 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state2 = + persist_backed_catalog_state(persist_client, environment_id).await; + test_is_initialized(persist_openable_state1, persist_openable_state2).await; +} + async fn test_is_initialized( - mut openable_state: impl OpenableDurableCatalogState, + mut openable_state1: impl OpenableDurableCatalogState, + mut openable_state2: impl OpenableDurableCatalogState, ) { assert!( - !openable_state.is_initialized().await.unwrap(), + !openable_state1.is_initialized().await.unwrap(), "catalog has not been opened yet" ); - let _ = openable_state + let _ = openable_state1 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); assert!( - openable_state.is_initialized().await.unwrap(), + openable_state2.is_initialized().await.unwrap(), "catalog has been opened yet" ); - // Check twice because the implementation will cache a read-only stash. + // Check twice because some implementations will cache a read-only stash. assert!( - openable_state.is_initialized().await.unwrap(), + openable_state2.is_initialized().await.unwrap(), "catalog has been opened yet" ); } @@ -141,8 +158,9 @@ async fn test_is_initialized( #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_stash_get_deployment_generation() { let (debug_factory, stash_config) = stash_config().await; - let openable_state = stash_backed_catalog_state(stash_config); - test_get_deployment_generation(openable_state).await; + let openable_state1 = stash_backed_catalog_state(stash_config.clone()); + let openable_state2 = stash_backed_catalog_state(stash_config); + test_get_deployment_generation(openable_state1, openable_state2).await; debug_factory.drop().await; } @@ -150,33 +168,47 @@ async fn test_stash_get_deployment_generation() { #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_debug_stash_get_deployment_generation() { let debug_factory = DebugStashFactory::new().await; - let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - test_get_deployment_generation(debug_openable_state).await; + let debug_openable_state1 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state2 = debug_stash_backed_catalog_state(&debug_factory); + test_get_deployment_generation(debug_openable_state1, debug_openable_state2).await; debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_get_deployment_generation() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let persist_openable_state1 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state2 = + persist_backed_catalog_state(persist_client, environment_id).await; + test_get_deployment_generation(persist_openable_state1, persist_openable_state2).await; +} + async fn test_get_deployment_generation( - mut openable_state: impl OpenableDurableCatalogState, + mut openable_state1: impl OpenableDurableCatalogState, + mut openable_state2: impl OpenableDurableCatalogState, ) { assert_eq!( - openable_state.get_deployment_generation().await.unwrap(), + openable_state1.get_deployment_generation().await.unwrap(), None, "deployment generation has not been set" ); - let _ = openable_state + let _ = openable_state1 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), Some(42)) .await .unwrap(); assert_eq!( - openable_state.get_deployment_generation().await.unwrap(), + openable_state2.get_deployment_generation().await.unwrap(), Some(42), "deployment generation has been set to 42" ); - // Check twice because the implementation will cache a read-only stash. + // Check twice because some implementations will cache a read-only stash. assert_eq!( - openable_state.get_deployment_generation().await.unwrap(), + openable_state2.get_deployment_generation().await.unwrap(), Some(42), "deployment generation has been set to 42" ); @@ -186,8 +218,17 @@ async fn test_get_deployment_generation( #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_stash_open_savepoint() { let (debug_factory, stash_config) = stash_config().await; - let openable_state = stash_backed_catalog_state(stash_config); - test_open_savepoint(openable_state).await; + let openable_state1 = stash_backed_catalog_state(stash_config.clone()); + let openable_state2 = stash_backed_catalog_state(stash_config.clone()); + let openable_state3 = stash_backed_catalog_state(stash_config.clone()); + let openable_state4 = stash_backed_catalog_state(stash_config); + test_open_savepoint( + openable_state1, + openable_state2, + openable_state3, + openable_state4, + ) + .await; debug_factory.drop().await; } @@ -195,17 +236,51 @@ async fn test_stash_open_savepoint() { #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_debug_stash_open_savepoint() { let debug_factory = DebugStashFactory::new().await; - let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - test_open_savepoint(debug_openable_state).await; + let debug_openable_state1 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state2 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state3 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state4 = debug_stash_backed_catalog_state(&debug_factory); + test_open_savepoint( + debug_openable_state1, + debug_openable_state2, + debug_openable_state3, + debug_openable_state4, + ) + .await; debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_open_savepoint() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let persist_openable_state1 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state2 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state3 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state4 = + persist_backed_catalog_state(persist_client, environment_id).await; + test_open_savepoint( + persist_openable_state1, + persist_openable_state2, + persist_openable_state3, + persist_openable_state4, + ) + .await; +} + async fn test_open_savepoint( - mut openable_state: impl OpenableDurableCatalogState, + openable_state1: impl OpenableDurableCatalogState, + openable_state2: impl OpenableDurableCatalogState, + openable_state3: impl OpenableDurableCatalogState, + openable_state4: impl OpenableDurableCatalogState, ) { { // Can't open a savepoint catalog until it's been initialized. - let err = openable_state + let err = openable_state1 .open_savepoint(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap_err(); @@ -216,26 +291,20 @@ async fn test_open_savepoint( // Initialize the stash. { - let mut state = openable_state + let mut state = openable_state2 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); - assert_eq!( - state.epoch(), - NonZeroI64::new(2).expect("known to be non-zero") - ); + assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); } // Open catalog in check mode. - let mut state = openable_state + let mut state = openable_state3 .open_savepoint(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); // Savepoint catalogs do not increment the epoch. - assert_eq!( - state.epoch(), - NonZeroI64::new(2).expect("known to be non-zero") - ); + assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); // Perform write. let mut txn = state.transaction().await.unwrap(); @@ -254,7 +323,7 @@ async fn test_open_savepoint( { // Open catalog normally. - let mut state = openable_state + let mut state = openable_state4 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); @@ -273,8 +342,10 @@ async fn test_open_savepoint( #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_stash_open_read_only() { let (debug_factory, stash_config) = stash_config().await; - let openable_state = stash_backed_catalog_state(stash_config); - test_open_read_only(openable_state).await; + let openable_state1 = stash_backed_catalog_state(stash_config.clone()); + let openable_state2 = stash_backed_catalog_state(stash_config.clone()); + let openable_state3 = stash_backed_catalog_state(stash_config); + test_open_read_only(openable_state1, openable_state2, openable_state3).await; debug_factory.drop().await; } @@ -282,16 +353,44 @@ async fn test_stash_open_read_only() { #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_debug_stash_open_read_only() { let debug_factory = DebugStashFactory::new().await; - let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - test_open_read_only(debug_openable_state).await; + let debug_openable_state1 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state2 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state3 = debug_stash_backed_catalog_state(&debug_factory); + test_open_read_only( + debug_openable_state1, + debug_openable_state2, + debug_openable_state3, + ) + .await; debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_open_read_only() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let persist_openable_state1 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state2 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state3 = + persist_backed_catalog_state(persist_client, environment_id).await; + test_open_read_only( + persist_openable_state1, + persist_openable_state2, + persist_openable_state3, + ) + .await; +} + async fn test_open_read_only( - mut openable_state: impl OpenableDurableCatalogState, + openable_state1: impl OpenableDurableCatalogState, + openable_state2: impl OpenableDurableCatalogState, + openable_state3: impl OpenableDurableCatalogState, ) { // Can't open a read-only stash until it's been initialized. - let err = openable_state + let err = openable_state1 .open_read_only(SYSTEM_TIME.clone(), &debug_bootstrap_args()) .await .unwrap_err(); @@ -302,37 +401,39 @@ async fn test_open_read_only( // Initialize the stash. { - let mut state = openable_state + let mut state = openable_state2 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); - assert_eq!( - state.epoch(), - NonZeroI64::new(2).expect("known to be non-zero") - ); + assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); } - let mut state = openable_state + let mut state = openable_state3 .open_read_only(SYSTEM_TIME.clone(), &debug_bootstrap_args()) .await .unwrap(); // Read-only catalogs do not increment the epoch. - assert_eq!( - state.epoch(), - NonZeroI64::new(2).expect("known to be non-zero") - ); + assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); let err = state.set_deploy_generation(42).await.unwrap_err(); - assert!(err - .to_string() - .contains("cannot execute UPDATE in a read-only transaction")); + match err { + CatalogError::Catalog(_) => panic!("unexpected catalog error"), + CatalogError::Durable(e) => assert!( + e.can_recover_with_write_mode() + // Stash returns an opaque Postgres error here and doesn't realize that that the + // above should be true. + || e.to_string() + .contains("cannot execute UPDATE in a read-only transaction") + ), + } } #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_stash_open() { let (debug_factory, stash_config) = stash_config().await; - let openable_state = stash_backed_catalog_state(stash_config); - test_open(openable_state).await; + let openable_state1 = stash_backed_catalog_state(stash_config.clone()); + let openable_state2 = stash_backed_catalog_state(stash_config); + test_open(openable_state1, openable_state2).await; debug_factory.drop().await; } @@ -340,25 +441,36 @@ async fn test_stash_open() { #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_debug_stash_open() { let debug_factory = DebugStashFactory::new().await; - let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory); - test_open(debug_openable_state).await; + let debug_openable_state1 = debug_stash_backed_catalog_state(&debug_factory); + let debug_openable_state2 = debug_stash_backed_catalog_state(&debug_factory); + test_open(debug_openable_state1, debug_openable_state2).await; debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_open() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let persist_openable_state1 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let persist_openable_state2 = + persist_backed_catalog_state(persist_client, environment_id).await; + test_open(persist_openable_state1, persist_openable_state2).await; +} + async fn test_open( - mut openable_state: impl OpenableDurableCatalogState, + openable_state1: impl OpenableDurableCatalogState, + openable_state2: impl OpenableDurableCatalogState, ) { let (snapshot, audit_log) = { - let mut state = openable_state + let mut state = openable_state1 // Use `NOW_ZERO` for consistent timestamps in the snapshots. .open(NOW_ZERO.clone(), &debug_bootstrap_args(), None) .await .unwrap(); - assert_eq!( - state.epoch(), - NonZeroI64::new(2).expect("known to be non-zero") - ); + assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); // Check initial snapshot. let snapshot = state.snapshot().await.unwrap(); insta::assert_debug_snapshot!(snapshot); @@ -368,15 +480,12 @@ async fn test_open( }; // Reopening the catalog will increment the epoch, but shouldn't change the initial snapshot. { - let mut state = openable_state + let mut state = openable_state2 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); - assert_eq!( - state.epoch(), - NonZeroI64::new(3).expect("known to be non-zero") - ); + assert_eq!(state.epoch(), Epoch::new(3).expect("known to be non-zero")); assert_eq!(state.snapshot().await.unwrap(), snapshot); assert_eq!(state.get_audit_logs().await.unwrap(), audit_log); } diff --git a/src/catalog/tests/read-write.rs b/src/catalog/tests/read-write.rs index 133e710396be8..7ca57919c395f 100644 --- a/src/catalog/tests/read-write.rs +++ b/src/catalog/tests/read-write.rs @@ -94,16 +94,18 @@ use mz_audit_log::{ use mz_catalog::objects::{Config, DurableType, IdAlloc, IntrospectionSourceIndex}; use mz_catalog::objects::{SystemObjectDescription, SystemObjectUniqueIdentifier}; use mz_catalog::{ - debug_bootstrap_args, debug_stash_backed_catalog_state, Cluster, ClusterConfig, ClusterReplica, - ClusterVariant, Comment, Database, DefaultPrivilege, DurableCatalogState, Item, - OpenableDurableCatalogState, ReplicaConfig, ReplicaLocation, Role, Schema, SystemConfiguration, - SystemObjectMapping, TimelineTimestamp, DATABASE_ID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, - USER_ITEM_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY, + debug_bootstrap_args, debug_stash_backed_catalog_state, persist_backed_catalog_state, + CatalogError, Cluster, ClusterConfig, ClusterReplica, ClusterVariant, Comment, Database, + DefaultPrivilege, DurableCatalogError, DurableCatalogState, Item, OpenableDurableCatalogState, + ReplicaConfig, ReplicaLocation, Role, Schema, SystemConfiguration, SystemObjectMapping, + TimelineTimestamp, DATABASE_ID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY, + USER_ROLE_ID_ALLOC_KEY, }; use mz_controller::clusters::ReplicaLogging; use mz_controller_types::{ClusterId, ReplicaId}; use mz_ore::collections::CollectionExt; use mz_ore::now::SYSTEM_TIME; +use mz_persist_client::PersistClient; use mz_proto::RustType; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; use mz_repr::role_id::RoleId; @@ -117,30 +119,60 @@ use mz_stash::DebugStashFactory; use mz_stash_types::objects::proto; use mz_storage_types::sources::Timeline; use std::time::Duration; +use uuid::Uuid; #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_stash_confirm_leadership() { let debug_factory = DebugStashFactory::new().await; - let openable_state = debug_stash_backed_catalog_state(&debug_factory); - test_confirm_leadership(openable_state).await; + let openable_state1 = debug_stash_backed_catalog_state(&debug_factory); + let openable_state2 = debug_stash_backed_catalog_state(&debug_factory); + test_confirm_leadership(openable_state1, openable_state2).await; debug_factory.drop().await; } + +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_confirm_leadership() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state1 = + persist_backed_catalog_state(persist_client.clone(), environment_id).await; + let openable_state2 = persist_backed_catalog_state(persist_client, environment_id).await; + test_confirm_leadership(openable_state1, openable_state2).await; +} + async fn test_confirm_leadership( - mut openable_state: impl OpenableDurableCatalogState, + openable_state1: impl OpenableDurableCatalogState, + openable_state2: impl OpenableDurableCatalogState, ) { - let mut state1 = openable_state + let mut state1 = openable_state1 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); assert!(state1.confirm_leadership().await.is_ok()); - let mut state2 = openable_state + let mut state2 = openable_state2 .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await .unwrap(); assert!(state2.confirm_leadership().await.is_ok()); - assert!(state1.confirm_leadership().await.is_err()); + + let err = state1.confirm_leadership().await.unwrap_err(); + assert!(matches!( + err, + CatalogError::Durable(DurableCatalogError::Fence(_)) + )); + + // Test that state1 can't start a transaction. + let err = match state1.transaction().await { + Ok(_) => panic!("unexpected Ok"), + Err(e) => e, + }; + assert!(matches!( + err, + CatalogError::Durable(DurableCatalogError::Fence(_)) + )); } #[mz_ore::test(tokio::test)] @@ -152,8 +184,17 @@ async fn test_stash_catalog_content_version() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_catalog_content_version() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_catalog_content_version(openable_state).await; +} + async fn test_catalog_content_version( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let mut state = openable_state .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) @@ -189,8 +230,17 @@ async fn test_stash_get_and_prune_storage_usage() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_get_and_prune_storage_usage() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_get_and_prune_storage_usage(openable_state).await; +} + async fn test_get_and_prune_storage_usage( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let old_event = VersionedStorageUsage::V1(StorageUsageV1 { id: 1, @@ -242,8 +292,17 @@ async fn test_stash_system_items() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_system_items() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_system_items(openable_state).await; +} + async fn test_system_items( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let mappings = [ SystemObjectMapping { @@ -299,8 +358,17 @@ async fn test_stash_introspection_source_indexes() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_introspection_source_indexes() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_introspection_source_indexes(openable_state).await; +} + async fn test_introspection_source_indexes( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let mappings = [ IntrospectionSourceIndex { @@ -364,8 +432,17 @@ async fn test_stash_replicas() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_replicas() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_replicas(openable_state).await; +} + async fn test_replicas( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let replica = ClusterReplica { cluster_id: ClusterId::User(1), @@ -424,8 +501,17 @@ async fn test_stash_timestamps() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_timestamps() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_timestamps(openable_state).await; +} + async fn test_timestamps( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let timeline_timestamp = TimelineTimestamp { timeline: Timeline::User("Mars".to_string()), @@ -473,8 +559,17 @@ async fn test_stash_deploy_generation() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_deploy_generation() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_deploy_generation(openable_state).await; +} + async fn test_deploy_generation( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let deploy_generation = 42; let mut state = openable_state @@ -512,8 +607,17 @@ async fn test_stash_allocate_id() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_allocate_id() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_allocate_id(openable_state).await; +} + async fn test_allocate_id( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let id_type = USER_ITEM_ALLOC_KEY; let mut state = openable_state @@ -550,8 +654,17 @@ async fn test_stash_clusters() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_clusters() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_clusters(openable_state).await; +} + async fn test_clusters( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let clusters = [ Cluster { @@ -624,8 +737,17 @@ async fn test_stash_databases() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_databases() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_databases(openable_state).await; +} + async fn test_databases( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let mut state = openable_state .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) @@ -687,9 +809,16 @@ async fn test_stash_schemas() { debug_factory.drop().await; } -async fn test_schemas( - mut openable_state: impl OpenableDurableCatalogState, -) { +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_schemas() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_schemas(openable_state).await; +} + +async fn test_schemas(openable_state: impl OpenableDurableCatalogState) { let mut state = openable_state .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await @@ -753,9 +882,16 @@ async fn test_stash_roles() { debug_factory.drop().await; } -async fn test_roles( - mut openable_state: impl OpenableDurableCatalogState, -) { +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_roles() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_roles(openable_state).await; +} + +async fn test_roles(openable_state: impl OpenableDurableCatalogState) { let mut state = openable_state .open(SYSTEM_TIME.clone(), &debug_bootstrap_args(), None) .await @@ -823,8 +959,17 @@ async fn test_stash_default_privileges() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_default_privileges() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_default_privileges(openable_state).await; +} + async fn test_default_privileges( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let default_privileges = [ DefaultPrivilege { @@ -900,8 +1045,17 @@ async fn test_stash_system_privileges() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_system_privileges() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_system_privileges(openable_state).await; +} + async fn test_system_privileges( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let system_privileges = [ MzAclItem { @@ -960,8 +1114,17 @@ async fn test_stash_system_configurations() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_system_configurations() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_system_configurations(openable_state).await; +} + async fn test_system_configurations( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let system_configurations = [ SystemConfiguration { @@ -1017,8 +1180,17 @@ async fn test_stash_comments() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_comments() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_comments(openable_state).await; +} + async fn test_comments( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let comments = [ Comment { @@ -1077,8 +1249,17 @@ async fn test_stash_audit_logs() { debug_factory.drop().await; } +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_audit_logs() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_audit_logs(openable_state).await; +} + async fn test_audit_logs( - mut openable_state: impl OpenableDurableCatalogState, + openable_state: impl OpenableDurableCatalogState, ) { let audit_logs = [ VersionedEvent::V1(EventV1 { @@ -1136,9 +1317,16 @@ async fn test_stash_items() { debug_factory.drop().await; } -async fn test_items( - mut openable_state: impl OpenableDurableCatalogState, -) { +#[mz_ore::test(tokio::test)] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` +async fn test_persist_items() { + let persist_client = PersistClient::new_for_tests().await; + let environment_id = Uuid::new_v4(); + let openable_state = persist_backed_catalog_state(persist_client.clone(), environment_id).await; + test_items(openable_state).await; +} + +async fn test_items(openable_state: impl OpenableDurableCatalogState) { let items = [ Item { id: GlobalId::User(100), diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index 2077a4aff1496..27c09395882f7 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -363,20 +363,22 @@ impl Listeners { mz_server_core::serve(internal_http_conns, internal_http_server) }); - let mut openable_adapter_storage = mz_catalog::stash_backed_catalog_state(StashConfig { - stash_factory: mz_stash::StashFactory::from_metrics(Arc::clone( - &config.controller.stash_metrics, - )), - stash_url: config.adapter_stash_url.clone(), - schema: stash_schema.clone(), - tls: tls.clone(), - }); - 'leader_promotion: { let Some(deploy_generation) = config.deploy_generation else { break 'leader_promotion; }; tracing::info!("Requested deploy generation {deploy_generation}"); + + let mut openable_adapter_storage = + mz_catalog::stash_backed_catalog_state(StashConfig { + stash_factory: mz_stash::StashFactory::from_metrics(Arc::clone( + &config.controller.stash_metrics, + )), + stash_url: config.adapter_stash_url.clone(), + schema: stash_schema.clone(), + tls: tls.clone(), + }); + if !openable_adapter_storage.is_initialized().await? { tracing::info!("Stash doesn't exist so there's no current deploy generation. We won't wait to be leader"); break 'leader_promotion; @@ -426,6 +428,14 @@ impl Listeners { } } + let openable_adapter_storage = mz_catalog::stash_backed_catalog_state(StashConfig { + stash_factory: mz_stash::StashFactory::from_metrics(Arc::clone( + &config.controller.stash_metrics, + )), + stash_url: config.adapter_stash_url.clone(), + schema: stash_schema.clone(), + tls: tls.clone(), + }); let mut adapter_storage = Box::new( openable_adapter_storage .open( diff --git a/src/stash-debug/src/main.rs b/src/stash-debug/src/main.rs index bcfa44d43da01..e292de5553e51 100644 --- a/src/stash-debug/src/main.rs +++ b/src/stash-debug/src/main.rs @@ -479,7 +479,7 @@ impl Usage { let metrics_registry = &MetricsRegistry::new(); let now = SYSTEM_TIME.clone(); - let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); + let openable_storage = mz_catalog::stash_backed_catalog_state(stash_config); let storage = Box::new( openable_storage .open_savepoint(