Skip to content

Commit

Permalink
refactor: exit early if tcp bind fails
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Jun 18, 2024
1 parent ff69ab7 commit 8f909ae
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,12 @@ impl Iroha {
metrics_reporter,
);

tokio::spawn(async move {
torii
.start()
.await
.into_report()
.map_err(|report| report.change_context(StartError::StartTorii))
});
let run_torii = torii
.start()
.await
.map_err(|report| report.change_context(StartError::StartTorii))?;

tokio::spawn(run_torii);

Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone());

Expand Down
1 change: 1 addition & 0 deletions torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ axum = { workspace = true, features = ["multipart", "ws", "query", "json", "toki
tower-http = { version = "0.5.0", features = ["trace", "timeout"] }
tokio = { workspace = true, features = ["sync", "time", "macros"] }
eyre = { workspace = true }
error-stack = { workspace = true, features = ["eyre"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
async-trait = { workspace = true }
Expand Down
47 changes: 31 additions & 16 deletions torii/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ use axum::{
routing::{get, post},
Router,
};
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
use iroha_config::{base::util::Bytes, parameters::actual::Torii as Config};
use error_stack::IntoReportCompat;
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use iroha_config::{
base::{util::Bytes, WithOrigin},
parameters::actual::Torii as Config,
};
#[cfg(feature = "telemetry")]
use iroha_core::metrics::MetricsReporter;
use iroha_core::{
Expand Down Expand Up @@ -58,7 +62,7 @@ pub struct Torii {
query_service: LiveQueryStoreHandle,
kura: Arc<Kura>,
transaction_max_content_len: Bytes<u64>,
address: SocketAddr,
address: WithOrigin<SocketAddr>,
state: Arc<State>,
#[cfg(feature = "telemetry")]
metrics_reporter: MetricsReporter,
Expand Down Expand Up @@ -90,7 +94,7 @@ impl Torii {
state,
#[cfg(feature = "telemetry")]
metrics_reporter,
address: config.address.into_value(),
address: config.address,
transaction_max_content_len: config.max_content_len,
}
}
Expand Down Expand Up @@ -244,25 +248,29 @@ impl Torii {
///
/// # Errors
/// Can fail due to listening to network or if http server fails
fn start_api(self: Arc<Self>) -> eyre::Result<Vec<task::JoinHandle<eyre::Result<()>>>> {
let torii_address = &self.address;
async fn start_api(self: Arc<Self>) -> eyre::Result<Vec<task::JoinHandle<eyre::Result<()>>>> {
let torii_address = self.address.value();

let handles = torii_address
.to_socket_addrs()?
.map(|addr| {
.map(TcpListener::bind)
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<TcpListener>>()
.await?
.into_iter()
.map(|listener| {
let torii = Arc::clone(&self);
let api_router = torii.create_api_router();

let signal = async move { torii.notify_shutdown.notified().await };

let serve_fut = async move {
let listener = TcpListener::bind(addr).await?;
axum::serve(listener, api_router)
.with_graceful_shutdown(signal)
.await
.map_err(eyre::Report::from)
};

task::spawn(serve_fut.map_err(eyre::Report::from))
task::spawn(serve_fut)
})
.collect();

Expand All @@ -274,13 +282,21 @@ impl Torii {
/// # Errors
/// Can fail due to listening to network or if http server fails
#[iroha_futures::telemetry_future]
pub async fn start(self) -> eyre::Result<()> {
pub async fn start(
self,
) -> error_stack::Result<impl core::future::Future<Output = ()>, eyre::Report> {
let torii = Arc::new(self);
let mut handles = vec![];

handles.extend(Arc::clone(&torii).start_api()?);
handles.extend(
Arc::clone(&torii)
.start_api()
.await
.into_report()
.map_err(|err| err.attach_printable(torii.address.clone().into_attachment()))?,
);

handles
let run = handles
.into_iter()
.collect::<FuturesUnordered<_>>()
.for_each(|handle| {
Expand All @@ -294,10 +310,9 @@ impl Torii {
_ => {}
}
futures::future::ready(())
})
.await;
});

Ok(())
Ok(run)
}
}

Expand Down

0 comments on commit 8f909ae

Please sign in to comment.