diff --git a/Cargo.lock b/Cargo.lock index a010fd1bec..afba7ef22a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3420,11 +3420,13 @@ dependencies = [ "jf-utils", "jf-vid", "lazy_static", + "libp2p", "memoize", "rand 0.8.5", "rand_chacha 0.3.1", "reqwest", "serde", + "serde_bytes", "serde_json", "sha2 0.10.8", "snafu", @@ -6636,9 +6638,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.206" +version = "1.0.207" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b3e4cd94123dd520a128bcd11e34d9e9e423e7e3e50425cb1b4b1e3549d0284" +checksum = "5665e14a49a4ea1b91029ba7d3bca9f299e1f7cfa194388ccc20f14743e784f2" dependencies = [ "serde_derive", ] @@ -6665,9 +6667,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.206" +version = "1.0.207" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fabfb6138d2383ea8208cf98ccf69cdfb1aff4088460681d84189aa259762f97" +checksum = "6aea2634c86b0e8ef2cfdc0c340baede54ec27b1e46febd7f80dffb2aa44a00e" dependencies = [ "proc-macro2", "quote", diff --git a/crates/example-types/src/node_types.rs b/crates/example-types/src/node_types.rs index a3a657acaf..90e1394326 100644 --- a/crates/example-types/src/node_types.rs +++ b/crates/example-types/src/node_types.rs @@ -110,7 +110,7 @@ pub struct CombinedImpl; pub type StaticMembership = StaticCommittee; impl NodeImplementation for PushCdnImpl { - type Network = PushCdnNetwork; + type Network = PushCdnNetwork; type Storage = TestStorage; type AuctionResultsProvider = TestAuctionResultsProvider; } diff --git a/crates/examples/combined/all.rs b/crates/examples/combined/all.rs index 70363db3ec..4ca1e46f3f 100644 --- a/crates/examples/combined/all.rs +++ b/crates/examples/combined/all.rs @@ -72,27 +72,28 @@ async fn main() { let private_address = format!("127.0.0.1:{private_port}"); let public_address = format!("127.0.0.1:{public_port}"); - let config: cdn_broker::Config> = cdn_broker::Config { - discovery_endpoint: discovery_endpoint.clone(), - public_advertise_endpoint: public_address.clone(), - public_bind_endpoint: public_address, - private_advertise_endpoint: private_address.clone(), - private_bind_endpoint: private_address, - - keypair: KeyPair { - public_key: WrappedSignatureKey(broker_public_key), - private_key: broker_private_key.clone(), - }, - - metrics_bind_endpoint: None, - ca_cert_path: None, - ca_key_path: None, - global_memory_pool_size: Some(1024 * 1024 * 1024), - }; + let config: cdn_broker::Config::SignatureKey>> = + cdn_broker::Config { + discovery_endpoint: discovery_endpoint.clone(), + public_advertise_endpoint: public_address.clone(), + public_bind_endpoint: public_address, + private_advertise_endpoint: private_address.clone(), + private_bind_endpoint: private_address, + + keypair: KeyPair { + public_key: WrappedSignatureKey(broker_public_key), + private_key: broker_private_key.clone(), + }, + + metrics_bind_endpoint: None, + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), + }; // Create and spawn the broker async_spawn(async move { - let broker: Broker> = + let broker: Broker::SignatureKey>> = Broker::new(config).await.expect("broker failed to start"); // Error if we stopped unexpectedly @@ -120,9 +121,10 @@ async fn main() { // Spawn the marshal async_spawn(async move { - let marshal: Marshal> = Marshal::new(marshal_config) - .await - .expect("failed to spawn marshal"); + let marshal: Marshal::SignatureKey>> = + Marshal::new(marshal_config) + .await + .expect("failed to spawn marshal"); // Error if we stopped unexpectedly if let Err(err) = marshal.start().await { diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs index 9ca8cc127a..8fe91fa855 100755 --- a/crates/examples/infra/mod.rs +++ b/crates/examples/infra/mod.rs @@ -603,7 +603,7 @@ pub struct PushCdnDaRun { /// The underlying configuration config: NetworkConfig, /// The underlying network - network: PushCdnNetwork, + network: PushCdnNetwork, } #[async_trait] @@ -616,12 +616,12 @@ impl< >, NODE: NodeImplementation< TYPES, - Network = PushCdnNetwork, + Network = PushCdnNetwork, Storage = TestStorage, AuctionResultsProvider = TestAuctionResultsProvider, >, V: Versions, - > RunDa, NODE, V> for PushCdnDaRun + > RunDa, NODE, V> for PushCdnDaRun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -665,7 +665,7 @@ where PushCdnDaRun { config, network } } - fn network(&self) -> PushCdnNetwork { + fn network(&self) -> PushCdnNetwork { self.network.clone() } @@ -808,15 +808,14 @@ where .await; // Initialize our CDN network - let cdn_network: PushCdnDaRun = as RunDa< - TYPES, - PushCdnNetwork, - PushCdnImpl, - V, - >>::initialize_networking( - config.clone(), libp2p_advertise_address - ) - .await; + let cdn_network: PushCdnDaRun = + as RunDa< + TYPES, + PushCdnNetwork, + PushCdnImpl, + V, + >>::initialize_networking(config.clone(), libp2p_advertise_address) + .await; // Create our combined network config let delay_duration = config diff --git a/crates/examples/push-cdn/all.rs b/crates/examples/push-cdn/all.rs index 6d97d34d9f..3d3de5d42e 100644 --- a/crates/examples/push-cdn/all.rs +++ b/crates/examples/push-cdn/all.rs @@ -78,27 +78,28 @@ async fn main() { let private_address = format!("127.0.0.1:{private_port}"); let public_address = format!("127.0.0.1:{public_port}"); - let config: cdn_broker::Config> = cdn_broker::Config { - discovery_endpoint: discovery_endpoint.clone(), - public_advertise_endpoint: public_address.clone(), - public_bind_endpoint: public_address, - private_advertise_endpoint: private_address.clone(), - private_bind_endpoint: private_address, - - keypair: KeyPair { - public_key: WrappedSignatureKey(broker_public_key), - private_key: broker_private_key.clone(), - }, - - metrics_bind_endpoint: None, - ca_cert_path: None, - ca_key_path: None, - global_memory_pool_size: Some(1024 * 1024 * 1024), - }; + let config: cdn_broker::Config::SignatureKey>> = + cdn_broker::Config { + discovery_endpoint: discovery_endpoint.clone(), + public_advertise_endpoint: public_address.clone(), + public_bind_endpoint: public_address, + private_advertise_endpoint: private_address.clone(), + private_bind_endpoint: private_address, + + keypair: KeyPair { + public_key: WrappedSignatureKey(broker_public_key), + private_key: broker_private_key.clone(), + }, + + metrics_bind_endpoint: None, + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), + }; // Create and spawn the broker async_spawn(async move { - let broker: Broker> = + let broker: Broker::SignatureKey>> = Broker::new(config).await.expect("broker failed to start"); // Error if we stopped unexpectedly @@ -124,9 +125,10 @@ async fn main() { // Spawn the marshal async_spawn(async move { - let marshal: Marshal> = Marshal::new(marshal_config) - .await - .expect("failed to spawn marshal"); + let marshal: Marshal::SignatureKey>> = + Marshal::new(marshal_config) + .await + .expect("failed to spawn marshal"); // Error if we stopped unexpectedly if let Err(err) = marshal.start().await { diff --git a/crates/examples/push-cdn/broker.rs b/crates/examples/push-cdn/broker.rs index 00232e771d..7eabbec50f 100644 --- a/crates/examples/push-cdn/broker.rs +++ b/crates/examples/push-cdn/broker.rs @@ -33,7 +33,7 @@ struct Args { #[arg(long, default_value = "local_ip:1738")] public_advertise_endpoint: String, - /// The broker-facing endpoint in `IP:port` form to bind to for connections from + /// The broker-facing endpoint in `IP:port` form to bind to for connections from /// other brokers #[arg(long, default_value = "0.0.0.0:1739")] private_bind_endpoint: String, @@ -92,7 +92,7 @@ async fn main() -> Result<()> { ::SignatureKey::generated_from_seed_indexed(key_hash.into(), 1337); // Create config - let broker_config: Config> = Config { + let broker_config: Config::SignatureKey>> = Config { ca_cert_path: args.ca_cert_path, ca_key_path: args.ca_key_path, diff --git a/crates/examples/push-cdn/marshal.rs b/crates/examples/push-cdn/marshal.rs index fde57cd28d..39d2267bd8 100644 --- a/crates/examples/push-cdn/marshal.rs +++ b/crates/examples/push-cdn/marshal.rs @@ -12,6 +12,7 @@ use cdn_marshal::{Config, Marshal}; use clap::Parser; use hotshot::traits::implementations::ProductionDef; use hotshot_example_types::node_types::TestTypes; +use hotshot_types::traits::node_implementation::NodeType; use tracing_subscriber::EnvFilter; // TODO: forall, add logging where we need it @@ -80,7 +81,8 @@ async fn main() -> Result<()> { }; // Create new `Marshal` from the config - let marshal = Marshal::>::new(config).await?; + let marshal = + Marshal::::SignatureKey>>::new(config).await?; // Start the main loop, consuming it marshal.start().await?; diff --git a/crates/examples/push-cdn/types.rs b/crates/examples/push-cdn/types.rs index 963d51bd6e..8803c72152 100644 --- a/crates/examples/push-cdn/types.rs +++ b/crates/examples/push-cdn/types.rs @@ -9,6 +9,7 @@ use hotshot_example_types::{ auction_results_provider_types::TestAuctionResultsProvider, state_types::TestTypes, storage_types::TestStorage, }; +use hotshot_types::traits::node_implementation::NodeType; use serde::{Deserialize, Serialize}; use crate::infra::PushCdnDaRun; @@ -18,7 +19,7 @@ use crate::infra::PushCdnDaRun; pub struct NodeImpl {} /// Convenience type alias -pub type Network = PushCdnNetwork; +pub type Network = PushCdnNetwork<::SignatureKey>; impl NodeImplementation for NodeImpl { type Network = Network; diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index d01facfadd..ea5710dd23 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -27,7 +27,7 @@ use hotshot_task_impls::{ network, network::{NetworkEventTaskState, NetworkMessageTaskState}, request::NetworkRequestState, - response::{run_response_task, NetworkResponseState, RequestReceiver}, + response::{run_response_task, NetworkResponseState}, transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState, @@ -37,6 +37,7 @@ use hotshot_types::{ constants::EVENT_CHANNEL_SIZE, data::QuorumProposal, message::{Messages, Proposal}, + request_response::RequestReceiver, traits::{ network::ConnectedNetwork, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 74d858e912..8403964d60 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -37,8 +37,9 @@ use hotshot_types::{ COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL, }, data::ViewNumber, + request_response::NetworkMsgResponseChannel, traits::{ - network::{BroadcastDelay, ConnectedNetwork, ResponseChannel, Topic}, + network::{BroadcastDelay, ConnectedNetwork, Topic}, node_implementation::NodeType, }, BoxSyncFuture, @@ -93,7 +94,7 @@ impl CombinedNetworks { /// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0 #[must_use] pub fn new( - primary_network: PushCdnNetwork, + primary_network: PushCdnNetwork, secondary_network: Libp2pNetwork, delay_duration: Option, ) -> Self { @@ -120,7 +121,7 @@ impl CombinedNetworks { /// Get a ref to the primary network #[must_use] - pub fn primary(&self) -> &PushCdnNetwork { + pub fn primary(&self) -> &PushCdnNetwork { &self.networks.0 } @@ -249,7 +250,7 @@ impl CombinedNetworks { /// on the tuple #[derive(Clone)] pub struct UnderlyingCombinedNetworks( - pub PushCdnNetwork, + pub PushCdnNetwork, pub Libp2pNetwork, ); @@ -265,7 +266,7 @@ impl TestableNetworkingImplementation for CombinedNetwor secondary_network_delay: Duration, ) -> AsyncGenerator> { let generators = ( - as TestableNetworkingImplementation>::generator( + as TestableNetworkingImplementation>::generator( expected_node_count, num_bootstrap, network_id, @@ -291,7 +292,7 @@ impl TestableNetworkingImplementation for CombinedNetwor Box::pin(async move { // Generate the CDN network let cdn = gen0.await; - let cdn = Arc::>::into_inner(cdn).unwrap(); + let cdn = Arc::>::into_inner(cdn).unwrap(); // Generate the p2p network let p2p = gen1.await; @@ -345,7 +346,7 @@ impl ConnectedNetwork for CombinedNetworks async fn spawn_request_receiver_task( &self, - ) -> Option, ResponseChannel>)>> { + ) -> Option, NetworkMsgResponseChannel>)>> { self.secondary().spawn_request_receiver_task().await } diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index a6103213a2..1aaada8428 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -34,36 +34,37 @@ use async_lock::{Mutex, RwLock}; use async_trait::async_trait; use bimap::BiHashMap; use futures::{ - channel::mpsc::{self, channel, Receiver, Sender}, + channel::mpsc::{self, channel, Sender}, future::{join_all, Either}, FutureExt, StreamExt, }; use hotshot_orchestrator::config::NetworkConfig; -#[cfg(feature = "hotshot-testing")] -use hotshot_types::traits::network::{ - AsyncGenerator, NetworkReliability, TestableNetworkingImplementation, -}; use hotshot_types::{ boxed_sync, constants::LOOK_AHEAD, data::ViewNumber, message::{DataMessage::DataResponse, Message, MessageKind}, + request_response::{NetworkMsgResponseChannel, Request, Response}, traits::{ election::Membership, metrics::{Counter, Gauge, Metrics, NoMetrics}, - network::{self, ConnectedNetwork, NetworkError, ResponseMessage, Topic}, + network::{ConnectedNetwork, NetworkError, ResponseMessage, Topic}, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, }, BoxSyncFuture, }; +#[cfg(feature = "hotshot-testing")] +use hotshot_types::{ + request_response::TakeReceiver, + traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation}, +}; use libp2p_identity::{ ed25519::{self, SecretKey}, Keypair, PeerId, }; use libp2p_networking::{ network::{ - behaviours::request_response::{Request, Response}, spawn_network_node, transport::construct_auth_message, MeshParams, @@ -140,9 +141,6 @@ impl Debug for Libp2pNetwork { } } -/// Locked Option of a receiver for moving the value out of the option -type TakeReceiver = Mutex, ResponseChannel)>>>; - /// Type alias for a shared collection of peerid, multiaddrs pub type PeerInfoVec = Arc>>; @@ -879,7 +877,7 @@ impl ConnectedNetwork for Libp2pNetwork { async fn spawn_request_receiver_task( &self, - ) -> Option, network::ResponseChannel>)>> { + ) -> Option, NetworkMsgResponseChannel>)>> { let mut internal_rx = self.inner.requests_rx.lock().await.take()?; let handle = Arc::clone(&self.inner.handle); let (mut tx, rx) = mpsc::channel(100); @@ -889,7 +887,7 @@ impl ConnectedNetwork for Libp2pNetwork { if tx .try_send(( request, - network::ResponseChannel { + NetworkMsgResponseChannel { sender: response_tx, }, )) @@ -1125,7 +1123,18 @@ impl ConnectedNetwork for Libp2pNetwork { .try_send(Some((view_number, pk))) } - /// handles view update + /// The libp2p view update is a special operation intrinsic to its internal behavior. + /// + /// Libp2p needs to do a lookup because a libp2p address is not releated to + /// hotshot keys. So in libp2p we store a mapping of HotShot key to libp2p address + /// in a distributed hash table. + /// + /// This means to directly message someone on libp2p we need to lookup in the hash + /// table what their libp2p address is, using their HotShot public key as the key. + /// + /// So the logic with libp2p is to prefetch upcomming leaders libp2p address to + /// save time when we later need to direct message the leader our vote. Hence the + /// use of the future view and leader to queue the lookups. async fn update_view<'a, TYPES>(&'a self, view: u64, membership: &TYPES::Membership) where TYPES: NodeType + 'a, diff --git a/crates/hotshot/src/traits/networking/push_cdn_network.rs b/crates/hotshot/src/traits/networking/push_cdn_network.rs index 3a1c68db4f..36a3b5138f 100644 --- a/crates/hotshot/src/traits/networking/push_cdn_network.rs +++ b/crates/hotshot/src/traits/networking/push_cdn_network.rs @@ -125,27 +125,27 @@ impl Serializable for WrappedSignatureKey { /// The production run definition for the Push CDN. /// Uses the real protocols and a Redis discovery client. -pub struct ProductionDef(PhantomData); -impl RunDef for ProductionDef { - type User = UserDef; - type Broker = BrokerDef; +pub struct ProductionDef(PhantomData); +impl RunDef for ProductionDef { + type User = UserDef; + type Broker = BrokerDef; type DiscoveryClientType = Redis; type Topic = Topic; } /// The user definition for the Push CDN. /// Uses the Quic protocol and untrusted middleware. -pub struct UserDef(PhantomData); -impl ConnectionDef for UserDef { - type Scheme = WrappedSignatureKey; +pub struct UserDef(PhantomData); +impl ConnectionDef for UserDef { + type Scheme = WrappedSignatureKey; type Protocol = Quic; } /// The broker definition for the Push CDN. /// Uses the TCP protocol and trusted middleware. -pub struct BrokerDef(PhantomData); -impl ConnectionDef for BrokerDef { - type Scheme = WrappedSignatureKey; +pub struct BrokerDef(PhantomData); +impl ConnectionDef for BrokerDef { + type Scheme = WrappedSignatureKey; type Protocol = Tcp; } @@ -153,18 +153,18 @@ impl ConnectionDef for BrokerDef { /// protocol and no middleware. Differs from the user /// definition in that is on the client-side. #[derive(Clone)] -pub struct ClientDef(PhantomData); -impl ConnectionDef for ClientDef { - type Scheme = WrappedSignatureKey; +pub struct ClientDef(PhantomData); +impl ConnectionDef for ClientDef { + type Scheme = WrappedSignatureKey; type Protocol = Quic; } /// The testing run definition for the Push CDN. /// Uses the real protocols, but with an embedded discovery client. -pub struct TestingDef(PhantomData); -impl RunDef for TestingDef { - type User = UserDef; - type Broker = BrokerDef; +pub struct TestingDef(PhantomData); +impl RunDef for TestingDef { + type User = UserDef; + type Broker = BrokerDef; type DiscoveryClientType = Embedded; type Topic = Topic; } @@ -173,14 +173,16 @@ impl RunDef for TestingDef { /// that helps organize them all. #[derive(Clone)] /// Is generic over both the type of key and the network protocol. -pub struct PushCdnNetwork { +pub struct PushCdnNetwork { /// The underlying client - client: Client>, + client: Client>, /// The CDN-specific metrics metrics: Arc, /// Whether or not the underlying network is supposed to be paused #[cfg(feature = "hotshot-testing")] is_paused: Arc, + // The receiver channel for + // request_receiver_channel: TakeReceiver, } /// The enum for the topics we can subscribe to in the Push CDN @@ -197,7 +199,7 @@ pub enum Topic { /// topics that are not implemented at the application level. impl TopicTrait for Topic {} -impl PushCdnNetwork { +impl PushCdnNetwork { /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial /// topics we are interested in, and our wrapped keypair that we use to authenticate with the /// marshal. @@ -207,7 +209,7 @@ impl PushCdnNetwork { pub fn new( marshal_endpoint: String, topics: Vec, - keypair: KeyPair>, + keypair: KeyPair>, metrics: CdnMetricsValue, ) -> anyhow::Result { // Build config @@ -258,7 +260,9 @@ impl PushCdnNetwork { } #[cfg(feature = "hotshot-testing")] -impl TestableNetworkingImplementation for PushCdnNetwork { +impl TestableNetworkingImplementation + for PushCdnNetwork +{ /// Generate n Push CDN clients, a marshal, and two brokers (that run locally). /// Uses a `SQLite` database instead of Redis. #[allow(clippy::too_many_lines)] @@ -317,7 +321,7 @@ impl TestableNetworkingImplementation for PushCdnNetwork let other_broker_identifier = format!("{other_public_address}/{other_public_address}"); // Configure the broker - let config: BrokerConfig> = BrokerConfig { + let config: BrokerConfig> = BrokerConfig { public_advertise_endpoint: public_address.clone(), public_bind_endpoint: public_address, private_advertise_endpoint: private_address.clone(), @@ -336,7 +340,7 @@ impl TestableNetworkingImplementation for PushCdnNetwork // Create and spawn the broker async_spawn(async move { - let broker: Broker> = + let broker: Broker> = Broker::new(config).await.expect("broker failed to start"); // If we are the first broker by identifier, we need to sleep a bit @@ -369,7 +373,7 @@ impl TestableNetworkingImplementation for PushCdnNetwork // Spawn the marshal async_spawn(async move { - let marshal: Marshal> = Marshal::new(marshal_config) + let marshal: Marshal> = Marshal::new(marshal_config) .await .expect("failed to spawn marshal"); @@ -399,15 +403,16 @@ impl TestableNetworkingImplementation for PushCdnNetwork }; // Configure our client - let client_config: ClientConfig> = ClientConfig { - keypair: KeyPair { - public_key: WrappedSignatureKey(public_key), - private_key, - }, - subscribed_topics: topics, - endpoint: marshal_endpoint, - use_local_authority: true, - }; + let client_config: ClientConfig> = + ClientConfig { + keypair: KeyPair { + public_key: WrappedSignatureKey(public_key), + private_key, + }, + subscribed_topics: topics, + endpoint: marshal_endpoint, + use_local_authority: true, + }; // Create our client Arc::new(PushCdnNetwork { @@ -428,7 +433,23 @@ impl TestableNetworkingImplementation for PushCdnNetwork } #[async_trait] -impl ConnectedNetwork for PushCdnNetwork { +impl ConnectedNetwork for PushCdnNetwork { + // async fn request_data( + // &self, + // request: Vec, + // recipient: ReqDataK, + // ) -> Result, NetworkError> { + // self.client.send_direct_message(recipient, request).await; + + // Ok(vec![]) + // } + + // async fn spawn_request_receiver_task( + // &self, + // ) -> Option, NetworkMsgResponseChannel>)>> { + // None + // } + /// Pause sending and receiving on the PushCDN network. fn pause(&self) { #[cfg(feature = "hotshot-testing")] @@ -482,7 +503,7 @@ impl ConnectedNetwork for PushCdnNetwork, - _recipients: BTreeSet, + _recipients: BTreeSet, _broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { self.broadcast_message(message, Topic::Da) @@ -497,11 +518,7 @@ impl ConnectedNetwork for PushCdnNetwork, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { + async fn direct_message(&self, message: Vec, recipient: K) -> Result<(), NetworkError> { // If we're paused, don't send the message #[cfg(feature = "hotshot-testing")] if self.is_paused.load(Ordering::Relaxed) { @@ -566,8 +583,8 @@ impl ConnectedNetwork for PushCdnNetwork Result<(), TrySendError>> { + _pk: K, + ) -> Result<(), TrySendError>> { Ok(()) } } diff --git a/crates/libp2p-networking/src/network/behaviours/request_response.rs b/crates/libp2p-networking/src/network/behaviours/request_response.rs index 7c75e49fc1..82dd4dab05 100644 --- a/crates/libp2p-networking/src/network/behaviours/request_response.rs +++ b/crates/libp2p-networking/src/network/behaviours/request_response.rs @@ -7,23 +7,11 @@ use std::collections::HashMap; use futures::channel::oneshot::Sender; +use hotshot_types::request_response::{Request, Response}; use libp2p::request_response::{Message, OutboundRequestId}; -use serde::{Deserialize, Serialize}; use crate::network::NetworkEvent; -/// Request for Consenus data -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Request(#[serde(with = "serde_bytes")] pub Vec); - -/// Response for some VID data that we already collected -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Response( - /// Data - #[serde(with = "serde_bytes")] - pub Vec, -); - #[derive(Default, Debug)] /// Handler for request response messages pub(crate) struct RequestResponseState { diff --git a/crates/libp2p-networking/src/network/def.rs b/crates/libp2p-networking/src/network/def.rs index d4c09743ae..b43db8a04d 100644 --- a/crates/libp2p-networking/src/network/def.rs +++ b/crates/libp2p-networking/src/network/def.rs @@ -16,10 +16,8 @@ use libp2p_identity::PeerId; use libp2p_swarm_derive::NetworkBehaviour; use tracing::{debug, error}; -use super::{ - behaviours::request_response::{Request, Response}, - NetworkEventInternal, -}; +use super::NetworkEventInternal; +use hotshot_types::request_response::{Request, Response}; /// Overarching network behaviour performing: /// - network topology discovoery diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index b95acdc79a..f9dce21c42 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -18,7 +18,10 @@ pub mod transport; use std::{collections::HashSet, fmt::Debug, str::FromStr}; use futures::channel::oneshot::{self, Sender}; -use hotshot_types::traits::signature_key::SignatureKey; +use hotshot_types::{ + request_response::{Request, Response}, + traits::signature_key::SignatureKey, +}; #[cfg(async_executor_impl = "async-std")] use libp2p::dns::async_std::Transport as DnsTransport; #[cfg(async_executor_impl = "tokio")] @@ -42,7 +45,6 @@ use serde::{Deserialize, Serialize}; use tracing::instrument; use transport::StakeTableAuthentication; -use self::behaviours::request_response::{Request, Response}; pub use self::{ def::NetworkDef, error::NetworkError, diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index b63c415ebe..ff77c0675b 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -24,7 +24,9 @@ use async_compatibility_layer::{ }; use futures::{channel::mpsc, select, FutureExt, SinkExt, StreamExt}; use hotshot_types::{ - constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::signature_key::SignatureKey, + constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, + request_response::{Request, Response}, + traits::signature_key::SignatureKey, }; use libp2p::{ autonat, @@ -70,7 +72,7 @@ use crate::network::behaviours::{ dht::{DHTBehaviour, DHTProgress, KadPutQuery, NUM_REPLICATED_TO_TRUST}, direct_message::{DMBehaviour, DMRequest}, exponential_backoff::ExponentialBackoff, - request_response::{Request, RequestResponseState, Response}, + request_response::RequestResponseState, }; /// Maximum size of a message diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 175d37a558..89e820c12e 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -11,8 +11,9 @@ use async_compatibility_layer::{ channel::{Receiver, SendError, UnboundedReceiver, UnboundedRecvError, UnboundedSender}, }; use futures::channel::oneshot; -use hotshot_types::traits::{ - network::NetworkError as HotshotNetworkError, signature_key::SignatureKey, +use hotshot_types::{ + request_response::{Request, Response}, + traits::{network::NetworkError as HotshotNetworkError, signature_key::SignatureKey}, }; use libp2p::{request_response::ResponseChannel, Multiaddr}; use libp2p_identity::PeerId; @@ -20,7 +21,6 @@ use snafu::{ResultExt, Snafu}; use tracing::{debug, info, instrument}; use crate::network::{ - behaviours::request_response::{Request, Response}, error::{CancelledRequestSnafu, DHTError}, gen_multiaddr, ClientRequest, NetworkError, NetworkEvent, NetworkNode, NetworkNodeConfig, NetworkNodeConfigBuilderError, diff --git a/crates/task-impls/src/response.rs b/crates/task-impls/src/response.rs index 6462962190..e5d3563089 100644 --- a/crates/task-impls/src/response.rs +++ b/crates/task-impls/src/response.rs @@ -10,7 +10,7 @@ use async_broadcast::Receiver; use async_compatibility_layer::art::{async_sleep, async_spawn}; #[cfg(async_executor_impl = "async-std")] use async_std::task::JoinHandle; -use futures::{channel::mpsc, FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use hotshot_task::dependency::{Dependency, EventDependency}; use hotshot_types::{ consensus::{Consensus, LockedConsensusState, OuterConsensus}, @@ -19,9 +19,10 @@ use hotshot_types::{ DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal, SequencingMessage, }, + request_response::{NetworkMsgResponseChannel, RequestReceiver}, traits::{ election::Membership, - network::{DataRequest, RequestKind, ResponseChannel, ResponseMessage}, + network::{DataRequest, RequestKind, ResponseMessage}, node_implementation::NodeType, signature_key::SignatureKey, }, @@ -32,10 +33,6 @@ use tokio::task::JoinHandle; use tracing::instrument; use crate::events::HotShotEvent; - -/// Type alias for the channel that we receive requests from the network on. -pub type RequestReceiver = mpsc::Receiver<(Vec, ResponseChannel>)>; - /// Time to wait for txns before sending `ResponseMessage::NotFound` const TXNS_TIMEOUT: Duration = Duration::from_millis(100); @@ -98,7 +95,7 @@ impl NetworkResponseState { /// Handle an incoming message. First validates the sender, then handles the contained request. /// Sends the response via `chan` - async fn handle_message(&self, raw_req: Vec, chan: ResponseChannel>) { + async fn handle_message(&self, raw_req: Vec, chan: NetworkMsgResponseChannel>) { let req: Message = match bincode::deserialize(&raw_req) { Ok(deserialized) => deserialized, Err(e) => { diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index ebce2c9eeb..cb0f1faede 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -48,12 +48,14 @@ jf-signature = { workspace = true, features = ["schnorr"] } jf-utils = { workspace = true } rand_chacha = { workspace = true } serde = { workspace = true } +serde_bytes = { workspace = true } tagged-base64 = { workspace = true } vbs = { workspace = true } displaydoc = { version = "0.2.5", default-features = false } dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.17" } url = { workspace = true } vec1 = { workspace = true } +libp2p = { workspace = true } [dev-dependencies] serde_json = { workspace = true } diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index a5f67c1879..566cebeecf 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -26,6 +26,7 @@ pub mod event; pub mod light_client; pub mod message; pub mod qc; +pub mod request_response; pub mod signature_key; pub mod simple_certificate; pub mod simple_vote; diff --git a/crates/types/src/request_response.rs b/crates/types/src/request_response.rs new file mode 100644 index 0000000000..ca65c204c7 --- /dev/null +++ b/crates/types/src/request_response.rs @@ -0,0 +1,41 @@ +// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) +// This file is part of the HotShot repository. + +// You should have received a copy of the MIT License +// along with the HotShot repository. If not, see . + +//! Types for the request/response implementations. This module incorporates all +//! of the shared types for all of the network backends. + +use crate::traits::network::NetworkMsg; +use async_lock::Mutex; +use futures::channel::{mpsc::Receiver, oneshot}; +use libp2p::request_response::ResponseChannel; +use serde::{Deserialize, Serialize}; + +/// Request for Consenus data +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Request(#[serde(with = "serde_bytes")] pub Vec); + +/// Response for some VID data that we already collected +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Response( + /// Data + #[serde(with = "serde_bytes")] + pub Vec, +); + +/// Wraps a oneshot channel for responding to requests. This is a +/// specialized version of the libp2p request-response `ResponseChannel` +/// which accepts any generic response. +pub struct NetworkMsgResponseChannel { + /// underlying sender for this channel + pub sender: oneshot::Sender, +} + +/// Type alias for the channel that we receive requests from the network on. +pub type RequestReceiver = Receiver<(Vec, NetworkMsgResponseChannel>)>; + +/// Locked Option of a receiver for moving the value out of the option. This +/// type takes any `Response` type depending on the underlying network impl. +pub type TakeReceiver = Mutex, ResponseChannel)>>>; diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 0287f952b7..33fc55df30 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -14,7 +14,7 @@ use async_std::future::TimeoutError; use derivative::Derivative; use dyn_clone::DynClone; use futures::{ - channel::{mpsc, oneshot}, + channel::mpsc::{self}, Future, }; #[cfg(async_executor_impl = "tokio")] @@ -45,6 +45,7 @@ use super::{node_implementation::NodeType, signature_key::SignatureKey}; use crate::{ data::ViewNumber, message::{MessagePurpose, SequencingMessage}, + request_response::NetworkMsgResponseChannel, BoxSyncFuture, }; @@ -74,20 +75,6 @@ pub enum PushCdnNetworkError { FailedToSend, } -/// Web server specific errors -#[derive(Debug, Snafu, Serialize, Deserialize)] -#[snafu(visibility(pub))] -pub enum WebServerNetworkError { - /// The injected consensus data is incorrect - IncorrectConsensusData, - /// The client returned an error - ClientError, - /// Endpoint parsed incorrectly - EndpointError, - /// Client disconnected - ClientDisconnected, -} - /// the type of transmission #[derive(Debug, Clone, Serialize, Deserialize)] pub enum TransmitType { @@ -128,12 +115,6 @@ pub enum NetworkError { /// source of error source: CentralizedServerNetworkError, }, - - /// Web server specific errors - WebServer { - /// source of error - source: WebServerNetworkError, - }, /// unimplemented functionality UnimplementedFeature, /// Could not deliver a message to a specified recipient @@ -198,12 +179,6 @@ pub trait ViewMessage { fn purpose(&self) -> MessagePurpose; } -/// Wraps a oneshot channel for responding to requests -pub struct ResponseChannel { - /// underlying sender for this channel - pub sender: oneshot::Sender, -} - /// A request for some data that the consensus layer is asking for. #[derive(Serialize, Deserialize, Derivative, Clone, Debug, PartialEq, Eq, Hash)] #[serde(bound(deserialize = ""))] @@ -347,7 +322,7 @@ pub trait ConnectedNetwork: Clone + Send + Sync + 'st /// Returns `None`` if network does not support handling requests async fn spawn_request_receiver_task( &self, - ) -> Option, ResponseChannel>)>> { + ) -> Option, NetworkMsgResponseChannel>)>> { None } @@ -363,7 +338,8 @@ pub trait ConnectedNetwork: Clone + Send + Sync + 'st Ok(()) } - /// handles view update + /// Update view can be used for any reason, but mostly it's for canceling tasks, + /// and looking up the address of the leader of a future view. async fn update_view<'a, TYPES>(&'a self, _view: u64, _membership: &TYPES::Membership) where TYPES: NodeType + 'a,