Skip to content

Commit

Permalink
[refactor]: Move metrics related functionality into MetricsReporter
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Apr 19, 2024
1 parent d3f21bb commit 4748a33
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 173 deletions.
16 changes: 13 additions & 3 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::Arc};
use clap::Parser;
use color_eyre::eyre::{eyre, Result, WrapErr};
use iroha_config::parameters::{actual::Root as Config, user::CliContext};
#[cfg(feature = "telemetry")]
use iroha_core::metrics::MetricsReporter;
use iroha_core::{
block_sync::{BlockSynchronizer, BlockSynchronizerHandle},
gossiper::{TransactionGossiper, TransactionGossiperHandle},
Expand Down Expand Up @@ -214,6 +216,7 @@ impl Iroha {
);

let kura = Kura::new(&config.kura)?;
let kura_thread_handler = Kura::start(Arc::clone(&kura));
let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start();

let block_count = kura.init()?;
Expand Down Expand Up @@ -254,7 +257,13 @@ impl Iroha {
#[cfg(feature = "telemetry")]
Self::start_telemetry(&logger, &config).await?;

let kura_thread_handler = Kura::start(Arc::clone(&kura));
#[cfg(feature = "telemetry")]
let metrics_reporter = MetricsReporter::new(
Arc::clone(&state),
network.clone(),
kura.clone(),
queue.clone(),
);

let start_args = SumeragiStartArgs {
sumeragi_config: config.sumeragi.clone(),
Expand All @@ -269,6 +278,7 @@ impl Iroha {
public_key: config.genesis.public_key().clone(),
},
block_count,
dropped_messages: metrics_reporter.metrics().dropped_messages.clone(),
};
// Starting Sumeragi requires no async context enabled
let sumeragi = tokio::task::spawn_blocking(move || SumeragiHandle::start(start_args))
Expand Down Expand Up @@ -322,11 +332,11 @@ impl Iroha {
Arc::clone(&queue),
events_sender,
Arc::clone(&notify_shutdown),
#[cfg(feature = "telemetry")]
sumeragi.clone(),
live_query_store_handle,
Arc::clone(&kura),
Arc::clone(&state),
#[cfg(feature = "telemetry")]
metrics_reporter,
);

Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone());
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod executor;
pub mod gossiper;
pub mod kiso;
pub mod kura;
pub mod metrics;
pub mod query;
pub mod queue;
pub mod smartcontracts;
Expand Down
155 changes: 155 additions & 0 deletions core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//! Metrics and status reporting
use std::{sync::Arc, time::SystemTime};

use eyre::{Result, WrapErr as _};
use iroha_telemetry::metrics::Metrics;
use parking_lot::Mutex;
use storage::storage::StorageReadOnly;

use crate::{
kura::Kura,
queue::Queue,
state::{State, StateReadOnly, WorldReadOnly},
IrohaNetwork,
};

/// Responsible for collecting and updating metrics
#[derive(Clone)]
pub struct MetricsReporter {
state: Arc<State>,
network: IrohaNetwork,
kura: Arc<Kura>,
queue: Arc<Queue>,
metrics: Metrics,
/// Latest observed and processed height by metrics reporter
latest_block_height: Arc<Mutex<u64>>,
}

impl MetricsReporter {
/// Construct [`Self`]
pub fn new(
state: Arc<State>,
network: IrohaNetwork,
kura: Arc<Kura>,
queue: Arc<Queue>,
) -> Self {
Self {
state,
network,
queue,
kura,
metrics: Metrics::default(),
latest_block_height: Arc::new(Mutex::new(0)),
}
}

/// Update the metrics on the state.
///
/// # Errors
/// - Domains fail to compose
///
/// # Panics
/// - If either mutex is poisoned
#[allow(clippy::cast_precision_loss)]
pub fn update_metrics(&self) -> Result<()> {
let online_peers_count: u64 = self
.network
.online_peers(
#[allow(clippy::disallowed_types)]
std::collections::HashSet::len,
)
.try_into()
.expect("casting usize to u64");

let state_view = self.state.view();

let mut lastest_block_height = self.latest_block_height.lock();

let start_index = *lastest_block_height;
{
let mut block_index = start_index;
while block_index < state_view.height() {
let Some(block) = self.kura.get_block_by_height(block_index + 1) else {
break;
};
block_index += 1;
let mut block_txs_accepted = 0;
let mut block_txs_rejected = 0;
for tx in block.transactions() {
if tx.error.is_none() {
block_txs_accepted += 1;
} else {
block_txs_rejected += 1;
}
}

self.metrics
.txs
.with_label_values(&["accepted"])
.inc_by(block_txs_accepted);
self.metrics
.txs
.with_label_values(&["rejected"])
.inc_by(block_txs_rejected);
self.metrics
.txs
.with_label_values(&["total"])
.inc_by(block_txs_accepted + block_txs_rejected);
self.metrics.block_height.inc();
}
*lastest_block_height = block_index;
}

let new_tx_amounts = {
let mut new_buf = Vec::new();
core::mem::swap(&mut new_buf, &mut state_view.new_tx_amounts.lock());
new_buf
};

for amount in &new_tx_amounts {
self.metrics.tx_amounts.observe(*amount);
}

#[allow(clippy::cast_possible_truncation)]
if let Some(timestamp) = state_view.genesis_timestamp() {
let curr_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get the current system time");

// this will overflow in 584942417years.
self.metrics.uptime_since_genesis_ms.set(
(curr_time - timestamp)
.as_millis()
.try_into()
.expect("Timestamp should fit into u64"),
)
};

self.metrics.connected_peers.set(online_peers_count);

self.metrics
.domains
.set(state_view.world().domains().len() as u64);
for domain in state_view.world().domains_iter() {
self.metrics
.accounts
.get_metric_with_label_values(&[domain.id.name.as_ref()])
.wrap_err("Failed to compose domains")?
.set(domain.accounts.len() as u64);
}

self.metrics
.view_changes
.set(state_view.latest_block_view_change_index());

self.metrics.queue_size.set(self.queue.tx_len() as u64);

Ok(())
}

/// Access node metrics.
pub fn metrics(&self) -> &Metrics {
&self.metrics
}
}
Loading

0 comments on commit 4748a33

Please sign in to comment.