Skip to content

Commit

Permalink
Fix WebSocket timeout handling in nasty-client (#1512)
Browse files Browse the repository at this point in the history
The server might close an idle connection after 60s. This should not be
considered an error. Currently, though, it is causing errors, because
our caluclation of idle time is not correct. We are using time since
last read, but a read doesn't necessarily mean traffic over the
connection. We might be reading a message that was sent long ago and has
been sitting in our TCP buffer while the connection has been idle.

To more accurately gauge how long the connection has been idle, we use
time since the last _blocking_ read. Thus, we are less likely to produce
an error, and more likely to just chalk ConnectionReset errors up to a
stale stream and refresh the stream and continue.
  • Loading branch information
jbearer authored Jun 7, 2024
2 parents ee94100 + 7c89f28 commit ed1350a
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions sequencer/src/bin/nasty-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct Options {
/// `failed_actions` metric.
#[clap(
long,
env = "ESPRESS_NASTY_CLIENT_HTTP_TIMEOUT",
env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT",
default_value = "30s",
value_parser = parse_duration,
)]
Expand Down Expand Up @@ -134,6 +134,10 @@ struct ClientConfig {

/// Time after which WebSockets connection failures are allowed.
///
/// The server is allowed to close connections which are idle for a certain amount of time. We
/// don't want to treat this as an error in the nasty client, as it is expected, and we should
/// simply reopen the stream.
///
/// If there is an error polling a WebSockets connection last used more recently than this
/// duration, it is considered an error. If the connection is staler than this, it is only a
/// warning, and the connection is automatically refreshed.
Expand Down Expand Up @@ -554,7 +558,8 @@ impl<T: Queryable> ResourceManager<T> {
let (id, stream) = self.open_streams.iter_mut().nth(index).unwrap();

// Check if the next item is immediately available or if we're going to block.
if stream.stream.as_mut().peek().now_or_never().is_none() {
let will_block = stream.stream.as_mut().peek().now_or_never().is_none();
if will_block {
blocking += 1;
if blocking > self.cfg.max_blocking_polls {
tracing::info!("aborting poll_stream action; exceeded maximum blocking polls");
Expand Down Expand Up @@ -584,10 +589,20 @@ impl<T: Queryable> ResourceManager<T> {
};
match res {
Ok(obj) => {
// Successfully polling a WebSockets connection should reset the connection
// timeout, so we don't expect errors from this connection in the near
// future.
stream.refreshed = Instant::now();
if will_block {
// Successfully reading from a WebSockets stream should reset the idle
// conenection timeout, so we don't expect errors from this connection
// in the near future. Note that this applies only to reads which
// actually block. Reads which don't block may come directly from the
// local TCP buffer, and thus not generate any traffic on the idle TCP
// connection.
stream.refreshed = Instant::now();
tracing::info!(
refreshed = ?stream.refreshed,
"{} stream refreshed due to blocking read",
Self::singular(),
);
}
break obj;
}
Err(err) if refreshed.elapsed() >= self.cfg.web_socket_timeout => {
Expand All @@ -602,6 +617,11 @@ impl<T: Queryable> ResourceManager<T> {
.context(format!("subscribing to {} from {pos}", Self::plural()))?;
stream.stream = Box::pin(conn.peekable());
stream.refreshed = Instant::now();
tracing::info!(
refreshed = ?stream.refreshed,
"{} stream refreshed due to connection reset",
Self::singular(),
);
}
Err(err) => {
// Errors on a relatively fresh connection are not allowed. Close the stream
Expand Down

0 comments on commit ed1350a

Please sign in to comment.