Skip to content

Commit

Permalink
Fix: Clarify that receiving an equal vote does not grant leadership.
Browse files Browse the repository at this point in the history
A node's `vote` may be updated when a leader observes a higher vote.
In such cases, the leader updates its local vote and steps down.
However, this vote update does not imply that the node accepts the
higher vote as valid for leadership, as it has not yet compared their
logs.

This commit re-enables `VoteResponse.vote_granted` to indicate whether a vote
is granted.

This commit also fixes:

- Fix: databendlabs#1236
  • Loading branch information
drmingdrmer committed Aug 28, 2024
1 parent 3c378d0 commit 94b1e84
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 105 deletions.
2 changes: 1 addition & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,7 +1633,7 @@ where
let _ = self.tx_notify.send(Notify::VoteResponse {
target: self.id,
// last_log_id is not used when sending VoteRequest to local node
resp: VoteResponse::new(vote, None),
resp: VoteResponse::new(vote, None, true),
sender_vote: vote,
});
}
Expand Down
46 changes: 9 additions & 37 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ where C: RaftTypeConfig
/// should be greater.
pub(crate) seen_greater_log: bool,

/// The greatest vote this node has ever seen.
///
/// It could be greater than `self.state.vote`,
/// because `self.state.vote` is update only when a node granted a vote,
/// i.e., the Leader with this vote is legal: has a greater log and vote.
///
/// This vote value is used for election.
pub(crate) last_seen_vote: Vote<C::NodeId>,

/// Represents the Leader state.
pub(crate) leader: LeaderState<C>,

Expand All @@ -110,12 +101,10 @@ where C: RaftTypeConfig
init_state: RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
config: EngineConfig<C::NodeId>,
) -> Self {
let vote = *init_state.vote_ref();
Self {
config,
state: Valid::new(init_state),
seen_greater_log: false,
last_seen_vote: vote,
leader: None,
candidate: None,
output: EngineOutput::new(4096),
Expand Down Expand Up @@ -227,14 +216,7 @@ where C: RaftTypeConfig
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
debug_assert!(
self.last_seen_vote >= *self.state.vote_ref(),
"expect: last_seen_vote({}) >= state.vote({}), when elect()",
self.last_seen_vote,
self.state.vote_ref()
);

let new_term = self.last_seen_vote.leader_id().term + 1;
let new_term = self.state.vote.leader_id().term + 1;
let new_vote = Vote::new(new_term, self.config.id);

let candidate = self.new_candidate(new_vote);
Expand Down Expand Up @@ -322,7 +304,7 @@ where C: RaftTypeConfig
vote_utime + lease - now
);

return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied());
return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false);
}
}

Expand All @@ -341,7 +323,7 @@ where C: RaftTypeConfig

// Return the updated vote, this way the candidate knows which vote is granted, in case
// the candidate's vote is changed after sending the vote request.
return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied());
return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false);
}

// Then check vote just as it does for every incoming event.
Expand All @@ -356,7 +338,7 @@ where C: RaftTypeConfig

// Return the updated vote, this way the candidate knows which vote is granted, in case
// the candidate's vote is changed after sending the vote request.
VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied())
VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), res.is_ok())
}

#[tracing::instrument(level = "debug", skip(self, resp))]
Expand All @@ -370,24 +352,14 @@ where C: RaftTypeConfig
func_name!()
);

// Update the last seen vote, but not `state.vote`.
// `state.vote` is updated only when the vote is granted
// (allows the vote owner to be a Leader).
//
// But in this case, the responded greater vote is not yet granted
// because the remote peer may have smaller log.
// And even when the remote peer has greater log, it does not have to grant the vote,
// if greater logs does not form a quorum.
self.vote_handler().update_last_seen(&resp.vote);

let Some(candidate) = self.candidate_mut() else {
// If the voting process has finished or canceled,
// just ignore the delayed vote_resp.
return;
};

// A vote request is granted iff the replied vote is the same as the requested vote.
if &resp.vote == candidate.vote_ref() {
// If resp.vote is different, it may be a delay response to previous voting.
if resp.vote_granted && &resp.vote == candidate.vote_ref() {
let quorum_granted = candidate.grant_by(&target);
if quorum_granted {
tracing::info!("a quorum granted my vote");
Expand All @@ -396,8 +368,6 @@ where C: RaftTypeConfig
return;
}

// TODO: resp.granted is never used.

// If not equal, vote is rejected:

// Note that it is still possible seeing a smaller vote:
Expand All @@ -415,6 +385,9 @@ where C: RaftTypeConfig
);
self.set_greater_log();
}

// Update if resp.vote is greater.
let _ = self.vote_handler().update_vote(&resp.vote);
}

