diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index 1d9d2d5ce..c32b9821e 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -84,7 +84,7 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp pub async fn init( network_config: NetworkConfig, instance_state: NodeState, - mut persistence: P, + persistence: P, network: Arc, state_relay_server: Option, metrics: &dyn Metrics, @@ -104,8 +104,8 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp .set(instance_state.node_id as usize); // Load saved consensus state from storage. - let initializer = persistence - .load_consensus_state(instance_state.clone(), &event_consumer) + let (initializer, anchor_view) = persistence + .load_consensus_state(instance_state.clone()) .await?; let committee_membership = GeneralStaticCommittee::create_election( @@ -180,6 +180,7 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp instance_state, network_config, event_consumer, + anchor_view, )) } @@ -194,6 +195,7 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp node_state: NodeState, config: NetworkConfig, event_consumer: impl PersistenceEventConsumer + 'static, + anchor_view: Option, ) -> Self { let events = handle.event_stream(); @@ -218,6 +220,7 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp external_event_handler, Some(event_streamer.clone()), event_consumer, + anchor_view, ), ); @@ -351,7 +354,19 @@ async fn handle_events( external_event_handler: ExternalEventHandler, events_streamer: Option>>>, event_consumer: impl PersistenceEventConsumer + 'static, + anchor_view: Option, ) { + if let Some(view) = anchor_view { + // Process and clean up any leaves that we may have persisted last time we were running but + // failed to handle due to a shutdown. + let mut p = persistence.write().await; + if let Err(err) = p.append_decided_leaves(view, vec![], &event_consumer).await { + tracing::warn!( + "failed to process decided leaves, chain may not be up to date: {err:#}" + ); + } + } + while let Some(event) = events.next().await { tracing::debug!(node_id, ?event, "consensus event"); diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index be9346368..5761bcd3e 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -364,12 +364,13 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { /// Load the latest known consensus state. /// /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis, - /// if there is no saved state). + /// if there is no saved state). Also returns the anchor view number, which can be used as a + /// reference point to process any events which were not processed before a previous shutdown, + /// if applicable,. async fn load_consensus_state( - &mut self, + &self, state: NodeState, - consumer: &(impl EventConsumer + 'static), - ) -> anyhow::Result> { + ) -> anyhow::Result<(HotShotInitializer, Option)> { let genesis_validated_state = ValidatedState::genesis(&state).0; let highest_voted_view = match self .load_latest_acted_view() @@ -385,7 +386,7 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { ViewNumber::genesis() } }; - let (leaf, high_qc) = match self + let (leaf, high_qc, anchor_view) = match self .load_anchor_leaf() .await .context("loading anchor leaf")? @@ -401,24 +402,15 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { ) ); - // Process and clean up any leaves that we may have persisted last time we were - // running but failed to handle due to a shutdown. - if let Err(err) = self - .append_decided_leaves(leaf.view_number(), vec![], consumer) - .await - { - tracing::warn!( - "failed to process decided leaves, chain may not be up to date: {err:#}" - ); - } - - (leaf, high_qc) + let anchor_view = leaf.view_number(); + (leaf, high_qc, Some(anchor_view)) } None => { tracing::info!("no saved leaf, starting from genesis leaf"); ( Leaf::genesis(&genesis_validated_state, &state).await, QuorumCertificate::genesis(&genesis_validated_state, &state).await, + None, ) } }; @@ -462,15 +454,18 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { "loaded consensus state" ); - Ok(HotShotInitializer::from_reload( - leaf, - state, - validated_state, - view, - saved_proposals, - high_qc, - undecided_leaves.into_values().collect(), - undecided_state, + Ok(( + HotShotInitializer::from_reload( + leaf, + state, + validated_state, + view, + saved_proposals, + high_qc, + undecided_leaves.into_values().collect(), + undecided_state, + ), + anchor_view, )) }