Skip to content

Commit

Permalink
storage: move prepare/init_state from controller to collections
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Jan 13, 2025
1 parent 5d8ae13 commit b5de2fd
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 61 deletions.
23 changes: 11 additions & 12 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use mz_sql::rbac;
use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::StorageController;
use mz_storage_client::storage_collections::StorageCollections;
use timely::Container;
use tracing::{error, info, warn, Instrument};
use uuid::Uuid;
Expand Down Expand Up @@ -652,15 +652,17 @@ impl Catalog {
.boxed()
}

/// Initializes the `storage_controller` to understand all shards that
/// `self` expects to exist.
/// Initializes STORAGE to understand all shards that `self` expects to
/// exist.
///
/// Note that this must be done before creating/rendering collections
/// because the storage controller might not be aware of new system
/// collections created between versions.
async fn initialize_storage_controller_state(
async fn initialize_storage_state(
&mut self,
storage_controller: &mut dyn StorageController<Timestamp = mz_repr::Timestamp>,
storage_collections: &Arc<
dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
>,
storage_collections_to_drop: BTreeSet<GlobalId>,
) -> Result<(), mz_catalog::durable::CatalogError> {
let collections = self
Expand All @@ -676,7 +678,7 @@ impl Catalog {
let mut storage = self.storage().await;
let mut txn = storage.transaction().await?;

storage_controller
storage_collections
.initialize_state(&mut txn, collections, storage_collections_to_drop)
.await
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
Expand Down Expand Up @@ -706,7 +708,7 @@ impl Catalog {
let controller_start = Instant::now();
info!("startup: controller init: beginning");

let mut controller = {
let controller = {
let mut storage = self.storage().await;
let mut tx = storage.transaction().await?;
mz_controller::prepare_initialization(&mut tx)
Expand All @@ -724,11 +726,8 @@ impl Catalog {
mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
};

self.initialize_storage_controller_state(
&mut *controller.storage,
storage_collections_to_drop,
)
.await?;
self.initialize_storage_state(&controller.storage_collections, storage_collections_to_drop)
.await?;

info!(
"startup: controller init: complete in {:?}",
Expand Down
17 changes: 11 additions & 6 deletions src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! Logic related to executing catalog transactions.
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
Expand Down Expand Up @@ -54,7 +55,7 @@ use mz_sql::session::vars::OwnedVarInput;
use mz_sql::session::vars::{Value as VarValue, VarInput};
use mz_sql::{rbac, DEFAULT_SCHEMA};
use mz_sql_parser::ast::{QualifiedReplica, Value};
use mz_storage_client::controller::StorageController;
use mz_storage_client::storage_collections::StorageCollections;
use tracing::{info, trace};

use crate::catalog::{
Expand Down Expand Up @@ -362,7 +363,9 @@ impl Catalog {
&mut self,
// n.b. this is an option to prevent us from needing to build out a
// dummy impl of `StorageController` for tests.
storage_controller: Option<&mut dyn StorageController<Timestamp = mz_repr::Timestamp>>,
storage_collections: Option<
&mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
>,
oracle_write_ts: mz_repr::Timestamp,
session: Option<&ConnMeta>,
ops: Vec<Op>,
Expand Down Expand Up @@ -416,7 +419,7 @@ impl Catalog {
let mut state = self.state.clone();

Self::transact_inner(
storage_controller,
storage_collections,
oracle_write_ts,
session,
ops,
Expand Down Expand Up @@ -467,7 +470,9 @@ impl Catalog {
/// - If the only element of `ops` is [`Op::TransactionDryRun`].
#[instrument(name = "catalog::transact_inner")]
async fn transact_inner(
storage_controller: Option<&mut dyn StorageController<Timestamp = mz_repr::Timestamp>>,
storage_collections: Option<
&mut Arc<dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync>,
>,
oracle_write_ts: mz_repr::Timestamp,
session: Option<&ConnMeta>,
mut ops: Vec<Op>,
Expand Down Expand Up @@ -540,8 +545,8 @@ impl Catalog {
}

if dry_run_ops.is_empty() {
// `storage_controller` should only be `None` for tests.
if let Some(c) = storage_controller {
// `storage_collections` should only be `None` for tests.
if let Some(c) = storage_collections {
c.prepare_state(
tx,
storage_collections_to_create,
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,12 @@ impl Coordinator {
mut builtin_table_updates,
audit_events,
} = catalog
.transact(Some(&mut *controller.storage), oracle_write_ts, conn, ops)
.transact(
Some(&mut controller.storage_collections),
oracle_write_ts,
conn,
ops,
)
.await?;

// Update in-memory cluster replica statuses.
Expand Down
20 changes: 0 additions & 20 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,26 +654,6 @@ pub trait StorageController: Debug {
/// introspection type, as readers rely on this and might panic otherwise.
fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

/// On boot, seed the controller's metadata/state.
async fn initialize_state(
&mut self,
txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>;

/// Update the implementor of [`StorageTxn`] with the appropriate metadata
/// given the IDs to add and drop.
///
/// The data modified in the `StorageTxn` must be made available in all
/// subsequent calls that require [`StorageMetadata`] as a parameter.
async fn prepare_state(
&self,
txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Result<(), StorageError<Self::Timestamp>>;

async fn real_time_recent_timestamp(
&self,
source_ids: BTreeSet<GlobalId>,
Expand Down
22 changes: 0 additions & 22 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2134,28 +2134,6 @@ where
self.collection_manager.differential_write(id, op);
}

async fn initialize_state(
&mut self,
txn: &mut (dyn StorageTxn<T> + Send),
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
) -> Result<(), StorageError<T>> {
self.storage_collections
.initialize_state(txn, init_ids, drop_ids)
.await
}

async fn prepare_state(
&self,
txn: &mut (dyn StorageTxn<T> + Send),
ids_to_add: BTreeSet<GlobalId>,
ids_to_drop: BTreeSet<GlobalId>,
) -> Result<(), StorageError<T>> {
self.storage_collections
.prepare_state(txn, ids_to_add, ids_to_drop)
.await
}

async fn real_time_recent_timestamp(
&self,
timestamp_objects: BTreeSet<GlobalId>,
Expand Down

0 comments on commit b5de2fd

Please sign in to comment.