Skip to content

Commit

Permalink
Change: refine the RaftEntry trait
Browse files Browse the repository at this point in the history
- The `RaftEntry` trait now requires `AsRef<LogIdOf<C>>` and
  `AsMut<LogIdOf<C>>`, providing a more standard API for accessing the
  log ID of a log entry. As a result, the `RaftEntry: RaftLogId`
  requirement is no longer needed.

- A new method, `new_normal()`, has been added to the `RaftEntry` trait
  to replace the `FromAppData` trait.

- Additional utility methods for working with entries are now provided
  in the `RaftEntryExt` trait.

Upgrade tips:

1. **For applications using a custom `RaftEntry` implementation** (e.g.,
   declared with `declare_raft_types!(MyTypes: Entry = MyEntry)`):
   - Update the `RaftEntry` implementation for your custom entry type
     (`MyEntry`) by adding the `new_normal()` method.
   - Implement `AsRef<LogId>` and `AsMut<LogId>` for your custom entry
     type.

2. **For applications using the default `Entry` provided by OpenRaft**:
   - No changes are required.
  • Loading branch information
drmingdrmer committed Jan 8, 2025
1 parent d4b4b02 commit 4be3038
Show file tree
Hide file tree
Showing 20 changed files with 146 additions and 101 deletions.
6 changes: 3 additions & 3 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::ops::RangeBounds;
use std::sync::Arc;

use openraft::alias::VoteOf;
use openraft::entry::RaftEntryExt;
use openraft::storage::IOFlushed;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
}

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone());
let last = self.log.iter().next_back().map(|(_, ent)| ent.to_log_id());

let last_purged = self.last_purged_log_id.clone();

Expand Down Expand Up @@ -97,7 +97,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
self.log.insert(entry.get_log_id().index, entry);
self.log.insert(entry.to_log_id().index, entry);
}
callback.io_completed(Ok(()));

Expand Down
7 changes: 3 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::engine::Condition;
use crate::engine::Engine;
use crate::engine::ReplicationProgress;
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::traits::RaftEntryExt;
use crate::entry::RaftEntry;
use crate::error::AllowNextRevertError;
use crate::error::ClientWriteError;
Expand All @@ -55,7 +55,6 @@ use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
Expand Down Expand Up @@ -1246,7 +1245,7 @@ where
self.handle_check_is_leader_request(tx).await;
}
RaftMsg::ClientWriteRequest { app_data, tx } => {
self.write_entry(C::Entry::from_app_data(app_data), Some(tx));
self.write_entry(C::Entry::new_normal(Default::default(), app_data), Some(tx));
}
RaftMsg::Initialize { members, tx } => {
tracing::info!(
Expand Down Expand Up @@ -1746,7 +1745,7 @@ where
committed_vote: vote,
entries,
} => {
let last_log_id = entries.last().unwrap().get_log_id();
let last_log_id = entries.last().unwrap().log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

let io_id = IOId::new_log_io(vote, Some(last_log_id.clone()));
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::core::ApplyResult;
use crate::core::ApplyingEntry;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySliceExt;
use crate::entry::traits::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::storage::RaftStateMachine;
use crate::storage::Snapshot;
Expand All @@ -23,7 +24,6 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftLogId;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -186,7 +186,7 @@ where
#[allow(clippy::needless_collect)]
let applying_entries = entries
.iter()
.map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership().cloned()))
.map(|e| ApplyingEntry::new(e.to_log_id(), e.get_membership().cloned()))
.collect::<Vec<_>>();

let n_entries = end - since;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineOutput;
use crate::engine::Respond;
use crate::entry::traits::RaftEntryExt;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::error::ForwardToLeader;
Expand Down Expand Up @@ -57,7 +58,6 @@ use crate::vote::RaftVote;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftLogId;
use crate::RaftTypeConfig;

/// Raft protocol algorithm.
Expand Down
17 changes: 7 additions & 10 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::traits::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::error::RejectAppendEntries;
use crate::raft_state::IOId;
Expand All @@ -20,7 +21,6 @@ use crate::vote::committed::CommittedVote;
use crate::EffectiveMembership;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::StoredMembership;
Expand Down Expand Up @@ -69,10 +69,10 @@ where C: RaftTypeConfig
);

if let Some(x) = entries.first() {
debug_assert!(x.get_log_id().index == prev_log_id.next_index());
debug_assert!(x.index() == prev_log_id.next_index());
}

