Skip to content

Commit

Permalink
Add backoff to state catchup retry loops (#1525)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer authored May 30, 2024
2 parents c98abf5 + 071e234 commit 11c3127
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions sequencer/src/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,48 @@ use crate::{
state::{BlockMerkleTree, FeeAccount, FeeMerkleCommitment},
};
use anyhow::{bail, Context};
use async_std::sync::RwLock;
use async_std::{sync::RwLock, task::sleep};
use async_trait::async_trait;
use derive_more::From;
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _};
use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
use rand::Rng;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{cmp::min, fmt::Debug, sync::Arc, time::Duration};
use surf_disco::Request;
use tide_disco::error::ServerError;
use url::Url;
use vbs::version::StaticVersionType;

const MIN_RETRY_DELAY: Duration = Duration::from_millis(500);
const MAX_RETRY_DELAY: Duration = Duration::from_secs(5);
const BACKOFF_FACTOR: u32 = 2;
// Exponential backoff jitter as a fraction of the backoff delay, (numerator, denominator).
const BACKOFF_JITTER: (u64, u64) = (1, 10);

/// Compute the delay to wait before the next retry, given the delay that was just used.
///
/// The previous `delay` is multiplied by `BACKOFF_FACTOR`, then increased by a random jitter of at
/// most `BACKOFF_JITTER.0 / BACKOFF_JITTER.1` of the scaled delay, and finally capped at
/// `MAX_RETRY_DELAY`. A `delay` that is already at or above the cap returns the cap unchanged.
#[must_use]
fn backoff(delay: Duration) -> Duration {
    if delay >= MAX_RETRY_DELAY {
        return MAX_RETRY_DELAY;
    }

    let mut rng = rand::thread_rng();

    // Increase the backoff by the backoff factor. `delay` is below `MAX_RETRY_DELAY` here, so the
    // millisecond count comfortably fits in a `u64`.
    let ms = (delay * BACKOFF_FACTOR).as_millis() as u64;

    // Largest jitter we may add: the configured fraction of the scaled delay.
    let max_jitter = ms * BACKOFF_JITTER.0 / BACKOFF_JITTER.1;

    // Sample the jitter directly in milliseconds from [0, max_jitter]. Sampling only the numerator
    // from `0..BACKOFF_JITTER.0` would always yield 0 when the numerator is 1, i.e. no jitter.
    let jitter = rng.gen_range(0..=max_jitter);
    let delay = Duration::from_millis(ms + jitter);

    // Bound the delay by the maximum.
    min(delay, MAX_RETRY_DELAY)
}

// This newtype is probably not worth having. It's only used to be able to log
// URLs before doing requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -59,15 +89,17 @@ pub trait StateCatchup: Send + Sync + std::fmt::Debug {
let mut ret = vec![];
for account in accounts {
// Retry until we succeed.
let mut delay = MIN_RETRY_DELAY;
let account = loop {
match self
.try_fetch_account(height, view, fee_merkle_tree_root, account)
.await
{
Ok(account) => break account,
Err(err) => {
tracing::warn!(%account, "Could not fetch account, retrying: {err:#}");
async_std::task::sleep(self.retry_interval()).await;
tracing::warn!(%account, ?delay, "Could not fetch account, retrying: {err:#}");
sleep(delay).await;
delay = backoff(delay);
}
}
};
Expand All @@ -91,22 +123,24 @@ pub trait StateCatchup: Send + Sync + std::fmt::Debug {
view: ViewNumber,
mt: &mut BlockMerkleTree,
) -> anyhow::Result<()> {
// Retry until we succeed.
let mut delay = MIN_RETRY_DELAY;
loop {
match self.try_remember_blocks_merkle_tree(height, view, mt).await {
Ok(()) => break,
Err(err) => {
tracing::warn!("Could not fetch frontier from any peer, retrying: {err:#}");
async_std::task::sleep(self.retry_interval()).await;
tracing::warn!(
?delay,
"Could not fetch frontier from any peer, retrying: {err:#}"
);
sleep(delay).await;
delay = backoff(delay);
}
}
}

Ok(())
}

/// Time to sleep between failed catchup attempts; this default implementation returns 100
/// milliseconds.
fn retry_interval(&self) -> Duration {
Duration::from_millis(100)
}
}

/// A catchup implementation that falls back to a remote provider, but prefers a local provider when
Expand All @@ -127,7 +161,6 @@ pub(crate) async fn local_and_remote(
#[derive(Debug, Clone, Default)]
pub struct StatePeers<Ver: StaticVersionType> {
// One client per peer URL; the constructor builds these with `Client::new`.
clients: Vec<Client<ServerError, Ver>>,
// Value handed back by `retry_interval`; the constructor sets it to one second.
interval: Duration,
}

impl<Ver: StaticVersionType> StatePeers<Ver> {
Expand All @@ -138,7 +171,6 @@ impl<Ver: StaticVersionType> StatePeers<Ver> {

Self {
clients: urls.into_iter().map(Client::new).collect(),
interval: Duration::from_secs(1),
}
}
}
Expand Down Expand Up @@ -209,10 +241,6 @@ impl<Ver: StaticVersionType> StateCatchup for StatePeers<Ver> {
}
bail!("Could not fetch frontier from any peer");
}

/// Returns the retry interval stored in this instance's `interval` field.
fn retry_interval(&self) -> Duration {
self.interval
}
}

#[derive(Debug, From)]
Expand Down

0 comments on commit 11c3127

Please sign in to comment.