Skip to content

Commit

Permalink
Addresses are sinks take 2 (#81)
Browse files Browse the repository at this point in the history
* Addresses are sinks POC (based on flume changes)

* Add a debug implementation for Address (#67)

* Add a debug implementation for Address

* fmt

* Ensure debug works for messagechannel & show RC info

* format

* Add test

* Remove refcounter type from struct name

* Ensure debug works for messagechannel & show RC info

* Fix rebase issue

* Turbofish over type annotation (@thomaseizinger)

Thanks to @thomaseizinger!

Co-authored-by: Thomas Eizinger <[email protected]>

* Remove unnecessary locking

* rustfmt

* Try fix MSRV issue

* Revert (original change red herring)

Technically this is not an MSRV issue, this is an actions + not commiting Cargo.lock issue, since having async_global_executor on 2021 should be fine as long as your compiler is also on 2021, and xtra won't specifically bring in the 2021 edition as it.

Co-authored-by: Thomas Eizinger <[email protected]>

* Minimal 2021 edition update, MSRV -> 1.56, GHA update (#80)

* Minimal ed2021 update, and MSRV -> 1.56

* Set MSRV in Cargo.toml

* Use minimal versions for all CI runs

* typo

* Update minimum dep for basic_wasm_bindgen

console_error_panic_hook < 0.1.5 was nightly only. This change was needed to make CI with minimum versions pass. Really, it's a little unnecessary for an example to have an MSRV, but to make this CI work with the fewest modifications it's the easiest option.

* Update to HEAD of flume fork

* Sink for Handler<Return = ()> only

* Add example showcasing usage of `Address` as `Sink`

* Add example for sending on an interval based on stream

* Add a debug implementation for Address (#67)

* Add a debug implementation for Address

* fmt

* Ensure debug works for messagechannel & show RC info

* format

* Add test

* Remove refcounter type from struct name

* Ensure debug works for messagechannel & show RC info

* Fix rebase issue

* Turbofish over type annotation (@thomaseizinger)

Thanks to @thomaseizinger!

Co-authored-by: Thomas Eizinger <[email protected]>

* Remove unnecessary locking

* rustfmt

* Try fix MSRV issue

* Revert (original change red herring)

Technically this is not an MSRV issue, this is an actions + not commiting Cargo.lock issue, since having async_global_executor on 2021 should be fine as long as your compiler is also on 2021, and xtra won't specifically bring in the 2021 edition as it.

Co-authored-by: Thomas Eizinger <[email protected]>

* Addresses are sinks POC (based on flume changes)

* Fmt

* update examples (Address is no longer Sync)

* Fmt

* 1.56 compliance (for CI)

* Update flume to crates.io

Co-authored-by: Thomas Eizinger <[email protected]>
  • Loading branch information
Restioson and thomaseizinger authored Jun 10, 2022
1 parent b7049c3 commit 3e624f5
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 289 deletions.
4 changes: 4 additions & 0 deletions BREAKING-CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
## 0.6.0

- Sealed `RefCounter`, `MessageChannel`, and `MessageSink` traits
- `AddressSink` no longer exists - `Address` now implements `Sink` directly for any handlers returning `()`.
- `Message` no longer exists - `Return` is now specified on the `Handler` trait itself.
- `Address` is no longer `Sync`, due to it implementing `AddressSink`. You should just be able to clone it and then send
it, though.

## 0.5.0

Expand Down
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ rust-version = "1.56.0"
async-trait = "0.1.36"
barrage = "0.2.1"
catty = "0.1.4"
flume = { version = "0.10.9", default-features = false, features = ["async"] }
flume = { version = "0.10.13", default-features = false, features = ["async"] }
futures-core = { version = "0.3.5", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false, features = ["sink", "alloc"] }
Expand Down Expand Up @@ -94,6 +94,14 @@ required-features = ["with-smol-1"]
name = "backpressure"
required-features = ["with-tokio-1"]

[[example]]
name = "address_is_sink"
required-features = ["with-tokio-1", "tokio/full"]

[[example]]
name = "send_interval"
required-features = ["with-tokio-1", "tokio/full"]

[[test]]
name = "basic"
required-features = ["with-tokio-1"]
Expand Down
55 changes: 55 additions & 0 deletions examples/address_is_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use futures_util::stream::repeat;
use futures_util::StreamExt;
use xtra::prelude::*;
use xtra::spawn::Tokio;

#[derive(Default)]
struct Accumulator {
sum: u32,
}

#[async_trait]
impl Actor for Accumulator {
type Stop = ();

async fn stopped(self) -> Self::Stop {}
}

struct Add(u32);

struct GetSum;

#[async_trait]
impl Handler<Add> for Accumulator {
type Return = ();

async fn handle(&mut self, Add(number): Add, _ctx: &mut Context<Self>) {
self.sum += number;
}
}

#[async_trait]
impl Handler<GetSum> for Accumulator {
type Return = u32;

async fn handle(&mut self, _: GetSum, _ctx: &mut Context<Self>) -> Self::Return {
self.sum
}
}

#[tokio::main]
async fn main() {
let addr = Accumulator::default()
.create(None)
.spawn(&mut Tokio::Global);

repeat(10)
.take(4)
.map(|number| Ok(Add(number)))
.forward(addr.clone())
.await
.unwrap();

let sum = addr.send(GetSum).await.unwrap();
println!("Sum is {}!", sum);
}
8 changes: 4 additions & 4 deletions examples/interleaved_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ impl Handler<Hello> for ActorA {

async fn handle(&mut self, _: Hello, ctx: &mut Context<Self>) {
println!("ActorA: Hello");
ctx.handle_while(self, self.actor_b.send(Hello))
.await
.unwrap();
let fut = self.actor_b.send(Hello);
ctx.handle_while(self, fut).await.unwrap();
}
}

Expand All @@ -44,7 +43,8 @@ impl Handler<Initialized> for ActorB {
async fn handle(&mut self, m: Initialized, ctx: &mut Context<Self>) {
println!("ActorB: Initialized");
let actor_a = m.0;
ctx.handle_while(self, actor_a.send(Hello)).await.unwrap();
let send = actor_a.send(Hello);
ctx.handle_while(self, send).await.unwrap();
}
}

Expand Down
41 changes: 41 additions & 0 deletions examples/send_interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use futures_core::Stream;
use futures_util::stream::repeat;
use futures_util::StreamExt;
use std::time::Duration;
use xtra::prelude::*;
use xtra::spawn::Tokio;
use xtra::Disconnected;

#[derive(Default)]
struct Greeter;

#[async_trait]
impl Actor for Greeter {
type Stop = ();

async fn stopped(self) -> Self::Stop {}
}

struct Greet;

#[async_trait]
impl Handler<Greet> for Greeter {
type Return = ();

async fn handle(&mut self, _: Greet, _ctx: &mut Context<Self>) {
println!("Hello!");
}
}

#[tokio::main]
async fn main() {
let addr = Greeter::default().create(None).spawn(&mut Tokio::Global);

greeter_stream(500).forward(addr).await.unwrap();
}

fn greeter_stream(delay: u64) -> impl Stream<Item = Result<Greet, Disconnected>> {
repeat(Duration::from_millis(delay))
.then(tokio::time::sleep)
.map(|_| Ok(Greet))
}
73 changes: 51 additions & 22 deletions src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
use std::fmt::{self, Debug, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp::Ordering, error::Error, hash::Hash};

use flume::Sender;
use flume::r#async::SendSink;
use futures_core::Stream;
use futures_sink::Sink;
use futures_util::{future, StreamExt};

use crate::envelope::ReturningEnvelope;
use crate::envelope::{NonReturningEnvelope, ReturningEnvelope};
use crate::manager::AddressMessage;
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::send_future::{NameableSending, ResolveToHandlerReturn, SendFuture};
use crate::sink::AddressSink;
use crate::{Handler, KeepRunning};

/// The actor is no longer running and disconnected from the sending address. For why this could
Expand All @@ -39,8 +41,8 @@ impl Error for Disconnected {}
/// should be used instead. An address is created by calling the
/// [`Actor::create`](../trait.Actor.html#method.create) or
/// [`Context::run`](../struct.Context.html#method.run) methods, or by cloning another `Address`.
pub struct Address<A, Rc: RefCounter = Strong> {
pub(crate) sender: Sender<AddressMessage<A>>,
pub struct Address<A: 'static, Rc: RefCounter = Strong> {
pub(crate) sink: SendSink<'static, AddressMessage<A>>,
pub(crate) ref_counter: Rc,
}

Expand All @@ -65,7 +67,7 @@ impl<A> Address<A, Strong> {
/// addresses exist.
pub fn downgrade(&self) -> WeakAddress<A> {
WeakAddress {
sender: self.sender.clone(),
sink: self.sink.clone(),
ref_counter: self.ref_counter.downgrade(),
}
}
Expand All @@ -76,7 +78,7 @@ impl<A> Address<A, Either> {
/// Converts this address into a weak address.
pub fn downgrade(&self) -> WeakAddress<A> {
WeakAddress {
sender: self.sender.clone(),
sink: self.sink.clone(),
ref_counter: self.ref_counter.clone().into_weak(),
}
}
Expand Down Expand Up @@ -112,17 +114,17 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
/// })
/// ```
pub fn is_connected(&self) -> bool {
!self.sender.is_disconnected()
!self.sink.is_disconnected()
}

/// Returns the number of messages in the actor's mailbox.
pub fn len(&self) -> usize {
self.sender.len()
self.sink.len()
}

/// The total capacity of the actor's mailbox.
pub fn capacity(&self) -> Option<usize> {
self.sender.capacity()
self.sink.capacity()
}

/// Returns whether the actor's mailbox is empty.
Expand All @@ -133,8 +135,8 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
/// Convert this address into a generic address which can be weak or strong.
pub fn as_either(&self) -> Address<A, Either> {
Address {
sink: self.sink.clone(),
ref_counter: self.ref_counter.clone().into_either(),
sender: self.sender.clone(),
}
}

Expand All @@ -160,7 +162,8 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
if self.is_connected() {
let (envelope, rx) = ReturningEnvelope::<A, M, <A as Handler<M>>::Return>::new(message);
let tx = self
.sender
.sink
.sender()
.clone()
.into_send_async(AddressMessage::Message(Box::new(envelope)));

Expand Down Expand Up @@ -204,25 +207,51 @@ impl<A, Rc: RefCounter> Address<A, Rc> {
}
}

/// Converts this address into a [futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html).
pub fn into_sink(self) -> AddressSink<A, Rc> {
AddressSink {
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.clone(),
}
}

/// Waits until this address becomes disconnected.
pub fn join(&self) -> impl Future<Output = ()> + Send + Unpin {
self.ref_counter.disconnect_notice()
}
}

impl<A, M, Rc> Sink<M> for Address<A, Rc>
where
A: Handler<M, Return = ()>,
M: Send + 'static,
Rc: RefCounter,
{
type Error = Disconnected;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink)
.poll_ready(cx)
.map_err(|_| Disconnected)
}

fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
let item = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item)));
Pin::new(&mut self.sink)
.start_send(item)
.map_err(|_| Disconnected)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink)
.poll_flush(cx)
.map_err(|_| Disconnected)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink)
.poll_close(cx)
.map_err(|_| Disconnected)
}
}