/// Append entries to follower/learner.
Expand Down Expand Up @@ -739,7 +712,6 @@ where C: RaftTypeConfig
config: &mut self.config,
state: &mut self.state,
output: &mut self.output,
last_seen_vote: &mut self.last_seen_vote,
leader: &mut self.leader,
candidate: &mut self.candidate,
}
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/engine/handler/vote_handler/accept_vote_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ fn m01() -> Membership<u64, ()> {
}

/// Make a sample VoteResponse
fn mk_res() -> Result<VoteResponse<u64>, Infallible> {
Ok::<VoteResponse<u64>, Infallible>(VoteResponse::new(Vote::new(2, 1), None))
fn mk_res(granted: bool) -> Result<VoteResponse<u64>, Infallible> {
Ok::<VoteResponse<u64>, Infallible>(VoteResponse::new(Vote::new(2, 1), None, granted))
}

fn eng() -> Engine<UTConfig> {
Expand All @@ -48,7 +48,7 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> {
eng.output.take_commands();

let (tx, _rx) = UTConfig::oneshot();
let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res());
let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res(false));

assert!(resp.is_none());

Expand All @@ -58,7 +58,7 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> {
//
Command::Respond {
when: None,
resp: Respond::new(mk_res(), tx)
resp: Respond::new(mk_res(false), tx)
},
],
eng.output.take_commands()
Expand All @@ -74,7 +74,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> {
eng.output.take_commands();

let (tx, _rx) = UTConfig::oneshot();
let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res());
let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res(true));

assert!(resp.is_some());

Expand Down
26 changes: 0 additions & 26 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ where C: RaftTypeConfig
pub(crate) config: &'st mut EngineConfig<C::NodeId>,
pub(crate) state: &'st mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) output: &'st mut EngineOutput<C>,
pub(crate) last_seen_vote: &'st mut Vote<C::NodeId>,
pub(crate) leader: &'st mut LeaderState<C>,
pub(crate) candidate: &'st mut CandidateState<C>,
}
Expand Down Expand Up @@ -82,27 +81,6 @@ where C: RaftTypeConfig
Some(tx)
}

/// Update the `last_seen_vote` to a greater value.
///
/// Return the replaced value if it is updated.
/// Return None if not updated.
pub(crate) fn update_last_seen(&mut self, vote: &Vote<C::NodeId>) -> Option<Vote<C::NodeId>> {
tracing::debug!(
"about to update last_seen_vote from {} to {}",
self.last_seen_vote,
vote
);

if vote >= self.last_seen_vote {
tracing::info!("updated last_seen_vote from {} to {}", self.last_seen_vote, vote);
let last = *self.last_seen_vote;
*self.last_seen_vote = *vote;
Some(last)
} else {
None
}
}

/// Check and update the local vote and related state for every message received.
///
/// This is used by all incoming event, such as the three RPC append-entries, vote,
Expand All @@ -112,12 +90,8 @@ where C: RaftTypeConfig
///
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
///
/// This method also implies calling [`Self::update_last_seen`].
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), RejectVoteRequest<C::NodeId>> {
self.update_last_seen(vote);

// Partial ord compare:
// Vote does not has to be total ord.
// `!(a >= b)` does not imply `a < b`.
Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ fn test_elect_single_node_elect_again() -> anyhow::Result<()> {

// Build in-progress election state
eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(1, 2));
eng.last_seen_vote = *eng.state.vote_ref();
eng.testing_new_leader();
eng.candidate_mut().map(|candidate| candidate.grant_by(&1));

Expand Down
10 changes: 5 additions & 5 deletions openraft/src/engine/tests/handle_vote_req_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn test_handle_vote_req_rejected_by_leader_lease() -> anyhow::Result<()> {
last_log_id: Some(log_id(2, 1, 3)),
});

assert_eq!(VoteResponse::new(Vote::new_committed(2, 1), None), resp);
assert_eq!(VoteResponse::new(Vote::new_committed(2, 1), None, false), resp);

assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref());
assert!(eng.leader.is_none());
Expand All @@ -70,7 +70,7 @@ fn test_handle_vote_req_reject_smaller_vote() -> anyhow::Result<()> {
last_log_id: None,
});

assert_eq!(VoteResponse::new(Vote::new(2, 1), None), resp);
assert_eq!(VoteResponse::new(Vote::new(2, 1), None, false), resp);

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert!(eng.leader.is_none());
Expand All @@ -92,7 +92,7 @@ fn test_handle_vote_req_reject_smaller_last_log_id() -> anyhow::Result<()> {
last_log_id: Some(log_id(1, 1, 3)),
});

assert_eq!(VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 3))), resp);
assert_eq!(VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 3)), false), resp);

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert!(eng.leader.is_none());
Expand All @@ -119,7 +119,7 @@ fn test_handle_vote_req_granted_equal_vote_and_last_log_id() -> anyhow::Result<(
last_log_id: Some(log_id(2, 1, 3)),
});

