Skip to content

Commit

Permalink
fix(interactive): Fix bugs of check ready (#4381)
Browse files Browse the repository at this point in the history
  • Loading branch information
lnfjpt authored Jan 9, 2025
1 parent 3b0e2f7 commit f8250fb
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ahash = "0.7.2"
dot = "0.1.4"
dyn-clonable = "0.9.0"
opentelemetry = { version = "0.22.0", features = ["trace", "metrics"] }
num_cpus = "1.11"

[features]
mem = ["pegasus_memory/mem"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ impl<D: Data, T: Debug + Send + 'static> Task for Worker<D, T> {

fn check_ready(&mut self) -> TaskState {
let _g = crate::worker_id::guard(self.id);
if self.is_finished && self.peer_guard.load(Ordering::SeqCst) == 0 {
return TaskState::Finished;
}
if self.check_cancel() {
self.sink.set_cancel_hook(true);
return TaskState::Finished;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::time::Duration;

use pegasus::api::{Collect, IterCondition, Iteration, Map, Sink};
use pegasus::api::{Collect, HasAny, IterCondition, Iteration, Map, Sink};
use pegasus::JobConf;

/// Name of the environment variable that `timeout_caused_by_large_job` reads to decide
/// how many workers each job is configured with.
static CORE_POOL_SIZE: &str = "PEGASUS_CORE_POOL_SIZE";

/// test binary merge pipeline;
#[test]
fn timeout_test_01() {
Expand Down Expand Up @@ -133,3 +135,60 @@ fn timeout_resubmit_test() {
result.sort();
assert_eq!(result, [20, 20]);
}

/// Submits a time-limited job followed by a second, slower job with the same worker
/// count, then drains the first job's results.
///
/// * Job `"test"` has a time limit of 5000 and one worker per core; every worker except
///   index 0 sleeps 2000 ms inside its `map` step.
/// * Job `"block_job"` uses the same number of workers and sleeps 4000 ms in every worker.
///   Its result handle is dropped immediately.
///
/// The worker count comes from the `PEGASUS_CORE_POOL_SIZE` environment variable and falls
/// back to `num_cpus::get()` when the variable is unset or is not a valid `usize`.
///
/// Any error yielded by the first job must read `"Job is canceled;"`, and after draining
/// the first job must not report itself as cancelled.
// NOTE(review): presumably this guards against the slow job starving the first one past
// its time limit — confirm against the scheduler behaviour this commit fixes.
#[test]
fn timeout_caused_by_large_job() {
    // Unset and unparsable values both fall back to the number of CPUs.
    let core = ::std::env::var(CORE_POOL_SIZE)
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .unwrap_or_else(num_cpus::get);
    let workers = core as u32;

    let mut conf_1 = JobConf::new("test");
    conf_1.time_limit = 5000;
    conf_1.set_workers(workers);

    let mut conf_2 = JobConf::new("block_job");
    conf_2.set_workers(workers);

    let mut results_1 = pegasus::run(conf_1, || {
        |input, output| {
            let worker_id = input.get_worker_index();
            input
                .input_from(vec![0u32])?
                .map(move |i| {
                    // Only worker 0 returns right away; all the others stall for 2 s.
                    if worker_id != 0 {
                        std::thread::sleep(Duration::from_millis(2000));
                    }
                    Ok(i)
                })?
                .any()?
                .sink_into(output)
        }
    })
    .expect("submit job failure;");

    // The result handle of the slow job is intentionally discarded right away.
    let _ = pegasus::run(conf_2, || {
        |input, output| {
            input
                .input_from(vec![0u32])?
                .map(|i| {
                    std::thread::sleep(Duration::from_millis(4000));
                    Ok(i)
                })?
                .any()?
                .sink_into(output)
        }
    })
    .expect("submit job failure;");

    // `results_1` is inspected again after the loop, so it is drained through `next()`
    // rather than being consumed.
    while let Some(result) = results_1.next() {
        if let Err(err) = result {
            assert_eq!(err.to_string(), "Job is canceled;");
            break;
        }
    }
    assert!(!results_1.is_cancel());
}

0 comments on commit f8250fb

Please sign in to comment.