diff --git a/sequencer/src/bin/nasty-client.rs b/sequencer/src/bin/nasty-client.rs index 75f205aa21..1db828f326 100644 --- a/sequencer/src/bin/nasty-client.rs +++ b/sequencer/src/bin/nasty-client.rs @@ -71,7 +71,7 @@ struct Options { /// `failed_actions` metric. #[clap( long, - env = "ESPRESS_NASTY_CLIENT_HTTP_TIMEOUT", + env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT", default_value = "30s", value_parser = parse_duration, )] @@ -134,6 +134,10 @@ struct ClientConfig { /// Time after which WebSockets connection failures are allowed. /// + /// The server is allowed to close connections which are idle for a certain amount of time. We + /// don't want to treat this as an error in the nasty client, as it is expected, and we should + /// simply reopen the stream. + /// /// If there is an error polling a WebSockets connection last used more recently than this /// duration, it is considered an error. If the connection is staler than this, it is only a /// warning, and the connection is automatically refreshed. @@ -554,7 +558,8 @@ impl ResourceManager { let (id, stream) = self.open_streams.iter_mut().nth(index).unwrap(); // Check if the next item is immediately available or if we're going to block. - if stream.stream.as_mut().peek().now_or_never().is_none() { + let will_block = stream.stream.as_mut().peek().now_or_never().is_none(); + if will_block { blocking += 1; if blocking > self.cfg.max_blocking_polls { tracing::info!("aborting poll_stream action; exceeded maximum blocking polls"); @@ -584,10 +589,20 @@ impl ResourceManager { }; match res { Ok(obj) => { - // Successfully polling a WebSockets connection should reset the connection - // timeout, so we don't expect errors from this connection in the near - // future. - stream.refreshed = Instant::now(); + if will_block { + // Successfully reading from a WebSockets stream should reset the idle + // conenection timeout, so we don't expect errors from this connection + // in the near future. Note that this applies only to reads which + // actually block. Reads which don't block may come directly from the + // local TCP buffer, and thus not generate any traffic on the idle TCP + // connection. + stream.refreshed = Instant::now(); + tracing::info!( + refreshed = ?stream.refreshed, + "{} stream refreshed due to blocking read", + Self::singular(), + ); + } break obj; } Err(err) if refreshed.elapsed() >= self.cfg.web_socket_timeout => { @@ -602,6 +617,11 @@ impl ResourceManager { .context(format!("subscribing to {} from {pos}", Self::plural()))?; stream.stream = Box::pin(conn.peekable()); stream.refreshed = Instant::now(); + tracing::info!( + refreshed = ?stream.refreshed, + "{} stream refreshed due to connection reset", + Self::singular(), + ); } Err(err) => { // Errors on a relatively fresh connection are not allowed. Close the stream