Skip to content

Commit

Permalink
Refactor: Consolidate Voting, Leading and InternalServerState t…
Browse files Browse the repository at this point in the history
…ype parameters into `C`
  • Loading branch information
drmingdrmer committed Mar 20, 2024
1 parent f5065fb commit 6af0292
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 84 deletions.
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where C: RaftTypeConfig
pub(crate) seen_greater_log: bool,

/// The internal server state used by Engine.
pub(crate) internal_server_state: InternalServerState<C::NodeId, InstantOf<C>>,
pub(crate) internal_server_state: InternalServerState<C>,

/// Output entry for the runtime.
pub(crate) output: EngineOutput<C>,
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::internal_server_state::LeaderQuorumSet;
use crate::leader::Leading;
use crate::type_config::alias::InstantOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
Expand All @@ -24,7 +23,7 @@ pub(crate) struct LeaderHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) leader: &'x mut Leading<C::NodeId, LeaderQuorumSet<C::NodeId>, InstantOf<C>>,
pub(crate) leader: &'x mut Leading<C, LeaderQuorumSet<C::NodeId>>,
pub(crate) state: &'x mut RaftState<C>,
pub(crate) output: &'x mut EngineOutput<C>,
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) struct ReplicationHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) leader: &'x mut Leading<C::NodeId, LeaderQuorumSet<C::NodeId>, InstantOf<C>>,
pub(crate) leader: &'x mut Leading<C, LeaderQuorumSet<C::NodeId>>,
pub(crate) state: &'x mut RaftState<C>,
pub(crate) output: &'x mut EngineOutput<C>,
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where C: RaftTypeConfig
pub(crate) config: &'st EngineConfig<C::NodeId>,
pub(crate) state: &'st mut RaftState<C>,
pub(crate) output: &'st mut EngineOutput<C>,
pub(crate) internal_server_state: &'st mut InternalServerState<C::NodeId, InstantOf<C>>,
pub(crate) internal_server_state: &'st mut InternalServerState<C>,
}

impl<'st, C> VoteHandler<'st, C>
Expand Down
29 changes: 11 additions & 18 deletions openraft/src/internal_server_state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::leader::voting::Voting;
use crate::leader::Leading;
use crate::quorum::Joint;
use crate::Instant;
use crate::NodeId;
use crate::RaftTypeConfig;

/// The quorum set type used by `Leader`.
pub(crate) type LeaderQuorumSet<NID> = Joint<NID, Vec<NID>, Vec<Vec<NID>>>;
Expand All @@ -23,52 +22,46 @@ pub(crate) type LeaderQuorumSet<NID> = Joint<NID, Vec<NID>, Vec<Vec<NID>>>;
#[derive(PartialEq, Eq)]
// TODO(9): Make InternalServerState an Option, separate Leading(Proposer) role and
// Following(Acceptor) role
pub(crate) enum InternalServerState<NID, I>
where
NID: NodeId,
I: Instant,
pub(crate) enum InternalServerState<C>
where C: RaftTypeConfig
{
/// Leader or candidate.
///
/// `vote.committed==true` means it is a leader.
Leading(Box<Leading<NID, LeaderQuorumSet<NID>, I>>),
Leading(Box<Leading<C, LeaderQuorumSet<C::NodeId>>>),

/// Follower or learner.
///
/// Being a voter means it is a follower.
Following,
}

impl<NID, I> Default for InternalServerState<NID, I>
where
NID: NodeId,
I: Instant,
impl<C> Default for InternalServerState<C>
where C: RaftTypeConfig
{
fn default() -> Self {
Self::Following
}
}

