Skip to content

Commit

Permalink
refactor(irohad): always start torii as task (#4699)
Browse files Browse the repository at this point in the history
* refactor(irohad): always start torii as task
* fix(integration): fix timing issue in mint_nft_for_every_user_every_1_sec

Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara authored Jun 10, 2024
1 parent 4362c94 commit f5537fd
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 139 deletions.
136 changes: 39 additions & 97 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,7 @@ pub mod samples;
/// [Orchestrator](https://en.wikipedia.org/wiki/Orchestration_%28computing%29)
/// of the system. It configures, coordinates and manages transactions
/// and queries processing, work of consensus and storage.
///
/// # Usage
/// Construct and then use [`Iroha::start_torii`] or [`Iroha::start_torii_as_task`]. If you experience
/// an immediate shutdown after constructing Iroha, then you probably forgot this step.
#[must_use = "run `.start_torii().await?` to not immediately stop Iroha"]
pub struct Iroha<ToriiState> {
main_state: IrohaMainState,
/// Torii web server
torii: ToriiState,
}

struct IrohaMainState {
pub struct Iroha {
/// Actor responsible for the configuration
_kiso: KisoHandle,
/// Queue of transactions
Expand All @@ -79,6 +68,8 @@ struct IrohaMainState {
_snapshot_maker: Option<SnapshotMakerHandle>,
/// State of blockchain
state: Arc<State>,
/// Shutdown signal
notify_shutdown: Arc<Notify>,
/// Thread handlers
thread_handlers: Vec<ThreadHandler>,
/// A boolean value indicating whether or not the peers will receive data from the network.
Expand All @@ -87,16 +78,10 @@ struct IrohaMainState {
pub freeze_status: Arc<AtomicBool>,
}

/// A state of [`Iroha`] for when the network is started, but [`Torii`] not yet.
pub struct ToriiNotStarted(Torii);

/// A state of [`Iroha`] for when the network & [`Torii`] are started.
#[allow(missing_copy_implementations)]
pub struct ToriiStarted;

impl Drop for IrohaMainState {
impl Drop for Iroha {
fn drop(&mut self) {
iroha_logger::trace!("Iroha instance dropped");
self.notify_shutdown.notify_waiters();
let _thread_handles = core::mem::take(&mut self.thread_handlers);
iroha_logger::debug!(
"Thread handles dropped. Dependent processes going for a graceful shutdown"
Expand Down Expand Up @@ -178,7 +163,7 @@ impl NetworkRelay {
}
}

impl Iroha<ToriiNotStarted> {
impl Iroha {
fn prepare_panic_hook(notify_shutdown: Arc<Notify>) {
#[cfg(not(feature = "test-network"))]
use std::panic::set_hook;
Expand Down Expand Up @@ -226,9 +211,9 @@ impl Iroha<ToriiNotStarted> {
}));
}

/// Creates new Iroha instance and starts all internal services, except [`Torii`].
/// Creates new Iroha instance and starts all internal services.
///
/// Torii is started separately with [`Self::start_torii`] or [`Self::start_torii_as_task`]
/// Returns iroha itself and future to await for iroha completion.
///
/// # Errors
/// - Reading telemetry configs
Expand All @@ -243,7 +228,7 @@ impl Iroha<ToriiNotStarted> {
config: Config,
genesis: Option<GenesisTransaction>,
logger: LoggerHandle,
) -> Result<Self, StartError> {
) -> Result<(impl core::future::Future<Output = ()>, Self), StartError> {
let network = IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone())
.await
.change_context(StartError::StartP2p)?;
Expand Down Expand Up @@ -384,81 +369,40 @@ impl Iroha<ToriiNotStarted> {
metrics_reporter,
);

tokio::spawn(async move {
torii
.start()
.await
.into_report()
.map_err(|report| report.change_context(StartError::StartTorii))
});

Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone());

Self::start_listening_signal(Arc::clone(&notify_shutdown))?;

Self::prepare_panic_hook(notify_shutdown);

Ok(Self {
main_state: IrohaMainState {
_kiso: kiso,
_queue: queue,
_sumeragi: sumeragi,
kura,
_snapshot_maker: snapshot_maker,
state,
thread_handlers: vec![kura_thread_handler],
#[cfg(debug_assertions)]
freeze_status,
},
torii: ToriiNotStarted(torii),
})
}
Self::prepare_panic_hook(Arc::clone(&notify_shutdown));

fn take_torii(self) -> (Torii, Iroha<ToriiStarted>) {
let Self {
main_state,
torii: ToriiNotStarted(torii),
} = self;
(
torii,
Iroha {
main_state,
torii: ToriiStarted,
},
)
}
// Future to wait for iroha completion
let wait = {
let notify_shutdown = Arc::clone(&notify_shutdown);
async move { notify_shutdown.notified().await }
};

/// To make `Iroha` peer work it should be started first. After
/// that moment it will listen for incoming requests and messages.
///
/// # Errors
/// - Forwards initialisation error.
#[iroha_futures::telemetry_future]
pub async fn start_torii(self) -> Result<Iroha<ToriiStarted>, StartError> {
let (torii, new_self) = self.take_torii();
iroha_logger::info!("Starting Iroha");
torii
.start()
.await
.into_report()
// https://github.com/hashintel/hash/issues/4295
.map_err(|report| report.change_context(StartError::StartTorii))?;
Ok(new_self)
}
let irohad = Self {
_kiso: kiso,
_queue: queue,
_sumeragi: sumeragi,
kura,
_snapshot_maker: snapshot_maker,
state,
notify_shutdown,
thread_handlers: vec![kura_thread_handler],
#[cfg(debug_assertions)]
freeze_status,
};

/// Starts Iroha in separate tokio task.
///
/// # Errors
/// - Forwards initialisation error.
#[cfg(feature = "test-network")]
pub fn start_torii_as_task(
self,
) -> (
task::JoinHandle<Result<(), StartError>>,
Iroha<ToriiStarted>,
) {
let (torii, new_self) = self.take_torii();
iroha_logger::info!("Starting Iroha as task");
let handle = tokio::spawn(async move {
torii
.start()
.await
.into_report()
.map_err(|report| report.change_context(StartError::StartTorii))
});
(handle, new_self)
Ok((wait, irohad))
}

#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -563,23 +507,21 @@ impl Iroha<ToriiNotStarted> {
}
})
}
}

