Skip to content

Commit

Permalink
fix: Leader as transaction ordering service
Browse files Browse the repository at this point in the history
Signed-off-by: Sam H. Smith <[email protected]>
  • Loading branch information
SamHSmith committed Sep 29, 2024
1 parent 002803f commit fda1bd5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 60 deletions.
5 changes: 3 additions & 2 deletions crates/iroha_core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ mod pending {
Self(Pending { transactions })
}

fn make_header(
/// Create a block header
pub fn make_header(
prev_block: Option<&SignedBlock>,
view_change_index: usize,
transactions: &[CommittedTransaction],
Expand Down Expand Up @@ -270,7 +271,7 @@ mod chained {
pub struct Chained(pub(super) BlockPayload);

impl BlockBuilder<Chained> {
/// Sign this block and get [`SignedBlock`].
/// Sign this block as Leader and get [`SignedBlock`].
pub fn sign(self, private_key: &PrivateKey) -> WithEvents<ValidBlock> {
WithEvents::new(ValidBlock(self.0 .0.sign(private_key)))
}
Expand Down
111 changes: 62 additions & 49 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,9 @@ impl Sumeragi {
.control_message_receiver
.try_recv()
.map_err(|recv_error| {
assert!(
recv_error != mpsc::TryRecvError::Disconnected,
"INTERNAL ERROR: Sumeragi control message pump disconnected"
)
if recv_error == mpsc::TryRecvError::Disconnected {
error!("INTERNAL ERROR: Sumeragi control message pump disconnected");
}
})
{
should_sleep = false;
Expand Down Expand Up @@ -171,10 +170,9 @@ impl Sumeragi {
.message_receiver
.try_recv()
.map_err(|recv_error| {
assert!(
recv_error != mpsc::TryRecvError::Disconnected,
"INTERNAL ERROR: Sumeragi message pump disconnected"
)
if recv_error == mpsc::TryRecvError::Disconnected {
error!("INTERNAL ERROR: Sumeragi message pump disconnected");
}
})
.ok()?;

Expand Down Expand Up @@ -220,8 +218,7 @@ impl Sumeragi {
match self.message_receiver.try_recv() {
Ok(message) => {
let block = match message {
BlockMessage::BlockCreated(BlockCreated { block })
| BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block,
BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block,
msg => {
trace!(?msg, "Not handling the message, waiting for genesis...");
continue;
Expand Down Expand Up @@ -312,14 +309,15 @@ impl Sumeragi {
// NOTE: By this time genesis block is executed and list of trusted peers is updated
self.topology = Topology::new(state_block.world.trusted_peers_ids.clone());

let msg = BlockCreated::from(&genesis);
let genesis = genesis
.commit(&self.topology)
.unpack(|e| self.send_event(e))
.expect("Genesis invalid");

self.broadcast_packet(msg);
self.commit_block(genesis, state_block);
self.commit_block(genesis.clone(), state_block);
self.broadcast_packet(BlockSyncUpdate {
block: genesis.into(),
});
}

fn commit_block(&mut self, block: CommittedBlock, state_block: StateBlock<'_>) {
Expand Down Expand Up @@ -595,7 +593,6 @@ impl Sumeragi {
block=%block_created.block.hash(),
"Block received"
);

if let Some(mut valid_block) = self.validate_block(
state,
&self.topology,
Expand All @@ -605,7 +602,6 @@ impl Sumeragi {
) {
// NOTE: Up until this point it was unknown which block is expected to be received,
// therefore all the signatures (of any hash) were collected and will now be pruned

for signature in core::mem::take(voting_signatures) {
if let Err(error) =
valid_block.block.add_signature(signature, &self.topology)
Expand Down Expand Up @@ -827,6 +823,7 @@ impl Sumeragi {
fn try_create_block<'state>(
&mut self,
state: &'state State,
genesis_account: &AccountId,
voting_block: &mut Option<VotingBlock<'state>>,
) {
assert_eq!(self.role(), Role::Leader);
Expand All @@ -841,57 +838,74 @@ impl Sumeragi {
.expect("INTERNAL BUG: transactions in block exceed usize::MAX");
let block_time = state.world.view().parameters.sumeragi.block_time();
let tx_cache_full = self.transaction_cache.len() >= max_transactions.get();
let deadline_reached = self.round_start_time.elapsed() > block_time;
let deadline_reached =
self.topology.view_change_index() > 0 || self.round_start_time.elapsed() > block_time;
let tx_cache_non_empty = !self.transaction_cache.is_empty();

if tx_cache_full || (deadline_reached && tx_cache_non_empty) {
let transactions = self
.transaction_cache
.iter()
.map(|tx| tx.deref().clone())
.map(|tx| CommittedTransaction {
value: tx.deref().clone().into(),
error: None,
})
.collect::<Vec<_>>();

let mut state_block = state.block();
let create_block_start_time = Instant::now();
let new_block = BlockBuilder::new(transactions)
.chain(self.topology.view_change_index(), &mut state_block)
.sign(self.key_pair.private_key())
.unpack(|e| self.send_event(e));

let created_in = create_block_start_time.elapsed();
let pipeline_time = state.world.view().parameters().sumeragi.pipeline_time();
if created_in > pipeline_time / 2 {
warn!(
role=%self.role(),
peer_id=%self.peer_id,
"Creating block takes too much time. \
This might prevent consensus from operating. \
Consider increasing `commit_time` or decreasing `max_transactions_in_block`"
);
let pre_signed_block = BlockPayload {
header: BlockBuilder::make_header(
state.view().latest_block().as_deref(),
self.topology.view_change_index(),
&transactions,
state
.view()
.world
.parameters()
.sumeragi
.consensus_estimation(),
),
transactions,
}
.sign(self.key_pair.private_key());

let block_created_msg = BlockCreated {
block: pre_signed_block,
};
if self.topology.is_consensus_required().is_some() {
info!(
peer_id=%self.peer_id,
block=%new_block.as_ref().hash(),
view_change_index=%self.topology.view_change_index(),
txns=%new_block.as_ref().transactions().len(),
created_in_ms=%created_in.as_millis(),
"Block created"
);
self.broadcast_packet(block_created_msg.clone());
}

let msg = BlockCreated::from(&new_block);
*voting_block = Some(VotingBlock::new(new_block, state_block));
self.broadcast_packet(msg);
info!(
peer_id=%self.peer_id,
view_change_index=%self.topology.view_change_index(),
block_hash=%block_created_msg.block.hash(),
txns=%block_created_msg.block.transactions().len(),
"Block created"
);

let new_voting_block = self
.validate_block(
state,
&self.topology,
genesis_account,
block_created_msg,
voting_block,
)
.expect("We just created this block ourselves, it has to be valid.");

if self.topology.is_consensus_required().is_some() {
*voting_block = Some(new_voting_block);
} else {
let committed_block = new_block
let committed_block = new_voting_block
.block
.commit(&self.topology)
.unpack(|e| self.send_event(e))
.expect("INTERNAL BUG: Leader failed to commit created block");

let msg = BlockCommitted::from(&committed_block);
self.broadcast_packet(msg);
self.commit_block(committed_block, state_block);
self.commit_block(committed_block, new_voting_block.state_block);
*voting_block = None;
}
}
}
Expand Down Expand Up @@ -1009,7 +1023,6 @@ pub(crate) fn run(
let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter();

let state_view = state.view();

sumeragi
.transaction_cache
// Checking if transactions are in the blockchain is costly
Expand Down Expand Up @@ -1167,7 +1180,7 @@ pub(crate) fn run(
.set(sumeragi.topology.view_change_index() as u64);

if sumeragi.role() == Role::Leader && voting_block.is_none() {
sumeragi.try_create_block(&state, &mut voting_block);
sumeragi.try_create_block(&state, &genesis_account, &mut voting_block);
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions crates/iroha_core/src/sumeragi/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ pub struct BlockCreated {
pub block: SignedBlock,
}

impl From<&ValidBlock> for BlockCreated {
fn from(block: &ValidBlock) -> Self {
Self {
// TODO: Redundant clone
block: block.clone().into(),
}
}
}

/// `BlockSigned` message structure.
#[derive(Debug, Clone, Decode, Encode)]
pub struct BlockSigned {
Expand Down

0 comments on commit fda1bd5

Please sign in to comment.