Skip to content

Commit

Permalink
Merge pull request #2309 from subspace/relax-send-run-fut-dedicated-t…
Browse files Browse the repository at this point in the history
…hread

Relax `Send` and `Unpin` requirements for futures spawned using `run_future_in_dedicated_thread`
  • Loading branch information
nazar-pc authored Dec 11, 2023
2 parents d521915 + c0b4a8e commit 6989871
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
12 changes: 8 additions & 4 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,11 @@ where
));

let _piece_cache_worker = run_future_in_dedicated_thread(
Box::pin(piece_cache_worker.run(piece_getter.clone())),
{
let future = piece_cache_worker.run(piece_getter.clone());

move || future
},
"cache-worker".to_string(),
);

Expand Down Expand Up @@ -599,20 +603,20 @@ where
drop(readers_and_pieces);

let farm_fut = run_future_in_dedicated_thread(
Box::pin(async move {
move || async move {
while let Some(result) = single_disk_farms_stream.next().await {
let id = result?;

info!(%id, "Farm exited successfully");
}
anyhow::Ok(())
}),
},
"farmer-farm".to_string(),
)?;
let mut farm_fut = Box::pin(farm_fut).fuse();

let networking_fut = run_future_in_dedicated_thread(
Box::pin(async move { node_runner.run().await }),
move || async move { node_runner.run().await },
"farmer-networking".to_string(),
)?;
let mut networking_fut = Box::pin(networking_fut).fuse();
Expand Down
11 changes: 7 additions & 4 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::future::Either;
use rayon::ThreadBuilder;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::{io, thread};
use tokio::runtime::Handle;
Expand Down Expand Up @@ -94,12 +94,13 @@ impl Deref for JoinOnDrop {

/// Runs future on a dedicated thread with the specified name, will block on drop until background
/// thread with future is stopped too, ensuring nothing is left in memory
pub fn run_future_in_dedicated_thread<Fut, T>(
future: Fut,
pub fn run_future_in_dedicated_thread<CreateFut, Fut, T>(
create_future: CreateFut,
thread_name: String,
) -> io::Result<impl Future<Output = Result<T, Canceled>> + Send>
where
Fut: Future<Output = T> + Unpin + Send + 'static,
CreateFut: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = T> + 'static,
T: Send + 'static,
{
let (drop_tx, drop_rx) = oneshot::channel::<()>();
Expand All @@ -108,6 +109,8 @@ where
let join_handle = thread::Builder::new().name(thread_name).spawn(move || {
let _tokio_handle_guard = handle.enter();

let future = pin!(create_future());

let result = match handle.block_on(futures::future::select(future, drop_rx)) {
Either::Left((result, _)) => result,
Either::Right(_) => {
Expand Down
21 changes: 7 additions & 14 deletions crates/subspace-farmer/src/utils/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ use tokio::sync::oneshot;

#[tokio::test]
async fn run_future_in_dedicated_thread_ready() {
let value = run_future_in_dedicated_thread(
Box::pin(async { future::ready(1).await }),
"ready".to_string(),
)
.unwrap()
.await
.unwrap();
let value = run_future_in_dedicated_thread(|| future::ready(1), "ready".to_string())
.unwrap()
.await
.unwrap();

assert_eq!(value, 1);
}
Expand All @@ -19,11 +16,7 @@ async fn run_future_in_dedicated_thread_ready() {
async fn run_future_in_dedicated_thread_cancellation() {
// This may hang if not implemented correctly
drop(
run_future_in_dedicated_thread(
Box::pin(async { future::pending::<()>().await }),
"cancellation".to_string(),
)
.unwrap(),
run_future_in_dedicated_thread(future::pending::<()>, "cancellation".to_string()).unwrap(),
);
}

Expand All @@ -44,11 +37,11 @@ fn run_future_in_dedicated_thread_tokio_on_drop() {

tokio::runtime::Runtime::new().unwrap().block_on(async {
drop(run_future_in_dedicated_thread(
Box::pin(async {
move || async move {
let s = S;
let _ = receiver.await;
drop(s);
}),
},
"tokio_on_drop".to_string(),
));
});
Expand Down

0 comments on commit 6989871

Please sign in to comment.