Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add Raft::data_metrics() and Raft::server_metrics() #990

Merged
merged 5 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ mod wait_condition;
use std::collections::BTreeMap;

pub use metric::Metric;
pub use raft_metrics::is_data_metrics_changed;
pub use raft_metrics::is_server_metrics_changed;
pub use raft_metrics::RaftDataMetrics;
pub use raft_metrics::RaftMetrics;
pub use raft_metrics::RaftServerMetrics;
pub use wait::Wait;
pub use wait::WaitError;
pub(crate) use wait_condition::Condition;
Expand Down
152 changes: 152 additions & 0 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,155 @@ where
}
}
}

/// Reports whether any data-related field differs between two metrics snapshots.
///
/// Only `last_log_index`, `last_applied`, `snapshot`, `purged` and
/// `replication` are compared; every other field of `RaftMetrics` is ignored.
///
/// Returns `true` if at least one of those fields in `new` is not equal to the
/// same field in `old`, and `false` when all of them are equal.
pub fn is_data_metrics_changed<NID, N>(old: &RaftMetrics<NID, N>, new: &RaftMetrics<NID, N>) -> bool
where
    NID: NodeId,
    N: Node,
{
    // Short-circuits on the first field that differs, in the listed order.
    let unchanged = new.last_log_index == old.last_log_index
        && new.last_applied == old.last_applied
        && new.snapshot == old.snapshot
        && new.purged == old.purged
        && new.replication == old.replication;

    !unchanged
}

/// Reports whether any server-related field differs between two metrics snapshots.
///
/// Only `current_term`, `vote`, `state`, `current_leader` and
/// `membership_config` are compared; every other field of `RaftMetrics`
/// (including `id`) is ignored.
///
/// Returns `true` if at least one of those fields in `new` is not equal to the
/// same field in `old`, and `false` when all of them are equal.
pub fn is_server_metrics_changed<NID, N>(old: &RaftMetrics<NID, N>, new: &RaftMetrics<NID, N>) -> bool
where
    NID: NodeId,
    N: Node,
{
    // Short-circuits on the first field that differs, in the listed order.
    let unchanged = new.current_term == old.current_term
        && new.vote == old.vote
        && new.state == old.state
        && new.current_leader == old.current_leader
        && new.membership_config == old.membership_config;

    !unchanged
}

/// A subset of `RaftMetrics` that includes only the data-related metrics.
///
/// An instance can be built from a full `RaftMetrics` via the `From` impl,
/// which moves the identically named fields across unchanged. The derived
/// `Default` leaves every field as `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftDataMetrics<NID>
where NID: NodeId
{
/// Mirror of `RaftMetrics::last_log_index`; rendered as `last_log` by `Display`.
pub last_log_index: Option<u64>,
/// Mirror of `RaftMetrics::last_applied`.
pub last_applied: Option<LogId<NID>>,
/// Mirror of `RaftMetrics::snapshot`.
pub snapshot: Option<LogId<NID>>,
/// Mirror of `RaftMetrics::purged`.
pub purged: Option<LogId<NID>>,
/// Mirror of `RaftMetrics::replication`; when present, `Display` renders each
/// entry as `key:value`, separated by commas.
pub replication: Option<ReplicationMetrics<NID>>,
}

impl<NID> fmt::Display for RaftDataMetrics<NID>
where NID: NodeId
{
    /// Renders the metrics as
    /// `DataMetrics{last_log:.., last_applied:.., snapshot:.., purged:.., replication:{..}}`.
    ///
    /// Optional fields are printed through `DisplayOption`. The replication
    /// entries are printed as comma-separated `key:value` pairs; when
    /// `replication` is `None` the braces are left empty.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "DataMetrics{{")?;

        // Flatten the replication entries into one string before writing the body.
        let replication = match self.replication.as_ref() {
            Some(entries) => {
                let pairs: Vec<String> = entries
                    .iter()
                    .map(|(target, log_id)| format!("{}:{}", target, DisplayOption(log_id)))
                    .collect();
                pairs.join(",")
            }
            None => String::default(),
        };

        write!(
            f,
            "last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}",
            DisplayOption(&self.last_log_index),
            DisplayOption(&self.last_applied),
            DisplayOption(&self.snapshot),
            DisplayOption(&self.purged),
            replication,
        )?;

        write!(f, "}}")
    }
}

