From 4e562b8e4aedc43e218ed2cd909c20db0ce58b5e Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Mon, 20 Jan 2025 11:12:06 +0000 Subject: [PATCH 1/4] Set auction_id as request_id in autopilot --- crates/autopilot/src/run_loop.rs | 11 ++++++-- crates/observe/src/request_id.rs | 48 ++++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 3b55738364..12b3ab257c 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -107,10 +107,15 @@ impl RunLoop { .await; if let Some(auction) = auction { let auction_id = auction.id; - self_arc + let auction_task = self_arc .single_run(auction) - .instrument(tracing::info_span!("auction", auction_id)) - .await; + .instrument(tracing::info_span!("auction", auction_id)); + + ::observe::request_id::set_task_local_storage( + auction_id.to_string(), + auction_task, + ) + .await; }; } } diff --git a/crates/observe/src/request_id.rs b/crates/observe/src/request_id.rs index 268684409e..945ac00f00 100644 --- a/crates/observe/src/request_id.rs +++ b/crates/observe/src/request_id.rs @@ -48,7 +48,7 @@ where F::Output: Send + 'static, { if let Some(id) = get_task_local_storage() { - tokio::task::spawn(REQUEST_ID.scope(id, future)) + tokio::task::spawn(set_task_local_storage(id, future)) } else { tokio::task::spawn(future) } @@ -82,8 +82,10 @@ macro_rules! make_service_with_task_local_storage { ) }; let span = tracing::info_span!("request", id); - let handle_request = observe::request_id::REQUEST_ID - .scope(id, hyper::service::Service::call(&mut warp_svc, req)); + let handle_request = observe::request_id::set_task_local_storage( + id, + hyper::service::Service::call(&mut warp_svc, req), + ); tracing::Instrument::instrument(handle_request, span) }); Ok::<_, std::convert::Infallible>(svc) @@ -92,3 +94,43 @@ macro_rules! make_service_with_task_local_storage { } }}; } + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn request_id_copied_to_new_task() { + // use channels to enforce that assertions happen in the desired order. + // First we assert that the parent task's storage is empty after we + // spawned the child task. + // Afterwards we assert that the child task still has the parent task's + // value at the time of spawning. + let (sender1, receiver1) = tokio::sync::oneshot::channel(); + let (sender2, receiver2) = tokio::sync::oneshot::channel(); + + spawn_task_with_current_request_id(async { + assert_eq!(None, get_task_local_storage()); + }) + .await + .unwrap(); + + // create a task with some task local value + let _ = set_task_local_storage("1234".into(), async { + // spawn a new task that copies the parent's task local value + spawn_task_with_current_request_id(async { + receiver1.await.unwrap(); + assert_eq!(Some("1234".into()), get_task_local_storage()); + sender2.send(()).unwrap(); + }); + }) + .await; + + // task local value is not populated outside of the previous scope + assert_eq!(None, get_task_local_storage()); + sender1.send(()).unwrap(); + + // block test until the important assertion happened + receiver2.await.unwrap(); + } +} From f97fa3349773c51f633bf6d1b5e01cedd6b44198 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Mon, 20 Jan 2025 11:33:14 +0000 Subject: [PATCH 2/4] cargo fmt --- crates/autopilot/src/run_loop.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 12b3ab257c..ba2d6e9dd7 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -111,11 +111,8 @@ impl RunLoop { .single_run(auction) .instrument(tracing::info_span!("auction", auction_id)); - ::observe::request_id::set_task_local_storage( - auction_id.to_string(), - auction_task, - ) - .await; + ::observe::request_id::set_task_local_storage(auction_id.to_string(), auction_task) + .await; }; } } From ffec9dc186c4b7f74c970427611e1968bbdb67e6 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Mon, 20 Jan 2025 11:47:18 +0000 Subject: [PATCH 3/4] Set `X-REQUEST-ID` header when autopilot sends request to driver --- crates/autopilot/src/infra/solvers/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index 0d243805ce..b953e577c3 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -94,6 +94,9 @@ impl Driver { if let Some(timeout) = timeout { request = request.timeout(timeout); } + if let Some(request_id) = observe::request_id::get_task_local_storage() { + request = request.header("X-REQUEST-ID", request_id); + } let mut response = request.send().await.context("send")?; let status = response.status().as_u16(); From 1ee41c5b5e17aa44dd5788228b3af2d139cbf876 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Mon, 20 Jan 2025 11:54:03 +0000 Subject: [PATCH 4/4] extra assert --- crates/observe/src/request_id.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/observe/src/request_id.rs b/crates/observe/src/request_id.rs index 945ac00f00..69a688f887 100644 --- a/crates/observe/src/request_id.rs +++ b/crates/observe/src/request_id.rs @@ -118,6 +118,7 @@ mod test { // create a task with some task local value let _ = set_task_local_storage("1234".into(), async { // spawn a new task that copies the parent's task local value + assert_eq!(Some("1234".into()), get_task_local_storage()); spawn_task_with_current_request_id(async { receiver1.await.unwrap(); assert_eq!(Some("1234".into()), get_task_local_storage());