Skip to content

Commit

Permalink
Refactor: move state machine Handle and Worker into separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Mar 25, 2024
1 parent 29cefe5 commit bb75f7b
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 228 deletions.
3 changes: 2 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::ResultSender;
use crate::core::raft_msg::VoteTx;
use crate::core::sm;
use crate::core::sm::handle;
use crate::core::sm::CommandSeq;
use crate::core::ServerState;
use crate::display_ext::DisplayOption;
Expand Down Expand Up @@ -175,7 +176,7 @@ where
pub(crate) log_store: LS,

/// A controlling handle to the [`RaftStateMachine`] worker.
pub(crate) sm_handle: sm::Handle<C>,
pub(crate) sm_handle: handle::Handle<C>,

pub(crate) engine: Engine<C>,

Expand Down
26 changes: 26 additions & 0 deletions openraft/src/core/sm/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! State machine control handle
use tokio::sync::mpsc;

use crate::alias::JoinHandleOf;
use crate::core::sm::Command;
use crate::RaftTypeConfig;

/// State machine worker handle for sending command to it.
///
/// Both fields are visible only inside `crate::core::sm`; code outside that module
/// interacts with the worker through `Handle::send`.
pub(crate) struct Handle<C>
where C: RaftTypeConfig
{
/// Sending half of the unbounded command channel; `Handle::send` pushes every command through it.
pub(in crate::core::sm) cmd_tx: mpsc::UnboundedSender<Command<C>>,

/// Join handle stored alongside the sender. Nothing in this file reads it, hence the
/// `dead_code` allowance. NOTE(review): presumably the handle of the spawned worker task —
/// confirm against the code that constructs `Handle`.
#[allow(dead_code)]
pub(in crate::core::sm) join_handle: JoinHandleOf<C, ()>,
}

impl<C> Handle<C>
where C: RaftTypeConfig
{
    /// Forward `cmd` to the state machine worker through the command channel.
    ///
    /// The command is logged at debug level first. The return value is exactly what the
    /// channel's `send` returns: `Ok(())`, or a `SendError` wrapping the command.
    pub(crate) fn send(&mut self, cmd: Command<C>) -> Result<(), mpsc::error::SendError<Command<C>>> {
        tracing::debug!("sending command to state machine worker: {:?}", cmd);

        // Only the sender is needed here; the join handle is left untouched.
        let Self { cmd_tx, .. } = self;
        cmd_tx.send(cmd)
    }
}
224 changes: 2 additions & 222 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,233 +4,13 @@
//! It is responsible for applying log entries, building/receiving snapshots and sending responses
//! to the RaftCore.
use tokio::sync::mpsc;

use crate::async_runtime::AsyncOneshotSendExt;
use crate::core::ApplyResult;
use crate::core::ApplyingEntry;
use crate::entry::RaftPayload;
use crate::storage::RaftStateMachine;
use crate::summary::MessageSummary;
use crate::AsyncRuntime;
use crate::RaftLogId;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::StorageError;

pub(crate) mod command;
pub(crate) mod handle;
pub(crate) mod response;
pub(crate) mod worker;

pub(crate) use command::Command;
pub(crate) use command::CommandPayload;
#[allow(unused_imports)] pub(crate) use command::CommandSeq;
pub(crate) use response::CommandResult;
pub(crate) use response::Response;

use crate::core::notify::Notify;
use crate::core::raft_msg::ResultSender;
use crate::type_config::alias::JoinHandleOf;

/// State machine worker handle for sending command to it.
///
/// Created by `Worker::spawn`, which keeps the receiving end of the command channel
/// for the worker and returns this handle to the caller.
pub(crate) struct Handle<C>
where C: RaftTypeConfig
{
// Sending half of the unbounded command channel whose receiver is owned by the `Worker`.
cmd_tx: mpsc::UnboundedSender<Command<C>>,
// Value returned by `Worker::do_spawn()` when the worker task was started.
// It is stored but never read in this module, hence the `dead_code` allowance.
#[allow(dead_code)]
join_handle: JoinHandleOf<C, ()>,
}

impl<C> Handle<C>
where C: RaftTypeConfig
{
    /// Hand `cmd` over to the state machine worker.
    ///
    /// Logs the command at debug level, then pushes it into the unbounded command channel.
    /// The channel's own result is returned unchanged.
    pub(crate) fn send(&mut self, cmd: Command<C>) -> Result<(), mpsc::error::SendError<Command<C>>> {
        tracing::debug!("sending command to state machine worker: {:?}", cmd);

        let tx = &self.cmd_tx;
        tx.send(cmd)
    }
}

