-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Convert, trim test_workflows (all asserts unchanged)
- Loading branch information
Showing
2 changed files
with
111 additions
and
136 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,10 +17,6 @@ | |
from galaxy.model.base import transaction | ||
from galaxy.model.database_utils import create_database | ||
from galaxy.model.metadata import MetadataTempFile | ||
from galaxy.model.orm.util import ( | ||
add_object_to_object_session, | ||
get_object_session, | ||
) | ||
from galaxy.model.security import GalaxyRBACAgent | ||
from galaxy.objectstore import QuotaSourceMap | ||
from galaxy.util.unittest import TestCase | ||
|
@@ -343,113 +339,6 @@ def test_flush_refreshes(self): | |
session.flush(model.GalaxySession()) | ||
assert "id" not in inspect(galaxy_model_object_new).unloaded | ||
|
||
def test_workflows(self): | ||
user = model.User(email="[email protected]", password="password") | ||
|
||
child_workflow = _workflow_from_steps(user, []) | ||
self.persist(child_workflow) | ||
|
||
workflow_step_1 = model.WorkflowStep() | ||
workflow_step_1.order_index = 0 | ||
workflow_step_1.type = "data_input" | ||
workflow_step_2 = model.WorkflowStep() | ||
workflow_step_2.order_index = 1 | ||
workflow_step_2.type = "subworkflow" | ||
add_object_to_object_session(workflow_step_2, child_workflow) | ||
workflow_step_2.subworkflow = child_workflow | ||
|
||
workflow_step_1.get_or_add_input("moo1") | ||
workflow_step_1.get_or_add_input("moo2") | ||
workflow_step_2.get_or_add_input("moo") | ||
workflow_step_1.add_connection("foo", "cow", workflow_step_2) | ||
|
||
workflow = _workflow_from_steps(user, [workflow_step_1, workflow_step_2]) | ||
self.persist(workflow) | ||
workflow_id = workflow.id | ||
|
||
annotation = model.WorkflowStepAnnotationAssociation() | ||
annotation.annotation = "Test Step Annotation" | ||
annotation.user = user | ||
add_object_to_object_session(annotation, workflow_step_1) | ||
annotation.workflow_step = workflow_step_1 | ||
self.persist(annotation) | ||
|
||
assert workflow_step_1.id is not None | ||
workflow_invocation = _invocation_for_workflow(user, workflow) | ||
|
||
invocation_uuid = uuid.uuid1() | ||
|
||
workflow_invocation.uuid = invocation_uuid | ||
|
||
workflow_invocation_step1 = model.WorkflowInvocationStep() | ||
add_object_to_object_session(workflow_invocation_step1, workflow_invocation) | ||
workflow_invocation_step1.workflow_invocation = workflow_invocation | ||
workflow_invocation_step1.workflow_step = workflow_step_1 | ||
|
||
subworkflow_invocation = model.WorkflowInvocation() | ||
workflow_invocation.attach_subworkflow_invocation_for_step(workflow_step_2, subworkflow_invocation) | ||
|
||
workflow_invocation_step2 = model.WorkflowInvocationStep() | ||
add_object_to_object_session(workflow_invocation_step2, workflow_invocation) | ||
workflow_invocation_step2.workflow_invocation = workflow_invocation | ||
workflow_invocation_step2.workflow_step = workflow_step_2 | ||
|
||
h1 = workflow_invocation.history | ||
add_object_to_object_session(workflow_invocation, h1) | ||
d1 = self.new_hda(h1, name="1") | ||
workflow_request_dataset = model.WorkflowRequestToInputDatasetAssociation() | ||
add_object_to_object_session(workflow_request_dataset, workflow_invocation) | ||
workflow_request_dataset.workflow_invocation = workflow_invocation | ||
workflow_request_dataset.workflow_step = workflow_step_1 | ||
workflow_request_dataset.dataset = d1 | ||
self.persist(workflow_invocation) | ||
assert workflow_request_dataset is not None | ||
assert workflow_invocation.id is not None | ||
|
||
history_id = h1.id | ||
self.expunge() | ||
|
||
loaded_invocation = self.model.session.get(model.WorkflowInvocation, workflow_invocation.id) | ||
assert loaded_invocation.uuid == invocation_uuid, f"{loaded_invocation.uuid} != {invocation_uuid}" | ||
assert loaded_invocation | ||
assert loaded_invocation.history.id == history_id | ||
|
||
step_1, step_2 = loaded_invocation.workflow.steps | ||
|
||
assert not step_1.subworkflow | ||
assert step_2.subworkflow | ||
assert len(loaded_invocation.steps) == 2 | ||
|
||
subworkflow_invocation_assoc = loaded_invocation.get_subworkflow_invocation_association_for_step(step_2) | ||
assert subworkflow_invocation_assoc is not None | ||
assert isinstance(subworkflow_invocation_assoc.subworkflow_invocation, model.WorkflowInvocation) | ||
assert isinstance(subworkflow_invocation_assoc.parent_workflow_invocation, model.WorkflowInvocation) | ||
|
||
assert subworkflow_invocation_assoc.subworkflow_invocation.history.id == history_id | ||
|
||
loaded_workflow = self.model.session.get(model.Workflow, workflow_id) | ||
assert len(loaded_workflow.steps[0].annotations) == 1 | ||
copied_workflow = loaded_workflow.copy(user=user) | ||
annotations = copied_workflow.steps[0].annotations | ||
assert len(annotations) == 1 | ||
|
||
stored_workflow = loaded_workflow.stored_workflow | ||
counts = stored_workflow.invocation_counts() | ||
assert counts | ||
|
||
workflow_invocation_0 = _invocation_for_workflow(user, loaded_workflow) | ||
workflow_invocation_1 = _invocation_for_workflow(user, loaded_workflow) | ||
workflow_invocation_1.state = "scheduled" | ||
self.model.session.add(workflow_invocation_0) | ||
self.model.session.add(workflow_invocation_1) | ||
# self.persist(workflow_invocation_0) | ||
# self.persist(workflow_invocation_1) | ||
self.model.session.flush() | ||
counts = stored_workflow.invocation_counts() | ||
print(counts) | ||
assert counts.root["new"] == 2 | ||
assert counts.root["scheduled"] == 1 | ||
|
||
def test_role_creation(self): | ||
security_agent = GalaxyRBACAgent(self.model) | ||
|
||
|
@@ -684,31 +573,6 @@ def _db_uri(cls): | |
return postgres_url | ||
|
||
|
||
def _invocation_for_workflow(user, workflow): | ||
h1 = galaxy.model.History(name="WorkflowHistory1", user=user) | ||
workflow_invocation = galaxy.model.WorkflowInvocation() | ||
workflow_invocation.workflow = workflow | ||
workflow_invocation.history = h1 | ||
workflow_invocation.state = "new" | ||
return workflow_invocation | ||
|
||
|
||
def _workflow_from_steps(user, steps): | ||
stored_workflow = galaxy.model.StoredWorkflow() | ||
add_object_to_object_session(stored_workflow, user) | ||
stored_workflow.user = user | ||
workflow = galaxy.model.Workflow() | ||
if steps: | ||
for step in steps: | ||
if get_object_session(step): | ||
add_object_to_object_session(workflow, step) | ||
break | ||
|
||
workflow.steps = steps | ||
workflow.stored_workflow = stored_workflow | ||
return workflow | ||
|
||
|
||
class MockObjectStore: | ||
def __init__(self, quota_source_map=None): | ||
self._quota_source_map = quota_source_map or QuotaSourceMap() | ||
|