let last_log_id = entries.last().map(|x| x.get_log_id().clone());
let last_log_id = entries.last().map(|x| x.to_log_id());
let last_log_id = std::cmp::max(prev_log_id, last_log_id);

let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone()));
Expand All @@ -84,7 +84,7 @@ where C: RaftTypeConfig
// the entries after it has to be deleted first.
// Raft requires log ids are in total order by (term,index).
// Otherwise the log id with max index makes committed entry invisible in election.
self.truncate_logs(entries[since].get_log_id().index);
self.truncate_logs(entries[since].index());

let entries = entries.split_off(since);
self.do_append_entries(entries);
Expand Down Expand Up @@ -143,11 +143,8 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self, entries))]
pub(crate) fn do_append_entries(&mut self, entries: Vec<C::Entry>) {
debug_assert!(!entries.is_empty());
debug_assert_eq!(
entries[0].get_log_id().index,
self.state.log_ids.last().cloned().next_index(),
);
debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last());
debug_assert_eq!(entries[0].index(), self.state.log_ids.last().cloned().next_index(),);
debug_assert!(Some(entries[0].log_id()) > self.state.log_ids.last());

self.state.extend_log_ids(&entries);
self.append_membership(entries.iter());
Expand Down Expand Up @@ -343,7 +340,7 @@ where C: RaftTypeConfig
// Find the last 2 membership config entries: the committed and the effective.
for ent in entries.rev() {
if let Some(m) = ent.get_membership() {
memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m.clone()));
memberships.insert(0, StoredMembership::new(Some(ent.to_log_id()), m.clone()));
if memberships.len() == 2 {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::traits::RaftEntryExt;
use crate::entry::RaftPayload;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
Expand All @@ -10,7 +11,6 @@ use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::LogIdOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;

Expand Down Expand Up @@ -67,7 +67,7 @@ where C: RaftTypeConfig
membership_entry.is_none(),
"only one membership entry is allowed in a batch"
);
membership_entry = Some((entry.get_log_id().clone(), m.clone()));
membership_entry = Some((entry.to_log_id(), m.clone()));
}
}

