From 590d9438c51d7d0e5ff6be2a681a60ccfd1608f2 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Fri, 26 Jul 2024 17:58:12 +0800 Subject: [PATCH] Refactor: replace tokio oneshot with the one configured --- openraft/src/core/tick.rs | 10 ++++++---- openraft/src/replication/mod.rs | 9 +++++---- openraft/src/storage/callback.rs | 8 ++++---- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 9fa7c9c79..4d25258e0 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -7,7 +7,6 @@ use std::sync::Mutex; use std::time::Duration; use futures::future::Either; -use tokio::sync::oneshot; use tracing::Instrument; use tracing::Level; use tracing::Span; @@ -16,6 +15,9 @@ use crate::async_runtime::MpscUnboundedSender; use crate::core::notification::Notification; use crate::type_config::alias::JoinHandleOf; use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::OneshotSenderOf; +use crate::type_config::async_runtime::oneshot::OneshotSender; use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; @@ -35,7 +37,7 @@ pub(crate) struct TickHandle where C: RaftTypeConfig { enabled: Arc, - shutdown: Mutex>>, + shutdown: Mutex>>, join_handle: Mutex>>, } @@ -66,7 +68,7 @@ where C: RaftTypeConfig tx, }; - let (shutdown, shutdown_rx) = oneshot::channel(); + let (shutdown, shutdown_rx) = C::oneshot(); let shutdown = Mutex::new(Some(shutdown)); @@ -83,7 +85,7 @@ where C: RaftTypeConfig } } - pub(crate) async fn tick_loop(self, mut cancel_rx: oneshot::Receiver<()>) { + pub(crate) async fn tick_loop(self, mut cancel_rx: OneshotReceiverOf) { let mut i = 0; let mut cancel = std::pin::pin!(cancel_rx); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index f389be082..9fe857d0b 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -19,7 +19,6 @@ use request::Replicate; use response::ReplicationResult; pub(crate) use response::Response; use tokio::select; -use tokio::sync::oneshot; use tokio::sync::Mutex; use tracing_futures::Instrument; @@ -57,6 +56,8 @@ use crate::type_config::alias::LogIdOf; use crate::type_config::alias::MpscUnboundedReceiverOf; use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::alias::MpscUnboundedWeakSenderOf; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::OneshotSenderOf; use crate::type_config::TypeConfigExt; use crate::LogId; use crate::RaftLogId; @@ -120,7 +121,7 @@ where /// It includes a cancel signaler and the join handle of the snapshot replication task. /// When ReplicationCore is dropped, this Sender is dropped, the snapshot task will be notified /// to quit. - snapshot_state: Option<(oneshot::Sender<()>, JoinHandleOf)>, + snapshot_state: Option<(OneshotSenderOf, JoinHandleOf)>, /// The backoff policy if an [`Unreachable`](`crate::error::Unreachable`) error is returned. /// It will be reset to `None` when an successful response is received. @@ -731,7 +732,7 @@ where let mut option = RPCOption::new(self.config.install_snapshot_timeout()); option.snapshot_chunk_size = Some(self.config.snapshot_max_chunk_size as usize); - let (tx_cancel, rx_cancel) = oneshot::channel(); + let (tx_cancel, rx_cancel) = C::oneshot(); let jh = C::spawn(Self::send_snapshot( request_id, @@ -757,7 +758,7 @@ where vote: Vote, snapshot: Snapshot, option: RPCOption, - cancel: oneshot::Receiver<()>, + cancel: OneshotReceiverOf, weak_tx: MpscUnboundedWeakSenderOf>, ) { let meta = snapshot.meta.clone(); diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index f7ef6c39e..5deca6a25 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -2,12 +2,12 @@ use std::io; -use tokio::sync::oneshot; - use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::MpscUnboundedWeakSender; use crate::core::notification::Notification; use crate::type_config::alias::MpscUnboundedWeakSenderOf; +use crate::type_config::alias::OneshotSenderOf; +use crate::type_config::async_runtime::oneshot::OneshotSender; use crate::ErrorSubject; use crate::ErrorVerb; use crate::LogId; @@ -101,7 +101,7 @@ pub struct LogApplied where C: RaftTypeConfig { last_log_id: LogId, - tx: oneshot::Sender, Vec), StorageError>>, + tx: OneshotSenderOf, Vec), StorageError>>, } impl LogApplied @@ -110,7 +110,7 @@ where C: RaftTypeConfig #[allow(dead_code)] pub(crate) fn new( last_log_id: LogId, - tx: oneshot::Sender, Vec), StorageError>>, + tx: OneshotSenderOf, Vec), StorageError>>, ) -> Self { Self { last_log_id, tx } }