impl<T> Iroha<T> {
#[allow(missing_docs)]
#[cfg(debug_assertions)]
pub fn freeze_status(&self) -> &Arc<AtomicBool> {
&self.main_state.freeze_status
&self.freeze_status
}

#[allow(missing_docs)]
pub fn state(&self) -> &Arc<State> {
&self.main_state.state
&self.state
}

#[allow(missing_docs)]
pub fn kura(&self) -> &Arc<Kura> {
&self.main_state.kura
&self.kura
}
}

Expand Down
7 changes: 3 additions & 4 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ async fn main() -> error_stack::Result<(), MainError> {
iroha_logger::debug!("Submitting genesis.");
}

let _iroha = Iroha::start_network(config, genesis, logger)
Iroha::start_network(config, genesis, logger)
.await
.change_context(MainError::IrohaStart)?
.start_torii()
.await
.change_context(MainError::IrohaStart)?;
.0
.await;

Ok(())
}
Expand Down
16 changes: 9 additions & 7 deletions client/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ fn pre_commit_trigger_should_be_executed() -> Result<()> {

#[test]
fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
const TRIGGER_PERIOD_MS: u64 = 1000;
const TRIGGER_PERIOD: Duration = Duration::from_millis(1000);
const EXPECTED_COUNT: u64 = 4;

let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_780).start_with_runtime();
Expand Down Expand Up @@ -241,9 +241,10 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
let event_listener = get_block_committed_event_listener(&test_client)?;

// Registering trigger
let start_time = curr_time();
let schedule = TimeSchedule::starting_at(start_time + Duration::from_secs(5))
.with_period(Duration::from_millis(TRIGGER_PERIOD_MS));
// Offset into the future to be able to register trigger
let offset = Duration::from_secs(10);
let start_time = curr_time() + offset;
let schedule = TimeSchedule::starting_at(start_time).with_period(TRIGGER_PERIOD);
let register_trigger = Register::trigger(Trigger::new(
"mint_nft_for_all".parse()?,
Action::new(
Expand All @@ -253,14 +254,15 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
TimeEventFilter::new(ExecutionTime::Schedule(schedule)),
),
));
test_client.submit(register_trigger)?;
test_client.submit_blocking(register_trigger)?;
std::thread::sleep(offset);