impl<NID, I> InternalServerState<NID, I>
where
NID: NodeId,
I: Instant,
impl<C> InternalServerState<C>
where C: RaftTypeConfig
{
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<NID, LeaderQuorumSet<NID>, I>> {
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<C, LeaderQuorumSet<C::NodeId>>> {
match self {
InternalServerState::Leading(l) => l.voting_mut(),
InternalServerState::Following => None,
}
}

pub(crate) fn leading(&self) -> Option<&Leading<NID, LeaderQuorumSet<NID>, I>> {
pub(crate) fn leading(&self) -> Option<&Leading<C, LeaderQuorumSet<C::NodeId>>> {
match self {
InternalServerState::Leading(l) => Some(l),
InternalServerState::Following => None,
}
}

pub(crate) fn leading_mut(&mut self) -> Option<&mut Leading<NID, LeaderQuorumSet<NID>, I>> {
pub(crate) fn leading_mut(&mut self) -> Option<&mut Leading<C, LeaderQuorumSet<C::NodeId>>> {
match self {
InternalServerState::Leading(l) => Some(l),
InternalServerState::Following => None,
Expand Down
68 changes: 31 additions & 37 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::Vote;

/// Leading state data.
Expand All @@ -27,37 +28,38 @@ use crate::Vote;
/// But instead it will be able to upgrade its `leader_id` without losing leadership.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Leading<NID: NodeId, QS: QuorumSet<NID>, I: Instant> {
pub(crate) struct Leading<C, QS: QuorumSet<C::NodeId>>
where C: RaftTypeConfig
{
/// The vote this leader works in.
pub(crate) vote: Vote<NID>,
pub(crate) vote: Vote<C::NodeId>,

quorum_set: QS,

/// Voting state, i.e., there is a Candidate running.
voting: Option<Voting<NID, QS, I>>,
voting: Option<Voting<C, QS>>,

/// Tracks the replication progress and committed index
pub(crate) progress: VecProgress<NID, ProgressEntry<NID>, Option<LogId<NID>>, QS>,
pub(crate) progress: VecProgress<C::NodeId, ProgressEntry<C::NodeId>, Option<LogIdOf<C>>, QS>,

/// Tracks the clock time acknowledged by other nodes.
///
/// See [`docs::leader_lease`] for more details.
///
/// [`docs::leader_lease`]: `crate::docs::protocol::replication::leader_lease`
pub(crate) clock_progress: VecProgress<NID, Option<I>, Option<I>, QS>,
pub(crate) clock_progress: VecProgress<C::NodeId, Option<InstantOf<C>>, Option<InstantOf<C>>, QS>,
}

impl<NID, QS, I> Leading<NID, QS, I>
impl<C, QS> Leading<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID> + Clone + fmt::Debug + 'static,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + Clone + fmt::Debug + 'static,
{
pub(crate) fn new(
vote: Vote<NID>,
vote: Vote<C::NodeId>,
quorum_set: QS,
learner_ids: impl Iterator<Item = NID>,
last_log_id: Option<LogId<NID>>,
learner_ids: impl Iterator<Item = C::NodeId>,
last_log_id: Option<LogIdOf<C>>,
) -> Self {
let learner_ids = learner_ids.collect::<Vec<_>>();

Expand All @@ -75,22 +77,26 @@ where
}

#[allow(dead_code)]
pub(crate) fn voting(&self) -> Option<&Voting<NID, QS, I>> {
pub(crate) fn voting(&self) -> Option<&Voting<C, QS>> {
self.voting.as_ref()
}

#[allow(dead_code)]
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<NID, QS, I>> {
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<C, QS>> {
self.voting.as_mut()
}

pub(crate) fn initialize_voting(&mut self, last_log_id: Option<LogId<NID>>, now: I) -> &mut Voting<NID, QS, I> {
pub(crate) fn initialize_voting(
&mut self,
last_log_id: Option<LogIdOf<C>>,
now: InstantOf<C>,
) -> &mut Voting<C, QS> {
self.voting = Some(Voting::new(now, self.vote, last_log_id, self.quorum_set.clone()));
self.voting.as_mut().unwrap()
}

/// Finish the voting process and return the state.
pub(crate) fn finish_voting(&mut self) -> Voting<NID, QS, I> {
pub(crate) fn finish_voting(&mut self) -> Voting<C, QS> {
// it has to be in voting progress
self.voting.take().unwrap()
}
Expand All @@ -107,7 +113,7 @@ where
/// Note that the leader may not be in the QuorumSet at all.
/// In such a case, the update operation will be just ignored,
/// and the quorum-acked-time is totally determined by remove voters.
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<I> {
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
// For `Leading`, the vote is always the leader's vote.
// Thus vote.voted_for() is this node.

Expand Down Expand Up @@ -142,12 +148,8 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 1),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);
let mut leading =
Leading::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 1), vec![1, 2, 3], vec![4].into_iter(), None);

let now1 = InstantOf::<UTConfig>::now();

Expand All @@ -158,12 +160,8 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 4),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);
let mut leading =
Leading::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 4), vec![1, 2, 3], vec![4].into_iter(), None);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand All @@ -178,12 +176,8 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 5),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);
let mut leading =
Leading::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 5), vec![1, 2, 3], vec![4].into_iter(), None);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand Down
50 changes: 26 additions & 24 deletions openraft/src/leader/voting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,35 @@ use crate::display_ext::DisplayOptionExt;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::Instant;
use crate::LogId;
use crate::NodeId;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::RaftTypeConfig;
use crate::Vote;

