From bd566a6fb7b7a2abef5d42a3d7c1d24ecf0da282 Mon Sep 17 00:00:00 2001 From: Miguel Fernandez Date: Mon, 4 Dec 2023 12:33:03 +0100 Subject: [PATCH] Revert parts that make tests fail --- .../query-tests-setup/src/connector_tag/js.rs | 2 +- .../src/connector_tag/js/external_process.rs | 18 +++++------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs index c852924bbf69..2ec8513baeda 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs @@ -3,7 +3,7 @@ mod external_process; use super::*; use external_process::*; use serde::de::DeserializeOwned; -use std::sync::atomic::AtomicU64; +use std::{collections::HashMap, sync::atomic::AtomicU64}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; pub(crate) async fn executor_process_request( diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs index 912a5e6d8abf..d3ccdbac511f 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs @@ -173,7 +173,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { let mut stdout = BufReader::new(process.stdout.unwrap()).lines(); let mut stdin = process.stdin.unwrap(); - let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender>)> = None; + let mut pending_requests: HashMap>> = + HashMap::new(); loop { tokio::select! { @@ -187,11 +188,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { { match serde_json::from_str::(&line) { Ok(response) => { - let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request"); - if &id != response.id() { - unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`"); - } - + let sender = pending_requests.remove(response.id()).unwrap(); match response { jsonrpc_core::Output::Success(success) => { // The other end may be dropped if the whole @@ -213,12 +210,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { } Ok(None) => // end of the stream { - tracing::error!("Error when reading from child node process. Process might have exited. Restarting..."); - if let Some((_, sender)) = last_pending_request.take() { - sender.send(Err(Box::new(ExecutorProcessDiedError))).unwrap(); - } - EXTERNAL_PROCESS.restart().await; - break; + exit_with_message(1, "child node process stdout closed") } Err(err) => // log it { @@ -233,7 +225,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { exit_with_message(1, "The json-rpc client channel was closed"); } Some((request, response_sender)) => { - last_pending_request = Some((request.id.clone(), response_sender)); + pending_requests.insert(request.id.clone(), response_sender); let mut req = serde_json::to_vec(&request).unwrap(); req.push(b'\n'); stdin.write_all(&req).await.unwrap();