Skip to content

Commit

Permalink
Refactor wrap async tasks in separate structures / impls
Browse files Browse the repository at this point in the history
In an effort to keep in line with similar task implementations, and for better
organization and automatic cleanup on `drop`, the async task processing
has been adjusted to be created and managed by a struct.
  • Loading branch information
Ayiga committed Jul 25, 2024
1 parent 2041419 commit 511d23b
Show file tree
Hide file tree
Showing 4 changed files with 808 additions and 399 deletions.
144 changes: 53 additions & 91 deletions node-metrics/src/api/node_validator/v0/create_node_validator_api.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@
use std::sync::Arc;

use super::{
get_stake_table_from_sequencer, process_node_identity_url_stream,
stream_leaves_from_hotshot_query_service,
get_stake_table_from_sequencer, ProcessNodeIdentityUrlStreamTask, ProcessProduceLeafStreamTask,
};
use crate::service::{
client_id::ClientId,
client_message::InternalClientMessage,
client_state::{
process_distribute_block_detail_handling_stream,
process_distribute_node_identity_handling_stream,
process_distribute_voters_handling_stream, process_internal_client_message_stream,
ClientThreadState,
ClientThreadState, InternalClientMessageProcessingTask,
ProcessDistributeBlockDetailHandlingTask, ProcessDistributeNodeIdentityHandlingTask,
ProcessDistributeVotersHandlingTask,
},
data_state::{process_leaf_stream, process_node_identity_stream, DataState},
data_state::{DataState, ProcessLeafStreamTask, ProcessNodeIdentityStreamTask},
server_message::ServerMessage,
};
use async_std::{stream::StreamExt, sync::RwLock, task::JoinHandle};
use async_std::sync::RwLock;
use futures::{
channel::mpsc::{self, Receiver, Sender},
SinkExt,
};
use url::Url;

pub struct NodeValidatorAPI {
pub task_handles: Vec<JoinHandle<()>>,
pub process_internal_client_message_handle: Option<InternalClientMessageProcessingTask>,
pub process_distribute_block_detail_handle: Option<ProcessDistributeBlockDetailHandlingTask>,
pub process_distribute_node_identity_handle: Option<ProcessDistributeNodeIdentityHandlingTask>,
pub process_distribute_voters_handle: Option<ProcessDistributeVotersHandlingTask>,
pub process_leaf_stream_handle: Option<ProcessLeafStreamTask>,
pub process_node_identity_stream_handle: Option<ProcessNodeIdentityStreamTask>,
pub process_url_stream_handle: Option<ProcessNodeIdentityUrlStreamTask>,
pub process_consume_leaves: Option<ProcessProduceLeafStreamTask>,
}

