From 3ca92b7b43bd98042e3dd5bea9dcb44920dede4f Mon Sep 17 00:00:00 2001 From: Martin Magnus Date: Fri, 3 Jan 2025 12:28:53 +0100 Subject: [PATCH] only mark tokens as unsupported based on metrics for a limited time (#3205) # Description Currently the bad token detection based on metrics will mark tokens as unsupported forever. This is problematic for tokens which only have issues temporarily. For example this can happen when the most important pool for a token gets into a weird state or when a token gets paused or a while. # Changes Adjusts the logic to freeze tokens for a configurable period of time. Once the freeze period is over we give the token another chance (even if the stats indicate that it's currently unsupported). To not run into issues when a token is always bad the logic was built such that 1 more `bad` measurement is enough to freeze the token again. That way we can safely configure a very high `min_measurements` without having periods where a token that was flagged as bad can issues again because we need to get a lot new measurements to mark it as unsupported again. Additionally the PR simplifies how the metrics based bad token detector gets instantiated and gives each solver their completely separate instance (how it was originally communicated because each solver may support different tokens). ## How to test added a unit test --- .../domain/competition/bad_tokens/metrics.rs | 130 ++++++++++++++---- .../src/domain/competition/bad_tokens/mod.rs | 2 +- crates/driver/src/infra/api/mod.rs | 5 +- crates/driver/src/infra/config/file/load.rs | 3 + crates/driver/src/infra/config/file/mod.rs | 13 ++ crates/driver/src/infra/solver/mod.rs | 3 +- 6 files changed, 121 insertions(+), 35 deletions(-) diff --git a/crates/driver/src/domain/competition/bad_tokens/metrics.rs b/crates/driver/src/domain/competition/bad_tokens/metrics.rs index 50917c6d53..43e444dca2 100644 --- a/crates/driver/src/domain/competition/bad_tokens/metrics.rs +++ b/crates/driver/src/domain/competition/bad_tokens/metrics.rs @@ -1,23 +1,18 @@ -use {super::Quality, crate::domain::eth, dashmap::DashMap, std::sync::Arc}; +use { + super::Quality, + crate::domain::eth, + dashmap::DashMap, + std::{ + sync::Arc, + time::{Duration, Instant}, + }, +}; -#[derive(Default)] +#[derive(Default, Debug)] struct TokenStatistics { attempts: u32, fails: u32, -} - -#[derive(Default, Clone)] -pub struct DetectorBuilder(Arc>); - -impl DetectorBuilder { - pub fn build(self, failure_ratio: f64, required_measurements: u32, log_only: bool) -> Detector { - Detector { - failure_ratio, - required_measurements, - counter: self.0, - log_only, - } - } + flagged_unsupported_at: Option, } /// Monitors tokens to determine whether they are considered "unsupported" based @@ -31,19 +26,46 @@ pub struct Detector { required_measurements: u32, counter: Arc>, log_only: bool, + token_freeze_time: Duration, } impl Detector { - pub fn get_quality(&self, token: ð::TokenAddress) -> Option { - let measurements = self.counter.get(token)?; - let is_unsupported = self.is_unsupported(&measurements); + pub fn new( + failure_ratio: f64, + required_measurements: u32, + log_only: bool, + token_freeze_time: Duration, + ) -> Self { + Self { + failure_ratio, + required_measurements, + counter: Default::default(), + log_only, + token_freeze_time, + } + } + + pub fn get_quality(&self, token: ð::TokenAddress, now: Instant) -> Option { + let stats = self.counter.get(token)?; + if stats + .flagged_unsupported_at + .is_some_and(|t| now.duration_since(t) > self.token_freeze_time) + { + // Sometimes tokens only cause issues temporarily. If the token's freeze + // period expired we give it another chance to see if it still behaves badly. + return None; + } + + let is_unsupported = self.stats_indicate_unsupported(&stats); (!self.log_only && is_unsupported).then_some(Quality::Unsupported) } - fn is_unsupported(&self, measurements: &TokenStatistics) -> bool { - let token_failure_ratio = measurements.fails as f64 / measurements.attempts as f64; - measurements.attempts >= self.required_measurements - && token_failure_ratio >= self.failure_ratio + fn stats_indicate_unsupported(&self, stats: &TokenStatistics) -> bool { + let token_failure_ratio = match stats.attempts { + 0 => return false, + attempts => f64::from(stats.fails) / f64::from(attempts), + }; + stats.attempts >= self.required_measurements && token_failure_ratio >= self.failure_ratio } /// Updates the tokens that participated in settlements by @@ -54,32 +76,80 @@ impl Detector { token_pairs: &[(eth::TokenAddress, eth::TokenAddress)], failure: bool, ) { - let mut unsupported_tokens = vec![]; + let now = Instant::now(); + let mut new_unsupported_tokens = vec![]; token_pairs .iter() .flat_map(|(token_a, token_b)| [token_a, token_b]) .for_each(|token| { - let measurement = self + let mut stats = self .counter .entry(*token) .and_modify(|counter| { counter.attempts += 1; - counter.fails += u32::from(failure) + counter.fails += u32::from(failure); }) .or_insert_with(|| TokenStatistics { attempts: 1, fails: u32::from(failure), + flagged_unsupported_at: None, }); - if self.is_unsupported(&measurement) { - unsupported_tokens.push(token); + + // token neeeds to be frozen as unsupported for a while + if self.stats_indicate_unsupported(&stats) + && stats + .flagged_unsupported_at + .is_none_or(|t| now.duration_since(t) > self.token_freeze_time) + { + new_unsupported_tokens.push(token); + stats.flagged_unsupported_at = Some(now); } }); - if !unsupported_tokens.is_empty() { + if !new_unsupported_tokens.is_empty() { tracing::debug!( - tokens = ?unsupported_tokens, + tokens = ?new_unsupported_tokens, "mark tokens as unsupported" ); } } } + +#[cfg(test)] +mod tests { + use {super::*, ethcontract::H160}; + + /// Tests that a token only gets marked temporarily as unsupported. + /// After the freeze period it will be allowed again. + #[tokio::test] + async fn unfreeze_bad_tokens() { + const FREEZE_DURATION: Duration = Duration::from_millis(50); + let detector = Detector::new(0.5, 2, false, FREEZE_DURATION); + + let token_a = eth::TokenAddress(eth::ContractAddress(H160([1; 20]))); + let token_b = eth::TokenAddress(eth::ContractAddress(H160([2; 20]))); + + // token is reported as supported while we don't have enough measurements + assert_eq!(detector.get_quality(&token_a, Instant::now()), None); + detector.update_tokens(&[(token_a, token_b)], true); + assert_eq!(detector.get_quality(&token_a, Instant::now()), None); + detector.update_tokens(&[(token_a, token_b)], true); + + // after we got enough measurements the token gets marked as bad + assert_eq!( + detector.get_quality(&token_a, Instant::now()), + Some(Quality::Unsupported) + ); + + // after the freeze period is over the token gets reported as good again + tokio::time::sleep(FREEZE_DURATION).await; + assert_eq!(detector.get_quality(&token_a, Instant::now()), None); + + // after an unfreeze another bad measurement is enough to freeze it again + detector.update_tokens(&[(token_a, token_b)], true); + assert_eq!( + detector.get_quality(&token_a, Instant::now()), + Some(Quality::Unsupported) + ); + } +} diff --git a/crates/driver/src/domain/competition/bad_tokens/mod.rs b/crates/driver/src/domain/competition/bad_tokens/mod.rs index ffa1578425..d724bea442 100644 --- a/crates/driver/src/domain/competition/bad_tokens/mod.rs +++ b/crates/driver/src/domain/competition/bad_tokens/mod.rs @@ -132,7 +132,7 @@ impl Detector { } if let Some(metrics) = &self.metrics { - return metrics.get_quality(&token); + return metrics.get_quality(&token, now); } None diff --git a/crates/driver/src/infra/api/mod.rs b/crates/driver/src/infra/api/mod.rs index 3047c796a6..de7897cfa8 100644 --- a/crates/driver/src/infra/api/mod.rs +++ b/crates/driver/src/infra/api/mod.rs @@ -58,8 +58,6 @@ impl Api { app = routes::metrics(app); app = routes::healthz(app); - let metrics_bad_token_detector_builder = bad_tokens::metrics::DetectorBuilder::default(); - // Multiplex each solver as part of the API. Multiple solvers are multiplexed // on the same driver so only one liquidity collector collects the liquidity // for all of them. This is important because liquidity collection is @@ -81,10 +79,11 @@ impl Api { } if bad_token_config.enable_metrics_strategy { - bad_tokens.with_metrics_detector(metrics_bad_token_detector_builder.clone().build( + bad_tokens.with_metrics_detector(bad_tokens::metrics::Detector::new( bad_token_config.metrics_strategy_failure_ratio, bad_token_config.metrics_strategy_required_measurements, bad_token_config.metrics_strategy_log_only, + bad_token_config.metrics_strategy_token_freeze_time, )); } diff --git a/crates/driver/src/infra/config/file/load.rs b/crates/driver/src/infra/config/file/load.rs index 9c19aad70a..0c3e2b61f7 100644 --- a/crates/driver/src/infra/config/file/load.rs +++ b/crates/driver/src/infra/config/file/load.rs @@ -124,6 +124,9 @@ pub async fn load(chain: Chain, path: &Path) -> infra::Config { .bad_token_detection .metrics_strategy_required_measurements, metrics_strategy_log_only: config.bad_token_detection.metrics_strategy_log_only, + metrics_strategy_token_freeze_time: config + .bad_token_detection + .metrics_strategy_token_freeze_time, }, settle_queue_size: config.settle_queue_size, } diff --git a/crates/driver/src/infra/config/file/mod.rs b/crates/driver/src/infra/config/file/mod.rs index 5677e60be7..39ed135509 100644 --- a/crates/driver/src/infra/config/file/mod.rs +++ b/crates/driver/src/infra/config/file/mod.rs @@ -716,6 +716,15 @@ pub struct BadTokenDetectionConfig { rename = "metrics-bad-token-detection-log-only" )] pub metrics_strategy_log_only: bool, + + /// How long the metrics based bad token detection should flag a token as + /// unsupported before it allows to solve for that token again. + #[serde( + default = "default_metrics_bad_token_detector_freeze_time", + rename = "metrics-bad-token-detection-token-freeze-time", + with = "humantime_serde" + )] + pub metrics_strategy_token_freeze_time: Duration, } impl Default for BadTokenDetectionConfig { @@ -742,3 +751,7 @@ fn default_settle_queue_size() -> usize { fn default_metrics_bad_token_detector_log_only() -> bool { true } + +fn default_metrics_bad_token_detector_freeze_time() -> Duration { + Duration::from_secs(60 * 10) +} diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index cc0802a5a2..1b361b1c0b 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -22,7 +22,7 @@ use { derive_more::{From, Into}, num::BigRational, reqwest::header::HeaderName, - std::collections::HashMap, + std::{collections::HashMap, time::Duration}, tap::TapFallible, thiserror::Error, tracing::Instrument, @@ -317,4 +317,5 @@ pub struct BadTokenDetection { pub metrics_strategy_failure_ratio: f64, pub metrics_strategy_required_measurements: u32, pub metrics_strategy_log_only: bool, + pub metrics_strategy_token_freeze_time: Duration, }