Skip to content

Commit

Permalink
Refactor: expose AsyncRuntime Oneshot via trait
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC authored and drmingdrmer committed Jul 25, 2024
1 parent ba8bb67 commit 8b789e3
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 47 deletions.
4 changes: 2 additions & 2 deletions openraft/src/raft/responder/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ where C: RaftTypeConfig
impl<C> OneshotResponder<C>
where C: RaftTypeConfig
{
/// Create a new instance from a [`AsyncRuntime::OneshotSender`].
/// Create a new instance from a [`AsyncRuntime::Oneshot::Sender`].
///
/// [`AsyncRuntime::OneshotSender`]: `crate::async_runtime::AsyncRuntime::OneshotSender`
/// [`AsyncRuntime::Oneshot::Sender`]: `crate::async_runtime::Oneshot::Sender`
pub fn new(tx: OneshotSenderOf<C, ClientWriteResult<C>>) -> Self {
Self { tx }
}
Expand Down
10 changes: 7 additions & 3 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub trait RaftTypeConfig:
pub mod alias {
use crate::async_runtime::watch;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
use crate::raft::responder::Responder;
use crate::type_config::AsyncRuntime;
use crate::RaftTypeConfig;
Expand All @@ -118,9 +119,12 @@ pub mod alias {
pub type InstantOf<C> = <Rt<C> as AsyncRuntime>::Instant;
pub type TimeoutErrorOf<C> = <Rt<C> as AsyncRuntime>::TimeoutError;
pub type TimeoutOf<C, R, F> = <Rt<C> as AsyncRuntime>::Timeout<R, F>;
pub type OneshotSenderOf<C, T> = <Rt<C> as AsyncRuntime>::OneshotSender<T>;
pub type OneshotReceiverErrorOf<C> = <Rt<C> as AsyncRuntime>::OneshotReceiverError;
pub type OneshotReceiverOf<C, T> = <Rt<C> as AsyncRuntime>::OneshotReceiver<T>;

pub type OneshotOf<C> = <Rt<C> as AsyncRuntime>::Oneshot;
pub type OneshotSenderOf<C, T> = <OneshotOf<C> as Oneshot>::Sender<T>;
pub type OneshotReceiverErrorOf<C> = <OneshotOf<C> as Oneshot>::ReceiverError;
pub type OneshotReceiverOf<C, T> = <OneshotOf<C> as Oneshot>::Receiver<T>;

pub type MpscUnboundedOf<C> = <Rt<C> as AsyncRuntime>::MpscUnbounded;

type Mpsc<C> = MpscUnboundedOf<C>;
Expand Down
43 changes: 26 additions & 17 deletions openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::watch as tokio_watch;

use crate::async_runtime::mpsc_unbounded;
use crate::async_runtime::mpsc_unbounded::MpscUnbounded;
use crate::async_runtime::oneshot;
use crate::async_runtime::watch;
use crate::type_config::OneshotSender;
use crate::AsyncRuntime;
Expand All @@ -25,9 +26,6 @@ impl AsyncRuntime for TokioRuntime {
type TimeoutError = tokio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + OptionalSend> = tokio::time::Timeout<T>;
type ThreadLocalRng = rand::rngs::ThreadRng;
type OneshotSender<T: OptionalSend> = tokio::sync::oneshot::Sender<T>;
type OneshotReceiver<T: OptionalSend> = tokio::sync::oneshot::Receiver<T>;
type OneshotReceiverError = tokio::sync::oneshot::error::RecvError;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
Expand Down Expand Up @@ -75,22 +73,9 @@ impl AsyncRuntime for TokioRuntime {
rand::thread_rng()
}

#[inline]
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend {
let (tx, rx) = tokio::sync::oneshot::channel();
(tx, rx)
}

type MpscUnbounded = TokioMpscUnbounded;
type Watch = TokioWatch;
}

impl<T> OneshotSender<T> for tokio::sync::oneshot::Sender<T> {
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
type Oneshot = TokioOneshot;
}

pub struct TokioMpscUnbounded;
Expand Down Expand Up @@ -188,3 +173,27 @@ where T: OptionalSend + OptionalSync
self.borrow()
}
}

pub struct TokioOneshot;

impl oneshot::Oneshot for TokioOneshot {
type Sender<T: OptionalSend> = tokio::sync::oneshot::Sender<T>;
type Receiver<T: OptionalSend> = tokio::sync::oneshot::Receiver<T>;
type ReceiverError = tokio::sync::oneshot::error::RecvError;

#[inline]
fn channel<T>() -> (Self::Sender<T>, Self::Receiver<T>)
where T: OptionalSend {
let (tx, rx) = tokio::sync::oneshot::channel();
(tx, rx)
}
}

impl<T> OneshotSender<T> for tokio::sync::oneshot::Sender<T>
where T: OptionalSend
{
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
}
25 changes: 3 additions & 22 deletions openraft/src/type_config/async_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use mpsc_unbounded::MpscUnboundedSender;
pub use mpsc_unbounded::MpscUnboundedWeakSender;
pub use mpsc_unbounded::SendError;
pub use mpsc_unbounded::TryRecvError;
pub use oneshot::Oneshot;
pub use oneshot::OneshotSender;
pub use watch::Watch;

Expand Down Expand Up @@ -66,18 +67,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
/// Type of a thread-local random number generator.
type ThreadLocalRng: rand::Rng;

/// Type of a `oneshot` sender.
type OneshotSender<T: OptionalSend>: OneshotSender<T> + OptionalSend + OptionalSync + Sized;

/// Type of a `oneshot` receiver error.
type OneshotReceiverError: std::error::Error + OptionalSend;

/// Type of a `oneshot` receiver.
type OneshotReceiver<T: OptionalSend>: OptionalSend
+ OptionalSync
+ Future<Output = Result<T, Self::OneshotReceiverError>>
+ Unpin;

/// Spawn a new task.
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
Expand Down Expand Up @@ -107,17 +96,9 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
/// sent to another thread.
fn thread_rng() -> Self::ThreadLocalRng;

/// Creates a new one-shot channel for sending single values.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
/// handle is used by the producer to send the value. The `Receiver` handle is
/// used by the consumer to receive the value.
///
/// Each handle can be used on separate tasks.
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend;

type MpscUnbounded: MpscUnbounded;

type Watch: Watch;

type Oneshot: Oneshot;
}
31 changes: 30 additions & 1 deletion openraft/src/type_config/async_runtime/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
pub trait OneshotSender<T> {
use std::future::Future;

use crate::OptionalSend;
use crate::OptionalSync;

pub trait Oneshot {
/// Type of a `oneshot` sender.
type Sender<T: OptionalSend>: OneshotSender<T>;
/// Type of a `oneshot` receiver.
type Receiver<T: OptionalSend>: OptionalSend
+ OptionalSync
+ Future<Output = Result<T, Self::ReceiverError>>
+ Unpin;
/// Type of a `oneshot` receiver error.
type ReceiverError: std::error::Error + OptionalSend;

/// Creates a new one-shot channel for sending single values.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
/// handle is used by the producer to send the value. The `Receiver` handle is
/// used by the consumer to receive the value.
///
/// Each handle can be used on separate tasks.
fn channel<T>() -> (Self::Sender<T>, Self::Receiver<T>)
where T: OptionalSend;
}

pub trait OneshotSender<T>: OptionalSend + OptionalSync + Sized
where T: OptionalSend
{
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/type_config/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use openraft_macros::since;

use crate::async_runtime::watch::Watch;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SleepOf;
Expand Down Expand Up @@ -62,10 +64,10 @@ pub trait TypeConfigExt: RaftTypeConfig {
/// Creates a new one-shot channel for sending single values.
///
/// This is just a wrapper of
/// [`AsyncRuntime::oneshot`](`crate::async_runtime::AsyncRuntime::oneshot`).
/// [`AsyncRuntime::Oneshot::channel()`](`crate::async_runtime::Oneshot::channel`).
fn oneshot<T>() -> (OneshotSenderOf<Self, T>, OneshotReceiverOf<Self, T>)
where T: OptionalSend {
AsyncRuntimeOf::<Self>::oneshot()
OneshotOf::<Self>::channel()
}

/// Creates an unbounded mpsc channel for communicating between asynchronous
Expand Down

0 comments on commit 8b789e3

Please sign in to comment.