Skip to content

Commit

Permalink
Make invocation cancellation two-step process
Browse files Browse the repository at this point in the history
API / users set invocation to cancelling, scheduler then deletes
outputs. This avoids race conditions where the cancelled invocation
still generates new jobs.
  • Loading branch information
mvdbeek committed Oct 11, 2023
1 parent 385e855 commit 7a815c8
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 45 deletions.
31 changes: 1 addition & 30 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
false,
or_,
true,
update,
)
from sqlalchemy.orm import (
aliased,
Expand Down Expand Up @@ -405,45 +404,17 @@ def get_invocation_report(self, trans, invocation_id, **kwd):
target_format=target_format,
)

def cancel_invocation(self, trans, decoded_invocation_id: int):
def request_invocation_cancellation(self, trans, decoded_invocation_id: int):
workflow_invocation = self.get_invocation(trans, decoded_invocation_id)
cancelled = workflow_invocation.cancel()

if cancelled:
workflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request"))
trans.sa_session.add(workflow_invocation)
with transaction(trans.sa_session):
trans.sa_session.commit()

job_subq = (
trans.sa_session.query(model.Job.id)
.join(model.WorkflowInvocationStep)
.filter(model.WorkflowInvocationStep.workflow_invocation_id == decoded_invocation_id)
.filter(~model.Job.table.c.state.in_(model.Job.terminal_states))
.with_for_update().scalar_subquery()
)
log.debug("CANCEL STEP JOBS")
trans.sa_session.execute(update(model.Job.table).where(model.Job.id == job_subq).values({"state": model.Job.states.DELETING}))

job_collection_subq = (
trans.sa_session.query(model.Job.id)
.join(model.ImplicitCollectionJobsJobAssociation)
.join(model.ImplicitCollectionJobs)
.join(model.WorkflowInvocationStep, model.WorkflowInvocationStep.implicit_collection_jobs_id == model.ImplicitCollectionJobs.id)
.filter(model.WorkflowInvocationStep.workflow_invocation_id == decoded_invocation_id)
.filter(~model.Job.table.c.state.in_(model.Job.terminal_states))
.with_for_update().subquery()
)

log.debug("CANCEL STEP IMPLICIT JOBS")
trans.sa_session.execute(update(model.Job.table).where(model.Job.table.c.id.in_(job_collection_subq.element)).values({"state": model.Job.states.DELETING}))

with transaction(trans.sa_session):
trans.sa_session.commit()

for invocation in workflow_invocation.subworkflow_invocations:
self.cancel_invocation(trans, invocation.subworkflow_invocation_id)

return workflow_invocation

def get_invocation_step(self, trans, decoded_workflow_invocation_step_id):
Expand Down
65 changes: 56 additions & 9 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
from galaxy.model.orm.now import now
from galaxy.model.orm.util import add_object_to_object_session
from galaxy.objectstore import ObjectStore
from galaxy.schema.invocation import InvocationCancellationUserRequest
from galaxy.schema.schema import (
DatasetCollectionPopulatedState,
DatasetState,
Expand Down Expand Up @@ -1646,12 +1647,13 @@ def set_state(self, state):
# Nothing changed, no action needed
return False
session = object_session(self)
if session and self.id and not state in Job.terminal_states:
if session and self.id and state not in Job.terminal_states:
# generate statement that will not revert DELETING or DELETED back to anything non-terminal
rval = session.execute(update(Job.table).where(
Job.table.c.id == self.id,
~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED))
).values(state=state))
rval = session.execute(
update(Job.table)
.where(Job.table.c.id == self.id, ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED)))
.values(state=state)
)
if rval.rowcount == 1:
self.state_history.append(JobStateHistory(self))
return True
Expand Down Expand Up @@ -8087,6 +8089,7 @@ class states(str, Enum):
READY = "ready" # Workflow ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow has been scheduled.
CANCELLED = "cancelled"
CANCELLING = "cancelling" # invocation scheduler will cancel job in next iteration
FAILED = "failed"

non_terminal_states = [states.NEW, states.READY]
Expand Down Expand Up @@ -8132,11 +8135,54 @@ def active(self):
return self.state in [states.NEW, states.READY]

def cancel(self):
if not self.active:
return False
else:
self.state = WorkflowInvocation.states.CANCELLED
if self.state not in [WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED]:
# No use cancelling workflow again, for all others we may still want to be able to cancel
# remaining tool and workflow steps
self.state = WorkflowInvocation.states.CANCELLING
return True
return False

def cancel_invocation_steps(self):
sa_session = object_session(self)
job_subq = (
sa_session.query(Job.id)
.join(WorkflowInvocationStep)
.filter(WorkflowInvocationStep.workflow_invocation_id == self.id)
.filter(~Job.table.c.state.in_(Job.terminal_states))
.with_for_update()
.scalar_subquery()
)
sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING}))

job_collection_subq = (
sa_session.query(Job.id)
.join(ImplicitCollectionJobsJobAssociation)
.join(ImplicitCollectionJobs)
.join(
WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id
)
.filter(WorkflowInvocationStep.workflow_invocation_id == self.id)
.filter(~Job.table.c.state.in_(Job.terminal_states))
.with_for_update()
.subquery()
)

