diff --git a/sequencer/src/bin/submit-transactions.rs b/sequencer/src/bin/submit-transactions.rs index 8c76d457fa..2dd1e38b08 100644 --- a/sequencer/src/bin/submit-transactions.rs +++ b/sequencer/src/bin/submit-transactions.rs @@ -41,6 +41,19 @@ struct Options { #[clap(long, name = "MAX_SIZE", default_value = "1kb", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_SIZE")] max_size: u64, + /// Minimum size of batch of transactions to submit. + /// + /// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with a falling distribution favoring smaller batches. + /// This is by selecting a random size S on each iteration I since last batch, and collecting a batch whenever that S <= I. + #[clap(long, name = "MIN_BATCH_SIZE", default_value = "1", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MIN_BATCH_SIZE")] + min_batch_size: u64, + + /// Maximum size of batch of transactions to submit. + /// + /// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with a falling distribution favoring smaller batches. + #[clap(long, name = "MAX_BATCH_SIZE", default_value = "20", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_BATCH_SIZE")] + max_batch_size: u64, + /// Minimum namespace ID to submit to. #[clap( long, @@ -119,6 +132,9 @@ impl Options { .clone() .unwrap_or_else(|| self.url.join("submit").unwrap()) } + fn use_public_mempool(&self) -> bool { + self.submit_url.is_none() + } } #[async_std::main] @@ -241,28 +257,59 @@ async fn submit_transactions( // mean `opt.delay`, or parameter `\lambda = 1 / opt.delay`. let delay_distr = rand_distr::Exp::::new(1f64 / opt.delay.as_millis() as f64).unwrap(); + let mut txns = Vec::new(); + let mut hashes = Vec::new(); loop { let tx = random_transaction(&opt, &mut rng); let hash = tx.commit(); tracing::info!( - "submitting transaction {hash} for namespace {} of size {}", + "Adding transaction {hash} for namespace {} of size {}", tx.namespace(), tx.payload().len() ); - if let Err(err) = client - .post::<()>("submit") - .body_binary(&tx) - .unwrap() - .send() - .await - { - tracing::error!("failed to submit transaction: {err}"); + txns.push(tx); + hashes.push(hash); + + let randomized_batch_size = if opt.use_public_mempool() { + 1 + } else { + rng.gen_range(opt.min_batch_size..=opt.max_batch_size) + }; + let txns_batch_count = txns.len() as u64; + if randomized_batch_size <= txns_batch_count { + if let Err(err) = if txns_batch_count == 1 { + // occasionally test the 'submit' endpoint, just for coverage + client + .post::<()>("submit") + .body_binary(&txns[0]) + .unwrap() + .send() + .await + } else { + client + .post::<()>("batch") + .body_binary(&txns) + .unwrap() + .send() + .await + } { + tracing::error!("failed to submit batch of {txns_batch_count} transactions: {err}"); + } else { + tracing::info!("submitted batch of {txns_batch_count} transactions"); + let submitted_at = Instant::now(); + for hash in hashes.iter() { + sender + .send(SubmittedTransaction { + hash: *hash, + submitted_at, + }) + .await + .ok(); + } + } + txns.clear(); + hashes.clear(); } - let submitted_at = Instant::now(); - sender - .send(SubmittedTransaction { hash, submitted_at }) - .await - .ok(); let delay = Duration::from_millis(delay_distr.sample(&mut rng) as u64); tracing::info!("sleeping for {delay:?}");