Skip to content

Commit

Permalink
[fix] #4267: Introduce p2p idle timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Apr 5, 2024
1 parent bc6be98 commit 3772c56
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,9 @@ impl Iroha {
genesis: Option<GenesisNetwork>,
logger: LoggerHandle,
) -> Result<Self> {
let network = IrohaNetwork::start(
config.common.p2p_address.clone(),
config.common.key_pair.clone(),
)
.await
.wrap_err("Unable to start P2P-network")?;
let network = IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone())
.await
.wrap_err("Unable to start P2P-network")?;

let (events_sender, _) = broadcast::channel(10000);
let world = World::with(
Expand Down
13 changes: 11 additions & 2 deletions config/src/parameters/actual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
#[allow(missing_docs)]
pub struct Root {
pub common: Common,
pub network: Network,
pub genesis: Genesis,
pub torii: Torii,
pub kura: Kura,
Expand Down Expand Up @@ -71,16 +72,24 @@ impl Root {
pub struct Common {
pub chain_id: ChainId,
pub key_pair: KeyPair,
pub p2p_address: SocketAddr,
pub peer_id: PeerId,
}

impl Common {
/// Construct an id of this peer
pub fn peer_id(&self) -> PeerId {
PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone())
self.peer_id.clone()
}
}

/// Network options
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct Network {
pub address: SocketAddr,
pub idle_timeout: Duration,
}

/// Parsed genesis configuration
#[derive(Debug, Clone)]
pub enum Genesis {
Expand Down
2 changes: 2 additions & 0 deletions config/src/parameters/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod network {

pub const DEFAULT_MAX_TRANSACTIONS_PER_GOSSIP: NonZeroU32 = nonzero!(500u32);
pub const DEFAULT_MAX_BLOCKS_PER_GOSSIP: NonZeroU32 = nonzero!(4u32);

pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
}

pub mod snapshot {
Expand Down
28 changes: 22 additions & 6 deletions config/src/parameters/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Root {
}
}

let (p2p_address, block_sync, transaction_gossiper) = self.network.parse();
let (network, block_sync, transaction_gossiper) = self.network.parse();

let logger = self.logger;
let queue = self.queue;
Expand Down Expand Up @@ -216,18 +216,21 @@ impl Root {

let chain_wide = self.chain_wide.parse();

if p2p_address == torii.address {
if network.address == torii.address {
emitter.emit(eyre!(
"`iroha.p2p_address` and `torii.address` should not be the same"
))
}

emitter.finish()?;

let key_pair = key_pair.unwrap();
let peer_id = PeerId::new(network.address.clone(), key_pair.public_key().clone());

let peer = actual::Common {
chain_id: self.chain_id,
key_pair: key_pair.unwrap(),
p2p_address,
key_pair,
peer_id,
};
let telemetry = telemetry.unwrap();
let genesis = genesis.unwrap();
Expand All @@ -239,6 +242,7 @@ impl Root {

Ok(actual::Root {
common: peer,
network,
genesis,
torii,
kura,
Expand Down Expand Up @@ -457,20 +461,32 @@ pub struct Network {
pub block_gossip_period: Duration,
pub transaction_gossip_max_size: NonZeroU32,
pub transaction_gossip_period: Duration,
/// Duration of time after which connection with peer is terminated if peer is idle
pub idle_timeout: Duration,
}

impl Network {
fn parse(self) -> (SocketAddr, actual::BlockSync, actual::TransactionGossiper) {
fn parse(
self,
) -> (
actual::Network,
actual::BlockSync,
actual::TransactionGossiper,
) {
let Self {
address,
block_gossip_max_size,
block_gossip_period,
transaction_gossip_max_size,
transaction_gossip_period,
idle_timeout,
} = self;

(
address,
actual::Network {
address,
idle_timeout,
},
actual::BlockSync {
gossip_period: block_gossip_period,
gossip_max_size: block_gossip_max_size,
Expand Down
5 changes: 5 additions & 0 deletions config/src/parameters/user/boilerplate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ pub struct NetworkPartial {
pub block_gossip_period: UserField<HumanDuration>,
pub transaction_gossip_max_size: UserField<NonZeroU32>,
pub transaction_gossip_period: UserField<HumanDuration>,
pub idle_timeout: UserField<HumanDuration>,
}

impl UnwrapPartial for NetworkPartial {
Expand Down Expand Up @@ -445,6 +446,10 @@ impl UnwrapPartial for NetworkPartial {
.block_gossip_max_size
.get()
.unwrap_or(DEFAULT_MAX_BLOCKS_PER_GOSSIP),
idle_timeout: self
.idle_timeout
.map(HumanDuration::get)
.unwrap_or(DEFAULT_IDLE_TIMEOUT),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions configs/peer.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# block_gossip_max_size = 4
# transaction_gossip_period = "1s"
# transaction_gossip_max_size = 500
# idle_timeout = "60s"

[torii]
# address =
Expand Down
Binary file modified configs/swarm/executor.wasm
Binary file not shown.
8 changes: 6 additions & 2 deletions core/test_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,18 @@ impl Drop for Peer {
impl Peer {
/// Returns per peer config with all addresses, keys, and id set up.
fn get_config(&self, config: Config) -> Config {
use iroha_config::parameters::actual::{Common, Torii};
use iroha_config::parameters::actual::{Common, Network, Torii};

Config {
common: Common {
key_pair: self.key_pair.clone(),
p2p_address: self.p2p_address.clone(),
peer_id: PeerId::new(self.p2p_address.clone(), self.key_pair.public_key().clone()),
..config.common
},
network: Network {
address: self.p2p_address.clone(),
..config.network
},
torii: Torii {
address: self.api_address.clone(),
..config.torii
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ iroha_logger = { workspace = true }
iroha_crypto = { workspace = true, default-features = true }
iroha_data_model = { workspace = true, default-features = true, features = ["transparent_api"] }
iroha_primitives = { workspace = true }
iroha_config = { workspace = true }
iroha_config_base = { workspace = true }
iroha_data_model_derive = { workspace = true }

Expand Down
17 changes: 15 additions & 2 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
};

use futures::{stream::FuturesUnordered, StreamExt};
use iroha_config::parameters::actual::Network as Config;
use iroha_crypto::{KeyPair, PublicKey};
use iroha_data_model::prelude::PeerId;
use iroha_logger::prelude::*;
Expand Down Expand Up @@ -67,7 +68,13 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
/// # Errors
/// - If binding to address fail
#[log(skip(key_pair))]
pub async fn start(listen_addr: SocketAddr, key_pair: KeyPair) -> Result<Self, Error> {
pub async fn start(
key_pair: KeyPair,
Config {
address: listen_addr,
idle_timeout,
}: Config,
) -> Result<Self, Error> {
let listener = TcpListener::bind(&listen_addr.to_string()).await?;
iroha_logger::info!("Network bound to listener");
let (online_peers_sender, online_peers_receiver) = watch::channel(HashSet::new());
Expand Down Expand Up @@ -95,6 +102,7 @@ impl<T: Pload, K: Kex + Sync, E: Enc + Sync> NetworkBaseHandle<T, K, E> {
service_message_sender,
current_conn_id: 0,
current_topology: HashMap::new(),
idle_timeout,
_key_exchange: core::marker::PhantomData::<K>,
_encryptor: core::marker::PhantomData::<E>,
};
Expand Down Expand Up @@ -192,6 +200,8 @@ struct NetworkBase<T: Pload, K: Kex, E: Enc> {
/// Current topology
/// Bool determines who is responsible for initiating connection
current_topology: HashMap<PeerId, bool>,
/// Duration after which terminate connection with idle peer
idle_timeout: Duration,
/// Key exchange used by network
_key_exchange: core::marker::PhantomData<K>,
/// Encryptor used by the network
Expand Down Expand Up @@ -278,6 +288,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
Connection::new(conn_id, stream),
service_message_sender,
self.idle_timeout,
);
}

Expand Down Expand Up @@ -341,6 +352,7 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
self.key_pair.clone(),
conn_id,
service_message_sender,
self.idle_timeout,
);
}

Expand All @@ -366,6 +378,8 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
disambiguator,
}: Connected<T>,
) {
self.connecting_peers.remove(&connection_id);

if !self.current_topology.contains_key(&peer_id) {
iroha_logger::warn!(%peer_id, topology=?self.current_topology, "Peer not present in topology is trying to connect");
return;
Expand Down Expand Up @@ -395,7 +409,6 @@ impl<T: Pload, K: Kex, E: Enc> NetworkBase<T, K, E> {
};
let _ = peer_message_sender.send(self.peer_message_sender.clone());
self.peers.insert(peer_id.public_key().clone(), ref_peer);
self.connecting_peers.remove(&connection_id);
Self::add_online_peer(&self.online_peers_sender, peer_id);
}

Expand Down
Loading

0 comments on commit 3772c56

Please sign in to comment.