/// `MessageSummary` for `RaftDataMetrics`: the summary is the `Display` output.
impl<NID> MessageSummary<RaftDataMetrics<NID>> for RaftDataMetrics<NID>
where NID: NodeId
{
/// Returns the same text that the `Display` impl produces for `self`.
fn summary(&self) -> String {
self.to_string()
}
}

impl<NID, N> From<RaftMetrics<NID, N>> for RaftDataMetrics<NID>
where
NID: NodeId,
N: Node,
{
fn from(metrics: RaftMetrics<NID, N>) -> Self {
Self {
last_log_index: metrics.last_log_index,
last_applied: metrics.last_applied,
snapshot: metrics.snapshot,
purged: metrics.purged,
replication: metrics.replication,
}
}
}

/// A subset of `RaftMetrics` that includes only the server-related metrics.
///
/// An instance can be built from a full `RaftMetrics` via the `From` impl,
/// which moves the identically named fields across unchanged.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
/// Mirror of `RaftMetrics::id`.
///
/// NOTE(review): the derived `Default` sets this to `NID::default()`, which is not
/// necessarily a real node id — confirm that code starting from `default()` replaces it.
pub id: NID,
/// Mirror of `RaftMetrics::current_term`; rendered as `term` by `Display`.
pub current_term: u64,
/// Mirror of `RaftMetrics::vote`.
pub vote: Vote<NID>,
/// Mirror of `RaftMetrics::state`; rendered with its `Debug` form by `Display`.
pub state: ServerState,
/// Mirror of `RaftMetrics::current_leader`; rendered as `leader` by `Display`.
pub current_leader: Option<NID>,
/// Mirror of `RaftMetrics::membership_config`; `Display` prints its `summary()`.
pub membership_config: Arc<StoredMembership<NID, N>>,
}

impl<NID, N> fmt::Display for RaftServerMetrics<NID, N>
where
    NID: NodeId,
    N: Node,
{
    /// Renders the metrics as
    /// `ServerMetrics{id:.., <state>, term:.., vote:.., leader:.., membership:..}`.
    ///
    /// The server state is printed with its `Debug` form, the optional leader
    /// through `DisplayOption`, and the membership through its `summary()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ServerMetrics{{")?;

        // Name the derived pieces so the format call below reads field by field.
        let leader = DisplayOption(&self.current_leader);
        let membership = self.membership_config.summary();

        write!(
            f,
            "id:{}, {:?}, term:{}, vote:{}, leader:{}, membership:{}",
            self.id, self.state, self.current_term, self.vote, leader, membership,
        )?;

        write!(f, "}}")
    }
}

/// `MessageSummary` for `RaftServerMetrics`: the summary is the `Display` output.
impl<NID, N> MessageSummary<RaftServerMetrics<NID, N>> for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
/// Returns the same text that the `Display` impl produces for `self`.
fn summary(&self) -> String {
self.to_string()
}
}

impl<NID, N> From<RaftMetrics<NID, N>> for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
fn from(metrics: RaftMetrics<NID, N>) -> Self {
Self {
id: metrics.id,
current_term: metrics.current_term,
vote: metrics.vote,
state: metrics.state,
current_leader: metrics.current_leader,
membership_config: metrics.membership_config,
}
}
}
45 changes: 45 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::RaftError;
use crate::membership::IntoNodes;
use crate::metrics::is_data_metrics_changed;
use crate::metrics::is_server_metrics_changed;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::network::RaftNetworkFactory;
Expand Down Expand Up @@ -174,8 +178,37 @@ where C: RaftTypeConfig
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default());
let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default());
let (tx_shutdown, rx_shutdown) = oneshot::channel();

let mut raft_metrics_rx = rx_metrics.clone();

#[allow(clippy::let_underscore_future)]
let _ = C::AsyncRuntime::spawn(async move {
let mut last = RaftMetrics::new_initial(id);
loop {
let latest = raft_metrics_rx.borrow().clone();
if is_data_metrics_changed(&last, &latest) {
if let Err(err) = tx_data_metrics.send(latest.clone().into()) {
tracing::error!(error=%err, id=display(id), "error reporting data metrics");
}
}

if is_server_metrics_changed(&last, &latest) {
if let Err(err) = tx_server_metrics.send(latest.clone().into()) {
tracing::error!(error=%err, id=display(id), "error reporting server metrics");
}
}

last = latest;
if let Err(e) = raft_metrics_rx.changed().await {
tracing::info!(error=%e, id=display(id), "metrics sender closed, so close data_metrics sender and server_metrics sender");
return;
}
}
});

