From 42c40568e75539e72c0aab20e3210e3b2d0bddb5 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Fri, 3 Jan 2025 13:59:29 -0800 Subject: [PATCH] More metrics for L1 client --- types/src/v0/impls/l1.rs | 41 ++++++++++++++++++++++++++++++++++------ types/src/v0/mod.rs | 3 ++- types/src/v0/v0_1/l1.rs | 15 ++++++++++++--- types/src/v0/v0_3/mod.rs | 3 ++- 4 files changed, 51 insertions(+), 11 deletions(-) diff --git a/types/src/v0/impls/l1.rs b/types/src/v0/impls/l1.rs index 2a77d3a35f..471b5bf494 100644 --- a/types/src/v0/impls/l1.rs +++ b/types/src/v0/impls/l1.rs @@ -19,7 +19,7 @@ use futures::{ future::{Future, FutureExt}, stream::{self, StreamExt}, }; -use hotshot_types::traits::metrics::Metrics; +use hotshot_types::traits::metrics::{CounterFamily, Metrics}; use lru::LruCache; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; @@ -34,7 +34,7 @@ use url::Url; use super::{ L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus, }; -use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot}; +use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Provider, L1Snapshot}; impl PartialOrd for L1BlockInfo { fn partial_cmp(&self, other: &Self) -> Option { @@ -121,6 +121,16 @@ impl L1ClientMetrics { reconnects: metrics .create_counter("stream_reconnects".into(), None) .into(), + failovers: metrics.create_counter("failovers".into(), None).into(), + } + } +} + +impl L1Provider { + fn new(url: Url, failures: &dyn CounterFamily) -> Self { + Self { + failures: failures.create(vec![url.to_string()]), + inner: Http::new(url), } } } @@ -134,12 +144,23 @@ impl MultiRpcClient { failover_send.set_await_active(false); failover_send.set_overflow(true); + let metrics = L1ClientMetrics::new(&**opt.metrics); + let failures = opt + .metrics + .counter_family("failed_requests".into(), vec!["provider".into()]); + Self { - clients: Arc::new(clients.into_iter().map(Http::new).collect()), + clients: Arc::new( + clients + .into_iter() + .map(|url| L1Provider::new(url, &*failures)) + .collect(), + ), status: Default::default(), failover_send, failover_recv: failover_recv.deactivate(), opt, + metrics, } } @@ -156,6 +177,7 @@ impl MultiRpcClient { status.rate_limited_until = None; status.last_failure = None; status.consecutive_failures = 0; + self.metrics.failovers.add(1); self.failover_send.broadcast_direct(()).await.ok(); } @@ -167,6 +189,10 @@ impl MultiRpcClient { fn options(&self) -> &L1ClientOptions { &self.opt } + + fn metrics(&self) -> &L1ClientMetrics { + &self.metrics + } } #[async_trait] @@ -202,6 +228,7 @@ impl JsonRpcClient for MultiRpcClient { Err(err) => { let t = Instant::now(); tracing::warn!(?t, method, ?params, "L1 client error: {err:#}"); + client.failures.add(1); // Keep track of failures, failing over to the next client if necessary. let mut status = self.status.write().await; @@ -250,7 +277,6 @@ impl JsonRpcClient for MultiRpcClient { impl L1Client { fn with_provider(mut provider: Provider) -> Self { let opt = provider.as_ref().options().clone(); - let metrics = L1ClientMetrics::new(&**opt.metrics); let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity); receiver.set_await_active(false); @@ -263,7 +289,6 @@ impl L1Client { sender, receiver: receiver.deactivate(), update_task: Default::default(), - metrics, } } @@ -302,7 +327,7 @@ impl L1Client { let subscription_timeout = opt.subscription_timeout; let state = self.state.clone(); let sender = self.sender.clone(); - let metrics = self.metrics.clone(); + let metrics = self.metrics().clone(); let span = tracing::warn_span!("L1 client update"); async move { @@ -716,6 +741,10 @@ impl L1Client { (*self.provider).as_ref().options() } + fn metrics(&self) -> &L1ClientMetrics { + (*self.provider).as_ref().metrics() + } + async fn retry_delay(&self) { sleep(self.options().l1_retry_delay).await; } diff --git a/types/src/v0/mod.rs b/types/src/v0/mod.rs index 5112820560..a3f6b8a019 100644 --- a/types/src/v0/mod.rs +++ b/types/src/v0/mod.rs @@ -124,7 +124,8 @@ reexport_unchanged_types!( ); pub(crate) use v0_3::{ - L1ClientMetrics, L1Event, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus, + L1ClientMetrics, L1Event, L1Provider, L1State, L1UpdateTask, MultiRpcClient, + MultiRpcClientStatus, }; #[derive( diff --git a/types/src/v0/v0_1/l1.rs b/types/src/v0/v0_1/l1.rs index b2fbf181c0..e7f87ade58 100644 --- a/types/src/v0/v0_1/l1.rs +++ b/types/src/v0/v0_1/l1.rs @@ -2,6 +2,7 @@ use crate::parse_duration; use async_broadcast::{InactiveReceiver, Sender}; use async_lock::RwLock; use clap::Parser; +use derive_more::Deref; use ethers::{ prelude::{H256, U256}, providers::{Http, Provider}, @@ -156,8 +157,6 @@ pub struct L1Client { pub(crate) receiver: InactiveReceiver, /// Async task which updates the shared state. pub(crate) update_task: Arc, - /// Metrics - pub(crate) metrics: L1ClientMetrics, } /// In-memory view of the L1 state, updated asynchronously. @@ -181,6 +180,7 @@ pub(crate) struct L1ClientMetrics { pub(crate) head: Arc, pub(crate) finalized: Arc, pub(crate) reconnects: Arc, + pub(crate) failovers: Arc, } /// An RPC client with multiple remote providers. @@ -189,11 +189,12 @@ pub(crate) struct L1ClientMetrics { /// failing state, it will automatically switch to the next provider in its list. #[derive(Clone, Debug)] pub(crate) struct MultiRpcClient { - pub(crate) clients: Arc>, + pub(crate) clients: Arc>, pub(crate) status: Arc>, pub(crate) failover_send: Sender<()>, pub(crate) failover_recv: InactiveReceiver<()>, pub(crate) opt: L1ClientOptions, + pub(crate) metrics: L1ClientMetrics, } /// The state of the current provider being used by a [`MultiRpcClient`]. @@ -204,3 +205,11 @@ pub(crate) struct MultiRpcClientStatus { pub(crate) consecutive_failures: usize, pub(crate) rate_limited_until: Option, } + +/// A single provider in a [`MultiRpcClient`]. +#[derive(Debug, Deref)] +pub(crate) struct L1Provider { + #[deref] + pub(crate) inner: Http, + pub(crate) failures: Box, +} diff --git a/types/src/v0/v0_3/mod.rs b/types/src/v0/v0_3/mod.rs index f4d73f50a6..65e1ab9102 100644 --- a/types/src/v0/v0_3/mod.rs +++ b/types/src/v0/v0_3/mod.rs @@ -14,7 +14,8 @@ pub use super::v0_1::{ NS_ID_BYTE_LEN, NS_OFFSET_BYTE_LEN, NUM_NSS_BYTE_LEN, NUM_TXS_BYTE_LEN, TX_OFFSET_BYTE_LEN, }; pub(crate) use super::v0_1::{ - L1ClientMetrics, L1Event, L1State, L1UpdateTask, MultiRpcClient, MultiRpcClientStatus, + L1ClientMetrics, L1Event, L1Provider, L1State, L1UpdateTask, MultiRpcClient, + MultiRpcClientStatus, }; pub const VERSION: Version = Version { major: 0, minor: 3 };