Skip to content

Commit

Permalink
Feature: Abstract Vote
Browse files Browse the repository at this point in the history
- Added an associated type `Vote: RaftVote` to `RaftTypeConfig`,
  allowing applications to customize the `Vote` implementation.

- Introduced the `OrdBy` trait with the method `fn ord_by() ->
  Option<Ordering>` to enable customized ordering for types.
  This trait is used internally only for determine `RaftVote` order.

  The ordering logic is consistent across different `Vote`
  implementations and does not require the application to implement
  `PartialOrd` directly for `Vote`. Instead, this ordering property is
  provided by OpenRaft.

- Implemented `RaftVote` for the struct `Vote`, which serves as the
  default `RaftVote` implementation. This ensures that applications
  upgrading OpenRaft do not need to make any changes.

- Part of #1278
  • Loading branch information
drmingdrmer committed Jan 8, 2025
1 parent f8dd4d5 commit e2d58ce
Show file tree
Hide file tree
Showing 49 changed files with 470 additions and 172 deletions.
1 change: 1 addition & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"openraftpb.LeaderId",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
27 changes: 3 additions & 24 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ openraft::declare_raft_types!(
D = pb::SetRequest,
R = pb::Response,
LeaderId = pb::LeaderId,
Vote = pb::Vote,
Node = pb::Node,
SnapshotData = StateMachineData,
);
Expand All @@ -34,17 +35,6 @@ pub mod protobuf {
#[path = "../../utils/declare_types.rs"]
pub mod typ;

impl From<pb::Vote> for Vote {
fn from(proto_vote: pb::Vote) -> Self {
let leader_id: LeaderId = proto_vote.leader_id.unwrap();
if proto_vote.committed {
Vote::new_committed(leader_id.term, leader_id.node_id)
} else {
Vote::new(leader_id.term, leader_id.node_id)
}
}
}

impl From<pb::LogId> for LogId {
fn from(proto_log_id: pb::LogId) -> Self {
LogId::new(proto_log_id.term, proto_log_id.index)
Expand All @@ -53,31 +43,20 @@ impl From<pb::LogId> for LogId {

impl From<pb::VoteRequest> for VoteRequest {
fn from(proto_vote_req: pb::VoteRequest) -> Self {
let vote: Vote = proto_vote_req.vote.unwrap().into();
let vote = proto_vote_req.vote.unwrap();
let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into());
VoteRequest::new(vote, last_log_id)
}
}

impl From<pb::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: pb::VoteResponse) -> Self {
let vote: Vote = proto_vote_resp.vote.unwrap().into();
let vote = proto_vote_resp.vote.unwrap();
let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into());
VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted)
}
}

