Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set auction_id as request_id in autopilot #3243

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/autopilot/src/infra/solvers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ impl Driver {
if let Some(timeout) = timeout {
request = request.timeout(timeout);
}
if let Some(request_id) = observe::request_id::get_task_local_storage() {
request = request.header("X-REQUEST-ID", request_id);
}

let mut response = request.send().await.context("send")?;
let status = response.status().as_u16();
Expand Down
6 changes: 4 additions & 2 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ impl RunLoop {
.await;
if let Some(auction) = auction {
let auction_id = auction.id;
self_arc
let auction_task = self_arc
.single_run(auction)
.instrument(tracing::info_span!("auction", auction_id))
.instrument(tracing::info_span!("auction", auction_id));

::observe::request_id::set_task_local_storage(auction_id.to_string(), auction_task)
.await;
};
}
Expand Down
49 changes: 46 additions & 3 deletions crates/observe/src/request_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ where
F::Output: Send + 'static,
{
if let Some(id) = get_task_local_storage() {
tokio::task::spawn(REQUEST_ID.scope(id, future))
tokio::task::spawn(set_task_local_storage(id, future))
} else {
tokio::task::spawn(future)
}
Expand Down Expand Up @@ -82,8 +82,10 @@ macro_rules! make_service_with_task_local_storage {
)
};
let span = tracing::info_span!("request", id);
let handle_request = observe::request_id::REQUEST_ID
.scope(id, hyper::service::Service::call(&mut warp_svc, req));
let handle_request = observe::request_id::set_task_local_storage(
id,
hyper::service::Service::call(&mut warp_svc, req),
);
tracing::Instrument::instrument(handle_request, span)
});
Ok::<_, std::convert::Infallible>(svc)
Expand All @@ -92,3 +94,44 @@ macro_rules! make_service_with_task_local_storage {
}
}};
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn request_id_copied_to_new_task() {
// use channels to enforce that assertions happen in the desired order.
// First we assert that the parent task's storage is empty after we
// spawned the child task.
// Afterwards we assert that the child task still has the parent task's
// value at the time of spawning.
let (sender1, receiver1) = tokio::sync::oneshot::channel();
let (sender2, receiver2) = tokio::sync::oneshot::channel();

spawn_task_with_current_request_id(async {
assert_eq!(None, get_task_local_storage());
})
.await
.unwrap();

// create a task with some task local value
let _ = set_task_local_storage("1234".into(), async {
// spawn a new task that copies the parent's task local value
MartinquaXD marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(Some("1234".into()), get_task_local_storage());
spawn_task_with_current_request_id(async {
receiver1.await.unwrap();
assert_eq!(Some("1234".into()), get_task_local_storage());
sender2.send(()).unwrap();
});
})
.await;

// task local value is not populated outside of the previous scope
assert_eq!(None, get_task_local_storage());
sender1.send(()).unwrap();

// block test until the important assertion happened
receiver2.await.unwrap();
}
}
Loading