diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 629126cac..322c88886 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -438,7 +438,7 @@ where &mut self, changes: ChangeMembers, retain: bool, - tx: ResultSender, ClientWriteResponse, ClientWriteError>, + tx: ResultSender, ClientWriteError>, ) { let res = self.engine.state.membership_state.change_handler().apply(changes, retain); let new_membership = match res { @@ -599,7 +599,7 @@ where pub(crate) fn handle_initialize( &mut self, member_nodes: BTreeMap, - tx: ResultSender, (), InitializeError>, + tx: ResultSender>, ) { tracing::debug!(member_nodes = debug(&member_nodes), "{}", func_name!()); @@ -624,7 +624,7 @@ where #[tracing::instrument(level = "trace", skip(self, tx))] pub(crate) fn reject_with_forward_to_leader( &self, - tx: ResultSender, T, E>, + tx: ResultSender, ) where E: From>, { @@ -1083,7 +1083,7 @@ where pub(super) fn handle_vote_request( &mut self, req: VoteRequest, - tx: VoteTx, C::NodeId>, + tx: VoteTx, ) { tracing::info!(req = display(req.summary()), func = func_name!()); @@ -1098,7 +1098,7 @@ where pub(super) fn handle_append_entries_request( &mut self, req: AppendEntriesRequest, - tx: AppendEntriesTx, C::NodeId>, + tx: AppendEntriesTx, ) { tracing::debug!(req = display(req.summary()), func = func_name!()); diff --git a/openraft/src/core/raft_msg/external_command.rs b/openraft/src/core/raft_msg/external_command.rs index 666d5bfd2..5714df39c 100644 --- a/openraft/src/core/raft_msg/external_command.rs +++ b/openraft/src/core/raft_msg/external_command.rs @@ -3,7 +3,6 @@ use std::fmt; use crate::core::raft_msg::ResultSender; -use crate::type_config::alias::AsyncRuntimeOf; use crate::RaftTypeConfig; use crate::Snapshot; @@ -24,9 +23,7 @@ pub(crate) enum ExternalCommand { Snapshot, /// Get a snapshot from the state machine, send back via a oneshot::Sender. - GetSnapshot { - tx: ResultSender, Option>>, - }, + GetSnapshot { tx: ResultSender>> }, /// Purge logs covered by a snapshot up to a specified index. /// diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 39871d134..8c0f2a6df 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -28,26 +28,23 @@ use crate::Vote; pub(crate) mod external_command; /// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`. -pub(crate) type ResultSender = ::OneshotSender>; +pub(crate) type ResultSender = as AsyncRuntime>::OneshotSender>; -pub(crate) type ResultReceiver = ::OneshotReceiver>; +pub(crate) type ResultReceiver = + as AsyncRuntime>::OneshotReceiver>; /// TX for Vote Response -pub(crate) type VoteTx = ResultSender>; +pub(crate) type VoteTx = ResultSender>>; /// TX for Append Entries Response -pub(crate) type AppendEntriesTx = ResultSender>; +pub(crate) type AppendEntriesTx = ResultSender>>; /// TX for Client Write Response -pub(crate) type ClientWriteTx = - ResultSender, ClientWriteResponse, ClientWriteError, NodeOf>>; +pub(crate) type ClientWriteTx = ResultSender, ClientWriteError, NodeOf>>; /// TX for Linearizable Read Response -pub(crate) type ClientReadTx = ResultSender< - AsyncRuntimeOf, - (Option>, Option>), - CheckIsLeaderError, NodeOf>, ->; +pub(crate) type ClientReadTx = + ResultSender>, Option>), CheckIsLeaderError, NodeOf>>; /// A message sent by application to the [`RaftCore`]. /// @@ -57,18 +54,18 @@ where C: RaftTypeConfig { AppendEntries { rpc: AppendEntriesRequest, - tx: AppendEntriesTx, C::NodeId>, + tx: AppendEntriesTx, }, RequestVote { rpc: VoteRequest, - tx: VoteTx, C::NodeId>, + tx: VoteTx, }, InstallFullSnapshot { vote: Vote, snapshot: Snapshot, - tx: ResultSender, SnapshotResponse>, + tx: ResultSender>, }, /// Begin receiving a snapshot from the leader. @@ -78,7 +75,7 @@ where C: RaftTypeConfig /// will be returned in a Err BeginReceivingSnapshot { vote: Vote, - tx: ResultSender, Box>, HigherVote>, + tx: ResultSender>, HigherVote>, }, ClientWriteRequest { @@ -92,7 +89,7 @@ where C: RaftTypeConfig Initialize { members: BTreeMap, - tx: ResultSender, (), InitializeError>, + tx: ResultSender>, }, ChangeMembership { @@ -102,7 +99,7 @@ where C: RaftTypeConfig /// config will be converted into learners, otherwise they will be removed. retain: bool, - tx: ResultSender, ClientWriteResponse, ClientWriteError>, + tx: ResultSender, ClientWriteError>, }, ExternalCoreRequest { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 5930322cf..11ddcf8bc 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -54,14 +54,12 @@ where C: RaftTypeConfig Command::new(payload) } - pub(crate) fn get_snapshot(tx: ResultSender>>) -> Self { + pub(crate) fn get_snapshot(tx: ResultSender>>) -> Self { let payload = CommandPayload::GetSnapshot { tx }; Command::new(payload) } - pub(crate) fn begin_receiving_snapshot( - tx: ResultSender>, HigherVote>, - ) -> Self { + pub(crate) fn begin_receiving_snapshot(tx: ResultSender>, HigherVote>) -> Self { let payload = CommandPayload::BeginReceivingSnapshot { tx }; Command::new(payload) } @@ -93,11 +91,11 @@ where C: RaftTypeConfig /// Get the latest built snapshot. GetSnapshot { - tx: ResultSender>>, + tx: ResultSender>>, }, BeginReceivingSnapshot { - tx: ResultSender>, HigherVote>, + tx: ResultSender>, HigherVote>, }, InstallFullSnapshot { diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index 78692202b..736026e9b 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -12,7 +12,6 @@ use crate::core::ApplyingEntry; use crate::entry::RaftPayload; use crate::storage::RaftStateMachine; use crate::summary::MessageSummary; -use crate::type_config::alias::AsyncRuntimeOf; use crate::AsyncRuntime; use crate::RaftLogId; use crate::RaftSnapshotBuilder; @@ -223,7 +222,7 @@ where #[tracing::instrument(level = "info", skip_all)] async fn get_snapshot( &mut self, - tx: ResultSender, Option>>, + tx: ResultSender>>, ) -> Result<(), StorageError> { tracing::info!("{}", func_name!()); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index e79f26993..2cf2074b1 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -39,7 +39,6 @@ use crate::raft::VoteResponse; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; use crate::summary::MessageSummary; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::SnapshotDataOf; use crate::AsyncRuntime; use crate::Instant; @@ -225,8 +224,8 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn get_leader_handler_or_reject( &mut self, - tx: Option>, - ) -> Option<(LeaderHandler, Option>)> + tx: Option>, + ) -> Option<(LeaderHandler, Option>)> where T: OptionalSend, E: OptionalSend, @@ -396,7 +395,7 @@ where C: RaftTypeConfig vote: &Vote, prev_log_id: Option>, entries: Vec, - tx: Option, C::NodeId>>, + tx: Option>, ) -> bool { tracing::debug!( vote = display(vote), @@ -459,7 +458,7 @@ where C: RaftTypeConfig &mut self, vote: Vote, snapshot: Snapshot, - tx: ResultSender>, + tx: ResultSender>, ) { tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!()); @@ -492,7 +491,7 @@ where C: RaftTypeConfig pub(crate) fn handle_begin_receiving_snapshot( &mut self, vote: Vote, - tx: ResultSender>, HigherVote>, + tx: ResultSender>, HigherVote>, ) { tracing::info!(vote = display(vote), "{}", func_name!()); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 8a3ffe87d..b43dca8a8 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -11,7 +11,6 @@ use crate::error::RejectVoteRequest; use crate::internal_server_state::InternalServerState; use crate::leader::Leading; use crate::raft_state::LogStateReader; -use crate::type_config::alias::AsyncRuntimeOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::Instant; @@ -52,9 +51,9 @@ where C: RaftTypeConfig pub(crate) fn accept_vote( &mut self, vote: &Vote, - tx: ResultSender, T, E>, + tx: ResultSender, f: F, - ) -> Option, T, E>> + ) -> Option> where T: Debug + Eq + OptionalSend, E: Debug + Eq + OptionalSend, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index a02e00b33..d716b5c93 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -733,7 +733,7 @@ where #[tracing::instrument(level = "info", skip_all)] async fn stream_snapshot( &mut self, - snapshot_rx: DataWithId, Option>>>, + snapshot_rx: DataWithId>>>, ) -> Result>, ReplicationError> { let request_id = snapshot_rx.request_id(); let rx = snapshot_rx.into_data(); diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index c145627a9..ee2ad6d07 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -22,10 +22,7 @@ where C: RaftTypeConfig Self::Data(Data::new_logs(id, log_id_range)) } - pub(crate) fn snapshot( - id: Option, - snapshot_rx: ResultReceiver, Option>>, - ) -> Self { + pub(crate) fn snapshot(id: Option, snapshot_rx: ResultReceiver>>) -> Self { Self::Data(Data::new_snapshot(id, snapshot_rx)) } @@ -59,7 +56,6 @@ use crate::error::StreamingError; use crate::log_id_range::LogIdRange; use crate::raft::SnapshotResponse; use crate::replication::callbacks::SnapshotCallback; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::LogId; use crate::MessageSummary; @@ -77,7 +73,7 @@ where C: RaftTypeConfig { Heartbeat, Logs(DataWithId>), - Snapshot(DataWithId, Option>>>), + Snapshot(DataWithId>>>), SnapshotCallback(DataWithId>), } @@ -152,10 +148,7 @@ where C: RaftTypeConfig Self::Logs(DataWithId::new(request_id, log_id_range)) } - pub(crate) fn new_snapshot( - request_id: Option, - snapshot_rx: ResultReceiver, Option>>, - ) -> Self { + pub(crate) fn new_snapshot(request_id: Option, snapshot_rx: ResultReceiver>>) -> Self { Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) }