sa_session.execute(
update(Job.table)
.where(Job.table.c.id.in_(job_collection_subq.element))
.values({"state": Job.states.DELETING})
)

for invocation in self.subworkflow_invocations:
subworkflow_invocation = invocation.subworkflow_invocation
cancelled = subworkflow_invocation.cancel()
if cancelled:
subworkflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request"))
sa_session.add(subworkflow_invocation)
sa_session.commit()

def mark_cancelled(self):
self.state = WorkflowInvocation.states.CANCELLED

def fail(self):
self.state = WorkflowInvocation.states.FAILED
Expand Down Expand Up @@ -8189,6 +8235,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None):
or_(
WorkflowInvocation.state == WorkflowInvocation.states.NEW,
WorkflowInvocation.state == WorkflowInvocation.states.READY,
WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING,
),
]
if scheduler is not None:
Expand Down
4 changes: 3 additions & 1 deletion lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,9 @@ def cancel_invocation(self, trans: ProvidesUserContext, invocation_id, **kwd):
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_workflow_invocation_id = self.decode_id(invocation_id)
workflow_invocation = self.workflow_manager.cancel_invocation(trans, decoded_workflow_invocation_id)
workflow_invocation = self.workflow_manager.request_invocation_cancellation(
trans, decoded_workflow_invocation_id
)
return self.__encode_invocation(workflow_invocation, **kwd)

@expose_api
Expand Down
2 changes: 2 additions & 0 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,8 @@ def subworkflow_invoker(
when_values=None,
) -> WorkflowInvoker:
subworkflow_invocation = self._subworkflow_invocation(step)
subworkflow_invocation.handler = self.workflow_invocation.handler
subworkflow_invocation.scheduler = self.workflow_invocation.scheduler
workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job)
subworkflow_progress = self.subworkflow_progress(
subworkflow_invocation,
Expand Down
6 changes: 6 additions & 0 deletions lib/galaxy/workflow/scheduling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler):
workflow_invocation = session.get(model.WorkflowInvocation, invocation_id)

try:
if workflow_invocation.state == workflow_invocation.states.CANCELLING:
workflow_invocation.cancel_invocation_steps()
workflow_invocation.state = workflow_invocation.states.CANCELLED
session.commit()
return False

if not workflow_invocation or not workflow_invocation.active:
return False

Expand Down
10 changes: 8 additions & 2 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4039,12 +4039,18 @@ def test_cancel_workflow_invocation_deletes_jobs(self):
assert invocation_before_cancellation["state"] == "scheduled"
subworkflow_invocation_id = invocation_before_cancellation["steps"][2]["subworkflow_invocation_id"]
self.workflow_populator.cancel_invocation(summary.invocation_id)
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id,
workflow_id=summary.workflow_id,
invocation_id=summary.invocation_id,
assert_ok=False,
)
invocation_jobs = self.workflow_populator.get_invocation_jobs(summary.invocation_id)
for job in invocation_jobs:
assert job["state"] == "deleted" or job["state"] == "deleting"
assert job["state"] == "deleted"
subworkflow_invocation_jobs = self.workflow_populator.get_invocation_jobs(subworkflow_invocation_id)
for job in subworkflow_invocation_jobs:
assert job["state"] == "deleted" or job["state"] == "deleting"
assert job["state"] == "deleted"

def test_workflow_failed_output_not_found(self, history_id):
summary = self._run_workflow(
Expand Down
19 changes: 16 additions & 3 deletions lib/galaxy_test/base/populators.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,9 @@ def invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]:

def active_history_jobs(self, history_id: str) -> list:
all_history_jobs = self.history_jobs(history_id)
active_jobs = [j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running"]]
active_jobs = [
j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running", "deleting"]
]
return active_jobs

def cancel_job(self, job_id: str) -> Response:
Expand Down Expand Up @@ -2072,7 +2074,7 @@ def get_invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]:
def wait_for_invocation_and_jobs(
self, history_id: str, workflow_id: str, invocation_id: str, assert_ok: bool = True
) -> None:
state = self.wait_for_invocation(workflow_id, invocation_id)
state = self.wait_for_invocation(workflow_id, invocation_id, assert_ok=assert_ok)
if assert_ok:
assert state == "scheduled", state
time.sleep(0.5)
Expand Down Expand Up @@ -3095,7 +3097,18 @@ def get_state():
return state

if skip_states is None:
skip_states = ["running", "queued", "new", "ready", "stop", "stopped", "setting_metadata", "waiting"]
skip_states = [
"running",
"queued",
"new",
"ready",
"stop",
"stopped",
"setting_metadata",
"waiting",
"cancelling",
"deleting",
]
if ok_states is None:
ok_states = ["ok", "scheduled", "deferred"]
# Remove ok_states from skip_states, so we can wait for a state to becoming running
Expand Down

0 comments on commit 7a815c8

Please sign in to comment.