Skip to content

Commit

Permalink
Move event back-processing to async task so it doesn't block startup
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer committed Aug 12, 2024
1 parent c4f04e4 commit 6f623e6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 29 deletions.
21 changes: 18 additions & 3 deletions sequencer/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
pub async fn init(
network_config: NetworkConfig<PubKey>,
instance_state: NodeState,
mut persistence: P,
persistence: P,
network: Arc<N>,
state_relay_server: Option<Url>,
metrics: &dyn Metrics,
Expand All @@ -104,8 +104,8 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
.set(instance_state.node_id as usize);

// Load saved consensus state from storage.
let initializer = persistence
.load_consensus_state(instance_state.clone(), &event_consumer)
let (initializer, anchor_view) = persistence
.load_consensus_state(instance_state.clone())
.await?;

let committee_membership = GeneralStaticCommittee::create_election(
Expand Down Expand Up @@ -180,6 +180,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
instance_state,
network_config,
event_consumer,
anchor_view,
))
}

Expand All @@ -194,6 +195,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
node_state: NodeState,
config: NetworkConfig<PubKey>,
event_consumer: impl PersistenceEventConsumer + 'static,
anchor_view: Option<ViewNumber>,
) -> Self {
let events = handle.event_stream();

Expand All @@ -218,6 +220,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, Ver: StaticVersionTyp
external_event_handler,
Some(event_streamer.clone()),
event_consumer,
anchor_view,
),
);

Expand Down Expand Up @@ -351,7 +354,19 @@ async fn handle_events<Ver: StaticVersionType>(
external_event_handler: ExternalEventHandler,
events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
event_consumer: impl PersistenceEventConsumer + 'static,
anchor_view: Option<ViewNumber>,
) {
if let Some(view) = anchor_view {
// Process and clean up any leaves that we may have persisted last time we were running but
// failed to handle due to a shutdown.
let mut p = persistence.write().await;
if let Err(err) = p.append_decided_leaves(view, vec![], &event_consumer).await {
tracing::warn!(
"failed to process decided leaves, chain may not be up to date: {err:#}"
);
}
}

while let Some(event) = events.next().await {
tracing::debug!(node_id, ?event, "consensus event");

Expand Down
47 changes: 21 additions & 26 deletions types/src/v0/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,13 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
/// Load the latest known consensus state.
///
/// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
/// if there is no saved state).
/// if there is no saved state). Also returns the anchor view number, which can be used as a
/// reference point to process any events which were not processed before a previous shutdown,
/// if applicable,.
async fn load_consensus_state(
&mut self,
&self,
state: NodeState,
consumer: &(impl EventConsumer + 'static),
) -> anyhow::Result<HotShotInitializer<SeqTypes>> {
) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
let genesis_validated_state = ValidatedState::genesis(&state).0;
let highest_voted_view = match self
.load_latest_acted_view()
Expand All @@ -385,7 +386,7 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
ViewNumber::genesis()
}
};
let (leaf, high_qc) = match self
let (leaf, high_qc, anchor_view) = match self
.load_anchor_leaf()
.await
.context("loading anchor leaf")?
Expand All @@ -401,24 +402,15 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
)
);

// Process and clean up any leaves that we may have persisted last time we were
// running but failed to handle due to a shutdown.
if let Err(err) = self
.append_decided_leaves(leaf.view_number(), vec![], consumer)
.await
{
tracing::warn!(
"failed to process decided leaves, chain may not be up to date: {err:#}"
);
}

(leaf, high_qc)
let anchor_view = leaf.view_number();
(leaf, high_qc, Some(anchor_view))
}
None => {
tracing::info!("no saved leaf, starting from genesis leaf");
(
Leaf::genesis(&genesis_validated_state, &state).await,
QuorumCertificate::genesis(&genesis_validated_state, &state).await,
None,
)
}
};
Expand Down Expand Up @@ -462,15 +454,18 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
"loaded consensus state"
);

Ok(HotShotInitializer::from_reload(
leaf,
state,
validated_state,
view,
saved_proposals,
high_qc,
undecided_leaves.into_values().collect(),
undecided_state,
Ok((
HotShotInitializer::from_reload(
leaf,
state,
validated_state,
view,
saved_proposals,
high_qc,
undecided_leaves.into_values().collect(),
undecided_state,
),
anchor_view,
))
}

Expand Down

0 comments on commit 6f623e6

Please sign in to comment.