let tick_handle = Tick::spawn(
Duration::from_millis(config.heartbeat_interval * 3 / 2),
tx_notify.clone(),
Expand Down Expand Up @@ -240,6 +273,8 @@ where C: RaftTypeConfig
tick_handle,
tx_api,
rx_metrics,
rx_data_metrics,
rx_server_metrics,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
core_state: Mutex::new(CoreState::Running(core_handle)),
};
Expand Down Expand Up @@ -822,6 +857,16 @@ where C: RaftTypeConfig
self.inner.rx_metrics.clone()
}

/// Get a handle to the data metrics channel.
///
/// Each call returns a new clone of the `watch::Receiver` stored in
/// `self.inner.rx_data_metrics`; the receiver carries `RaftDataMetrics` values.
pub fn data_metrics(&self) -> watch::Receiver<RaftDataMetrics<C::NodeId>> {
self.inner.rx_data_metrics.clone()
}

/// Get a handle to the server metrics channel.
///
/// Each call returns a new clone of the `watch::Receiver` stored in
/// `self.inner.rx_server_metrics`; the receiver carries `RaftServerMetrics` values.
pub fn server_metrics(&self) -> watch::Receiver<RaftServerMetrics<C::NodeId, C::Node>> {
self.inner.rx_server_metrics.clone()
}

/// Get a handle to wait for the metrics to satisfy some condition.
///
/// If `timeout` is `None`, then it will wait forever (10 years).
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::AsyncRuntime;
use crate::Config;
Expand All @@ -28,6 +30,8 @@ where C: RaftTypeConfig
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,
pub(in crate::raft) rx_data_metrics: watch::Receiver<RaftDataMetrics<C::NodeId>>,
pub(in crate::raft) rx_server_metrics: watch::Receiver<RaftServerMetrics<C::NodeId, C::Node>>,

// TODO(xp): it does not need to be an async mutex.
#[allow(clippy::type_complexity)]
Expand Down
1 change: 1 addition & 0 deletions tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod fixtures;

mod t10_current_leader;
mod t10_purged;
mod t10_server_metrics_and_data_metrics;
mod t20_metrics_state_machine_consistency;
mod t30_leader_metrics;
mod t40_metrics_wait;
54 changes: 54 additions & 0 deletions tests/tests/metrics/t10_server_metrics_and_data_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::sync::Arc;

use anyhow::Result;
use maplit::btreeset;
use openraft::Config;
#[allow(unused_imports)] use pretty_assertions::assert_eq;
#[allow(unused_imports)] use pretty_assertions::assert_ne;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// The `server_metrics()` and `data_metrics()` methods should work.
///
/// Builds a three-node cluster, checks that the leader reported by the server
/// metrics of node 0 matches the router's view, writes some logs, and then checks
/// that the data metrics report the new last log index while the server metrics
/// receiver has seen no further change.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn server_metrics_and_data_metrics() -> Result<()> {
// Setup test dependencies.
// Heartbeat and election are switched off in the config — presumably so that no
// background activity alters the server metrics during the test (TODO confirm).
let config = Arc::new(
Config {
enable_heartbeat: false,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

// Take both metrics receivers from node 0.
let node = router.get_raft_handle(&0)?;
let mut server_metrics = node.server_metrics();
let data_metrics = node.data_metrics();

// `borrow_and_update` marks the current server metrics value as seen, so the
// `has_changed()` check at the end only reports updates made after this point.
let current_leader = router.current_leader(0).await;
let leader = server_metrics.borrow_and_update().current_leader;
assert_eq!(leader, current_leader, "current_leader should be {:?}", current_leader);

// Write some logs.
let n = 10;
tracing::info!(log_index, "--- write {} logs", n);
log_index += router.client_request_many(0, "foo", n).await?;

// NOTE(review): the data metrics are read right after the writes return; if the
// data metrics channel is fed asynchronously (e.g. by a separate forwarding task),
// this assertion could race with that update — confirm.
let last_log_index = data_metrics.borrow().last_log_index;
assert_eq!(
last_log_index,
Some(log_index),
"last_log_index should be {:?}",
Some(log_index)
);
// Writing logs is expected to leave the server-related metrics untouched.
assert!(
!server_metrics.borrow().has_changed(),
"server metrics should not update"
);
Ok(())
}
Loading