Skip to content

Commit

Permalink
Refactor: Change type definition to pass a 'RaftTypeConfig' instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Miaxos committed Feb 28, 2024
1 parent 090fb36 commit 0e93e14
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 54 deletions.
10 changes: 5 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ where
&mut self,
changes: ChangeMembers<C::NodeId, C::Node>,
retain: bool,
tx: ResultSender<AsyncRuntimeOf<C>, ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
tx: ResultSender<C, ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
) {
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
let new_membership = match res {
Expand Down Expand Up @@ -599,7 +599,7 @@ where
pub(crate) fn handle_initialize(
&mut self,
member_nodes: BTreeMap<C::NodeId, C::Node>,
tx: ResultSender<AsyncRuntimeOf<C>, (), InitializeError<C::NodeId, C::Node>>,
tx: ResultSender<C, (), InitializeError<C::NodeId, C::Node>>,
) {
tracing::debug!(member_nodes = debug(&member_nodes), "{}", func_name!());

Expand All @@ -624,7 +624,7 @@ where
#[tracing::instrument(level = "trace", skip(self, tx))]
pub(crate) fn reject_with_forward_to_leader<T: OptionalSend, E: OptionalSend>(
&self,
tx: ResultSender<AsyncRuntimeOf<C>, T, E>,
tx: ResultSender<C, T, E>,
) where
E: From<ForwardToLeader<C::NodeId, C::Node>>,
{
Expand Down Expand Up @@ -1083,7 +1083,7 @@ where
pub(super) fn handle_vote_request(
&mut self,
req: VoteRequest<C::NodeId>,
tx: VoteTx<AsyncRuntimeOf<C>, C::NodeId>,
tx: VoteTx<C>,
) {
tracing::info!(req = display(req.summary()), func = func_name!());

Expand All @@ -1098,7 +1098,7 @@ where
pub(super) fn handle_append_entries_request(
&mut self,
req: AppendEntriesRequest<C>,
tx: AppendEntriesTx<AsyncRuntimeOf<C>, C::NodeId>,
tx: AppendEntriesTx<C>,
) {
tracing::debug!(req = display(req.summary()), func = func_name!());

Expand Down
5 changes: 1 addition & 4 deletions openraft/src/core/raft_msg/external_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::fmt;

use crate::core::raft_msg::ResultSender;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::RaftTypeConfig;
use crate::Snapshot;

Expand All @@ -24,9 +23,7 @@ pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
Snapshot,

/// Get a snapshot from the state machine, send back via a oneshot::Sender.
GetSnapshot {
tx: ResultSender<AsyncRuntimeOf<C>, Option<Snapshot<C>>>,
},
GetSnapshot { tx: ResultSender<C, Option<Snapshot<C>>> },

/// Purge logs covered by a snapshot up to a specified index.
///
Expand Down
31 changes: 14 additions & 17 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,23 @@ use crate::Vote;
pub(crate) mod external_command;

/// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`.
pub(crate) type ResultSender<Runtime, T, E = Infallible> = <Runtime as AsyncRuntime>::OneshotSender<Result<T, E>>;
pub(crate) type ResultSender<C, T, E = Infallible> = <AsyncRuntimeOf<C> as AsyncRuntime>::OneshotSender<Result<T, E>>;

pub(crate) type ResultReceiver<Runtime, T, E = Infallible> = <Runtime as AsyncRuntime>::OneshotReceiver<Result<T, E>>;
pub(crate) type ResultReceiver<C, T, E = Infallible> =
<AsyncRuntimeOf<C> as AsyncRuntime>::OneshotReceiver<Result<T, E>>;

/// TX for Vote Response
pub(crate) type VoteTx<Runtime, NID> = ResultSender<Runtime, VoteResponse<NID>>;
pub(crate) type VoteTx<C> = ResultSender<C, VoteResponse<NodeIdOf<C>>>;

/// TX for Append Entries Response
pub(crate) type AppendEntriesTx<Runtime, NID> = ResultSender<Runtime, AppendEntriesResponse<NID>>;
pub(crate) type AppendEntriesTx<C> = ResultSender<C, AppendEntriesResponse<NodeIdOf<C>>>;

/// TX for Client Write Response
pub(crate) type ClientWriteTx<C> =
ResultSender<AsyncRuntimeOf<C>, ClientWriteResponse<C>, ClientWriteError<NodeIdOf<C>, NodeOf<C>>>;
pub(crate) type ClientWriteTx<C> = ResultSender<C, ClientWriteResponse<C>, ClientWriteError<NodeIdOf<C>, NodeOf<C>>>;

/// TX for Linearizable Read Response
pub(crate) type ClientReadTx<C> = ResultSender<
AsyncRuntimeOf<C>,
(Option<LogIdOf<C>>, Option<LogIdOf<C>>),
CheckIsLeaderError<NodeIdOf<C>, NodeOf<C>>,
>;
pub(crate) type ClientReadTx<C> =
ResultSender<C, (Option<LogIdOf<C>>, Option<LogIdOf<C>>), CheckIsLeaderError<NodeIdOf<C>, NodeOf<C>>>;

/// A message sent by application to the [`RaftCore`].
///
Expand All @@ -57,18 +54,18 @@ where C: RaftTypeConfig
{
AppendEntries {
rpc: AppendEntriesRequest<C>,
tx: AppendEntriesTx<AsyncRuntimeOf<C>, C::NodeId>,
tx: AppendEntriesTx<C>,
},

RequestVote {
rpc: VoteRequest<C::NodeId>,
tx: VoteTx<AsyncRuntimeOf<C>, C::NodeId>,
tx: VoteTx<C>,
},

InstallFullSnapshot {
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<AsyncRuntimeOf<C>, SnapshotResponse<C::NodeId>>,
tx: ResultSender<C, SnapshotResponse<C::NodeId>>,
},

/// Begin receiving a snapshot from the leader.
Expand All @@ -78,7 +75,7 @@ where C: RaftTypeConfig
/// will be returned in a Err
BeginReceivingSnapshot {
vote: Vote<C::NodeId>,
tx: ResultSender<AsyncRuntimeOf<C>, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
tx: ResultSender<C, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
},

ClientWriteRequest {
Expand All @@ -92,7 +89,7 @@ where C: RaftTypeConfig

Initialize {
members: BTreeMap<C::NodeId, C::Node>,
tx: ResultSender<AsyncRuntimeOf<C>, (), InitializeError<C::NodeId, C::Node>>,
tx: ResultSender<C, (), InitializeError<C::NodeId, C::Node>>,
},

ChangeMembership {
Expand All @@ -102,7 +99,7 @@ where C: RaftTypeConfig
/// config will be converted into learners, otherwise they will be removed.
retain: bool,

tx: ResultSender<AsyncRuntimeOf<C>, ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
tx: ResultSender<C, ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
},

ExternalCoreRequest {
Expand Down
10 changes: 4 additions & 6 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,12 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn get_snapshot(tx: ResultSender<C::AsyncRuntime, Option<Snapshot<C>>>) -> Self {
pub(crate) fn get_snapshot(tx: ResultSender<C, Option<Snapshot<C>>>) -> Self {
let payload = CommandPayload::GetSnapshot { tx };
Command::new(payload)
}

pub(crate) fn begin_receiving_snapshot(
tx: ResultSender<C::AsyncRuntime, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
) -> Self {
pub(crate) fn begin_receiving_snapshot(tx: ResultSender<C, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>) -> Self {
let payload = CommandPayload::BeginReceivingSnapshot { tx };
Command::new(payload)
}
Expand Down Expand Up @@ -93,11 +91,11 @@ where C: RaftTypeConfig

/// Get the latest built snapshot.
GetSnapshot {
tx: ResultSender<C::AsyncRuntime, Option<Snapshot<C>>>,
tx: ResultSender<C, Option<Snapshot<C>>>,
},

BeginReceivingSnapshot {
tx: ResultSender<C::AsyncRuntime, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
tx: ResultSender<C, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
},

InstallFullSnapshot {
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::core::ApplyingEntry;
use crate::entry::RaftPayload;
use crate::storage::RaftStateMachine;
use crate::summary::MessageSummary;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::RaftLogId;
use crate::RaftSnapshotBuilder;
Expand Down Expand Up @@ -223,7 +222,7 @@ where
#[tracing::instrument(level = "info", skip_all)]
async fn get_snapshot(
&mut self,
tx: ResultSender<AsyncRuntimeOf<C>, Option<Snapshot<C>>>,
tx: ResultSender<C, Option<Snapshot<C>>>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::info!("{}", func_name!());

Expand Down
11 changes: 5 additions & 6 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::summary::MessageSummary;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::AsyncRuntime;
use crate::Instant;
Expand Down Expand Up @@ -225,8 +224,8 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn get_leader_handler_or_reject<T, E>(
&mut self,
tx: Option<ResultSender<C::AsyncRuntime, T, E>>,
) -> Option<(LeaderHandler<C>, Option<ResultSender<C::AsyncRuntime, T, E>>)>
tx: Option<ResultSender<C, T, E>>,
) -> Option<(LeaderHandler<C>, Option<ResultSender<C, T, E>>)>
where
T: OptionalSend,
E: OptionalSend,
Expand Down Expand Up @@ -396,7 +395,7 @@ where C: RaftTypeConfig
vote: &Vote<C::NodeId>,
prev_log_id: Option<LogId<C::NodeId>>,
entries: Vec<C::Entry>,
tx: Option<AppendEntriesTx<AsyncRuntimeOf<C>, C::NodeId>>,
tx: Option<AppendEntriesTx<C>>,
) -> bool {
tracing::debug!(
vote = display(vote),
Expand Down Expand Up @@ -459,7 +458,7 @@ where C: RaftTypeConfig
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<C::AsyncRuntime, SnapshotResponse<C::NodeId>>,
tx: ResultSender<C, SnapshotResponse<C::NodeId>>,
) {
tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!());

Expand Down Expand Up @@ -492,7 +491,7 @@ where C: RaftTypeConfig
pub(crate) fn handle_begin_receiving_snapshot(
&mut self,
vote: Vote<C::NodeId>,
tx: ResultSender<C::AsyncRuntime, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
tx: ResultSender<C, Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
) {
tracing::info!(vote = display(vote), "{}", func_name!());

Expand Down
5 changes: 2 additions & 3 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::error::RejectVoteRequest;
use crate::internal_server_state::InternalServerState;
use crate::leader::Leading;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::Instant;
Expand Down Expand Up @@ -52,9 +51,9 @@ where C: RaftTypeConfig
pub(crate) fn accept_vote<T, E, F>(
&mut self,
vote: &Vote<C::NodeId>,
tx: ResultSender<AsyncRuntimeOf<C>, T, E>,
tx: ResultSender<C, T, E>,
f: F,
) -> Option<ResultSender<AsyncRuntimeOf<C>, T, E>>
) -> Option<ResultSender<C, T, E>>
where
T: Debug + Eq + OptionalSend,
E: Debug + Eq + OptionalSend,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ where
#[tracing::instrument(level = "info", skip_all)]
async fn stream_snapshot(
&mut self,
snapshot_rx: DataWithId<ResultReceiver<AsyncRuntimeOf<C>, Option<Snapshot<C>>>>,
snapshot_rx: DataWithId<ResultReceiver<C, Option<Snapshot<C>>>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = snapshot_rx.request_id();
let rx = snapshot_rx.into_data();
Expand Down
13 changes: 3 additions & 10 deletions openraft/src/replication/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ where C: RaftTypeConfig
Self::Data(Data::new_logs(id, log_id_range))
}

pub(crate) fn snapshot(
id: Option<u64>,
snapshot_rx: ResultReceiver<AsyncRuntimeOf<C>, Option<Snapshot<C>>>,
) -> Self {
pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: ResultReceiver<C, Option<Snapshot<C>>>) -> Self {
Self::Data(Data::new_snapshot(id, snapshot_rx))
}

Expand Down Expand Up @@ -59,7 +56,6 @@ use crate::error::StreamingError;
use crate::log_id_range::LogIdRange;
use crate::raft::SnapshotResponse;
use crate::replication::callbacks::SnapshotCallback;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::LogId;
use crate::MessageSummary;
Expand All @@ -77,7 +73,7 @@ where C: RaftTypeConfig
{
Heartbeat,
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<ResultReceiver<AsyncRuntimeOf<C>, Option<Snapshot<C>>>>),
Snapshot(DataWithId<ResultReceiver<C, Option<Snapshot<C>>>>),
SnapshotCallback(DataWithId<SnapshotCallback<C>>),
}

Expand Down Expand Up @@ -152,10 +148,7 @@ where C: RaftTypeConfig
Self::Logs(DataWithId::new(request_id, log_id_range))
}

pub(crate) fn new_snapshot(
request_id: Option<u64>,
snapshot_rx: ResultReceiver<AsyncRuntimeOf<C>, Option<Snapshot<C>>>,
) -> Self {
pub(crate) fn new_snapshot(request_id: Option<u64>, snapshot_rx: ResultReceiver<C, Option<Snapshot<C>>>) -> Self {
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
}

Expand Down

0 comments on commit 0e93e14

Please sign in to comment.