diff --git a/query-engine/core/src/interactive_transactions/actors.rs b/query-engine/core/src/interactive_transactions/actors.rs deleted file mode 100644 index edf70b3d010..00000000000 --- a/query-engine/core/src/interactive_transactions/actors.rs +++ /dev/null @@ -1,430 +0,0 @@ -use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse}; -use crate::{ - execute_many_operations, execute_single_operation, protocol::EngineProtocol, ClosedTx, Operation, ResponseData, - TxId, -}; -use connector::Connection; -use crosstarget_utils::task::{spawn, spawn_controlled, JoinHandle}; -use crosstarget_utils::time::ElapsedTimeCounter; -use schema::QuerySchemaRef; -use std::{collections::HashMap, sync::Arc}; -use tokio::{ - sync::{ - mpsc::{channel, Receiver, Sender}, - oneshot, RwLock, - }, - time::Duration, -}; -use tracing::Span; -use tracing_futures::Instrument; -use tracing_futures::WithSubscriber; - -#[cfg(not(feature = "metrics"))] -use crate::metrics::MetricsInstrumentationStub; -#[cfg(feature = "metrics")] -use prisma_metrics::WithMetricsInstrumentation; - -#[cfg(feature = "metrics")] -use crate::telemetry::helpers::set_span_link_from_traceparent; - -#[derive(PartialEq)] -enum RunState { - Continue, - Finished, -} - -pub struct ITXServer<'a> { - id: TxId, - pub cached_tx: CachedTx<'a>, - pub timeout: Duration, - receive: Receiver, - query_schema: QuerySchemaRef, -} - -impl<'a> ITXServer<'a> { - pub fn new( - id: TxId, - tx: CachedTx<'a>, - timeout: Duration, - receive: Receiver, - query_schema: QuerySchemaRef, - ) -> Self { - Self { - id, - cached_tx: tx, - timeout, - receive, - query_schema, - } - } - - // RunState is used to tell if the run loop should continue - async fn process_msg(&mut self, op: TxOpRequest) -> RunState { - match op.msg { - TxOpRequestMsg::Single(ref operation, traceparent) => { - let result = self.execute_single(operation, traceparent).await; - let _ = op.respond_to.send(TxOpResponse::Single(result)); - RunState::Continue - } - TxOpRequestMsg::Batch(ref operations, traceparent) => { - let result = self.execute_batch(operations, traceparent).await; - let _ = op.respond_to.send(TxOpResponse::Batch(result)); - RunState::Continue - } - TxOpRequestMsg::Commit => { - let resp = self.commit().await; - let _ = op.respond_to.send(TxOpResponse::Committed(resp)); - RunState::Finished - } - TxOpRequestMsg::Rollback => { - let resp = self.rollback(false).await; - let _ = op.respond_to.send(TxOpResponse::RolledBack(resp)); - RunState::Finished - } - } - } - - async fn execute_single( - &mut self, - operation: &Operation, - traceparent: Option, - ) -> crate::Result { - let span = info_span!("prisma:engine:itx_query_builder", user_facing = true); - - #[cfg(feature = "metrics")] - set_span_link_from_traceparent(&span, traceparent.clone()); - - let conn = self.cached_tx.as_open()?; - execute_single_operation( - self.query_schema.clone(), - conn.as_connection_like(), - operation, - traceparent, - ) - .instrument(span) - .await - } - - async fn execute_batch( - &mut self, - operations: &[Operation], - traceparent: Option, - ) -> crate::Result>> { - let span = info_span!("prisma:engine:itx_execute", user_facing = true); - - let conn = self.cached_tx.as_open()?; - execute_many_operations( - self.query_schema.clone(), - conn.as_connection_like(), - operations, - traceparent, - ) - .instrument(span) - .await - } - - pub(crate) async fn commit(&mut self) -> crate::Result<()> { - if let CachedTx::Open(_) = self.cached_tx { - let open_tx = self.cached_tx.as_open()?; - trace!("[{}] committing.", self.id.to_string()); - open_tx.commit().await?; - self.cached_tx = CachedTx::Committed; - } - - Ok(()) - } - - pub(crate) async fn rollback(&mut self, was_timeout: bool) -> crate::Result<()> { - debug!("[{}] rolling back, was timed out = {was_timeout}", self.name()); - if let CachedTx::Open(_) = self.cached_tx { - let open_tx = self.cached_tx.as_open()?; - open_tx.rollback().await?; - if was_timeout { - trace!("[{}] Expired Rolling back", self.id.to_string()); - self.cached_tx = CachedTx::Expired; - } else { - self.cached_tx = CachedTx::RolledBack; - trace!("[{}] Rolling back", self.id.to_string()); - } - } - - Ok(()) - } - - pub(crate) fn name(&self) -> String { - format!("itx-{:?}", self.id.to_string()) - } -} - -#[derive(Clone)] -pub struct ITXClient { - send: Sender, - tx_id: TxId, -} - -impl ITXClient { - pub(crate) async fn commit(&self) -> crate::Result<()> { - let msg = self.send_and_receive(TxOpRequestMsg::Commit).await?; - - if let TxOpResponse::Committed(resp) = msg { - debug!("[{}] COMMITTED {:?}", self.tx_id, resp); - resp - } else { - Err(self.handle_error(msg).into()) - } - } - - pub(crate) async fn rollback(&self) -> crate::Result<()> { - let msg = self.send_and_receive(TxOpRequestMsg::Rollback).await?; - - if let TxOpResponse::RolledBack(resp) = msg { - resp - } else { - Err(self.handle_error(msg).into()) - } - } - - pub async fn execute(&self, operation: Operation, traceparent: Option) -> crate::Result { - let msg_req = TxOpRequestMsg::Single(operation, traceparent); - let msg = self.send_and_receive(msg_req).await?; - - if let TxOpResponse::Single(resp) = msg { - resp - } else { - Err(self.handle_error(msg).into()) - } - } - - pub(crate) async fn batch_execute( - &self, - operations: Vec, - traceparent: Option, - ) -> crate::Result>> { - let msg_req = TxOpRequestMsg::Batch(operations, traceparent); - - let msg = self.send_and_receive(msg_req).await?; - - if let TxOpResponse::Batch(resp) = msg { - resp - } else { - Err(self.handle_error(msg).into()) - } - } - - async fn send_and_receive(&self, msg: TxOpRequestMsg) -> Result { - let (receiver, req) = self.create_receive_and_req(msg); - if let Err(err) = self.send.send(req).await { - debug!("channel send error {err}"); - return Err(TransactionError::Closed { - reason: "Could not perform operation".to_string(), - } - .into()); - } - - match receiver.await { - Ok(resp) => Ok(resp), - Err(_err) => Err(TransactionError::Closed { - reason: "Could not perform operation".to_string(), - } - .into()), - } - } - - fn create_receive_and_req(&self, msg: TxOpRequestMsg) -> (oneshot::Receiver, TxOpRequest) { - let (send, rx) = oneshot::channel::(); - let request = TxOpRequest { msg, respond_to: send }; - (rx, request) - } - - fn handle_error(&self, msg: TxOpResponse) -> TransactionError { - match msg { - TxOpResponse::Committed(..) => { - let reason = "Transaction is no longer valid. Last state: 'Committed'".to_string(); - TransactionError::Closed { reason } - } - TxOpResponse::RolledBack(..) => { - let reason = "Transaction is no longer valid. Last state: 'RolledBack'".to_string(); - TransactionError::Closed { reason } - } - other => { - error!("Unexpected iTx response, {}", other); - let reason = format!("response '{other}'"); - TransactionError::Closed { reason } - } - } - } -} - -#[allow(clippy::too_many_arguments)] -pub(crate) async fn spawn_itx_actor( - query_schema: QuerySchemaRef, - tx_id: TxId, - mut conn: Box, - isolation_level: Option, - timeout: Duration, - channel_size: usize, - send_done: Sender<(TxId, Option)>, - engine_protocol: EngineProtocol, -) -> crate::Result { - let span = Span::current(); - let tx_id_str = tx_id.to_string(); - span.record("itx_id", tx_id_str.as_str()); - - let (tx_to_server, rx_from_client) = channel::(channel_size); - let client = ITXClient { - send: tx_to_server, - tx_id: tx_id.clone(), - }; - let (open_transaction_send, open_transaction_rcv) = oneshot::channel(); - - spawn( - crate::executor::with_request_context(engine_protocol, async move { - // We match on the result in order to send the error to the parent task and abort this - // task, on error. This is a separate task (actor), not a function where we can just bubble up the - // result. - let c_tx = match conn.start_transaction(isolation_level).await { - Ok(c_tx) => { - open_transaction_send.send(Ok(())).unwrap(); - c_tx - } - Err(err) => { - open_transaction_send.send(Err(err)).unwrap(); - return; - } - }; - - let mut server = ITXServer::new( - tx_id.clone(), - CachedTx::Open(c_tx), - timeout, - rx_from_client, - query_schema, - ); - - let start_time = ElapsedTimeCounter::start(); - let sleep = crosstarget_utils::time::sleep(timeout); - tokio::pin!(sleep); - - loop { - tokio::select! { - _ = &mut sleep => { - trace!("[{}] interactive transaction timed out", server.id.to_string()); - let _ = server.rollback(true).await; - break; - } - msg = server.receive.recv() => { - if let Some(op) = msg { - let run_state = server.process_msg(op).await; - - if run_state == RunState::Finished { - break - } - } else { - break; - } - } - } - } - - trace!("[{}] completed with {}", server.id.to_string(), server.cached_tx); - - let _ = send_done - .send(( - server.id.clone(), - server.cached_tx.to_closed(start_time, server.timeout), - )) - .await; - - trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx); - }) - .instrument(span) - .with_current_subscriber() - .with_current_recorder(), - ); - - open_transaction_rcv.await.unwrap()?; - - Ok(client) -} - -/// Spawn the client list clear actor -/// It waits for messages from completed ITXServers and removes -/// the ITXClient from the clients hashmap - -/* A future improvement to this would be to change this to keep a queue of - clients to remove from the list and then periodically remove them. This - would be a nice optimization because we would take less write locks on the - hashmap. - - The downside to consider is that we can introduce a race condition where the - ITXServer has stopped running but the client hasn't been removed from the hashmap - yet. When the client tries to send a message to the ITXServer there will be a - send error. This isn't a huge obstacle but something to handle correctly. - And example implementation for this would be: - - ``` - let mut queue: Vec = Vec::new(); - - let sleep_duration = Duration::from_millis(100); - let clear_sleeper = time::sleep(sleep_duration); - tokio::pin!(clear_sleeper); - - loop { - tokio::select! { - _ = &mut clear_sleeper => { - let mut list = clients.write().await; - for id in queue.drain(..) { - trace!("removing {} from client list", id); - list.remove(&id); - } - clear_sleeper.as_mut().reset(Instant::now() + sleep_duration); - } - msg = rx.recv() => { - if let Some(id) = msg { - queue.push(id); - } - } - } - } - ``` -*/ -pub(crate) fn spawn_client_list_clear_actor( - clients: Arc>>, - closed_txs: Arc>>>, - mut rx: Receiver<(TxId, Option)>, -) -> JoinHandle<()> { - // Note: tasks implemented via loops cannot be cancelled implicitly, so we need to spawn them in a - // "controlled" way, via `spawn_controlled`. - // The `rx_exit` receiver is used to signal the loop to exit, and that signal is emitted whenever - // the task is aborted (likely, due to the engine shutting down and cleaning up the allocated resources). - spawn_controlled(Box::new( - |mut rx_exit: tokio::sync::broadcast::Receiver<()>| async move { - loop { - tokio::select! { - result = rx.recv() => { - match result { - Some((id, closed_tx)) => { - trace!("removing {} from client list", id); - - let mut clients_guard = clients.write().await; - - clients_guard.remove(&id); - drop(clients_guard); - - closed_txs.write().await.put(id, closed_tx); - } - None => { - // the `rx` channel is closed. - tracing::error!("rx channel is closed!"); - break; - } - } - }, - _ = rx_exit.recv() => { - break; - }, - } - } - }, - )) -}