diff --git a/cli/src/lib.rs b/cli/src/lib.rs index fe04463ecbb..e272bd9d1c8 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::Arc}; use clap::Parser; use color_eyre::eyre::{eyre, Result, WrapErr}; use iroha_config::parameters::{actual::Root as Config, user::CliContext}; +#[cfg(feature = "telemetry")] +use iroha_core::metrics::MetricsReporter; use iroha_core::{ block_sync::{BlockSynchronizer, BlockSynchronizerHandle}, gossiper::{TransactionGossiper, TransactionGossiperHandle}, @@ -214,6 +216,7 @@ impl Iroha { ); let kura = Kura::new(&config.kura)?; + let kura_thread_handler = Kura::start(Arc::clone(&kura)); let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start(); let block_count = kura.init()?; @@ -254,7 +257,13 @@ impl Iroha { #[cfg(feature = "telemetry")] Self::start_telemetry(&logger, &config).await?; - let kura_thread_handler = Kura::start(Arc::clone(&kura)); + #[cfg(feature = "telemetry")] + let metrics_reporter = MetricsReporter::new( + Arc::clone(&state), + network.clone(), + kura.clone(), + queue.clone(), + ); let start_args = SumeragiStartArgs { sumeragi_config: config.sumeragi.clone(), @@ -269,6 +278,7 @@ impl Iroha { public_key: config.genesis.public_key().clone(), }, block_count, + dropped_messages: metrics_reporter.metrics().dropped_messages.clone(), }; // Starting Sumeragi requires no async context enabled let sumeragi = tokio::task::spawn_blocking(move || SumeragiHandle::start(start_args)) @@ -322,11 +332,11 @@ impl Iroha { Arc::clone(&queue), events_sender, Arc::clone(¬ify_shutdown), - #[cfg(feature = "telemetry")] - sumeragi.clone(), live_query_store_handle, Arc::clone(&kura), Arc::clone(&state), + #[cfg(feature = "telemetry")] + metrics_reporter, ); Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone()); diff --git a/core/src/lib.rs b/core/src/lib.rs index 06a0bd4103f..f9b25a0a831 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -6,6 +6,7 @@ pub mod executor; pub mod gossiper; pub mod kiso; pub mod kura; +pub mod metrics; pub mod query; pub mod queue; pub mod smartcontracts; diff --git a/core/src/metrics.rs b/core/src/metrics.rs new file mode 100644 index 00000000000..7de5b4f42bd --- /dev/null +++ b/core/src/metrics.rs @@ -0,0 +1,155 @@ +//! Metrics and status reporting + +use std::{sync::Arc, time::SystemTime}; + +use eyre::{Result, WrapErr as _}; +use iroha_telemetry::metrics::Metrics; +use parking_lot::Mutex; +use storage::storage::StorageReadOnly; + +use crate::{ + kura::Kura, + queue::Queue, + state::{State, StateReadOnly, WorldReadOnly}, + IrohaNetwork, +}; + +/// Responsible for collecting and updating metrics +#[derive(Clone)] +pub struct MetricsReporter { + state: Arc, + network: IrohaNetwork, + kura: Arc, + queue: Arc, + metrics: Metrics, + /// Latest observed and processed height by metrics reporter + latest_block_height: Arc>, +} + +impl MetricsReporter { + /// Construct [`Self`] + pub fn new( + state: Arc, + network: IrohaNetwork, + kura: Arc, + queue: Arc, + ) -> Self { + Self { + state, + network, + queue, + kura, + metrics: Metrics::default(), + latest_block_height: Arc::new(Mutex::new(0)), + } + } + + /// Update the metrics on the state. + /// + /// # Errors + /// - Domains fail to compose + /// + /// # Panics + /// - If either mutex is poisoned + #[allow(clippy::cast_precision_loss)] + pub fn update_metrics(&self) -> Result<()> { + let online_peers_count: u64 = self + .network + .online_peers( + #[allow(clippy::disallowed_types)] + std::collections::HashSet::len, + ) + .try_into() + .expect("casting usize to u64"); + + let state_view = self.state.view(); + + let mut lastest_block_height = self.latest_block_height.lock(); + + let start_index = *lastest_block_height; + { + let mut block_index = start_index; + while block_index < state_view.height() { + let Some(block) = self.kura.get_block_by_height(block_index + 1) else { + break; + }; + block_index += 1; + let mut block_txs_accepted = 0; + let mut block_txs_rejected = 0; + for tx in block.transactions() { + if tx.error.is_none() { + block_txs_accepted += 1; + } else { + block_txs_rejected += 1; + } + } + + self.metrics + .txs + .with_label_values(&["accepted"]) + .inc_by(block_txs_accepted); + self.metrics + .txs + .with_label_values(&["rejected"]) + .inc_by(block_txs_rejected); + self.metrics + .txs + .with_label_values(&["total"]) + .inc_by(block_txs_accepted + block_txs_rejected); + self.metrics.block_height.inc(); + } + *lastest_block_height = block_index; + } + + let new_tx_amounts = { + let mut new_buf = Vec::new(); + core::mem::swap(&mut new_buf, &mut state_view.new_tx_amounts.lock()); + new_buf + }; + + for amount in &new_tx_amounts { + self.metrics.tx_amounts.observe(*amount); + } + + #[allow(clippy::cast_possible_truncation)] + if let Some(timestamp) = state_view.genesis_timestamp() { + let curr_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Failed to get the current system time"); + + // this will overflow in 584942417years. + self.metrics.uptime_since_genesis_ms.set( + (curr_time - timestamp) + .as_millis() + .try_into() + .expect("Timestamp should fit into u64"), + ) + }; + + self.metrics.connected_peers.set(online_peers_count); + + self.metrics + .domains + .set(state_view.world().domains().len() as u64); + for domain in state_view.world().domains_iter() { + self.metrics + .accounts + .get_metric_with_label_values(&[domain.id.name.as_ref()]) + .wrap_err("Failed to compose domains")? + .set(domain.accounts.len() as u64); + } + + self.metrics + .view_changes + .set(state_view.latest_block_view_change_index()); + + self.metrics.queue_size.set(self.queue.tx_len() as u64); + + Ok(()) + } + + /// Access node metrics. + pub fn metrics(&self) -> &Metrics { + &self.metrics + } +} diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index 93294a47828..a62de42c4e5 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -4,18 +4,16 @@ use std::{ fmt::{self, Debug, Formatter}, sync::{mpsc, Arc}, - time::{Duration, Instant, SystemTime}, + time::{Duration, Instant}, }; -use eyre::{Result, WrapErr as _}; +use eyre::Result; use iroha_config::parameters::actual::{Common as CommonConfig, Sumeragi as SumeragiConfig}; use iroha_crypto::{KeyPair, SignatureOf}; use iroha_data_model::{block::SignedBlock, prelude::*}; use iroha_genesis::GenesisNetwork; use iroha_logger::prelude::*; -use iroha_telemetry::metrics::Metrics; use network_topology::{Role, Topology}; -use storage::storage::StorageReadOnly; use crate::{ block::ValidBlock, @@ -29,30 +27,14 @@ pub mod message; pub mod network_topology; pub mod view_change; -use parking_lot::Mutex; - use self::{message::*, view_change::ProofChain}; use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage}; -/* -The values in the following struct are not atomics because the code that -operates on them assumes their values does not change during the course of -the function. -*/ -#[derive(Debug)] -struct LastUpdateMetricsData { - block_height: u64, -} - /// Handle to `Sumeragi` actor #[derive(Clone)] pub struct SumeragiHandle { - state: Arc, - metrics: Metrics, - last_update_metrics_mutex: Arc>, - network: IrohaNetwork, - kura: Arc, - queue: Arc, + /// Counter for amount of dropped messages by sumeragi + dropped_messages: iroha_telemetry::metrics::IntCounter, _thread_handle: Arc, // Should be dropped after `_thread_handle` to prevent sumeargi thread from panicking control_message_sender: mpsc::SyncSender, @@ -60,119 +42,10 @@ pub struct SumeragiHandle { } impl SumeragiHandle { - /// Update the metrics on the world state view. - /// - /// # Errors - /// - Domains fail to compose - /// - /// # Panics - /// - If either mutex is poisoned - #[allow(clippy::cast_precision_loss)] - pub fn update_metrics(&self) -> Result<()> { - let online_peers_count: u64 = self - .network - .online_peers( - #[allow(clippy::disallowed_types)] - std::collections::HashSet::len, - ) - .try_into() - .expect("casting usize to u64"); - - let state_view = self.state.view(); - - let mut last_guard = self.last_update_metrics_mutex.lock(); - - let start_index = last_guard.block_height; - { - let mut block_index = start_index; - while block_index < state_view.height() { - let Some(block) = self.kura.get_block_by_height(block_index + 1) else { - break; - }; - block_index += 1; - let mut block_txs_accepted = 0; - let mut block_txs_rejected = 0; - for tx in block.transactions() { - if tx.error.is_none() { - block_txs_accepted += 1; - } else { - block_txs_rejected += 1; - } - } - - self.metrics - .txs - .with_label_values(&["accepted"]) - .inc_by(block_txs_accepted); - self.metrics - .txs - .with_label_values(&["rejected"]) - .inc_by(block_txs_rejected); - self.metrics - .txs - .with_label_values(&["total"]) - .inc_by(block_txs_accepted + block_txs_rejected); - self.metrics.block_height.inc(); - } - last_guard.block_height = block_index; - } - - let new_tx_amounts = { - let mut new_buf = Vec::new(); - core::mem::swap(&mut new_buf, &mut state_view.new_tx_amounts.lock()); - new_buf - }; - - for amount in &new_tx_amounts { - self.metrics.tx_amounts.observe(*amount); - } - - #[allow(clippy::cast_possible_truncation)] - if let Some(timestamp) = state_view.genesis_timestamp() { - let curr_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Failed to get the current system time"); - - // this will overflow in 584942417years. - self.metrics.uptime_since_genesis_ms.set( - (curr_time - timestamp) - .as_millis() - .try_into() - .expect("Timestamp should fit into u64"), - ) - }; - - self.metrics.connected_peers.set(online_peers_count); - - self.metrics - .domains - .set(state_view.world().domains().len() as u64); - for domain in state_view.world().domains_iter() { - self.metrics - .accounts - .get_metric_with_label_values(&[domain.id().name.as_ref()]) - .wrap_err("Failed to compose domains")? - .set(domain.accounts.len() as u64); - } - - self.metrics - .view_changes - .set(state_view.latest_block_view_change_index()); - - self.metrics.queue_size.set(self.queue.tx_len() as u64); - - Ok(()) - } - - /// Access node metrics. - pub fn metrics(&self) -> &Metrics { - &self.metrics - } - /// Deposit a sumeragi control flow network message. pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) { if let Err(error) = self.control_message_sender.try_send(msg) { - self.metrics.dropped_messages.inc(); + self.dropped_messages.inc(); error!( ?error, "This peer is faulty. \ @@ -184,7 +57,7 @@ impl SumeragiHandle { /// Deposit a sumeragi network message. pub fn incoming_block_message(&self, msg: BlockMessage) { if let Err(error) = self.message_sender.try_send(msg) { - self.metrics.dropped_messages.inc(); + self.dropped_messages.inc(); error!( ?error, "This peer is faulty. \ @@ -255,6 +128,7 @@ impl SumeragiHandle { network, genesis_network, block_count: BlockCount(block_count), + dropped_messages, }: SumeragiStartArgs, ) -> SumeragiHandle { let (control_message_sender, control_message_receiver) = mpsc::sync_channel(100); @@ -353,16 +227,9 @@ impl SumeragiHandle { let thread_handle = ThreadHandler::new(Box::new(shutdown), thread_handle); SumeragiHandle { - state, - network, - queue, - kura, + dropped_messages, control_message_sender, message_sender, - metrics: Metrics::default(), - last_update_metrics_mutex: Arc::new(Mutex::new(LastUpdateMetricsData { - block_height: 0, - })), _thread_handle: Arc::new(thread_handle), } } @@ -428,6 +295,7 @@ pub struct SumeragiStartArgs { pub network: IrohaNetwork, pub genesis_network: GenesisWithPubKey, pub block_count: BlockCount, + pub dropped_messages: iroha_telemetry::metrics::IntCounter, } /// Optional genesis paired with genesis public key for verification diff --git a/telemetry/src/metrics.rs b/telemetry/src/metrics.rs index 7e93b02f94f..9c7c30d8659 100644 --- a/telemetry/src/metrics.rs +++ b/telemetry/src/metrics.rs @@ -3,7 +3,7 @@ use std::{ops::Deref, time::Duration}; use parity_scale_codec::{Compact, Decode, Encode}; -use prometheus::{ +pub use prometheus::{ core::{AtomicU64, GenericGauge, GenericGaugeVec}, Encoder, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, Opts, Registry, }; diff --git a/torii/src/lib.rs b/torii/src/lib.rs index ce43bbe8561..94d3b11ad79 100644 --- a/torii/src/lib.rs +++ b/torii/src/lib.rs @@ -15,7 +15,7 @@ use std::{ use futures::{stream::FuturesUnordered, StreamExt}; use iroha_config::parameters::actual::Torii as Config; #[cfg(feature = "telemetry")] -use iroha_core::sumeragi::SumeragiHandle; +use iroha_core::metrics::MetricsReporter; use iroha_core::{ kiso::{Error as KisoError, KisoHandle}, kura::Kura, @@ -50,13 +50,13 @@ pub struct Torii { queue: Arc, events: EventsSender, notify_shutdown: Arc, - #[cfg(feature = "telemetry")] - sumeragi: SumeragiHandle, query_service: LiveQueryStoreHandle, kura: Arc, transaction_max_content_length: u64, address: SocketAddr, state: Arc, + #[cfg(feature = "telemetry")] + metrics_reporter: MetricsReporter, } impl Torii { @@ -69,10 +69,10 @@ impl Torii { queue: Arc, events: EventsSender, notify_shutdown: Arc, - #[cfg(feature = "telemetry")] sumeragi: SumeragiHandle, query_service: LiveQueryStoreHandle, kura: Arc, state: Arc, + #[cfg(feature = "telemetry")] metrics_reporter: MetricsReporter, ) -> Self { Self { chain_id: Arc::new(chain_id), @@ -80,11 +80,11 @@ impl Torii { queue, events, notify_shutdown, - #[cfg(feature = "telemetry")] - sumeragi, query_service, kura, state, + #[cfg(feature = "telemetry")] + metrics_reporter, address: config.address, transaction_max_content_length: config.max_content_len_bytes, } @@ -108,21 +108,23 @@ impl Torii { #[cfg(feature = "telemetry")] let get_router = get_router .or(warp::path(uri::STATUS) - .and(add_state!(self.sumeragi.clone())) + .and(add_state!(self.metrics_reporter.clone())) .and(warp::header::optional(warp::http::header::ACCEPT.as_str())) .and(warp::path::tail()) - .and_then(|sumeragi, accept: Option, tail| async move { - Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_status( - &sumeragi, - accept.as_ref(), - &tail, - ))) - })) + .and_then( + |metrics_reporter, accept: Option, tail| async move { + Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_status( + &metrics_reporter, + accept.as_ref(), + &tail, + ))) + }, + )) .or(warp::path(uri::METRICS) - .and(add_state!(self.sumeragi)) - .and_then(|sumeragi| async move { + .and(add_state!(self.metrics_reporter)) + .and_then(|metrics_reporter| async move { Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_metrics( - &sumeragi, + &metrics_reporter, ))) })) .or(warp::path(uri::API_VERSION) diff --git a/torii/src/routing.rs b/torii/src/routing.rs index e071a8916ef..edff834fed4 100644 --- a/torii/src/routing.rs +++ b/torii/src/routing.rs @@ -9,8 +9,6 @@ use eyre::{eyre, WrapErr}; use futures::TryStreamExt; use iroha_config::client_api::ConfigDTO; -#[cfg(feature = "telemetry")] -use iroha_core::sumeragi::SumeragiHandle; use iroha_core::{query::store::LiveQueryStoreHandle, smartcontracts::query::ValidQueryRequest}; use iroha_data_model::{ block::{ @@ -296,16 +294,16 @@ pub async fn handle_version(state: Arc) -> Json { } #[cfg(feature = "telemetry")] -fn update_metrics_gracefully(sumeragi: &SumeragiHandle) { - if let Err(error) = sumeragi.update_metrics() { - iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`."); +fn update_metrics_gracefully(metrics_reporter: &MetricsReporter) { + if let Err(error) = metrics_reporter.update_metrics() { + iroha_logger::error!(%error, "Error while calling `metrics_reporter::update_metrics`."); } } #[cfg(feature = "telemetry")] -pub fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { - update_metrics_gracefully(sumeragi); - sumeragi +pub fn handle_metrics(metrics_reporter: &MetricsReporter) -> Result { + update_metrics_gracefully(metrics_reporter); + metrics_reporter .metrics() .try_to_string() .map_err(Error::Prometheus) @@ -314,14 +312,14 @@ pub fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { #[cfg(feature = "telemetry")] #[allow(clippy::unnecessary_wraps)] pub fn handle_status( - sumeragi: &SumeragiHandle, + metrics_reporter: &MetricsReporter, accept: Option>, tail: &warp::path::Tail, ) -> Result { use eyre::ContextCompat; - update_metrics_gracefully(sumeragi); - let status = Status::from(&sumeragi.metrics()); + update_metrics_gracefully(metrics_reporter); + let status = Status::from(&metrics_reporter.metrics()); let tail = tail.as_str(); if tail.is_empty() {