Skip to content

Commit

Permalink
Implement server side connection limit
Browse files Browse the repository at this point in the history
This change implements a configurable limit on the number of simultaneous
HTTP connections allowed by a query server. This should allow us to
efficiently filter out requests during periods of high load so that
the burden is pushed back onto clients, and the server continues
operating within normal limits. This in turn will allow us to turn off
or scale back the AWS rate limiter.
  • Loading branch information
jbearer committed Jun 4, 2024
1 parent 490c81a commit bf2dd4a
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 44 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ ESPRESSO_SEQUENCER2_API_PORT=24002
ESPRESSO_SEQUENCER3_API_PORT=24003
ESPRESSO_SEQUENCER4_API_PORT=24004
ESPRESSO_SEQUENCER_URL=http://sequencer0:${ESPRESSO_SEQUENCER_API_PORT}
ESPRESSO_SEQUENCER_MAX_CONNECTIONS=25
ESPRESSO_SEQUENCER_STORAGE_PATH=/store/sequencer
ESPRESSO_SEQUENCER_GENESIS_FILE=/genesis/demo.toml
ESPRESSO_SEQUENCER_L1_PORT=8545
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ services:
- ESPRESSO_SEQUENCER_ORCHESTRATOR_URL
- ESPRESSO_SEQUENCER_CDN_ENDPOINT
- ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_MAX_CONNECTIONS
- ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT
- ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer1:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_POSTGRES_HOST=sequencer-db-0
Expand Down Expand Up @@ -265,6 +266,7 @@ services:
- ESPRESSO_SEQUENCER_ORCHESTRATOR_URL
- ESPRESSO_SEQUENCER_CDN_ENDPOINT
- ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_MAX_CONNECTIONS
- ESPRESSO_SEQUENCER_API_PEERS=http://sequencer2:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer2:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_POSTGRES_HOST=sequencer-db-1
Expand Down Expand Up @@ -311,6 +313,7 @@ services:
- ESPRESSO_SEQUENCER_ORCHESTRATOR_URL
- ESPRESSO_SEQUENCER_CDN_ENDPOINT
- ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_MAX_CONNECTIONS
- ESPRESSO_SEQUENCER_API_PEERS=http://sequencer1:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer3:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_L1_PROVIDER
Expand Down Expand Up @@ -350,6 +353,7 @@ services:
- ESPRESSO_SEQUENCER_ORCHESTRATOR_URL
- ESPRESSO_SEQUENCER_CDN_ENDPOINT
- ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_MAX_CONNECTIONS
- ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer4:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_L1_PROVIDER
- ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE
Expand Down Expand Up @@ -387,6 +391,7 @@ services:
- ESPRESSO_SEQUENCER_ORCHESTRATOR_URL
- ESPRESSO_SEQUENCER_CDN_ENDPOINT
- ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_MAX_CONNECTIONS
- ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer0:$ESPRESSO_SEQUENCER_API_PORT
- ESPRESSO_SEQUENCER_L1_PROVIDER
- ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE
Expand Down
1 change: 1 addition & 0 deletions sequencer/api/public-env-vars.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ variables = [
"ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE",
"ESPRESSO_SEQUENCER_LIBP2P_ADVERTISE_ADDRESS",
"ESPRESSO_SEQUENCER_LIBP2P_BIND_ADDRESS",
"ESPRESSO_SEQUENCER_MAX_CONNECTIONS",
"ESPRESSO_SEQUENCER_ORCHESTRATOR_URL",
"ESPRESSO_SEQUENCER_POSTGRES_DATABASE",
"ESPRESSO_SEQUENCER_POSTGRES_HOST",
Expand Down
25 changes: 11 additions & 14 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ pub mod test_helpers {
let url = format!("http://localhost:{port}").parse().unwrap();
let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = opt(Options::from(options::Http { port }).status(Default::default()));
let options = opt(Options::with_port(port).status(Default::default()));
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let _network =
Expand Down Expand Up @@ -506,7 +506,7 @@ pub mod test_helpers {
let url = format!("http://localhost:{port}").parse().unwrap();
let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = opt(Options::from(options::Http { port }).submit(Default::default()));
let options = opt(Options::with_port(port).submit(Default::default()));
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let network =
Expand Down Expand Up @@ -538,7 +538,7 @@ pub mod test_helpers {
let url = format!("http://localhost:{port}").parse().unwrap();
let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = opt(Options::from(options::Http { port }));
let options = opt(Options::with_port(port));
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let network =
Expand Down Expand Up @@ -577,7 +577,7 @@ pub mod test_helpers {
let url = format!("http://localhost:{port}").parse().unwrap();
let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = opt(Options::from(options::Http { port }).catchup(Default::default()));
let options = opt(Options::with_port(port).catchup(Default::default()));
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let network =
Expand Down Expand Up @@ -715,7 +715,7 @@ mod api_tests {
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let network = TestNetwork::new(
D::options(&storage, options::Http { port }.into()).submit(Default::default()),
D::options(&storage, Options::with_port(port)).submit(Default::default()),
[no_storage::Options; TestConfig::NUM_NODES],
l1,
)
Expand Down Expand Up @@ -810,10 +810,7 @@ mod api_tests {

let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = Options::from(options::Http {
port: query_service_port,
})
.hotshot_events(hotshot_events);
let options = Options::with_port(query_service_port).hotshot_events(hotshot_events);

let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
Expand Down Expand Up @@ -893,7 +890,7 @@ mod test {
let port = pick_unused_port().expect("No ports free");
let url = format!("http://localhost:{port}").parse().unwrap();
let client: Client<ServerError, SequencerVersion> = Client::new(url);
let options = Options::from(options::Http { port });
let options = Options::with_port(port);
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let _network =
Expand Down Expand Up @@ -934,7 +931,7 @@ mod test {
let storage = SqlDataSource::create_storage().await;
let options = SqlDataSource::options(
&storage,
Options::from(options::Http { port })
Options::with_port(port)
.state(Default::default())
.status(Default::default()),
);
Expand Down Expand Up @@ -1004,7 +1001,7 @@ mod test {
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let mut network = TestNetwork::with_state(
Options::from(options::Http { port }).catchup(Default::default()),
Options::with_port(port).catchup(Default::default()),
Default::default(),
[no_storage::Options; TestConfig::NUM_NODES],
std::array::from_fn(|_| {
Expand Down Expand Up @@ -1107,7 +1104,7 @@ mod test {
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
let mut network = TestNetwork::with_state(
SqlDataSource::options(&storage[0], options::Http { port }.into())
SqlDataSource::options(&storage[0], Options::with_port(port))
.state(Default::default())
.status(Default::default()),
Default::default(),
Expand Down Expand Up @@ -1176,7 +1173,7 @@ mod test {
.try_into()
.unwrap();
let _network = TestNetwork::with_state(
SqlDataSource::options(&storage[0], options::Http { port }.into())
SqlDataSource::options(&storage[0], Options::with_port(port))
.catchup(Default::default()),
Default::default(),
persistence,
Expand Down
80 changes: 62 additions & 18 deletions sequencer/src/api/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use async_std::sync::{Arc, RwLock};
use clap::Parser;
use futures::{
channel::oneshot,
future::{BoxFuture, FutureExt},
future::{BoxFuture, Future, FutureExt},
};
use hotshot_query_service::{
data_source::{ExtensibleDataSource, MetricsDataSource},
Expand All @@ -29,6 +29,7 @@ use hotshot_query_service::{
};
use hotshot_types::traits::metrics::{Metrics, NoMetrics};
use tide_disco::{
listener::RateLimitListener,
method::{ReadState, WriteState},
App, Url,
};
Expand Down Expand Up @@ -70,6 +71,11 @@ impl From<Http> for Options {
}

impl Options {
/// Default options for running a web server on the given port.
pub fn with_port(port: u16) -> Self {
Http::with_port(port).into()
}

/// Add a query API module backed by a Postgres database.
pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
self.query = Some(query);
Expand Down Expand Up @@ -215,10 +221,7 @@ impl Options {
)?;
}

tasks.spawn(
"API server",
app.serve(format!("0.0.0.0:{}", self.http.port), bind_version),
);
tasks.spawn("API server", self.listen(self.http.port, app, bind_version));

metrics
} else {
Expand All @@ -240,10 +243,7 @@ impl Options {
)?;
}

tasks.spawn(
"API server",
app.serve(format!("0.0.0.0:{}", self.http.port), bind_version),
);
tasks.spawn("API server", self.listen(self.http.port, app, bind_version));

Box::new(NoMetrics)
};
Expand Down Expand Up @@ -324,7 +324,7 @@ impl Options {

tasks.spawn(
"API server",
app.serve(format!("0.0.0.0:{}", self.http.port), Ver::instance()),
self.listen(self.http.port, app, Ver::instance()),
);
Ok(metrics)
}
Expand Down Expand Up @@ -381,7 +381,7 @@ impl Options {

tasks.spawn(
"API server",
app.serve(format!("0.0.0.0:{}", self.http.port), Ver::instance()),
self.listen(self.http.port, app, Ver::instance()),
);
Ok(metrics)
}
Expand Down Expand Up @@ -460,28 +460,72 @@ impl Options {

tasks.spawn(
"Hotshot Events Streaming API server",
app.serve(
format!(
"0.0.0.0:{}",
self.hotshot_events.unwrap().events_service_port
),
self.listen(
self.hotshot_events.unwrap().events_service_port,
app,
bind_version,
),
);

Ok(())
}

fn listen<S, E, Ver>(
&self,
port: u16,
app: App<S, E>,
bind_version: Ver,
) -> impl Future<Output = anyhow::Result<()>>
where
S: Send + Sync + 'static,
E: Send + Sync + tide_disco::Error,
Ver: StaticVersionType + 'static,
{
let max_connections = self.http.max_connections;

async move {
if let Some(limit) = max_connections {
app.serve(RateLimitListener::with_port(port, limit), bind_version)
.await?;
} else {
app.serve(format!("0.0.0.0:{}", port), bind_version).await?;
}
Ok(())
}
}
}

/// The minimal HTTP API.
///
/// The API automatically includes health and version endpoints. Additional API modules can be
/// added by including the query-api or submit-api modules.
#[derive(Parser, Clone, Debug)]
#[derive(Parser, Clone, Copy, Debug)]
pub struct Http {
/// Port that the HTTP API will use.
#[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT")]
pub port: u16,
pub(super) port: u16,

/// Maximum number of concurrent HTTP connections the server will allow.
///
/// Connections exceeding this will receive and immediate 429 response and be closed.
///
/// Leave unset for no connection limit.
#[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
pub(super) max_connections: Option<usize>,
}

impl Http {
/// Default options for running a web server on the given port.
pub fn with_port(port: u16) -> Self {
Self {
port,
max_connections: None,
}
}

pub fn port(&self) -> u16 {
self.port
}
}

/// Options for the submission API module.
Expand Down
21 changes: 10 additions & 11 deletions sequencer/src/bin/espresso-dev-node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ struct Args {
default_value = "0"
)]
account_index: u32,
/// Port that the HTTP API will use.
#[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT")]
sequencer_api_port: u16,

/// If provided, the service will run a basic HTTP server on the given port.
///
/// The server provides healthcheck and version endpoints.
#[clap(short, long, env = "ESPRESSO_COMMITMENT_TASK_PORT")]
commitment_task_port: u16,

#[clap(flatten)]
http: options::Http,

#[clap(flatten)]
sql: persistence::sql::Options,
}
Expand All @@ -65,13 +66,11 @@ async fn main() -> anyhow::Result<()> {
setup_backtrace();

let opt = Args::parse();
let options = options::Options::from(options::Http {
port: opt.sequencer_api_port,
})
.status(Default::default())
.state(Default::default())
.submit(Default::default())
.query_sql(Default::default(), opt.sql);
let options = options::Options::from(opt.http)
.status(Default::default())
.state(Default::default())
.submit(Default::default())
.query_sql(Default::default(), opt.sql);

let (url, _anvil) = if let Some(url) = opt.rpc_url {
(url, None)
Expand Down Expand Up @@ -119,7 +118,7 @@ async fn main() -> anyhow::Result<()> {
start_commitment_server(opt.commitment_task_port, hotshot_address, SEQUENCER_VERSION).unwrap();

let sequencer_url =
Url::parse(format!("http://localhost:{}", opt.sequencer_api_port).as_str()).unwrap();
Url::parse(format!("http://localhost:{}", opt.http.port()).as_str()).unwrap();
let commitment_task_options = CommitmentTaskOptions {
l1_provider: url,
l1_chain_id: None,
Expand Down
2 changes: 1 addition & 1 deletion sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ mod test {
genesis.to_file(&genesis_file).unwrap();

let modules = Modules {
http: Some(Http { port }),
http: Some(Http::with_port(port)),
status: Some(Status),
..Default::default()
};
Expand Down

0 comments on commit bf2dd4a

Please sign in to comment.