// Required because #[derive] adds an A: Clone bound
impl<A, Rc: RefCounter> Clone for Address<A, Rc> {
fn clone(&self) -> Self {
Address {
sender: self.sender.clone(),
sink: self.sink.clone(),
ref_counter: self.ref_counter.clone(),
}
}
Expand All @@ -234,7 +263,7 @@ impl<A, Rc: RefCounter> Drop for Address<A, Rc> {
// We should notify the ActorManager that there are no more strong Addresses and the actor
// should be stopped.
if self.ref_counter.is_last_strong() {
let _ = self.sender.send(AddressMessage::LastAddress);
let _ = self.sink.sender().send(AddressMessage::LastAddress);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<A: Actor> Context<A> {
let weak = strong.downgrade();

let addr = Address {
sender: sender.clone(),
sink: sender.clone().into_sink(),
ref_counter: strong,
};

Expand Down Expand Up @@ -136,7 +136,7 @@ impl<A: Actor> Context<A> {
/// Get an address to the current actor if there are still external addresses to the actor.
pub fn address(&self) -> Result<Address<A>, ActorShutdown> {
Ok(Address {
sender: self.sender.clone(),
sink: self.sender.clone().into_sink(),
ref_counter: self.ref_counter.upgrade().ok_or(ActorShutdown)?,
})
}
Expand Down
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod receiver;
/// influences whether the address will keep the actor alive for as long as it lives.
pub mod refcount;
mod send_future;
pub mod sink;
/// This module contains a trait to spawn actors, implemented for all major async runtimes by default.
pub mod spawn;
#[cfg(feature = "with-tracing-0_1")]
Expand Down Expand Up @@ -251,7 +250,6 @@ impl From<()> for KeepRunning {

mod private {
use crate::refcount::{Either, RefCounter, Strong, Weak};
use crate::sink::AddressSink;
use crate::{Actor, Address};

pub trait Sealed {}
Expand All @@ -260,5 +258,4 @@ mod private {
impl Sealed for Weak {}
impl Sealed for Either {}
impl<A: Actor, Rc: RefCounter> Sealed for Address<A, Rc> {}
impl<A: Actor, Rc: RefCounter> Sealed for AddressSink<A, Rc> {}
}
Loading

0 comments on commit 3e624f5

Please sign in to comment.