Skip to content

Commit

Permalink
Add test file to typos exclusion list
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayiga committed Jul 24, 2024
1 parent 7b4b61d commit f72aab8
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 72 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ extend-exclude = [
"doc/*.svg",
"contracts/lib",
"contract-bindings",
"node-metrics/src/api/node_validator/v0/example_prometheus_metrics_output.txt",
]
171 changes: 99 additions & 72 deletions node-metrics/src/api/node_validator/v0/create_node_validator_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use super::{
get_stake_table_from_sequencer, process_node_identity_url_stream,
stream_leaves_from_hotshot_query_service, StateClientMessageSender, STATIC_VER_0_1,
stream_leaves_from_hotshot_query_service,
};
use crate::service::{
client_id::ClientId,
Expand All @@ -18,22 +18,11 @@ use crate::service::{
};
use async_std::{stream::StreamExt, sync::RwLock, task::JoinHandle};
use futures::{
channel::mpsc::{self, Sender},
channel::mpsc::{self, Receiver, Sender},
SinkExt,
};
use tide_disco::App;
use url::Url;

pub struct NodeValidatorAPIState {
pub sender: Sender<InternalClientMessage<Sender<ServerMessage>>>,
}

impl StateClientMessageSender<Sender<ServerMessage>> for NodeValidatorAPIState {
fn sender(&self) -> Sender<InternalClientMessage<Sender<ServerMessage>>> {
self.sender.clone()
}
}

pub struct NodeValidatorAPI {
pub task_handles: Vec<JoinHandle<()>>,
}
Expand All @@ -44,29 +33,22 @@ pub struct NodeValidatorConfig {
pub initial_node_public_base_urls: Vec<Url>,
}

pub async fn create_node_validator_api(
config: NodeValidatorConfig,
) -> (NodeValidatorAPI, JoinHandle<()>) {
let node_validator_api_result = super::define_api::<NodeValidatorAPIState>();

let node_validator_api = match node_validator_api_result {
Ok(api) => api,
Err(e) => {
panic!("Error: {:?}", e);
}
};

let (server_message_sender, server_message_receiver) = mpsc::channel(32);
let mut app: App<NodeValidatorAPIState, super::Error> =
App::with_state(NodeValidatorAPIState {
sender: server_message_sender,
});
let register_module_result = app.register_module("node-validator", node_validator_api);

if let Err(e) = register_module_result {
panic!("Error: {:?}", e);
}
#[derive(Debug)]
pub enum CreateNodeValidatorProcessingError {
FailedToGetStakeTable(hotshot_query_service::Error),
}

/**
* create_node_validator_processing is a function that creates a node validator
* processing environment. This function will create a number of tasks that
* will be responsible for processing the data streams that are coming in from
* the various sources. This function will also create the data state that
* will be used to store the state of the network.
*/
pub async fn create_node_validator_processing(
config: NodeValidatorConfig,
server_message_receiver: Receiver<InternalClientMessage<Sender<ServerMessage>>>,
) -> Result<NodeValidatorAPI, CreateNodeValidatorProcessingError> {
let mut data_state = DataState::new(
Default::default(),
Default::default(),
Expand All @@ -82,13 +64,12 @@ pub async fn create_node_validator_api(
ClientId::from_count(1),
);

let client = surf_disco::Client::new(
// "https://query.cappuccino.testnet.espresso.network/v0"
config.stake_table_url_base,
);
let client = surf_disco::Client::new(config.stake_table_url_base);

let stake_table = get_stake_table_from_sequencer(client.clone())
.await
.map_err(CreateNodeValidatorProcessingError::FailedToGetStakeTable)?;

let get_stake_table_result = get_stake_table_from_sequencer(client.clone()).await;
let stake_table = get_stake_table_result.unwrap();
data_state.replace_stake_table(stake_table);

let data_state = Arc::new(RwLock::new(data_state));
Expand Down Expand Up @@ -142,13 +123,20 @@ pub async fn create_node_validator_api(
));

let leaf_retriever_handle = async_std::task::spawn(async move {
// Alright, let's get some leaves, bro
// Alright, let's start processing leaves
// TODO: We should move this into its own function that can respond
// and react appropriately when a service or sequencer does down
// so that it can gracefully re-establish the stream as necessary.

let client = client;

let mut leaf_stream = stream_leaves_from_hotshot_query_service(None, client)
.await
.unwrap();
let mut leaf_stream = match stream_leaves_from_hotshot_query_service(None, client).await {
Ok(leaf_stream) => leaf_stream,
Err(err) => {
tracing::info!("error getting leaf stream: {}", err);
return;
}
};

let mut leaf_sender = leaf_sender;

Expand Down Expand Up @@ -184,37 +172,62 @@ pub async fn create_node_validator_api(
}
}

let app_serve_handle = async_std::task::spawn(async move {
let app_serve_result = app.serve("0.0.0.0:9000", STATIC_VER_0_1).await;
tracing::info!("app serve result: {:?}", app_serve_result);
});

tracing::info!("listening on: {:?}", config.bind_address);

(
NodeValidatorAPI {
task_handles: vec![
process_internal_client_message_handle,
process_distribute_block_detail_handle,
process_distribute_node_identity_handle,
process_distribute_voters_handle,
process_leaf_stream_handle,
process_node_identity_stream_handle,
process_url_stream_handle,
leaf_retriever_handle,
],
},
app_serve_handle,
)
Ok(NodeValidatorAPI {
task_handles: vec![
process_internal_client_message_handle,
process_distribute_block_detail_handle,
process_distribute_node_identity_handle,
process_distribute_voters_handle,
process_leaf_stream_handle,
process_node_identity_stream_handle,
process_url_stream_handle,
leaf_retriever_handle,
],
})
}

#[cfg(test)]
mod test {
use crate::{
api::node_validator::v0::{StateClientMessageSender, STATIC_VER_0_1},
service::{client_message::InternalClientMessage, server_message::ServerMessage},
};
use futures::channel::mpsc::{self, Sender};
use tide_disco::App;

struct TestState(Sender<InternalClientMessage<Sender<ServerMessage>>>);

impl StateClientMessageSender<Sender<ServerMessage>> for TestState {
fn sender(&self) -> Sender<InternalClientMessage<Sender<ServerMessage>>> {
self.0.clone()
}
}

#[async_std::test]
#[ignore]
async fn test_full_setup_example() {
let (node_validator_api, app_serve_handle) =
super::create_node_validator_api(super::NodeValidatorConfig {
let (internal_client_message_sender, internal_client_message_receiver) = mpsc::channel(32);
let state = TestState(internal_client_message_sender);
// let state = Arc::new(state);

let mut app: App<_, crate::api::node_validator::v0::Error> = App::with_state(state);
let node_validator_api_result = super::super::define_api::<TestState>();
let node_validator_api = match node_validator_api_result {
Ok(node_validator_api) => node_validator_api,
Err(err) => {
panic!("error defining node validator api: {:?}", err);
}
};

match app.register_module("node-validator", node_validator_api) {
Ok(_) => {}
Err(err) => {
panic!("error registering node validator api: {:?}", err);
}
}

let node_validator_task_state = match super::create_node_validator_processing(
super::NodeValidatorConfig {
bind_address: "0.0.0.0:9000".to_string(),
stake_table_url_base: "http://localhost:24000/v0".parse().unwrap(),
initial_node_public_base_urls: vec![
Expand All @@ -224,13 +237,27 @@ mod test {
"http://localhost:24003/".parse().unwrap(),
"http://localhost:24004/".parse().unwrap(),
],
})
.await;
},
internal_client_message_receiver,
)
.await
{
Ok(node_validator_task_state) => node_validator_task_state,

Err(err) => {
panic!("error defining node validator api: {:?}", err);
}
};

// We would like to wait until being signaled
let app_serve_handle = async_std::task::spawn(async move {
let app_serve_result = app.serve("0.0.0.0:9000", STATIC_VER_0_1).await;
tracing::info!("app serve result: {:?}", app_serve_result);
});

app_serve_handle.await;

for handle in node_validator_api.task_handles {
for handle in node_validator_task_state.task_handles {
handle.cancel().await;
}
}
Expand Down

0 comments on commit f72aab8

Please sign in to comment.