diff --git a/.env b/.env index 1ba174c83e..93b97db065 100644 --- a/.env +++ b/.env @@ -119,9 +119,6 @@ ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT=25 # Query service stress test ESPRESSO_NASTY_CLIENT_PORT=24011 -# Query service stress test -ESPRESSO_NASTY_CLIENT_PORT=24011 - # Openzeppelin Defender Deployment Profile DEFENDER_KEY= DEFENDER_SECRET= diff --git a/sequencer/api/public-env-vars.toml b/sequencer/api/public-env-vars.toml index 00661ba674..8fc6f71e59 100644 --- a/sequencer/api/public-env-vars.toml +++ b/sequencer/api/public-env-vars.toml @@ -24,7 +24,8 @@ variables = [ "ESPRESSO_COMMITMENT_TASK_PORT", "ESPRESSO_COMMITMENT_TASK_REQUEST_TIMEOUT", "ESPRESSO_DEPLOYER_OUT_PATH", - "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT", + "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_ERROR", + "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_WARNING", "ESPRESSO_NASTY_CLIENT_MAX_BLOCKING_POLLS", "ESPRESSO_NASTY_CLIENT_MAX_OPEN_STREAMS", "ESPRESSO_NASTY_CLIENT_MAX_RETRIES", diff --git a/sequencer/src/bin/nasty-client.rs b/sequencer/src/bin/nasty-client.rs index 1db828f326..cedf882ddc 100644 --- a/sequencer/src/bin/nasty-client.rs +++ b/sequencer/src/bin/nasty-client.rs @@ -32,7 +32,7 @@ use hotshot_query_service::{ node::TimeWindowQueryData, }; use hotshot_types::{ - traits::metrics::{Counter, Gauge, Metrics as _}, + traits::metrics::{Counter, Gauge, Histogram, Metrics as _}, vid::{vid_scheme, VidSchemeType}, }; use jf_merkle_tree::{ @@ -56,7 +56,7 @@ use std::{ time::{Duration, Instant}, }; use strum::{EnumDiscriminants, VariantArray}; -use surf_disco::{error::ClientError, socket, Url}; +use surf_disco::{error::ClientError, socket, Error, StatusCode, Url}; use tide_disco::{error::ServerError, App}; use time::OffsetDateTime; use toml::toml; @@ -65,18 +65,6 @@ use tracing::info_span; /// An adversarial stress test for sequencer APIs. #[derive(Clone, Debug, Parser)] struct Options { - /// Timeout for HTTP requests. - /// - /// Requests that take longer than this will fail, causing an error log and an increment of the - /// `failed_actions` metric. - #[clap( - long, - env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT", - default_value = "30s", - value_parser = parse_duration, - )] - http_timeout: Duration, - /// Port on which to serve the nasty-client API. #[clap( short, @@ -99,6 +87,30 @@ struct Options { #[derive(Clone, Copy, Debug, Parser)] struct ClientConfig { + /// Timeout for HTTP requests. + /// + /// Requests that take longer than this will fail, causing an error log and an increment of the + /// `failed_actions` metric. + #[clap( + long, + env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_ERROR", + default_value = "5s", + value_parser = parse_duration, + )] + http_timeout_error: Duration, + + /// Timeout for issuing a warning due to slow HTTP requests. + /// + /// Requests that take longer than this but shorter than HTTP_TIMEOUT_ERROR will not generate an + /// error but will output a warning and increment a counter of slow HTTP requests. + #[clap( + long, + env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_WARNING", + default_value = "1s", + value_parser = parse_duration, + )] + http_timeout_warning: Duration, + /// The maximum number of open WebSockets connections for each resource type at any time. #[clap( long, @@ -239,6 +251,8 @@ struct Metrics { query_namespace_actions: Box, query_block_state_actions: Box, query_fee_state_actions: Box, + slow_requests: Box, + request_latency: Box, } impl Metrics { @@ -309,6 +323,12 @@ impl Metrics { .create_counter("query_block_state_actions".into(), None), query_fee_state_actions: registry .create_counter("query_fee_state_actions".into(), None), + slow_requests: registry + .subgroup("http".into()) + .create_counter("slow_requests".into(), None), + request_latency: registry + .subgroup("http".into()) + .create_histogram("latency".into(), Some("s".into())), } } } @@ -319,43 +339,69 @@ trait Queryable: DeserializeOwned + Debug + Eq { /// URL segment used to indicate that we want to fetch this resource by block hash. const HASH_URL_SEGMENT: &'static str; + /// URL segment used to indicate that we want to fetch this resource by payload hash. + /// + /// This may be none if the resource does not support fetching by payload hash. + const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str>; + fn hash(&self) -> String; + fn payload_hash(&self) -> String; } impl Queryable for BlockQueryData { const RESOURCE: Resource = Resource::Blocks; const HASH_URL_SEGMENT: &'static str = "hash"; + const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("payload-hash"); fn hash(&self) -> String { self.hash().to_string() } + + fn payload_hash(&self) -> String { + self.payload_hash().to_string() + } } impl Queryable for LeafQueryData { const RESOURCE: Resource = Resource::Leaves; const HASH_URL_SEGMENT: &'static str = "hash"; + const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = None; fn hash(&self) -> String { self.hash().to_string() } + + fn payload_hash(&self) -> String { + self.payload_hash().to_string() + } } impl Queryable for Header { const RESOURCE: Resource = Resource::Headers; const HASH_URL_SEGMENT: &'static str = "hash"; + const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("payload-hash"); fn hash(&self) -> String { self.commit().to_string() } + + fn payload_hash(&self) -> String { + self.payload_commitment.to_string() + } } impl Queryable for PayloadQueryData { const RESOURCE: Resource = Resource::Payloads; const HASH_URL_SEGMENT: &'static str = "block-hash"; + const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("hash"); fn hash(&self) -> String { self.block_hash().to_string() } + + fn payload_hash(&self) -> String { + self.hash().to_string() + } } type Connection = socket::Connection; @@ -382,7 +428,7 @@ impl ResourceManager { fn new(opt: &Options, metrics: Arc) -> Self { Self { client: surf_disco::Client::builder(opt.url.clone()) - .set_timeout(Some(opt.http_timeout)) + .set_timeout(Some(opt.client_config.http_timeout_error)) .build(), open_streams: BTreeMap::new(), next_stream_id: 0, @@ -451,17 +497,43 @@ impl ResourceManager { } } + /// Send an HTTP GET request and deserialize the response. + /// + /// This method is a wrapper around `self.client.get()`, which adds instrumentation and metrics + /// for request latency. + async fn get(&self, path: impl Into) -> anyhow::Result { + let path = path.into(); + tracing::debug!("-> GET {path}"); + + let start = Instant::now(); + let res = self.client.get::(&path).send().await; + let elapsed = start.elapsed(); + + let status = match &res { + Ok(_) => StatusCode::Ok, + Err(err) => err.status(), + }; + tracing::debug!("<- GET {path} {} ({elapsed:?})", u16::from(status)); + + self.metrics + .request_latency + .add_point((elapsed.as_millis() as f64) / 1000.); + if elapsed >= self.cfg.http_timeout_warning { + self.metrics.slow_requests.add(1); + tracing::warn!(%path, ?elapsed, "slow request"); + } + + res.context(format!("GET {path}")) + } + async fn query(&self, at: u64) -> anyhow::Result<()> { let at = self.adjust_index(at).await?; let obj = self .retry( info_span!("query", resource = Self::singular(), at), || async { - self.client - .get::(&format!("availability/{}/{at}", Self::singular())) - .send() + self.get::(format!("availability/{}/{at}", Self::singular())) .await - .context(format!("fetching {} {at}", Self::singular())) }, ) .await?; @@ -472,15 +544,12 @@ impl ResourceManager { .retry( info_span!("query by hash", resource = Self::singular(), at, hash), || async { - self.client - .get(&format!( - "availability/{}/{}/{hash}", - Self::singular(), - T::HASH_URL_SEGMENT, - )) - .send() - .await - .context(format!("fetching {} {hash}", Self::singular())) + self.get(format!( + "availability/{}/{}/{hash}", + Self::singular(), + T::HASH_URL_SEGMENT, + )) + .await }, ) .await?; @@ -492,6 +561,37 @@ impl ResourceManager { ) ); + // Query by payload hash and check consistency. + if let Some(segment) = T::PAYLOAD_HASH_URL_SEGMENT { + let payload_hash = obj.payload_hash(); + let by_payload_hash = self + .retry( + info_span!( + "query by payload hash", + resource = Self::singular(), + at, + payload_hash + ), + || async { + self.get::(format!( + "availability/{}/{segment}/{payload_hash}", + Self::singular(), + )) + .await + }, + ) + .await?; + // We might not get the exact object this time, due to non-uniqueness of payloads, but we + // should get an object with the same payload. + ensure!( + payload_hash == by_payload_hash.payload_hash(), + format!( + "query for {} {at} by payload hash {payload_hash} is not consistent", + Self::singular() + ) + ); + } + self.metrics.query_actions[&T::RESOURCE].add(1); Ok(()) } @@ -597,7 +697,7 @@ impl ResourceManager { // local TCP buffer, and thus not generate any traffic on the idle TCP // connection. stream.refreshed = Instant::now(); - tracing::info!( + tracing::debug!( refreshed = ?stream.refreshed, "{} stream refreshed due to blocking read", Self::singular(), @@ -642,11 +742,8 @@ impl ResourceManager { let id = *id; let expected = self .retry(info_span!("fetching expected object"), || async { - self.client - .get(&format!("availability/{}/{pos}", Self::singular())) - .send() + self.get(format!("availability/{}/{pos}", Self::singular())) .await - .context(format!("fetching {} {pos}", Self::singular())) }) .await?; ensure!( @@ -663,12 +760,7 @@ impl ResourceManager { async fn adjust_index(&self, at: u64) -> anyhow::Result { let block_height = loop { - let block_height: u64 = self - .client - .get("status/block-height") - .send() - .await - .context("getting block height")?; + let block_height: u64 = self.get("status/block-height").await?; if block_height == 0 { // None of our tests work with an empty history, but if we just wait briefly there // should be some blocks produced soon. @@ -692,13 +784,10 @@ impl ResourceManager
{ .retry( info_span!("timestamp window", resource = Self::singular(), start, end), || async { - self.client - .get::>(&format!( - "node/header/window/{start}/{end}" - )) - .send() - .await - .context(format!("fetching timestamp window from {start} to {end}")) + self.get::>(format!( + "node/header/window/{start}/{end}" + )) + .await }, ) .await?; @@ -769,33 +858,24 @@ impl ResourceManager
{ // going to look up from the Merkle tree, so we can later verify our results. let block_header = self .retry(info_span!("get block header", block), || async { - self.client - .get::
(&format!("availability/header/{block}")) - .send() + self.get::
(format!("availability/header/{block}")) .await - .context(format!("getting header {block}")) }) .await?; let index_header = self .retry(info_span!("get index header", index), || async { - self.client - .get::
(&format!("availability/header/{index}")) - .send() + self.get::
(format!("availability/header/{index}")) .await - .context(format!("getting header {index}")) }) .await?; // Get a Merkle proof for the block commitment at position `index` from state `block`. let proof = self .retry(info_span!("get block proof", block, index), || async { - self.client - .get::<::MembershipProof>(&format!( - "block-state/{block}/{index}" - )) - .send() - .await - .context(format!("getting merkle proof {block},{index}")) + self.get::<::MembershipProof>(format!( + "block-state/{block}/{index}" + )) + .await }) .await?; @@ -822,17 +902,11 @@ impl ResourceManager
{ commitment = %block_header.block_merkle_tree_root, ), || async { - self.client - .get::<::MembershipProof>(&format!( - "block-state/commit/{}/{index}", - block_header.block_merkle_tree_root, - )) - .send() - .await - .context(format!( - "getting merkle proof {},{index}", - block_header.block_merkle_tree_root - )) + self.get::<::MembershipProof>(format!( + "block-state/commit/{}/{index}", + block_header.block_merkle_tree_root, + )) + .await }, ) .await?; @@ -853,11 +927,8 @@ impl ResourceManager
{ // query. let builder_header = self .retry(info_span!("get builder header", builder), || async { - self.client - .get::
(&format!("availability/header/{builder}")) - .send() + self.get::
(format!("availability/header/{builder}")) .await - .context(format!("getting header {builder}")) }) .await?; let builder_address = builder_header.fee_info.account(); @@ -866,11 +937,8 @@ impl ResourceManager
{ // results. let block_header = self .retry(info_span!("get block header", block), || async { - self.client - .get::
(&format!("availability/header/{block}")) - .send() + self.get::
(format!("availability/header/{block}")) .await - .context(format!("getting header {block}")) }) .await?; @@ -879,13 +947,10 @@ impl ResourceManager
{ .retry( info_span!("get account proof", block, %builder_address), || async { - self.client - .get::<::MembershipProof>(&format!( - "fee-state/{block}/{builder_address}" - )) - .send() - .await - .context(format!("getting merkle proof {block},{builder_address}")) + self.get::<::MembershipProof>(&format!( + "fee-state/{block}/{builder_address}" + )) + .await }, ) .await?; @@ -919,17 +984,11 @@ impl ResourceManager
{ commitment = %block_header.fee_merkle_tree_root, ), || async { - self.client - .get::<::MembershipProof>(&format!( - "fee-state/commit/{}/{builder_address}", - block_header.fee_merkle_tree_root, - )) - .send() - .await - .context(format!( - "getting merkle proof {},{builder_address}", - block_header.fee_merkle_tree_root - )) + self.get::<::MembershipProof>(format!( + "fee-state/commit/{}/{builder_address}", + block_header.fee_merkle_tree_root, + )) + .await }, ) .await?; @@ -953,11 +1012,7 @@ impl ResourceManager> { // the namespace table. let header: Header = self .retry(info_span!("fetch header"), || async { - self.client - .get(&format!("availability/header/{block}")) - .send() - .await - .context(format!("fetching header {block}")) + self.get(format!("availability/header/{block}")).await }) .await?; if header.ns_table.is_empty() { @@ -968,22 +1023,15 @@ impl ResourceManager> { let ns_proof: NamespaceProofQueryData = self .retry(info_span!("fetch namespace", %ns), || async { - self.client - .get(&format!("availability/block/{block}/namespace/{ns}")) - .send() + self.get(format!("availability/block/{block}/namespace/{ns}")) .await - .context(format!("fetching namespace {block}:{ns}")) }) .await?; // Verify proof. let vid_common: VidCommonQueryData = self .retry(info_span!("fetch VID common"), || async { - self.client - .get(&format!("availability/vid/common/{block}")) - .send() - .await - .context(format!("fetching VID common {block}")) + self.get(format!("availability/vid/common/{block}")).await }) .await?; let vid = vid_scheme(VidSchemeType::get_num_storage_nodes(vid_common.common()) as usize); @@ -1215,6 +1263,10 @@ async fn main() { let metrics = PrometheusMetrics::default(); let total_actions = metrics.create_counter("total_actions".into(), None); let failed_actions = metrics.create_counter("failed_actions".into(), None); + metrics + .subgroup("http".into()) + .create_gauge("slow_request_threshold".into(), Some("s".into())) + .set(opt.client_config.http_timeout_warning.as_secs() as usize); let mut client = Client::new(&opt, &metrics); let mut rng = rand::thread_rng();