Skip to content

Commit

Permalink
use BlockPayload for calculate_vid_disperse (#4018)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrain authored Jan 9, 2025
1 parent 0cad927 commit 2ed0749
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 35 deletions.
5 changes: 1 addition & 4 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ use hotshot_types::{
signature_key::SignatureKey,
states::ValidatedState,
storage::Storage,
EncodeBytes,
},
utils::epoch_from_block_number,
HotShotConfig,
Expand Down Expand Up @@ -321,9 +320,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
saved_leaves.insert(leaf.commit(), leaf.clone());
}
if let Some(payload) = anchored_leaf.block_payload() {
let encoded_txns = payload.encode();

saved_payloads.insert(anchored_leaf.view_number(), Arc::clone(&encoded_txns));
saved_payloads.insert(anchored_leaf.view_number(), Arc::new(payload));
}

let anchored_epoch = if config.epoch_height == 0 {
Expand Down
18 changes: 12 additions & 6 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use hotshot_types::{
node_implementation::{NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
storage::Storage,
BlockPayload, EncodeBytes,
},
utils::EpochTransitionIndicator,
vote::HasViewNumber,
Expand Down Expand Up @@ -108,7 +109,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
);

if let Some(payload) = self.consensus.read().await.saved_payloads().get(&view) {
ensure!(*payload == proposal.data.encoded_transactions, error!(
ensure!(payload.encode() == proposal.data.encoded_transactions, error!(
"Received DA proposal for view {:?} but we already have a payload for that view and they are not identical. Throwing it away",
view)
);
Expand Down Expand Up @@ -216,11 +217,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
tracing::trace!("{e:?}");
}

let payload = Arc::new(TYPES::BlockPayload::from_bytes(
proposal.data.encoded_transactions.as_ref(),
&proposal.data.metadata,
));
// Record the payload we have promised to make available.
if let Err(e) = consensus_writer.update_saved_payloads(
view_number,
Arc::clone(&proposal.data.encoded_transactions),
) {
if let Err(e) = consensus_writer.update_saved_payloads(view_number, payload) {
tracing::trace!("{e:?}");
}
// Optimistically calculate and update VID if we know that the primary network is down.
Expand Down Expand Up @@ -352,12 +354,16 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
&event_stream,
)
.await;
let payload = Arc::new(TYPES::BlockPayload::from_bytes(
encoded_transactions.as_ref(),
metadata,
));
// Save the payload early because we might need it to calculate VID for the next epoch nodes.
if let Err(e) = self
.consensus
.write()
.await
.update_saved_payloads(view_number, Arc::clone(encoded_transactions))
.update_saved_payloads(view_number, payload)
{
tracing::trace!("{e:?}");
}
Expand Down
17 changes: 5 additions & 12 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,12 @@ pub async fn decide_from_proposal_2<TYPES: NodeType>(
res.leaf_views.push(info.clone());
// If the block payload is available for this leaf, include it in
// the leaf chain that we send to the client.
if let Some(encoded_txns) = consensus_reader
if let Some(payload) = consensus_reader
.saved_payloads()
.get(&info.leaf.view_number())
{
let payload =
BlockPayload::from_bytes(encoded_txns, info.leaf.block_header().metadata());

info.leaf.fill_block_payload_unchecked(payload);
info.leaf
.fill_block_payload_unchecked(payload.as_ref().clone());
}

if let Some(ref payload) = info.leaf.block_payload() {
Expand Down Expand Up @@ -451,13 +449,8 @@ pub async fn decide_from_proposal<TYPES: NodeType>(
}
// If the block payload is available for this leaf, include it in
// the leaf chain that we send to the client.
if let Some(encoded_txns) =
consensus_reader.saved_payloads().get(&leaf.view_number())
{
let payload =
BlockPayload::from_bytes(encoded_txns, leaf.block_header().metadata());

leaf.fill_block_payload_unchecked(payload);
if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
leaf.fill_block_payload_unchecked(payload.as_ref().clone());
}

// Get the VID share at the leaf's view number, corresponding to our key
Expand Down
8 changes: 4 additions & 4 deletions crates/task-impls/src/vid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
return None;
}
let vid_disperse = VidDisperse::calculate_vid_disperse(
Arc::clone(encoded_transactions),
&payload,
&Arc::clone(&self.membership),
*view_number,
epoch,
Expand Down Expand Up @@ -192,19 +192,19 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
);

let consensus_reader = self.consensus.read().await;
let Some(txns) = consensus_reader.saved_payloads().get(&proposal_view_number)
let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
else {
tracing::warn!(
"We need to calculate VID for the nodes in the next epoch \
but we don't have the transactions"
);
return None;
};
let txns = Arc::clone(txns);
let payload = Arc::clone(payload);
drop(consensus_reader);

let next_epoch_vid_disperse = VidDisperse::calculate_vid_disperse(
txns,
payload.as_ref(),
&Arc::clone(&self.membership),
proposal_view_number,
target_epoch,
Expand Down
23 changes: 15 additions & 8 deletions crates/types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub struct Consensus<TYPES: NodeType> {
/// Saved payloads.
///
/// Encoded transactions for every view if we got a payload for that view.
saved_payloads: BTreeMap<TYPES::View, Arc<[u8]>>,
saved_payloads: BTreeMap<TYPES::View, Arc<TYPES::BlockPayload>>,

/// the highqc per spec
high_qc: QuorumCertificate2<TYPES>,
Expand Down Expand Up @@ -418,7 +418,7 @@ impl<TYPES: NodeType> Consensus<TYPES> {
last_actioned_view: TYPES::View,
last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
saved_leaves: CommitmentMap<Leaf2<TYPES>>,
saved_payloads: BTreeMap<TYPES::View, Arc<[u8]>>,
saved_payloads: BTreeMap<TYPES::View, Arc<TYPES::BlockPayload>>,
high_qc: QuorumCertificate2<TYPES>,
next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
metrics: Arc<ConsensusMetricsValue>,
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<TYPES: NodeType> Consensus<TYPES> {
}

/// Get the saved payloads.
pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<[u8]>> {
pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<TYPES::BlockPayload>> {
&self.saved_payloads
}

Expand Down Expand Up @@ -732,13 +732,13 @@ impl<TYPES: NodeType> Consensus<TYPES> {
pub fn update_saved_payloads(
&mut self,
view_number: TYPES::View,
encoded_transaction: Arc<[u8]>,
payload: Arc<TYPES::BlockPayload>,
) -> Result<()> {
ensure!(
!self.saved_payloads.contains_key(&view_number),
"Payload with the same view already exists."
);
self.saved_payloads.insert(view_number, encoded_transaction);
self.saved_payloads.insert(view_number, payload);
Ok(())
}

Expand Down Expand Up @@ -946,16 +946,23 @@ impl<TYPES: NodeType> Consensus<TYPES> {
membership: Arc<RwLock<TYPES::Membership>>,
private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
) -> Option<()> {
let txns = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
let payload = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
let epoch = consensus
.read()
.await
.validated_state_map()
.get(&view)?
.view_inner
.epoch()?;
let vid =
VidDisperse::calculate_vid_disperse(txns, &membership, view, epoch, epoch, None).await;
let vid = VidDisperse::calculate_vid_disperse(
payload.as_ref(),
&membership,
view,
epoch,
epoch,
None,
)
.await;
let shares = VidDisperseShare2::from_vid_disperse(vid);
let mut consensus_writer = consensus.write().await;
for share in shares {
Expand Down
3 changes: 2 additions & 1 deletion crates/types/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl<TYPES: NodeType> VidDisperse<TYPES> {
/// Panics if the VID calculation fails, this should not happen.
#[allow(clippy::panic)]
pub async fn calculate_vid_disperse(
txns: Arc<[u8]>,
payload: &TYPES::BlockPayload,
membership: &Arc<RwLock<TYPES::Membership>>,
view: TYPES::View,
target_epoch: TYPES::Epoch,
Expand All @@ -268,6 +268,7 @@ impl<TYPES: NodeType> VidDisperse<TYPES> {
) -> Self {
let num_nodes = membership.read().await.total_nodes(target_epoch);

let txns = payload.encode();
let txns_clone = Arc::clone(&txns);
let vid_disperse = spawn_blocking(move || {
precompute_data
Expand Down

0 comments on commit 2ed0749

Please sign in to comment.