From 511d23b1244da1e1aa1f331af1749abc76761276 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Thu, 25 Jul 2024 12:01:25 -0600
Subject: [PATCH] Refactor: wrap async tasks in separate structures / impls

In an effort to keep in line with similar task implementations, and for
better organization and automatic cleanup on `drop`, the async task
processing has been adjusted so that each task is created and managed by
a struct.
---
 .../v0/create_node_validator_api.rs           | 144 ++---
 node-metrics/src/api/node_validator/v0/mod.rs | 287 +++++++---
 node-metrics/src/service/client_state/mod.rs  | 513 +++++++++++++-----
 node-metrics/src/service/data_state/mod.rs    | 263 ++++++---
 4 files changed, 808 insertions(+), 399 deletions(-)

diff --git a/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs b/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs
index 1efe92c5d4..1ff83f25c4 100644
--- a/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs
+++ b/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs
@@ -1,22 +1,20 @@
 use std::sync::Arc;
 
 use super::{
-    get_stake_table_from_sequencer, process_node_identity_url_stream,
-    stream_leaves_from_hotshot_query_service,
+    get_stake_table_from_sequencer, ProcessNodeIdentityUrlStreamTask, ProcessProduceLeafStreamTask,
 };
 use crate::service::{
     client_id::ClientId,
     client_message::InternalClientMessage,
     client_state::{
-        process_distribute_block_detail_handling_stream,
-        process_distribute_node_identity_handling_stream,
-        process_distribute_voters_handling_stream, process_internal_client_message_stream,
-        ClientThreadState,
+        ClientThreadState, InternalClientMessageProcessingTask,
+        ProcessDistributeBlockDetailHandlingTask, ProcessDistributeNodeIdentityHandlingTask,
+        ProcessDistributeVotersHandlingTask,
     },
-    data_state::{process_leaf_stream, process_node_identity_stream, DataState},
+    data_state::{DataState, ProcessLeafStreamTask, ProcessNodeIdentityStreamTask},
     server_message::ServerMessage,
 };
-use async_std::{stream::StreamExt, sync::RwLock, task::JoinHandle};
+use async_std::sync::RwLock;
 use futures::{
     channel::mpsc::{self, Receiver, Sender},
     SinkExt,
@@ -24,7 +22,14 @@ use url::Url;
 
 pub struct NodeValidatorAPI {
-    pub task_handles: Vec<JoinHandle<()>>,
+    pub process_internal_client_message_handle: Option<InternalClientMessageProcessingTask>,
+    pub process_distribute_block_detail_handle: Option<ProcessDistributeBlockDetailHandlingTask>,
+    pub process_distribute_node_identity_handle: Option<ProcessDistributeNodeIdentityHandlingTask>,
+    pub process_distribute_voters_handle: Option<ProcessDistributeVotersHandlingTask>,
+    pub process_leaf_stream_handle: Option<ProcessLeafStreamTask>,
+    pub process_node_identity_stream_handle: Option<ProcessNodeIdentityStreamTask>,
+    pub process_url_stream_handle: Option<ProcessNodeIdentityUrlStreamTask>,
+    pub process_consume_leaves: Option<ProcessProduceLeafStreamTask>,
 }
 
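Every task wrapper named in this struct follows the same shape, which the rest of the patch repeats for each stream processor: `new` spawns the processing loop immediately and stores the `JoinHandle`, and a `Drop` impl cancels the task if it is still running. A minimal sketch of the pattern (the type name and empty `run` body are placeholders, not code from this repository):

```rust
use async_std::task::JoinHandle;

/// Illustrative wrapper: spawn on construction, cancel on drop.
pub struct ExampleProcessingTask {
    pub task_handle: Option<JoinHandle<()>>,
}

impl ExampleProcessingTask {
    /// Spawns the processing loop immediately and keeps its handle.
    pub fn new() -> Self {
        let task_handle = async_std::task::spawn(Self::run());
        Self {
            task_handle: Some(task_handle),
        }
    }

    /// Stand-in for the real stream-processing loop.
    async fn run() {}
}

impl Drop for ExampleProcessingTask {
    fn drop(&mut self) {
        // take() ensures the handle is cancelled at most once.
        if let Some(task_handle) = self.task_handle.take() {
            async_std::task::block_on(task_handle.cancel());
        }
    }
}
```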
 pub struct NodeValidatorConfig {
@@ -47,7 +52,7 @@ pub enum CreateNodeValidatorProcessingError {
  */
 pub async fn create_node_validator_processing(
     config: NodeValidatorConfig,
-    server_message_receiver: Receiver<InternalClientMessage<Sender<ServerMessage>>>,
+    internal_client_message_receiver: Receiver<InternalClientMessage<Sender<ServerMessage>>>,
 ) -> Result<NodeValidatorAPI, CreateNodeValidatorProcessingError> {
     let mut data_state = DataState::new(
         Default::default(),
@@ -64,9 +69,10 @@ pub async fn create_node_validator_processing(
         ClientId::from_count(1),
     );
 
-    let client = surf_disco::Client::new(config.stake_table_url_base);
+    let client_stake_table = surf_disco::Client::new(config.stake_table_url_base.clone());
+    let client_leaf_stream = surf_disco::Client::new(config.stake_table_url_base);
 
-    let stake_table = get_stake_table_from_sequencer(client.clone())
+    let stake_table = get_stake_table_from_sequencer(client_stake_table)
         .await
         .map_err(CreateNodeValidatorProcessingError::FailedToGetStakeTable)?;
 
@@ -81,85 +87,45 @@ pub async fn create_node_validator_processing(
     let (voters_sender, voters_receiver) = mpsc::channel(32);
     let (mut url_sender, url_receiver) = mpsc::channel(32);
 
-    let process_internal_client_message_handle =
-        async_std::task::spawn(process_internal_client_message_stream(
-            server_message_receiver,
-            data_state.clone(),
-            client_thread_state.clone(),
-        ));
-
-    let process_distribute_block_detail_handle =
-        async_std::task::spawn(process_distribute_block_detail_handling_stream(
-            client_thread_state.clone(),
-            block_detail_receiver,
-        ));
-
-    let process_distribute_node_identity_handle =
-        async_std::task::spawn(process_distribute_node_identity_handling_stream(
-            client_thread_state.clone(),
-            node_identity_receiver_2,
-        ));
-
-    let process_distribute_voters_handle = async_std::task::spawn(
-        process_distribute_voters_handling_stream(client_thread_state.clone(), voters_receiver),
+    let process_internal_client_message_handle = InternalClientMessageProcessingTask::new(
+        internal_client_message_receiver,
+        data_state.clone(),
+        client_thread_state.clone(),
+    );
+
+    let process_distribute_block_detail_handle = ProcessDistributeBlockDetailHandlingTask::new(
+        client_thread_state.clone(),
+        block_detail_receiver,
     );
 
-    let process_leaf_stream_handle = async_std::task::spawn(process_leaf_stream(
+    let process_distribute_node_identity_handle = ProcessDistributeNodeIdentityHandlingTask::new(
+        client_thread_state.clone(),
+        node_identity_receiver_2,
+    );
+
+    let process_distribute_voters_handle =
+        ProcessDistributeVotersHandlingTask::new(client_thread_state.clone(), voters_receiver);
+
+    let process_leaf_stream_handle = ProcessLeafStreamTask::new(
         leaf_receiver,
         data_state.clone(),
         block_detail_sender,
         voters_sender,
-    ));
+    );
 
-    let process_node_identity_stream_handle = async_std::task::spawn(process_node_identity_stream(
+    let process_node_identity_stream_handle = ProcessNodeIdentityStreamTask::new(
         node_identity_receiver_1,
         data_state.clone(),
         node_identity_sender_2,
-    ));
-
-    let process_url_stream_handle = async_std::task::spawn(process_node_identity_url_stream(
-        url_receiver,
-        node_identity_sender_1,
-    ));
-
-    let leaf_retriever_handle = async_std::task::spawn(async move {
-        // Alright, let's start processing leaves
-        // TODO: We should move this into its own function that can respond
-        // and react appropriately when a service or sequencer does down
-        // so that it can gracefully re-establish the stream as necessary.
-
-        let client = client;
-
-        let mut leaf_stream = match stream_leaves_from_hotshot_query_service(None, client).await {
-            Ok(leaf_stream) => leaf_stream,
-            Err(err) => {
-                tracing::info!("error getting leaf stream: {}", err);
-                return;
-            }
-        };
-
-        let mut leaf_sender = leaf_sender;
+    );
 
-        loop {
-            let leaf_result = leaf_stream.next().await;
-            let leaf = if let Some(Ok(leaf)) = leaf_result {
-                leaf
-            } else {
-                tracing::info!("leaf stream closed");
-                break;
-            };
+    let process_url_stream_handle =
+        ProcessNodeIdentityUrlStreamTask::new(url_receiver, node_identity_sender_1);
 
-            let leaf_send_result = leaf_sender.send(leaf).await;
-            if let Err(err) = leaf_send_result {
-                tracing::info!("leaf sender closed: {}", err);
-                break;
-            }
-        }
-    });
+    let process_consume_leaves = ProcessProduceLeafStreamTask::new(client_leaf_stream, leaf_sender);
 
-    // send the original three node base urls
-    // This is assuming that demo-native is running, as such those Urls
-    // should be used / match
+    // Send any initial URLs to the url sender for immediate processing.
+    // These URLs are supplied by the configuration of this function.
     {
         let urls = config.initial_node_public_base_urls;
@@ -173,16 +139,14 @@ pub async fn create_node_validator_processing(
     }
 
     Ok(NodeValidatorAPI {
-        task_handles: vec![
-            process_internal_client_message_handle,
-            process_distribute_block_detail_handle,
-            process_distribute_node_identity_handle,
-            process_distribute_voters_handle,
-            process_leaf_stream_handle,
-            process_node_identity_stream_handle,
-            process_url_stream_handle,
-            leaf_retriever_handle,
-        ],
+        process_internal_client_message_handle: Some(process_internal_client_message_handle),
+        process_distribute_block_detail_handle: Some(process_distribute_block_detail_handle),
+        process_distribute_node_identity_handle: Some(process_distribute_node_identity_handle),
+        process_distribute_voters_handle: Some(process_distribute_voters_handle),
+        process_leaf_stream_handle: Some(process_leaf_stream_handle),
+        process_node_identity_stream_handle: Some(process_node_identity_stream_handle),
+        process_url_stream_handle: Some(process_url_stream_handle),
+        process_consume_leaves: Some(process_consume_leaves),
     })
 }
 
@@ -257,8 +221,6 @@ mod test {
 
         app_serve_handle.await;
 
-        for handle in node_validator_task_state.task_handles {
-            handle.cancel().await;
-        }
+        drop(node_validator_task_state);
     }
 }
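The handles are stored as `Option<...>` so that a caller can `take()` a handle out of a wrapper to await the task's completion directly, which is what the reworked tests later in this patch do instead of relying on cancellation. A sketch of that shutdown sequence, assuming a task wrapper `task` and the `mpsc` sender feeding it are in scope (names here are illustrative):

```rust
use async_std::prelude::FutureExt;
use std::time::Duration;

// Close the input side first so the processing loop can observe the end
// of its stream and return on its own.
sender.disconnect();

// Then take ownership of the JoinHandle and await completion with a
// timeout, so a hung task fails the test loudly instead of leaking.
if let Err(timeout_error) = task
    .task_handle
    .take()
    .unwrap()
    .timeout(Duration::from_millis(200))
    .await
{
    panic!("task did not complete in time: {}", timeout_error);
}
```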
diff --git a/node-metrics/src/api/node_validator/v0/mod.rs b/node-metrics/src/api/node_validator/v0/mod.rs
index 5cac91004b..e960f8fbfd 100644
--- a/node-metrics/src/api/node_validator/v0/mod.rs
+++ b/node-metrics/src/api/node_validator/v0/mod.rs
@@ -2,13 +2,16 @@ pub mod create_node_validator_api;
 use crate::service::client_message::{ClientMessage, InternalClientMessage};
 use crate::service::data_state::{LocationDetails, NodeIdentity};
 use crate::service::server_message::ServerMessage;
-use espresso_types::FeeAccount;
+use async_std::task::JoinHandle;
+use espresso_types::{FeeAccount, SeqTypes};
+use futures::channel::mpsc::SendError;
 use futures::future::Either;
-use futures::Sink;
 use futures::{
     channel::mpsc::{self, Sender},
     FutureExt, SinkExt, StreamExt,
 };
+use futures::{Sink, Stream};
+use hotshot_query_service::Leaf;
 use hotshot_stake_table::vec_based::StakeTable;
 use hotshot_types::light_client::{CircuitField, StateVerKey};
 use hotshot_types::signature_key::BLSPubKey;
@@ -405,53 +408,139 @@ pub async fn get_node_identity_from_url(
     }
 }
 
-/// [stream_leaves_from_hotshot_query_service] retrieves a stream of
-/// [sequencer::Leaf]s from the Hotshot Query Service. It expects a
-/// [current_block_height] to be provided so that it can determine the starting
-/// block height to begin streaming from. No matter what the value of
-/// [current_block_height] is the stream will always check what the latest
-/// block height is on the hotshot query service. It will then attempt to
-/// pull as few Leaves as it needs from the stream.
-pub async fn stream_leaves_from_hotshot_query_service(
-    current_block_height: Option<u64>,
-    client: surf_disco::Client,
-) -> Result<
-    impl futures::Stream<Item = Result<Leaf, hotshot_query_service::Error>> + Unpin,
-    hotshot_query_service::Error,
-> {
-    let block_height_result = client.get("status/block-height").send().await;
-    let block_height: u64 = match block_height_result {
-        Ok(block_height) => block_height,
-        Err(err) => {
-            tracing::info!("retrieve block height request failed: {}", err);
-            return Err(err);
+/// [ProcessProduceLeafStreamTask] is a task that produces a stream of [Leaf]s
+/// from the Hotshot Query Service. It will attempt to retrieve the [Leaf]s
+/// from the Hotshot Query Service and then send them to the [Sink] provided.
+pub struct ProcessProduceLeafStreamTask {
+    pub task_handle: Option<JoinHandle<()>>,
+}
+
+impl ProcessProduceLeafStreamTask {
+    /// [new] creates a new [ProcessProduceLeafStreamTask] that produces a
+    /// stream of [Leaf]s from the Hotshot Query Service.
+    ///
+    /// Calling this function will create an async task that will start
+    /// processing immediately. The task's handle will be stored in the
+    /// returned state.
+    pub fn new<K>(
+        client: surf_disco::Client,
+        leaf_sender: K,
+    ) -> Self
+    where
+        K: Sink<Leaf<SeqTypes>, Error = SendError> + Clone + Send + Sync + Unpin + 'static,
+    {
+        let task_handle =
+            async_std::task::spawn(Self::process_consume_leaf_stream(client, leaf_sender));
+
+        Self {
+            task_handle: Some(task_handle),
         }
-    };
+    }
 
-    let latest_block_start = block_height.saturating_sub(50);
-    let start_block_height = if let Some(known_height) = current_block_height {
-        std::cmp::min(known_height, latest_block_start)
-    } else {
-        latest_block_start
-    };
+    /// [process_consume_leaf_stream] produces a stream of [Leaf]s from the
+    /// Hotshot Query Service. It will attempt to retrieve the [Leaf]s from the
+    /// Hotshot Query Service and then send them to the [Sink] provided. If the
+    /// [Sink] is closed, or if the Stream ends prematurely, then the function
+    /// will return.
+    async fn process_consume_leaf_stream<K>(
+        client: surf_disco::Client,
+        leaf_sender: K,
+    ) where
+        K: Sink<Leaf<SeqTypes>, Error = SendError> + Clone + Send + Sync + Unpin + 'static,
+    {
+        // Alright, let's start processing leaves
+        // TODO: We should move this into its own function that can respond
+        // and react appropriately when a service or sequencer goes down
+        // so that it can gracefully re-establish the stream as necessary.
 
-    let leaves_stream_result = client
-        .socket(&format!(
-            "availability/stream/leaves/{}",
-            start_block_height
-        ))
-        .subscribe::<Leaf>()
-        .await;
+        let client = client;
 
-    let leaves_stream = match leaves_stream_result {
-        Ok(leaves_stream) => leaves_stream,
-        Err(err) => {
-            tracing::info!("retrieve leaves stream failed: {}", err);
-            return Err(err);
+        let mut leaf_stream =
+            match Self::stream_leaves_from_hotshot_query_service(None, client).await {
+                Ok(leaf_stream) => leaf_stream,
+                Err(err) => {
+                    tracing::info!("error getting leaf stream: {}", err);
+                    return;
+                }
+            };
+
+        let mut leaf_sender = leaf_sender;
+
+        loop {
+            let leaf_result = leaf_stream.next().await;
+            let leaf = if let Some(Ok(leaf)) = leaf_result {
+                leaf
+            } else {
+                tracing::info!("leaf stream closed");
+                break;
+            };
+
+            let leaf_send_result = leaf_sender.send(leaf).await;
+            if let Err(err) = leaf_send_result {
+                tracing::info!("leaf sender closed: {}", err);
+                break;
+            }
         }
-    };
+    }
+
+    /// [stream_leaves_from_hotshot_query_service] retrieves a stream of
+    /// [Leaf]s from the Hotshot Query Service. It expects a
+    /// [current_block_height] to be provided so that it can determine the starting
+    /// block height to begin streaming from. No matter what the value of
+    /// [current_block_height] is, the stream will always check what the latest
+    /// block height is on the hotshot query service. It will then attempt to
+    /// pull as few Leaves as it needs from the stream.
+    async fn stream_leaves_from_hotshot_query_service(
+        current_block_height: Option<u64>,
+        client: surf_disco::Client,
+    ) -> Result<
+        impl futures::Stream<Item = Result<Leaf<SeqTypes>, hotshot_query_service::Error>> + Unpin,
+        hotshot_query_service::Error,
+    > {
+        let block_height_result = client.get("status/block-height").send().await;
+        let block_height: u64 = match block_height_result {
+            Ok(block_height) => block_height,
+            Err(err) => {
+                tracing::info!("retrieve block height request failed: {}", err);
+                return Err(err);
+            }
+        };
+
+        let latest_block_start = block_height.saturating_sub(50);
+        let start_block_height = if let Some(known_height) = current_block_height {
+            std::cmp::min(known_height, latest_block_start)
+        } else {
+            latest_block_start
+        };
+
+        let leaves_stream_result = client
+            .socket(&format!(
+                "availability/stream/leaves/{}",
+                start_block_height
+            ))
+            .subscribe::<Leaf<SeqTypes>>()
+            .await;
+
+        let leaves_stream = match leaves_stream_result {
+            Ok(leaves_stream) => leaves_stream,
+            Err(err) => {
+                tracing::info!("retrieve leaves stream failed: {}", err);
+                return Err(err);
+            }
+        };
+
+        Ok(leaves_stream)
+    }
+}
 
-    Ok(leaves_stream)
+/// [Drop] implementation for [ProcessProduceLeafStreamTask] that will cancel
+/// the task if it hasn't already been completed.
+impl Drop for ProcessProduceLeafStreamTask {
+    fn drop(&mut self) {
+        if let Some(task_handle) = self.task_handle.take() {
+            async_std::task::block_on(task_handle.cancel());
+        }
+    }
 }
 
 /// [populate_node_identity_general_from_scrape] populates the general
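The TODO carried over above is about resilience: if the query service goes down, `process_consume_leaf_stream` logs and returns, and the leaf stream stays down until the whole task set is rebuilt. A hedged sketch of the reconnect loop the TODO points toward (the helper name, delay, and closure-based shape are illustrative, not part of this patch):

```rust
use std::{future::Future, time::Duration};

/// Hypothetical reconnect wrapper: run one streaming session until it
/// errors or closes, then back off briefly and re-establish the stream,
/// instead of returning on the first failure.
async fn run_with_reconnect<F, Fut>(mut connect_and_forward: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = ()>,
{
    loop {
        // One session: build the leaf stream and forward until it ends.
        connect_and_forward().await;

        // Back off so a crashed query service is not hammered in a tight
        // loop; a real version would also resume from the last seen
        // block height rather than starting over.
        tracing::info!("leaf stream ended; reconnecting after delay");
        async_std::task::sleep(Duration::from_secs(5)).await;
    }
}
```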
@@ -683,48 +772,90 @@ pub fn node_identity_from_scrape(scrape: Scrape) -> Option<NodeIdentity> {
     Some(node_identity)
 }
 
-/// [process_node_identity_url_stream] processes a stream of [Url]s that are
-/// expected to contain a Node Identity. It will attempt to retrieve the Node
-/// Identity from the [Url] and then send it to the [Sink] provided. If the
-/// [Sink] is closed, then the function will return.
-pub async fn process_node_identity_url_stream<T, K>(
-    node_identity_url_stream: T,
-    node_identity_sink: K,
-) where
-    T: futures::Stream<Item = Url> + Unpin,
-    K: Sink<NodeIdentity> + Unpin,
-{
-    let mut node_identity_url_stream = node_identity_url_stream;
-    let mut node_identity_sender = node_identity_sink;
-    loop {
-        let node_identity_url_result = node_identity_url_stream.next().await;
-        let node_identity_url = match node_identity_url_result {
-            Some(node_identity_url) => node_identity_url,
-            None => {
-                tracing::info!("node identity url stream closed");
-                return;
-            }
-        };
+/// [ProcessNodeIdentityUrlStreamTask] is a task that processes a stream of
+/// [Url]s that are expected to contain a Node Identity. It will attempt to
+/// retrieve the Node Identity from the [Url] and then send it to the [Sink]
+/// provided.
+pub struct ProcessNodeIdentityUrlStreamTask {
+    pub task_handle: Option<JoinHandle<()>>,
+}
+
+impl ProcessNodeIdentityUrlStreamTask {
+    /// [new] creates a new [ProcessNodeIdentityUrlStreamTask] that processes a
+    /// stream of [Url]s that are expected to contain a Node Identity.
+    ///
+    /// Calling this function will spawn a new task that will start processing
+    /// immediately. The task's handle will be stored in the returned
+    /// state.
+    pub fn new<S, K>(url_receiver: S, node_identity_sender: K) -> Self
+    where
+        S: Stream<Item = Url> + Send + Sync + Unpin + 'static,
+        K: Sink<NodeIdentity> + Clone + Send + Sync + Unpin + 'static,
+    {
+        let task_handle = async_std::task::spawn(Self::process_node_identity_url_stream(
+            url_receiver,
+            node_identity_sender,
+        ));
 
-        // Alright we have a new Url to try and scrape for a Node Identity.
-        // Let's attempt to do that.
- let node_identity_result = get_node_identity_from_url(node_identity_url).await; + Self { + task_handle: Some(task_handle), + } + } - let node_identity = match node_identity_result { - Ok(node_identity) => node_identity, - Err(err) => { - tracing::warn!("get node identity from url failed. bad base url?: {}", err); - continue; + /// [process_node_identity_url_stream] processes a stream of [Url]s that are + /// expected to contain a Node Identity. It will attempt to retrieve the Node + /// Identity from the [Url] and then send it to the [Sink] provided. If the + /// [Sink] is closed, then the function will return. + async fn process_node_identity_url_stream( + node_identity_url_stream: T, + node_identity_sink: K, + ) where + T: futures::Stream + Unpin, + K: Sink + Unpin, + { + let mut node_identity_url_stream = node_identity_url_stream; + let mut node_identity_sender = node_identity_sink; + loop { + let node_identity_url_result = node_identity_url_stream.next().await; + let node_identity_url = match node_identity_url_result { + Some(node_identity_url) => node_identity_url, + None => { + tracing::info!("node identity url stream closed"); + return; + } + }; + + // Alright we have a new Url to try and scrape for a Node Identity. + // Let's attempt to do that. + let node_identity_result = get_node_identity_from_url(node_identity_url).await; + + let node_identity = match node_identity_result { + Ok(node_identity) => node_identity, + Err(err) => { + tracing::warn!("get node identity from url failed. bad base url?: {}", err); + continue; + } + }; + + let send_result = node_identity_sender.send(node_identity).await; + if let Err(err) = send_result { + tracing::info!("node identity sender closed: {}", err); + return; } - }; + } + } +} - let send_result = node_identity_sender.send(node_identity).await; - if let Err(err) = send_result { - tracing::info!("node identity sender closed: {}", err); - return; +/// [ProcessNodeIdentityUrlStreamTask] will cancel the task when it is dropped. +impl Drop for ProcessNodeIdentityUrlStreamTask { + fn drop(&mut self) { + let task_handle = self.task_handle.take(); + if let Some(task_handle) = task_handle { + async_std::task::block_on(task_handle.cancel()); } } } + #[cfg(test)] mod tests { use espresso_types::FeeAccount; diff --git a/node-metrics/src/service/client_state/mod.rs b/node-metrics/src/service/client_state/mod.rs index 44213cbb9c..b3826eaea9 100644 --- a/node-metrics/src/service/client_state/mod.rs +++ b/node-metrics/src/service/client_state/mod.rs @@ -4,7 +4,10 @@ use super::{ data_state::{DataState, NodeIdentity}, server_message::ServerMessage, }; -use async_std::sync::{RwLock, RwLockWriteGuard}; +use async_std::{ + sync::{RwLock, RwLockWriteGuard}, + task::JoinHandle, +}; use bitvec::vec::BitVec; use espresso_types::SeqTypes; use futures::{channel::mpsc::SendError, Sink, SinkExt, Stream, StreamExt}; @@ -888,124 +891,309 @@ async fn handle_received_voters( drop_failed_client_sends(client_thread_state, failed_client_sends).await; } -/// [process_internal_client_message_stream] is a function that processes the -/// client handling stream. This stream is responsible for managing the state -/// of the connected clients, and their subscriptions. 
-pub async fn process_internal_client_message_stream<S, K>(
-    mut stream: S,
-    data_state: Arc<RwLock<DataState>>,
-    client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
-) where
-    S: Stream<Item = InternalClientMessage<K>> + Unpin,
-    K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
-{
-    loop {
-        let message_result = stream.next().await;
-        let message = if let Some(message) = message_result {
-            message
-        } else {
-            tracing::info!("internal client message handler closed.");
-            return;
-        };
+/// [InternalClientMessageProcessingTask] represents an async task for
+/// processing incoming [InternalClientMessage]s and making the appropriate
+/// updates to the [ClientThreadState] and [DataState].
+pub struct InternalClientMessageProcessingTask {
+    pub task_handle: Option<JoinHandle<()>>,
+}
 
-        if let Err(err) =
-            process_client_message(message, data_state.clone(), client_thread_state.clone()).await
-        {
-            tracing::info!(
-                "internal client message processing encountered an error: {}",
-                err,
-            );
-            return;
+impl InternalClientMessageProcessingTask {
+    /// [new] creates a new [InternalClientMessageProcessingTask] with the
+    /// given internal_client_message_receiver, data_state, and
+    /// client_thread_state.
+    ///
+    /// Calling this function will start an async task that will start
+    /// processing. The handle for the async task is stored within the
+    /// returned state.
+    pub fn new<S, K>(
+        internal_client_message_receiver: S,
+        data_state: Arc<RwLock<DataState>>,
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+    ) -> Self
+    where
+        S: Stream<Item = InternalClientMessage<K>> + Send + Sync + Unpin + 'static,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Send + Sync + Unpin + 'static,
+    {
+        let task_handle = async_std::task::spawn(Self::process_internal_client_message_stream(
+            internal_client_message_receiver,
+            data_state.clone(),
+            client_thread_state.clone(),
+        ));
+
+        Self {
+            task_handle: Some(task_handle),
+        }
+    }
+
+    /// [process_internal_client_message_stream] is a function that processes the
+    /// client handling stream. This stream is responsible for managing the state
+    /// of the connected clients, and their subscriptions.
+    async fn process_internal_client_message_stream<S, K>(
+        mut stream: S,
+        data_state: Arc<RwLock<DataState>>,
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+    ) where
+        S: Stream<Item = InternalClientMessage<K>> + Unpin,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
+    {
+        loop {
+            let message_result = stream.next().await;
+            let message = if let Some(message) = message_result {
+                message
+            } else {
+                tracing::info!("internal client message handler closed.");
+                return;
+            };
+
+            if let Err(err) =
+                process_client_message(message, data_state.clone(), client_thread_state.clone())
+                    .await
+            {
+                tracing::info!(
+                    "internal client message processing encountered an error: {}",
+                    err,
+                );
+                return;
+            }
         }
     }
 }
 
-/// [process_distribute_block_detail_handling_stream] is a function that
-/// processes the the [Stream] of incoming [BlockDetail] and distributes them
-/// to all subscribed clients.
-pub async fn process_distribute_block_detail_handling_stream<S, K>(
-    client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
-    mut stream: S,
-) where
-    S: Stream<Item = BlockDetail<SeqTypes>> + Unpin,
-    K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
-{
-    loop {
-        let block_detail_result = stream.next().await;
-
-        let block_detail = if let Some(block_detail) = block_detail_result {
-            block_detail
-        } else {
-            tracing::info!("block detail stream closed. shutting down client handling stream.",);
-            return;
-        };
+/// [drop] implementation for [InternalClientMessageProcessingTask] that will
+/// cancel the task if it is still running.
+impl Drop for InternalClientMessageProcessingTask {
+    fn drop(&mut self) {
+        let task_handle = self.task_handle.take();
+        if let Some(task_handle) = task_handle {
+            async_std::task::block_on(task_handle.cancel());
+        }
+    }
+}
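A note on the design of these `Drop` impls: `JoinHandle::cancel` consumes the handle and asks the task to stop at its next `.await` point, while `async_std::task::block_on` bridges that async cancellation into the synchronous `drop`. The `Option` wrapper is what lets `drop`, which only gets `&mut self`, move the handle out. A small standalone illustration of the cancel semantics (not code from this repository):

```rust
use async_std::task;
use std::time::Duration;

fn main() {
    // A task that would otherwise run forever.
    let handle = task::spawn(async {
        loop {
            task::sleep(Duration::from_millis(50)).await;
        }
    });

    // cancel() consumes the handle and resolves once the task has
    // stopped at an await point; block_on lets synchronous code (like a
    // Drop impl) wait for that to happen.
    task::block_on(async {
        handle.cancel().await;
    });
}
```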
+
+/// [ProcessDistributeBlockDetailHandlingTask] represents an async task for
+/// processing the incoming [BlockDetail] and distributing them to all
+/// subscribed clients.
+pub struct ProcessDistributeBlockDetailHandlingTask {
+    pub task_handle: Option<JoinHandle<()>>,
+}
+
+impl ProcessDistributeBlockDetailHandlingTask {
+    /// [new] creates a new [ProcessDistributeBlockDetailHandlingTask] with the
+    /// given client_thread_state and block_detail_receiver.
+    ///
+    /// Calling this function will start an async task that will start
+    /// processing. The handle for the async task is stored within the
+    /// returned state.
+    pub fn new<S, K>(
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+        block_detail_receiver: S,
+    ) -> Self
+    where
+        S: Stream<Item = BlockDetail<SeqTypes>> + Send + Sync + Unpin + 'static,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Send + Sync + Unpin + 'static,
+    {
+        let task_handle =
+            async_std::task::spawn(Self::process_distribute_block_detail_handling_stream(
+                client_thread_state.clone(),
+                block_detail_receiver,
+            ));
 
-        handle_received_block_detail(client_thread_state.clone(), block_detail).await
+        Self {
+            task_handle: Some(task_handle),
+        }
+    }
+
+    /// [process_distribute_block_detail_handling_stream] is a function that
+    /// processes the [Stream] of incoming [BlockDetail] and distributes them
+    /// to all subscribed clients.
+    async fn process_distribute_block_detail_handling_stream<S, K>(
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+        mut stream: S,
+    ) where
+        S: Stream<Item = BlockDetail<SeqTypes>> + Unpin,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
+    {
+        loop {
+            let block_detail_result = stream.next().await;
+
+            let block_detail = if let Some(block_detail) = block_detail_result {
+                block_detail
+            } else {
+                tracing::info!(
+                    "block detail stream closed. shutting down client handling stream.",
+                );
+                return;
+            };
+
+            handle_received_block_detail(client_thread_state.clone(), block_detail).await
+        }
     }
 }
 
-/// [process_distribute_node_identity_handling_stream] is a function that
-/// processes the the [Stream] of incoming [NodeIdentity] and distributes them
-/// to all subscribed clients.
-pub async fn process_distribute_node_identity_handling_stream<S, K>(
-    client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
-    mut stream: S,
-) where
-    S: Stream<Item = NodeIdentity> + Unpin,
-    K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
-{
-    loop {
-        let node_identity_result = stream.next().await;
-
-        let node_identity = if let Some(node_identity) = node_identity_result {
-            node_identity
-        } else {
-            tracing::info!("node identity stream closed. shutting down client handling stream.",);
-            return;
-        };
+/// [drop] implementation for [ProcessDistributeBlockDetailHandlingTask] that will
+/// cancel the task if it is still running.
+impl Drop for ProcessDistributeBlockDetailHandlingTask {
+    fn drop(&mut self) {
+        let task_handle = self.task_handle.take();
+        if let Some(task_handle) = task_handle {
+            async_std::task::block_on(task_handle.cancel());
+        }
+    }
+}
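The three distribute handlers in this file (block details above, node identities and voters below) all repeat the same receive-loop body, differing only in the item type and the `handle_received_*` call. If the duplication ever becomes a maintenance burden, the loop could be factored into a generic helper along these lines (a sketch, not part of this patch):

```rust
use futures::{Stream, StreamExt};
use std::future::Future;

/// Hypothetical shared receive loop: pull items from the stream and hand
/// each one to `handle` until the stream closes.
async fn distribute_stream<T, S, F, Fut>(mut stream: S, mut handle: F, closed_msg: &str)
where
    S: Stream<Item = T> + Unpin,
    F: FnMut(T) -> Fut,
    Fut: Future<Output = ()>,
{
    loop {
        let Some(item) = stream.next().await else {
            tracing::info!("{}", closed_msg);
            return;
        };
        handle(item).await;
    }
}
```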
+
+/// [ProcessDistributeNodeIdentityHandlingTask] represents an async task for
+/// processing the incoming [NodeIdentity] and distributing them to all
+/// subscribed clients.
+pub struct ProcessDistributeNodeIdentityHandlingTask {
+    pub task_handle: Option<JoinHandle<()>>,
+}
 
-        handle_received_node_identity(client_thread_state.clone(), node_identity).await
+impl ProcessDistributeNodeIdentityHandlingTask {
+    /// [new] creates a new [ProcessDistributeNodeIdentityHandlingTask] with the
+    /// given client_thread_state and node_identity_receiver.
+    ///
+    /// Calling this function will start an async task that will start
+    /// processing. The handle for the async task is stored within the
+    /// returned state.
+    pub fn new<S, K>(
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+        node_identity_receiver: S,
+    ) -> Self
+    where
+        S: Stream<Item = NodeIdentity> + Send + Sync + Unpin + 'static,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Send + Sync + Unpin + 'static,
+    {
+        let task_handle =
+            async_std::task::spawn(Self::process_distribute_node_identity_handling_stream(
+                client_thread_state.clone(),
+                node_identity_receiver,
+            ));
+
+        Self {
+            task_handle: Some(task_handle),
+        }
+    }
+
+    /// [process_distribute_node_identity_handling_stream] is a function that
+    /// processes the [Stream] of incoming [NodeIdentity] and distributes them
+    /// to all subscribed clients.
+    async fn process_distribute_node_identity_handling_stream<S, K>(
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+        mut stream: S,
+    ) where
+        S: Stream<Item = NodeIdentity> + Unpin,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
+    {
+        loop {
+            let node_identity_result = stream.next().await;
+
+            let node_identity = if let Some(node_identity) = node_identity_result {
+                node_identity
+            } else {
+                tracing::info!(
+                    "node identity stream closed. shutting down client handling stream.",
+                );
+                return;
+            };
+
+            handle_received_node_identity(client_thread_state.clone(), node_identity).await
+        }
     }
 }
 
-/// [process_distribute_voters_handling_stream] is a function that processes
-/// the the [Stream] of incoming [BitVec] and distributes them to all
+/// [drop] implementation for [ProcessDistributeNodeIdentityHandlingTask] that
+/// will cancel the task if it is still running.
+impl Drop for ProcessDistributeNodeIdentityHandlingTask {
+    fn drop(&mut self) {
+        let task_handle = self.task_handle.take();
+        if let Some(task_handle) = task_handle {
+            async_std::task::block_on(task_handle.cancel());
+        }
+    }
+}
+
+/// [ProcessDistributeVotersHandlingTask] represents an async task for
+/// processing the incoming [BitVec] and distributing them to all
 /// subscribed clients.
-pub async fn process_distribute_voters_handling_stream<S, K>(
-    client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
-    mut stream: S,
-) where
-    S: Stream<Item = BitVec<u16>> + Unpin,
-    K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
-{
-    loop {
-        let voters_result = stream.next().await;
-
-        let voters = if let Some(voters) = voters_result {
-            voters
-        } else {
-            tracing::info!("voters stream closed. shutting down client handling stream.",);
-            return;
-        };
+pub struct ProcessDistributeVotersHandlingTask {
+    pub task_handle: Option<JoinHandle<()>>,
+}
+
+impl ProcessDistributeVotersHandlingTask {
+    /// [new] creates a new [ProcessDistributeVotersHandlingTask] with the
+    /// given client_thread_state and voters_receiver.
+    ///
+    /// Calling this function will start an async task that will start
+    /// processing. The handle for the async task is stored within the
+    /// returned state.
+    pub fn new<S, K>(
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+        voters_receiver: S,
+    ) -> Self
+    where
+        S: Stream<Item = BitVec<u16>> + Send + Sync + Unpin + 'static,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Send + Sync + Unpin + 'static,
+    {
+        let task_handle = async_std::task::spawn(Self::process_distribute_voters_handling_stream(
+            client_thread_state.clone(),
+            voters_receiver,
+        ));
+
+        Self {
+            task_handle: Some(task_handle),
+        }
+    }
+
+    /// [process_distribute_voters_handling_stream] is a function that processes
+    /// the [Stream] of incoming [BitVec] and distributes them to all
+    /// subscribed clients.
+    async fn process_distribute_voters_handling_stream<S, K>(
+        client_thread_state: Arc<RwLock<ClientThreadState<K>>>,
+        mut stream: S,
+    ) where
+        S: Stream<Item = BitVec<u16>> + Unpin,
+        K: Sink<ServerMessage, Error = SendError> + Clone + Unpin,
+    {
+        loop {
+            let voters_result = stream.next().await;
+
+            let voters = if let Some(voters) = voters_result {
+                voters
+            } else {
+                tracing::info!("voters stream closed. shutting down client handling stream.",);
+                return;
+            };
+
+            handle_received_voters(client_thread_state.clone(), voters).await
+        }
+    }
+}
 
-        handle_received_voters(client_thread_state.clone(), voters).await
+/// [drop] implementation for [ProcessDistributeVotersHandlingTask] that will
+/// cancel the task if it is still running.
+impl Drop for ProcessDistributeVotersHandlingTask {
+    fn drop(&mut self) {
+        let task_handle = self.task_handle.take();
+        if let Some(task_handle) = task_handle {
+            async_std::task::block_on(task_handle.cancel());
+        }
+    }
 }
 
 #[cfg(test)]
 pub mod tests {
-    use super::{process_internal_client_message_stream, ClientThreadState};
+    use super::{ClientThreadState, InternalClientMessageProcessingTask};
     use crate::service::{
         client_id::ClientId,
         client_message::InternalClientMessage,
         client_state::{
-            process_distribute_block_detail_handling_stream,
-            process_distribute_node_identity_handling_stream,
-            process_distribute_voters_handling_stream,
+            ProcessDistributeBlockDetailHandlingTask, ProcessDistributeNodeIdentityHandlingTask,
+            ProcessDistributeVotersHandlingTask,
         },
         data_state::{
-            create_block_detail_from_leaf, process_leaf_stream, DataState, LocationDetails,
-            NodeIdentity,
+            create_block_detail_from_leaf, DataState, LocationDetails, NodeIdentity,
+            ProcessLeafStreamTask,
        },
         server_message::ServerMessage,
     };
@@ -1103,18 +1291,20 @@ pub mod tests {
         let (mut internal_client_message_sender, internal_client_message_receiver) =
             mpsc::channel(1);
 
-        let process_internal_client_message_handle: async_std::task::JoinHandle<()> =
-            async_std::task::spawn(process_internal_client_message_stream(
-                internal_client_message_receiver,
-                data_state,
-                client_thread_state,
-            ));
+        let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new(
+            internal_client_message_receiver,
+            data_state,
+            client_thread_state,
+        );
 
         // disconnect the last internal client message sender
         internal_client_message_sender.disconnect();
 
         // Join the async task.
if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1139,12 +1329,11 @@ pub mod tests { let (internal_client_message_sender, internal_client_message_receiver) = mpsc::channel(1); let (server_message_sender_1, mut server_message_receiver_1) = mpsc::channel(1); let (server_message_sender_2, mut server_message_receiver_2) = mpsc::channel(1); - let process_internal_client_message_handle = - async_std::task::spawn(process_internal_client_message_stream( - internal_client_message_receiver, - data_state, - client_thread_state, - )); + let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new( + internal_client_message_receiver, + data_state, + client_thread_state, + ); // Send a Connected Message to the server let mut internal_client_message_sender_1 = internal_client_message_sender.clone(); @@ -1199,6 +1388,9 @@ pub mod tests { assert_eq!(server_message_receiver_2.next().await, None); if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1226,12 +1418,11 @@ pub mod tests { let (internal_client_message_sender, internal_client_message_receiver) = mpsc::channel(1); let (server_message_sender_1, mut server_message_receiver_1) = mpsc::channel(1); let (server_message_sender_2, mut server_message_receiver_2) = mpsc::channel(1); - let process_internal_client_message_handle = - async_std::task::spawn(process_internal_client_message_stream( - internal_client_message_receiver, - data_state, - client_thread_state, - )); + let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new( + internal_client_message_receiver, + data_state, + client_thread_state, + ); // Send a Connected Message to the server let mut internal_client_message_sender_1 = internal_client_message_sender.clone(); @@ -1285,6 +1476,9 @@ pub mod tests { // Join the async task. if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1304,12 +1498,11 @@ pub mod tests { let (internal_client_message_sender, internal_client_message_receiver) = mpsc::channel(1); let (server_message_sender_1, mut server_message_receiver_1) = mpsc::channel(1); let (server_message_sender_2, mut server_message_receiver_2) = mpsc::channel(1); - let process_internal_client_message_handle: async_std::task::JoinHandle<()> = - async_std::task::spawn(process_internal_client_message_stream( - internal_client_message_receiver, - data_state, - client_thread_state, - )); + let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new( + internal_client_message_receiver, + data_state, + client_thread_state, + ); // Send a Connected Message to the server let mut internal_client_message_sender_1 = internal_client_message_sender.clone(); @@ -1370,6 +1563,9 @@ pub mod tests { // Join the async task. 
if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1393,29 +1589,27 @@ pub mod tests { let (server_message_sender_1, mut server_message_receiver_1) = mpsc::channel(1); let (server_message_sender_2, mut server_message_receiver_2) = mpsc::channel(1); let (server_message_sender_3, mut server_message_receiver_3) = mpsc::channel(1); - let process_internal_client_message_handle = - async_std::task::spawn(process_internal_client_message_stream( - internal_client_message_receiver, - data_state.clone(), - client_thread_state.clone(), - )); + let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new( + internal_client_message_receiver, + data_state.clone(), + client_thread_state.clone(), + ); - let process_distribute_block_detail_handle = - async_std::task::spawn(process_distribute_block_detail_handling_stream( + let mut process_distribute_block_detail_handle = + ProcessDistributeBlockDetailHandlingTask::new( client_thread_state.clone(), block_detail_receiver, - )); + ); - let process_distribute_voters_handle = async_std::task::spawn( - process_distribute_voters_handling_stream(client_thread_state, voters_receiver), - ); + let mut process_distribute_voters_handle = + ProcessDistributeVotersHandlingTask::new(client_thread_state, voters_receiver); - let process_leaf_stream_handle = async_std::task::spawn(process_leaf_stream( + let mut process_leaf_stream_handle = ProcessLeafStreamTask::new( leaf_receiver, data_state, block_detail_sender, voters_sender, - )); + ); // Send a Connected Message to the server let mut internal_client_message_sender_1 = internal_client_message_sender.clone(); @@ -1500,6 +1694,9 @@ pub mod tests { // Join the async task. if let Err(timeout_error) = process_leaf_stream_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1511,6 +1708,9 @@ pub mod tests { // Join the async task. if let Err(timeout_error) = process_distribute_block_detail_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1521,6 +1721,9 @@ pub mod tests { } if let Err(timeout_error) = process_distribute_voters_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1543,6 +1746,9 @@ pub mod tests { // Join the async task. 
if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1564,18 +1770,17 @@ pub mod tests { let (server_message_sender_1, mut server_message_receiver_1) = mpsc::channel(1); let (server_message_sender_2, mut server_message_receiver_2) = mpsc::channel(1); let (server_message_sender_3, mut server_message_receiver_3) = mpsc::channel(1); - let process_internal_client_message_handle = - async_std::task::spawn(process_internal_client_message_stream( - internal_client_message_receiver, - data_state.clone(), - client_thread_state.clone(), - )); + let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new( + internal_client_message_receiver, + data_state.clone(), + client_thread_state.clone(), + ); - let process_distribute_node_identity_handle = - async_std::task::spawn(process_distribute_node_identity_handling_stream( + let mut process_distribute_node_identity_handle = + ProcessDistributeNodeIdentityHandlingTask::new( client_thread_state, node_identity_receiver, - )); + ); // Send a Connected Message to the server let mut internal_client_message_sender_1 = internal_client_message_sender.clone(); @@ -1662,6 +1867,9 @@ pub mod tests { // Join the async task. if let Err(timeout_error) = process_distribute_node_identity_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1684,6 +1892,9 @@ pub mod tests { // Join the async task. if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1705,17 +1916,15 @@ pub mod tests { let (server_message_sender_1, mut server_message_receiver_1) = mpsc::channel(1); let (server_message_sender_2, mut server_message_receiver_2) = mpsc::channel(1); let (server_message_sender_3, mut server_message_receiver_3) = mpsc::channel(1); - let process_internal_client_message_handle = - async_std::task::spawn(process_internal_client_message_stream( - internal_client_message_receiver, - data_state.clone(), - client_thread_state.clone(), - )); - - let process_distribute_voters_handle = async_std::task::spawn( - process_distribute_voters_handling_stream(client_thread_state, voters_receiver), + let mut process_internal_client_message_handle = InternalClientMessageProcessingTask::new( + internal_client_message_receiver, + data_state.clone(), + client_thread_state.clone(), ); + let mut process_distribute_voters_handle = + ProcessDistributeVotersHandlingTask::new(client_thread_state, voters_receiver); + // Send a Connected Message to the server let mut internal_client_message_sender_1 = internal_client_message_sender.clone(); assert_eq!( @@ -1796,6 +2005,9 @@ pub mod tests { // Join the async task. if let Err(timeout_error) = process_distribute_voters_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { @@ -1818,6 +2030,9 @@ pub mod tests { // Join the async task. 
if let Err(timeout_error) = process_internal_client_message_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await { diff --git a/node-metrics/src/service/data_state/mod.rs b/node-metrics/src/service/data_state/mod.rs index e05c06be46..2ea1caafdd 100644 --- a/node-metrics/src/service/data_state/mod.rs +++ b/node-metrics/src/service/data_state/mod.rs @@ -1,7 +1,7 @@ pub mod location_details; pub mod node_identity; -use async_std::sync::RwLock; +use async_std::{sync::RwLock, task::JoinHandle}; use bitvec::vec::BitVec; use circular_buffer::CircularBuffer; use espresso_types::{Header, Payload, SeqTypes}; @@ -304,41 +304,89 @@ where Ok(()) } -/// [process_leaf_stream] allows for the consumption of a [Stream] when -/// attempting to process new incoming [Leaf]s. -pub async fn process_leaf_stream( - mut stream: S, - data_state: Arc>, - block_sender: BDSink, - voters_senders: BVSink, -) where - S: Stream> + Unpin, - Header: BlockHeader + QueryableHeader + ExplorerHeader, - Payload: BlockPayload, - BDSink: Sink, Error = SendError> + Clone + Unpin, - BVSink: Sink, Error = SendError> + Clone + Unpin, -{ - loop { - let leaf_result = stream.next().await; - let leaf = if let Some(leaf) = leaf_result { - leaf - } else { - // We have reached the end of the stream - tracing::info!("process leaf stream: end of stream reached for leaf stream."); - return; - }; +/// [ProcessLeafStreamTask] represents the task that is responsible for +/// processing a stream of incoming [Leaf]s. +pub struct ProcessLeafStreamTask { + pub task_handle: Option>, +} - if let Err(err) = process_incoming_leaf( - leaf, +impl ProcessLeafStreamTask { + /// [new] creates a new [ProcessLeafStreamTask] that will process a stream + /// of incoming [Leaf]s. + /// + /// Calling this function will create an asynchronous task that will start + /// processing immediately. The handle for the task will be stored within + /// the returned structure. + pub fn new( + leaf_receiver: S, + data_state: Arc>, + block_detail_sender: K1, + voters_sender: K2, + ) -> Self + where + S: Stream> + Send + Sync + Unpin + 'static, + K1: Sink, Error = SendError> + Clone + Send + Sync + Unpin + 'static, + K2: Sink, Error = SendError> + Clone + Send + Sync + Unpin + 'static, + { + let task_handle = async_std::task::spawn(Self::process_leaf_stream( + leaf_receiver, data_state.clone(), - block_sender.clone(), - voters_senders.clone(), - ) - .await - { - // We have an error that prevents us from continuing - tracing::info!("process leaf stream: error processing leaf: {}", err); - break; + block_detail_sender, + voters_sender, + )); + + Self { + task_handle: Some(task_handle), + } + } + + /// [process_leaf_stream] allows for the consumption of a [Stream] when + /// attempting to process new incoming [Leaf]s. 
+ async fn process_leaf_stream( + mut stream: S, + data_state: Arc>, + block_sender: BDSink, + voters_senders: BVSink, + ) where + S: Stream> + Unpin, + Header: BlockHeader + QueryableHeader + ExplorerHeader, + Payload: BlockPayload, + BDSink: Sink, Error = SendError> + Clone + Unpin, + BVSink: Sink, Error = SendError> + Clone + Unpin, + { + loop { + let leaf_result = stream.next().await; + let leaf = if let Some(leaf) = leaf_result { + leaf + } else { + // We have reached the end of the stream + tracing::info!("process leaf stream: end of stream reached for leaf stream."); + return; + }; + + if let Err(err) = process_incoming_leaf( + leaf, + data_state.clone(), + block_sender.clone(), + voters_senders.clone(), + ) + .await + { + // We have an error that prevents us from continuing + tracing::info!("process leaf stream: error processing leaf: {}", err); + break; + } + } + } +} + +/// [Drop] implementation for [ProcessLeafStreamTask] that will cancel the +/// task if it is dropped. +impl Drop for ProcessLeafStreamTask { + fn drop(&mut self) { + let task_handle = self.task_handle.take(); + if let Some(task_handle) = task_handle { + async_std::task::block_on(task_handle.cancel()); } } } @@ -393,53 +441,101 @@ where Ok(()) } -/// [process_node_identity_stream] allows for the consumption of a [Stream] when -/// attempting to process new incoming [NodeIdentity]s. -/// This function will process the incoming [NodeIdentity] and update the -/// [DataState] with the new information. -/// Additionally, the [NodeIdentity] will be sent to the [Sink] so that it can -/// be processed for real-time considerations. -pub async fn process_node_identity_stream( - mut stream: S, - data_state: Arc>, - node_identity_sender: NISink, -) where - S: Stream + Unpin, - NISink: Sink + Clone + Unpin, -{ - loop { - let node_identity_result = stream.next().await; - let node_identity = if let Some(node_identity) = node_identity_result { - node_identity - } else { - // We have reached the end of the stream - tracing::info!( - "process node identity stream: end of stream reached for node identity stream." - ); - return; - }; +/// [ProcessNodeIdentityStreamTask] represents the task that is responsible for +/// processing a stream of incoming [NodeIdentity]s and updating the [DataState] +/// with the new information. +pub struct ProcessNodeIdentityStreamTask { + pub task_handle: Option>, +} - if let Err(err) = process_incoming_node_identity( - node_identity, +impl ProcessNodeIdentityStreamTask { + /// [new] creates a new [ProcessNodeIdentityStreamTask] that will process a + /// stream of incoming [NodeIdentity]s. + /// + /// Calling this function will create an asynchronous task that will start + /// processing immediately. The handle for the task will be stored within + /// the returned structure. 
+ pub fn new( + node_identity_receiver: S, + data_state: Arc>, + node_identity_sender: K, + ) -> Self + where + S: Stream + Send + Sync + Unpin + 'static, + K: Sink + Clone + Send + Sync + Unpin + 'static, + { + let task_handle = async_std::task::spawn(Self::process_node_identity_stream( + node_identity_receiver, data_state.clone(), - node_identity_sender.clone(), - ) - .await - { - // We have an error that prevents us from continuing - tracing::info!( - "process node identity stream: error processing node identity: {}", - err - ); - break; + node_identity_sender, + )); + + Self { + task_handle: Some(task_handle), + } + } + + /// [process_node_identity_stream] allows for the consumption of a [Stream] when + /// attempting to process new incoming [NodeIdentity]s. + /// This function will process the incoming [NodeIdentity] and update the + /// [DataState] with the new information. + /// Additionally, the [NodeIdentity] will be sent to the [Sink] so that it can + /// be processed for real-time considerations. + async fn process_node_identity_stream( + mut stream: S, + data_state: Arc>, + node_identity_sender: NISink, + ) where + S: Stream + Unpin, + NISink: Sink + Clone + Unpin, + { + loop { + let node_identity_result = stream.next().await; + let node_identity = if let Some(node_identity) = node_identity_result { + node_identity + } else { + // We have reached the end of the stream + tracing::info!( + "process node identity stream: end of stream reached for node identity stream." + ); + return; + }; + + if let Err(err) = process_incoming_node_identity( + node_identity, + data_state.clone(), + node_identity_sender.clone(), + ) + .await + { + // We have an error that prevents us from continuing + tracing::info!( + "process node identity stream: error processing node identity: {}", + err + ); + break; + } + } + } +} + +/// [Drop] implementation for [ProcessNodeIdentityStreamTask] that will cancel +/// the task if it is dropped. 
+impl Drop for ProcessNodeIdentityStreamTask { + fn drop(&mut self) { + let task_handle = self.task_handle.take(); + if let Some(task_handle) = task_handle { + async_std::task::block_on(task_handle.cancel()); } } } #[cfg(test)] mod tests { - use super::{process_leaf_stream, DataState}; - use crate::service::data_state::{process_node_identity_stream, LocationDetails, NodeIdentity}; + use super::{DataState, ProcessLeafStreamTask}; + use crate::service::data_state::{ + LocationDetails, NodeIdentity, ProcessNodeIdentityStreamTask, + }; use async_std::{prelude::FutureExt, sync::RwLock}; use espresso_types::{ BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleTree, Leaf, NodeState, ValidatedState, @@ -477,12 +573,12 @@ mod tests { let (voters_sender, voters_receiver) = futures::channel::mpsc::channel(1); let (leaf_sender, leaf_receiver) = futures::channel::mpsc::channel(1); - let process_leaf_stream_task_handle = async_std::task::spawn(process_leaf_stream( + let mut process_leaf_stream_task_handle = ProcessLeafStreamTask::new( leaf_receiver, data_state.clone(), block_sender, voters_sender, - )); + ); { let data_state = data_state.read().await; @@ -530,6 +626,9 @@ mod tests { assert_eq!( process_leaf_stream_task_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await, Ok(()) @@ -543,12 +642,11 @@ mod tests { let (node_identity_sender_1, node_identity_receiver_1) = futures::channel::mpsc::channel(1); let (node_identity_sender_2, node_identity_receiver_2) = futures::channel::mpsc::channel(1); - let process_node_identity_task_handle = - async_std::task::spawn(process_node_identity_stream( - node_identity_receiver_1, - data_state.clone(), - node_identity_sender_2, - )); + let mut process_node_identity_task_handle = ProcessNodeIdentityStreamTask::new( + node_identity_receiver_1, + data_state.clone(), + node_identity_sender_2, + ); { let data_state = data_state.read().await; @@ -661,6 +759,9 @@ mod tests { assert_eq!( process_node_identity_task_handle + .task_handle + .take() + .unwrap() .timeout(Duration::from_millis(200)) .await, Ok(())
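Taken together, the refactor turns teardown into plain ownership: whoever holds the wrapper structs holds the tasks. A condensed sketch of how a caller wires up and shuts down the pipeline after this patch (URLs, the channel size, and the surrounding function are illustrative; the `NodeValidatorConfig` fields follow the earlier hunks):

```rust
use futures::channel::mpsc;

// Hypothetical driver for the refactored API.
async fn run_node_validator() -> Result<(), CreateNodeValidatorProcessingError> {
    let (internal_client_message_sender, internal_client_message_receiver) = mpsc::channel(32);

    let node_validator_api = create_node_validator_processing(
        NodeValidatorConfig {
            stake_table_url_base: "http://localhost:24000/v0".parse().unwrap(),
            initial_node_public_base_urls: vec!["http://localhost:24001/".parse().unwrap()],
        },
        internal_client_message_receiver,
    )
    .await?;

    // ... serve the API and feed client messages through the sender ...
    let _ = internal_client_message_sender;

    // No explicit cancellation loop is needed: dropping the returned state
    // runs each wrapper's Drop impl, which cancels its task.
    drop(node_validator_api);
    Ok(())
}
```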