Skip to content

Commit

Permalink
Attach next epoch justify qc to proposals
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszrzasik committed Nov 28, 2024
1 parent a6c4cab commit 4488084
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 26 deletions.
31 changes: 28 additions & 3 deletions crates/example-types/src/storage_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
sync::Arc,
};

use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
use anyhow::{bail, Result};
use async_lock::RwLock;
use async_trait::async_trait;
Expand All @@ -17,7 +18,7 @@ use hotshot_types::{
data::{DaProposal, Leaf, Leaf2, QuorumProposal, QuorumProposal2, VidDisperseShare},
event::HotShotAction,
message::Proposal,
simple_certificate::{QuorumCertificate2, UpgradeCertificate},
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
traits::{
node_implementation::{ConsensusTime, NodeType},
storage::Storage,
Expand All @@ -28,8 +29,6 @@ use hotshot_types::{
};
use jf_vid::VidScheme;

use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};

type VidShares<TYPES> = HashMap<
<TYPES as NodeType>::View,
HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare<TYPES>>>,
Expand All @@ -42,6 +41,8 @@ pub struct TestStorageState<TYPES: NodeType> {
proposals2: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
next_epoch_high_qc2:
Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
action: TYPES::View,
epoch: TYPES::Epoch,
}
Expand All @@ -54,6 +55,7 @@ impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
proposals: BTreeMap::new(),
proposals2: BTreeMap::new(),
high_qc: None,
next_epoch_high_qc2: None,
high_qc2: None,
action: TYPES::View::genesis(),
epoch: TYPES::Epoch::genesis(),
Expand Down Expand Up @@ -100,6 +102,9 @@ impl<TYPES: NodeType> TestStorage<TYPES> {
/// Returns a clone of the `high_qc2` value currently held in the inner storage state.
///
/// Takes the read lock on `inner` only for the duration of the clone.
pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
    let state = self.inner.read().await;
    state.high_qc2.clone()
}
/// Returns a clone of the `next_epoch_high_qc2` value currently held in the inner
/// storage state, or `None` if none has been stored.
///
/// Takes the read lock on `inner` only for the duration of the clone.
pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
    let state = self.inner.read().await;
    state.next_epoch_high_qc2.clone()
}
/// Returns a clone of the value behind the `decided_upgrade_certificate` lock.
pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
    let certificate = self.decided_upgrade_certificate.read().await;
    certificate.clone()
}
Expand Down Expand Up @@ -224,6 +229,26 @@ impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
}
Ok(())
}
/// Stores `new_next_epoch_high_qc` as the next epoch high QC if it is newer than the
/// one already held.
///
/// The stored value is replaced when nothing is stored yet, or when the new QC's view
/// number is strictly greater than the stored one's; otherwise the call is a no-op.
///
/// # Errors
///
/// Fails without touching the state when `should_return_err` is set.
async fn update_next_epoch_high_qc2(
    &self,
    new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
        TYPES,
    >,
) -> Result<()> {
    if self.should_return_err {
        bail!("Failed to update next epoch high qc to storage");
    }
    // Run the delay settings from `delay_config` before taking the write lock.
    Self::run_delay_settings_from_config(&self.delay_config).await;

    let mut state = self.inner.write().await;
    // Decide first, assign once: an empty slot always accepts the new QC.
    let should_replace = match state.next_epoch_high_qc2.as_ref() {
        None => true,
        Some(stored) => new_next_epoch_high_qc.view_number() > stored.view_number(),
    };
    if should_replace {
        state.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
    }
    Ok(())
}
async fn update_undecided_state(
&self,
_leaves: CommitmentMap<Leaf<TYPES>>,
Expand Down
9 changes: 7 additions & 2 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use hotshot_types::{
data::{Leaf2, QuorumProposal, QuorumProposal2},
event::{EventType, LeafInfo},
message::{convert_proposal, DataMessage, Message, MessageKind, Proposal},
simple_certificate::{QuorumCertificate2, UpgradeCertificate},
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
traits::{
consensus_api::ConsensusApi,
election::Membership,
Expand All @@ -70,7 +70,6 @@ use hotshot_types::{
pub use rand;
use tokio::{spawn, time::sleep};
use tracing::{debug, instrument, trace};

// -- Re-exports
// External
use crate::{
Expand Down Expand Up @@ -344,6 +343,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
saved_leaves,
saved_payloads,
initializer.high_qc,
initializer.next_epoch_high_qc,
Arc::clone(&consensus_metrics),
config.epoch_height,
);
Expand Down Expand Up @@ -1000,6 +1000,8 @@ pub struct HotShotInitializer<TYPES: NodeType> {
/// than `inner`s view number for the non genesis case because we must have seen higher QCs
/// to decide on the leaf.
high_qc: QuorumCertificate2<TYPES>,
/// Next epoch highest QC that was seen
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
/// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
/// Undecided leaves that were seen, but not yet decided on. These allow a restarting node
Expand Down Expand Up @@ -1030,6 +1032,7 @@ impl<TYPES: NodeType> HotShotInitializer<TYPES> {
actioned_view: TYPES::View::new(0),
saved_proposals: BTreeMap::new(),
high_qc,
next_epoch_high_qc: None,
decided_upgrade_certificate: None,
undecided_leaves: Vec::new(),
undecided_state: BTreeMap::new(),
Expand All @@ -1054,6 +1057,7 @@ impl<TYPES: NodeType> HotShotInitializer<TYPES> {
actioned_view: TYPES::View,
saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
high_qc: QuorumCertificate2<TYPES>,
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
undecided_leaves: Vec<Leaf2<TYPES>>,
undecided_state: BTreeMap<TYPES::View, View<TYPES>>,
Expand All @@ -1068,6 +1072,7 @@ impl<TYPES: NodeType> HotShotInitializer<TYPES> {
actioned_view,
saved_proposals,
high_qc,
next_epoch_high_qc,
decided_upgrade_certificate,
undecided_leaves,
undecided_state,
Expand Down
103 changes: 93 additions & 10 deletions crates/task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,25 @@ use std::{
time::{Duration, Instant},
};

use crate::{
events::HotShotEvent,
helpers::{broadcast_event, parent_leaf_and_state},
quorum_proposal::{UpgradeLock, Versions},
};
use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use committable::Committable;
use hotshot_task::dependency_task::HandleDepOutput;
use either::Either;
use hotshot_task::{
dependency::{Dependency, EventDependency},
dependency_task::HandleDepOutput,
};
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{Leaf2, QuorumProposal2, VidDisperse, ViewChangeEvidence},
message::Proposal,
simple_certificate::{QuorumCertificate2, UpgradeCertificate},
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
traits::{
block_contents::BlockHeader,
election::Membership,
Expand All @@ -36,12 +45,6 @@ use tracing::instrument;
use utils::anytrace::*;
use vbs::version::StaticVersionType;

use crate::{
events::HotShotEvent,
helpers::{broadcast_event, parent_leaf_and_state},
quorum_proposal::{UpgradeLock, Versions},
};

/// Proposal dependency types. These types represent events that precipitate a proposal.
#[derive(PartialEq, Debug)]
pub(crate) enum ProposalDependency {
Expand Down Expand Up @@ -143,7 +146,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
}
None
}
/// Waits for the ocnfigured timeout for nodes to send HighQc messages to us. We'll
/// Waits for the configured timeout for nodes to send HighQc messages to us. We'll
/// then propose with the highest QC from among these proposals.
async fn wait_for_highest_qc(&mut self) {
tracing::error!("waiting for QC");
Expand Down Expand Up @@ -186,6 +189,74 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
}
}
}
/// Gets the next epoch QC whose leaf commitment matches `high_qc`, waiting for it if
/// it is not known yet. We need the QC for the epoch transition proposals.
///
/// Returns `None` when:
/// - the version for this view is known and is older than the Epochs version,
/// - the wait budget (half of `self.timeout`, in milliseconds, counted from
///   `view_start_time`) is already used up, or
/// - no matching QC shows up before the budget runs out and consensus still does not
///   hold one.
async fn get_next_epoch_qc(
    &self,
    high_qc: &QuorumCertificate2<TYPES>,
) -> Option<NextEpochQuorumCertificate2<TYPES>> {
    tracing::debug!("getting the next epoch QC");

    // If we haven't upgraded to Epochs there is nothing to fetch.
    let before_epochs = self
        .upgrade_lock
        .version(self.view_number)
        .await
        .is_ok_and(|version| version < V::Epochs::VERSION);
    if before_epochs {
        return None;
    }

    // Fast path: consensus already holds the QC for this leaf, no reason to wait.
    let already_known = self
        .consensus
        .read()
        .await
        .next_epoch_high_qc()
        .filter(|qc| qc.data.leaf_commit == high_qc.data.leaf_commit)
        .cloned();
    if already_known.is_some() {
        return already_known;
    }

    // TODO configure timeout
    let wait_budget = Duration::from_millis(self.timeout / 2);
    // `now` earlier than the view start shouldn't be possible; give up if it happens.
    let elapsed = Instant::now().checked_duration_since(self.view_start_time)?;
    // Nothing left of the budget means there is no point in waiting at all.
    let remaining = wait_budget.checked_sub(elapsed)?;

    let receiver = self.receiver.clone();
    let this_epoch_high_qc = high_qc.clone();
    let matching_event = async move {
        EventDependency::new(
            receiver,
            Box::new(move |event| {
                matches!(
                    event.as_ref(),
                    HotShotEvent::NextEpochQc2Formed(Either::Left(qc))
                        if qc.data.leaf_commit == this_epoch_high_qc.data.leaf_commit
                )
            }),
        )
        .completed()
        .await
    };

    let Ok(Some(event)) = tokio::time::timeout(remaining, matching_event).await else {
        // The wait ended without a matching event. Check consensus once more, there is
        // a chance the QC was recorded while we were waiting and we missed the event.
        return self
            .consensus
            .read()
            .await
            .next_epoch_high_qc()
            .filter(|qc| qc.data.leaf_commit == high_qc.data.leaf_commit)
            .cloned();
    };

    match event.as_ref() {
        HotShotEvent::NextEpochQc2Formed(Either::Left(qc)) => Some(qc.clone()),
        // this shouldn't happen, the dependency only completes on the variant above
        _ => None,
    }
}
/// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`]
/// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`],
/// with optional [`ViewChangeEvidence`].
Expand Down Expand Up @@ -311,11 +382,22 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
);
return Ok(());
}
let next_epoch_qc = if self
.consensus
.read()
.await
.is_leaf_for_last_block(parent_qc.data.leaf_commit)
{
self.get_next_epoch_qc(&parent_qc).await
} else {
None
};
let proposal = QuorumProposal2 {
block_header,
view_number: self.view_number,
epoch,
justify_qc: parent_qc,
next_epoch_justify_qc: next_epoch_qc,
upgrade_certificate,
view_change_evidence: proposal_certificate,
// TODO fix these to use the proper values
Expand All @@ -340,8 +422,9 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
_pd: PhantomData,
};
tracing::debug!(
"Sending proposal for view {:?}",
"Sending proposal for view {:?}, proposal: {:?}",
proposed_leaf.view_number(),
message,
);

broadcast_event(
Expand Down
29 changes: 29 additions & 0 deletions crates/task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,35 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
);
self.highest_qc = qc.clone();
}
HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
// Only update if the qc is from a newer view
let current_next_epoch_qc =
self.consensus.read().await.next_epoch_high_qc().cloned();
if current_next_epoch_qc.is_some()
&& next_epoch_qc.view_number <= current_next_epoch_qc.unwrap().view_number
{
tracing::trace!(
"Received a next epoch QC for a view that was not > than our current next epoch high QC"
);
}
self.consensus
.write()
.await
.update_next_epoch_high_qc(next_epoch_qc.clone())
.wrap()
.context(error!(
"Failed to update next epoch high QC in internal consensus state!"
))?;

// Then update the next epoch high QC in storage
self.storage
.write()
.await
.update_next_epoch_high_qc2(next_epoch_qc.clone())
.await
.wrap()
.context(error!("Failed to update next epoch high QC in storage!"))?;
}
_ => {}
}
Ok(())
Expand Down
17 changes: 10 additions & 7 deletions crates/testing/src/spinning_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use std::{
sync::Arc,
};

use crate::{
test_launcher::Network,
test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
test_task::{TestResult, TestTaskState},
};
use anyhow::Result;
use async_broadcast::broadcast;
use async_lock::RwLock;
Expand All @@ -28,7 +33,7 @@ use hotshot_types::{
constants::EVENT_CHANNEL_SIZE,
data::Leaf2,
event::Event,
simple_certificate::QuorumCertificate2,
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2},
traits::{
network::{AsyncGenerator, ConnectedNetwork},
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
Expand All @@ -37,12 +42,6 @@ use hotshot_types::{
ValidatorConfig,
};

use crate::{
test_launcher::Network,
test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
test_task::{TestResult, TestTaskState},
};

/// convenience type for state and block
pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>);

Expand All @@ -65,6 +64,8 @@ pub struct SpinningTask<
pub(crate) last_decided_leaf: Leaf2<TYPES>,
/// Highest qc seen in the test for restarting nodes
pub(crate) high_qc: QuorumCertificate2<TYPES>,
/// Next epoch highest qc seen in the test for restarting nodes
pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
/// Add specified delay to async calls
pub(crate) async_delay_config: DelayConfig,
/// Context stored for nodes to be restarted with
Expand Down Expand Up @@ -160,6 +161,7 @@ where
TYPES::View::genesis(),
BTreeMap::new(),
self.high_qc.clone(),
self.next_epoch_high_qc.clone(),
None,
Vec::new(),
BTreeMap::new(),
Expand Down Expand Up @@ -249,6 +251,7 @@ where
)
.await,
),
read_storage.next_epoch_high_qc_cloned().await,
read_storage.decided_upgrade_certificate().await,
Vec::new(),
BTreeMap::new(),
Expand Down
1 change: 1 addition & 0 deletions crates/testing/src/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ where
&TestInstanceState::default(),
)
.await,
next_epoch_high_qc: None,
async_delay_config: launcher.metadata.async_delay_config,
restart_contexts: HashMap::new(),
channel_generator: launcher.resource_generator.channel_generator,
Expand Down
Loading

0 comments on commit 4488084

Please sign in to comment.