Expand Down
12 changes: 6 additions & 6 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ where C: RaftTypeConfig
/// Extends a list of `log_id` that are proposed by a same leader.
///
/// The log ids in the input has to be continuous.
pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId<C> + 'a>(&mut self, new_ids: &[LID]) {
pub(crate) fn extend_from_same_leader<'a, LID: AsRef<LogIdOf<C>> + 'a>(&mut self, new_ids: &[LID]) {
if let Some(first) = new_ids.first() {
let first_id = first.get_log_id();
let first_id = first.as_ref();
self.append(first_id.clone());

if let Some(last) = new_ids.last() {
let last_id = last.get_log_id();
let last_id = last.as_ref();
assert_eq!(last_id.leader_id, first_id.leader_id);

if last_id != first_id {
Expand All @@ -150,11 +150,11 @@ where C: RaftTypeConfig
/// Extends a list of `log_id`.
// leader_id: Copy is feature gated
#[allow(clippy::clone_on_copy)]
pub(crate) fn extend<'a, LID: RaftLogId<C> + 'a>(&mut self, new_ids: &[LID]) {
pub(crate) fn extend<'a, LID: AsRef<LogIdOf<C>> + 'a>(&mut self, new_ids: &[LID]) {
let mut prev = self.last().map(|x| x.leader_id.clone());

for x in new_ids.iter() {
let log_id = x.get_log_id();
let log_id = x.as_ref();

if prev.as_ref() != Some(&log_id.leader_id) {
self.append(log_id.clone());
Expand All @@ -164,7 +164,7 @@ where C: RaftTypeConfig
}

if let Some(last) = new_ids.last() {
let log_id = last.get_log_id();
let log_id = last.as_ref();

if self.last() != Some(log_id) {
self.append(log_id.clone());
Expand Down
40 changes: 21 additions & 19 deletions openraft/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
use std::fmt;
use std::fmt::Debug;

use crate::log_id::RaftLogId;
use crate::LogId;
use crate::Membership;
use crate::RaftTypeConfig;

pub mod payload;
mod traits;
pub(crate) mod traits;

pub use payload::EntryPayload;
pub use traits::FromAppData;
pub use traits::RaftEntry;
pub use traits::RaftEntryExt;
pub use traits::RaftPayload;

use crate::type_config::alias::AppDataOf;
use crate::type_config::alias::LogIdOf;

/// A Raft log entry.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct Entry<C>
Expand Down Expand Up @@ -97,43 +99,43 @@ where C: RaftTypeConfig
}
}

impl<C> RaftLogId<C> for Entry<C>
impl<C> AsRef<LogIdOf<C>> for Entry<C>
where C: RaftTypeConfig
{
fn get_log_id(&self) -> &LogId<C> {
fn as_ref(&self) -> &LogIdOf<C> {
&self.log_id
}
}

fn set_log_id(&mut self, log_id: &LogId<C>) {
self.log_id = log_id.clone();
impl<C> AsMut<LogIdOf<C>> for Entry<C>
where C: RaftTypeConfig
{
fn as_mut(&mut self) -> &mut LogIdOf<C> {
&mut self.log_id
}
}

impl<C> RaftEntry<C> for Entry<C>
where C: RaftTypeConfig
{
fn new_blank(log_id: LogId<C>) -> Self {
fn new_blank(log_id: LogIdOf<C>) -> Self {
Self {
log_id,
payload: EntryPayload::Blank,
}
}

fn new_membership(log_id: LogId<C>, m: Membership<C>) -> Self {
Self {
fn new_normal(log_id: LogIdOf<C>, data: AppDataOf<C>) -> Self {
Entry {
log_id,
payload: EntryPayload::Membership(m),
payload: EntryPayload::Normal(data),
}
}
}

impl<C> FromAppData<C::D> for Entry<C>
where C: RaftTypeConfig
{
fn from_app_data(d: C::D) -> Self {
Entry {
log_id: LogId::default(),
payload: EntryPayload::Normal(d),
fn new_membership(log_id: LogIdOf<C>, m: Membership<C>) -> Self {
Self {
log_id,
payload: EntryPayload::Membership(m),
}
}
}
54 changes: 43 additions & 11 deletions openraft/src/entry/traits.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::fmt::Debug;
use std::fmt::Display;

use crate::base::OptionalFeatures;
use crate::log_id::RaftLogId;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::LogIdOf;
use crate::LogId;
use crate::Membership;
use crate::OptionalSend;
use crate::OptionalSerde;
use crate::OptionalSync;
use crate::RaftTypeConfig;

/// Defines operations on an entry payload.
Expand All @@ -21,26 +21,58 @@ where C: RaftTypeConfig
}

/// Defines operations on an entry.
pub trait RaftEntry<C>: RaftPayload<C> + RaftLogId<C>
pub trait RaftEntry<C>
where
C: RaftTypeConfig,
Self: OptionalSerde + Debug + Display + OptionalSend + OptionalSync,
Self: OptionalFeatures + Debug + Display,
Self: RaftPayload<C> + AsRef<LogIdOf<C>> + AsMut<LogIdOf<C>>,
{
/// Create a new blank log entry.
///
/// The returned instance must return `true` for `Self::is_blank()`.
fn new_blank(log_id: LogId<C>) -> Self;

/// Create a new normal log entry that contains application data.
fn new_normal(log_id: LogId<C>, data: C::D) -> Self;

/// Create a new membership log entry.
///
/// The returned instance must return `Some()` for `Self::get_membership()`.
fn new_membership(log_id: LogId<C>, m: Membership<C>) -> Self;
}

/// Build a raft log entry from app data.
///
/// A concrete Entry should implement this trait to let openraft create an entry when needed.
pub trait FromAppData<T> {
/// Build a raft log entry from app data.
fn from_app_data(t: T) -> Self;
pub trait RaftEntryExt<C>: RaftEntry<C>
where C: RaftTypeConfig
{
fn log_id(&self) -> &LogId<C> {
AsRef::<LogIdOf<C>>::as_ref(self)
}

fn to_log_id(&self) -> LogId<C> {
self.log_id().clone()
}

fn committed_leader_id(&self) -> &CommittedLeaderIdOf<C> {
AsRef::<LogIdOf<C>>::as_ref(self).leader_id()
}

fn to_committed_leader_id(&self) -> CommittedLeaderIdOf<C> {
self.committed_leader_id().clone()
}

fn index(&self) -> u64 {
AsRef::<LogIdOf<C>>::as_ref(self).index
}

fn set_log_id(&mut self, new: &LogId<C>) {
let log_id = AsMut::<LogIdOf<C>>::as_mut(self);
*log_id = new.clone();
}
}

impl<C, T> RaftEntryExt<C> for T
where
C: RaftTypeConfig,
T: RaftEntry<C>,
{
}
Loading

0 comments on commit 4be3038

Please sign in to comment.