impl From<Vote> for pb::Vote {
fn from(vote: Vote) -> Self {
pb::Vote {
leader_id: Some(pb::LeaderId {
term: vote.leader_id().term,
node_id: vote.leader_id().node_id,
}),
committed: vote.is_committed(),
}
}
}
impl From<LogId> for pb::LogId {
fn from(log_id: LogId) -> Self {
pb::LogId {
Expand Down
34 changes: 34 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/impl_vote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::fmt;

use openraft::vote::RaftVote;

use crate::typ::*;
use crate::TypeConfig;

impl RaftVote<TypeConfig> for Vote {
fn from_leader_id(leader_id: LeaderId, committed: bool) -> Self {
Vote {
leader_id: Some(leader_id),
committed,
}
}

fn leader_id(&self) -> Option<&LeaderId> {
self.leader_id.as_ref()
}

fn is_committed(&self) -> bool {
self.committed
}
}

impl fmt::Display for Vote {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"<{}:{}>",
self.leader_id.as_ref().unwrap(),
if self.is_committed() { "Q" } else { "-" }
)
}
}
1 change: 1 addition & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Implements traits for protobuf types
mod impl_leader_id;
mod impl_vote;
2 changes: 1 addition & 1 deletion examples/utils/declare_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<TypeConfig>;
pub type Vote = <TypeConfig as openraft::RaftTypeConfig>::Vote;
pub type LeaderId = <TypeConfig as openraft::RaftTypeConfig>::LeaderId;
pub type LogId = openraft::LogId<TypeConfig>;
pub type Entry = openraft::Entry<TypeConfig>;
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Basic types used in the Raft implementation.
pub(crate) mod ord_by;

pub use serde_able::OptionalSerde;
pub use threaded::BoxAny;
pub use threaded::BoxAsyncOnceMut;
Expand Down
36 changes: 36 additions & 0 deletions openraft/src/base/ord_by.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/// A trait for types whose order can be determined by a key.
///
/// Types implementing this trait define how they should be compared by providing a key
/// that implements [`PartialOrd`].
///
/// OpenRaft uses this trait to compare types that may not be [`PartialOrd`] themselves.
///
/// # Type Parameters
/// - `Key<'k>`: The type of the comparison key, which must be partially ordered and must not out
/// live the value.
///
/// # Examples
/// ```rust,ignore
/// # use openraft::base::ord_by::OrdBy;
///
/// struct Person {
/// name: String,
/// age: u32,
/// }
///
/// impl OrdBy<()> for Person {
/// type By<'k> = &'k str;
///
/// fn ord_by(&self) -> Self::By<'_> {
/// &self.name
/// }
/// }
/// ```
pub(crate) trait OrdBy<C> {
/// The key type used for comparison.
type By<'k>: PartialOrd + 'k
where Self: 'k;

/// Returns the key used for comparing this value.
fn ord_by(&self) -> Self::By<'_>;
}
8 changes: 5 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ use crate::type_config::async_runtime::MpscUnboundedReceiver;
use crate::type_config::TypeConfigExt;
use crate::vote::committed::CommittedVote;
use crate::vote::non_committed::NonCommittedVote;
use crate::vote::raft_vote::RaftVoteExt;
use crate::vote::vote_status::VoteStatus;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -392,7 +394,7 @@ where
// request.
if let AppendEntriesResponse::HigherVote(vote) = append_res {
debug_assert!(
vote > my_vote,
vote.as_ref_vote() > my_vote.as_ref_vote(),
"committed vote({}) has total order relation with other votes({})",
my_vote,
vote
Expand Down Expand Up @@ -584,7 +586,7 @@ where
id: self.id.clone(),

// --- data ---
current_term: st.vote_ref().leader_id().term(),
current_term: st.vote_ref().to_leader_id().term(),
vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().cloned(),
Expand Down Expand Up @@ -727,7 +729,7 @@ where
}

// Safe unwrap(): vote that is committed has to already have voted for some node.
let id = vote.leader_id().node_id().cloned().unwrap();
let id = vote.to_leader_id().node_id().cloned().unwrap();

// TODO: `is_voter()` is slow, maybe cache `current_leader`,
// e.g., only update it when membership or vote changes
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where C: RaftTypeConfig
///
/// Returns a snapshot data handle for receiving data.
///
/// It does not check [`Vote`] because it is a read operation
/// It does not check `Vote` because it is a read operation
/// and does not break raft protocol.
BeginReceivingSnapshot {
tx: ResultSender<C, Box<SnapshotDataOf<C>>, Infallible>,
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ mod tests {
type Node = ();
type Term = u64;
type LeaderId = crate::impls::leader_id_adv::LeaderId<Self>;
type Entry = crate::Entry<TickUTConfig>;
type Vote = crate::impls::Vote<Self>;
type Entry = crate::Entry<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
17 changes: 9 additions & 8 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use validit::Valid;

use crate::base::ord_by::OrdBy;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ResultSender;
use crate::core::sm;
Expand Down Expand Up @@ -44,18 +45,20 @@ use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::storage::Snapshot;
use crate::storage::SnapshotMeta;
use crate::type_config::alias::LeaderIdOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::vote::RaftLeaderId;
use crate::vote::RaftTerm;
use crate::vote::RaftVote;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftLogId;
use crate::RaftTypeConfig;
use crate::Vote;

/// Raft protocol algorithm.
///
Expand Down Expand Up @@ -195,10 +198,7 @@ where C: RaftTypeConfig
self.check_members_contain_me(m)?;

// FollowingHandler requires vote to be committed.
let vote = Vote {
committed: true,
..Default::default()
};
let vote = <VoteOf<C> as RaftVote<C>>::from_leader_id(Default::default(), true);
self.state.vote.update(C::now(), Duration::default(), vote);
self.following_handler().do_append_entries(vec![entry]);

Expand All @@ -211,8 +211,9 @@ where C: RaftTypeConfig
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
let new_term = self.state.vote.leader_id().term().next();
let new_vote = Vote::new(new_term, self.config.id.clone());
let new_term = self.state.vote.term().next();
let leader_id = LeaderIdOf::<C>::new(new_term, self.config.id.clone());
let new_vote = VoteOf::<C>::from_leader_id(leader_id, false);

let candidate = self.new_candidate(new_vote.clone());

Expand Down Expand Up @@ -754,7 +755,7 @@ where C: RaftTypeConfig
};

debug_assert!(
leader.committed_vote_ref().clone().into_vote() >= *self.state.vote_ref(),
leader.committed_vote_ref().ord_by() >= self.state.vote_ref().ord_by(),
"leader.vote({}) >= state.vote({})",
leader.committed_vote_ref(),
self.state.vote_ref()
Expand Down
7 changes: 4 additions & 3 deletions openraft/src/engine/handler/establish_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::base::ord_by::OrdBy;
use crate::engine::EngineConfig;
use crate::proposer::Candidate;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::proposer::LeaderState;
use crate::vote::RaftLeaderId;
use crate::vote::raft_vote::RaftVoteExt;
use crate::RaftTypeConfig;

/// Establish a leader for the Engine, when Candidate finishes voting stage.
Expand All @@ -25,14 +26,14 @@ where C: RaftTypeConfig
let vote = candidate.vote_ref().clone();

debug_assert_eq!(
vote.leader_id().node_id(),
vote.leader_node_id(),
Some(&self.config.id),
"it can only commit its own vote"
);

if let Some(l) = self.leader.as_ref() {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
if !(vote > l.committed_vote_ref().clone().into_vote()) {
if !(vote.ord_by() > l.committed_vote_ref().ord_by()) {
tracing::warn!(
"vote is not greater than current existing leader vote. Do not establish new leader and quit"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;
Expand All @@ -28,15 +30,13 @@ fn m23() -> Membership<UTConfig> {
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
let mut eng: Engine<UTConfig> = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.config.id = 2;
eng.state.vote.update(
UTConfig::<()>::now(),
Duration::from_millis(500),
Vote::new_committed(2, 1),
);
let vote = VoteOf::<UTConfig>::new_committed(2, 1);
let now = UTConfig::<()>::now();
eng.state.vote.update(now, Duration::from_millis(500), vote);
eng.state.log_ids.append(log_id(1, 1, 1));
eng.state.log_ids.append(log_id(2, 1, 3));
eng.state.membership_state = MembershipState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::raft_state::LogStateReader;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::EntryPayload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use crate::raft_state::LogStateReader;
use crate::storage::Snapshot;
use crate::storage::SnapshotMeta;
use crate::testing::log_id;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::StoredMembership;
Expand All @@ -31,14 +33,12 @@ fn m1234() -> Membership<UTConfig> {
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
let mut eng: Engine<UTConfig> = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.state.vote.update(
UTConfig::<()>::now(),
Duration::from_millis(500),
Vote::new_committed(2, 1),
);
let now = UTConfig::<()>::now();
let vote = VoteOf::<UTConfig>::new_committed(2, 1);
eng.state.vote.update(now, Duration::from_millis(500), vote);
eng.state.committed = Some(log_id(4, 1, 5));
eng.state.log_ids = LogIdList::new(vec![
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::Membership;
Expand Down
Loading

0 comments on commit e2d58ce

Please sign in to comment.