From ec74b61681808df3f1c92b675af559612b51b76d Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 29 Nov 2023 22:10:42 -0500 Subject: [PATCH] add force shutdown with signals --- Cargo.lock | 1 + Cargo.toml | 1 + nft_ingester2/Cargo.toml | 1 + nft_ingester2/src/grpc.rs | 27 +++++++++++----------- nft_ingester2/src/ingester.rs | 43 ++++++++++++++++++++--------------- nft_ingester2/src/redis.rs | 14 +++++++----- nft_ingester2/src/util.rs | 15 +++++++----- 7 files changed, 59 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 507db2f19..a7d18c629 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3539,6 +3539,7 @@ name = "nft_ingester2" version = "0.7.2" dependencies = [ "anyhow", + "async-stream", "atty", "cargo-lock", "clap 4.4.8", diff --git a/Cargo.toml b/Cargo.toml index 98c8efe9f..2ee41f36e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ anchor-client = "0.28.0" anchor-lang = "0.28.0" anyhow = "1.0.75" async-std = "1.0.0" +async-stream = "0.3.5" async-trait = "0.1.60" atty = "0.2.14" blockbuster = "0.9.0-beta.1" diff --git a/nft_ingester2/Cargo.toml b/nft_ingester2/Cargo.toml index b440287f5..df11b29fb 100644 --- a/nft_ingester2/Cargo.toml +++ b/nft_ingester2/Cargo.toml @@ -7,6 +7,7 @@ publish = { workspace = true } [dependencies] anyhow = { workspace = true } +async-stream = { workspace = true } atty = { workspace = true } clap = { workspace = true, features = ["cargo", "derive"] } futures = { workspace = true } diff --git a/nft_ingester2/src/grpc.rs b/nft_ingester2/src/grpc.rs index 5ec284b1b..66b4ead4c 100644 --- a/nft_ingester2/src/grpc.rs +++ b/nft_ingester2/src/grpc.rs @@ -7,10 +7,10 @@ use { redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue}, std::{sync::Arc, time::Duration}, tokio::{ - signal::unix::SignalKind, task::JoinSet, time::{sleep, Instant}, }, + tracing::warn, yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{prelude::subscribe_update::UpdateOneof, prost::Message}, }; @@ -64,15 +64,8 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { Ok(Err(error)) => break Err(error), Err(error) => break Err(error.into()), }, - signal = &mut shutdown => { - let signal = if signal == SignalKind::interrupt() { - "SIGINT" - } else if signal == SignalKind::terminate() { - "SIGTERM" - } else { - "UNKNOWN" - }; - tracing::warn!("{signal} received, waiting spawned tasks..."); + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); break Ok(()); }, msg = geyser.next() => { @@ -142,9 +135,17 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { } }; - while let Some(result) = tasks.join_next().await { - result??; - } + tokio::select! { + Some(signal) = shutdown.next() => { + anyhow::bail!("{signal} received, force shutdown..."); + } + result = async move { + while let Some(result) = tasks.join_next().await { + result??; + } + Ok::<(), anyhow::Error>(()) + } => result?, + }; result } diff --git a/nft_ingester2/src/ingester.rs b/nft_ingester2/src/ingester.rs index 0d7d43478..479e5bddb 100644 --- a/nft_ingester2/src/ingester.rs +++ b/nft_ingester2/src/ingester.rs @@ -9,7 +9,10 @@ use { redis::{metrics_xlen, ProgramTransformerInfo, RedisStream}, util::create_shutdown, }, - futures::future::{pending, BoxFuture, FusedFuture, FutureExt}, + futures::{ + future::{pending, BoxFuture, FusedFuture, FutureExt}, + stream::StreamExt, + }, program_transformers::{ error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier, ProgramTransformer, @@ -19,7 +22,6 @@ use { Arc, }, tokio::{ - signal::unix::SignalKind, task::JoinSet, time::{sleep, Duration}, }, @@ -103,14 +105,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { Ok(Err(error)) => break Err(error), Err(error) => break Err(error.into()), }, - signal = &mut shutdown => { - let signal = if signal == SignalKind::interrupt() { - "SIGINT" - } else if signal == SignalKind::terminate() { - "SIGTERM" - } else { - "UNKNOWN" - }; + Some(signal) = shutdown.next() => { warn!("{signal} received, waiting spawned tasks..."); break Ok(()); }, @@ -195,14 +190,26 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { }); }; - redis_messages.shutdown(); - while let Some(result) = pt_tasks.join_next().await { - result??; - } - if !redis_tasks_fut.is_terminated() { - redis_tasks_fut.await?; - } - pgpool.close().await; + tokio::select! { + Some(signal) = shutdown.next() => { + anyhow::bail!("{signal} received, force shutdown..."); + } + result = async move { + // shutdown `prefetch` channel (but not Receiver) + redis_messages.shutdown(); + // wait all `program_transformer` spawned tasks + while let Some(result) = pt_tasks.join_next().await { + result??; + } + // wait all `ack` spawned tasks + if !redis_tasks_fut.is_terminated() { + redis_tasks_fut.await?; + } + // shutdown database connection + pgpool.close().await; + Ok::<(), anyhow::Error>(()) + } => result?, + }; result } diff --git a/nft_ingester2/src/redis.rs b/nft_ingester2/src/redis.rs index b823f4fee..1baec8799 100644 --- a/nft_ingester2/src/redis.rs +++ b/nft_ingester2/src/redis.rs @@ -251,7 +251,7 @@ impl RedisStream { .collect::>(); // spawn xack tasks - let mut tasks = ack_tasks + let ack_jh_vec = ack_tasks .into_iter() .map(|(stream, ack_rx)| { let connection = connection.clone(); @@ -261,15 +261,16 @@ impl RedisStream { // spawn prefetch task let (messages_tx, messages_rx) = mpsc::channel(config.prefetch_queue_size); - tasks.push(tokio::spawn({ + let jh_prefetch = tokio::spawn({ let shutdown = Arc::clone(&shutdown); async move { Self::run_prefetch(config, streams, connection, messages_tx, shutdown).await } - })); + }); // merge spawned xack / prefetch tasks let spawned_tasks = async move { - for task in tasks.into_iter() { - task.await??; + jh_prefetch.await??; + for jh in ack_jh_vec.into_iter() { + jh.await??; } Ok::<(), anyhow::Error>(()) }; @@ -287,8 +288,9 @@ impl RedisStream { self.messages_rx.recv().await } - pub fn shutdown(self) { + pub fn shutdown(mut self) { self.shutdown.store(true, Ordering::Relaxed); + tokio::spawn(async move { while self.messages_rx.recv().await.is_some() {} }); } async fn run_prefetch( diff --git a/nft_ingester2/src/util.rs b/nft_ingester2/src/util.rs index 6de97e42f..0a7800a12 100644 --- a/nft_ingester2/src/util.rs +++ b/nft_ingester2/src/util.rs @@ -1,15 +1,18 @@ use { - futures::future::{BoxFuture, FutureExt}, + async_stream::stream, + futures::stream::{BoxStream, StreamExt}, tokio::signal::unix::{signal, SignalKind}, }; -pub fn create_shutdown() -> anyhow::Result> { +pub fn create_shutdown() -> anyhow::Result> { let mut sigint = signal(SignalKind::interrupt())?; let mut sigterm = signal(SignalKind::terminate())?; - Ok(async move { - tokio::select! { - _ = sigint.recv() => SignalKind::interrupt(), - _ = sigterm.recv() => SignalKind::terminate(), + Ok(stream! { + loop { + yield tokio::select! { + _ = sigint.recv() => "SIGINT", + _ = sigterm.recv() => "SIGTERM", + }; } } .boxed())