Skip to content

Commit

Permalink
Use sqlalchemy 2.0 query syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Nov 14, 2023
1 parent db9d5b5 commit 194f5ad
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
14 changes: 6 additions & 8 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,32 @@ def __init__(

def setup_query(self):
if self.grab_model is model.Job:
grab_condition = self.grab_model.table.c.state == self.grab_model.states.NEW
grab_condition = self.grab_model.state == self.grab_model.states.NEW
elif self.grab_model is model.WorkflowInvocation:
grab_condition = self.grab_model.table.c.state.in_(
(self.grab_model.states.NEW, self.grab_model.states.CANCELLING)
)
grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING))
else:
raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented")
subq = (
select(self.grab_model.id)
.where(
and_(
self.grab_model.table.c.handler.in_(self.self_handler_tags),
self.grab_model.handler.in_(self.self_handler_tags),
grab_condition,
)
)
.order_by(self.grab_model.table.c.id)
.order_by(self.grab_model.id)
)
if self.max_grab:
subq = subq.limit(self.max_grab)
if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED:
subq = subq.with_for_update(skip_locked=True)
self._grab_query = (
self.grab_model.table.update()
.where(self.grab_model.table.c.id.in_(subq))
.where(self.grab_model.id.in_(subq))
.values(handler=self.app.config.server_name)
)
if self._supports_returning:
self._grab_query = self._grab_query.returning(self.grab_model.table.c.id)
self._grab_query = self._grab_query.returning(self.grab_model.id)
if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION:
self._grab_conn_opts["isolation_level"] = "SERIALIZABLE"
log.info(
Expand Down
11 changes: 6 additions & 5 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ def set_state(self, state: JobState) -> bool:
# generate statement that will not revert DELETING or DELETED back to anything non-terminal
rval = session.execute(
update(Job.table)
.where(Job.table.c.id == self.id, ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED)))
.where(Job.id == self.id, ~Job.state.in_((Job.states.DELETING, Job.states.DELETED)))
.values(state=state)
)
if rval.rowcount == 1:
Expand Down Expand Up @@ -8226,25 +8226,26 @@ def cancel(self):

def cancel_invocation_steps(self):
sa_session = object_session(self)
assert sa_session
job_subq = (
sa_session.query(Job.id)
select(Job.id)
.join(WorkflowInvocationStep)
.filter(WorkflowInvocationStep.workflow_invocation_id == self.id)
.filter(~Job.table.c.state.in_(Job.finished_states))
.filter(~Job.state.in_(Job.finished_states))
.with_for_update()
.scalar_subquery()
)
sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING}))

job_collection_subq = (
sa_session.query(Job.id)
select(Job.id)
.join(ImplicitCollectionJobsJobAssociation)
.join(ImplicitCollectionJobs)
.join(
WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id
)
.filter(WorkflowInvocationStep.workflow_invocation_id == self.id)
.filter(~Job.table.c.state.in_(Job.finished_states))
.filter(~Job.state.in_(Job.finished_states))
.with_for_update()
.subquery()
)
Expand Down

0 comments on commit 194f5ad

Please sign in to comment.