Skip to content

Commit

Permalink
minor drb cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ss-es committed Jan 6, 2025
1 parent eb3e357 commit 7eb945c
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 60 deletions.
116 changes: 56 additions & 60 deletions crates/task-impls/src/quorum_vote/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,32 +112,31 @@ async fn store_and_get_computed_drb_result<
{
return Ok(*computed_result);
}
if let Some((task_epoch, computation)) = &mut task_state.drb_computation {
if *task_epoch == epoch_number {
// If the computation is finished, remove the task and store the result.
if computation.is_finished() {
match computation.await {
Ok(computed_result) => {
let mut consensus_writer = task_state.consensus.write().await;
consensus_writer
.drb_seeds_and_results
.results
.insert(epoch_number, computed_result);
task_state.drb_computation = None;
Ok(computed_result)
}
Err(e) => {
bail!("Failed to get the DRB result though the computation is finished: {:?}.", e);
}
}
} else {
bail!("DRB computation isn't finished.");
}
} else {
bail!("DRB computation isn't for the next epoch.");

let (task_epoch, computation) =
(&mut task_state.drb_computation).context(warn!("DRB computation task doesn't exist."))?;

ensure!(
*task_epoch == epoch_number,
info!("DRB computation is not for the next epoch.")
);

ensure!(
computation.is_finished(),
info!("DRB computation has not yet finished.")
);

match computation.await {
Ok(result) => {
let mut consensus_writer = task_state.consensus.write().await;
consensus_writer
.drb_seeds_and_results
.results
.insert(epoch_number, result);
task_state.drb_computation = None;
Ok(result)
}
} else {
bail!("DRB computation task doesn't exist.");
Err(e) => Err(warn!("Error in DRB calculation: {:?}.", e)),
}
}

Expand All @@ -154,44 +153,41 @@ async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Ver
let current_block_number = proposal.block_header.block_number();

// Skip if this is not the expected block.
if task_state.epoch_height != 0
&& is_last_block_in_epoch(current_block_number, task_state.epoch_height)
{
let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
proposal.block_header.block_number(),
task_state.epoch_height,
));
ensure!(
task_state.epoch_height != 0
&& is_last_block_in_epoch(current_block_number, task_state.epoch_height),
debug!("Not expected block number.")
);

let Some(proposal_result) = proposal.next_drb_result else {
bail!(
"The proposal for the last block of an epoch should contain the DRB result for the next epoch."
);
};
let epoch = TYPES::Epoch::new(epoch_from_block_number(
proposal.block_header.block_number(),
task_state.epoch_height,
));

// Verify and store the result depending on our membership.
if task_state
.membership
.read()
.await
.has_stake(&task_state.public_key, current_epoch_number)
{
let computed_result =
store_and_get_computed_drb_result(current_epoch_number + 1, task_state).await?;
if proposal_result != computed_result {
bail!("Inconsistent DRB result for the next epoch.");
}
return Ok(());
} else if task_state
.membership
.read()
.await
.has_stake(&task_state.public_key, current_epoch_number + 1)
{
store_received_drb_result(current_epoch_number + 1, proposal_result, task_state)
.await?;
}
let proposal_result = proposal
.next_drb_result
.context(info!("Proposal is missing the DRB result."))?;

let membership_reader = task_state.membership.read().await;

let has_stake_current_epoch = membership_reader.has_stake(&task_state.public_key, epoch);
let has_stake_next_epoch = membership_reader.has_stake(&task_state.public_key, epoch + 1);

drop(membership_reader);

if has_stake_current_epoch {
let computed_result = store_and_get_computed_drb_result(epoch + 1, task_state).await?;

ensure!(proposal_result == computed_result, warn!("Our calculated DRB result is {:?}, which does not match the proposed DRB result of {:?}", computed_result, proposal_result));

Ok(())
} else if has_stake_next_epoch {
store_received_drb_result(epoch + 1, proposal_result, task_state).await
} else {
Err(error!(
"We are not participating in either the current or next epoch"
))
}
Ok(())
}

/// Start the DRB computation task for the next epoch.
Expand Down
9 changes: 9 additions & 0 deletions crates/utils/src/anytrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ impl<T> Context<T> for Option<T> {
}
}

impl<'a, T> Context<&'a mut T> for &'a mut Option<T> {
fn context(self, error: Error) -> Result<&'a mut T> {
match self {
Some(t) => Ok(t),
None => Err(error),
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit 7eb945c

Please sign in to comment.