Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Leader as transaction ordering service #4967

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 60 additions & 42 deletions crates/iroha_core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,31 @@ mod pending {
Self(Pending { transactions })
}

/// Create new BlockPayload
pub fn new_unverified(
prev_block: Option<&SignedBlock>,
view_change_index: usize,
transactions_a: Vec<AcceptedTransaction>,
consensus_estimation: Duration,
) -> BlockPayload {
let transactions = transactions_a
.into_iter()
.map(|tx| CommittedTransaction {
value: tx.clone().into(),
error: None,
})
.collect::<Vec<_>>();
BlockPayload {
header: Self::make_header(
prev_block,
view_change_index,
&transactions,
consensus_estimation,
),
transactions,
}
}

fn make_header(
prev_block: Option<&SignedBlock>,
view_change_index: usize,
Expand Down Expand Up @@ -264,7 +289,7 @@ mod chained {
pub struct Chained(pub(super) BlockPayload);

impl BlockBuilder<Chained> {
/// Sign this block and get [`SignedBlock`].
/// Sign this block as Leader and get [`SignedBlock`].
pub fn sign(self, private_key: &PrivateKey) -> WithEvents<ValidBlock> {
WithEvents::new(ValidBlock(self.0 .0.sign(private_key)))
}
Expand Down Expand Up @@ -415,7 +440,8 @@ mod valid {
Ok(())
}

/// Validate a block against the current state of the world.
/// Validate a block against the current state of the world. Individual transaction
/// errors will be updated.
///
/// # Errors
///
Expand All @@ -430,7 +456,7 @@ mod valid {
/// - Error during validation of individual transactions
/// - Transaction in the genesis block is not signed by the genesis public key
pub fn validate(
block: SignedBlock,
mut block: SignedBlock,
topology: &Topology,
expected_chain_id: &ChainId,
genesis_account: &AccountId,
Expand All @@ -442,9 +468,12 @@ mod valid {
return WithEvents::new(Err((block, error)));
}

if let Err(error) =
Self::validate_transactions(&block, expected_chain_id, genesis_account, state_block)
{
if let Err(error) = Self::validate_transactions(
&mut block,
expected_chain_id,
genesis_account,
state_block,
) {
return WithEvents::new(Err((block, error.into())));
}

Expand All @@ -456,7 +485,7 @@ mod valid {
/// * If block header is valid, `voting_block` will be released,
/// and transactions will be validated with write state
pub fn validate_keep_voting_block<'state>(
block: SignedBlock,
mut block: SignedBlock,
topology: &Topology,
expected_chain_id: &ChainId,
genesis_account: &AccountId,
Expand All @@ -480,7 +509,7 @@ mod valid {
};

if let Err(error) = Self::validate_transactions(
&block,
&mut block,
expected_chain_id,
genesis_account,
&mut state_block,
Expand Down Expand Up @@ -577,7 +606,7 @@ mod valid {
}

fn validate_transactions(
block: &SignedBlock,
block: &mut SignedBlock,
expected_chain_id: &ChainId,
genesis_account: &AccountId,
state_block: &mut StateBlock<'_>,
Expand All @@ -589,40 +618,29 @@ mod valid {
(params.sumeragi().max_clock_drift(), params.transaction)
};

block
.transactions()
// TODO: Unnecessary clone?
.cloned()
.try_for_each(|CommittedTransaction { value, error }| {
let tx = if is_genesis {
AcceptedTransaction::accept_genesis(
value,
expected_chain_id,
max_clock_drift,
genesis_account,
)
} else {
AcceptedTransaction::accept(
value,
expected_chain_id,
max_clock_drift,
tx_limits,
)
}?;

if error.is_some() {
match state_block.validate(tx) {
Err(rejected_transaction) => Ok(rejected_transaction),
Ok(_) => Err(TransactionValidationError::RejectedIsValid),
}?;
} else {
state_block
.validate(tx)
.map_err(|(_tx, error)| TransactionValidationError::NotValid(error))?;
}
for CommittedTransaction { value, error } in block.transactions_mut() {
let tx = if is_genesis {
AcceptedTransaction::accept_genesis(
value.clone(),
expected_chain_id,
max_clock_drift,
genesis_account,
)
} else {
AcceptedTransaction::accept(
value.clone(),
expected_chain_id,
max_clock_drift,
tx_limits,
)
}?;

Ok(())
})
*error = match state_block.validate(tx) {
Ok(_) => None,
Err((_tx, error)) => Some(Box::new(error)),
};
}
Ok(())
}

/// Add additional signature for [`Self`]
Expand Down
84 changes: 48 additions & 36 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,6 @@ impl Sumeragi {
block=%block_created.block.hash(),
"Block received"
);

if let Some(mut valid_block) = self.validate_block(
state,
&self.topology,
Expand All @@ -605,7 +604,6 @@ impl Sumeragi {
) {
// NOTE: Up until this point it was unknown which block is expected to be received,
// therefore all the signatures (of any hash) were collected and will now be pruned

for signature in core::mem::take(voting_signatures) {
if let Err(error) =
valid_block.block.add_signature(signature, &self.topology)
Expand Down Expand Up @@ -827,6 +825,7 @@ impl Sumeragi {
fn try_create_block<'state>(
&mut self,
state: &'state State,
genesis_account: &AccountId,
voting_block: &mut Option<VotingBlock<'state>>,
) {
assert_eq!(self.role(), Role::Leader);
Expand All @@ -839,7 +838,11 @@ impl Sumeragi {
.max_transactions
.try_into()
.expect("INTERNAL BUG: transactions in block exceed usize::MAX");
let block_time = state.world.view().parameters.sumeragi.block_time();
let block_time = if self.topology.view_change_index() > 0 {
Duration::from_secs(0)
} else {
state.world.view().parameters.sumeragi.block_time()
};
let tx_cache_full = self.transaction_cache.len() >= max_transactions.get();
let deadline_reached = self.round_start_time.elapsed() > block_time;
let tx_cache_non_empty = !self.transaction_cache.is_empty();
Expand All @@ -851,47 +854,57 @@ impl Sumeragi {
.map(|tx| tx.deref().clone())
.collect::<Vec<_>>();

let mut state_block = state.block();
let create_block_start_time = Instant::now();
let new_block = BlockBuilder::new(transactions)
.chain(self.topology.view_change_index(), &mut state_block)
.sign(self.key_pair.private_key())
.unpack(|e| self.send_event(e));

let created_in = create_block_start_time.elapsed();
let pipeline_time = state.world.view().parameters().sumeragi.pipeline_time();
if created_in > pipeline_time / 2 {
warn!(
role=%self.role(),
peer_id=%self.peer_id,
"Creating block takes too much time. \
This might prevent consensus from operating. \
Consider increasing `commit_time` or decreasing `max_transactions_in_block`"
);
}
let pre_signed_block = BlockBuilder::new_unverified(
state.view().latest_block().as_deref(),
self.topology.view_change_index(),
transactions,
state
.view()
.world
.parameters()
.sumeragi
.consensus_estimation(),
)
.sign(self.key_pair.private_key());

let block_created_msg = BlockCreated {
block: pre_signed_block,
};
if self.topology.is_consensus_required().is_some() {
info!(
peer_id=%self.peer_id,
block=%new_block.as_ref().hash(),
view_change_index=%self.topology.view_change_index(),
txns=%new_block.as_ref().transactions().len(),
created_in_ms=%created_in.as_millis(),
"Block created"
);
self.broadcast_packet(block_created_msg.clone());
}

let msg = BlockCreated::from(&new_block);
*voting_block = Some(VotingBlock::new(new_block, state_block));
self.broadcast_packet(msg);
info!(
peer_id=%self.peer_id,
view_change_index=%self.topology.view_change_index(),
block_hash=%block_created_msg.block.hash(),
txns=%block_created_msg.block.transactions().len(),
"Block created"
);

let new_voting_block = self
.validate_block(
state,
&self.topology,
genesis_account,
block_created_msg,
voting_block,
)
.expect("We just created this block ourselves, it has to be valid.");
SamHSmith marked this conversation as resolved.
Show resolved Hide resolved

if self.topology.is_consensus_required().is_some() {
*voting_block = Some(new_voting_block);
} else {
let committed_block = new_block
let committed_block = new_voting_block
.block
.commit(&self.topology)
.unpack(|e| self.send_event(e))
.expect("INTERNAL BUG: Leader failed to commit created block");

let msg = BlockCommitted::from(&committed_block);
self.broadcast_packet(msg);
self.commit_block(committed_block, state_block);
self.commit_block(committed_block, new_voting_block.state_block);
*voting_block = None;
}
}
}
Expand Down Expand Up @@ -1009,7 +1022,6 @@ pub(crate) fn run(
let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter();

let state_view = state.view();

sumeragi
.transaction_cache
// Checking if transactions are in the blockchain is costly
Expand Down Expand Up @@ -1167,7 +1179,7 @@ pub(crate) fn run(
.set(sumeragi.topology.view_change_index() as u64);

if sumeragi.role() == Role::Leader && voting_block.is_none() {
sumeragi.try_create_block(&state, &mut voting_block);
sumeragi.try_create_block(&state, &genesis_account, &mut voting_block);
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/iroha_data_model/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ impl SignedBlock {
block.payload.transactions.iter()
}

/// Block transactions with mutable access
#[inline]
pub fn transactions_mut(&mut self) -> &mut [CommittedTransaction] {
let SignedBlock::V1(block) = self;
block.payload.transactions.as_mut_slice()
}

/// Signatures of peers which approved this block.
#[inline]
pub fn signatures(
Expand Down