Skip to content

Commit

Permalink
Revert parts that make tests fail
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff authored and Miguel Fernández committed Dec 4, 2023
1 parent 2a2565a commit bd566a6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod external_process;
use super::*;
use external_process::*;
use serde::de::DeserializeOwned;
use std::sync::atomic::AtomicU64;
use std::{collections::HashMap, sync::atomic::AtomicU64};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

pub(crate) async fn executor_process_request<T: DeserializeOwned>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>)> = None;
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>> =
HashMap::new();

loop {
tokio::select! {
Expand All @@ -187,11 +188,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
{
match serde_json::from_str::<jsonrpc_core::Output>(&line) {
Ok(response) => {
let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request");
if &id != response.id() {
unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`");
}

let sender = pending_requests.remove(response.id()).unwrap();
match response {
jsonrpc_core::Output::Success(success) => {
// The other end may be dropped if the whole
Expand All @@ -213,12 +210,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
}
Ok(None) => // end of the stream
{
tracing::error!("Error when reading from child node process. Process might have exited. Restarting...");
if let Some((_, sender)) = last_pending_request.take() {
sender.send(Err(Box::new(ExecutorProcessDiedError))).unwrap();
}
EXTERNAL_PROCESS.restart().await;
break;
exit_with_message(1, "child node process stdout closed")
}
Err(err) => // log it
{
Expand All @@ -233,7 +225,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
last_pending_request = Some((request.id.clone(), response_sender));
pending_requests.insert(request.id.clone(), response_sender);
let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
Expand Down

0 comments on commit bd566a6

Please sign in to comment.