/// The state machine worker: owns the state machine and executes the commands it receives.
///
/// It is built and started by `Worker::spawn`, which returns a `Handle` holding the
/// sending end of the command channel.
pub(crate) struct Worker<C, SM>
where
C: RaftTypeConfig,
SM: RaftStateMachine<C>,
{
// The user-provided state machine that every command operates on.
state_machine: SM,

// Receiving half of the command channel; the worker loop quits when it is closed.
cmd_rx: mpsc::UnboundedReceiver<Command<C>>,

// Channel for sending `Notify` messages (command results and errors) out of the worker.
// NOTE(review): per the module docs the receiver is RaftCore — confirm at the call site of `spawn`.
resp_tx: mpsc::UnboundedSender<Notify<C>>,
}

impl<C, SM> Worker<C, SM>
where
C: RaftTypeConfig,
SM: RaftStateMachine<C>,
{
/// Spawn a new state machine worker, return a controlling handle.
pub(crate) fn spawn(state_machine: SM, resp_tx: mpsc::UnboundedSender<Notify<C>>) -> Handle<C> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();

let worker = Worker {
state_machine,
cmd_rx,
resp_tx,
};

let join_handle = worker.do_spawn();

Handle { cmd_tx, join_handle }
}

fn do_spawn(mut self) -> JoinHandleOf<C, ()> {
C::AsyncRuntime::spawn(async move {
let res = self.worker_loop().await;

if let Err(err) = res {
tracing::error!("{} while execute state machine command", err,);

let _ = self.resp_tx.send(Notify::StateMachine {
command_result: CommandResult {
command_seq: 0,
result: Err(err),
},
});
}
})
}

/// Receive and execute state machine commands until the command channel is closed.
///
/// Each command is dispatched on its payload:
/// - `BuildSnapshot`: delegated to `build_snapshot()`, which responds from a spawned task;
/// - `GetSnapshot`: the current snapshot is sent back through `tx`; nothing goes to `resp_tx`;
/// - `InstallFullSnapshot`: the snapshot is installed, then the result goes to `resp_tx`;
/// - `BeginReceivingSnapshot`: the snapshot data is sent back through `tx`; nothing goes to
///   `resp_tx`;
/// - `Apply`: the entries are applied, then the result goes to `resp_tx`.
///
/// Returns `Ok(())` once `cmd_rx` yields `None`. The first error returned by the state
/// machine ends the loop and is returned to the caller. Failures to send a response are
/// ignored.
#[tracing::instrument(level = "debug", skip_all)]
async fn worker_loop(&mut self) -> Result<(), StorageError<C::NodeId>> {
    while let Some(cmd) = self.cmd_rx.recv().await {
        tracing::debug!("{}: received command: {:?}", func_name!(), cmd);

        let seq = cmd.seq;

        match cmd.payload {
            CommandPayload::BuildSnapshot => {
                tracing::info!("{}: build snapshot", func_name!());

                // It is a read operation and is spawned, and it responds in another task
                let resp_tx = self.resp_tx.clone();
                self.build_snapshot(seq, resp_tx).await;
            }
            CommandPayload::GetSnapshot { tx } => {
                tracing::info!("{}: get snapshot", func_name!());

                // GetSnapshot replies through `tx` only; it does not respond to RaftCore
                self.get_snapshot(tx).await?;
            }
            CommandPayload::InstallFullSnapshot { snapshot } => {
                tracing::info!("{}: install complete snapshot", func_name!());

                let meta = snapshot.meta.clone();
                self.state_machine.install_snapshot(&meta, snapshot.snapshot).await?;

                tracing::info!("Done install complete snapshot, meta: {}", meta);

                let installed = Ok(Response::InstallSnapshot(Some(meta)));
                let _ = self.resp_tx.send(Notify::sm(CommandResult::new(seq, installed)));
            }
            CommandPayload::BeginReceivingSnapshot { tx } => {
                tracing::info!("{}: BeginReceivingSnapshot", func_name!());

                let snapshot_data = self.state_machine.begin_receiving_snapshot().await?;

                // No response to RaftCore
                let _ = tx.send(Ok(snapshot_data));
            }
            CommandPayload::Apply { entries } => {
                let applied = self.apply(entries).await?;

                let outcome = CommandResult::new(seq, Ok(Response::Apply(applied)));
                let _ = self.resp_tx.send(Notify::sm(outcome));
            }
        }
    }

    tracing::info!("{}: rx closed, state machine worker quit", func_name!());
    Ok(())
}
/// Apply `entries` to the state machine and describe the outcome in an `ApplyResult`.
///
/// The result contains:
/// - `since` / `end`: the index of the first entry and the index of the last entry plus one;
/// - `last_applied`: the log id of the last entry;
/// - `applying_entries`: one `ApplyingEntry` (log id and optional membership) per input entry;
/// - `apply_results`: the replies returned by the state machine.
///
/// # Panics
///
/// Panics if `entries` is empty. In debug builds it also panics if the state machine returns
/// a different number of replies than entries.
#[tracing::instrument(level = "debug", skip_all)]
async fn apply(&mut self, entries: Vec<C::Entry>) -> Result<ApplyResult<C>, StorageError<C::NodeId>> {
    // TODO: prepare response before apply_to_state_machine,
    //       so that an Entry does not need to be Clone,
    //       and no references will be used by apply_to_state_machine

    let since = entries.first().unwrap().get_log_id().index;
    let last_applied = *entries.last().unwrap().get_log_id();
    let end = last_applied.index + 1;

    // Record log id and membership of every entry before `entries` is moved into the
    // state machine.
    let mut applying_entries = Vec::with_capacity(entries.len());
    for entry in &entries {
        applying_entries.push(ApplyingEntry::new(
            *entry.get_log_id(),
            entry.get_membership().cloned(),
        ));
    }

    let n_entries = applying_entries.len();

    let apply_results = self.state_machine.apply(entries).await?;

    let n_replies = apply_results.len();

    debug_assert_eq!(
        n_entries, n_replies,
        "n_entries: {} should equal n_replies: {}",
        n_entries, n_replies
    );

    Ok(ApplyResult {
        since,
        end,
        last_applied,
        applying_entries,
        apply_results,
    })
}

