From b3142d0f8acdd8f3acad96e06c7967e05541c891 Mon Sep 17 00:00:00 2001 From: Nathan F Yospe Date: Tue, 14 May 2024 11:48:58 -0400 Subject: [PATCH 1/3] Incorporate batch submitting in submit-transactions.rs --- sequencer/src/bin/submit-transactions.rs | 67 +++++++++++++++++++----- 1 file changed, 53 insertions(+), 14 deletions(-) diff --git a/sequencer/src/bin/submit-transactions.rs b/sequencer/src/bin/submit-transactions.rs index 8c76d457fa..e5aa0de2d6 100644 --- a/sequencer/src/bin/submit-transactions.rs +++ b/sequencer/src/bin/submit-transactions.rs @@ -41,6 +41,19 @@ struct Options { #[clap(long, name = "MAX_SIZE", default_value = "1kb", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_SIZE")] max_size: u64, + /// Minimum size of batch of transactions to submit. + /// + /// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with a falling distribution favoring smaller batches. + /// This is by selecting a random size S on each iteration I since last batch, and collecting a batch whenever that S <= I. + #[clap(long, name = "MIN_BATCH_SIZE", default_value = "1", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MIN_BATCH_SIZE")] + min_batch_size: u64, + + /// Maximum size of batch of transactions to submit. + /// + /// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with a falling distribution favoring smaller batches. + #[clap(long, name = "MAX_BATCH_SIZE", default_value = "20", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_BATCH_SIZE")] + max_batch_size: u64, + /// Minimum namespace ID to submit to. #[clap( long, @@ -241,28 +254,54 @@ async fn submit_transactions( // mean `opt.delay`, or parameter `\lambda = 1 / opt.delay`. let delay_distr = rand_distr::Exp::::new(1f64 / opt.delay.as_millis() as f64).unwrap(); + let mut txns = Vec::new(); + let mut hashes = Vec::new(); loop { let tx = random_transaction(&opt, &mut rng); let hash = tx.commit(); tracing::info!( - "submitting transaction {hash} for namespace {} of size {}", + "Adding transaction {hash} for namespace {} of size {}", tx.namespace(), tx.payload().len() ); - if let Err(err) = client - .post::<()>("submit") - .body_binary(&tx) - .unwrap() - .send() - .await - { - tracing::error!("failed to submit transaction: {err}"); + txns.push(tx); + hashes.push(hash); + + let randomized_batch_size = rng.gen_range(opt.min_batch_size..=opt.max_batch_size); + let txns_batch_count = txns.len() as u64; + if randomized_batch_size <= txns_batch_count { + if let Err(err) = if txns_batch_count == 1 { + // occasionally test the 'submit' endpoint, just for coverage + client + .post::<()>("submit") + .body_binary(&txns[0]) + .unwrap() + .send() + .await + } else { + client + .post::<()>("batch") + .body_binary(&txns) + .unwrap() + .send() + .await + } { + tracing::error!("failed to submit batch of {txns_batch_count} transactions: {err}"); + } else { + let submitted_at = Instant::now(); + for hash in hashes.iter() { + sender + .send(SubmittedTransaction { + hash: *hash, + submitted_at, + }) + .await + .ok(); + } + } + txns.clear(); + hashes.clear(); } - let submitted_at = Instant::now(); - sender - .send(SubmittedTransaction { hash, submitted_at }) - .await - .ok(); let delay = Duration::from_millis(delay_distr.sample(&mut rng) as u64); tracing::info!("sleeping for {delay:?}"); From 5c24d9cd04a171e0b9c7ab544cb3454fe69fdfda Mon Sep 17 00:00:00 2001 From: Nathan F Yospe Date: Tue, 14 May 2024 14:00:19 -0400 Subject: [PATCH 2/3] Also log successful submits --- sequencer/src/bin/submit-transactions.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sequencer/src/bin/submit-transactions.rs b/sequencer/src/bin/submit-transactions.rs index e5aa0de2d6..19844b92d9 100644 --- a/sequencer/src/bin/submit-transactions.rs +++ b/sequencer/src/bin/submit-transactions.rs @@ -288,6 +288,7 @@ async fn submit_transactions( } { tracing::error!("failed to submit batch of {txns_batch_count} transactions: {err}"); } else { + tracing::info!("submitted batch of {txns_batch_count} transactions"); let submitted_at = Instant::now(); for hash in hashes.iter() { sender From 4136954251c57614db13f90112ec1a28746f12e9 Mon Sep 17 00:00:00 2001 From: Nathan F Yospe Date: Tue, 14 May 2024 14:44:16 -0400 Subject: [PATCH 3/3] Only batch for private mempool --- sequencer/src/bin/submit-transactions.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sequencer/src/bin/submit-transactions.rs b/sequencer/src/bin/submit-transactions.rs index 19844b92d9..2dd1e38b08 100644 --- a/sequencer/src/bin/submit-transactions.rs +++ b/sequencer/src/bin/submit-transactions.rs @@ -132,6 +132,9 @@ impl Options { .clone() .unwrap_or_else(|| self.url.join("submit").unwrap()) } + fn use_public_mempool(&self) -> bool { + self.submit_url.is_none() + } } #[async_std::main] @@ -267,7 +270,11 @@ async fn submit_transactions( txns.push(tx); hashes.push(hash); - let randomized_batch_size = rng.gen_range(opt.min_batch_size..=opt.max_batch_size); + let randomized_batch_size = if opt.use_public_mempool() { + 1 + } else { + rng.gen_range(opt.min_batch_size..=opt.max_batch_size) + }; let txns_batch_count = txns.len() as u64; if randomized_batch_size <= txns_batch_count { if let Err(err) = if txns_batch_count == 1 {