Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: expose AsyncRuntime Oneshot via trait #1200

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions openraft/src/raft/responder/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ where C: RaftTypeConfig
impl<C> OneshotResponder<C>
where C: RaftTypeConfig
{
/// Create a new instance from a [`AsyncRuntime::OneshotSender`].
/// Create a new instance from a [`AsyncRuntime::Oneshot::Sender`].
///
/// [`AsyncRuntime::OneshotSender`]: `crate::async_runtime::AsyncRuntime::OneshotSender`
/// [`AsyncRuntime::Oneshot::Sender`]: `crate::async_runtime::Oneshot::Sender`
pub fn new(tx: OneshotSenderOf<C, ClientWriteResult<C>>) -> Self {
Self { tx }
}
Expand Down
10 changes: 7 additions & 3 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub trait RaftTypeConfig:
pub mod alias {
use crate::async_runtime::watch;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
use crate::raft::responder::Responder;
use crate::type_config::AsyncRuntime;
use crate::RaftTypeConfig;
Expand All @@ -118,9 +119,12 @@ pub mod alias {
pub type InstantOf<C> = <Rt<C> as AsyncRuntime>::Instant;
pub type TimeoutErrorOf<C> = <Rt<C> as AsyncRuntime>::TimeoutError;
pub type TimeoutOf<C, R, F> = <Rt<C> as AsyncRuntime>::Timeout<R, F>;
pub type OneshotSenderOf<C, T> = <Rt<C> as AsyncRuntime>::OneshotSender<T>;
pub type OneshotReceiverErrorOf<C> = <Rt<C> as AsyncRuntime>::OneshotReceiverError;
pub type OneshotReceiverOf<C, T> = <Rt<C> as AsyncRuntime>::OneshotReceiver<T>;

pub type OneshotOf<C> = <Rt<C> as AsyncRuntime>::Oneshot;
pub type OneshotSenderOf<C, T> = <OneshotOf<C> as Oneshot>::Sender<T>;
pub type OneshotReceiverErrorOf<C> = <OneshotOf<C> as Oneshot>::ReceiverError;
pub type OneshotReceiverOf<C, T> = <OneshotOf<C> as Oneshot>::Receiver<T>;

pub type MpscUnboundedOf<C> = <Rt<C> as AsyncRuntime>::MpscUnbounded;

type Mpsc<C> = MpscUnboundedOf<C>;
Expand Down
43 changes: 26 additions & 17 deletions openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::watch as tokio_watch;

use crate::async_runtime::mpsc_unbounded;
use crate::async_runtime::mpsc_unbounded::MpscUnbounded;
use crate::async_runtime::oneshot;
use crate::async_runtime::watch;
use crate::type_config::OneshotSender;
use crate::AsyncRuntime;
Expand All @@ -25,9 +26,6 @@ impl AsyncRuntime for TokioRuntime {
type TimeoutError = tokio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + OptionalSend> = tokio::time::Timeout<T>;
type ThreadLocalRng = rand::rngs::ThreadRng;
type OneshotSender<T: OptionalSend> = tokio::sync::oneshot::Sender<T>;
type OneshotReceiver<T: OptionalSend> = tokio::sync::oneshot::Receiver<T>;
type OneshotReceiverError = tokio::sync::oneshot::error::RecvError;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
Expand Down Expand Up @@ -75,22 +73,9 @@ impl AsyncRuntime for TokioRuntime {
rand::thread_rng()
}

#[inline]
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend {
let (tx, rx) = tokio::sync::oneshot::channel();
(tx, rx)
}

type MpscUnbounded = TokioMpscUnbounded;
type Watch = TokioWatch;
}

impl<T> OneshotSender<T> for tokio::sync::oneshot::Sender<T> {
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
type Oneshot = TokioOneshot;
}

pub struct TokioMpscUnbounded;
Expand Down Expand Up @@ -188,3 +173,27 @@ where T: OptionalSend + OptionalSync
self.borrow()
}
}

pub struct TokioOneshot;

impl oneshot::Oneshot for TokioOneshot {
type Sender<T: OptionalSend> = tokio::sync::oneshot::Sender<T>;
type Receiver<T: OptionalSend> = tokio::sync::oneshot::Receiver<T>;
type ReceiverError = tokio::sync::oneshot::error::RecvError;

#[inline]
fn channel<T>() -> (Self::Sender<T>, Self::Receiver<T>)
where T: OptionalSend {
let (tx, rx) = tokio::sync::oneshot::channel();
(tx, rx)
}
}

impl<T> OneshotSender<T> for tokio::sync::oneshot::Sender<T>
where T: OptionalSend
{
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
}
25 changes: 3 additions & 22 deletions openraft/src/type_config/async_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use mpsc_unbounded::MpscUnboundedSender;
pub use mpsc_unbounded::MpscUnboundedWeakSender;
pub use mpsc_unbounded::SendError;
pub use mpsc_unbounded::TryRecvError;
pub use oneshot::Oneshot;
pub use oneshot::OneshotSender;
pub use watch::Watch;

Expand Down Expand Up @@ -66,18 +67,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
/// Type of a thread-local random number generator.
type ThreadLocalRng: rand::Rng;

/// Type of a `oneshot` sender.
type OneshotSender<T: OptionalSend>: OneshotSender<T> + OptionalSend + OptionalSync + Sized;

/// Type of a `oneshot` receiver error.
type OneshotReceiverError: std::error::Error + OptionalSend;

/// Type of a `oneshot` receiver.
type OneshotReceiver<T: OptionalSend>: OptionalSend
+ OptionalSync
+ Future<Output = Result<T, Self::OneshotReceiverError>>
+ Unpin;

/// Spawn a new task.
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
Expand Down Expand Up @@ -107,17 +96,9 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
/// sent to another thread.
fn thread_rng() -> Self::ThreadLocalRng;

/// Creates a new one-shot channel for sending single values.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
/// handle is used by the producer to send the value. The `Receiver` handle is
/// used by the consumer to receive the value.
///
/// Each handle can be used on separate tasks.
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend;

type MpscUnbounded: MpscUnbounded;

type Watch: Watch;

type Oneshot: Oneshot;
}
31 changes: 30 additions & 1 deletion openraft/src/type_config/async_runtime/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
pub trait OneshotSender<T> {
use std::future::Future;

use crate::OptionalSend;
use crate::OptionalSync;

pub trait Oneshot {
/// Type of a `oneshot` sender.
type Sender<T: OptionalSend>: OneshotSender<T>;
/// Type of a `oneshot` receiver.
type Receiver<T: OptionalSend>: OptionalSend
+ OptionalSync
+ Future<Output = Result<T, Self::ReceiverError>>
+ Unpin;
/// Type of a `oneshot` receiver error.
type ReceiverError: std::error::Error + OptionalSend;

/// Creates a new one-shot channel for sending single values.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
/// handle is used by the producer to send the value. The `Receiver` handle is
/// used by the consumer to receive the value.
///
/// Each handle can be used on separate tasks.
fn channel<T>() -> (Self::Sender<T>, Self::Receiver<T>)
where T: OptionalSend;
}

pub trait OneshotSender<T>: OptionalSend + OptionalSync + Sized
where T: OptionalSend
{
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/type_config/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use openraft_macros::since;

use crate::async_runtime::watch::Watch;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SleepOf;
Expand Down Expand Up @@ -62,10 +64,10 @@ pub trait TypeConfigExt: RaftTypeConfig {
/// Creates a new one-shot channel for sending single values.
///
/// This is just a wrapper of
/// [`AsyncRuntime::oneshot`](`crate::async_runtime::AsyncRuntime::oneshot`).
/// [`AsyncRuntime::Oneshot::channel()`](`crate::async_runtime::Oneshot::channel`).
fn oneshot<T>() -> (OneshotSenderOf<Self, T>, OneshotReceiverOf<Self, T>)
where T: OptionalSend {
AsyncRuntimeOf::<Self>::oneshot()
OneshotOf::<Self>::channel()
}

/// Creates an unbounded mpsc channel for communicating between asynchronous
Expand Down
Loading