Skip to content

Commit

Permalink
Feature: Raft::trigger()::allow_next_revert() allow to reset replic…
Browse files Browse the repository at this point in the history
…ation for next detected follower log revert

This method requests the RaftCore to allow to reset replication for a
specific node when log revert is detected.

- `allow=true`: This method instructs the RaftCore to allow the target
  node's log to revert to a previous state for one time.

- `allow=false`: This method instructs the RaftCore to panic if the
  target node's log revert

This method returns `Fatal` error if failed to send the request to
RaftCore, e.g. when RaftCore is shut down.
Otherwise, it returns a `Ok(Result<_,_>)`, the inner result is:
- `Ok(())` if the request is successfully processed,
- or `Err(AllowNextRevertError)` explaining why the request is rejected.

### Behavior

- If this node is the Leader, it will attempt to replicate logs to the
  target node from the beginning.
- If this node is not the Leader, the request is ignored.
- If the target node is not found, the request is ignored.

### Automatic Replication Reset

When the `loosen-follower-log-revert` feature flag is enabled, the
Leader automatically reset replication if it detects that the target
node's log has reverted. This feature is primarily useful in testing
environments.

### Production Considerations

In production environments, state reversion is a critical issue that
should not be automatically handled. However, there may be scenarios
where a Follower's data is intentionally removed and needs to rejoin the
cluster(without membership changes). In such cases, the Leader should
reinitialize replication for that node with the following steps:

- Shut down the target node.
- call [`Self::allow_next_revert`] on the Leader.
- Clear the target node's data directory.
- Restart the target node.

- Fix: #1251
  • Loading branch information
