Skip to content

Commit

Permalink
[refactor]: Move metrics related functionality into MetricsReporter
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Mar 21, 2024
1 parent 47f278d commit 5d0bad1
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 167 deletions.
12 changes: 10 additions & 2 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use iroha_core::{
handler::ThreadHandler,
kiso::KisoHandle,
kura::Kura,
metrics::MetricsReporter,
query::store::LiveQueryStore,
queue::Queue,
smartcontracts::isi::Registrable as _,
Expand Down Expand Up @@ -216,6 +217,7 @@ impl Iroha {
);

let kura = Kura::new(&config.kura)?;
let kura_thread_handler = Kura::start(Arc::clone(&kura));
let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start();

let block_count = kura.init()?;
Expand Down Expand Up @@ -257,7 +259,12 @@ impl Iroha {
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
};

let kura_thread_handler = Kura::start(Arc::clone(&kura));
let metrics_reporter = MetricsReporter::new(
Arc::clone(&state),
network.clone(),
kura.clone(),
queue.clone(),
);

let start_args = SumeragiStartArgs {
sumeragi_config: config.sumeragi.clone(),
Expand All @@ -269,6 +276,7 @@ impl Iroha {
network: network.clone(),
genesis_network: genesis,
block_count,
dropped_messages: metrics_reporter.metrics().dropped_messages.clone(),
};
// Starting Sumeragi requires no async context enabled
let sumeragi = tokio::task::spawn_blocking(move || SumeragiHandle::start(start_args))
Expand Down Expand Up @@ -322,10 +330,10 @@ impl Iroha {
Arc::clone(&queue),
events_sender,
Arc::clone(&notify_shutdown),
sumeragi.clone(),
live_query_store_handle,
Arc::clone(&kura),
Arc::clone(&state),
metrics_reporter,
);

Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone());
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod executor;
pub mod gossiper;
pub mod kiso;
pub mod kura;
pub mod metrics;
pub mod query;
pub mod queue;
pub mod smartcontracts;
Expand Down
163 changes: 163 additions & 0 deletions core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! Metrics and status reporting
use std::sync::Arc;

use eyre::{Result, WrapErr as _};
use iroha_data_model::current_time;
use iroha_telemetry::metrics::Metrics;
use parking_lot::Mutex;
use storage::storage::StorageReadOnly;

use crate::{
kura::Kura,
queue::Queue,
state::{State, StateReadOnly, WorldReadOnly},
IrohaNetwork,
};

/*
The values in the following struct are not atomics because the code that
operates on them assumes their values do not change during the course of
the function.
*/
/// Snapshot of where the previous `update_metrics` run left off, so that
/// already-reported blocks are not counted twice.
#[derive(Debug)]
struct LastUpdateMetricsData {
    /// Height up to which per-block transaction metrics have been reported.
    block_height: u64,
}

/// Responsible for collecting and updating metrics
#[derive(Clone)]
pub struct MetricsReporter {
    /// State observed for heights, domains, accounts and tx amounts.
    state: Arc<State>,
    /// Network handle, used to count online peers.
    network: IrohaNetwork,
    /// Block store, read to compute per-block transaction metrics.
    kura: Arc<Kura>,
    /// Transaction queue; its length is reported as the queue-size metric.
    queue: Arc<Queue>,
    /// The collected metrics themselves.
    metrics: Metrics,
    /// Bookkeeping from the previous `update_metrics` call; shared between
    /// clones of the reporter so blocks are reported exactly once.
    last_update_metrics_mutex: Arc<Mutex<LastUpdateMetricsData>>,
}

impl MetricsReporter {
    /// Construct [`Self`]
    pub fn new(
        state: Arc<State>,
        network: IrohaNetwork,
        kura: Arc<Kura>,
        queue: Arc<Queue>,
    ) -> Self {
        Self {
            state,
            network,
            queue,
            kura,
            metrics: Metrics::default(),
            last_update_metrics_mutex: Arc::new(Mutex::new(LastUpdateMetricsData {
                block_height: 0,
            })),
        }
    }

    /// Update the metrics on the state.
    ///
    /// Catches up on blocks committed since the previous call (transaction
    /// counters, block height), then refreshes the gauge-style metrics:
    /// connected peers, uptime, domains, accounts, view changes, queue size.
    ///
    /// # Errors
    /// - Domains fail to compose
    ///
    /// # Panics
    /// - If the online-peer count or the uptime since genesis in milliseconds
    ///   does not fit into `u64`
    // NOTE(review): both mutexes here are `parking_lot` locks, which do not
    // poison, so (unlike the std mutexes) locking cannot panic.
    #[allow(clippy::cast_precision_loss)]
    pub fn update_metrics(&self) -> Result<()> {
        let online_peers_count: u64 = self
            .network
            .online_peers(
                #[allow(clippy::disallowed_types)]
                std::collections::HashSet::len,
            )
            .try_into()
            .expect("casting usize to u64");

        let state_view = self.state.view();

        let mut last_guard = self.last_update_metrics_mutex.lock();

        let start_index = last_guard.block_height;
        {
            // Walk every block committed since the previous run and fold its
            // transactions into the accepted/rejected/total counters.
            let mut block_index = start_index;
            while block_index < state_view.height() {
                let Some(block) = self.kura.get_block_by_height(block_index + 1) else {
                    break;
                };
                block_index += 1;
                let mut block_txs_accepted = 0;
                let mut block_txs_rejected = 0;
                for tx in block.transactions() {
                    if tx.error.is_none() {
                        block_txs_accepted += 1;
                    } else {
                        block_txs_rejected += 1;
                    }
                }

                self.metrics
                    .txs
                    .with_label_values(&["accepted"])
                    .inc_by(block_txs_accepted);
                self.metrics
                    .txs
                    .with_label_values(&["rejected"])
                    .inc_by(block_txs_rejected);
                self.metrics
                    .txs
                    .with_label_values(&["total"])
                    .inc_by(block_txs_accepted + block_txs_rejected);
                self.metrics.block_height.inc();
            }
            // Remember how far we got so the next run resumes from here.
            last_guard.block_height = block_index;
        }

        // Drain the accumulated tx amounts, leaving an empty buffer in place.
        let new_tx_amounts = core::mem::take(&mut *state_view.new_tx_amounts.lock());
        for amount in &new_tx_amounts {
            self.metrics.tx_amounts.observe(*amount);
        }

        #[allow(clippy::cast_possible_truncation)]
        if let Some(timestamp) = state_view.genesis_timestamp() {
            // this will overflow in 584942417years.
            self.metrics.uptime_since_genesis_ms.set(
                (current_time() - timestamp)
                    .as_millis()
                    .try_into()
                    .expect("Timestamp should fit into u64"),
            )
        };

        self.metrics.connected_peers.set(online_peers_count);

        self.metrics
            .domains
            .set(state_view.world().domains().len() as u64);
        for domain in state_view.world().domains_iter() {
            self.metrics
                .accounts
                .get_metric_with_label_values(&[domain.id.name.as_ref()])
                .wrap_err("Failed to compose domains")?
                .set(domain.accounts.len() as u64);
        }

        self.metrics
            .view_changes
            .set(state_view.latest_block_view_change_index());

        self.metrics.queue_size.set(self.queue.tx_len() as u64);

        Ok(())
    }

    /// Access node metrics.
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}
Loading

0 comments on commit 5d0bad1

Please sign in to comment.