diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs index d3ccdbac511f..06d1551f9405 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs @@ -140,6 +140,42 @@ impl Display for ExecutorProcessDiedError { impl StdError for ExecutorProcessDiedError {} +struct PendingRequests { + map: HashMap>>, + last_id: Option, +} + +impl PendingRequests { + fn new() -> Self { + Self { + map: HashMap::new(), + last_id: None, + } + } + + fn insert(&mut self, id: jsonrpc_core::Id, sender: oneshot::Sender>) { + self.map.insert(id.clone(), sender); + self.last_id = Some(id); + } + + fn respond(&mut self, id: &jsonrpc_core::Id, response: Result) { + self.map + .remove(id) + .expect("no sender for response") + .send(response) + .unwrap(); + } + + fn respond_to_last(&mut self, response: Result) { + let last_id = self + .last_id + .as_ref() + .expect("Expected last response to exist") + .to_owned(); + self.respond(&last_id, response); + } +} + pub(super) static EXTERNAL_PROCESS: Lazy = Lazy::new(RestartableExecutorProcess::new); type ReqImpl = ( @@ -173,8 +209,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { let mut stdout = BufReader::new(process.stdout.unwrap()).lines(); let mut stdin = process.stdin.unwrap(); - let mut pending_requests: HashMap>> = - HashMap::new(); + let mut pending_requests = PendingRequests::new(); loop { tokio::select! { @@ -187,20 +222,20 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { Ok(Some(line)) => // new response { match serde_json::from_str::(&line) { - Ok(response) => { - let sender = pending_requests.remove(response.id()).unwrap(); - match response { + Ok(ref response) => { + let res: Result = match response { jsonrpc_core::Output::Success(success) => { // The other end may be dropped if the whole // request future was dropped and not polled to // completion, so we ignore send errors here. - _ = sender.send(Ok(success.result)); + Ok(success.result.clone()) } jsonrpc_core::Output::Failure(err) => { tracing::error!("error response from jsonrpc: {err:?}"); - _ = sender.send(Err(Box::new(err.error))); + Err(Box::new(err.error.clone())) } - } + }; + pending_requests.respond(response.id(), res) } Err(err) => { tracing::error!(%err, "error when decoding response from child node process. Response was: `{}`", &line); @@ -210,7 +245,11 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { } Ok(None) => // end of the stream { - exit_with_message(1, "child node process stdout closed") + tracing::error!("Error when reading from child node process. Process might have exited. Restarting..."); + + pending_requests.respond_to_last(Err(Box::new(ExecutorProcessDiedError))); + EXTERNAL_PROCESS.restart().await; + break; } Err(err) => // log it { @@ -226,6 +265,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { } Some((request, response_sender)) => { pending_requests.insert(request.id.clone(), response_sender); + let mut req = serde_json::to_vec(&request).unwrap(); req.push(b'\n'); stdin.write_all(&req).await.unwrap();