drmingdrmer committed Oct 31, 2024
1 parent 3ab0d49 commit 47e0220
Show file tree
Hide file tree
Showing 14 changed files with 349 additions and 33 deletions.
15 changes: 15 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::engine::ReplicationProgress;
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::error::AllowNextRevertError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
Expand Down Expand Up @@ -1312,6 +1313,20 @@ where
ExternalCommand::TriggerTransferLeader { to } => {
self.engine.trigger_transfer_leader(to);
}
ExternalCommand::AllowNextRevert { to, allow, tx } => {
//
let res = match self.engine.leader_handler() {
Ok(mut l) => {
let res = l.replication_handler().allow_next_revert(to, allow);
res.map_err(AllowNextRevertError::from)
}
Err(e) => {
tracing::warn!("AllowNextRevert: current node is not a Leader");
Err(AllowNextRevertError::from(e))
}
};
let _ = tx.send(res);
}
ExternalCommand::StateMachineCommand { sm_cmd } => {
let res = self.sm_handle.send(sm_cmd);
if let Err(e) = res {
Expand Down
16 changes: 16 additions & 0 deletions openraft/src/core/raft_msg/external_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt;

use crate::core::raft_msg::ResultSender;
use crate::core::sm;
use crate::error::AllowNextRevertError;
use crate::RaftTypeConfig;
use crate::Snapshot;

Expand Down Expand Up @@ -36,6 +37,13 @@ pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
/// Submit a command to inform RaftCore to transfer leadership to the specified node.
TriggerTransferLeader { to: C::NodeId },

/// Allow or not the next revert of the replication to the specified node.
AllowNextRevert {
to: C::NodeId,
allow: bool,
tx: ResultSender<C, (), AllowNextRevertError<C>>,
},

/// Send a [`sm::Command`] to [`sm::worker::Worker`].
/// This command is run in the sm task.
StateMachineCommand { sm_cmd: sm::Command<C> },
Expand Down Expand Up @@ -72,6 +80,14 @@ where C: RaftTypeConfig
ExternalCommand::TriggerTransferLeader { to } => {
write!(f, "TriggerTransferLeader: to {}", to)
}
ExternalCommand::AllowNextRevert { to, allow, .. } => {
write!(
f,
"{}-on-next-log-revert: to {}",
if *allow { "AllowReset" } else { "Panic" },
to
)
}
ExternalCommand::StateMachineCommand { sm_cmd } => {
write!(f, "StateMachineCommand: {}", sm_cmd)
}
Expand Down
30 changes: 30 additions & 0 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::engine::ReplicationProgress;
use crate::error::NodeNotFound;
use crate::error::Operation;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
Expand Down Expand Up @@ -216,6 +218,34 @@ where C: RaftTypeConfig
prog_entry.update_conflicting(conflict.index);
}

/// Enable one-time replication reset for a specific node upon log reversion detection.
///
/// This method sets a flag to allow the replication process to be reset once for the specified
/// target node when a log reversion is detected. This is typically used to handle scenarios
/// where a follower node's log has unexpectedly reverted to a previous state.
///
/// # Behavior
///
/// - Sets the `reset_on_reversion` flag to `true` for the specified node in the leader's
/// progress tracker.
/// - This flag will be consumed upon the next log reversion detection, allowing for a one-time
/// reset.
/// - If the node is not found in the progress tracker, this method ignore it.
pub(crate) fn allow_next_revert(&mut self, target: C::NodeId, allow: bool) -> Result<(), NodeNotFound<C>> {
let Some(prog_entry) = self.leader.progress.get_mut(&target) else {
tracing::warn!(
"target node {} not found in progress tracker, when {}",
target,
func_name!()
);
return Err(NodeNotFound::new(target, Operation::AllowNextRevert));
};

prog_entry.reset_on_reversion = allow;

Ok(())
}

/// Update replication progress when a response is received.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result<ReplicationResult<C>, String>) {
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
targets: vec![ReplicationProgress(3, ProgressEntry {
matching: None,
inflight: Inflight::None,
searching_end: 4
searching_end: 4,
reset_on_reversion: false,
})]
},
Command::AppendInputEntries {
Expand Down Expand Up @@ -128,7 +129,8 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> {
targets: vec![ReplicationProgress(3, ProgressEntry {
matching: None,
inflight: Inflight::None,
searching_end: 7
searching_end: 7,
reset_on_reversion: false,
})]
},
Command::Replicate {
Expand Down
6 changes: 6 additions & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! Error types exposed by this crate.
mod allow_next_revert_error;
pub mod decompose;
pub mod into_ok;
mod invalid_sm;
mod node_not_found;
mod operation;
mod replication_closed;
mod streaming_error;

Expand All @@ -14,7 +17,10 @@ use std::time::Duration;

use anyerror::AnyError;

pub use self::allow_next_revert_error::AllowNextRevertError;
pub use self::invalid_sm::InvalidStateMachineType;
pub use self::node_not_found::NodeNotFound;
pub use self::operation::Operation;
pub use self::replication_closed::ReplicationClosed;
pub use self::streaming_error::StreamingError;
use crate::network::RPCTypes;
Expand Down
12 changes: 12 additions & 0 deletions openraft/src/error/allow_next_revert_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use crate::error::ForwardToLeader;
use crate::error::NodeNotFound;
use crate::RaftTypeConfig;

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum AllowNextRevertError<C: RaftTypeConfig> {
#[error("Can not set allow_next_revert; error: {0}")]
NodeNotFound(#[from] NodeNotFound<C>),
#[error("Can not set allow_next_revert; error: {0}")]
ForwardToLeader(#[from] ForwardToLeader<C>),
}
16 changes: 16 additions & 0 deletions openraft/src/error/node_not_found.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::error::Operation;
use crate::RaftTypeConfig;

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("Node {node_id} not found when: ({operation})")]
pub struct NodeNotFound<C: RaftTypeConfig> {
pub node_id: C::NodeId,
pub operation: Operation,
}

impl<C: RaftTypeConfig> NodeNotFound<C> {
pub fn new(node_id: C::NodeId, operation: Operation) -> Self {
Self { node_id, operation }
}
}
49 changes: 49 additions & 0 deletions openraft/src/error/operation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::fmt;

/// Operations that can be executed on a Raft node.
///
/// These commands represent operations that affect the Raft node's behavior or state. They are
/// primarily used in error reporting to provide context about what operation was attempted when an
/// error occurred.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum Operation {
/// Set a flag to allow a target replication state to revert to a previous state for one time.
AllowNextRevert,

/// Transfer leadership to the specified node.
TransferLeader,

/// Send a heartbeat message to a follower or learner.
SendHeartbeat,

/// Receive a snapshot.
ReceiveSnapshot,

/// Install a snapshot.
InstallSnapshot,

/// Write application data via Raft protocol.
ClientWrite,

/// Initialize an empty Raft node with a cluster membership config.
Initialize,

/// Start an election.
Elect,
}

impl fmt::Display for Operation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Operation::AllowNextRevert => write!(f, "set flag to allow replication revert for once"),
Operation::TransferLeader => write!(f, "transfer leadership"),
Operation::SendHeartbeat => write!(f, "send heartbeat"),
Operation::ReceiveSnapshot => write!(f, "receive snapshot"),
Operation::InstallSnapshot => write!(f, "install snapshot"),
Operation::ClientWrite => write!(f, "write application data"),
Operation::Initialize => write!(f, "initialize"),
Operation::Elect => write!(f, "elect"),
}
}
}
24 changes: 21 additions & 3 deletions openraft/src/progress/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ where C: RaftTypeConfig

/// One plus the max log index on the following node that might match the leader log.
pub(crate) searching_end: u64,

