From 03b2c6a2e3cb7b805b9baa2526defc648b71c6f4 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Fri, 3 Jan 2025 13:12:48 -0800 Subject: [PATCH] L1 client failover Allow the L1 client to be configured with multiple RPC provider URLs. Fail over to the next provider in the list if * two requests fail in a given time period * N requests fail in a row Handle rate limit errors specially by preventing requests to the provider until the rate limit period expires. Closes #2360 --- Cargo.lock | 22 +- builder/src/bin/permissionless-builder.rs | 11 +- builder/src/non_permissioned.rs | 2 +- .../src/bin/marketplace-builder.rs | 11 +- marketplace-builder/src/builder.rs | 2 +- sequencer-sqlite/Cargo.lock | 20 +- sequencer/src/genesis.rs | 29 +- sequencer/src/lib.rs | 4 +- sequencer/src/options.rs | 8 +- sequencer/src/run.rs | 4 +- types/Cargo.toml | 2 + types/src/v0/impls/l1.rs | 389 +++++++++++++++--- types/src/v0/mod.rs | 4 +- types/src/v0/v0_1/l1.rs | 69 +++- types/src/v0/v0_3/mod.rs | 4 +- 15 files changed, 487 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0844e2b46a..429a441b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2824,6 +2824,7 @@ dependencies = [ "anyhow", "ark-serialize", "async-broadcast", + "async-lock 3.4.0", "async-trait", "base64-bytes", "bincode", @@ -2853,6 +2854,7 @@ dependencies = [ "portpicker", "pretty_assertions", "rand 0.8.5", + "reqwest 0.11.27", "sequencer-utils", "serde", "serde_json", @@ -4595,7 +4597,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite 0.2.15", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -4665,6 +4667,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.8.0", + "hyper 0.14.31", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -8000,10 +8015,12 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.31", "hyper-rustls 0.24.2", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite 0.2.15", @@ -8015,6 +8032,7 @@ dependencies = [ "sync_wrapper 0.1.2", "system-configuration 0.5.1", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -8042,7 +8060,7 @@ dependencies = [ "http-body-util", "hyper 1.5.0", "hyper-rustls 0.27.3", - "hyper-tls", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", diff --git a/builder/src/bin/permissionless-builder.rs b/builder/src/bin/permissionless-builder.rs index 0c98ebc805..1ff329399e 100644 --- a/builder/src/bin/permissionless-builder.rs +++ b/builder/src/bin/permissionless-builder.rs @@ -40,8 +40,13 @@ struct NonPermissionedBuilderOptions { eth_account_index: u32, /// Url we will use for RPC communication with L1. - #[clap(long, env = "ESPRESSO_BUILDER_L1_PROVIDER")] - l1_provider_url: Url, + #[clap( + long, + env = "ESPRESSO_BUILDER_L1_PROVIDER", + value_delimiter = ',', + num_args = 1.., + )] + l1_provider_url: Vec, /// Peer nodes use to fetch missing state #[clap(long, env = "ESPRESSO_SEQUENCER_STATE_PEERS", value_delimiter = ',')] @@ -133,7 +138,7 @@ async fn run( opt: NonPermissionedBuilderOptions, ) -> anyhow::Result<()> { let l1_params = L1Params { - url: opt.l1_provider_url, + urls: opt.l1_provider_url, options: Default::default(), }; diff --git a/builder/src/non_permissioned.rs b/builder/src/non_permissioned.rs index 946a3dda12..b4a8fbb92a 100644 --- a/builder/src/non_permissioned.rs +++ b/builder/src/non_permissioned.rs @@ -46,7 +46,7 @@ pub fn build_instance_state( l1_params: L1Params, state_peers: Vec, ) -> NodeState { - let l1_client = l1_params.options.connect(l1_params.url); + let l1_client = l1_params.options.connect(l1_params.urls); NodeState::new( u64::MAX, // dummy node ID, only used for debugging chain_config, diff --git a/marketplace-builder/src/bin/marketplace-builder.rs b/marketplace-builder/src/bin/marketplace-builder.rs index d8a294eac3..c23cf6a1b3 100644 --- a/marketplace-builder/src/bin/marketplace-builder.rs +++ b/marketplace-builder/src/bin/marketplace-builder.rs @@ -48,8 +48,13 @@ struct NonPermissionedBuilderOptions { eth_account_index: u32, /// Url we will use for RPC communication with L1. - #[clap(long, env = "ESPRESSO_BUILDER_L1_PROVIDER")] - l1_provider_url: Url, + #[clap( + long, + env = "ESPRESSO_BUILDER_L1_PROVIDER", + value_delimiter = ',', + num_args = 1.., + )] + l1_provider_url: Vec, /// Peer nodes use to fetch missing state #[clap(long, env = "ESPRESSO_SEQUENCER_STATE_PEERS", value_delimiter = ',')] @@ -140,7 +145,7 @@ async fn run( opt: NonPermissionedBuilderOptions, ) -> anyhow::Result<()> { let l1_params = L1Params { - url: opt.l1_provider_url, + urls: opt.l1_provider_url, options: Default::default(), }; diff --git a/marketplace-builder/src/builder.rs b/marketplace-builder/src/builder.rs index dfc2aff471..1d36f9399e 100644 --- a/marketplace-builder/src/builder.rs +++ b/marketplace-builder/src/builder.rs @@ -66,7 +66,7 @@ pub fn build_instance_state( l1_params: L1Params, state_peers: Vec, ) -> NodeState { - let l1_client = l1_params.options.connect(l1_params.url); + let l1_client = l1_params.options.connect(l1_params.urls); NodeState::new( u64::MAX, // dummy node ID, only used for debugging chain_config, diff --git a/sequencer-sqlite/Cargo.lock b/sequencer-sqlite/Cargo.lock index 0779e290f8..54b7866235 100644 --- a/sequencer-sqlite/Cargo.lock +++ b/sequencer-sqlite/Cargo.lock @@ -2742,6 +2742,7 @@ dependencies = [ "anyhow", "ark-serialize", "async-broadcast", + "async-lock 3.4.0", "async-trait", "base64-bytes", "bincode", @@ -2770,6 +2771,7 @@ dependencies = [ "paste", "pretty_assertions", "rand 0.8.5", + "reqwest 0.11.27", "sequencer-utils", "serde", "serde_json", @@ -4537,6 +4539,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.9.0", + "hyper 0.14.31", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -7718,10 +7733,12 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.31", "hyper-rustls 0.24.2", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite 0.2.15", @@ -7733,6 +7750,7 @@ dependencies = [ "sync_wrapper 0.1.2", "system-configuration 0.5.1", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -7760,7 +7778,7 @@ dependencies = [ "http-body-util", "hyper 1.5.1", "hyper-rustls 0.27.3", - "hyper-tls", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", diff --git a/sequencer/src/genesis.rs b/sequencer/src/genesis.rs index e5eb58a7db..60947c9ad9 100644 --- a/sequencer/src/genesis.rs +++ b/sequencer/src/genesis.rs @@ -11,6 +11,7 @@ use espresso_types::{ use ethers::types::H160; use sequencer_utils::deployer::is_proxy_contract; use serde::{Deserialize, Serialize}; +use url::Url; use vbs::version::Version; /// Initial configuration of an Espresso stake table. @@ -85,8 +86,8 @@ impl Genesis { } impl Genesis { - pub async fn validate_fee_contract(&self, l1_rpc_url: String) -> anyhow::Result<()> { - let l1 = L1Client::new(l1_rpc_url.parse().context("invalid url")?); + pub async fn validate_fee_contract(&self, l1_rpc_url: Url) -> anyhow::Result<()> { + let l1 = L1Client::new(l1_rpc_url); if let Some(fee_contract_address) = self.chain_config.fee_contract { tracing::info!("validating fee contract at {fee_contract_address:x}"); @@ -593,7 +594,9 @@ mod test { let genesis: Genesis = toml::from_str(&toml).unwrap_or_else(|err| panic!("{err:#}")); // validate the fee_contract address - let result = genesis.validate_fee_contract(anvil.endpoint()).await; + let result = genesis + .validate_fee_contract(anvil.endpoint().parse().unwrap()) + .await; // check if the result from the validation is an error if let Err(e) = result { @@ -639,7 +642,9 @@ mod test { let genesis: Genesis = toml::from_str(&toml).unwrap_or_else(|err| panic!("{err:#}")); // Call the validation logic for the fee_contract address - let result = genesis.validate_fee_contract(anvil.endpoint()).await; + let result = genesis + .validate_fee_contract(anvil.endpoint().parse().unwrap()) + .await; assert!( result.is_ok(), @@ -711,7 +716,9 @@ mod test { let genesis: Genesis = toml::from_str(&toml).unwrap_or_else(|err| panic!("{err:#}")); // Call the validation logic for the fee_contract address - let result = genesis.validate_fee_contract(anvil.endpoint()).await; + let result = genesis + .validate_fee_contract(anvil.endpoint().parse().unwrap()) + .await; assert!( result.is_ok(), @@ -783,7 +790,9 @@ mod test { let genesis: Genesis = toml::from_str(&toml).unwrap_or_else(|err| panic!("{err:#}")); // Call the validation logic for the fee_contract address - let result = genesis.validate_fee_contract(anvil.endpoint()).await; + let result = genesis + .validate_fee_contract(anvil.endpoint().parse().unwrap()) + .await; // check if the result from the validation is an error if let Err(e) = result { @@ -851,7 +860,9 @@ mod test { let rpc_url = "https://ethereum-sepolia.publicnode.com"; // validate the fee_contract address - let result = genesis.validate_fee_contract(rpc_url.to_string()).await; + let result = genesis + .validate_fee_contract(rpc_url.parse().unwrap()) + .await; // check if the result from the validation is an error if let Err(e) = result { @@ -904,7 +915,9 @@ mod test { let rpc_url = "https://ethereum-sepolia.publicnode.com"; // validate the fee_contract address - let result = genesis.validate_fee_contract(rpc_url.to_string()).await; + let result = genesis + .validate_fee_contract(rpc_url.parse().unwrap()) + .await; // check if the result from the validation is an error if let Err(e) = result { diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 9a29c8425d..a54d21d909 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -185,7 +185,7 @@ pub struct NetworkParams { } pub struct L1Params { - pub url: Url, + pub urls: Vec, pub options: L1ClientOptions, } @@ -491,7 +491,7 @@ pub async fn init_node( let l1_client = l1_params .options .with_metrics(metrics) - .connect(l1_params.url); + .connect(l1_params.urls); l1_client.spawn_tasks().await; let l1_genesis = match genesis.l1_finalized { L1Finalized::Block(b) => b, diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs index b7fabe1d78..05dbf4b39b 100644 --- a/sequencer/src/options.rs +++ b/sequencer/src/options.rs @@ -340,10 +340,12 @@ pub struct Options { #[clap( long, env = "ESPRESSO_SEQUENCER_L1_PROVIDER", - default_value = "http://localhost:8545" + default_value = "http://localhost:8545", + value_delimiter = ',', + num_args = 1.., )] - #[derivative(Debug(format_with = "Display::fmt"))] - pub l1_provider_url: Url, + #[derivative(Debug = "ignore")] + pub l1_provider_url: Vec, /// Configuration for the L1 client. #[clap(flatten)] diff --git a/sequencer/src/run.rs b/sequencer/src/run.rs index a7f864e12d..59cf5dcafc 100644 --- a/sequencer/src/run.rs +++ b/sequencer/src/run.rs @@ -28,7 +28,7 @@ pub async fn main() -> anyhow::Result<()> { // validate that the fee contract is a proxy and panic otherwise genesis - .validate_fee_contract(opt.l1_provider_url.to_string()) + .validate_fee_contract(opt.l1_provider_url[0].clone()) .await .unwrap(); @@ -130,7 +130,7 @@ where { let (private_staking_key, private_state_key) = opt.private_keys()?; let l1_params = L1Params { - url: opt.l1_provider_url, + urls: opt.l1_provider_url, options: opt.l1_options, }; diff --git a/types/Cargo.toml b/types/Cargo.toml index 32d0c8e389..65f9167892 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -11,6 +11,7 @@ testing = [] anyhow = { workspace = true } ark-serialize = { workspace = true } async-broadcast = { workspace = true } +async-lock = { workspace = true } async-trait = { workspace = true } base64-bytes = { workspace = true } bincode = { workspace = true } @@ -39,6 +40,7 @@ num-traits = { workspace = true } paste = { workspace = true } pretty_assertions = { workspace = true } rand = { workspace = true } +reqwest = "0.11" # Same version used by ethers sequencer-utils = { path = "../utils" } serde = { workspace = true } serde_json = { workspace = true } diff --git a/types/src/v0/impls/l1.rs b/types/src/v0/impls/l1.rs index b682bacb8a..2a77d3a35f 100644 --- a/types/src/v0/impls/l1.rs +++ b/types/src/v0/impls/l1.rs @@ -1,32 +1,39 @@ use std::{ cmp::{min, Ordering}, + fmt::Debug, num::NonZeroUsize, sync::Arc, + time::Instant, }; use anyhow::Context; +use async_trait::async_trait; use clap::Parser; use committable::{Commitment, Committable, RawCommitmentBuilder}; use contract_bindings::fee_contract::FeeContract; use ethers::{ prelude::{Address, BlockNumber, Middleware, Provider, H256, U256}, - providers::{Http, Ws}, + providers::{Http, HttpClientError, JsonRpcClient, JsonRpcError, Ws}, }; use futures::{ - future::Future, + future::{Future, FutureExt}, stream::{self, StreamExt}, }; use hotshot_types::traits::metrics::Metrics; use lru::LruCache; +use reqwest::StatusCode; +use serde::{de::DeserializeOwned, Serialize}; use tokio::{ spawn, sync::{Mutex, MutexGuard}, - time::sleep, + time::{sleep, Duration}, }; use tracing::Instrument; use url::Url; -use super::{L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask}; +use super::{ + L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus, +}; use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot}; impl PartialOrd for L1BlockInfo { @@ -96,10 +103,13 @@ impl L1ClientOptions { self } - /// Instantiate an `L1Client` for a given provider `Url`. - pub fn connect(self, url: Url) -> L1Client { - let metrics = L1ClientMetrics::new(&**self.metrics); - L1Client::with_provider(self, url.to_string().try_into().unwrap(), metrics) + /// Instantiate an `L1Client` for a given list of provider `Url`s. + pub fn connect(self, urls: impl IntoIterator) -> L1Client { + L1Client::with_provider(Provider::new(MultiRpcClient::new(self, urls))) + } + + fn rate_limit_delay(&self) -> Duration { + self.l1_rate_limit_delay.unwrap_or(self.l1_retry_delay) } } @@ -115,23 +125,140 @@ impl L1ClientMetrics { } } +impl MultiRpcClient { + fn new(opt: L1ClientOptions, clients: impl IntoIterator) -> Self { + // The type of messages in this channel is (), i.e. all messages are identical, and we only + // ever use it to await the next single failure (see `next_failover`). In other words, it + // functions as a oneshot broadcast channel, so a capacity of 1 is safe. + let (mut failover_send, failover_recv) = async_broadcast::broadcast(1); + failover_send.set_await_active(false); + failover_send.set_overflow(true); + + Self { + clients: Arc::new(clients.into_iter().map(Http::new).collect()), + status: Default::default(), + failover_send, + failover_recv: failover_recv.deactivate(), + opt, + } + } + + async fn failover(&self, time: Instant, status: &mut MultiRpcClientStatus) { + tracing::warn!( + ?status, + ?time, + frequent_failure_tolerance = ?self.opt.l1_frequent_failure_tolerance, + consecutive_failure_tolerance = ?self.opt.l1_consecutive_failure_tolerance, + current = status.client, + "L1 client failing over", + ); + status.client += 1; + status.rate_limited_until = None; + status.last_failure = None; + status.consecutive_failures = 0; + self.failover_send.broadcast_direct(()).await.ok(); + } + + fn next_failover(&self) -> impl Future { + let recv = self.failover_recv.activate_cloned(); + recv.into_future().map(|_| ()) + } + + fn options(&self) -> &L1ClientOptions { + &self.opt + } +} + +#[async_trait] +impl JsonRpcClient for MultiRpcClient { + type Error = HttpClientError; + + async fn request(&self, method: &str, params: T) -> Result + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send, + { + let current = { + let status = self.status.read().await; + + // If we've been rate limited, back off until the limit (hopefully) expires. + if let Some(t) = status.rate_limited_until { + if t > Instant::now() { + // Return an error with a non-standard code to indicate client-side rate limit. + return Err(JsonRpcError { + code: -20000, + message: "rate limit exceeded".into(), + data: None, + } + .into()); + } + } + + status.client + }; + let client = &self.clients[current % self.clients.len()]; + match client.request(method, ¶ms).await { + Ok(res) => Ok(res), + Err(err) => { + let t = Instant::now(); + tracing::warn!(?t, method, ?params, "L1 client error: {err:#}"); + + // Keep track of failures, failing over to the next client if necessary. + let mut status = self.status.write().await; + if status.client != current { + // Someone else has also gotten a failure, and the client has already been + // failed over. + return Err(err); + } + + // Treat rate limited errors specially; these should not cause failover, but instead + // should only cause us to temporarily back off on making requests to the RPC + // server. + if let HttpClientError::ReqwestError(e) = &err { + if matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)) { + status.rate_limited_until = Some(t + self.opt.rate_limit_delay()); + return Err(err); + } + } + + if let Some(prev) = status.last_failure { + if t - prev < self.opt.l1_frequent_failure_tolerance { + // We have failed twice inside the allowed window, so we should failover to + // the next client. + self.failover(t, &mut status).await; + return Err(err); + } + } + + status.consecutive_failures += 1; + if status.consecutive_failures >= self.opt.l1_consecutive_failure_tolerance { + // We have failed too many times in a row, albeit not rapidly enough to trigger + // the frequent failure tolerance. Still, we now trigger a failover based on the + // consecutive failures policy. + self.failover(t, &mut status).await; + return Err(err); + } + + // If we're not failing over, update the last failure time. + status.last_failure = Some(t); + Err(err) + } + } + } +} + impl L1Client { - fn with_provider( - opt: L1ClientOptions, - mut provider: Provider, - metrics: L1ClientMetrics, - ) -> Self { + fn with_provider(mut provider: Provider) -> Self { + let opt = provider.as_ref().options().clone(); + let metrics = L1ClientMetrics::new(&**opt.metrics); + let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity); receiver.set_await_active(false); receiver.set_overflow(true); provider.set_interval(opt.l1_polling_interval); Self { - retry_delay: opt.l1_retry_delay, - subscription_timeout: opt.subscription_timeout, provider: Arc::new(provider), - ws_provider: opt.ws_provider, - events_max_block_range: opt.l1_events_max_block_range, state: Arc::new(Mutex::new(L1State::new(opt.l1_blocks_cache_size))), sender, receiver: receiver.deactivate(), @@ -142,7 +269,7 @@ impl L1Client { /// Construct a new L1 client with the default options. pub fn new(url: Url) -> Self { - L1ClientOptions::default().connect(url) + L1ClientOptions::default().connect([url]) } /// Start the background tasks which keep the L1 client up to date. @@ -168,23 +295,27 @@ impl L1Client { } fn update_loop(&self) -> impl Future { + let opt = self.options(); let rpc = self.provider.clone(); - let ws_url = self.ws_provider.clone(); - let retry_delay = self.retry_delay; - let subscription_timeout = self.subscription_timeout; + let ws_urls = opt.l1_ws_provider.clone(); + let retry_delay = opt.l1_retry_delay; + let subscription_timeout = opt.subscription_timeout; let state = self.state.clone(); let sender = self.sender.clone(); let metrics = self.metrics.clone(); let span = tracing::warn_span!("L1 client update"); async move { - loop { - let mut ws; - - // Subscribe to new blocks. This task cannot fail; retry until we succeed. - let mut block_stream = loop { - let res = match &ws_url { - Some(url) => { + for i in 0.. { + let ws; + + // Subscribe to new blocks. + let mut block_stream = { + let res = match &ws_urls { + Some(urls) => { + // Use a new WebSockets host each time we retry in case there is a + // problem with one of the hosts specifically. + let url = &urls[i % urls.len()]; ws = match Provider::::connect(url.clone()).await { Ok(ws) => ws, Err(err) => { @@ -195,7 +326,11 @@ impl L1Client { }; ws.subscribe_blocks().await.map(StreamExt::boxed) } - None => rpc + None => { + let failover = (*rpc).as_ref().next_failover().map(|()| { + tracing::warn!("aborting subscription stream due to provider failover"); + }); + rpc .watch_blocks() .await .map(|stream| { @@ -223,13 +358,16 @@ impl L1Client { } }) } - .boxed()), + .take_until(failover) + .boxed()) + } }; match res { - Ok(stream) => break stream, + Ok(stream) => stream, Err(err) => { tracing::error!("error subscribing to L1 blocks: {err:#}"); sleep(retry_delay).await; + continue; } } }; @@ -352,7 +490,7 @@ impl L1Client { // This should not happen: the event stream ended. All we can do is try again. tracing::warn!(number, "L1 event stream ended unexpectedly; retry"); - sleep(self.retry_delay).await; + self.retry_delay().await; } } @@ -398,7 +536,7 @@ impl L1Client { // This should not happen: the event stream ended. All we can do is try again. tracing::warn!(number, "L1 event stream ended unexpectedly; retry",); - sleep(self.retry_delay).await; + self.retry_delay().await; } } @@ -440,7 +578,7 @@ impl L1Client { // This should not happen: the event stream ended. All we can do is try again. tracing::warn!(%timestamp, "L1 event stream ended unexpectedly; retry",); - sleep(self.retry_delay).await; + self.retry_delay().await; }; // It is possible there is some earlier block that also has the proper timestamp. Work @@ -482,18 +620,18 @@ impl L1Client { number, "provider error: finalized L1 block should always be available" ); - sleep(self.retry_delay).await; + self.retry_delay().await; continue; } Err(err) => { tracing::warn!(number, "failed to get finalized L1 block: {err:#}"); - sleep(self.retry_delay).await; + self.retry_delay().await; continue; } }; let Some(hash) = block.hash else { tracing::warn!(number, ?block, "finalized L1 block has no hash"); - sleep(self.retry_delay).await; + self.retry_delay().await; continue; }; break L1BlockInfo { @@ -523,6 +661,8 @@ impl L1Client { return vec![]; } + let opt = self.options(); + // `prev` should have already been processed unless we // haven't processed *any* blocks yet. let prev = prev_finalized.map(|prev| prev + 1).unwrap_or(0); @@ -531,7 +671,7 @@ impl L1Client { // `events_max_block_range`. let mut start = prev; let end = new_finalized; - let chunk_size = self.events_max_block_range; + let chunk_size = opt.l1_events_max_block_range; let chunks = std::iter::from_fn(move || { let chunk_end = min(start + chunk_size - 1, end); if chunk_end < start { @@ -545,7 +685,7 @@ impl L1Client { // Fetch events for each chunk. let events = stream::iter(chunks).then(|(from, to)| { - let retry_delay = self.retry_delay; + let retry_delay = opt.l1_retry_delay; let fee_contract = FeeContract::new(fee_contract_address, self.provider.clone()); async move { tracing::debug!(from, to, "fetch events in range"); @@ -571,6 +711,14 @@ impl L1Client { }); events.flatten().map(FeeInfo::from).collect().await } + + fn options(&self) -> &L1ClientOptions { + (*self.provider).as_ref().options() + } + + async fn retry_delay(&self) { + sleep(self.options().l1_retry_delay).await; + } } impl L1State { @@ -601,7 +749,9 @@ impl L1State { } } -async fn get_finalized_block(rpc: &Provider) -> anyhow::Result> { +async fn get_finalized_block( + rpc: &Provider, +) -> anyhow::Result> { let Some(block) = rpc.get_block(BlockNumber::Finalized).await? else { // This can happen in rare cases where the L1 chain is very young and has not finalized a // block yet. This is more common in testing and demo environments. In any case, we proceed @@ -631,6 +781,7 @@ mod test { use contract_bindings::fee_contract::FeeContract; use ethers::{ prelude::{LocalWallet, Signer, SignerMiddleware, H160, U64}, + providers::Http, utils::{hex, parse_ether, Anvil, AnvilInstance}, }; use portpicker::pick_unused_port; @@ -645,20 +796,21 @@ mod test { l1_events_max_block_range: 1, l1_polling_interval: Duration::from_secs(1), subscription_timeout: Duration::from_secs(5), - ws_provider: if ws { - Some(anvil.ws_endpoint().parse().unwrap()) + l1_ws_provider: if ws { + Some(vec![anvil.ws_endpoint().parse().unwrap()]) } else { None }, ..Default::default() } - .connect(anvil.endpoint().parse().unwrap()); + .connect([anvil.endpoint().parse().unwrap()]); client.spawn_tasks().await; client } - async fn test_get_finalized_deposits_helper(ws: bool) -> anyhow::Result<()> { + #[tokio::test(flavor = "multi_thread")] + async fn test_get_finalized_deposits() -> anyhow::Result<()> { setup_test(); // how many deposits will we make @@ -667,7 +819,7 @@ mod test { let anvil = Anvil::new().spawn(); let wallet_address = anvil.addresses().first().cloned().unwrap(); - let l1_client = new_l1_client(&anvil, ws).await; + let l1_client = new_l1_client(&anvil, false).await; let wallet: LocalWallet = anvil.keys()[0].clone().into(); // In order to deposit we need a provider that can sign. @@ -750,7 +902,7 @@ mod test { assert_eq!(deposits + deploy_txn_count, head.as_u64()); // Use non-signing `L1Client` to retrieve data. - let l1_client = new_l1_client(&anvil, ws).await; + let l1_client = new_l1_client(&anvil, false).await; // Set prev deposits to `None` so `Filter` will start at block // 0. The test would also succeed if we pass `0` (b/c first // block did not deposit). @@ -817,16 +969,6 @@ mod test { Ok(()) } - #[tokio::test(flavor = "multi_thread")] - async fn test_get_finalized_deposits_ws() -> anyhow::Result<()> { - test_get_finalized_deposits_helper(true).await - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_get_finalized_deposits_http() -> anyhow::Result<()> { - test_get_finalized_deposits_helper(false).await - } - async fn test_wait_for_finalized_block_helper(ws: bool) { setup_test(); @@ -990,4 +1132,139 @@ mod test { async fn test_reconnect_update_task_http() { test_reconnect_update_task_helper(false).await } + + async fn test_failover_update_task_helper(ws: bool) { + setup_test(); + + let anvil = Anvil::new().block_time(1u32).spawn(); + + // Create an L1 client with fake providers, and check that the state is still updated after + // it correctly fails over to the real providers. + let client = L1ClientOptions { + l1_polling_interval: Duration::from_secs(1), + // Use a very long subscription timeout, so that we only succeed by triggering a + // failover. + subscription_timeout: Duration::from_secs(1000), + l1_ws_provider: if ws { + Some(vec![ + "ws://notarealurl:1234".parse().unwrap(), + anvil.ws_endpoint().parse().unwrap(), + ]) + } else { + None + }, + ..Default::default() + } + .connect([ + "http://notarealurl:1234".parse().unwrap(), + anvil.endpoint().parse().unwrap(), + ]); + + client.spawn_tasks().await; + + let initial_state = client.snapshot().await; + tracing::info!(?initial_state, "initial state"); + + // Check the state is updating. + let mut retry = 0; + let updated_state = loop { + assert!(retry < 10, "state did not update in time"); + + let updated_state = client.snapshot().await; + if updated_state.head > initial_state.head { + break updated_state; + } + tracing::info!(retry, "waiting for state update"); + sleep(Duration::from_secs(1)).await; + retry += 1; + }; + tracing::info!(?updated_state, "state updated"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_failover_update_task_ws() { + test_failover_update_task_helper(true).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_failover_update_task_http() { + test_failover_update_task_helper(false).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_failover_consecutive_failures() { + setup_test(); + + let anvil = Anvil::new().block_time(1u32).spawn(); + let provider = Provider::new(MultiRpcClient::new( + L1ClientOptions { + l1_polling_interval: Duration::from_secs(1), + // Set a very short tolerance for frequent failovers, so that we will only + // successfully trigger a failover via the consecutive failover rule. + l1_frequent_failure_tolerance: Duration::from_millis(0), + l1_consecutive_failure_tolerance: 3, + ..Default::default() + }, + [ + "http://notarealurl:1234".parse().unwrap(), + anvil.endpoint().parse().unwrap(), + ], + )); + + // Make just enough failed requests not to trigger a failover. + for _ in 0..2 { + let failover = provider.as_ref().next_failover(); + provider.get_block_number().await.unwrap_err(); + assert!(failover.now_or_never().is_none()); + assert_eq!(provider.as_ref().status.read().await.client, 0); + } + + // The final request triggers failover. + let failover = provider.as_ref().next_failover(); + provider.get_block_number().await.unwrap_err(); + assert!(failover.now_or_never().is_some()); + assert_eq!(provider.as_ref().status.read().await.client, 1); + + // Now requests succeed. + provider.get_block_number().await.unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_failover_frequent_failures() { + setup_test(); + + let anvil = Anvil::new().block_time(1u32).spawn(); + let provider = Provider::new(MultiRpcClient::new( + L1ClientOptions { + l1_polling_interval: Duration::from_secs(1), + l1_frequent_failure_tolerance: Duration::from_millis(100), + ..Default::default() + }, + [ + "http://notarealurl:1234".parse().unwrap(), + anvil.endpoint().parse().unwrap(), + ], + )); + + // Two failed requests that are not within the tolerance window do not trigger a failover. + let failover = provider.as_ref().next_failover(); + provider.get_block_number().await.unwrap_err(); + sleep(Duration::from_secs(1)).await; + provider.get_block_number().await.unwrap_err(); + + // Check that we didn't fail over. + assert!(failover.now_or_never().is_none()); + assert_eq!(provider.as_ref().status.read().await.client, 0); + + // Reset the window. + sleep(Duration::from_secs(1)).await; + + // Two failed requests in a row trigger failover. + let failover = provider.as_ref().next_failover(); + provider.get_block_number().await.unwrap_err(); + provider.get_block_number().await.unwrap_err(); + provider.get_block_number().await.unwrap(); + assert!(failover.now_or_never().is_some()); + assert_eq!(provider.as_ref().status.read().await.client, 1); + } } diff --git a/types/src/v0/mod.rs b/types/src/v0/mod.rs index c119cf5c0c..5112820560 100644 --- a/types/src/v0/mod.rs +++ b/types/src/v0/mod.rs @@ -123,7 +123,9 @@ reexport_unchanged_types!( BlockSize, ); -pub(crate) use v0_3::{L1ClientMetrics, L1Event, L1State, L1UpdateTask}; +pub(crate) use v0_3::{ + L1ClientMetrics, L1Event, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus, +}; #[derive( Clone, Copy, Debug, Default, Hash, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, diff --git a/types/src/v0/v0_1/l1.rs b/types/src/v0/v0_1/l1.rs index 4da2b7b57a..b2fbf181c0 100644 --- a/types/src/v0/v0_1/l1.rs +++ b/types/src/v0/v0_1/l1.rs @@ -1,5 +1,6 @@ use crate::parse_duration; use async_broadcast::{InactiveReceiver, Sender}; +use async_lock::RwLock; use clap::Parser; use ethers::{ prelude::{H256, U256}, @@ -8,7 +9,11 @@ use ethers::{ use hotshot_types::traits::metrics::{Counter, Gauge, Metrics, NoMetrics}; use lru::LruCache; use serde::{Deserialize, Serialize}; -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{ + num::NonZeroUsize, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::{sync::Mutex, task::JoinHandle}; use url::Url; @@ -94,11 +99,39 @@ pub struct L1ClientOptions { )] pub subscription_timeout: Duration, + /// Fail over to another provider if the current provider fails twice within this window. + #[clap( + long, + env = "ESPRESSO_SEQUENCER_L1_FREQUENT_FAILURE_TOLERANCE", + default_value = "1m", + value_parser = parse_duration, + )] + pub l1_frequent_failure_tolerance: Duration, + + /// Fail over to another provider if the current provider fails many times in a row, within any + /// time window. + #[clap( + long, + env = "ESPRESSO_SEQUENCER_L1_CONSECUTIVE_FAILURE_TOLERANCE", + default_value = "10" + )] + pub l1_consecutive_failure_tolerance: usize, + + /// Amount of time to wait after receiving a 429 response before making more L1 RPC requests. + /// + /// If not set, the general l1-retry-delay will be used. + #[clap( + long, + env = "ESPRESSO_SEQUENCER_L1_RATE_LIMIT_DELAY", + value_parser = parse_duration, + )] + pub l1_rate_limit_delay: Option, + /// Separate provider to use for subscription feeds. /// /// Typically this would be a WebSockets endpoint while the main provider uses HTTP. - #[clap(long, env = "ESPRESSO_SEQUENCER_L1_WS_PROVIDER")] - pub ws_provider: Option, + #[clap(long, env = "ESPRESSO_SEQUENCER_L1_WS_PROVIDER", value_delimiter = ',')] + pub l1_ws_provider: Option>, #[clap(skip = Arc::>::new(Box::new(NoMetrics)))] pub metrics: Arc>, @@ -113,14 +146,8 @@ pub struct L1ClientOptions { /// easy to use a subscription instead of polling for new blocks, vastly reducing the number of L1 /// RPC calls we make. pub struct L1Client { - pub(crate) retry_delay: Duration, - pub(crate) subscription_timeout: Duration, /// `Provider` from `ethers-provider`. - pub(crate) provider: Arc>, - /// Provider to use for subscriptions, if different from `provider`. - pub(crate) ws_provider: Option, - /// Maximum number of L1 blocks that can be scanned for events in a single query. - pub(crate) events_max_block_range: u64, + pub(crate) provider: Arc>, /// Shared state updated by an asynchronous task which polls the L1. pub(crate) state: Arc>, /// Channel used by the async update task to send events to clients. @@ -155,3 +182,25 @@ pub(crate) struct L1ClientMetrics { pub(crate) finalized: Arc, pub(crate) reconnects: Arc, } + +/// An RPC client with multiple remote providers. +/// +/// This client utilizes one RPC provider at a time, but if it detects that the provider is in a +/// failing state, it will automatically switch to the next provider in its list. +#[derive(Clone, Debug)] +pub(crate) struct MultiRpcClient { + pub(crate) clients: Arc>, + pub(crate) status: Arc>, + pub(crate) failover_send: Sender<()>, + pub(crate) failover_recv: InactiveReceiver<()>, + pub(crate) opt: L1ClientOptions, +} + +/// The state of the current provider being used by a [`MultiRpcClient`]. +#[derive(Debug, Default)] +pub(crate) struct MultiRpcClientStatus { + pub(crate) client: usize, + pub(crate) last_failure: Option, + pub(crate) consecutive_failures: usize, + pub(crate) rate_limited_until: Option, +} diff --git a/types/src/v0/v0_3/mod.rs b/types/src/v0/v0_3/mod.rs index 751c38053d..f4d73f50a6 100644 --- a/types/src/v0/v0_3/mod.rs +++ b/types/src/v0/v0_3/mod.rs @@ -13,7 +13,9 @@ pub use super::v0_1::{ UpgradeType, ViewBasedUpgrade, BLOCK_MERKLE_TREE_HEIGHT, FEE_MERKLE_TREE_HEIGHT, NS_ID_BYTE_LEN, NS_OFFSET_BYTE_LEN, NUM_NSS_BYTE_LEN, NUM_TXS_BYTE_LEN, TX_OFFSET_BYTE_LEN, }; -pub(crate) use super::v0_1::{L1ClientMetrics, L1Event, L1State, L1UpdateTask}; +pub(crate) use super::v0_1::{ + L1ClientMetrics, L1Event, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus, +}; pub const VERSION: Version = Version { major: 0, minor: 3 };