Skip to content

Commit

Permalink
Merge pull request #1460 from EspressoSystems/nfy/submit-transactions…
Browse files Browse the repository at this point in the history
…-improvement

Incorporate batch submitting in submit-transactions.rs
  • Loading branch information
nyospe authored May 14, 2024
2 parents 90d720a + 4136954 commit 7748450
Showing 1 changed file with 61 additions and 14 deletions.
75 changes: 61 additions & 14 deletions sequencer/src/bin/submit-transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ struct Options {
#[clap(long, name = "MAX_SIZE", default_value = "1kb", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_SIZE")]
max_size: u64,

/// Minimum size of batch of transactions to submit.
///
/// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with a falling distribution favoring smaller batches.
/// This is by selecting a random size S on each iteration I since last batch, and collecting a batch whenever that S <= I.
#[clap(long, name = "MIN_BATCH_SIZE", default_value = "1", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MIN_BATCH_SIZE")]
min_batch_size: u64,

/// Maximum size of batch of transactions to submit.
///
/// Batches will be a random count between MIN_BATCH_SIZE and MAX_BATCH_SIZE, with a falling distribution favoring smaller batches.
#[clap(long, name = "MAX_BATCH_SIZE", default_value = "20", value_parser = parse_size, env = "ESPRESSO_SUBMIT_TRANSACTIONS_MAX_BATCH_SIZE")]
max_batch_size: u64,

/// Minimum namespace ID to submit to.
#[clap(
long,
Expand Down Expand Up @@ -119,6 +132,9 @@ impl Options {
.clone()
.unwrap_or_else(|| self.url.join("submit").unwrap())
}
fn use_public_mempool(&self) -> bool {
self.submit_url.is_none()
}
}

#[async_std::main]
Expand Down Expand Up @@ -241,28 +257,59 @@ async fn submit_transactions<Ver: StaticVersionType>(
// mean `opt.delay`, or parameter `\lambda = 1 / opt.delay`.
let delay_distr = rand_distr::Exp::<f64>::new(1f64 / opt.delay.as_millis() as f64).unwrap();

let mut txns = Vec::new();
let mut hashes = Vec::new();
loop {
let tx = random_transaction(&opt, &mut rng);
let hash = tx.commit();
tracing::info!(
"submitting transaction {hash} for namespace {} of size {}",
"Adding transaction {hash} for namespace {} of size {}",
tx.namespace(),
tx.payload().len()
);
if let Err(err) = client
.post::<()>("submit")
.body_binary(&tx)
.unwrap()
.send()
.await
{
tracing::error!("failed to submit transaction: {err}");
txns.push(tx);
hashes.push(hash);

let randomized_batch_size = if opt.use_public_mempool() {
1
} else {
rng.gen_range(opt.min_batch_size..=opt.max_batch_size)
};
let txns_batch_count = txns.len() as u64;
if randomized_batch_size <= txns_batch_count {
if let Err(err) = if txns_batch_count == 1 {
// occasionally test the 'submit' endpoint, just for coverage
client
.post::<()>("submit")
.body_binary(&txns[0])
.unwrap()
.send()
.await
} else {
client
.post::<()>("batch")
.body_binary(&txns)
.unwrap()
.send()
.await
} {
tracing::error!("failed to submit batch of {txns_batch_count} transactions: {err}");
} else {
tracing::info!("submitted batch of {txns_batch_count} transactions");
let submitted_at = Instant::now();
for hash in hashes.iter() {
sender
.send(SubmittedTransaction {
hash: *hash,
submitted_at,
})
.await
.ok();
}
}
txns.clear();
hashes.clear();
}
let submitted_at = Instant::now();
sender
.send(SubmittedTransaction { hash, submitted_at })
.await
.ok();

let delay = Duration::from_millis(delay_distr.sample(&mut rng) as u64);
tracing::info!("sleeping for {delay:?}");
Expand Down

0 comments on commit 7748450

Please sign in to comment.