// Time trigger will be executed on block commits, so we have to produce some transactions
submit_sample_isi_on_every_block_commit(
event_listener,
&mut test_client,
&alice_id,
Duration::from_millis(TRIGGER_PERIOD_MS),
TRIGGER_PERIOD,
usize::try_from(EXPECTED_COUNT)?,
)?;

Expand Down Expand Up @@ -294,7 +296,7 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
fn get_block_committed_event_listener(
client: &Client,
) -> Result<impl Iterator<Item = Result<EventBox>>> {
let block_filter = BlockEventFilter::default().for_status(BlockStatus::Committed);
let block_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
client.listen_for_events([block_filter])
}

Expand Down
42 changes: 11 additions & 31 deletions core/test_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use iroha_primitives::{
addr::{socket_addr, SocketAddr},
unique_vec::UniqueVec,
};
use irohad::{Iroha, ToriiStarted};
use irohad::Iroha;
use rand::{seq::IteratorRandom, thread_rng};
use serde_json::json;
use tempfile::TempDir;
use test_samples::{ALICE_ID, ALICE_KEYPAIR, PEER_KEYPAIR, SAMPLE_GENESIS_ACCOUNT_KEYPAIR};
use tokio::{
runtime::{self, Runtime},
task::{self, JoinHandle},
time,
};
pub use unique_port;
Expand Down Expand Up @@ -375,10 +374,8 @@ pub struct Peer {
pub p2p_address: SocketAddr,
/// The key-pair for the peer
pub key_pair: KeyPair,
/// Shutdown handle
shutdown: Option<JoinHandle<()>>,
/// Iroha server
pub irohad: Option<Iroha<ToriiStarted>>,
pub irohad: Option<Iroha>,
/// Temporary directory
// Note: last field to be dropped after Iroha (struct fields drops in FIFO RFC 1857)
pub temp_dir: Option<Arc<TempDir>>,
Expand Down Expand Up @@ -453,43 +450,28 @@ impl Peer {
api_addr = %self.api_address,
);
let logger = iroha_logger::test_logger();
let (sender, receiver) = std::sync::mpsc::sync_channel(1);

let handle = task::spawn(
async move {
let irohad = Iroha::start_network(config, genesis, logger)
.await
.expect("Failed to start Iroha");
let (job_handle, irohad) = irohad.start_torii_as_task();
sender.send(irohad).unwrap();
job_handle.await.unwrap().unwrap();
}
.instrument(info_span),
);

self.irohad = Some(receiver.recv().unwrap());
let (_, irohad) = Iroha::start_network(config, genesis, logger)
.instrument(info_span)
.await
.expect("Failed to start Iroha");

self.irohad = Some(irohad);
time::sleep(Duration::from_millis(300)).await;
self.shutdown = Some(handle);
// Prevent temporary directory deleting
self.temp_dir = Some(temp_dir);
}

/// Stop the peer if it's running
pub fn stop(&mut self) -> Option<()> {
pub fn stop(&mut self) {
iroha_logger::info!(
p2p_addr = %self.p2p_address,
api_addr = %self.api_address,
"Stopping peer",
);

if let Some(shutdown) = self.shutdown.take() {
shutdown.abort();
iroha_logger::info!("Shutting down peer...");
self.irohad.take();
Some(())
} else {
None
}
iroha_logger::info!("Shutting down peer...");
self.irohad.take();
}

/// Creates peer
Expand All @@ -504,13 +486,11 @@ impl Peer {
let p2p_address = local_unique_port()?;
let api_address = local_unique_port()?;
let id = PeerId::new(p2p_address.clone(), key_pair.public_key().clone());
let shutdown = None;
Ok(Self {
id,
key_pair,
p2p_address,
api_address,
shutdown,
irohad: None,
temp_dir: None,
})
Expand Down

0 comments on commit f5537fd

Please sign in to comment.