pub struct NodeValidatorConfig {
Expand All @@ -47,7 +52,7 @@ pub enum CreateNodeValidatorProcessingError {
*/
pub async fn create_node_validator_processing(
config: NodeValidatorConfig,
server_message_receiver: Receiver<InternalClientMessage<Sender<ServerMessage>>>,
internal_client_message_receiver: Receiver<InternalClientMessage<Sender<ServerMessage>>>,
) -> Result<NodeValidatorAPI, CreateNodeValidatorProcessingError> {
let mut data_state = DataState::new(
Default::default(),
Expand All @@ -64,9 +69,10 @@ pub async fn create_node_validator_processing(
ClientId::from_count(1),
);

let client = surf_disco::Client::new(config.stake_table_url_base);
let client_stake_table = surf_disco::Client::new(config.stake_table_url_base.clone());
let client_leaf_stream = surf_disco::Client::new(config.stake_table_url_base);

let stake_table = get_stake_table_from_sequencer(client.clone())
let stake_table = get_stake_table_from_sequencer(client_stake_table)
.await
.map_err(CreateNodeValidatorProcessingError::FailedToGetStakeTable)?;

Expand All @@ -81,85 +87,45 @@ pub async fn create_node_validator_processing(
let (voters_sender, voters_receiver) = mpsc::channel(32);
let (mut url_sender, url_receiver) = mpsc::channel(32);

let process_internal_client_message_handle =
async_std::task::spawn(process_internal_client_message_stream(
server_message_receiver,
data_state.clone(),
client_thread_state.clone(),
));

let process_distribute_block_detail_handle =
async_std::task::spawn(process_distribute_block_detail_handling_stream(
client_thread_state.clone(),
block_detail_receiver,
));

let process_distribute_node_identity_handle =
async_std::task::spawn(process_distribute_node_identity_handling_stream(
client_thread_state.clone(),
node_identity_receiver_2,
));

let process_distribute_voters_handle = async_std::task::spawn(
process_distribute_voters_handling_stream(client_thread_state.clone(), voters_receiver),
let process_internal_client_message_handle = InternalClientMessageProcessingTask::new(
internal_client_message_receiver,
data_state.clone(),
client_thread_state.clone(),
);

let process_distribute_block_detail_handle = ProcessDistributeBlockDetailHandlingTask::new(
client_thread_state.clone(),
block_detail_receiver,
);

let process_leaf_stream_handle = async_std::task::spawn(process_leaf_stream(
let process_distribute_node_identity_handle = ProcessDistributeNodeIdentityHandlingTask::new(
client_thread_state.clone(),
node_identity_receiver_2,
);

let process_distribute_voters_handle =
ProcessDistributeVotersHandlingTask::new(client_thread_state.clone(), voters_receiver);

let process_leaf_stream_handle = ProcessLeafStreamTask::new(
leaf_receiver,
data_state.clone(),
block_detail_sender,
voters_sender,
));
);

let process_node_identity_stream_handle = async_std::task::spawn(process_node_identity_stream(
let process_node_identity_stream_handle = ProcessNodeIdentityStreamTask::new(
node_identity_receiver_1,
data_state.clone(),
node_identity_sender_2,
));

let process_url_stream_handle = async_std::task::spawn(process_node_identity_url_stream(
url_receiver,
node_identity_sender_1,
));

let leaf_retriever_handle = async_std::task::spawn(async move {
// Alright, let's start processing leaves
// TODO: We should move this into its own function that can respond
// and react appropriately when a service or sequencer does down
// so that it can gracefully re-establish the stream as necessary.

let client = client;

let mut leaf_stream = match stream_leaves_from_hotshot_query_service(None, client).await {
Ok(leaf_stream) => leaf_stream,
Err(err) => {
tracing::info!("error getting leaf stream: {}", err);
return;
}
};

let mut leaf_sender = leaf_sender;
);

loop {
let leaf_result = leaf_stream.next().await;
let leaf = if let Some(Ok(leaf)) = leaf_result {
leaf
} else {
tracing::info!("leaf stream closed");
break;
};
let process_url_stream_handle =
ProcessNodeIdentityUrlStreamTask::new(url_receiver, node_identity_sender_1);

let leaf_send_result = leaf_sender.send(leaf).await;
if let Err(err) = leaf_send_result {
tracing::info!("leaf sender closed: {}", err);
break;
}
}
});
let process_consume_leaves = ProcessProduceLeafStreamTask::new(client_leaf_stream, leaf_sender);

// send the original three node base urls
// This is assuming that demo-native is running, as such those Urls
// should be used / match
// Send any initial URLS to the url sender for immediate processing.
// These urls are supplied by the configuration of this function
{
let urls = config.initial_node_public_base_urls;

Expand All @@ -173,16 +139,14 @@ pub async fn create_node_validator_processing(
}

Ok(NodeValidatorAPI {
task_handles: vec![
process_internal_client_message_handle,
process_distribute_block_detail_handle,
process_distribute_node_identity_handle,
process_distribute_voters_handle,
process_leaf_stream_handle,
process_node_identity_stream_handle,
process_url_stream_handle,
leaf_retriever_handle,
],
process_internal_client_message_handle: Some(process_internal_client_message_handle),
process_distribute_block_detail_handle: Some(process_distribute_block_detail_handle),
process_distribute_node_identity_handle: Some(process_distribute_node_identity_handle),
process_distribute_voters_handle: Some(process_distribute_voters_handle),
process_leaf_stream_handle: Some(process_leaf_stream_handle),
process_node_identity_stream_handle: Some(process_node_identity_stream_handle),
process_url_stream_handle: Some(process_url_stream_handle),
process_consume_leaves: Some(process_consume_leaves),
})
}

Expand Down Expand Up @@ -257,8 +221,6 @@ mod test {

app_serve_handle.await;

for handle in node_validator_task_state.task_handles {
handle.cancel().await;
}
drop(node_validator_task_state);
}
}
Loading

0 comments on commit 511d23b

Please sign in to comment.