Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Abstract Term #1284

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ mod tests {
type R = ();
type NodeId = u64;
type Node = ();
type Term = u64;
type Entry = crate::Entry<TickUTConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::storage::SnapshotMeta;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_term::RaftTerm;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
Expand Down Expand Up @@ -208,7 +209,7 @@ where C: RaftTypeConfig
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
let new_term = self.state.vote.leader_id().term + 1;
let new_term = self.state.vote.leader_id().term.next();
let new_vote = Vote::new(new_term, self.config.id.clone());

let candidate = self.new_candidate(new_vote.clone());
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ where N: Node + Ord
type NodeId = u64;
type Node = N;
type Entry = crate::Entry<Self>;
type Term = u64;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ mod runtime;
mod storage_error;
mod summary;
mod try_as_ref;
mod vote;

pub(crate) mod engine;
pub(crate) mod log_id_range;
Expand All @@ -70,6 +69,7 @@ pub mod raft;
pub mod storage;
pub mod testing;
pub mod type_config;
pub mod vote;

#[cfg(test)]
mod feature_serde_test;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::Vote;
pub enum Metric<C>
where C: RaftTypeConfig
{
Term(u64),
Term(C::Term),
Vote(Vote<C>),
LastLogIndex(Option<u64>),
Applied(Option<LogId<C>>),
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
// --- data ---
// ---
/// The current term of the Raft node.
pub current_term: u64,
pub current_term: C::Term,

/// The last flushed vote.
pub vote: Vote<C>,
Expand Down Expand Up @@ -167,7 +167,7 @@ where C: RaftTypeConfig
running_state: Ok(()),
id,

current_term: 0,
current_term: Default::default(),
vote: Vote::default(),
last_log_index: None,
last_applied: None,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ where C: RaftTypeConfig {
running_state: Ok(()),
id: NodeIdOf::<C>::default(),
state: ServerState::Learner,
current_term: 0,
current_term: Default::default(),
vote: Vote::default(),
last_log_index: None,
last_applied: None,
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ use crate::Vote;
/// R = ClientResponse,
/// NodeId = u64,
/// Node = openraft::BasicNode,
/// Term = u64,
/// Entry = openraft::Entry<TypeConfig>,
/// SnapshotData = Cursor<Vec<u8>>,
/// Responder = openraft::impls::OneshotResponder<TypeConfig>,
Expand Down Expand Up @@ -172,8 +173,9 @@ macro_rules! declare_raft_types {
(R , , String ),
(NodeId , , u64 ),
(Node , , $crate::impls::BasicNode ),
(Term , , u64 ),
(Entry , , $crate::impls::Entry<Self> ),
(SnapshotData , , std::io::Cursor<Vec<u8>> ),
(SnapshotData , , std::io::Cursor<Vec<u8>> ),
(Responder , , $crate::impls::OneshotResponder<Self> ),
(AsyncRuntime , , $crate::impls::TokioRuntime ),
);
Expand Down
18 changes: 13 additions & 5 deletions openraft/src/testing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@ use crate::RaftTypeConfig;

/// Builds a log id, for testing purposes.
pub fn log_id<C>(term: u64, node_id: C::NodeId, index: u64) -> LogId<C>
where C: RaftTypeConfig {
where
C: RaftTypeConfig,
C::Term: From<u64>,
{
LogId::<C> {
leader_id: CommittedLeaderId::new(term, node_id),
leader_id: CommittedLeaderId::new(term.into(), node_id),
index,
}
}

/// Create a blank log entry for test.
pub fn blank_ent<C: RaftTypeConfig>(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry<C> {
crate::Entry::<C>::new_blank(LogId::new(CommittedLeaderId::new(term, node_id), index))
pub fn blank_ent<C>(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry<C>
where
C: RaftTypeConfig,
C::Term: From<u64>,
{
crate::Entry::<C>::new_blank(log_id(term, node_id, index))
}

/// Create a membership log entry without learner config for test.
Expand All @@ -29,10 +36,11 @@ pub fn membership_ent<C: RaftTypeConfig>(
config: Vec<BTreeSet<C::NodeId>>,
) -> crate::Entry<C>
where
C::Term: From<u64>,
C::Node: Default,
{
crate::Entry::new_membership(
LogId::new(CommittedLeaderId::new(term, node_id), index),
log_id(term, node_id, index),
crate::Membership::new_with_defaults(config, []),
)
}
46 changes: 30 additions & 16 deletions openraft/src/testing/log/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ where
C: RaftTypeConfig,
C::D: Debug,
C::R: Debug,
C::Term: From<u64>,
C::NodeId: From<u64>,
C::Node: Default,
LS: RaftLogStorage<C>,
Expand Down Expand Up @@ -499,7 +500,7 @@ where
"unexpected value for last applied log"
);
assert_eq!(
Vote::new(1, NODE_ID.into()),
Vote::new(1u64.into(), NODE_ID.into()),
*initial.vote_ref(),
"unexpected value for default hard state"
);
Expand Down Expand Up @@ -632,8 +633,8 @@ where
}

pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError<C>> {
let log_id = |t, n: u64, i| LogId::<C> {
leader_id: CommittedLeaderId::new(t, n.into()),
let log_id = |t: u64, n: u64, i| LogId::<C> {
leader_id: CommittedLeaderId::<C>::new(t.into(), n.into()),
index: i,
};

Expand Down Expand Up @@ -820,11 +821,11 @@ where
}

pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError<C>> {
store.save_vote(&Vote::new(100, NODE_ID.into())).await?;
store.save_vote(&Vote::new(100.into(), NODE_ID.into())).await?;

let got = store.read_vote().await?;

assert_eq!(Some(Vote::new(100, NODE_ID.into())), got,);
assert_eq!(Some(Vote::new(100.into(), NODE_ID.into())), got,);
Ok(())
}

Expand Down Expand Up @@ -873,7 +874,7 @@ where
pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError<C>> {
Self::feed_10_logs_vote_self(&mut store).await?;

store.purge(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)).await?;
store.purge(log_id(0, 0, 0)).await?;

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
Expand Down Expand Up @@ -923,10 +924,7 @@ where
store.purge(log_id_0(0, 0)).await?;

let st = store.get_log_state().await?;
assert_eq!(
Some(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)),
st.last_purged_log_id
);
assert_eq!(Some(log_id(0, 0, 0)), st.last_purged_log_id);
assert_eq!(Some(log_id_0(1, 2)), st.last_log_id);
}

Expand Down Expand Up @@ -1368,33 +1366,37 @@ where
}

pub async fn default_vote(sto: &mut LS) -> Result<(), StorageError<C>> {
sto.save_vote(&Vote::new(1, NODE_ID.into())).await?;
sto.save_vote(&Vote::new(1u64.into(), NODE_ID.into())).await?;

Ok(())
}
}

/// Create a log id with node id 0 for testing.
fn log_id_0<C>(term: u64, index: u64) -> LogId<C>
fn log_id_0<C>(term: impl Into<C::Term>, index: u64) -> LogId<C>
where
C: RaftTypeConfig,
C::NodeId: From<u64>,
{
LogId {
leader_id: CommittedLeaderId::new(term, NODE_ID.into()),
leader_id: CommittedLeaderId::new(term.into(), NODE_ID.into()),
index,
}
}

/// Create a blank log entry with node_id 0 for test.
fn blank_ent_0<C: RaftTypeConfig>(term: u64, index: u64) -> C::Entry
where C::NodeId: From<u64> {
C::Entry::new_blank(log_id_0(term, index))
where
C::Term: From<u64>,
C::NodeId: From<u64>,
{
C::Entry::new_blank(log_id(term, 0, index))
}

/// Create a membership entry with node_id 0 for test.
fn membership_ent_0<C: RaftTypeConfig>(term: u64, index: u64, bs: BTreeSet<C::NodeId>) -> C::Entry
fn membership_ent_0<C>(term: impl Into<C::Term>, index: u64, bs: BTreeSet<C::NodeId>) -> C::Entry
where
C: RaftTypeConfig,
C::NodeId: From<u64>,
C::Node: Default,
{
Expand Down Expand Up @@ -1453,3 +1455,15 @@ where
}
Ok(())
}

fn log_id<C>(term: u64, node_id: u64, index: u64) -> LogId<C>
where
C: RaftTypeConfig,
C::Term: From<u64>,
C::NodeId: From<u64>,
{
LogId {
leader_id: CommittedLeaderId::new(term.into(), node_id.into()),
index,
}
}
13 changes: 13 additions & 0 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use util::TypeConfigExt;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::raft::responder::Responder;
use crate::vote::raft_term::RaftTerm;
use crate::AppData;
use crate::AppDataResponse;
use crate::Node;
Expand Down Expand Up @@ -43,6 +44,7 @@ use crate::OptionalSync;
/// R = ClientResponse,
/// NodeId = u64,
/// Node = openraft::BasicNode,
/// Term = u64,
/// Entry = openraft::Entry<TypeConfig>,
/// SnapshotData = Cursor<Vec<u8>>,
/// AsyncRuntime = openraft::TokioRuntime,
Expand All @@ -67,6 +69,16 @@ pub trait RaftTypeConfig:
/// Raft log entry, which can be built from an AppData.
type Entry: RaftEntry<Self> + FromAppData<Self::D>;

/// Type representing a Raft term number.
///
/// A term is a logical clock in Raft that is used to detect obsolete information,
/// such as old leaders. It must be totally ordered and monotonically increasing.
///
/// Common implementations are provided for standard integer types like `u64`, `i64` etc.
///
/// See: [`RaftTerm`] for the required methods.
type Term: RaftTerm;

/// Snapshot data for exposing a snapshot for reading & writing.
///
/// See the [storage chapter of the guide][sto] for details on log compaction / snapshotting.
Expand Down Expand Up @@ -106,6 +118,7 @@ pub mod alias {
pub type ROf<C> = <C as RaftTypeConfig>::R;
pub type NodeIdOf<C> = <C as RaftTypeConfig>::NodeId;
pub type NodeOf<C> = <C as RaftTypeConfig>::Node;
pub type TermOf<C> = <C as RaftTypeConfig>::Term;
pub type EntryOf<C> = <C as RaftTypeConfig>::Entry;
pub type SnapshotDataOf<C> = <C as RaftTypeConfig>::SnapshotData;
pub type AsyncRuntimeOf<C> = <C as RaftTypeConfig>::AsyncRuntime;
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/vote/leader_id/leader_id_adv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ use crate::RaftTypeConfig;
pub struct LeaderId<C>
where C: RaftTypeConfig
{
pub term: u64,
pub term: C::Term,
pub node_id: C::NodeId,
}

impl<C> LeaderId<C>
where C: RaftTypeConfig
{
pub fn new(term: u64, node_id: C::NodeId) -> Self {
pub fn new(term: C::Term, node_id: C::NodeId) -> Self {
Self { term, node_id }
}

pub fn get_term(&self) -> u64 {
pub fn get_term(&self) -> C::Term {
self.term
}

Expand Down
14 changes: 8 additions & 6 deletions openraft/src/vote/leader_id/leader_id_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::RaftTypeConfig;
pub struct LeaderId<C>
where C: RaftTypeConfig
{
pub term: u64,
pub term: C::Term,

pub voted_for: Option<C::NodeId>,
}
Expand Down Expand Up @@ -44,14 +44,14 @@ where C: RaftTypeConfig
impl<C> LeaderId<C>
where C: RaftTypeConfig
{
pub fn new(term: u64, node_id: C::NodeId) -> Self {
pub fn new(term: C::Term, node_id: C::NodeId) -> Self {
Self {
term,
voted_for: Some(node_id),
}
}

pub fn get_term(&self) -> u64 {
pub fn get_term(&self) -> C::Term {
self.term
}

Expand Down Expand Up @@ -84,8 +84,10 @@ where C: RaftTypeConfig
#[derive(PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct CommittedLeaderId<C> {
pub term: u64,
pub struct CommittedLeaderId<C>
where C: RaftTypeConfig
{
pub term: C::Term,
p: PhantomData<C>,
}

Expand All @@ -100,7 +102,7 @@ where C: RaftTypeConfig
impl<C> CommittedLeaderId<C>
where C: RaftTypeConfig
{
pub fn new(term: u64, node_id: C::NodeId) -> Self {
pub fn new(term: C::Term, node_id: C::NodeId) -> Self {
let _ = node_id;
Self { term, p: PhantomData }
}
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ pub use leader_id::CommittedLeaderId;
pub use leader_id::LeaderId;
pub(crate) use non_committed::NonCommittedVote;
pub use vote::Vote;

pub mod raft_term;
Loading
Loading