Skip to content

Commit

Permalink
Merge branch 'main' into ss/test-epoch-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
ss-es committed Jan 9, 2025
2 parents f606506 + 0cad927 commit 52adf7e
Show file tree
Hide file tree
Showing 19 changed files with 397 additions and 188 deletions.
5 changes: 4 additions & 1 deletion crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ use hotshot_types::{
},
HotShotConfig, PeerConfig, ValidatorConfig,
};
use libp2p_networking::network::{GossipConfig, RequestResponseConfig};
use libp2p_networking::network::{
behaviours::dht::store::persistent::DhtNoPersistence, GossipConfig, RequestResponseConfig,
};
use rand::{rngs::StdRng, SeedableRng};
use surf_disco::Url;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -738,6 +740,7 @@ where
// Create the Libp2p network
let libp2p_network = Libp2pNetwork::from_config(
config.clone(),
DhtNoPersistence,
Arc::clone(membership),
GossipConfig::default(),
RequestResponseConfig::default(),
Expand Down
26 changes: 18 additions & 8 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ use libp2p_identity::{
pub use libp2p_networking::network::{GossipConfig, RequestResponseConfig};
use libp2p_networking::{
network::{
behaviours::dht::record::{Namespace, RecordKey, RecordValue},
behaviours::dht::{
record::{Namespace, RecordKey, RecordValue},
store::persistent::{DhtNoPersistence, DhtPersistentStorage},
},
spawn_network_node,
transport::construct_auth_message,
NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
Expand Down Expand Up @@ -273,6 +276,7 @@ impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
Arc::new(
match Libp2pNetwork::new(
Libp2pMetricsValue::default(),
DhtNoPersistence,
config,
pubkey.clone(),
lookup_record_value,
Expand Down Expand Up @@ -388,9 +392,10 @@ impl<T: NodeType> Libp2pNetwork<T> {
/// # Panics
/// If we are unable to calculate the replication factor
#[allow(clippy::too_many_arguments)]
pub async fn from_config(
pub async fn from_config<D: DhtPersistentStorage>(
mut config: NetworkConfig<T::SignatureKey>,
membership: Arc<RwLock<T::Membership>>,
dht_persistent_storage: D,
quorum_membership: Arc<RwLock<T::Membership>>,
gossip_config: GossipConfig,
request_response_config: RequestResponseConfig,
bind_address: Multiaddr,
Expand Down Expand Up @@ -421,7 +426,7 @@ impl<T: NodeType> Libp2pNetwork<T> {

// Set the auth message and stake table
config_builder
.stake_table(Some(membership))
.stake_table(Some(quorum_membership))
.auth_message(Some(auth_message));

// The replication factor is the minimum of [the default and 2/3 the number of nodes]
Expand Down Expand Up @@ -469,6 +474,7 @@ impl<T: NodeType> Libp2pNetwork<T> {

Ok(Libp2pNetwork::new(
metrics,
dht_persistent_storage,
node_config,
pub_key.clone(),
lookup_record_value,
Expand Down Expand Up @@ -509,18 +515,22 @@ impl<T: NodeType> Libp2pNetwork<T> {
///
/// This will panic if there are less than 5 bootstrap nodes
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub async fn new<D: DhtPersistentStorage>(
metrics: Libp2pMetricsValue,
dht_persistent_storage: D,
config: NetworkNodeConfig<T>,
pk: T::SignatureKey,
lookup_record_value: RecordValue<T::SignatureKey>,
bootstrap_addrs: BootstrapAddrs,
id: usize,
#[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
) -> Result<Libp2pNetwork<T>, NetworkError> {
let (mut rx, network_handle) = spawn_network_node::<T>(config.clone(), id)
.await
.map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;
let (mut rx, network_handle) =
spawn_network_node::<T, D>(config.clone(), dht_persistent_storage, id)
.await
.map_err(|e| {
NetworkError::ConfigError(format!("failed to spawn network node: {e}"))
})?;

// Add our own address to the bootstrap addresses
let addr = network_handle.listen_addr();
Expand Down
21 changes: 12 additions & 9 deletions crates/libp2p-networking/src/network/behaviours/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use libp2p::kad::{
store::RecordStore, Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent,
};
use libp2p_identity::PeerId;
use store::{file_backed::FileBackedStore, validated::ValidatedStore};
use store::{
persistent::{DhtPersistentStorage, PersistentStore},
validated::ValidatedStore,
};
use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -57,7 +60,7 @@ use crate::network::{ClientRequest, NetworkEvent};
/// - bootstrapping into the network
/// - peer discovery
#[derive(Debug)]
pub struct DHTBehaviour<K: SignatureKey + 'static> {
pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
/// in progress queries for nearby peers
pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
/// List of in-progress get requests
Expand All @@ -77,8 +80,8 @@ pub struct DHTBehaviour<K: SignatureKey + 'static> {
/// Sender to the bootstrap task
bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,

/// Phantom type for the key
phantom: PhantomData<K>,
/// Phantom type for the key and persistent storage
phantom: PhantomData<(K, D)>,
}

/// State of bootstrapping
Expand Down Expand Up @@ -106,7 +109,7 @@ pub enum DHTEvent {
IsBootstrapped,
}

impl<K: SignatureKey + 'static> DHTBehaviour<K> {
impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
/// Give the handler a way to retry requests.
pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
self.retry_tx = Some(tx);
Expand Down Expand Up @@ -143,7 +146,7 @@ impl<K: SignatureKey + 'static> DHTBehaviour<K> {
/// print out the routing table to stderr
pub fn print_routing_table(
&mut self,
kadem: &mut KademliaBehaviour<FileBackedStore<ValidatedStore<MemoryStore, K>>>,
kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
) {
let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
let v = kadem.kbuckets().collect::<Vec<_>>();
Expand Down Expand Up @@ -179,7 +182,7 @@ impl<K: SignatureKey + 'static> DHTBehaviour<K> {
factor: NonZeroUsize,
backoff: ExponentialBackoff,
retry_count: u8,
kad: &mut KademliaBehaviour<FileBackedStore<ValidatedStore<MemoryStore, K>>>,
kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
) {
// noop
if retry_count == 0 {
Expand Down Expand Up @@ -247,7 +250,7 @@ impl<K: SignatureKey + 'static> DHTBehaviour<K> {
/// update state based on recv-ed get query
fn handle_get_query(
&mut self,
store: &mut FileBackedStore<ValidatedStore<MemoryStore, K>>,
store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
record_results: GetRecordResult,
id: QueryId,
mut last: bool,
Expand Down Expand Up @@ -405,7 +408,7 @@ impl<K: SignatureKey + 'static> DHTBehaviour<K> {
pub fn dht_handle_event(
&mut self,
event: KademliaEvent,
store: &mut FileBackedStore<ValidatedStore<MemoryStore, K>>,
store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
) -> Option<NetworkEvent> {
match event {
KademliaEvent::OutboundQueryProgressed {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod file_backed;
pub mod persistent;
pub mod validated;
Loading

0 comments on commit 52adf7e

Please sign in to comment.