assert_eq!(VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 3))), resp);
assert_eq!(VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 3)), true), resp);

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert!(eng.leader.is_none());
Expand All @@ -146,7 +146,7 @@ fn test_handle_vote_req_granted_greater_vote() -> anyhow::Result<()> {
});

// respond the updated vote.
assert_eq!(VoteResponse::new(Vote::new(3, 1), Some(log_id(2, 1, 3))), resp);
assert_eq!(VoteResponse::new(Vote::new(3, 1), Some(log_id(2, 1, 3)), true), resp);

assert_eq!(Vote::new(3, 1), *eng.state.vote_ref());
assert!(eng.leader.is_none());
Expand Down
33 changes: 10 additions & 23 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
.membership_state
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m12())));

eng.handle_vote_resp(2, VoteResponse::new(Vote::new(2, 2), Some(log_id(2, 1, 2))));
eng.handle_vote_resp(2, VoteResponse::new(Vote::new(2, 2), Some(log_id(2, 1, 2)), true));

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(Vote::new(2, 2), eng.last_seen_vote);

assert!(eng.leader.is_none());

Expand All @@ -78,10 +77,9 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse::new(Vote::new(1, 1), Some(log_id(2, 1, 2))));
eng.handle_vote_resp(2, VoteResponse::new(Vote::new(1, 1), Some(log_id(2, 1, 2)), true));

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(Vote::new(1, 1), eng.last_seen_vote);

assert_eq!(&Vote::new(2, 1), eng.candidate_ref().unwrap().vote_ref());
assert_eq!(
Expand Down Expand Up @@ -112,26 +110,17 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse::new(Vote::new(3, 2), Some(log_id(2, 1, 2))));
eng.handle_vote_resp(2, VoteResponse::new(Vote::new(3, 2), Some(log_id(2, 1, 2)), true));

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(Vote::new(3, 2), eng.last_seen_vote);
assert_eq!(Vote::new(3, 2), *eng.state.vote_ref());

assert!(eng.leader.is_none());

assert_eq!(
ServerState::Candidate,
eng.state.server_state,
"still in candidate state, until receives RequestVote/AppendEntries from other node"
);
assert_eq!(ServerState::Follower, eng.state.server_state,);

assert_eq!(
eng.output.take_commands(),
vec![
//
],
"no SaveVote because the higher vote is not yet granted by this node"
);
assert_eq!(eng.output.take_commands(), vec![Command::SaveVote {
vote: Vote::new(3, 2)
}],);
}

tracing::info!("--- equal vote, granted, but not constitute a quorum. nothing to do");
Expand All @@ -150,10 +139,9 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 2))));
eng.handle_vote_resp(2, VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 2)), true));

assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(Vote::new(2, 1), eng.last_seen_vote);

assert_eq!(&Vote::new(2, 1), eng.candidate_ref().unwrap().vote_ref());
assert_eq!(
Expand Down Expand Up @@ -185,10 +173,9 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> {

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 2))));
eng.handle_vote_resp(2, VoteResponse::new(Vote::new(2, 1), Some(log_id(2, 1, 2)), true));

assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref(),);
assert_eq!(Vote::new_committed(2, 1), eng.last_seen_vote);

assert_eq!(Some(log_id(2, 1, 1)), eng.leader.as_ref().unwrap().noop_log_id);
assert!(
Expand Down
13 changes: 6 additions & 7 deletions openraft/src/raft/message/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ impl<NID: NodeId> VoteRequest<NID> {
pub struct VoteResponse<NID: NodeId> {
/// vote after a node handling vote-request.
/// Thus `resp.vote >= req.vote` always holds.
///
/// `vote` that equals the candidate.vote does not mean the vote is granted.
/// The `vote` may be updated when a previous Leader sees a higher vote.
pub vote: Vote<NID>,

/// Previously, it is true if a node accepted and saved the VoteRequest.
/// Now, it is no longer used and is always false.
/// If `vote` is the same as the Candidate, the Vote is granted.
#[deprecated(note = "use new() and is_granted_to() instead", since = "0.10.0")]
/// It is true if a node accepted and saved the VoteRequest.
pub vote_granted: bool,

/// The last log id stored on the remote voter.
Expand All @@ -64,11 +64,10 @@ impl<NID: NodeId> MessageSummary<VoteResponse<NID>> for VoteResponse<NID> {
impl<NID> VoteResponse<NID>
where NID: NodeId
{
pub fn new(vote: impl Borrow<Vote<NID>>, last_log_id: Option<LogId<NID>>) -> Self {
#[allow(deprecated)]
pub fn new(vote: impl Borrow<Vote<NID>>, last_log_id: Option<LogId<NID>>, granted: bool) -> Self {
Self {
vote: *vote.borrow(),
vote_granted: false,
vote_granted: granted,
last_log_id: last_log_id.map(|x| *x.borrow()),
}
}
Expand Down

0 comments on commit 94b1e84

Please sign in to comment.