/// Start building a snapshot of the state machine in a separately spawned task.
///
/// Only obtaining the builder happens inside this call; the build itself runs in the spawned
/// task, which sends the outcome for command `seq` through `resp_tx` when it finishes. This
/// method returns without waiting for that task.
///
/// Building snapshot is a read-only operation, so it can be run in another task in parallel.
/// This parallelization depends on the [`RaftSnapshotBuilder`] implementation returned by
/// [`get_snapshot_builder()`](`RaftStateMachine::get_snapshot_builder()`): The builder must:
/// - hold a consistent view of the state machine that won't be affected by further writes such
///   as applying a log entry,
/// - or it must be able to acquire a lock that prevents any write operations.
#[tracing::instrument(level = "info", skip_all)]
async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: mpsc::UnboundedSender<Notify<C>>) {
    // TODO: need to be abortable?
    // use futures::future::abortable;
    // let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await });

    tracing::info!("{}", func_name!());

    let mut builder = self.state_machine.get_snapshot_builder().await;

    let _handle = C::AsyncRuntime::spawn(async move {
        // Only the snapshot meta is reported back; a build error is forwarded as is.
        let built = builder.build_snapshot().await.map(|snap| Response::BuildSnapshot(snap.meta));

        let _ = resp_tx.send(Notify::sm(CommandResult::new(seq, built)));
    });

    tracing::info!("{} returning; spawned building snapshot task", func_name!());
}

/// Fetch the current snapshot from the state machine and send it back through `tx`.
///
/// An error from `get_current_snapshot()` is returned to the caller and nothing is sent.
/// A failure to deliver the snapshot through `tx` is ignored.
#[tracing::instrument(level = "info", skip_all)]
async fn get_snapshot(&mut self, tx: ResultSender<C, Option<Snapshot<C>>>) -> Result<(), StorageError<C::NodeId>> {
    tracing::info!("{}", func_name!());

    let current = self.state_machine.get_current_snapshot().await?;

    // The summary stays inside the macro call, as in the original, so it is built
    // only when the macro evaluates its arguments.
    tracing::info!(
        "sending back snapshot: meta: {:?}",
        current.as_ref().map(|snap| snap.meta.summary())
    );

    let _ = tx.send(Ok(current));

    Ok(())
}
}
Loading

0 comments on commit bb75f7b

Please sign in to comment.