Skip to content

Commit

Permalink
catalog: Add persist backed catalog state (#22360)
Browse files Browse the repository at this point in the history
This commit implements the DurableCatalogState trait using persist
as the backing store. There are a handful of follow-up items that we
need to complete before considering this implementation for production use.
Those items include:

  - Use Protobuf to serialize and deserialize the catalog data.
  - Update the shard ID for better readability.
  - Implement data format migrations.
  - Implement a linearizable way to confirm leadership.
  - Figure out a way to cleanly drop the persist handles.
  - Optimize transaction performance.
  - Remove the audit log from the catalog.
  - Remove storage usage from the catalog.
  - Implement a catalog debug tool to replace the stash debug tool.

Resolves #20953
  • Loading branch information
jkosh44 authored Oct 17, 2023
1 parent 738d2b0 commit 31bde54
Show file tree
Hide file tree
Showing 12 changed files with 1,712 additions and 157 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,7 @@ impl Catalog {
debug_stash_factory: &DebugStashFactory,
now: NowFn,
) -> Result<Catalog, anyhow::Error> {
let mut openable_storage =
mz_catalog::debug_stash_backed_catalog_state(debug_stash_factory);
let openable_storage = mz_catalog::debug_stash_backed_catalog_state(debug_stash_factory);
let storage = openable_storage
.open(now.clone(), &debug_bootstrap_args(), None)
.await?;
Expand All @@ -479,7 +478,7 @@ impl Catalog {
schema: Some(schema),
tls,
};
let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config);
let openable_storage = mz_catalog::stash_backed_catalog_state(stash_config);
let storage = openable_storage
.open(now.clone(), &debug_bootstrap_args(), None)
.await?;
Expand All @@ -493,7 +492,7 @@ impl Catalog {
stash_config: StashConfig,
now: NowFn,
) -> Result<Catalog, anyhow::Error> {
let mut openable_storage = mz_catalog::stash_backed_catalog_state(stash_config);
let openable_storage = mz_catalog::stash_backed_catalog_state(stash_config);
let storage = openable_storage
.open_read_only(now.clone(), &debug_bootstrap_args())
.await?;
Expand Down
6 changes: 6 additions & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ publish = false

[dependencies]
async-trait = "0.1.68"
bytes = { version = "1.3.0", features = ["serde"] }
derivative = "2.2.0"
differential-dataflow = "0.12.0"
futures = "0.3.25"
itertools = "0.10.5"
once_cell = "1.16.0"
Expand All @@ -17,6 +19,8 @@ mz-compute-client = { path = "../compute-client" }
mz-controller = { path = "../controller" }
mz-controller-types = { path = "../controller-types" }
mz-ore = { path = "../ore", features = ["chrono", "async", "tracing_"] }
mz-persist-client = { path = "../persist-client" }
mz-persist-types = { path = "../persist-types" }
mz-pgrepr = { path = "../pgrepr" }
mz-proto = { path = "../proto" }
mz-repr = { path = "../repr", features = ["tracing_"] }
Expand All @@ -31,8 +35,10 @@ proptest-derive = { version = "0.3.0", features = ["boxed_union"] }
postgres-openssl = { version = "0.5.0" }
serde = "1.0.152"
serde_json = "1.0.89"
timely = { version = "0.12.0", default-features = false }
tracing = "0.1.37"
thiserror = "1.0.37"
uuid = "1.2.2"
workspace-hack = { version = "0.0.0", path = "../workspace-hack" }

[dev-dependencies]
Expand Down
29 changes: 21 additions & 8 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use std::collections::BTreeMap;
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::time::Duration;
use uuid::Uuid;

use mz_stash::DebugStashFactory;

Expand All @@ -94,6 +95,7 @@ pub use crate::objects::{
SystemConfiguration, SystemObjectMapping, TimelineTimestamp,
};
use crate::objects::{IntrospectionSourceIndex, Snapshot};
use crate::persist::{PersistCatalogState, PersistHandle};
use crate::stash::{Connection, DebugOpenableConnection, OpenableConnection};
pub use crate::stash::{
StashConfig, ALL_COLLECTIONS, AUDIT_LOG_COLLECTION, CLUSTER_COLLECTION,
Expand All @@ -109,6 +111,7 @@ use mz_audit_log::{VersionedEvent, VersionedStorageUsage};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_client::PersistClient;
use mz_repr::adt::mz_acl_item::MzAclItem;
use mz_repr::role_id::RoleId;
use mz_repr::GlobalId;
Expand All @@ -121,6 +124,7 @@ pub mod builtin;
mod error;
pub mod initialize;
pub mod objects;
mod persist;

pub const DATABASE_ID_ALLOC_KEY: &str = "database";
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
Expand All @@ -141,6 +145,8 @@ pub struct BootstrapArgs {
pub bootstrap_role: Option<String>,
}

pub type Epoch = NonZeroI64;

/// An API for opening a durable catalog state.
#[async_trait]
pub trait OpenableDurableCatalogState<D: DurableCatalogState>: Debug + Send {
Expand All @@ -153,7 +159,7 @@ pub trait OpenableDurableCatalogState<D: DurableCatalogState>: Debug + Send {
/// - Catalog initialization fails.
/// - Catalog migrations fail.
async fn open_savepoint(
&mut self,
mut self,
now: NowFn,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
Expand All @@ -165,7 +171,7 @@ pub trait OpenableDurableCatalogState<D: DurableCatalogState>: Debug + Send {
/// If the catalog is uninitialized or requires a migration, then
/// it will fail to open in read only mode.
async fn open_read_only(
&mut self,
mut self,
now: NowFn,
bootstrap_args: &BootstrapArgs,
) -> Result<D, CatalogError>;
Expand All @@ -174,7 +180,7 @@ pub trait OpenableDurableCatalogState<D: DurableCatalogState>: Debug + Send {
/// catalog, if it has not been initialized, and perform any migrations
/// needed.
async fn open(
&mut self,
mut self,
now: NowFn,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
Expand All @@ -200,7 +206,7 @@ pub trait ReadOnlyDurableCatalogState: Debug + Send {
/// for their epoch.
///
/// NB: We may remove this in later iterations of Pv2.
fn epoch(&mut self) -> NonZeroI64;
fn epoch(&mut self) -> Epoch;

/// Returns the version of Materialize that last wrote to the catalog.
///
Expand Down Expand Up @@ -389,22 +395,29 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
}
}

/// Creates a durable catalog state implemented using the stash. The catalog status is unopened,
/// and must be opened before use.
/// Creates an openable durable catalog state implemented using the stash.
///
/// The returned value wraps `config` in an `OpenableConnection`; it is not yet
/// opened and must be opened through one of the `OpenableDurableCatalogState`
/// open methods before use.
pub fn stash_backed_catalog_state(
config: StashConfig,
) -> impl OpenableDurableCatalogState<Connection> {
OpenableConnection::new(config)
}

/// Creates a debug durable catalog state implemented using the stash that is meant to be used in
/// tests. The catalog status is unopened, and must be opened before use.
/// Creates an openable debug durable catalog state implemented using the stash that is meant to be
/// used in tests.
///
/// The returned value borrows `debug_stash_factory` (hence the `'_` bound) and is not yet
/// opened; it must be opened through one of the `OpenableDurableCatalogState` open methods
/// before use.
pub fn debug_stash_backed_catalog_state(
debug_stash_factory: &DebugStashFactory,
) -> impl OpenableDurableCatalogState<Connection> + '_ {
DebugOpenableConnection::new(debug_stash_factory)
}

/// Creates an openable durable catalog state implemented using persist.
///
/// Awaits `PersistHandle::new(persist_client, environment_id)` and returns the resulting
/// handle. The handle is not yet opened; it must be opened through one of the
/// `OpenableDurableCatalogState` open methods to obtain a `PersistCatalogState`.
pub async fn persist_backed_catalog_state(
persist_client: PersistClient,
environment_id: Uuid,
) -> impl OpenableDurableCatalogState<PersistCatalogState> {
PersistHandle::new(persist_client, environment_id).await
}

pub fn debug_bootstrap_args() -> BootstrapArgs {
BootstrapArgs {
default_cluster_replica_size: "1".into(),
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ impl RustType<proto::IdAllocKey> for IdAllocKey {
}
}

#[derive(Clone, PartialOrd, PartialEq, Eq, Ord)]
/// Wrapper around a single crate-visible `u64` counter, `next_id`.
// NOTE(review): presumably this is the value stored alongside an `IdAllocKey` and `next_id`
// is the next ID to hand out for that allocator — confirm against the rest of objects.rs.
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
pub struct IdAllocValue {
pub(crate) next_id: u64,
}
Expand Down
Loading

0 comments on commit 31bde54

Please sign in to comment.