Skip to content

Commit

Permalink
Merge pull request #5594 from nextflow-io/5516-awsbatch-error-v2
Browse files Browse the repository at this point in the history
Improve kill task logic
  • Loading branch information
pditommaso authored Dec 10, 2024
2 parents 7c8ca0e + 6b03a46 commit 2e14391
Show file tree
Hide file tree
Showing 18 changed files with 41 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CachedTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( batch ) {
batch.collect(executor, jobId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class NopeTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class StoredTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {
* Force the submitted job to quit
*/
@Override
void kill() {
protected void killTask() {
if( !process ) return
final pid = ProcessHelper.pid(process)
log.trace("Killing process with pid: ${pid}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NativeTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( result ) result.cancel(true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
* Terminates the current task execution
*/
@Override
void kill() {
protected void killTask() {
if( cleanupDisabled() )
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.processor
import static nextflow.processor.TaskStatus.*

import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicBoolean

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
Expand All @@ -37,6 +38,8 @@ import nextflow.trace.TraceRecord
@CompileStatic
abstract class TaskHandler {

private AtomicBoolean killed = new AtomicBoolean()

protected TaskHandler(TaskRun task) {
this.task = task
}
Expand Down Expand Up @@ -77,10 +80,22 @@ abstract class TaskHandler {
abstract boolean checkIfCompleted()

/**
* Force the submitted job to quit
* Template method implementing the termination of a task execution.
* This is not meant to be invoked directly. See also {@link #kill()}
*/
abstract void kill()
abstract protected void killTask()

/**
* Kill a job execution.
* <p>
* Delegates to {@link #killTask()} the first time it is invoked on this handler;
* any later invocation does nothing.
*
* @see #killTask()
*/
void kill() {
// `getAndSet` atomically flips the flag and returns its previous value, so only
// the first caller gets past the guard. The flag is set before `killTask()` runs,
// hence a kill attempt that throws is not retried by a later call.
if (!killed.getAndSet(true)) {
killTask()
}
}

/**
* Submit the task for execution.
*
Expand Down Expand Up @@ -301,12 +316,4 @@ abstract class TaskHandler {
return workflowId ? "tw-${workflowId}-${name}" : name
}

private volatile boolean terminated

final void terminate() {
if (!terminated) {
terminated = true
kill()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.util.Duration
import nextflow.util.SysHelper
import nextflow.util.Threads
import nextflow.util.Throttle
import static nextflow.util.SysHelper.dumpThreads

/**
* Monitors the queued tasks waiting for their termination
*
Expand Down Expand Up @@ -471,7 +470,7 @@ class TaskPollingMonitor implements TaskMonitor {
}

protected dumpCurrentThreads() {
log.trace "Current running threads:\n${dumpThreads()}"
log.trace "Current running threads:\n${SysHelper.dumpThreads()}"
}

protected void dumpRunningQueue() {
Expand Down Expand Up @@ -581,7 +580,7 @@ class TaskPollingMonitor implements TaskMonitor {
catch (Throwable error) {
// At this point NF assumes job is not running, but there could be errors at monitoring that could leave a job running (#5516).
// In this case, NF needs to ensure the job is killed.
handler.terminate()
handler.kill()
handleException(handler, error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,13 @@ class K8sTaskHandlerTest extends Specification {
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> false
1 * client.podDelete(POD_NAME) >> null

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> true
0 * client.podDelete(POD_NAME) >> null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TaskPollingMonitorTest extends Specification {
then:
1 * session.disableJobsCancellation >> true
and:
0 * handler.kill() >> null
0 * handler.killTask() >> null
0 * session.notifyTaskComplete(handler) >> null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ class MockTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
* {@inheritDoc}
*/
@Override
void kill() {
protected void killTask() {
assert jobId
log.trace "[AWS BATCH] Process `${task.lazyName()}` - killing job=$jobId"
final targetId = normaliseJobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,23 +881,23 @@ class AwsBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> true
and:
1 * handler.terminateJob('job1') >> null

when:
handler.@jobId = 'job1:task2'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> true
and:
1 * handler.terminateJob('job1') >> null

when:
handler.@jobId = 'job1:task2'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> false
and:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( !taskKey )
return
batchService.terminate(taskKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( isActive() ) {
log.trace "[GOOGLE BATCH] Process `${task.lazyName()}` - deleting job name=$jobId"
if( executor.shouldDeleteJob(jobId) )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class GoogleLifeSciencesTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( !operation ) return
log.debug "[GLS] Killing task > $task.name - Pipeline Id: $pipelineId"
helper.cancelOperation(operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> false
0 * executor.shouldDeleteJob('job1') >> true
Expand All @@ -568,7 +568,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> true
1 * executor.shouldDeleteJob('job1') >> true
Expand All @@ -577,7 +577,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> true
1 * executor.shouldDeleteJob('job1') >> false
Expand Down

0 comments on commit 2e14391

Please sign in to comment.