diff --git a/interactive_engine/executor/engine/pegasus/pegasus/Cargo.toml b/interactive_engine/executor/engine/pegasus/pegasus/Cargo.toml
index 19a8e993244b..995da272e08f 100644
--- a/interactive_engine/executor/engine/pegasus/pegasus/Cargo.toml
+++ b/interactive_engine/executor/engine/pegasus/pegasus/Cargo.toml
@@ -27,6 +27,7 @@ ahash = "0.7.2"
 dot = "0.1.4"
 dyn-clonable = "0.9.0"
 opentelemetry = { version = "0.22.0", features = ["trace", "metrics"] }
+num_cpus = "1.11"
 
 [features]
 mem = ["pegasus_memory/mem"]
diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs
index d313a823b9ce..4447c7dfa3b0 100644
--- a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs
+++ b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs
@@ -293,6 +293,9 @@ impl Task for Worker {
 
     fn check_ready(&mut self) -> TaskState {
         let _g = crate::worker_id::guard(self.id);
+        if self.is_finished && self.peer_guard.load(Ordering::SeqCst) == 0 {
+            return TaskState::Finished;
+        }
         if self.check_cancel() {
             self.sink.set_cancel_hook(true);
             return TaskState::Finished;
diff --git a/interactive_engine/executor/engine/pegasus/pegasus/tests/timeout_test.rs b/interactive_engine/executor/engine/pegasus/pegasus/tests/timeout_test.rs
index 3fa4314178c3..0fd66bc2abf8 100644
--- a/interactive_engine/executor/engine/pegasus/pegasus/tests/timeout_test.rs
+++ b/interactive_engine/executor/engine/pegasus/pegasus/tests/timeout_test.rs
@@ -15,9 +15,11 @@
 
 use std::time::Duration;
 
-use pegasus::api::{Collect, IterCondition, Iteration, Map, Sink};
+use pegasus::api::{Collect, HasAny, IterCondition, Iteration, Map, Sink};
 use pegasus::JobConf;
 
+static CORE_POOL_SIZE: &'static str = "PEGASUS_CORE_POOL_SIZE";
+
 /// test binary merge pipeline;
 #[test]
 fn timeout_test_01() {
@@ -133,3 +135,60 @@ fn timeout_resubmit_test() {
     result.sort();
     assert_eq!(result, [20, 20]);
 }
+
+#[test]
+fn timeout_caused_by_large_job() {
+    let core = ::std::env::var(CORE_POOL_SIZE)
+        .map(|value| {
+            value
+                .parse::<usize>()
+                .unwrap_or_else(|_| num_cpus::get())
+        })
+        .unwrap_or_else(|_| num_cpus::get());
+    let mut conf_1 = JobConf::new("test");
+    conf_1.time_limit = 5000;
+    conf_1.set_workers(core as u32);
+    let mut conf_2 = JobConf::new("block_job");
+    conf_2.set_workers(core as u32);
+    let mut results_1 = pegasus::run(conf_1, || {
+        |input, output| {
+            let worker_id = input.get_worker_index();
+            input
+                .input_from(vec![0u32])?
+                .map(move |i| {
+                    if worker_id != 0 {
+                        std::thread::sleep(Duration::from_millis(2000));
+                    }
+                    Ok(i)
+                })?
+                .any()?
+                .sink_into(output)
+        }
+    })
+    .expect("submit job failure;");
+    let _ = pegasus::run(conf_2, || {
+        |input, output| {
+            let worker_id = input.get_worker_index();
+            input
+                .input_from(vec![0u32])?
+                .map(|i| {
+                    std::thread::sleep(Duration::from_millis(4000));
+                    Ok(i)
+                })?
+                .any()?
+                .sink_into(output)
+        }
+    })
+    .expect("submit job failure;");
+    let mut count = 0;
+    while let Some(result) = results_1.next() {
+        if let Ok(data) = result {
+            count += 1;
+        } else {
+            let err = result.err().unwrap();
+            assert_eq!(err.to_string(), "Job is canceled;".to_string());
+            break;
+        }
+    }
+    assert!(!results_1.is_cancel());
+}