persist-txn: allow data shards to be forgotten (#22475)
This is necessary to support deleting a table in mz (which requires
advancing the persist shard's upper to the empty antichain).

A data shard is removed from the txns set using a `forget` operation
that writes a retraction of the registration update at some `forget_ts`.
After this, the shard may be used through normal means, such as direct
`compare_and_append` writes or tombstone-ing it. To prevent accidental
misuse, the forget operation ensures that all writes to the data shard
have been applied before writing the retraction.
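
The ordering that forget enforces can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: `ToyTxns` and all of its fields and methods are hypothetical names, not the persist-txn API, and the real operation works against persist shards rather than maps.

```rust
use std::collections::BTreeMap;

/// Toy, in-memory stand-in for the txns shard. Every name here is
/// hypothetical and does not mirror the real persist-txn API.
#[derive(Default)]
struct ToyTxns {
    /// Registration diffs per data shard: (ts, +1) registers, (ts, -1) forgets.
    registrations: BTreeMap<&'static str, Vec<(u64, i64)>>,
    /// Timestamps of committed writes not yet applied to the data shard.
    unapplied: BTreeMap<&'static str, Vec<u64>>,
    /// Timestamps of writes already applied to the data shard.
    applied: BTreeMap<&'static str, Vec<u64>>,
}

impl ToyTxns {
    /// A shard is registered while its registration diffs sum to more than 0.
    fn is_registered(&self, id: &'static str) -> bool {
        let diffs = self.registrations.get(id);
        diffs.map_or(0, |r| r.iter().map(|(_, diff)| *diff).sum::<i64>()) > 0
    }

    /// Idempotent: registering an already registered shard is a no-op.
    fn register(&mut self, id: &'static str, ts: u64) {
        if !self.is_registered(id) {
            self.registrations.entry(id).or_default().push((ts, 1));
        }
    }

    /// Records a committed write that has not been applied yet.
    fn commit(&mut self, id: &'static str, ts: u64) {
        assert!(self.is_registered(id), "writes go through txns only while registered");
        self.unapplied.entry(id).or_default().push(ts);
    }

    /// Idempotent: forgetting an unregistered shard is a no-op.
    fn forget(&mut self, id: &'static str, forget_ts: u64) {
        if !self.is_registered(id) {
            return;
        }
        // Step 1: apply every outstanding write to the data shard.
        let pending = self.unapplied.remove(id).unwrap_or_default();
        self.applied.entry(id).or_default().extend(pending);
        // Step 2: only then write the retraction of the registration.
        self.registrations.entry(id).or_default().push((forget_ts, -1));
    }
}
```

In this model, calling `forget` twice leaves a single retraction, and no write is left unapplied once the retraction exists. That is the property that makes later direct use of the shard safe.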

This PR also removes the restriction that a data shard must be
registered to read it with a snapshot. That was added initially because
it seemed "obvious" (and because reads initially worked differently), but
in practice it turns out to be unnecessary and the system is less
brittle without the requirement. Specifically:

- The information necessary to read at a given timestamp is simply
whether there are any unapplied writes below it.
- At times before the first registration time, there are trivially no
unapplied writes.
- Because the forget operation carefully ensures that all writes have
been applied before writing the retraction, this stays true for _every_
register ts, including the latest one.
- As a result, we always know the data shard has no unapplied writes up
to the first unapplied write of the current registration (or
progress_exclusive if there are none).
- (This will remain true when we add compaction, because it will keep
the invariant that the txns shard since is held back before every write
until it's known to be applied.)
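
The rule in the list above can be written down as a small predicate. This is a minimal sketch, assuming a simplified per-shard summary: `ToyShardState`, `clean_before`, and `can_read_directly` are hypothetical names for illustration and are not how `TxnsCache::data_snapshot` is implemented.

```rust
/// Hypothetical summary of what a reader knows about one data shard.
/// A shard that was never registered, or that was forgotten, simply has
/// no unapplied writes.
struct ToyShardState {
    /// Timestamps of committed-but-unapplied writes, in ascending order.
    unapplied_writes: Vec<u64>,
    /// Exclusive upper bound of what this view of the txns shard has seen.
    progress_exclusive: u64,
}

impl ToyShardState {
    /// Exclusive bound below which the data shard has no unapplied writes:
    /// the first unapplied write, or `progress_exclusive` if there are none.
    fn clean_before(&self) -> u64 {
        self.unapplied_writes
            .first()
            .copied()
            .unwrap_or(self.progress_exclusive)
    }

    /// True if a snapshot at `ts` needs no further writes to be applied.
    fn can_read_directly(&self, ts: u64) -> bool {
        ts < self.clean_before()
    }
}
```

Note that the predicate never asks whether the shard is registered, which mirrors the restriction this PR removes.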

Touches #22173

I think this also...

Fixes #22465

### Motivation

  * This PR adds a known-desirable feature.
danhhz authored Oct 21, 2023
1 parent 48cf08a commit f91987b
Showing 8 changed files with 774 additions and 321 deletions.
15 changes: 15 additions & 0 deletions doc/developer/design/20230705_v2_txn_management.md
@@ -221,6 +221,9 @@ Restrictions:
growing unboundedly and also means that, at any given time, the txns shard
contains the set of txns that need to be applied (as well as the set of
registered data shards).
+ - A data shard may be _forgotten_ at some `forget_ts` to reclaim it from the
+ txns system. This allows us to delete it (e.g. when a table is dropped). Like
+ registration, forget is idempotent.

### Usage

@@ -362,6 +365,18 @@ maintenance or a CRDB write, but this is also true for registering a reader. On
the balance, I think this is a _much_ better set of tradeoffs than the original
plan.

+ ### Forget
+
+ A data shard is removed from the txns set using a `forget` operation that writes
+ a retraction of the registration update at some `forget_ts`. After this, the
+ shard may be used through normal means, such as direct `compare_and_append`
+ writes or tombstone-ing it. To prevent accidental misuse, the forget operation
+ ensures that all writes to the data shard have been applied before writing the
+ retraction.
+
+ The code will support repeatedly registering and forgetting the same data shard,
+ but this is not expected to be used in normal operation.
+
## Alternatives

### Alternative 1: Put all tables in a single persist shard
14 changes: 2 additions & 12 deletions src/persist-cli/src/maelstrom/txn_list_append_multi.rs
@@ -265,18 +265,8 @@ impl Transactor {
// To recover them, instead of grabbing a snapshot at the read_ts,
// we have to start a subscription at time 0 and walk it forward
// until we pass read_ts.
- let as_of = self
- .txns
- .read_cache()
- .data_since(data_id)
- .expect("data shard has been registered");
- let mut subscribe = DataSubscribe::new(
- "maelstrom",
- self.client.clone(),
- self.txns_id,
- *data_id,
- as_of,
- );
+ let mut subscribe =
+ DataSubscribe::new("maelstrom", self.client.clone(), self.txns_id, *data_id, 0);
while subscribe.progress() <= read_ts {
subscribe.step();
}
22 changes: 0 additions & 22 deletions src/persist-txn/src/error.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/persist-txn/src/lib.rs
@@ -282,7 +282,6 @@ use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{debug, instrument};

- pub mod error;
pub mod operator;
pub mod txn_read;
pub mod txn_write;
20 changes: 13 additions & 7 deletions src/persist-txn/src/operator.rs
@@ -32,6 +32,7 @@ use timely::progress::{Antichain, Timestamp};
use timely::scheduling::Scheduler;
use timely::worker::Worker;
use timely::{Data, WorkerConfig};
+ use tracing::debug;

use crate::txn_read::{DataListenNext, TxnsCache};
use crate::{TxnsCodec, TxnsCodecDefault};
@@ -104,6 +105,7 @@ where
let mut passthrough_input =
builder.new_input_connection(&passthrough, Pipeline, vec![Antichain::new()]);

+ let name = name.to_owned();
let shutdown_button = builder.build(move |capabilities| async move {
let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
let client = client.await;
@@ -120,9 +122,7 @@ where
let mut txns_cache = TxnsCache::<T, C>::open(txns_read).await;

txns_cache.update_gt(&as_of).await;
- let snap = txns_cache
- .data_snapshot(&data_id, as_of.clone())
- .expect("data shard is registered");
+ let snap = txns_cache.data_snapshot(data_id, as_of.clone());
let data_write = client
.open_writer::<K, V, T, D>(
data_id,
@@ -180,9 +180,15 @@ where
// find out what to do next given our current progress.
loop {
txns_cache.update_ge(&output_progress_exclusive).await;
- let data_listen_next = txns_cache
- .data_listen_next(&data_id, output_progress_exclusive.clone())
- .expect("table should still exist");
+ let data_listen_next =
+ txns_cache.data_listen_next(&data_id, output_progress_exclusive.clone());
+ debug!(
+ "txns_progress({}): data_listen_next {:.9} at {:?}: {:?}",
+ name,
+ data_id.to_string(),
+ output_progress_exclusive,
+ data_listen_next
+ );
match data_listen_next {
// We've caught up to the txns upper and we have to wait for
// it to advance before asking again.
@@ -225,7 +231,7 @@ where
///
/// [Subscribe]: mz_persist_client::read::Subscribe
pub struct DataSubscribe {
- as_of: u64,
+ pub(crate) as_of: u64,
pub(crate) worker: Worker<timely::communication::allocator::Thread>,
data: ProbeHandle<u64>,
txns: ProbeHandle<u64>,