Skip to content

Commit

Permalink
[fix]: Smarter transaction expire logic
Browse files Browse the repository at this point in the history
Signed-off-by: Sam H. Smith <[email protected]>
  • Loading branch information
SamHSmith committed Jul 4, 2024
1 parent d1d9c95 commit d2781c2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
24 changes: 13 additions & 11 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ impl Queue {
}
}

fn is_pending(&self, tx: &AcceptedTransaction, state_view: &StateView) -> bool {
!self.is_expired(tx) && !tx.is_in_blockchain(state_view)
fn is_pending(&self, tx: &AcceptedTransaction, state_view: &StateView, time_padding: Duration) -> bool {
!self.is_expired(tx, time_padding) && !tx.is_in_blockchain(state_view)
}

/// Checks if the transaction is waiting longer than its TTL or than the TTL from [`Config`].
pub fn is_expired(&self, tx: &AcceptedTransaction) -> bool {
pub fn is_expired(&self, tx: &AcceptedTransaction, time_padding: Duration) -> bool {
let tx_creation_time = tx.as_ref().creation_time();

let time_limit = tx.as_ref().time_to_live().map_or_else(
Expand All @@ -120,7 +120,7 @@ impl Queue {
);

let curr_time = self.time_source.get_unix_time();
curr_time.saturating_sub(tx_creation_time) > time_limit
curr_time.saturating_sub(tx_creation_time) + time_padding > time_limit
}

/// If `true`, this transaction is regarded to have been tampered to have a future timestamp.
Expand All @@ -136,7 +136,7 @@ impl Queue {
state_view: &'state StateView,
) -> impl Iterator<Item = AcceptedTransaction> + 'state {
self.accepted_txs.iter().filter_map(|tx| {
if self.is_pending(tx.value(), state_view) {
if self.is_pending(tx.value(), state_view, Duration::from_secs(0)) {
return Some(tx.value().clone());
}

Expand All @@ -152,18 +152,18 @@ impl Queue {
) -> Vec<AcceptedTransaction> {
self.accepted_txs
.iter()
.filter(|e| self.is_pending(e.value(), state_view))
.filter(|e| self.is_pending(e.value(), state_view, Duration::from_secs(0)))
.map(|e| e.value().clone())
.choose_multiple(
&mut rand::thread_rng(),
n.try_into().expect("u32 should always fit in usize"),
)
}

fn check_tx(&self, tx: &AcceptedTransaction, state_view: &StateView) -> Result<(), Error> {
fn check_tx(&self, tx: &AcceptedTransaction, state_view: &StateView, time_padding: Duration) -> Result<(), Error> {
if self.is_in_future(tx) {
Err(Error::InFuture)
} else if self.is_expired(tx) {
} else if self.is_expired(tx, time_padding) {
Err(Error::Expired)
} else if tx.is_in_blockchain(state_view) {
Err(Error::InBlockchain)
Expand All @@ -178,7 +178,7 @@ impl Queue {
/// See [`enum@Error`]
pub fn push(&self, tx: AcceptedTransaction, state_view: &StateView) -> Result<(), Failure> {
trace!(?tx, "Pushing to the queue");
if let Err(err) = self.check_tx(&tx, state_view) {
if let Err(err) = self.check_tx(&tx, state_view, Duration::from_secs(0)) {
return Err(Failure { tx, err });
}

Expand Down Expand Up @@ -242,6 +242,7 @@ impl Queue {
seen: &mut Vec<HashOf<SignedTransaction>>,
state_view: &StateView,
expired_transactions: &mut Vec<AcceptedTransaction>,
time_padding: Duration,
) -> Option<AcceptedTransaction> {
loop {
let hash = self.tx_hashes.pop()?;
Expand All @@ -258,7 +259,7 @@ impl Queue {
};

let tx = entry.get();
if let Err(e) = self.check_tx(tx, state_view) {
if let Err(e) = self.check_tx(tx, state_view, time_padding) {
let (_, tx) = entry.remove_entry();
self.decrease_per_user_tx_count(tx.as_ref().authority());
if let Error::Expired = e {
Expand Down Expand Up @@ -299,6 +300,7 @@ impl Queue {
state_view: &StateView,
max_txs_in_block: NonZeroUsize,
transactions: &mut Vec<AcceptedTransaction>,
time_padding: Duration,
) {
if transactions.len() >= max_txs_in_block.get() {
return;
Expand All @@ -308,7 +310,7 @@ impl Queue {
let mut expired_transactions = Vec::new();

let txs_from_queue = core::iter::from_fn(|| {
self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions)
self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions, time_padding)
});

let transactions_hashes: IndexSet<HashOf<SignedTransaction>> =
Expand Down
5 changes: 3 additions & 2 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl Sumeragi {

fn cache_transaction(&mut self, state_block: &StateBlock<'_>) {
self.transaction_cache.retain(|tx| {
!state_block.has_transaction(tx.as_ref().hash()) && !self.queue.is_expired(tx)
!state_block.has_transaction(tx.as_ref().hash()) && !self.queue.is_expired(tx, self.block_time + self.commit_time)
});
}

Expand Down Expand Up @@ -1026,7 +1026,7 @@ pub(crate) fn run(
.transaction_cache
// Checking if transactions are in the blockchain is costly
.retain(|tx| {
let expired = sumeragi.queue.is_expired(tx);
let expired = sumeragi.queue.is_expired(tx, sumeragi.block_time + sumeragi.commit_time);
if expired {
debug!(?tx, "Transaction expired")
}
Expand All @@ -1044,6 +1044,7 @@ pub(crate) fn run(
.try_into()
.expect("INTERNAL BUG: transactions in block exceed usize::MAX"),
&mut sumeragi.transaction_cache,
sumeragi.block_time + sumeragi.commit_time,
);

let view_change_index = sumeragi.prune_view_change_proofs_and_calculate_current_index(
Expand Down

0 comments on commit d2781c2

Please sign in to comment.