Skip to content

Commit

Permalink
refactor: tidied up a bit more
Browse files Browse the repository at this point in the history
Signed-off-by: 0x009922 <[email protected]>
  • Loading branch information
0x009922 committed Sep 20, 2024
1 parent 5d57d45 commit bf482f8
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 48 deletions.
26 changes: 21 additions & 5 deletions crates/iroha_test_network/src/fslock_ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{
io::{Read, Write},
};

use color_eyre::Result;
use color_eyre::{
eyre::{eyre, Context},
Result,
};
use derive_more::{Deref, Display};
use serde::{Deserialize, Serialize};

Expand All @@ -22,10 +25,23 @@ struct LockContent {
impl LockContent {
fn read() -> Result<Self> {
let value = if std::fs::exists(DATA_FILE)? {
let mut content = String::new();
let mut file = OpenOptions::new().read(true).open(DATA_FILE)?;
file.read_to_string(&mut content)?;
serde_json::from_str(&content)?
OpenOptions::new()
.read(true)
.open(DATA_FILE)
.wrap_err("failed to open file")
.and_then(|mut file| {
let mut content = String::new();
file.read_to_string(&mut content)
.wrap_err("failed to read file")?;
serde_json::from_str(&content).wrap_err("failed to parse lock file contents")
})
.wrap_err_with(|| {
eyre!(
"Failed to read lock file at {}. Remove it manually to proceed.",
DATA_FILE
)
})
.unwrap()
} else {
Default::default()
};
Expand Down
113 changes: 70 additions & 43 deletions crates/iroha_test_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ use tokio::{
runtime::{self, Runtime},
sync::{broadcast, oneshot, Mutex},
task::{spawn_blocking, JoinSet},
time::timeout,
time::{error::Elapsed, timeout},
};
use toml::Table;

const DEFAULT_PIPELINE_TIME: Duration = Duration::from_millis(100);
const DEFAULT_BLOCK_SYNC: Duration = Duration::from_millis(25);
// const PEER_START_TIMEOUT: Duration = Duration::from_secs(3);
// TODO: increase for CI
const PEER_START_TIMEOUT: Duration = Duration::from_secs(3);
const POLLING_TIMEOUT: Duration = Duration::from_secs(10);
const PEER_KILL_TIMEOUT: Duration = Duration::from_secs(2);

// TODO: read from ENV?
const IROHA_BIN: &'static str = "/Users/qua/dev/iroha/target/release/irohad";

/// Network of peers
Expand All @@ -61,7 +63,6 @@ pub struct Network {
commit_time: Duration,

config: Table,
block_sync_gossip_period: Duration,
}

impl Network {
Expand All @@ -75,78 +76,93 @@ impl Network {
self.peers.last().expect("just added one").clone()
}

/// Returns all peers.
/// Iterate over peers
pub fn peers(&self) -> impl Iterator<Item = &NetworkPeer> + '_ {
self.peers.iter()
}

/// Get the first peer in the network
pub fn peer(&self) -> &NetworkPeer {
self.peers.first().expect("there is always at least one")
}

/// Get a random peer in the network
pub fn random_peer(&self) -> &NetworkPeer {
self.peers()
.choose(&mut thread_rng())
.expect("there is at least one peer")
}

/// Start all peers.
///
/// The first one submits genesis.
/// Start all peers, waiting until they are up and have committed genesis (submitted by the first one).
///
/// # Panics
/// If some peer was already started
pub async fn start_all(&self) -> &Self {
let futures = FuturesUnordered::new();
for (i, peer) in self.peers().enumerate() {
futures.push(async move {
join!(
peer.spawn(self.config(), (i == 0).then_some(&self.genesis)),
peer.once(|e| matches!(e, PeerLifecycleEvent::ServerStarted)),
peer.once(|e| matches!(e, PeerLifecycleEvent::LogBlockCommitted { height: 1 })),
);
self.peers
.iter()
.enumerate()
.map(|(i, peer)| {
timeout(PEER_START_TIMEOUT, async move {
join!(
peer.spawn(self.config(), (i == 0).then_some(&self.genesis)),
peer.once(|e| matches!(e, PeerLifecycleEvent::ServerStarted)),
peer.once(|e| matches!(
e,
PeerLifecycleEvent::LogBlockCommitted { height: 1 }
)),
);
})
})
}
futures.collect::<()>().await;
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<(), Elapsed>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.expect("expected peers to start within timeout");
self
}

/// Pipeline time of the network.
///
/// Is relevant only if users haven't submitted [`SumeragiParameter`] changing it.
/// Users should do it through a network method (which hasn't been necessary yet).
pub fn pipeline_time(&self) -> Duration {
self.block_time + self.commit_time
}

pub fn block_sync_gossip_period(&self) -> Duration {
self.block_sync_gossip_period
}

/// Client of the first peer in the network
/// Alice's client for the first peer of the network
pub fn client(&self) -> Client {
self.peers
.first()
.expect("there is at least one peer")
.client(&ALICE_ID, Signatory::Alice.key_pair().private_key().clone())
self.peer().alice_client()
}

/// Chain ID of the network
pub fn chain_id(&self) -> ChainId {
config::chain_id()
}

/// Base configuration of all peers.
///
/// Includes `sumeragi.trusted_peers` parameter, containing all currently present peers.
pub fn config(&self) -> Table {
self.config
.clone()
.write(["sumeragi", "trusted_peers"], self.topology())
}

/// Network genesis block.
pub fn genesis(&self) -> &GenesisBlock {
&self.genesis
}

/// Shutdown running peers
pub async fn shutdown(&self) -> &Self {
for peer in self.peers() {
if peer.is_running() {
peer.kill().await;
}
}
self.peers
.iter()
.filter(|peer| peer.is_running())
.map(|peer| peer.kill())
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;
self
}

Expand All @@ -158,17 +174,12 @@ impl Network {
.build()
}

pub fn poll_sync<F>(&self, f: F) -> Result<()>
where
F: Fn() -> Result<()>,
{
backoff::retry(self.polling_backoff(), || {
f()?;
Ok(())
})
.map_err(|err| eyre!("polling failed: {err}"))
}

/// [Backoff](backoff)-based polling for a condition to become OK.
///
/// Usually used in combination with [`Client`] to wait until some query returns a desired
/// response.
///
/// Use [`Self::pipeline_time`] as a base for min/max intervals.
pub async fn poll<F, Fut>(&self, f: F) -> Result<()>
where
F: Fn() -> Fut,
Expand All @@ -181,10 +192,25 @@ impl Network {
.await
}

/// [`Self::poll`], but blocking.
pub fn poll_sync<F>(&self, f: F) -> Result<()>
where
F: Fn() -> Result<()>,
{
backoff::retry(self.polling_backoff(), || {
f()?;
Ok(())
})
.map_err(|err| eyre!("polling failed: {err}"))
}

fn topology(&self) -> UniqueVec<PeerId> {
self.peers().map(|x| x.id.clone()).collect()
}

/// Resolves when all peers have at least N blocks in their `GET /status` response.
/// # Errors
/// If this doesn't happen within a timeout.
pub async fn ensure_synchronised_blocks_height(&self, height: u64) -> Result<&Self> {
eprintln!("waiting until peers have {height} block(s)...");
if self
Expand Down Expand Up @@ -227,6 +253,7 @@ impl Network {
}
}

/// Builder of [`Network`]
pub struct NetworkBuilder {
n_peers: usize,
config: Table,
Expand All @@ -236,6 +263,7 @@ pub struct NetworkBuilder {

/// Test network builder
impl NetworkBuilder {
/// Constructor
pub fn new() -> Self {
Self {
n_peers: 1,
Expand Down Expand Up @@ -325,7 +353,6 @@ impl NetworkBuilder {
["network", "block_gossip_period_ms"],
block_sync_gossip_period.as_millis() as u64,
),
block_sync_gossip_period,
}
}

Expand Down

0 comments on commit bf482f8

Please sign in to comment.