Skip to content

Commit

Permalink
Fix txn-emitter precision of worker sleeping (#15552)
Browse files Browse the repository at this point in the history
Fix txn-emitter precision of worker sleeping
  • Loading branch information
igor-aptos authored Dec 18, 2024
1 parent 38a5f0d commit ac9302a
Showing 1 changed file with 70 additions and 31 deletions.
101 changes: 70 additions & 31 deletions crates/transaction-emitter-lib/src/emitter/submission_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use std::{
sync::{atomic::AtomicU64, Arc},
time::Instant,
};
use tokio::time::sleep;
use tokio::time::{sleep, sleep_until};

const ALLOWED_EARLY: Duration = Duration::from_micros(500);

pub struct SubmissionWorker {
pub(crate) accounts: Vec<Arc<LocalAccount>>,
Expand Down Expand Up @@ -82,31 +84,18 @@ impl SubmissionWorker {
pub(crate) async fn run(mut self, start_instant: Instant) -> Vec<LocalAccount> {
let mut wait_until = start_instant + self.start_sleep_duration;

let now = Instant::now();
if wait_until > now {
self.sleep_check_done(wait_until - now).await;
}
self.sleep_check_done(wait_until).await;
let wait_duration = Duration::from_millis(self.params.wait_millis);

while !self.stop.load(Ordering::Relaxed) {
let stats_clone = self.stats.clone();
let loop_stats = stats_clone.get_cur();

let loop_start_time = Instant::now();
if wait_duration.as_secs() > 0
&& loop_start_time.duration_since(wait_until) > Duration::from_secs(5)
{
sample!(
SampleRate::Duration(Duration::from_secs(5)),
error!(
"[{:?}] txn_emitter worker drifted out of sync too much: {}s. Is expiration too short, or 5s buffer on top of it?",
self.client().path_prefix_string(),
loop_start_time.duration_since(wait_until).as_secs()
)
);

if wait_duration.as_secs() > 0 {
self.verify_loop_start_drift(loop_start_time, wait_until);
}
// always add expected cycle duration, to not drift from expected pace.
wait_until += wait_duration;

let stats_clone = self.stats.clone();
let loop_stats = stats_clone.get_cur();

let requests = self.gen_requests();
if !requests.is_empty() {
Expand Down Expand Up @@ -175,9 +164,10 @@ impl SubmissionWorker {
if self.skip_latency_stats {
// we also don't want to be stuck waiting for txn_expiration_time_secs
// after stop is called, so we sleep until time or stop is set.
self.sleep_check_done(Duration::from_secs(
self.params.txn_expiration_time_secs + 3,
))
self.sleep_check_done(
Instant::now()
+ Duration::from_secs(self.params.txn_expiration_time_secs + 3),
)
.await
}

Expand All @@ -203,9 +193,11 @@ impl SubmissionWorker {
.await;
}

let now = Instant::now();
if wait_until > now {
self.sleep_check_done(wait_until - now).await;
if wait_duration.as_secs() > 0 {
// always add expected cycle duration, to not drift from expected pace,
// irrespectively of how long our iteration lasted.
wait_until += wait_duration;
self.sleep_check_done(wait_until).await;
}
}

Expand All @@ -216,16 +208,63 @@ impl SubmissionWorker {
}

// sleeps until the requested time has been reached; returns early (without a value) once stop is set
async fn sleep_check_done(&self, duration: Duration) {
let start_time = Instant::now();
async fn sleep_check_done(&self, sleep_until_time: Instant) {
// sleep has millisecond granularity - so round the sleep
let sleep_poll_interval = Duration::from_secs(1);
loop {
sleep(Duration::from_secs(1)).await;
if self.stop.load(Ordering::Relaxed) {
return;
}
if start_time.elapsed() >= duration {

let now = Instant::now();
if now + ALLOWED_EARLY > sleep_until_time {
return;
}

if sleep_until_time > now + sleep_poll_interval {
sleep(sleep_poll_interval).await;
} else {
sleep_until(sleep_until_time.into()).await;
}
}
}

/// Compares the actual start of a loop iteration (`loop_start_time`) with its
/// scheduled start (`wait_until`) and emits a rate-limited error log when the
/// two differ too much.
///
/// * Starting more than `ALLOWED_EARLY` before the schedule is reported as
///   "called too early".
/// * Starting more than 0.3s late is reported as "a bit out of sync", and more
///   than 5s late as "drifted out of sync too much".
fn verify_loop_start_drift(&self, loop_start_time: Instant, wait_until: Instant) {
    // Guard clause: the iteration started on time or ahead of schedule.
    if loop_start_time <= wait_until {
        let early_by = wait_until.saturating_duration_since(loop_start_time);
        if early_by > ALLOWED_EARLY {
            sample!(
                SampleRate::Duration(Duration::from_secs(5)),
                error!(
                    "[{:?}] txn_emitter worker called too early: {:.3}s. There is some bug in waiting.",
                    self.client().path_prefix_string(),
                    early_by.as_secs_f32(),
                )
            );
        }
        return;
    }

    // From here on the iteration started after its scheduled time.
    let late_by_s = loop_start_time
        .saturating_duration_since(wait_until)
        .as_secs_f32();

    if late_by_s > 5.0 {
        sample!(
            SampleRate::Duration(Duration::from_secs(2)),
            error!(
                "[{:?}] txn_emitter worker drifted out of sync too much: {:.3}s. Is machine underprovisioned? Is expiration too short, or 5s buffer on top of it?",
                self.client().path_prefix_string(),
                late_by_s,
            )
        );
    } else if late_by_s > 0.3 {
        sample!(
            SampleRate::Duration(Duration::from_secs(5)),
            error!(
                "[{:?}] txn_emitter worker called a bit out of sync: {:.3}s. Is machine underprovisioned? Is expiration too short, or 5s buffer on top of it?",
                self.client().path_prefix_string(),
                late_by_s,
            )
        );
    }
}

Expand Down

0 comments on commit ac9302a

Please sign in to comment.