/// Voting state.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Voting<NID, QS, I>
pub(crate) struct Voting<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID>,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId>,
{
/// When the voting is started.
starting_time: I,
starting_time: InstantOf<C>,

/// The vote.
vote: Vote<NID>,
vote: Vote<C::NodeId>,

last_log_id: Option<LogId<NID>>,
last_log_id: Option<LogIdOf<C>>,

/// Which nodes have granted the the vote at certain time point.
progress: VecProgress<NID, bool, bool, QS>,
progress: VecProgress<C::NodeId, bool, bool, QS>,
}

impl<NID, QS, I> fmt::Display for Voting<NID, QS, I>
impl<C, QS> fmt::Display for Voting<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID> + fmt::Debug + 'static,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + fmt::Debug + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
Expand All @@ -48,13 +46,17 @@ where
}
}

impl<NID, QS, I> Voting<NID, QS, I>
impl<C, QS> Voting<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID> + fmt::Debug + 'static,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + fmt::Debug + 'static,
{
pub(crate) fn new(starting_time: I, vote: Vote<NID>, last_log_id: Option<LogId<NID>>, quorum_set: QS) -> Self {
pub(crate) fn new(
starting_time: InstantOf<C>,
vote: Vote<C::NodeId>,
last_log_id: Option<LogIdOf<C>>,
quorum_set: QS,
) -> Self {
Self {
starting_time,
vote,
Expand All @@ -63,16 +65,16 @@ where
}
}

pub(crate) fn vote_ref(&self) -> &Vote<NID> {
pub(crate) fn vote_ref(&self) -> &Vote<C::NodeId> {
&self.vote
}

pub(crate) fn progress(&self) -> &VecProgress<NID, bool, bool, QS> {
pub(crate) fn progress(&self) -> &VecProgress<C::NodeId, bool, bool, QS> {
&self.progress
}

/// Grant the vote by a node.
pub(crate) fn grant_by(&mut self, target: &NID) -> bool {
pub(crate) fn grant_by(&mut self, target: &C::NodeId) -> bool {
let granted = *self.progress.update(target, true).expect("target not in quorum set");

tracing::info!(voting = debug(&self), "{}", func_name!());
Expand All @@ -82,7 +84,7 @@ where

/// Return the node ids that has granted this vote.
#[allow(dead_code)]
pub(crate) fn granters(&self) -> impl Iterator<Item = NID> + '_ {
pub(crate) fn granters(&self) -> impl Iterator<Item = C::NodeId> + '_ {
self.progress().iter().filter(|(_, granted)| *granted).map(|(target, _)| *target)
}
}

0 comments on commit 6af0292

Please sign in to comment.