Skip to content

Commit

Permalink
catalog: Clean up boot_ts usage in open (#25125)
Browse files Browse the repository at this point in the history
This commit renames the `boot_ts` parameter of the open family of method
in `OpenableDurableCatalogState` to `initial_ts`. `initial_ts` is more
accurate because the timestamp is only used to initialize the catalog if
it's a new environment. Additionally, `boot_ts` is used in other places
and is linearized against the timestamp oracle. In the open family of
methods, `boot_ts` wasn't linearized, so the name may have mistakenly
led people to people that this `boot_ts` was also linearized.

Additionally, `initial_ts` is removed as a parameter from
`open_readonly`, because `open_readonly` cannot initialize a new
environment, so it doesn't make sense to require a `initial_ts`.
  • Loading branch information
jkosh44 authored Feb 15, 2024
1 parent d6c5764 commit 4e79534
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 54 deletions.
6 changes: 3 additions & 3 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ impl Catalog {
stash_config,
));
let storage = openable_storage
.open_read_only(now(), &test_bootstrap_args())
.open_read_only(&test_bootstrap_args())
.await?;
Self::open_debug_catalog_inner(storage, now, environment_id).await
}
Expand All @@ -637,7 +637,7 @@ impl Catalog {
.await,
);
let storage = openable_storage
.open_read_only(now(), &test_bootstrap_args())
.open_read_only(&test_bootstrap_args())
.await?;
Self::open_debug_catalog_inner(storage, now, Some(environment_id)).await
}
Expand All @@ -661,7 +661,7 @@ impl Catalog {
.await,
);
let storage = openable_storage
.open_read_only(now(), &test_bootstrap_args())
.open_read_only(&test_bootstrap_args())
.await?;
Self::open_debug_catalog_inner(storage, now, Some(environment_id)).await
}
Expand Down
8 changes: 5 additions & 3 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// Will return an error in the following scenarios:
/// - Catalog initialization fails.
/// - Catalog migrations fail.
///
/// `initial_ts` is used as the initial timestamp for new environments.
async fn open_savepoint(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
Expand All @@ -116,19 +118,19 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// it will fail to open in read only mode.
async fn open_read_only(
mut self: Box<Self>,
boot_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

/// Opens the catalog in a writeable mode. Optionally initializes the
/// catalog, if it has not been initialized, and perform any migrations
/// needed.
///
/// `initial_ts` is used as the initial timestamp for new environments.
/// `epoch_lower_bound` is used as a lower bound for the epoch that is used by the returned
/// catalog.
async fn open(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
Expand Down
17 changes: 8 additions & 9 deletions src/catalog/src/durable/impls/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct CatalogMigrator {
impl OpenableDurableCatalogState for CatalogMigrator {
async fn open_savepoint(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
Expand All @@ -85,7 +85,7 @@ impl OpenableDurableCatalogState for CatalogMigrator {
let tombstone = self.get_tombstone().await?;
let mut stash = self
.openable_stash
.open_savepoint(boot_ts, bootstrap_args, deploy_generation, None)
.open_savepoint(initial_ts, bootstrap_args, deploy_generation, None)
.await?;
// Forcibly mark the rollback as complete so we look at the correct implementation
// (stash) on re-boot. This is really a no-op because it's a savepoint catalog, but
Expand All @@ -107,15 +107,15 @@ impl OpenableDurableCatalogState for CatalogMigrator {
let stash = self
.openable_stash
.open_savepoint(
boot_ts.clone(),
initial_ts.clone(),
bootstrap_args,
deploy_generation.clone(),
persist_epoch,
)
.await;
let persist = self
.openable_persist
.open_savepoint(boot_ts, bootstrap_args, deploy_generation, stash_epoch)
.open_savepoint(initial_ts, bootstrap_args, deploy_generation, stash_epoch)
.await;

// If our target implementation is the stash, but persist is uninitialized, then we can
Expand Down Expand Up @@ -147,15 +147,14 @@ impl OpenableDurableCatalogState for CatalogMigrator {

async fn open_read_only(
self: Box<Self>,
_boot_ts: EpochMillis,
_bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
panic!("cannot use a read only catalog with the migrate implementation")
}

async fn open(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
Expand All @@ -171,7 +170,7 @@ impl OpenableDurableCatalogState for CatalogMigrator {
let tombstone = self.get_tombstone().await?;
let mut stash = self
.openable_stash
.open(boot_ts, bootstrap_args, deploy_generation, None)
.open(initial_ts, bootstrap_args, deploy_generation, None)
.await?;
// Forcibly mark the rollback as complete so we look at the correct implementation
// (stash) on re-boot.
Expand Down Expand Up @@ -201,7 +200,7 @@ impl OpenableDurableCatalogState for CatalogMigrator {
let stash = self
.openable_stash
.open(
boot_ts.clone(),
initial_ts.clone(),
bootstrap_args,
deploy_generation.clone(),
persist_epoch,
Expand All @@ -210,7 +209,7 @@ impl OpenableDurableCatalogState for CatalogMigrator {
fail::fail_point!("post_stash_fence");
let persist = self
.openable_persist
.open(boot_ts, bootstrap_args, deploy_generation, stash_epoch)
.open(initial_ts, bootstrap_args, deploy_generation, stash_epoch)
.await?;
fail::fail_point!("post_persist_fence");
Self::open_inner(stash, persist, direction).await
Expand Down
15 changes: 7 additions & 8 deletions src/catalog/src/durable/impls/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl UnopenedPersistCatalogState {
async fn open_inner(
mut self,
mode: Mode,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
Expand Down Expand Up @@ -434,7 +434,7 @@ impl UnopenedPersistCatalogState {
catalog.snapshot
);
let mut txn = catalog.transaction().await?;
initialize::initialize(&mut txn, bootstrap_args, boot_ts, deploy_generation).await?;
initialize::initialize(&mut txn, bootstrap_args, initial_ts, deploy_generation).await?;
txn
};

Expand Down Expand Up @@ -664,14 +664,14 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
#[tracing::instrument(level = "info", skip(self))]
async fn open_savepoint(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.open_inner(
Mode::Savepoint,
boot_ts,
initial_ts,
bootstrap_args,
deploy_generation,
epoch_lower_bound,
Expand All @@ -683,25 +683,24 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
#[tracing::instrument(level = "info", skip(self))]
async fn open_read_only(
mut self: Box<Self>,
boot_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.open_inner(Mode::Readonly, boot_ts, bootstrap_args, None, None)
self.open_inner(Mode::Readonly, EpochMillis::MIN, bootstrap_args, None, None)
.boxed()
.await
}

#[tracing::instrument(level = "info", skip(self))]
async fn open(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.open_inner(
Mode::Writable,
boot_ts,
initial_ts,
bootstrap_args,
deploy_generation,
epoch_lower_bound,
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/durable/impls/persist/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn test_upgrade_shard() {
.await
.expect("failed to create persist catalog");
let _persist_state = Box::new(persist_openable_state)
.open_read_only(NOW_ZERO(), &test_bootstrap_args())
.open_read_only(&test_bootstrap_args())
.await
.expect("failed to open readonly persist catalog");

Expand Down
17 changes: 8 additions & 9 deletions src/catalog/src/durable/impls/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ where
{
async fn open_savepoint(
self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
let stash = self.stash.open_savepoint(
boot_ts.clone(),
initial_ts.clone(),
bootstrap_args,
deploy_generation.clone(),
epoch_lower_bound,
);
let persist = self.persist.open_savepoint(
boot_ts,
initial_ts,
bootstrap_args,
deploy_generation,
epoch_lower_bound,
Expand All @@ -110,11 +110,10 @@ where

async fn open_read_only(
self: Box<Self>,
boot_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
let stash = self.stash.open_read_only(boot_ts.clone(), bootstrap_args);
let persist = self.persist.open_read_only(boot_ts, bootstrap_args);
let stash = self.stash.open_read_only(bootstrap_args);
let persist = self.persist.open_read_only(bootstrap_args);
let (stash, persist) = futures::future::join(stash, persist).await;
soft_assert_eq_or_log!(
stash.is_ok(),
Expand All @@ -128,19 +127,19 @@ where

async fn open(
self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
let stash = self.stash.open(
boot_ts.clone(),
initial_ts.clone(),
bootstrap_args,
deploy_generation.clone(),
epoch_lower_bound,
);
let persist = self.persist.open(
boot_ts,
initial_ts,
bootstrap_args,
deploy_generation,
epoch_lower_bound,
Expand Down
30 changes: 14 additions & 16 deletions src/catalog/src/durable/impls/stash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,38 +207,37 @@ impl OpenableDurableCatalogState for OpenableConnection {
#[tracing::instrument(name = "storage::open_check", level = "info", skip_all)]
async fn open_savepoint(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.open_stash_savepoint(epoch_lower_bound).await?;
let stash = self.stash.take().expect("opened above");
retry_open(stash, boot_ts, bootstrap_args, deploy_generation).await
retry_open(stash, initial_ts, bootstrap_args, deploy_generation).await
}

#[tracing::instrument(name = "storage::open_read_only", level = "info", skip_all)]
async fn open_read_only(
mut self: Box<Self>,
boot_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.open_stash_read_only().await?;
let stash = self.stash.take().expect("opened above");
retry_open(stash, boot_ts, bootstrap_args, None).await
retry_open(stash, EpochMillis::MIN, bootstrap_args, None).await
}

#[tracing::instrument(name = "storage::open", level = "info", skip_all)]
async fn open(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.open_stash(epoch_lower_bound).await?;
let stash = self.stash.take().expect("opened above");
retry_open(stash, boot_ts, bootstrap_args, deploy_generation).await
retry_open(stash, initial_ts, bootstrap_args, deploy_generation).await
}

#[tracing::instrument(name = "storage::open_debug", level = "info", skip_all)]
Expand Down Expand Up @@ -451,7 +450,7 @@ impl OpenableDurableCatalogState for OpenableConnection {
/// If the inner stash has not been opened.
async fn retry_open(
mut stash: Stash,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
Expand All @@ -462,7 +461,7 @@ async fn retry_open(
let mut retry = pin::pin!(retry);

loop {
match open_inner(stash, boot_ts.clone(), bootstrap_args, deploy_generation).await {
match open_inner(stash, initial_ts.clone(), bootstrap_args, deploy_generation).await {
Ok(conn) => {
return Ok(conn);
}
Expand All @@ -480,7 +479,7 @@ async fn retry_open(
#[tracing::instrument(name = "storage::open_inner", level = "info", skip_all)]
async fn open_inner(
mut stash: Stash,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
) -> Result<Box<Connection>, (Stash, CatalogError)> {
Expand All @@ -499,7 +498,7 @@ async fn open_inner(
Ok(txn) => txn,
Err(e) => return Err((conn.stash, e)),
};
match initialize::initialize(&mut tx, &args, boot_ts, deploy_generation).await {
match initialize::initialize(&mut tx, &args, initial_ts, deploy_generation).await {
Ok(()) => {}
Err(e) => return Err((conn.stash, e)),
}
Expand Down Expand Up @@ -1318,14 +1317,14 @@ impl TestOpenableConnection<'_> {
impl OpenableDurableCatalogState for TestOpenableConnection<'_> {
async fn open_savepoint(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.openable_connection
.open_savepoint(
boot_ts,
initial_ts,
bootstrap_args,
deploy_generation,
epoch_lower_bound,
Expand All @@ -1335,24 +1334,23 @@ impl OpenableDurableCatalogState for TestOpenableConnection<'_> {

async fn open_read_only(
mut self: Box<Self>,
boot_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.openable_connection
.open_read_only(boot_ts, bootstrap_args)
.open_read_only(bootstrap_args)
.await
}

async fn open(
mut self: Box<Self>,
boot_ts: EpochMillis,
initial_ts: EpochMillis,
bootstrap_args: &BootstrapArgs,
deploy_generation: Option<u64>,
epoch_lower_bound: Option<Epoch>,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
self.openable_connection
.open(
boot_ts,
initial_ts,
bootstrap_args,
deploy_generation,
epoch_lower_bound,
Expand Down
Loading

0 comments on commit 4e79534

Please sign in to comment.