From 5d0bad135cc41c063194a5ec32d39212601d9d27 Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Thu, 21 Mar 2024 12:17:42 +0300 Subject: [PATCH] [refactor]: Move metrics related functionality into MetricsReporter Signed-off-by: Shanin Roman --- cli/src/lib.rs | 12 ++- core/src/lib.rs | 1 + core/src/metrics.rs | 163 +++++++++++++++++++++++++++++++++++++++ core/src/sumeragi/mod.rs | 144 ++-------------------------------- telemetry/src/metrics.rs | 2 +- torii/src/lib.rs | 32 ++++---- torii/src/routing.rs | 23 +++--- 7 files changed, 210 insertions(+), 167 deletions(-) create mode 100644 core/src/metrics.rs diff --git a/cli/src/lib.rs b/cli/src/lib.rs index eb631467a21..53e1cee2c3a 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -16,6 +16,7 @@ use iroha_core::{ handler::ThreadHandler, kiso::KisoHandle, kura::Kura, + metrics::MetricsReporter, query::store::LiveQueryStore, queue::Queue, smartcontracts::isi::Registrable as _, @@ -216,6 +217,7 @@ impl Iroha { ); let kura = Kura::new(&config.kura)?; + let kura_thread_handler = Kura::start(Arc::clone(&kura)); let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start(); let block_count = kura.init()?; @@ -257,7 +259,12 @@ impl Iroha { TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"), }; - let kura_thread_handler = Kura::start(Arc::clone(&kura)); + let metrics_reporter = MetricsReporter::new( + Arc::clone(&state), + network.clone(), + kura.clone(), + queue.clone(), + ); let start_args = SumeragiStartArgs { sumeragi_config: config.sumeragi.clone(), @@ -269,6 +276,7 @@ impl Iroha { network: network.clone(), genesis_network: genesis, block_count, + dropped_messages: metrics_reporter.metrics().dropped_messages.clone(), }; // Starting Sumeragi requires no async context enabled let sumeragi = tokio::task::spawn_blocking(move || SumeragiHandle::start(start_args)) @@ -322,10 +330,10 @@ impl Iroha { Arc::clone(&queue), events_sender, Arc::clone(&notify_shutdown), - 
sumeragi.clone(), live_query_store_handle, Arc::clone(&kura), Arc::clone(&state), + metrics_reporter, ); Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone()); diff --git a/core/src/lib.rs b/core/src/lib.rs index ab0b9be0d6b..aabbc60f240 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -6,6 +6,7 @@ pub mod executor; pub mod gossiper; pub mod kiso; pub mod kura; +pub mod metrics; pub mod query; pub mod queue; pub mod smartcontracts; diff --git a/core/src/metrics.rs b/core/src/metrics.rs new file mode 100644 index 00000000000..845a99e40b5 --- /dev/null +++ b/core/src/metrics.rs @@ -0,0 +1,163 @@ +//! Metrics and status reporting + +use std::sync::Arc; + +use eyre::{Result, WrapErr as _}; +use iroha_data_model::current_time; +use iroha_telemetry::metrics::Metrics; +use parking_lot::Mutex; +use storage::storage::StorageReadOnly; + +use crate::{ + kura::Kura, + queue::Queue, + state::{State, StateReadOnly, WorldReadOnly}, + IrohaNetwork, +}; + +/* +The values in the following struct are not atomics because the code that +operates on them assumes their values does not change during the course of +the function. +*/ +#[derive(Debug)] +struct LastUpdateMetricsData { + block_height: u64, +} + +/// Responsible for collecting and updating metrics +#[derive(Clone)] +pub struct MetricsReporter { + state: Arc<State>, + network: IrohaNetwork, + kura: Arc<Kura>, + queue: Arc<Queue>, + metrics: Metrics, + last_update_metrics_mutex: Arc<Mutex<LastUpdateMetricsData>>, +} + +impl MetricsReporter { + /// Construct [`Self`] + pub fn new( + state: Arc<State>, + network: IrohaNetwork, + kura: Arc<Kura>, + queue: Arc<Queue>, + ) -> Self { + Self { + state, + network, + queue, + kura, + metrics: Metrics::default(), + last_update_metrics_mutex: Arc::new(Mutex::new(LastUpdateMetricsData { + block_height: 0, + })), + } + } + + /// Update the metrics on the state. 
+ /// + /// # Errors + /// - Domains fail to compose + /// + /// # Panics + /// - If either mutex is poisoned + #[allow(clippy::cast_precision_loss)] + pub fn update_metrics(&self) -> Result<()> { + let online_peers_count: u64 = self + .network + .online_peers( + #[allow(clippy::disallowed_types)] + std::collections::HashSet::len, + ) + .try_into() + .expect("casting usize to u64"); + + let state_view = self.state.view(); + + let mut last_guard = self.last_update_metrics_mutex.lock(); + + let start_index = last_guard.block_height; + { + let mut block_index = start_index; + while block_index < state_view.height() { + let Some(block) = self.kura.get_block_by_height(block_index + 1) else { + break; + }; + block_index += 1; + let mut block_txs_accepted = 0; + let mut block_txs_rejected = 0; + for tx in block.transactions() { + if tx.error.is_none() { + block_txs_accepted += 1; + } else { + block_txs_rejected += 1; + } + } + + self.metrics + .txs + .with_label_values(&["accepted"]) + .inc_by(block_txs_accepted); + self.metrics + .txs + .with_label_values(&["rejected"]) + .inc_by(block_txs_rejected); + self.metrics + .txs + .with_label_values(&["total"]) + .inc_by(block_txs_accepted + block_txs_rejected); + self.metrics.block_height.inc(); + } + last_guard.block_height = block_index; + } + + let new_tx_amounts = { + let mut new_buf = Vec::new(); + core::mem::swap(&mut new_buf, &mut state_view.new_tx_amounts.lock()); + new_buf + }; + + for amount in &new_tx_amounts { + self.metrics.tx_amounts.observe(*amount); + } + + #[allow(clippy::cast_possible_truncation)] + if let Some(timestamp) = state_view.genesis_timestamp() { + // this will overflow in 584942417years. 
+ self.metrics.uptime_since_genesis_ms.set( + (current_time() - timestamp) + .as_millis() + .try_into() + .expect("Timestamp should fit into u64"), + ) + }; + + self.metrics.connected_peers.set(online_peers_count); + + self.metrics + .domains + .set(state_view.world().domains().len() as u64); + for domain in state_view.world().domains_iter() { + self.metrics + .accounts + .get_metric_with_label_values(&[domain.id.name.as_ref()]) + .wrap_err("Failed to compose domains")? + .set(domain.accounts.len() as u64); + } + + self.metrics + .view_changes + .set(state_view.latest_block_view_change_index()); + + self.metrics.queue_size.set(self.queue.tx_len() as u64); + + Ok(()) + } + + /// Access node metrics. + pub fn metrics(&self) -> &Metrics { + &self.metrics + } +} diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index 1e10895b992..e4d011148c3 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -7,15 +7,13 @@ use std::{ time::{Duration, Instant}, }; -use eyre::{Result, WrapErr as _}; +use eyre::Result; use iroha_config::parameters::actual::{Common as CommonConfig, Sumeragi as SumeragiConfig}; use iroha_crypto::{KeyPair, SignatureOf}; use iroha_data_model::{block::SignedBlock, prelude::*}; use iroha_genesis::GenesisNetwork; use iroha_logger::prelude::*; -use iroha_telemetry::metrics::Metrics; use network_topology::{Role, Topology}; -use storage::storage::StorageReadOnly; use crate::{ block::ValidBlock, @@ -29,30 +27,14 @@ pub mod message; pub mod network_topology; pub mod view_change; -use parking_lot::Mutex; - use self::{message::*, view_change::ProofChain}; use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage}; -/* -The values in the following struct are not atomics because the code that -operates on them assumes their values does not change during the course of -the function. 
-*/ -#[derive(Debug)] -struct LastUpdateMetricsData { - block_height: u64, -} - /// Handle to `Sumeragi` actor #[derive(Clone)] pub struct SumeragiHandle { - state: Arc, - metrics: Metrics, - last_update_metrics_mutex: Arc>, - network: IrohaNetwork, - kura: Arc, - queue: Arc, + /// Counter for amount of dropped messages by sumeragi + dropped_messages: iroha_telemetry::metrics::IntCounter, _thread_handle: Arc, // Should be dropped after `_thread_handle` to prevent sumeargi thread from panicking control_message_sender: mpsc::SyncSender, @@ -60,115 +42,10 @@ pub struct SumeragiHandle { } impl SumeragiHandle { - /// Update the metrics on the world state view. - /// - /// # Errors - /// - Domains fail to compose - /// - /// # Panics - /// - If either mutex is poisoned - #[allow(clippy::cast_precision_loss)] - pub fn update_metrics(&self) -> Result<()> { - let online_peers_count: u64 = self - .network - .online_peers( - #[allow(clippy::disallowed_types)] - std::collections::HashSet::len, - ) - .try_into() - .expect("casting usize to u64"); - - let state_view = self.state.view(); - - let mut last_guard = self.last_update_metrics_mutex.lock(); - - let start_index = last_guard.block_height; - { - let mut block_index = start_index; - while block_index < state_view.height() { - let Some(block) = self.kura.get_block_by_height(block_index + 1) else { - break; - }; - block_index += 1; - let mut block_txs_accepted = 0; - let mut block_txs_rejected = 0; - for tx in block.transactions() { - if tx.error.is_none() { - block_txs_accepted += 1; - } else { - block_txs_rejected += 1; - } - } - - self.metrics - .txs - .with_label_values(&["accepted"]) - .inc_by(block_txs_accepted); - self.metrics - .txs - .with_label_values(&["rejected"]) - .inc_by(block_txs_rejected); - self.metrics - .txs - .with_label_values(&["total"]) - .inc_by(block_txs_accepted + block_txs_rejected); - self.metrics.block_height.inc(); - } - last_guard.block_height = block_index; - } - - let new_tx_amounts = { - let 
mut new_buf = Vec::new(); - core::mem::swap(&mut new_buf, &mut state_view.new_tx_amounts.lock()); - new_buf - }; - - for amount in &new_tx_amounts { - self.metrics.tx_amounts.observe(*amount); - } - - #[allow(clippy::cast_possible_truncation)] - if let Some(timestamp) = state_view.genesis_timestamp() { - // this will overflow in 584942417years. - self.metrics.uptime_since_genesis_ms.set( - (current_time() - timestamp) - .as_millis() - .try_into() - .expect("Timestamp should fit into u64"), - ) - }; - - self.metrics.connected_peers.set(online_peers_count); - - self.metrics - .domains - .set(state_view.world().domains().len() as u64); - for domain in state_view.world().domains_iter() { - self.metrics - .accounts - .get_metric_with_label_values(&[domain.id().name.as_ref()]) - .wrap_err("Failed to compose domains")? - .set(domain.accounts.len() as u64); - } - - self.metrics - .view_changes - .set(state_view.latest_block_view_change_index()); - - self.metrics.queue_size.set(self.queue.tx_len() as u64); - - Ok(()) - } - - /// Access node metrics. - pub fn metrics(&self) -> &Metrics { - &self.metrics - } - /// Deposit a sumeragi control flow network message. pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) { if let Err(error) = self.control_message_sender.try_send(msg) { - self.metrics.dropped_messages.inc(); + self.dropped_messages.inc(); error!( ?error, "This peer is faulty. \ @@ -180,7 +57,7 @@ impl SumeragiHandle { /// Deposit a sumeragi network message. pub fn incoming_block_message(&self, msg: BlockMessage) { if let Err(error) = self.message_sender.try_send(msg) { - self.metrics.dropped_messages.inc(); + self.dropped_messages.inc(); error!( ?error, "This peer is faulty. 
\ @@ -235,6 +112,7 @@ impl SumeragiHandle { network, genesis_network, block_count: BlockCount(block_count), + dropped_messages, }: SumeragiStartArgs, ) -> SumeragiHandle { let (control_message_sender, control_message_receiver) = mpsc::sync_channel(100); @@ -331,16 +209,9 @@ impl SumeragiHandle { let thread_handle = ThreadHandler::new(Box::new(shutdown), thread_handle); SumeragiHandle { - state, - network, - queue, - kura, + dropped_messages, control_message_sender, message_sender, - metrics: Metrics::default(), - last_update_metrics_mutex: Arc::new(Mutex::new(LastUpdateMetricsData { - block_height: 0, - })), _thread_handle: Arc::new(thread_handle), } } @@ -401,4 +272,5 @@ pub struct SumeragiStartArgs { pub network: IrohaNetwork, pub genesis_network: Option, pub block_count: BlockCount, + pub dropped_messages: iroha_telemetry::metrics::IntCounter, } diff --git a/telemetry/src/metrics.rs b/telemetry/src/metrics.rs index 404e32c3916..13833b6b3fa 100644 --- a/telemetry/src/metrics.rs +++ b/telemetry/src/metrics.rs @@ -6,7 +6,7 @@ use std::{ }; use parity_scale_codec::{Compact, Decode, Encode}; -use prometheus::{ +pub use prometheus::{ core::{AtomicU64, GenericGauge, GenericGaugeVec}, Encoder, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, Opts, Registry, }; diff --git a/torii/src/lib.rs b/torii/src/lib.rs index 46b37d5cfe0..d75a8c24bc9 100644 --- a/torii/src/lib.rs +++ b/torii/src/lib.rs @@ -17,11 +17,11 @@ use iroha_config::parameters::actual::Torii as Config; use iroha_core::{ kiso::{Error as KisoError, KisoHandle}, kura::Kura, + metrics::MetricsReporter, prelude::*, query::store::LiveQueryStoreHandle, queue::{self, Queue}, state::State, - sumeragi::SumeragiHandle, EventsSender, }; use iroha_data_model::ChainId; @@ -49,12 +49,12 @@ pub struct Torii { queue: Arc, events: EventsSender, notify_shutdown: Arc, - sumeragi: SumeragiHandle, query_service: LiveQueryStoreHandle, kura: Arc, transaction_max_content_length: u64, address: SocketAddr, state: 
Arc, + metrics_reporter: MetricsReporter, } impl Torii { @@ -67,10 +67,10 @@ impl Torii { queue: Arc, events: EventsSender, notify_shutdown: Arc, - sumeragi: SumeragiHandle, query_service: LiveQueryStoreHandle, kura: Arc, state: Arc, + metrics_reporter: MetricsReporter, ) -> Self { Self { chain_id: Arc::new(chain_id), @@ -78,10 +78,10 @@ impl Torii { queue, events, notify_shutdown, - sumeragi, query_service, kura, state, + metrics_reporter, address: config.address, transaction_max_content_length: config.max_content_len_bytes, } @@ -105,21 +105,23 @@ impl Torii { #[cfg(feature = "telemetry")] let get_router = get_router .or(warp::path(uri::STATUS) - .and(add_state!(self.sumeragi.clone())) + .and(add_state!(self.metrics_reporter.clone())) .and(warp::header::optional(warp::http::header::ACCEPT.as_str())) .and(warp::path::tail()) - .and_then(|sumeragi, accept: Option, tail| async move { - Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_status( - &sumeragi, - accept.as_ref(), - &tail, - ))) - })) + .and_then( + |metrics_reporter, accept: Option, tail| async move { + Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_status( + &metrics_reporter, + accept.as_ref(), + &tail, + ))) + }, + )) .or(warp::path(uri::METRICS) - .and(add_state!(self.sumeragi)) - .and_then(|sumeragi| async move { + .and(add_state!(self.metrics_reporter)) + .and_then(|metrics_reporter| async move { Ok::<_, Infallible>(crate::utils::WarpResult(routing::handle_metrics( - &sumeragi, + &metrics_reporter, ))) })) .or(warp::path(uri::API_VERSION) diff --git a/torii/src/routing.rs b/torii/src/routing.rs index 89095543651..cd893ce472e 100644 --- a/torii/src/routing.rs +++ b/torii/src/routing.rs @@ -9,10 +9,7 @@ use eyre::{eyre, WrapErr}; use futures::TryStreamExt; use iroha_config::client_api::ConfigDTO; -use iroha_core::{ - query::store::LiveQueryStoreHandle, smartcontracts::query::ValidQueryRequest, - sumeragi::SumeragiHandle, -}; +use 
iroha_core::{query::store::LiveQueryStoreHandle, smartcontracts::query::ValidQueryRequest}; use iroha_data_model::{ block::{ stream::{BlockMessage, BlockSubscriptionRequest}, @@ -297,16 +294,16 @@ pub async fn handle_version(state: Arc) -> Json { } #[cfg(feature = "telemetry")] -fn update_metrics_gracefully(sumeragi: &SumeragiHandle) { - if let Err(error) = sumeragi.update_metrics() { - iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`."); +fn update_metrics_gracefully(metrics_reporter: &MetricsReporter) { + if let Err(error) = metrics_reporter.update_metrics() { + iroha_logger::error!(%error, "Error while calling `metrics_reporter::update_metrics`."); } } #[cfg(feature = "telemetry")] -pub fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { - update_metrics_gracefully(sumeragi); - sumeragi +pub fn handle_metrics(metrics_reporter: &MetricsReporter) -> Result { + update_metrics_gracefully(metrics_reporter); + metrics_reporter .metrics() .try_to_string() .map_err(Error::Prometheus) @@ -315,14 +312,14 @@ pub fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { #[cfg(feature = "telemetry")] #[allow(clippy::unnecessary_wraps)] pub fn handle_status( - sumeragi: &SumeragiHandle, + metrics_reporter: &MetricsReporter, accept: Option>, tail: &warp::path::Tail, ) -> Result { use eyre::ContextCompat; - update_metrics_gracefully(sumeragi); - let status = Status::from(&sumeragi.metrics()); + update_metrics_gracefully(metrics_reporter); + let status = Status::from(&metrics_reporter.metrics()); let tail = tail.as_str(); if tail.is_empty() {