diff --git a/core/src/queue.rs b/core/src/queue.rs index eb6aa3ec32f..48ed31ad9bc 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -1,6 +1,6 @@ //! Module with queue actor use core::time::Duration; -use std::num::NonZeroUsize; +use std::{num::NonZeroUsize, ops::Deref, sync::Arc}; use crossbeam_queue::ArrayQueue; use dashmap::{mapref::entry::Entry, DashMap}; @@ -82,6 +82,27 @@ pub struct Failure { pub err: Error, } +/// Will remove transaction from the queue on drop. +/// See [`Queue::remove_stale_transaction`] for details. +pub struct TransactionGuard { + tx: AcceptedTransaction, + queue: Arc, +} + +impl Deref for TransactionGuard { + type Target = AcceptedTransaction; + + fn deref(&self) -> &Self::Target { + &self.tx + } +} + +impl Drop for TransactionGuard { + fn drop(&mut self) { + self.queue.remove_stale_transaction(&self.tx); + } +} + impl Queue { /// Makes queue from configuration pub fn from_config( @@ -238,11 +259,10 @@ impl Queue { /// Pop single transaction from the queue. Removes all transactions that fail the `tx_check`. fn pop_from_queue( - &self, - seen: &mut Vec>, + self: &Arc, state_view: &StateView, expired_transactions: &mut Vec, - ) -> Option { + ) -> Option { loop { let hash = self.tx_hashes.pop()?; @@ -267,8 +287,11 @@ impl Queue { continue; } - seen.push(hash); - return Some(tx.clone()); + let guard = TransactionGuard { + tx: tx.clone(), + queue: Arc::clone(self), + }; + return Some(guard); } } @@ -282,10 +305,10 @@ impl Queue { /// BEWARE: Shouldn't be called in parallel with itself. #[cfg(test)] fn collect_transactions_for_block( - &self, + self: &Arc, state_view: &StateView, max_txs_in_block: NonZeroUsize, - ) -> Vec { + ) -> Vec { let mut transactions = Vec::with_capacity(max_txs_in_block.get()); self.get_transactions_for_block(state_view, max_txs_in_block, &mut transactions); transactions @@ -295,21 +318,19 @@ impl Queue { /// /// BEWARE: Shouldn't be called in parallel with itself. pub fn get_transactions_for_block( - &self, + self: &Arc, state_view: &StateView, max_txs_in_block: NonZeroUsize, - transactions: &mut Vec, + transactions: &mut Vec, ) { if transactions.len() >= max_txs_in_block.get() { return; } - let mut seen_queue = Vec::new(); let mut expired_transactions = Vec::new(); - let txs_from_queue = core::iter::from_fn(|| { - self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions) - }); + let txs_from_queue = + core::iter::from_fn(|| self.pop_from_queue(state_view, &mut expired_transactions)); let transactions_hashes: IndexSet> = transactions.iter().map(|tx| tx.as_ref().hash()).collect(); @@ -318,11 +339,6 @@ impl Queue { .take(max_txs_in_block.get() - transactions.len()); transactions.extend(txs); - seen_queue - .into_iter() - .try_for_each(|hash| self.tx_hashes.push(hash)) - .expect("Exceeded the number of transactions pending"); - expired_transactions .into_iter() .map(|tx| TransactionEvent { @@ -335,6 +351,30 @@ impl Queue { }); } + /// Overview: + /// 1. Transaction is added to queue using [`Queue::push`] method. + /// 2. Transaction is moved to [`Sumeragi::transaction_cache`] using [`Queue::pop_from_queue`] method. + /// Note that transaction is removed from [`Queue::tx_hashes`], but kept in [`Queue::accepted_tx`], + /// this is needed to return `Error::IsInQueue` when adding same transaction twice. + /// 3. When transaction is removed from [`Sumeragi::transaction_cache`] + /// (either because it was expired, or because transaction is commited to blockchain), + /// we should remove transaction from [`Queue::accepted_tx`]. + fn remove_stale_transaction(&self, tx: &AcceptedTransaction) { + let removed = self.accepted_txs.remove(&tx.as_ref().hash()); + if removed.is_some() { + self.decrease_per_user_tx_count(tx.as_ref().authority()); + + if self.is_expired(tx) { + let event = TransactionEvent { + hash: tx.as_ref().hash(), + block_height: None, + status: TransactionStatus::Expired, + }; + let _ = self.events_sender.send(event.into()); + } + } + } + /// Check that the user adhered to the maximum transaction per user limit and increment their transaction count. fn check_and_increase_per_user_tx_count(&self, account_id: &AccountId) -> Result<(), Error> { match self.txs_per_user.entry(account_id.clone()) { @@ -518,6 +558,7 @@ pub mod tests { }, &time_source, ); + let queue = Arc::new(queue); for _ in 0..5 { queue .push(accepted_tx_by_someone(&time_source), &state_view) @@ -562,6 +603,7 @@ pub mod tests { let (_time_handle, time_source) = TimeSource::new_mock(Duration::default()); let tx = accepted_tx_by_someone(&time_source); let queue = Queue::test(config_factory(), &time_source); + let queue = Arc::new(queue); queue.push(tx.clone(), &state.view()).unwrap(); let mut state_block = state.block(); state_block @@ -594,6 +636,7 @@ pub mod tests { }, &time_source, ); + let queue = Arc::new(queue); for _ in 0..(max_txs_in_block.get() - 1) { queue .push(accepted_tx_by_someone(&time_source), &state_view) @@ -624,37 +667,6 @@ pub mod tests { ); } - // Queue should only drop transactions which are already committed or ttl expired. - // Others should stay in the queue until that moment. - #[test] - async fn transactions_available_after_pop() { - let max_txs_in_block = nonzero!(2_usize); - let kura = Kura::blank_kura_for_testing(); - let query_handle = LiveQueryStore::test().start(); - let state = Arc::new(State::new(world_with_test_domains(), kura, query_handle)); - let state_view = state.view(); - - let (_time_handle, time_source) = TimeSource::new_mock(Duration::default()); - - let queue = Queue::test(config_factory(), &time_source); - queue - .push(accepted_tx_by_someone(&time_source), &state_view) - .expect("Failed to push tx into queue"); - - let a = queue - .collect_transactions_for_block(&state_view, max_txs_in_block) - .into_iter() - .map(|tx| tx.as_ref().hash()) - .collect::>(); - let b = queue - .collect_transactions_for_block(&state_view, max_txs_in_block) - .into_iter() - .map(|tx| tx.as_ref().hash()) - .collect::>(); - assert_eq!(a.len(), 1); - assert_eq!(a, b); - } - #[test] async fn custom_expired_transaction_is_rejected() { const TTL_MS: u64 = 200; @@ -703,6 +715,7 @@ pub mod tests { let mut txs = Vec::new(); time_handle.advance(Duration::from_millis(TTL_MS + 1)); + let queue = Arc::new(queue); queue.get_transactions_for_block(&state_view, max_txs_in_block, &mut txs); let expired_tx_event = event_receiver.recv().await.unwrap(); assert!(txs.is_empty()); @@ -855,6 +868,7 @@ pub mod tests { }, &time_source, ); + let queue = Arc::new(queue); // First push by Alice should be fine queue diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 78bc3e09d67..1e134d28b5f 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -1,5 +1,5 @@ //! The main event loop that powers sumeragi. -use std::{collections::BTreeSet, sync::mpsc}; +use std::{collections::BTreeSet, ops::Deref, sync::mpsc}; use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId}; @@ -7,7 +7,7 @@ use iroha_p2p::UpdateTopology; use tracing::{span, Level}; use super::{view_change::ProofBuilder, *}; -use crate::{block::*, sumeragi::tracing::instrument}; +use crate::{block::*, queue::TransactionGuard, sumeragi::tracing::instrument}; /// `Sumeragi` is the implementation of the consensus. pub struct Sumeragi { @@ -38,7 +38,7 @@ pub struct Sumeragi { /// other subsystems where we can. This way the performance of /// sumeragi is more dependent on the code that is internal to the /// subsystem. - pub transaction_cache: Vec, + pub transaction_cache: Vec, /// Metrics for reporting number of view changes in current round pub view_changes_metric: iroha_telemetry::metrics::ViewChangesGauge, @@ -842,7 +842,11 @@ impl Sumeragi { let tx_cache_non_empty = !self.transaction_cache.is_empty(); if tx_cache_full || (deadline_reached && tx_cache_non_empty) { - let transactions = self.transaction_cache.clone(); + let transactions = self + .transaction_cache + .iter() + .map(|tx| tx.deref().clone()) + .collect::>(); let mut state_block = state.block(); let create_block_start_time = Instant::now(); diff --git a/defaults/executor.wasm b/defaults/executor.wasm index b1ff7e441d1..6933f17e8ad 100644 Binary files a/defaults/executor.wasm and b/defaults/executor.wasm differ