Skip to content

Commit

Permalink
Use a more robust method for deciding which leaves have already been …
Browse files Browse the repository at this point in the history
…processed

Store last processed leaf view in Postgres rather than trying to
dead reckon.
  • Loading branch information
jbearer committed Jul 17, 2024
1 parent fadc710 commit 32149ae
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
5 changes: 5 additions & 0 deletions sequencer/api/migrations/V37__anchor_leaf_chain.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@ ALTER TABLE anchor_leaf
DROP COLUMN id,
DROP COLUMN height,
ADD CONSTRAINT anchor_leaf_pk PRIMARY KEY (view);

CREATE TABLE event_stream (
id SERIAL PRIMARY KEY,
last_processed_view BIGINT
);
64 changes: 47 additions & 17 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,22 +673,6 @@ async fn collect_garbage(
)
.await?;

// The invariant is that the oldest existing leaf in the `anchor_leaf` table -- if there is one
// -- was always included in the _previous_ decide event...but not removed from the database
// (see above). Thus, we can exclude the oldest leaf from the new decide event.
if let Some((oldest_view, _)) = leaves.first_key_value() {
// The only exception is when the oldest leaf is the genesis leaf; then there was no
// previous decide event.
if *oldest_view > 0 {
tracing::debug!(
oldest_view,
remaining_leaves = leaves.len() - 1,
"excluding previously processed view from decide event"
);
leaves.pop_first();
}
}

// Clean up old proposals. These are not part of the decide event we generate for the consumer,
// so we don't need to return them.
tx.execute(
Expand All @@ -697,6 +681,39 @@ async fn collect_garbage(
)
.await?;

// Exclude from the decide event any leaves which have definitely already been processed. We may
// have selected an already-processed leaf because the oldest leaf -- the last leaf processed in
// the previous decide event -- remained in the database to serve as the anchor leaf, so our
// query would have returned it. In fact, this will almost always be the case, but there are two
// cases where it might not be, and we must process this leaf after all:
//
// 1. The oldest leaf is the genesis leaf, and there _is_ no previous decide event
// 2. We previously stored some leaves in the database and then failed while processing the
// decide event, or shut down before generating the decide event, and so we are only now
// generating the decide event for those previous leaves.
//
// Since these cases (particularly case 2) are hard to account for explicitly, we just use a
// persistent value in the database to remember how far we have successfully processed the event
// stream.
let last_processed_view: Option<i64> = tx
.query_opt_static("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
.await?
.map(|row| row.get("last_processed_view"));
let leaves = if let Some(v) = last_processed_view {
let new_leaves = leaves.split_off(&((v as u64) + 1));
if !leaves.is_empty() {
tracing::debug!(
v,
remaining_leaves = new_leaves.len(),
?leaves,
"excluding already-processed leaves from decide event"
);
}
new_leaves
} else {
leaves
};

// Collate all the information by view number and construct a chain of leaves and a chain of
// corresponding QCs.
let (leaf_chain, qcs): (Vec<_>, Vec<_>) = leaves
Expand Down Expand Up @@ -739,7 +756,7 @@ async fn collect_garbage(
tracing::info!(?view, "no new leaves at decide");
return Ok(());
};
tracing::debug!(?final_qc, ?leaf_chain, "generating decide event");
tracing::debug!(?view, ?final_qc, ?leaf_chain, "generating decide event");

consumer
.handle_event(&Event {
Expand All @@ -752,6 +769,19 @@ async fn collect_garbage(
})
.await?;

// Now that we have definitely processed leaves up to `view`, we can update
// `last_processed_view` so we don't process these leaves again. We may still fail at this
// point, or shut down, and fail to complete this update. At worst this will lead to us sending
// a duplicate decide event the next time we are called; this is fine as the event consumer is
// required to be idempotent.
tx.upsert(
"event_stream",
["id", "last_processed_view"],
["id"],
[[sql_param(&1i32), sql_param(&(view.u64() as i64))]],
)
.await?;

Ok(())
}

Expand Down

0 comments on commit 32149ae

Please sign in to comment.