diff --git a/test/unit/data/data_access/test_misc.py b/test/unit/data/data_access/test_misc.py index 3a7b1ceedc1c..3d0030a3bff3 100644 --- a/test/unit/data/data_access/test_misc.py +++ b/test/unit/data/data_access/test_misc.py @@ -1,4 +1,5 @@ import random +import uuid import pytest from sqlalchemy import inspect @@ -6,6 +7,7 @@ from galaxy import model as m from galaxy.managers import hdcas as lib +from galaxy.model.orm.util import add_object_to_object_session from . import ( MockTransaction, PRIVATE_OBJECT_STORE_ID, @@ -309,3 +311,112 @@ def test_task_metrics(make_task): assert task.text_metrics[1].metric_name == "BIG_PATH" ## Ensure big values truncated assert len(task.text_metrics[1].metric_value) <= 1023 + + +def test_workflows(session, make_user, make_stored_workflow, make_hda, make_history): + + def workflow_from_steps(steps): + stored_workflow = make_stored_workflow() + workflow = m.Workflow() + workflow.steps = steps + workflow.stored_workflow = stored_workflow + return workflow + + def invocation_for_workflow(workflow): + workflow_invocation = m.WorkflowInvocation() + workflow_invocation.workflow = workflow + workflow_invocation.history = m.History() + workflow_invocation.state = "new" + return workflow_invocation + + child_workflow = workflow_from_steps([]) + + workflow_step_1 = m.WorkflowStep() + workflow_step_1.order_index = 0 + workflow_step_1.type = "data_input" + workflow_step_2 = m.WorkflowStep() + workflow_step_2.order_index = 1 + workflow_step_2.type = "subworkflow" + workflow_step_2.subworkflow = child_workflow + + workflow_step_1.get_or_add_input("moo1") + workflow_step_1.get_or_add_input("moo2") + workflow_step_2.get_or_add_input("moo") + workflow_step_1.add_connection("foo", "cow", workflow_step_2) + + workflow = workflow_from_steps([workflow_step_1, workflow_step_2]) + + session.add(workflow) + session.flush() + + annotation = m.WorkflowStepAnnotationAssociation() + add_object_to_object_session(annotation, workflow_step_1) + annotation.workflow_step = workflow_step_1 + + assert workflow_step_1.id is not None + workflow_invocation = invocation_for_workflow(workflow) + + invocation_uuid = uuid.uuid1() + + workflow_invocation.uuid = invocation_uuid + + workflow_invocation_step1 = m.WorkflowInvocationStep() + workflow_invocation_step1.workflow_invocation = workflow_invocation + workflow_invocation_step1.workflow_step = workflow_step_1 + + subworkflow_invocation = m.WorkflowInvocation() + workflow_invocation.attach_subworkflow_invocation_for_step(workflow_step_2, subworkflow_invocation) + + workflow_invocation_step2 = m.WorkflowInvocationStep() + workflow_invocation_step2.workflow_invocation = workflow_invocation + workflow_invocation_step2.workflow_step = workflow_step_2 + + h1 = workflow_invocation.history + make_hda(history=h1) + workflow_request_dataset = m.WorkflowRequestToInputDatasetAssociation() + workflow_request_dataset.workflow_invocation = workflow_invocation + workflow_request_dataset.workflow_step = workflow_step_1 + assert workflow_request_dataset is not None + assert workflow_invocation.id is not None + + # store IDs before expunging. However, why do we expunge here? + history_id = h1.id + workflow_id = workflow.id + session.flush() + session.expunge_all() + + loaded_invocation = session.get(m.WorkflowInvocation, workflow_invocation.id) + assert loaded_invocation + assert loaded_invocation.uuid == invocation_uuid, f"{loaded_invocation.uuid} != {invocation_uuid}" + assert loaded_invocation.history.id == history_id + + step_1, step_2 = loaded_invocation.workflow.steps + + assert not step_1.subworkflow + assert step_2.subworkflow + assert len(loaded_invocation.steps) == 2 + + subworkflow_invocation_assoc = loaded_invocation.get_subworkflow_invocation_association_for_step(step_2) + assert subworkflow_invocation_assoc is not None + assert isinstance(subworkflow_invocation_assoc.subworkflow_invocation, m.WorkflowInvocation) + assert isinstance(subworkflow_invocation_assoc.parent_workflow_invocation, m.WorkflowInvocation) + + assert subworkflow_invocation_assoc.subworkflow_invocation.history.id == history_id + + loaded_workflow = session.get(m.Workflow, workflow_id) + assert len(loaded_workflow.steps[0].annotations) == 1 + copied_workflow = loaded_workflow.copy(user=make_user()) + annotations = copied_workflow.steps[0].annotations + assert len(annotations) == 1 + + stored_workflow = loaded_workflow.stored_workflow + counts = stored_workflow.invocation_counts() + assert counts + + workflow_invocation_0 = invocation_for_workflow(loaded_workflow) + workflow_invocation_1 = invocation_for_workflow(loaded_workflow) + workflow_invocation_1.state = "scheduled" + session.add_all([workflow_invocation_0, workflow_invocation_1]) + counts = stored_workflow.invocation_counts() + assert counts.root["new"] == 2 + assert counts.root["scheduled"] == 1 diff --git a/test/unit/data/test_galaxy_mapping.py b/test/unit/data/test_galaxy_mapping.py index f5e5c69f26fe..36ae8dfc1420 100644 --- a/test/unit/data/test_galaxy_mapping.py +++ b/test/unit/data/test_galaxy_mapping.py @@ -17,10 +17,6 @@ from galaxy.model.base import transaction from galaxy.model.database_utils import create_database from galaxy.model.metadata import MetadataTempFile -from galaxy.model.orm.util import ( - add_object_to_object_session, - get_object_session, -) from galaxy.model.security import GalaxyRBACAgent from galaxy.objectstore import QuotaSourceMap from galaxy.util.unittest import TestCase @@ -343,113 +339,6 @@ def test_flush_refreshes(self): session.flush(model.GalaxySession()) assert "id" not in inspect(galaxy_model_object_new).unloaded - def test_workflows(self): - user = model.User(email="testworkflows@bx.psu.edu", password="password") - - child_workflow = _workflow_from_steps(user, []) - self.persist(child_workflow) - - workflow_step_1 = model.WorkflowStep() - workflow_step_1.order_index = 0 - workflow_step_1.type = "data_input" - workflow_step_2 = model.WorkflowStep() - workflow_step_2.order_index = 1 - workflow_step_2.type = "subworkflow" - add_object_to_object_session(workflow_step_2, child_workflow) - workflow_step_2.subworkflow = child_workflow - - workflow_step_1.get_or_add_input("moo1") - workflow_step_1.get_or_add_input("moo2") - workflow_step_2.get_or_add_input("moo") - workflow_step_1.add_connection("foo", "cow", workflow_step_2) - - workflow = _workflow_from_steps(user, [workflow_step_1, workflow_step_2]) - self.persist(workflow) - workflow_id = workflow.id - - annotation = model.WorkflowStepAnnotationAssociation() - annotation.annotation = "Test Step Annotation" - annotation.user = user - add_object_to_object_session(annotation, workflow_step_1) - annotation.workflow_step = workflow_step_1 - self.persist(annotation) - - assert workflow_step_1.id is not None - workflow_invocation = _invocation_for_workflow(user, workflow) - - invocation_uuid = uuid.uuid1() - - workflow_invocation.uuid = invocation_uuid - - workflow_invocation_step1 = model.WorkflowInvocationStep() - add_object_to_object_session(workflow_invocation_step1, workflow_invocation) - workflow_invocation_step1.workflow_invocation = workflow_invocation - workflow_invocation_step1.workflow_step = workflow_step_1 - - subworkflow_invocation = model.WorkflowInvocation() - workflow_invocation.attach_subworkflow_invocation_for_step(workflow_step_2, subworkflow_invocation) - - workflow_invocation_step2 = model.WorkflowInvocationStep() - add_object_to_object_session(workflow_invocation_step2, workflow_invocation) - workflow_invocation_step2.workflow_invocation = workflow_invocation - workflow_invocation_step2.workflow_step = workflow_step_2 - - h1 = workflow_invocation.history - add_object_to_object_session(workflow_invocation, h1) - d1 = self.new_hda(h1, name="1") - workflow_request_dataset = model.WorkflowRequestToInputDatasetAssociation() - add_object_to_object_session(workflow_request_dataset, workflow_invocation) - workflow_request_dataset.workflow_invocation = workflow_invocation - workflow_request_dataset.workflow_step = workflow_step_1 - workflow_request_dataset.dataset = d1 - self.persist(workflow_invocation) - assert workflow_request_dataset is not None - assert workflow_invocation.id is not None - - history_id = h1.id - self.expunge() - - loaded_invocation = self.model.session.get(model.WorkflowInvocation, workflow_invocation.id) - assert loaded_invocation.uuid == invocation_uuid, f"{loaded_invocation.uuid} != {invocation_uuid}" - assert loaded_invocation - assert loaded_invocation.history.id == history_id - - step_1, step_2 = loaded_invocation.workflow.steps - - assert not step_1.subworkflow - assert step_2.subworkflow - assert len(loaded_invocation.steps) == 2 - - subworkflow_invocation_assoc = loaded_invocation.get_subworkflow_invocation_association_for_step(step_2) - assert subworkflow_invocation_assoc is not None - assert isinstance(subworkflow_invocation_assoc.subworkflow_invocation, model.WorkflowInvocation) - assert isinstance(subworkflow_invocation_assoc.parent_workflow_invocation, model.WorkflowInvocation) - - assert subworkflow_invocation_assoc.subworkflow_invocation.history.id == history_id - - loaded_workflow = self.model.session.get(model.Workflow, workflow_id) - assert len(loaded_workflow.steps[0].annotations) == 1 - copied_workflow = loaded_workflow.copy(user=user) - annotations = copied_workflow.steps[0].annotations - assert len(annotations) == 1 - - stored_workflow = loaded_workflow.stored_workflow - counts = stored_workflow.invocation_counts() - assert counts - - workflow_invocation_0 = _invocation_for_workflow(user, loaded_workflow) - workflow_invocation_1 = _invocation_for_workflow(user, loaded_workflow) - workflow_invocation_1.state = "scheduled" - self.model.session.add(workflow_invocation_0) - self.model.session.add(workflow_invocation_1) - # self.persist(workflow_invocation_0) - # self.persist(workflow_invocation_1) - self.model.session.flush() - counts = stored_workflow.invocation_counts() - print(counts) - assert counts.root["new"] == 2 - assert counts.root["scheduled"] == 1 - def test_role_creation(self): security_agent = GalaxyRBACAgent(self.model) @@ -684,31 +573,6 @@ def _db_uri(cls): return postgres_url -def _invocation_for_workflow(user, workflow): - h1 = galaxy.model.History(name="WorkflowHistory1", user=user) - workflow_invocation = galaxy.model.WorkflowInvocation() - workflow_invocation.workflow = workflow - workflow_invocation.history = h1 - workflow_invocation.state = "new" - return workflow_invocation - - -def _workflow_from_steps(user, steps): - stored_workflow = galaxy.model.StoredWorkflow() - add_object_to_object_session(stored_workflow, user) - stored_workflow.user = user - workflow = galaxy.model.Workflow() - if steps: - for step in steps: - if get_object_session(step): - add_object_to_object_session(workflow, step) - break - - workflow.steps = steps - workflow.stored_workflow = stored_workflow - return workflow - - class MockObjectStore: def __init__(self, quota_source_map=None): self._quota_source_map = quota_source_map or QuotaSourceMap()