From 194f5ad5df3e606a01addf7d8de249f1d51ef2eb Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 14 Nov 2023 09:56:13 +0100 Subject: [PATCH] Use sqlalchemy 2.0 query syntax --- lib/galaxy/jobs/handler.py | 14 ++++++-------- lib/galaxy/model/__init__.py | 11 ++++++----- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 977293cb2117..359fdeddfc89 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -125,22 +125,20 @@ def __init__( def setup_query(self): if self.grab_model is model.Job: - grab_condition = self.grab_model.table.c.state == self.grab_model.states.NEW + grab_condition = self.grab_model.state == self.grab_model.states.NEW elif self.grab_model is model.WorkflowInvocation: - grab_condition = self.grab_model.table.c.state.in_( - (self.grab_model.states.NEW, self.grab_model.states.CANCELLING) - ) + grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING)) else: raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented") subq = ( select(self.grab_model.id) .where( and_( - self.grab_model.table.c.handler.in_(self.self_handler_tags), + self.grab_model.handler.in_(self.self_handler_tags), grab_condition, ) ) - .order_by(self.grab_model.table.c.id) + .order_by(self.grab_model.id) ) if self.max_grab: subq = subq.limit(self.max_grab) @@ -148,11 +146,11 @@ def setup_query(self): subq = subq.with_for_update(skip_locked=True) self._grab_query = ( self.grab_model.table.update() - .where(self.grab_model.table.c.id.in_(subq)) + .where(self.grab_model.id.in_(subq)) .values(handler=self.app.config.server_name) ) if self._supports_returning: - self._grab_query = self._grab_query.returning(self.grab_model.table.c.id) + self._grab_query = self._grab_query.returning(self.grab_model.id) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._grab_conn_opts["isolation_level"] = "SERIALIZABLE" log.info( diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index ac2abf82df32..3c19fe0384a6 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1644,7 +1644,7 @@ def set_state(self, state: JobState) -> bool: # generate statement that will not revert DELETING or DELETED back to anything non-terminal rval = session.execute( update(Job.table) - .where(Job.table.c.id == self.id, ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED))) + .where(Job.id == self.id, ~Job.state.in_((Job.states.DELETING, Job.states.DELETED))) .values(state=state) ) if rval.rowcount == 1: @@ -8226,25 +8226,26 @@ def cancel(self): def cancel_invocation_steps(self): sa_session = object_session(self) + assert sa_session job_subq = ( - sa_session.query(Job.id) + select(Job.id) .join(WorkflowInvocationStep) .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) - .filter(~Job.table.c.state.in_(Job.finished_states)) + .filter(~Job.state.in_(Job.finished_states)) .with_for_update() .scalar_subquery() ) sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING})) job_collection_subq = ( - sa_session.query(Job.id) + select(Job.id) .join(ImplicitCollectionJobsJobAssociation) .join(ImplicitCollectionJobs) .join( WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id ) .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) - .filter(~Job.table.c.state.in_(Job.finished_states)) + .filter(~Job.state.in_(Job.finished_states)) .with_for_update() .subquery() )