From b5de2fd452dc1dabfe7162a441ae83af9824dcac Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 10 Jan 2025 14:11:02 +0100 Subject: [PATCH] storage: move prepare/init_state from controller to collections Work towards implementing https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20240117_decoupled_storage_controller.md --- src/adapter/src/catalog/open.rs | 23 +++++++++++------------ src/adapter/src/catalog/transact.rs | 17 +++++++++++------ src/adapter/src/coord/ddl.rs | 7 ++++++- src/storage-client/src/controller.rs | 20 -------------------- src/storage-controller/src/lib.rs | 22 ---------------------- 5 files changed, 28 insertions(+), 61 deletions(-) diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 6d18f935ecae3..53480a4287e24 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -59,7 +59,7 @@ use mz_sql::rbac; use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER}; use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput}; use mz_sql_parser::ast::display::AstDisplay; -use mz_storage_client::controller::StorageController; +use mz_storage_client::storage_collections::StorageCollections; use timely::Container; use tracing::{error, info, warn, Instrument}; use uuid::Uuid; @@ -652,15 +652,17 @@ impl Catalog { .boxed() } - /// Initializes the `storage_controller` to understand all shards that - /// `self` expects to exist. + /// Initializes STORAGE to understand all shards that `self` expects to + /// exist. /// /// Note that this must be done before creating/rendering collections /// because the storage controller might not be aware of new system /// collections created between versions. - async fn initialize_storage_controller_state( + async fn initialize_storage_state( &mut self, - storage_controller: &mut dyn StorageController, + storage_collections: &Arc< + dyn StorageCollections + Send + Sync, + >, storage_collections_to_drop: BTreeSet, ) -> Result<(), mz_catalog::durable::CatalogError> { let collections = self @@ -676,7 +678,7 @@ impl Catalog { let mut storage = self.storage().await; let mut txn = storage.transaction().await?; - storage_controller + storage_collections .initialize_state(&mut txn, collections, storage_collections_to_drop) .await .map_err(mz_catalog::durable::DurableCatalogError::from)?; @@ -706,7 +708,7 @@ impl Catalog { let controller_start = Instant::now(); info!("startup: controller init: beginning"); - let mut controller = { + let controller = { let mut storage = self.storage().await; let mut tx = storage.transaction().await?; mz_controller::prepare_initialization(&mut tx) @@ -724,11 +726,8 @@ impl Catalog { mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await }; - self.initialize_storage_controller_state( - &mut *controller.storage, - storage_collections_to_drop, - ) - .await?; + self.initialize_storage_state(&controller.storage_collections, storage_collections_to_drop) + .await?; info!( "startup: controller init: complete in {:?}", diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index 255e7eb9c8e6d..ac4144bbf61cb 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -10,6 +10,7 @@ //! Logic related to executing catalog transactions. use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Arc; use std::time::Duration; use itertools::Itertools; @@ -54,7 +55,7 @@ use mz_sql::session::vars::OwnedVarInput; use mz_sql::session::vars::{Value as VarValue, VarInput}; use mz_sql::{rbac, DEFAULT_SCHEMA}; use mz_sql_parser::ast::{QualifiedReplica, Value}; -use mz_storage_client::controller::StorageController; +use mz_storage_client::storage_collections::StorageCollections; use tracing::{info, trace}; use crate::catalog::{ @@ -362,7 +363,9 @@ impl Catalog { &mut self, // n.b. this is an option to prevent us from needing to build out a // dummy impl of `StorageController` for tests. - storage_controller: Option<&mut dyn StorageController>, + storage_collections: Option< + &mut Arc + Send + Sync>, + >, oracle_write_ts: mz_repr::Timestamp, session: Option<&ConnMeta>, ops: Vec, @@ -416,7 +419,7 @@ impl Catalog { let mut state = self.state.clone(); Self::transact_inner( - storage_controller, + storage_collections, oracle_write_ts, session, ops, @@ -467,7 +470,9 @@ impl Catalog { /// - If the only element of `ops` is [`Op::TransactionDryRun`]. #[instrument(name = "catalog::transact_inner")] async fn transact_inner( - storage_controller: Option<&mut dyn StorageController>, + storage_collections: Option< + &mut Arc + Send + Sync>, + >, oracle_write_ts: mz_repr::Timestamp, session: Option<&ConnMeta>, mut ops: Vec, @@ -540,8 +545,8 @@ impl Catalog { } if dry_run_ops.is_empty() { - // `storage_controller` should only be `None` for tests. - if let Some(c) = storage_controller { + // `storage_collections` should only be `None` for tests. + if let Some(c) = storage_collections { c.prepare_state( tx, storage_collections_to_create, diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 15807d4951913..add23ec451364 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -521,7 +521,12 @@ impl Coordinator { mut builtin_table_updates, audit_events, } = catalog - .transact(Some(&mut *controller.storage), oracle_write_ts, conn, ops) + .transact( + Some(&mut controller.storage_collections), + oracle_write_ts, + conn, + ops, + ) .await?; // Update in-memory cluster replica statuses. diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 94e224676877e..77d6345ceaa30 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -654,26 +654,6 @@ pub trait StorageController: Debug { /// introspection type, as readers rely on this and might panic otherwise. fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp); - /// On boot, seed the controller's metadata/state. - async fn initialize_state( - &mut self, - txn: &mut (dyn StorageTxn + Send), - init_ids: BTreeSet, - drop_ids: BTreeSet, - ) -> Result<(), StorageError>; - - /// Update the implementor of [`StorageTxn`] with the appropriate metadata - /// given the IDs to add and drop. - /// - /// The data modified in the `StorageTxn` must be made available in all - /// subsequent calls that require [`StorageMetadata`] as a parameter. - async fn prepare_state( - &self, - txn: &mut (dyn StorageTxn + Send), - ids_to_add: BTreeSet, - ids_to_drop: BTreeSet, - ) -> Result<(), StorageError>; - async fn real_time_recent_timestamp( &self, source_ids: BTreeSet, diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index c068cf273555b..713b6c9c50464 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -2134,28 +2134,6 @@ where self.collection_manager.differential_write(id, op); } - async fn initialize_state( - &mut self, - txn: &mut (dyn StorageTxn + Send), - init_ids: BTreeSet, - drop_ids: BTreeSet, - ) -> Result<(), StorageError> { - self.storage_collections - .initialize_state(txn, init_ids, drop_ids) - .await - } - - async fn prepare_state( - &self, - txn: &mut (dyn StorageTxn + Send), - ids_to_add: BTreeSet, - ids_to_drop: BTreeSet, - ) -> Result<(), StorageError> { - self.storage_collections - .prepare_state(txn, ids_to_add, ids_to_drop) - .await - } - async fn real_time_recent_timestamp( &self, timestamp_objects: BTreeSet,