Skip to content

Commit

Permalink
Refactor: Explicitly store Heartbeat action
Browse files Browse the repository at this point in the history
Prior to this commit, the `ReplicationCore` being awakened without any
specific action in `ReplicationCore.next_action` implied a heartbeat was
to be sent. This approach was cryptic and the intent behind the action
was not clear.

In this commit, we have introduced explicitness to the process: if
`ReplicationCore` is intended to send a heartbeat, a distinct
`Data::Heartbeat` variant is now stored in `next_action`.

This change ensures clarity by explicitly representing heartbeat
actions, removing any ambiguity from the replication logic.
  • Loading branch information
drmingdrmer committed Feb 11, 2024
1 parent c1aa1b5 commit c958647
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
39 changes: 23 additions & 16 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ where
repl_id = d.request_id();

match d {
Data::Heartbeat => {
let m = &self.matching;
// request_id==None will be ignored by RaftCore.
let d = DataWithId::new(None, LogIdRange::new(*m, *m));

log_data = Some(d.clone());
self.send_log_entries(d).await
}
Data::Logs(log) => {
log_data = Some(log.clone());
self.send_log_entries(log).await
Expand Down Expand Up @@ -610,17 +618,9 @@ where
self.process_event(event);
}

self.try_drain_events().await?;

// No action filled after all events are processed, fill in an action to send committed
// index.
if self.next_action.is_none() {
let m = &self.matching;
// Returning from process_event(), next_action is never None.

// empty message, just for syncing the committed index
// request_id==None will be ignored by RaftCore.
self.next_action = Some(Data::new_logs(None, LogIdRange::new(*m, *m)));
}
self.try_drain_events().await?;

Ok(())
}
Expand Down Expand Up @@ -664,18 +664,25 @@ where
);

self.committed = c;

// If there is no action, fill in an heartbeat action to send committed index.
if self.next_action.is_none() {
self.next_action = Some(Data::new_heartbeat());
}
}
Replicate::Heartbeat => {
// Nothing to do. Heartbeat message is just for waking up replication to send
// something: When all messages are drained,
// - if self.next_action is None, it sends an empty AppendEntries request as
// heartbeat.
//- If self.next_action is not None, next_action will serve as a heartbeat.
// Never overwrite action with payload.
if self.next_action.is_none() {
self.next_action = Some(Data::new_heartbeat());
}
}
Replicate::Data(d) => {
// TODO: Currently there is at most 1 in flight data. But in future RaftCore may send next data
// actions without waiting for the previous to finish.
debug_assert!(self.next_action.is_none(), "there can not be two data action in flight");
debug_assert!(
!self.next_action.as_ref().map(|d| d.has_payload()).unwrap_or(false),
"there can not be two actions with payload in flight"
);
self.next_action = Some(d);
}
}
Expand Down
21 changes: 21 additions & 0 deletions openraft/src/replication/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::Snapshot;
pub(crate) enum Data<C>
where C: RaftTypeConfig
{
Heartbeat,
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<ResultReceiver<Option<Snapshot<C>>>>),
}
Expand All @@ -67,6 +68,9 @@ where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Data::Heartbeat => {
write!(f, "Data::Heartbeat")
}
Self::Logs(l) => f
.debug_struct("Data::Logs")
.field("request_id", &l.request_id())
Expand All @@ -80,6 +84,9 @@ where C: RaftTypeConfig
impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Data::Heartbeat => {
write!(f, "Heartbeat")
}
Self::Logs(l) => {
write!(
f,
Expand All @@ -106,6 +113,10 @@ where C: RaftTypeConfig
impl<C> Data<C>
where C: RaftTypeConfig
{
pub(crate) fn new_heartbeat() -> Self {
Self::Heartbeat
}

pub(crate) fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Logs(DataWithId::new(request_id, log_id_range))
}
Expand All @@ -116,10 +127,20 @@ where C: RaftTypeConfig

pub(crate) fn request_id(&self) -> Option<u64> {
match self {
Self::Heartbeat => None,
Self::Logs(l) => l.request_id(),
Self::Snapshot(s) => s.request_id(),
}
}

/// Return true if the data includes any payload, i.e., not a heartbeat.
pub(crate) fn has_payload(&self) -> bool {
match self {
Self::Heartbeat => false,
Self::Logs(_) => true,
Self::Snapshot(_) => true,
}
}
}

#[derive(Clone)]
Expand Down

0 comments on commit c958647

Please sign in to comment.