diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 228a8c6664..ad1a8ec2cd 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -1,9 +1,8 @@ from __future__ import annotations import copy -import uuid from concurrent.futures import Future -from typing import Any, Callable, Dict, Optional, Union +from typing import Any, Callable, Dict, Optional import typeguard @@ -17,8 +16,6 @@ except ImportError: _globus_compute_enabled = False -UUID_LIKE_T = Union[uuid.UUID, str] - class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints @@ -40,11 +37,11 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): @typeguard.typechecked def __init__( - self, - executor: Executor, - label: str = 'GlobusComputeExecutor', - resource_specification: Optional[dict] = None, - user_endpoint_config: Optional[dict] = None, + self, + executor: Executor, + label: str = 'GlobusComputeExecutor', + resource_specification: Optional[dict] = None, + user_endpoint_config: Optional[dict] = None, ): """ Parameters @@ -119,9 +116,15 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: else: user_endpoint_config = self.user_endpoint_config - self.executor.resource_specification = res_spec - self.executor.user_endpoint_config = user_endpoint_config - return self.executor.submit(func, *args, **kwargs) + orig_spec = self.executor.resource_specification + orig_uep_cof = self.executor.user_endpoint_config + try: + self.executor.resource_specification = res_spec + self.executor.user_endpoint_config = user_endpoint_config + return self.executor.submit(func, *args, **kwargs) + finally: + self.executor.resource_specification = orig_spec + self.executor.user_endpoint_config = orig_uep_cof def shutdown(self): """Clean-up the resources associated with the Executor. diff --git a/parsl/tests/unit/test_globus_compute_executor.py b/parsl/tests/unit/test_globus_compute_executor.py new file mode 100644 index 0000000000..810e3650de --- /dev/null +++ b/parsl/tests/unit/test_globus_compute_executor.py @@ -0,0 +1,102 @@ +import random +from unittest import mock + +import pytest +from globus_compute_sdk import Executor + +from parsl.executors import GlobusComputeExecutor + + +@pytest.fixture +def mock_ex(): + # Not Parsl's job to test GC's Executor + yield mock.Mock(spec=Executor) + + +@pytest.mark.local +def test_gc_executor_mock_spec(mock_ex): + # a test of tests -- make sure we're using spec= in the mock + with pytest.raises(AttributeError): + mock_ex.aasdf() + + +@pytest.mark.local +def test_gc_executor_label_default(mock_ex): + gce = GlobusComputeExecutor(mock_ex) + assert gce.label == type(gce).__name__, "Expect reasonable default label" + + +@pytest.mark.local +def test_gc_executor_label(mock_ex, randomstring): + exp_label = randomstring() + gce = GlobusComputeExecutor(mock_ex, label=exp_label) + assert gce.label == exp_label + + +@pytest.mark.local +def test_gc_executor_resets_spec_after_submit(mock_ex, randomstring): + submit_res = {randomstring(): "some submit res"} + res = {"some": randomstring(), "spec": randomstring()} + gce = GlobusComputeExecutor(mock_ex, resource_specification=res) + + fn = mock.Mock() + orig_res = mock_ex.resource_specification + orig_uep = mock_ex.user_endpoint_config + + def mock_submit(*a, **k): + assert mock_ex.resource_specification == submit_res, "Expect set for submission" + assert mock_ex.user_endpoint_config is None + mock_ex.submit.side_effect = mock_submit + + gce.submit(fn, resource_specification=submit_res) + + assert mock_ex.resource_specification == orig_res + assert mock_ex.user_endpoint_config is orig_uep + + +@pytest.mark.local +def test_gc_executor_resets_uep_after_submit(mock_ex, randomstring): + uep_conf = randomstring() + res = {"some": randomstring()} + gce = GlobusComputeExecutor(mock_ex) + + fn = mock.Mock() + orig_res = mock_ex.resource_specification + orig_uep = mock_ex.user_endpoint_config + + def mock_submit(*a, **k): + + assert mock_ex.resource_specification == res, "Expect set for submission" + assert mock_ex.user_endpoint_config == uep_conf, "Expect set for submission" + mock_ex.submit.side_effect = mock_submit + + gce.submit(fn, resource_specification={"user_endpoint_config": uep_conf, **res}) + + assert mock_ex.resource_specification == orig_res + assert mock_ex.user_endpoint_config is orig_uep + + +@pytest.mark.local +def test_gc_executor_happy_path(mock_ex, randomstring): + mock_fn = mock.Mock() + args = tuple(randomstring() for _ in range(random.randint(0, 3))) + kwargs = {randomstring(): randomstring() for _ in range(random.randint(0, 3))} + + gce = GlobusComputeExecutor(mock_ex) + gce.submit(mock_fn, {}, *args, **kwargs) + + assert mock_ex.submit.called, "Expect proxying of args to underlying executor" + found_a, found_k = mock_ex.submit.call_args + assert found_a[0] is mock_fn + assert found_a[1:] == args + assert found_k == kwargs + + +@pytest.mark.local +def test_gc_executor_shuts_down_asynchronously(mock_ex): + gce = GlobusComputeExecutor(mock_ex) + gce.shutdown() + assert mock_ex.shutdown.called + a, k = mock_ex.shutdown.call_args + assert k["wait"] is False + assert k["cancel_futures"] is True