From 3e624f51d44eb13e6fa234b37608fbf176c939c5 Mon Sep 17 00:00:00 2001 From: Restioson Date: Fri, 10 Jun 2022 16:04:15 +0200 Subject: [PATCH] Addresses are sinks take 2 (#81) * Addresses are sinks POC (based on flume changes) * Add a debug implementation for Address (#67) * Add a debug implementation for Address * fmt * Ensure debug works for messagechannel & show RC info * format * Add test * Remove refcounter type from struct name * Ensure debug works for messagechannel & show RC info * Fix rebase issue * Turbofish over type annotation (@thomaseizinger) Thanks to @thomaseizinger! Co-authored-by: Thomas Eizinger * Remove unnecessary locking * rustfmt * Try fix MSRV issue * Revert (original change red herring) Technically this is not an MSRV issue, this is an actions + not commiting Cargo.lock issue, since having async_global_executor on 2021 should be fine as long as your compiler is also on 2021, and xtra won't specifically bring in the 2021 edition as it. Co-authored-by: Thomas Eizinger * Minimal 2021 edition update, MSRV -> 1.56, GHA update (#80) * Minimal ed2021 update, and MSRV -> 1.56 * Set MSRV in Cargo.toml * Use minimal versions for all CI runs * typo * Update minimum dep for basic_wasm_bindgen console_error_panic_hook < 0.1.5 was nightly only. This change was needed to make CI with minimum versions pass. Really, it's a little unnecessary for an example to have an MSRV, but to make this CI work with the fewest modifications it's the easiest option. * Update to HEAD of flume fork * Sink for Handler only * Add example showcasing usage of `Address` as `Sink` * Add example for sending on an interval based on stream * Add a debug implementation for Address (#67) * Add a debug implementation for Address * fmt * Ensure debug works for messagechannel & show RC info * format * Add test * Remove refcounter type from struct name * Ensure debug works for messagechannel & show RC info * Fix rebase issue * Turbofish over type annotation (@thomaseizinger) Thanks to @thomaseizinger! Co-authored-by: Thomas Eizinger * Remove unnecessary locking * rustfmt * Try fix MSRV issue * Revert (original change red herring) Technically this is not an MSRV issue, this is an actions + not commiting Cargo.lock issue, since having async_global_executor on 2021 should be fine as long as your compiler is also on 2021, and xtra won't specifically bring in the 2021 edition as it. Co-authored-by: Thomas Eizinger * Addresses are sinks POC (based on flume changes) * Fmt * update examples (Address is no longer Sync) * Fmt * 1.56 compliance (for CI) * Update flume to crates.io Co-authored-by: Thomas Eizinger --- BREAKING-CHANGES.md | 4 + Cargo.toml | 10 +- examples/address_is_sink.rs | 55 ++++++++ examples/interleaved_messages.rs | 8 +- examples/send_interval.rs | 41 ++++++ src/address.rs | 73 +++++++--- src/context.rs | 4 +- src/lib.rs | 3 - src/message_channel.rs | 39 +----- src/sink.rs | 221 ------------------------------- 10 files changed, 169 insertions(+), 289 deletions(-) create mode 100644 examples/address_is_sink.rs create mode 100644 examples/send_interval.rs delete mode 100644 src/sink.rs diff --git a/BREAKING-CHANGES.md b/BREAKING-CHANGES.md index 58f07ecb..59a446d7 100644 --- a/BREAKING-CHANGES.md +++ b/BREAKING-CHANGES.md @@ -8,6 +8,10 @@ ## 0.6.0 - Sealed `RefCounter`, `MessageChannel`, and `MessageSink` traits +- `AddressSink` no longer exists - `Address` now implements `Sink` directly for any handlers returning `()`. +- `Message` no longer exists - `Return` is now specified on the `Handler` trait itself. +- `Address` is no longer `Sync`, due to it implementing `AddressSink`. You should just be able to clone it and then send + it, though. ## 0.5.0 diff --git a/Cargo.toml b/Cargo.toml index e3be739a..860307ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ rust-version = "1.56.0" async-trait = "0.1.36" barrage = "0.2.1" catty = "0.1.4" -flume = { version = "0.10.9", default-features = false, features = ["async"] } +flume = { version = "0.10.13", default-features = false, features = ["async"] } futures-core = { version = "0.3.5", default-features = false, features = ["alloc"] } futures-sink = { version = "0.3.5", default-features = false } futures-util = { version = "0.3.5", default-features = false, features = ["sink", "alloc"] } @@ -94,6 +94,14 @@ required-features = ["with-smol-1"] name = "backpressure" required-features = ["with-tokio-1"] +[[example]] +name = "address_is_sink" +required-features = ["with-tokio-1", "tokio/full"] + +[[example]] +name = "send_interval" +required-features = ["with-tokio-1", "tokio/full"] + [[test]] name = "basic" required-features = ["with-tokio-1"] diff --git a/examples/address_is_sink.rs b/examples/address_is_sink.rs new file mode 100644 index 00000000..aed6c873 --- /dev/null +++ b/examples/address_is_sink.rs @@ -0,0 +1,55 @@ +use futures_util::stream::repeat; +use futures_util::StreamExt; +use xtra::prelude::*; +use xtra::spawn::Tokio; + +#[derive(Default)] +struct Accumulator { + sum: u32, +} + +#[async_trait] +impl Actor for Accumulator { + type Stop = (); + + async fn stopped(self) -> Self::Stop {} +} + +struct Add(u32); + +struct GetSum; + +#[async_trait] +impl Handler for Accumulator { + type Return = (); + + async fn handle(&mut self, Add(number): Add, _ctx: &mut Context) { + self.sum += number; + } +} + +#[async_trait] +impl Handler for Accumulator { + type Return = u32; + + async fn handle(&mut self, _: GetSum, _ctx: &mut Context) -> Self::Return { + self.sum + } +} + +#[tokio::main] +async fn main() { + let addr = Accumulator::default() + .create(None) + .spawn(&mut Tokio::Global); + + repeat(10) + .take(4) + .map(|number| Ok(Add(number))) + .forward(addr.clone()) + .await + .unwrap(); + + let sum = addr.send(GetSum).await.unwrap(); + println!("Sum is {}!", sum); +} diff --git a/examples/interleaved_messages.rs b/examples/interleaved_messages.rs index af754252..1fd41c9f 100644 --- a/examples/interleaved_messages.rs +++ b/examples/interleaved_messages.rs @@ -22,9 +22,8 @@ impl Handler for ActorA { async fn handle(&mut self, _: Hello, ctx: &mut Context) { println!("ActorA: Hello"); - ctx.handle_while(self, self.actor_b.send(Hello)) - .await - .unwrap(); + let fut = self.actor_b.send(Hello); + ctx.handle_while(self, fut).await.unwrap(); } } @@ -44,7 +43,8 @@ impl Handler for ActorB { async fn handle(&mut self, m: Initialized, ctx: &mut Context) { println!("ActorB: Initialized"); let actor_a = m.0; - ctx.handle_while(self, actor_a.send(Hello)).await.unwrap(); + let send = actor_a.send(Hello); + ctx.handle_while(self, send).await.unwrap(); } } diff --git a/examples/send_interval.rs b/examples/send_interval.rs new file mode 100644 index 00000000..e708b580 --- /dev/null +++ b/examples/send_interval.rs @@ -0,0 +1,41 @@ +use futures_core::Stream; +use futures_util::stream::repeat; +use futures_util::StreamExt; +use std::time::Duration; +use xtra::prelude::*; +use xtra::spawn::Tokio; +use xtra::Disconnected; + +#[derive(Default)] +struct Greeter; + +#[async_trait] +impl Actor for Greeter { + type Stop = (); + + async fn stopped(self) -> Self::Stop {} +} + +struct Greet; + +#[async_trait] +impl Handler for Greeter { + type Return = (); + + async fn handle(&mut self, _: Greet, _ctx: &mut Context) { + println!("Hello!"); + } +} + +#[tokio::main] +async fn main() { + let addr = Greeter::default().create(None).spawn(&mut Tokio::Global); + + greeter_stream(500).forward(addr).await.unwrap(); +} + +fn greeter_stream(delay: u64) -> impl Stream> { + repeat(Duration::from_millis(delay)) + .then(tokio::time::sleep) + .map(|_| Ok(Greet)) +} diff --git a/src/address.rs b/src/address.rs index 9435b1ab..6199f923 100644 --- a/src/address.rs +++ b/src/address.rs @@ -3,17 +3,19 @@ use std::fmt::{self, Debug, Display, Formatter}; use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{cmp::Ordering, error::Error, hash::Hash}; -use flume::Sender; +use flume::r#async::SendSink; use futures_core::Stream; +use futures_sink::Sink; use futures_util::{future, StreamExt}; -use crate::envelope::ReturningEnvelope; +use crate::envelope::{NonReturningEnvelope, ReturningEnvelope}; use crate::manager::AddressMessage; use crate::refcount::{Either, RefCounter, Strong, Weak}; use crate::send_future::{NameableSending, ResolveToHandlerReturn, SendFuture}; -use crate::sink::AddressSink; use crate::{Handler, KeepRunning}; /// The actor is no longer running and disconnected from the sending address. For why this could @@ -39,8 +41,8 @@ impl Error for Disconnected {} /// should be used instead. An address is created by calling the /// [`Actor::create`](../trait.Actor.html#method.create) or /// [`Context::run`](../struct.Context.html#method.run) methods, or by cloning another `Address`. -pub struct Address { - pub(crate) sender: Sender>, +pub struct Address { + pub(crate) sink: SendSink<'static, AddressMessage>, pub(crate) ref_counter: Rc, } @@ -65,7 +67,7 @@ impl Address { /// addresses exist. pub fn downgrade(&self) -> WeakAddress { WeakAddress { - sender: self.sender.clone(), + sink: self.sink.clone(), ref_counter: self.ref_counter.downgrade(), } } @@ -76,7 +78,7 @@ impl Address { /// Converts this address into a weak address. pub fn downgrade(&self) -> WeakAddress { WeakAddress { - sender: self.sender.clone(), + sink: self.sink.clone(), ref_counter: self.ref_counter.clone().into_weak(), } } @@ -112,17 +114,17 @@ impl Address { /// }) /// ``` pub fn is_connected(&self) -> bool { - !self.sender.is_disconnected() + !self.sink.is_disconnected() } /// Returns the number of messages in the actor's mailbox. pub fn len(&self) -> usize { - self.sender.len() + self.sink.len() } /// The total capacity of the actor's mailbox. pub fn capacity(&self) -> Option { - self.sender.capacity() + self.sink.capacity() } /// Returns whether the actor's mailbox is empty. @@ -133,8 +135,8 @@ impl Address { /// Convert this address into a generic address which can be weak or strong. pub fn as_either(&self) -> Address { Address { + sink: self.sink.clone(), ref_counter: self.ref_counter.clone().into_either(), - sender: self.sender.clone(), } } @@ -160,7 +162,8 @@ impl Address { if self.is_connected() { let (envelope, rx) = ReturningEnvelope::>::Return>::new(message); let tx = self - .sender + .sink + .sender() .clone() .into_send_async(AddressMessage::Message(Box::new(envelope))); @@ -204,25 +207,51 @@ impl Address { } } - /// Converts this address into a [futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html). - pub fn into_sink(self) -> AddressSink { - AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - } - } - /// Waits until this address becomes disconnected. pub fn join(&self) -> impl Future + Send + Unpin { self.ref_counter.disconnect_notice() } } +impl Sink for Address +where + A: Handler, + M: Send + 'static, + Rc: RefCounter, +{ + type Error = Disconnected; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink) + .poll_ready(cx) + .map_err(|_| Disconnected) + } + + fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> { + let item = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item))); + Pin::new(&mut self.sink) + .start_send(item) + .map_err(|_| Disconnected) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink) + .poll_flush(cx) + .map_err(|_| Disconnected) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.sink) + .poll_close(cx) + .map_err(|_| Disconnected) + } +} + // Required because #[derive] adds an A: Clone bound impl Clone for Address { fn clone(&self) -> Self { Address { - sender: self.sender.clone(), + sink: self.sink.clone(), ref_counter: self.ref_counter.clone(), } } @@ -234,7 +263,7 @@ impl Drop for Address { // We should notify the ActorManager that there are no more strong Addresses and the actor // should be stopped. if self.ref_counter.is_last_strong() { - let _ = self.sender.send(AddressMessage::LastAddress); + let _ = self.sink.sender().send(AddressMessage::LastAddress); } } } diff --git a/src/context.rs b/src/context.rs index a59e5bd2..629b4b0f 100644 --- a/src/context.rs +++ b/src/context.rs @@ -88,7 +88,7 @@ impl Context { let weak = strong.downgrade(); let addr = Address { - sender: sender.clone(), + sink: sender.clone().into_sink(), ref_counter: strong, }; @@ -136,7 +136,7 @@ impl Context { /// Get an address to the current actor if there are still external addresses to the actor. pub fn address(&self) -> Result, ActorShutdown> { Ok(Address { - sender: self.sender.clone(), + sink: self.sender.clone().into_sink(), ref_counter: self.ref_counter.upgrade().ok_or(ActorShutdown)?, }) } diff --git a/src/lib.rs b/src/lib.rs index ac4c3a34..220f9835 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,6 @@ mod receiver; /// influences whether the address will keep the actor alive for as long as it lives. pub mod refcount; mod send_future; -pub mod sink; /// This module contains a trait to spawn actors, implemented for all major async runtimes by default. pub mod spawn; #[cfg(feature = "with-tracing-0_1")] @@ -251,7 +250,6 @@ impl From<()> for KeepRunning { mod private { use crate::refcount::{Either, RefCounter, Strong, Weak}; - use crate::sink::AddressSink; use crate::{Actor, Address}; pub trait Sealed {} @@ -260,5 +258,4 @@ mod private { impl Sealed for Weak {} impl Sealed for Either {} impl Sealed for Address {} - impl Sealed for AddressSink {} } diff --git a/src/message_channel.rs b/src/message_channel.rs index 8c962c8d..a180ac53 100644 --- a/src/message_channel.rs +++ b/src/message_channel.rs @@ -14,7 +14,6 @@ use crate::private::Sealed; use crate::receiver::Receiver; use crate::refcount::{RefCounter, Shared, Strong}; use crate::send_future::{ResolveToHandlerReturn, SendFuture}; -use crate::sink::{AddressSink, MessageSink, StrongMessageSink, WeakMessageSink}; use crate::{Handler, KeepRunning}; /// A message channel is a channel through which you can send only one kind of message, but to @@ -79,7 +78,7 @@ use crate::{Handler, KeepRunning}; /// }) /// } /// ``` -pub trait MessageChannel: Sealed + Unpin + Debug + Send + Sync { +pub trait MessageChannel: Sealed + Unpin + Debug + Send { /// The return value of the handler for `M`. type Return: Send + 'static; /// Returns whether the actor referred to by this address is running and accepting messages. @@ -128,10 +127,6 @@ pub trait MessageChannel: Sealed + Unpin + Debug + Send + Sync { /// Clones this channel as a boxed trait object. fn clone_channel(&self) -> Box>; - /// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) - /// and asynchronously send messages through it. - fn sink(&self) -> Box>; - /// Determines whether this and the other message channel address the same actor mailbox. fn eq(&self, other: &dyn MessageChannel) -> bool; @@ -162,10 +157,6 @@ pub trait StrongMessageChannel: MessageChannel { /// Clones this channel as a boxed trait object. fn clone_channel(&self) -> Box>; - - /// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) - /// and asynchronously send messages through it. - fn sink(&self) -> Box>; } /// A message channel is a channel through which you can send only one kind of message, but to @@ -186,10 +177,6 @@ pub trait WeakMessageChannel: MessageChannel { /// Clones this channel as a boxed trait object. fn clone_channel(&self) -> Box>; - - /// Use this message channel as [a futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) - /// and asynchronously send messages through it. - fn sink(&self) -> Box>; } impl MessageChannel for Address @@ -219,7 +206,8 @@ where if self.is_connected() { let (envelope, rx) = ReturningEnvelope::::new(message); let sending = self - .sender + .sink + .sender() .clone() .into_send_async(AddressMessage::Message(Box::new(envelope))); @@ -246,13 +234,6 @@ where Box::new(self.clone()) } - fn sink(&self) -> Box> { - Box::new(AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - }) - } - fn eq(&self, other: &dyn MessageChannel) -> bool { other._ref_counter_eq(self.ref_counter.as_ptr()) } @@ -286,13 +267,6 @@ where fn clone_channel(&self) -> Box> { Box::new(self.clone()) } - - fn sink(&self) -> Box> { - Box::new(AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - }) - } } impl WeakMessageChannel for WeakAddress @@ -315,11 +289,4 @@ where fn clone_channel(&self) -> Box> { Box::new(self.clone()) } - - fn sink(&self) -> Box> { - Box::new(AddressSink { - sink: self.sender.clone().into_sink(), - ref_counter: self.ref_counter.clone(), - }) - } } diff --git a/src/sink.rs b/src/sink.rs deleted file mode 100644 index fbbce09e..00000000 --- a/src/sink.rs +++ /dev/null @@ -1,221 +0,0 @@ -//! Module for the sink equivalents to [`Address`](../address/struct.Address.html) and -//! [`MessageChannel`](../message_channel/trait.MessageChannel.html). - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use flume::r#async::SendSink; -use futures_sink::Sink; -use futures_util::SinkExt; - -use crate::address::Disconnected; -use crate::envelope::NonReturningEnvelope; -use crate::manager::AddressMessage; -use crate::private::Sealed; -use crate::refcount::{RefCounter, Strong, Weak}; -use crate::{Actor, Handler}; - -/// An `AddressSink` is the [futures `Sink`](https://docs.rs/futures/0.3/futures/io/struct.Sink.html) -/// returned by [`Address::into_sink`](../address/struct.Address.html#method.into_sink). Similarly to with -/// addresses, the strong variety of `AddressSink` will prevent the actor from being dropped, whereas -/// the [weak variety](struct.AddressSink.html) will not. -pub struct AddressSink { - pub(crate) sink: SendSink<'static, AddressMessage>, - pub(crate) ref_counter: Rc, -} - -impl Clone for AddressSink { - fn clone(&self) -> Self { - AddressSink { - sink: self.sink.clone(), - ref_counter: self.ref_counter.clone(), - } - } -} - -/// This variety of `AddressSink` will not prevent the actor from being dropped. -pub type WeakAddressSink = AddressSink; - -impl AddressSink { - /// Returns whether the actor referred to by this address sink is running and accepting messages. - pub fn is_connected(&self) -> bool { - self.ref_counter.is_connected() - } -} - -impl AddressSink { - /// Create a weak address sink. Unlike with the strong variety of address sink (this kind), - /// an actor will not be prevented from being dropped if only weak sinks, channels, and - /// addresses exist. - pub fn downgrade(&self) -> WeakAddressSink { - AddressSink { - sink: self.sink.clone(), - ref_counter: self.ref_counter.downgrade(), - } - } -} - -impl Drop for AddressSink { - fn drop(&mut self) { - // We should notify the ActorManager that there are no more strong Addresses and the actor - // should be stopped. - if self.ref_counter.is_last_strong() { - let _ = pollster::block_on(self.sink.send(AddressMessage::LastAddress)); - } - } -} - -impl Sink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - type Error = Disconnected; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sink) - .poll_ready(cx) - .map_err(|_| Disconnected) - } - - fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> { - let item = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item))); - Pin::new(&mut self.sink) - .start_send(item) - .map_err(|_| Disconnected) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sink) - .poll_flush(cx) - .map_err(|_| Disconnected) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sink) - .poll_close(cx) - .map_err(|_| Disconnected) - } -} - -/// A `MessageSink` is similar to a [`MessageChannel`](../message_channel/trait.MessageChannel.html), -/// but it is a sink and operates asynchronously. -pub trait MessageSink: Sealed + Sink + Unpin + Send { - /// Returns whether the actor referred to by this message sink is running and accepting messages. - fn is_connected(&self) -> bool; - - /// Returns the number of messages in the actor's mailbox. Note that this does **not** - /// differentiate between types of messages; it will return the count of all messages in the - /// actor's mailbox, not only the messages sent by this message channel type. - fn len(&self) -> usize; - - /// The total capacity of the actor's mailbox. Note that this does **not** differentiate between - /// types of messages; it will return the total capacity of actor's mailbox, not only the - /// messages sent by this message channel type - fn capacity(&self) -> Option; - - /// Returns whether the actor's mailbox is empty. - fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Clones this message sink as a boxed trait object. - fn clone_message_sink(&self) -> Box>; -} - -/// A `WeakMessageSink` is a [`MessageSink`](trait.MessageSink.html) which does not inhibit the actor -/// from being dropped while it exists. -pub trait WeakMessageSink: MessageSink { - /// Upcasts this weak message sink into a boxed generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast(self) -> Box>; - - /// Upcasts this weak message sink into a reference to the generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast_ref(&self) -> &dyn MessageSink; - - /// Clones this message sink as a boxed trait object. - fn clone_message_sink(&self) -> Box>; -} - -/// A `StrongMessageSink` is a [`MessageSink`](trait.MessageSink.html) which does inhibit the actor -/// from being dropped while it exists. -pub trait StrongMessageSink: MessageSink { - /// Create a weak message sink. Unlike with the strong variety of message sink (this kind), - /// an actor will not be prevented from being dropped if only weak sinks, channels, and - /// addresses exist. - fn downgrade(self) -> Box>; - - /// Upcasts this strong message sink into a boxed generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast(self) -> Box>; - - /// Upcasts this strong message sink into a reference to the generic - /// [`MessageSink`](trait.MessageSink.html) trait object - fn upcast_ref(&self) -> &dyn MessageSink; - - /// Clones this message sink as a boxed trait object. - fn clone_message_sink(&self) -> Box>; -} - -impl MessageSink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - fn is_connected(&self) -> bool { - self.ref_counter.is_connected() - } - - fn len(&self) -> usize { - self.sink.len() - } - - fn capacity(&self) -> Option { - self.sink.capacity() - } - - fn clone_message_sink(&self) -> Box> { - Box::new(self.clone()) - } -} - -impl StrongMessageSink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - fn downgrade(self) -> Box> { - Box::new(AddressSink::downgrade(&self)) - } - - fn upcast(self) -> Box> { - Box::new(self) - } - - fn upcast_ref(&self) -> &dyn MessageSink { - self - } - - fn clone_message_sink(&self) -> Box> { - Box::new(self.clone()) - } -} - -impl WeakMessageSink for AddressSink -where - A: Handler, - M: Send + 'static, -{ - fn upcast(self) -> Box> { - Box::new(self) - } - - fn upcast_ref(&self) -> &dyn MessageSink { - self - } - - fn clone_message_sink(&self) -> Box> { - Box::new(self.clone()) - } -}