/// If true, reset the progress, by setting [`Self::matching`] to `None`, when the follower's
/// log is found reverted to an early state.
///
/// This allows the target node to clean its data and wait for the leader to replicate all data
/// to it.
///
/// This flag will be cleared after the progress entry is reset.
pub(crate) reset_on_reversion: bool,
}

impl<C> ProgressEntry<C>
Expand All @@ -40,6 +49,7 @@ where C: RaftTypeConfig
matching: matching.clone(),
inflight: Inflight::None,
searching_end: matching.next_index(),
reset_on_reversion: false,
}
}

Expand All @@ -51,6 +61,7 @@ where C: RaftTypeConfig
matching: None,
inflight: Inflight::None,
searching_end: end,
reset_on_reversion: false,
}
}

Expand Down Expand Up @@ -117,8 +128,14 @@ where C: RaftTypeConfig
//
// - If log reversion is allowed, just restart the binary search from the beginning.
// - Otherwise, panic it.
{
#[cfg(feature = "loosen-follower-log-revert")]

let allow_reset = if cfg!(feature = "loosen-follower-log-revert") {
true
} else {
self.reset_on_reversion
};

if allow_reset {
if conflict < self.matching.next_index() {
tracing::warn!(
"conflict {} < last matching {}: follower log is reverted; with 'loosen-follower-log-revert' enabled, this is allowed.",
Expand All @@ -127,8 +144,9 @@ where C: RaftTypeConfig
);

self.matching = None;
self.reset_on_reversion = false;
}

} else {
debug_assert!(
conflict >= self.matching.next_index(),
"follower log reversion is not allowed \
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ where C: RaftTypeConfig
&self.inner.config
}

/// Return a handle to manually trigger raft actions, such as elect or build snapshot.
/// Return a [`Trigger`] handle to manually trigger raft actions, such as elect or build
/// snapshot.
///
/// Example:
/// ```ignore
Expand Down
61 changes: 61 additions & 0 deletions openraft/src/raft/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! Trigger an action to RaftCore by external caller.
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::error::AllowNextRevertError;
use crate::error::Fatal;
use crate::raft::RaftInner;
use crate::type_config::TypeConfigExt;
use crate::RaftTypeConfig;

/// Trigger is an interface to trigger an action to RaftCore by external caller.
Expand Down Expand Up @@ -86,4 +88,63 @@ where C: RaftTypeConfig
.send_external_command(ExternalCommand::TriggerTransferLeader { to }, "transfer_leader")
.await
}

/// Request the RaftCore to allow to reset replication for a specific node when log revert is
/// detected.
///
/// - `allow=true`: This method instructs the RaftCore to allow the target node's log to revert
/// to a previous state for one time.
/// - `allow=false`: This method instructs the RaftCore to panic if the target node's log revert
///
/// This method returns [`Fatal`] error if failed to send the request to RaftCore, e.g. when
/// RaftCore is shut down.
/// Otherwise, it returns a `Ok(Result<_,_>)`, the inner result is:
/// - `Ok(())` if the request is successfully processed,
/// - or `Err(AllowNextRevertError)` explaining why the request is rejected.
///
/// ### Behavior
///
/// - If this node is the Leader, it will attempt to replicate logs to the target node from the
/// beginning.
/// - If this node is not the Leader, the request is ignored.
/// - If the target node is not found, the request is ignored.
///
/// ### Automatic Replication Reset
///
/// When the [`loosen-follower-log-revert`](`crate::docs::feature_flags#
/// feature-flag-loosen-follower-log-revert) feature flag is enabled, the Leader automatically
/// reset replication if it detects that the target node's log has reverted. This
/// feature is primarily useful in testing environments.
///
/// ### Production Considerations
///
/// In production environments, state reversion is a critical issue that should not be
/// automatically handled. However, there may be scenarios where a Follower's data is
/// intentionally removed and needs to rejoin the cluster(without membership changes). In such
/// cases, the Leader should reinitialize replication for that node with the following steps:
/// - Shut down the target node.
/// - call [`Self::allow_next_revert`] on the Leader.
/// - Clear the target node's data directory.
/// - Restart the target node.
pub async fn allow_next_revert(
&self,
to: &C::NodeId,
allow: bool,
) -> Result<Result<(), AllowNextRevertError<C>>, Fatal<C>> {
let (tx, rx) = C::oneshot();
self.raft_inner
.send_external_command(
ExternalCommand::AllowNextRevert {
to: to.clone(),
allow,
tx,
},
func_name!(),
)
.await?;

let res: Result<(), AllowNextRevertError<C>> = self.raft_inner.recv_msg(rx).await?;

Ok(res)
}
}
Loading

0 comments on commit 47e0220

Please sign in to comment.