Skip to content

Commit

Permalink
fix(sumeragi): Use proper view_change_index on init block load
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed May 22, 2024
1 parent 686679b commit eace2aa
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
30 changes: 14 additions & 16 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl Sumeragi {
new_wsv.world_mut().trusted_peers_ids =
block.payload().commit_topology.clone();
self.commit_block(block, new_wsv);
return Err(EarlyReturn::GenesisBlockReceivedAndCommitted);
return Ok(());
}
}
Err(mpsc::TryRecvError::Disconnected) => return Err(EarlyReturn::Disconnected),
Expand Down Expand Up @@ -789,21 +789,21 @@ pub(crate) fn run(
sumeragi.connect_peers(&sumeragi.current_topology);

let span = span!(tracing::Level::TRACE, "genesis").entered();
let is_genesis_peer = if sumeragi.wsv.height() == 0
|| sumeragi.wsv.latest_block_hash().is_none()
{
if let Some(genesis_network) = genesis_network {
sumeragi.sumeragi_init_commit_genesis(genesis_network);
true
let is_genesis_peer =
if sumeragi.wsv.height() == 0 || sumeragi.wsv.latest_block_hash().is_none() {
if let Some(genesis_network) = genesis_network {
sumeragi.sumeragi_init_commit_genesis(genesis_network);
true
} else {
if let Err(err) = sumeragi.init_listen_for_genesis(&mut shutdown_receiver) {
info!(?err, "Sumeragi Thread is being shut down.");
return;
}
false
}
} else {
sumeragi
.init_listen_for_genesis(&mut shutdown_receiver)
.unwrap_or_else(|err| assert_ne!(EarlyReturn::Disconnected, err, "Disconnected"));
false
}
} else {
false
};
};
span.exit();

info!(
Expand Down Expand Up @@ -1027,8 +1027,6 @@ fn vote_for_block(
/// FromResidual`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EarlyReturn {
/// Genesis block received and committed
GenesisBlockReceivedAndCommitted,
/// Shutdown message received.
ShutdownMessageReceived,
/// Disconnected
Expand Down
31 changes: 22 additions & 9 deletions core/src/sumeragi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ impl SumeragiHandle {
fn replay_block(
block: &SignedBlock,
wsv: &mut WorldStateView,
mut current_topology: Topology,
) -> Topology {
recreate_topology: RecreateTopologyByViewChangeIndex,
) -> RecreateTopologyByViewChangeIndex {
// NOTE: topology need to be updated up to block's view_change_index
current_topology.rotate_all_n(block.payload().header.view_change_index);
let current_topology = recreate_topology(block.payload().header.view_change_index);

let block = ValidBlock::validate(block.clone(), &current_topology, wsv)
.expect("Kura blocks should be valid")
Expand All @@ -248,7 +248,10 @@ impl SumeragiHandle {
Blocks loaded from kura assumed to be valid",
);

Topology::recreate_topology(block.as_ref(), 0, wsv.peers().cloned().collect())
let peers = wsv.peers().cloned().collect();
Box::new(move |view_change_index| {
Topology::recreate_topology(block.as_ref(), view_change_index, peers)
})
}

/// Start [`Sumeragi`] actor and return handle to it.
Expand Down Expand Up @@ -279,33 +282,40 @@ impl SumeragiHandle {
)
});

let mut current_topology = match wsv.height() {
let mut recreate_topology: RecreateTopologyByViewChangeIndex = match wsv.height() {
0 => {
assert!(!configuration.trusted_peers.peers.is_empty());
Topology::new(configuration.trusted_peers.peers.clone())
let peers = configuration.trusted_peers.peers.clone();
Box::new(|_view_change_index| Topology::new(peers))
}
height => {
let block_ref = kura.get_block_by_height(height).expect(
"Sumeragi could not load block that was reported as present. \
Please check that the block storage was not disconnected.",
);
Topology::recreate_topology(&block_ref, 0, wsv.peers().cloned().collect())
let peers = wsv.peers().cloned().collect();
Box::new(move |view_change_index| {
Topology::recreate_topology(&block_ref, view_change_index, peers)
})
}
};

let block_iter_except_last =
(&mut blocks_iter).take(block_count.saturating_sub(skip_block_count + 1));
for block in block_iter_except_last {
current_topology = Self::replay_block(&block, &mut wsv, current_topology);
recreate_topology = Self::replay_block(&block, &mut wsv, recreate_topology);
}

// finalized_wsv is one block behind
let finalized_wsv = wsv.clone();

if let Some(block) = blocks_iter.next() {
current_topology = Self::replay_block(&block, &mut wsv, current_topology);
recreate_topology = Self::replay_block(&block, &mut wsv, recreate_topology);
}

// There is no more blocks so we pick 0 as view change index
let current_topology = recreate_topology(0);

info!("Sumeragi has finished loading blocks and setting up the WSV");

let (public_wsv_sender, public_wsv_receiver) = watch::channel(wsv.clone());
Expand Down Expand Up @@ -372,6 +382,9 @@ impl SumeragiHandle {
}
}

/// Closure to get topology recreated at certain view change index
type RecreateTopologyByViewChangeIndex = Box<dyn FnOnce(u64) -> Topology>;

/// The interval at which sumeragi checks if there are tx in the
/// `queue`. And will create a block if is leader and the voting is
/// not already in progress.
Expand Down

0 comments on commit eace2aa

Please sign in to comment.