diff --git a/src/connectors/fname/mod.rs b/src/connectors/fname/mod.rs index ca833eb9..66eb56a9 100644 --- a/src/connectors/fname/mod.rs +++ b/src/connectors/fname/mod.rs @@ -1,14 +1,14 @@ -use std::collections::HashMap; - use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::time::{sleep, Duration}; +use tokio::{ + sync::mpsc, + time::{sleep, Duration}, +}; use tracing::{debug, error, info, warn}; use crate::{ - mempool::routing::MessageRouter, proto::{FnameTransfer, UserNameProof, UserNameType, ValidatorMessage}, - storage::store::engine::{MempoolMessage, Senders}, + storage::store::engine::MempoolMessage, }; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -77,25 +77,16 @@ pub struct Fetcher { position: u64, transfers: Vec, cfg: Config, - shard_senders: HashMap, - message_router: Box, - num_shards: u32, + mempool_tx: mpsc::Sender, } impl Fetcher { - pub fn new( - cfg: Config, - shard_senders: HashMap, - num_shards: u32, - message_router: Box, - ) -> Self { + pub fn new(cfg: Config, mempool_tx: mpsc::Sender) -> Self { Fetcher { position: cfg.start_from, transfers: vec![], cfg: cfg, - shard_senders, - num_shards, - message_router, + mempool_tx, } } @@ -127,41 +118,32 @@ impl Fetcher { self.position = t.id; self.transfers.push(t.clone()); // Just store these for now, we'll use them later - let shard = self.message_router.route_message(t.to, self.num_shards); - let senders = self.shard_senders.get(&shard); - match senders { - None => { - error!(id = t.id, "Unable to find shard to send fname transfer to") - } - Some(senders) => { - let username_proof = UserNameProof { - timestamp: t.timestamp, - name: t.username.into_bytes(), - owner: t.owner.into_bytes(), - signature: t.server_signature.into_bytes(), - fid: t.to, - r#type: UserNameType::UsernameTypeFname as i32, - }; - if let Err(err) = senders - .messages_tx - .send(MempoolMessage::ValidatorMessage(ValidatorMessage { - on_chain_event: None, - fname_transfer: Some(FnameTransfer { - id: t.to, - from_fid: t.from, - proof: Some(username_proof), - }), - })) - .await - { - error!( - from = t.from, - to = t.to, - err = err.to_string(), - "Unable to send fname transfer to mempool" - ) - } - } + let username_proof = UserNameProof { + timestamp: t.timestamp, + name: t.username.into_bytes(), + owner: t.owner.into_bytes(), + signature: t.server_signature.into_bytes(), + fid: t.to, + r#type: UserNameType::UsernameTypeFname as i32, + }; + if let Err(err) = self + .mempool_tx + .send(MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: None, + fname_transfer: Some(FnameTransfer { + id: t.to, + from_fid: t.from, + proof: Some(username_proof), + }), + })) + .await + { + error!( + from = t.from, + to = t.to, + err = err.to_string(), + "Unable to send fname transfer to mempool" + ) } } } diff --git a/src/connectors/onchain_events/mod.rs b/src/connectors/onchain_events/mod.rs index f2477170..e9e3dabb 100644 --- a/src/connectors/onchain_events/mod.rs +++ b/src/connectors/onchain_events/mod.rs @@ -10,16 +10,16 @@ use foundry_common::ens::EnsError; use futures_util::stream::StreamExt; use serde::{Deserialize, Serialize}; use thiserror::Error; +use tokio::sync::mpsc; use tracing::{error, info}; use crate::{ - mempool::routing::MessageRouter, proto::{ on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType, SignerEventBody, SignerEventType, SignerMigratedEventBody, StorageRentEventBody, ValidatorMessage, }, - storage::store::engine::{MempoolMessage, Senders}, + storage::store::engine::MempoolMessage, }; sol!( @@ -135,9 +135,7 @@ impl L1Client for RealL1Client { pub struct Subscriber { provider: RootProvider>, onchain_events_by_block: HashMap>, - shard_senders: HashMap, - message_router: Box, - num_shards: u32, + mempool_tx: mpsc::Sender, start_block_number: u64, stop_block_number: u64, } @@ -146,9 +144,7 @@ pub struct Subscriber { impl Subscriber { pub fn new( config: Config, - shard_senders: HashMap, - num_shards: u32, - message_router: Box, + mempool_tx: mpsc::Sender, ) -> Result { if config.rpc_url.is_empty() { return Err(SubscribeError::EmptyRpcUrl); @@ -158,9 +154,7 @@ impl Subscriber { Ok(Subscriber { provider, onchain_events_by_block: HashMap::new(), - shard_senders, - num_shards, - message_router, + mempool_tx, start_block_number: config.start_block_number, stop_block_number: config.stop_block_number, }) @@ -208,35 +202,21 @@ impl Subscriber { } Some(events) => events.push(event.clone()), } - let shard = self.message_router.route_message(fid, self.num_shards); - let senders = self.shard_senders.get(&shard); - match senders { - None => { - error!( - block_number = event.block_number, - tx_hash = hex::encode(&event.transaction_hash), - log_index = event.log_index, - "Unable to find shard to send onchain event to" - ) - } - Some(senders) => { - if let Err(err) = senders - .messages_tx - .send(MempoolMessage::ValidatorMessage(ValidatorMessage { - on_chain_event: Some(event.clone()), - fname_transfer: None, - })) - .await - { - error!( - block_number = event.block_number, - tx_hash = hex::encode(&event.transaction_hash), - log_index = event.log_index, - err = err.to_string(), - "Unable to send onchain event to mempool" - ) - } - } + if let Err(err) = self + .mempool_tx + .send(MempoolMessage::ValidatorMessage(ValidatorMessage { + on_chain_event: Some(event.clone()), + fname_transfer: None, + })) + .await + { + error!( + block_number = event.block_number, + tx_hash = hex::encode(&event.transaction_hash), + log_index = event.log_index, + err = err.to_string(), + "Unable to send onchain event to mempool" + ) } } diff --git a/src/main.rs b/src/main.rs index 1d207c47..d5cef86c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use informalsystems_malachitebft_metrics::{Metrics, SharedRegistry}; use snapchain::connectors::onchain_events::{L1Client, RealL1Client}; use snapchain::consensus::consensus::SystemMessage; use snapchain::core::types::proto; +use snapchain::mempool::mempool::Mempool; use snapchain::mempool::routing; use snapchain::network::admin_server::{DbManager, MyAdminService}; use snapchain::network::gossip::GossipEvent; @@ -121,7 +122,6 @@ async fn main() -> Result<(), Box> { let node = SnapchainNode::create( keypair.clone(), app_config.consensus.clone(), - app_config.mempool.clone(), Some(app_config.rpc_address.clone()), gossip_tx.clone(), None, @@ -132,19 +132,20 @@ async fn main() -> Result<(), Box> { ) .await; - let admin_service = MyAdminService::new( - db_manager, - node.shard_senders.clone(), + let (mempool_tx, mempool_rx) = mpsc::channel(app_config.mempool.queue_size as usize); + let mut mempool = Mempool::new( + mempool_rx, app_config.consensus.num_shards, - Box::new(routing::ShardRouter {}), + node.shard_senders.clone(), ); + tokio::spawn(async move { mempool.run().await }); + + let admin_service = MyAdminService::new(db_manager, mempool_tx.clone()); if !app_config.fnames.disable { let mut fetcher = snapchain::connectors::fname::Fetcher::new( app_config.fnames.clone(), - node.shard_senders.clone(), - app_config.consensus.num_shards, - Box::new(routing::ShardRouter {}), + mempool_tx.clone(), ); tokio::spawn(async move { @@ -155,9 +156,7 @@ async fn main() -> Result<(), Box> { if !app_config.onchain_events.rpc_url.is_empty() { let mut onchain_events_subscriber = snapchain::connectors::onchain_events::Subscriber::new( app_config.onchain_events, - node.shard_senders.clone(), - app_config.consensus.num_shards, - Box::new(routing::ShardRouter {}), + mempool_tx.clone(), )?; tokio::spawn(async move { let result = onchain_events_subscriber.run(false).await; @@ -186,6 +185,7 @@ async fn main() -> Result<(), Box> { statsd_client.clone(), app_config.consensus.num_shards, Box::new(routing::ShardRouter {}), + mempool_tx.clone(), l1_client, ); diff --git a/src/mempool/mempool.rs b/src/mempool/mempool.rs index f26fcb84..615c2b0f 100644 --- a/src/mempool/mempool.rs +++ b/src/mempool/mempool.rs @@ -1,4 +1,12 @@ +use std::collections::HashMap; + use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; + +use crate::storage::store::engine::{MempoolMessage, Senders}; + +use super::routing::{MessageRouter, ShardRouter}; +use tracing::error; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -10,3 +18,43 @@ impl Default for Config { Self { queue_size: 500 } } } + +pub struct Mempool { + shard_senders: HashMap, + message_router: Box, + num_shards: u32, + mempool_rx: mpsc::Receiver, +} + +impl Mempool { + pub fn new( + mempool_rx: mpsc::Receiver, + num_shards: u32, + shard_senders: HashMap, + ) -> Self { + Mempool { + shard_senders, + num_shards, + mempool_rx, + message_router: Box::new(ShardRouter {}), + } + } + + pub async fn run(&mut self) { + while let Some(message) = self.mempool_rx.recv().await { + let fid = message.fid(); + let shard = self.message_router.route_message(fid, self.num_shards); + let senders = self.shard_senders.get(&shard); + match senders { + None => { + error!("Unable to find shard to send message to") + } + Some(senders) => { + if let Err(err) = senders.messages_tx.send(message).await { + error!("Unable to send message to engine: {}", err.to_string()) + } + } + } + } + } +} diff --git a/src/network/admin_server.rs b/src/network/admin_server.rs index 1382431f..361a2201 100644 --- a/src/network/admin_server.rs +++ b/src/network/admin_server.rs @@ -1,12 +1,11 @@ -use crate::mempool::routing::MessageRouter; use crate::proto::admin_service_server::AdminService; use crate::proto::{self, FnameTransfer, OnChainEvent}; use crate::proto::{UserNameProof, ValidatorMessage}; -use crate::storage::store::engine::{MempoolMessage, Senders}; +use crate::storage::store::engine::MempoolMessage; use rocksdb; -use std::collections::HashMap; use std::{io, path, process}; use thiserror::Error; +use tokio::sync::mpsc; use tonic::{Request, Response, Status}; use tracing::{info, warn}; @@ -62,9 +61,7 @@ impl DbManager { pub struct MyAdminService { db_manager: DbManager, - num_shards: u32, - message_router: Box, - pub shard_senders: HashMap, + pub mempool_tx: mpsc::Sender, } #[derive(Debug, Error)] @@ -79,17 +76,10 @@ pub enum AdminServiceError { const DB_DESTROY_KEY: &[u8] = b"__destroy_all_databases_on_start__"; impl MyAdminService { - pub fn new( - db_manager: DbManager, - shard_senders: HashMap, - num_shards: u32, - message_router: Box, - ) -> Self { + pub fn new(db_manager: DbManager, mempool_tx: mpsc::Sender) -> Self { Self { db_manager, - shard_senders, - num_shards, - message_router, + mempool_tx, } } } @@ -136,19 +126,8 @@ impl AdminService for MyAdminService { )); } - let dst_shard = self.message_router.route_message(fid, self.num_shards); - - let sender = match self.shard_senders.get(&dst_shard) { - Some(sender) => sender, - None => { - return Err(Status::invalid_argument( - "no shard sender for fid".to_string(), - )) - } - }; - - let result = sender - .messages_tx + let result = self + .mempool_tx .send(MempoolMessage::ValidatorMessage(ValidatorMessage { on_chain_event: Some(onchain_event.clone()), fname_transfer: None, @@ -179,19 +158,8 @@ impl AdminService for MyAdminService { )); } - let dst_shard = self.message_router.route_message(fid, self.num_shards); - - let sender = match self.shard_senders.get(&dst_shard) { - Some(sender) => sender, - None => { - return Err(Status::invalid_argument( - "no shard sender for fid".to_string(), - )) - } - }; - - let result = sender - .messages_tx + let result = self + .mempool_tx .send(MempoolMessage::ValidatorMessage(ValidatorMessage { on_chain_event: None, fname_transfer: Some(FnameTransfer { diff --git a/src/network/server.rs b/src/network/server.rs index 55546241..45dfa8e3 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -46,6 +46,7 @@ pub struct MyHubService { message_router: Box, statsd_client: StatsdClientWrapper, l1_client: Option>, + mempool_tx: mpsc::Sender, } impl MyHubService { @@ -56,6 +57,7 @@ impl MyHubService { statsd_client: StatsdClientWrapper, num_shards: u32, message_router: Box, + mempool_tx: mpsc::Sender, l1_client: Option>, ) -> Self { Self { @@ -66,6 +68,7 @@ impl MyHubService { message_router, num_shards, l1_client, + mempool_tx, } } @@ -83,15 +86,6 @@ impl MyHubService { let dst_shard = self.message_router.route_message(fid, self.num_shards); - let sender = match self.shard_senders.get(&dst_shard) { - Some(sender) => sender, - None => { - return Err(Status::invalid_argument( - "no shard sender for fid".to_string(), - )) - } - }; - let stores = match self.shard_stores.get(&dst_shard) { Some(store) => store, None => { @@ -110,7 +104,6 @@ impl MyHubService { stores.store_limits.clone(), self.statsd_client.clone(), 100, - 200, ); let result = readonly_engine.simulate_message(&message); @@ -142,8 +135,8 @@ impl MyHubService { } } - match sender - .messages_tx + match self + .mempool_tx .try_send(MempoolMessage::UserMessage(message.clone())) { Ok(_) => { diff --git a/src/network/server_tests.rs b/src/network/server_tests.rs index 661a8d26..d2b1ee76 100644 --- a/src/network/server_tests.rs +++ b/src/network/server_tests.rs @@ -8,6 +8,7 @@ mod tests { use std::time::Duration; use crate::connectors::onchain_events::L1Client; + use crate::mempool::mempool::Mempool; use crate::mempool::routing; use crate::mempool::routing::MessageRouter; use crate::network::server::MyHubService; @@ -118,7 +119,7 @@ mod tests { db } - fn make_server() -> ( + async fn make_server() -> ( HashMap, HashMap, [ShardEngine; 2], @@ -167,6 +168,10 @@ mod tests { assert_eq!(message_router.route_message(SHARD1_FID, 2), 1); assert_eq!(message_router.route_message(SHARD2_FID, 2), 2); + let (mempool_tx, mempool_rx) = mpsc::channel(1000); + let mut mempool = Mempool::new(mempool_rx, num_shards, senders.clone()); + tokio::spawn(async move { mempool.run().await }); + ( stores.clone(), senders.clone(), @@ -178,6 +183,7 @@ mod tests { statsd_client, num_shards, message_router, + mempool_tx, Some(Box::new(MockL1Client {})), ), ) @@ -185,7 +191,7 @@ mod tests { #[tokio::test] async fn test_subscribe_rpc() { - let (stores, senders, _, service) = make_server(); + let (stores, senders, _, service) = make_server().await; let num_shard1_pre_existing_events = 10; let num_shard2_pre_existing_events = 20; @@ -233,7 +239,7 @@ mod tests { #[tokio::test] async fn test_submit_message_fails_with_error_for_invalid_messages() { - let (_stores, _senders, _, service) = make_server(); + let (_stores, _senders, _, service) = make_server().await; // Message with no fid registration let invalid_message = messages_factory::casts::create_cast_add(123, "test", None, None); @@ -249,7 +255,7 @@ mod tests { #[tokio::test] async fn test_good_ens_proof() { - let (_stores, _senders, [mut engine1, mut _engine2], service) = make_server(); + let (_stores, _senders, [mut engine1, mut _engine2], service) = make_server().await; let signer = test_helper::default_signer(); let owner = hex::decode("91031dcfdea024b4d51e775486111d2b2a715871").unwrap(); let fid = SHARD1_FID; @@ -274,7 +280,7 @@ mod tests { #[tokio::test] async fn test_ens_proof_with_bad_owner() { - let (_stores, _senders, [mut engine1, mut _engine2], service) = make_server(); + let (_stores, _senders, [mut engine1, mut _engine2], service) = make_server().await; let signer = test_helper::default_signer(); let owner = test_helper::default_custody_address(); let fid = SHARD1_FID; @@ -299,7 +305,7 @@ mod tests { #[tokio::test] async fn test_ens_proof_with_bad_custody_address() { - let (_stores, _senders, [mut engine1, mut _engine2], service) = make_server(); + let (_stores, _senders, [mut engine1, mut _engine2], service) = make_server().await; let signer = test_helper::default_signer(); let owner = test_helper::default_custody_address(); let fid = SHARD1_FID; @@ -330,7 +336,7 @@ mod tests { #[tokio::test] async fn test_ens_proof_with_verified_address() { - let (_stores, _senders, [mut _engine1, mut engine2], service) = make_server(); + let (_stores, _senders, [mut _engine1, mut engine2], service) = make_server().await; let signer = test_helper::default_signer(); let fid = 2; let owner = test_helper::default_custody_address(); @@ -368,7 +374,7 @@ mod tests { #[tokio::test] async fn test_cast_apis() { - let (_, _, [mut engine1, mut engine2], service) = make_server(); + let (_, _, [mut engine1, mut engine2], service) = make_server().await; let engine1 = &mut engine1; let engine2 = &mut engine2; test_helper::register_user( @@ -498,7 +504,7 @@ mod tests { #[tokio::test] async fn test_storage_limits() { // Works with no storage - let (_, _, [mut engine1, _], service) = make_server(); + let (_, _, [mut engine1, _], service) = make_server().await; let response = service .get_current_storage_limits_by_fid(FidRequest::for_fid(SHARD1_FID)) diff --git a/src/node/snapchain_node.rs b/src/node/snapchain_node.rs index 7458dd11..6e8318dd 100644 --- a/src/node/snapchain_node.rs +++ b/src/node/snapchain_node.rs @@ -5,7 +5,6 @@ use crate::core::types::{ Address, Height, ShardId, SnapchainShard, SnapchainValidator, SnapchainValidatorContext, SnapchainValidatorSet, }; -use crate::mempool::mempool; use crate::network::gossip::GossipEvent; use crate::proto::{Block, ShardChunk}; use crate::storage::db::RocksDB; @@ -37,7 +36,6 @@ impl SnapchainNode { pub async fn create( keypair: Keypair, config: Config, - mempool_config: mempool::Config, rpc_address: Option, gossip_tx: mpsc::Sender>, block_tx: Option>, @@ -93,7 +91,6 @@ impl SnapchainNode { StoreLimits::default(), statsd_client.clone(), config.max_messages_per_block, - mempool_config.queue_size, ); shard_senders.insert(shard_id, engine.get_senders()); diff --git a/src/storage/store/engine.rs b/src/storage/store/engine.rs index bcbe3a04..dd87bf5c 100644 --- a/src/storage/store/engine.rs +++ b/src/storage/store/engine.rs @@ -155,11 +155,9 @@ impl ShardEngine { store_limits: StoreLimits, statsd_client: StatsdClientWrapper, max_messages_per_block: u32, - mempool_queue_size: u32, ) -> ShardEngine { // TODO: adding the trie here introduces many calls that want to return errors. Rethink unwrap strategy. - let (messages_tx, messages_rx) = - mpsc::channel::(mempool_queue_size as usize); + let (messages_tx, messages_rx) = mpsc::channel::(100); ShardEngine { shard_id, stores: Stores::new(db.clone(), trie, store_limits), diff --git a/src/storage/store/test_helper.rs b/src/storage/store/test_helper.rs index dc986c8b..da9cc4e0 100644 --- a/src/storage/store/test_helper.rs +++ b/src/storage/store/test_helper.rs @@ -120,7 +120,6 @@ pub fn new_engine_with_options(options: EngineOptions) -> (ShardEngine, tempfile test_limits, statsd_client, 256, - 256 * 2, ), dir, ) diff --git a/tests/consensus_test.rs b/tests/consensus_test.rs index b9641608..75ed293e 100644 --- a/tests/consensus_test.rs +++ b/tests/consensus_test.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use hex; use libp2p::identity::ed25519::Keypair; -use snapchain::mempool::{mempool, routing}; +use snapchain::mempool::mempool::Mempool; +use snapchain::mempool::routing; use snapchain::network::server::MyHubService; use snapchain::node::snapchain_node::SnapchainNode; use snapchain::proto::hub_service_server::HubServiceServer; @@ -72,7 +73,6 @@ impl NodeForTest { let node = SnapchainNode::create( keypair.clone(), config, - mempool::Config::default(), None, gossip_tx, Some(block_tx), @@ -112,6 +112,10 @@ impl NodeForTest { let grpc_block_store = block_store.clone(); let grpc_shard_stores = node.shard_stores.clone(); let grpc_shard_senders = node.shard_senders.clone(); + let (mempool_tx, mempool_rx) = mpsc::channel(100); + let mut mempool = Mempool::new(mempool_rx, num_shards, node.shard_senders.clone()); + tokio::spawn(async move { mempool.run().await }); + tokio::spawn(async move { let service = MyHubService::new( grpc_block_store, @@ -120,6 +124,7 @@ impl NodeForTest { statsd_client.clone(), num_shards, Box::new(routing::EvenOddRouterForTest {}), + mempool_tx, None, );