diff --git a/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs b/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs index 97a667a574..cfcc3238d5 100644 --- a/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs +++ b/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs @@ -1,8 +1,6 @@ use std::sync::Arc; -use super::{ - get_stake_table_from_sequencer, ProcessNodeIdentityUrlStreamTask, ProcessProduceLeafStreamTask, -}; +use super::{get_stake_table_from_sequencer, ProcessNodeIdentityUrlStreamTask}; use crate::service::{ client_id::ClientId, client_message::InternalClientMessage, @@ -20,6 +18,7 @@ use futures::{ channel::mpsc::{self, Receiver, SendError, Sender}, Sink, SinkExt, Stream, StreamExt, }; +use hotshot_query_service::Leaf; use hotshot_types::event::{Event, EventType}; use serde::{Deserialize, Serialize}; use url::Url; @@ -32,8 +31,6 @@ pub struct NodeValidatorAPI { pub process_leaf_stream_handle: Option, pub process_node_identity_stream_handle: Option, pub process_url_stream_handle: Option, - pub process_consume_leaves: Option, - pub hotshot_event_processing_task: Option, } pub struct NodeValidatorConfig { @@ -65,30 +62,54 @@ pub struct RollCallInfo { pub public_api_url: Url, } +/// [HotShotEventProcessingTask] is a task that is capable of processing events +/// that are coming in from a HotShot event stream. This task will keep an +/// eye out for ExternalMessageReceived events that can be decoded as a +/// RollCallResponse. When a RollCallResponse is received, the public API URL +/// of the node that sent the message will be sent to the provided url_sender. +/// +/// Additionally, this can receive Decide events and send the discovered leaves +/// to the provided leaf_sender. This can can be used as a means of receiving +/// leaves that doesn't involve hitting an external service like the task +/// [ProcessProduceLeafStreamTask] does. pub struct HotShotEventProcessingTask { pub task_handle: Option>, } impl HotShotEventProcessingTask { - pub fn new(event_stream: S, url_sender: K) -> Self + /// [new] creates a new [HotShotEventProcessingTask] that will process + /// events from the provided event_stream. + /// + /// Calls to [new] will spawn a new task that will start processing + /// immediately. The task handle will be stored in the returned structure. + pub fn new(event_stream: S, url_sender: K1, leaf_sender: K2) -> Self where S: Stream> + Send + Unpin + 'static, - K: Sink + Send + Unpin + 'static, + K1: Sink + Send + Unpin + 'static, + K2: Sink, Error = SendError> + Send + Unpin + 'static, { - let task_handle = async_std::task::spawn(Self::process_messages(event_stream, url_sender)); + let task_handle = async_std::task::spawn(Self::process_messages( + event_stream, + url_sender, + leaf_sender, + )); Self { task_handle: Some(task_handle), } } - async fn process_messages(event_receiver: S, url_sender: K) + /// [process_messages] is a function that will process messages from the + /// provided event stream. + async fn process_messages(event_receiver: S, url_sender: K1, leaf_sender: K2) where S: Stream> + Send + Unpin + 'static, - K: Sink + Unpin, + K1: Sink + Unpin, + K2: Sink, Error = SendError> + Unpin, { let mut event_stream = event_receiver; let mut url_sender = url_sender; + let mut leaf_sender = leaf_sender; loop { let event_result = event_stream.next().await; let event = match event_result { @@ -101,42 +122,151 @@ impl HotShotEventProcessingTask { let Event { event, .. } = event; - let external_message_deserialize_result = - if let EventType::ExternalMessageReceived(external_message_bytes) = event { - bincode::deserialize(&external_message_bytes) - } else { - // Ignore all events that are not external messages - continue; - }; - - let external_message: ExternalMessage = match external_message_deserialize_result { - Ok(external_message) => external_message, - Err(err) => { - tracing::info!( - "failed to deserialize external message, unrecognized: {}", - err - ); + match event { + EventType::Decide { leaf_chain, .. } => { + for leaf_info in leaf_chain.iter() { + let leaf = leaf_info.leaf.clone(); + + let send_result = leaf_sender.send(leaf).await; + if let Err(err) = send_result { + tracing::info!("leaf sender closed: {}", err); + return; + } + } + } + + EventType::ExternalMessageReceived(external_message_bytes) => { + let roll_call_info = match bincode::deserialize(&external_message_bytes) { + Ok(ExternalMessage::RollCallResponse(roll_call_info)) => roll_call_info, + + Err(err) => { + tracing::info!( + "failed to deserialize external message, unrecognized: {}", + err + ); + continue; + } + + _ => { + // Ignore any other potentially recognized messages + continue; + } + }; + + let public_api_url = roll_call_info.public_api_url; + + // Send the the discovered public url to the sink + let send_result = url_sender.send(public_api_url).await; + if let Err(err) = send_result { + tracing::info!("url sender closed: {}", err); + return; + } + } + _ => { + // Ignore all other events continue; } - }; + } + } + } +} + +/// [Drop] implementation for [HotShotEventProcessingTask] that will cancel the +/// task when the structure is dropped. +impl Drop for HotShotEventProcessingTask { + fn drop(&mut self) { + if let Some(task_handle) = self.task_handle.take() { + async_std::task::block_on(task_handle.cancel()); + } + } +} - let public_api_url = match external_message { - ExternalMessage::RollCallResponse(roll_call_response) => { - roll_call_response.public_api_url +/// [ProcessExternalMessageHandlingTask] is a task that is capable of processing +/// external messages that are coming in from an external message stream. This +/// task will keep an eye out for ExternalMessageReceived events that can be +/// decoded as a RollCallResponse. When a RollCallResponse is received, the +/// public API URL of the node that sent the message will be sent to the +/// provided url_sender. +/// +/// This task can be used as a means of processing [ExternalMessage]s that are +/// not being provided by a HotShot event stream. It can be used as an +/// alternative to the [HotShotEventProcessingTask] for processing external +/// messages. +pub struct ProcessExternalMessageHandlingTask { + pub task_handle: Option>, +} + +impl ProcessExternalMessageHandlingTask { + /// [new] creates a new [ProcessExternalMessageHandlingTask] that will + /// process external messages from the provided external_message_receiver. + /// + /// Calls to [new] will spawn a new task that will start processing + /// immediately. The task handle will be stored in the returned structure. + pub fn new(external_message_receiver: S, url_sender: K) -> Self + where + S: Stream + Send + Unpin + 'static, + K: Sink + Send + Unpin + 'static, + { + let task_handle = async_std::task::spawn(Self::process_external_messages( + external_message_receiver, + url_sender, + )); + + Self { + task_handle: Some(task_handle), + } + } + + /// [process_external_messages] is a function that will process messages from + /// the provided external message stream. + async fn process_external_messages(external_message_receiver: S, url_sender: K) + where + S: Stream + Send + Unpin + 'static, + K: Sink + Send + Unpin + 'static, + { + let mut external_message_receiver = external_message_receiver; + let mut url_sender = url_sender; + + loop { + let external_message_result = external_message_receiver.next().await; + let external_message = match external_message_result { + Some(external_message) => external_message, + None => { + tracing::info!("external message receiver closed"); + break; } - _ => continue, }; - // Send the the discovered public url to the sink - let send_result = url_sender.send(public_api_url).await; - if let Err(err) = send_result { - tracing::info!("url sender closed: {}", err); - break; + match external_message { + ExternalMessage::RollCallResponse(roll_call_info) => { + let public_api_url = roll_call_info.public_api_url; + + let send_result = url_sender.send(public_api_url).await; + if let Err(err) = send_result { + tracing::info!("url sender closed: {}", err); + break; + } + } + + _ => { + // Ignore all other messages + continue; + } } } } } +/// [Drop] implementation for [ProcessExternalMessageHandlingTask] that will +/// cancel the task when the structure is dropped. +impl Drop for ProcessExternalMessageHandlingTask { + fn drop(&mut self) { + if let Some(task_handle) = self.task_handle.take() { + async_std::task::block_on(task_handle.cancel()); + } + } +} + /** * create_node_validator_processing is a function that creates a node validator * processing environment. This function will create a number of tasks that @@ -144,17 +274,11 @@ impl HotShotEventProcessingTask { * the various sources. This function will also create the data state that * will be used to store the state of the network. */ -pub async fn create_node_validator_processing( +pub async fn create_node_validator_processing( config: NodeValidatorConfig, internal_client_message_receiver: Receiver>>, - public_key: PubKey, - event_stream: Option, - external_message_sink: Option, -) -> Result -where - M: Stream> + Send + Unpin + 'static, - K: Sink + Send + Unpin + 'static, -{ + leaf_receiver: Receiver>, +) -> Result { let mut data_state = DataState::new( Default::default(), Default::default(), @@ -171,7 +295,6 @@ where ); let client_stake_table = surf_disco::Client::new(config.stake_table_url_base.clone()); - let client_leaf_stream = surf_disco::Client::new(config.stake_table_url_base); let stake_table = get_stake_table_from_sequencer(client_stake_table) .await @@ -182,7 +305,6 @@ where let data_state = Arc::new(RwLock::new(data_state)); let client_thread_state = Arc::new(RwLock::new(client_thread_state)); let (block_detail_sender, block_detail_receiver) = mpsc::channel(32); - let (leaf_sender, leaf_receiver) = mpsc::channel(32); let (node_identity_sender_1, node_identity_receiver_1) = mpsc::channel(32); let (node_identity_sender_2, node_identity_receiver_2) = mpsc::channel(32); let (voters_sender, voters_receiver) = mpsc::channel(32); @@ -223,30 +345,6 @@ where let process_url_stream_handle = ProcessNodeIdentityUrlStreamTask::new(url_receiver, node_identity_sender_1); - let process_consume_leaves = ProcessProduceLeafStreamTask::new(client_leaf_stream, leaf_sender); - - let hotshot_event_processing_task = match (event_stream, external_message_sink) { - (Some(event_stream), Some(mut external_message_sink)) => { - let hotshot_event_processing_task = - HotShotEventProcessingTask::new(event_stream, url_sender.clone()); - - let send_roll_call_result = external_message_sink - .send(ExternalMessage::RollCallRequest(public_key)) - .await; - - if let Err(err) = send_roll_call_result { - tracing::info!("external message sink closed: {}", err); - } - - Some(hotshot_event_processing_task) - } - _ => { - // It doesn't make sne to send out a RollCall message if we don't - // have the ability to receive the response. - None - } - }; - // Send any initial URLS to the url sender for immediate processing. // These urls are supplied by the configuration of this function { @@ -269,20 +367,18 @@ where process_leaf_stream_handle: Some(process_leaf_stream_handle), process_node_identity_stream_handle: Some(process_node_identity_stream_handle), process_url_stream_handle: Some(process_url_stream_handle), - process_consume_leaves: Some(process_consume_leaves), - hotshot_event_processing_task, }) } #[cfg(test)] mod test { use crate::{ - api::node_validator::v0::{StateClientMessageSender, STATIC_VER_0_1}, + api::node_validator::v0::{ + ProcessProduceLeafStreamTask, StateClientMessageSender, STATIC_VER_0_1, + }, service::{client_message::InternalClientMessage, server_message::ServerMessage}, }; - use espresso_types::PubKey; use futures::channel::mpsc::{self, Sender}; - use hotshot_types::traits::signature_key::BuilderSignatureKey; use tide_disco::App; struct TestState(Sender>>); @@ -315,25 +411,38 @@ mod test { } } - let public_key = PubKey::generated_from_seed_indexed([0; 32], 0).0; - let (external_message_sender, _external_message_receiver) = mpsc::channel(10); - let (_event_sender, event_receiver) = mpsc::channel(10); + let (leaf_sender, leaf_receiver) = mpsc::channel(10); + + let client_leaf_stream = surf_disco::Client::new( + "https://query.cappuccino.testnet.espresso.network/v0" + .parse() + .unwrap(), + ); + let process_consume_leaves = + ProcessProduceLeafStreamTask::new(client_leaf_stream, leaf_sender); let node_validator_task_state = match super::create_node_validator_processing( super::NodeValidatorConfig { - stake_table_url_base: "http://localhost:24000/v0".parse().unwrap(), + stake_table_url_base: "https://query.cappuccino.testnet.espresso.network/v0" + .parse() + .unwrap(), initial_node_public_base_urls: vec![ - "http://localhost:24000/".parse().unwrap(), - "http://localhost:24001/".parse().unwrap(), - "http://localhost:24002/".parse().unwrap(), - "http://localhost:24003/".parse().unwrap(), - "http://localhost:24004/".parse().unwrap(), + "https://query-1.cappuccino.testnet.espresso.network/" + .parse() + .unwrap(), + "https://query-2.cappuccino.testnet.espresso.network/" + .parse() + .unwrap(), + "https://query-3.cappuccino.testnet.espresso.network/" + .parse() + .unwrap(), + "https://query-4.cappuccino.testnet.espresso.network/" + .parse() + .unwrap(), ], }, internal_client_message_receiver, - public_key, - Some(event_receiver), - Some(external_message_sender), + leaf_receiver, ) .await { @@ -354,5 +463,6 @@ mod test { app_serve_handle.await; drop(node_validator_task_state); + drop(process_consume_leaves); } }