Skip to content

Commit

Permalink
Added features:
Browse files Browse the repository at this point in the history
- Adding tasks staging input based on Parsl magic keyword inputs
- Adding tasks staging output based on Parsl magic keyword outputs
- Fix resource config issue
- update tests accordingly
  • Loading branch information
AymenFJA committed Nov 3, 2023
1 parent 93977e7 commit 0e4c709
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config

.PHONY: radical_local_test
radical_local_test:
LOCAL_SANDBOX=True pytest parsl/tests/ -k "not cleannet and not dynamically_loaded_module and not test_check_importlib_file_function and not test_apptimeout and not test_parallel_for" --config parsl/tests/configs/local_radical.py --random-order --durations 10
LOCAL_SANDBOX=True pytest parsl/tests/ -k "not cleannet and not dynamically_loaded_module and not test_check_importlib_file_function and not test_apptimeout and not test_parallel_for and not test_file_staging" --config parsl/tests/configs/local_radical.py --random-order --durations 10

.PHONY: config_local_test
config_local_test:
Expand Down
27 changes: 27 additions & 0 deletions parsl/executors/radical/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .rpex_resources import ResourceConfig

from radical.pilot import PythonTask
from parsl.data_provider.files import File
from parsl.utils import RepresentationMixin
from parsl.executors.base import ParslExecutor
from parsl.app.errors import AppException, BashExitFailure
Expand Down Expand Up @@ -360,6 +361,11 @@ def task_translate(self, tid, func, args, kwargs):
task.gpus_per_rank = kwargs.get('gpus_per_rank', 0)
task.gpu_type = kwargs.get('gpu_type', '')
task.mem_per_rank = kwargs.get('mem_per_rank', 0)
task.input_staging = self._stage_files(kwargs.get("inputs", []),
mode='in')
task.output_staging = self._stage_files(kwargs.get("outputs", []),
mode='out')

stderr_stdout = ['stdout', 'stderr']
for k in stderr_stdout:
k_val = kwargs.get(k, '')
Expand All @@ -372,6 +378,27 @@ def task_translate(self, tid, func, args, kwargs):

return task

def _stage_files(self, files, mode):
"""
a function to stage input/output a
list of files between two locations.
"""
to_stage = []
files = [f for f in files if isinstance(f, File)]
for file in files:
if mode == 'in':
f = {'source': file.url,
'action': rp.TRANSFER}
elif mode == 'out':
f = {'source': file.filename,
'target': file.url,
'action': rp.TRANSFER}
else:
raise ValueError('unknown staging mode')

to_stage.append(f)
return to_stage

def _bulk_collector(self):

bulk = list()
Expand Down
1 change: 0 additions & 1 deletion parsl/executors/radical/rpex_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class ResourceConfig:
python_v: str = f'{sys.version_info[0]}.{sys.version_info[1]}'
worker_type: str = "DefaultWorker"

@classmethod
def _get_cfg_file(cls, path=None):
if "mpi" in cls.worker_type.lower() and \
"mpi4py" not in cls.pilot_env_setup:
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/local_radical_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
rpex_cfg = ResourceConfig()
rpex_cfg.pilot_env_setup.extend([parsl_src, "pytest"])
rpex_cfg.worker_cores_per_node = 7
rpex_cfg.worker_type = "MPI"


def fresh_config():
rpex_cfg.worker_type = "MPI"

return Config(
executors=[
Expand Down
4 changes: 2 additions & 2 deletions parsl/tests/test_radical/test_mpi_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


@parsl.python_app
def test_mpi_func(comm, msg, sleep, ranks):
def test_mpi_func(msg, sleep, ranks, comm=None):
import time
msg = 'hello %d/%d: %s' % (comm.rank, comm.size, msg)
time.sleep(sleep)
Expand All @@ -21,6 +21,6 @@ def test_radical_mpi(n=7):
# rank size should be > 1 for the
# radical runtime system to run this function in MPI env
for i in range(2, n):
t = test_mpi_func(None, msg='mpi.func.%06d' % i, sleep=1, ranks=i)
t = test_mpi_func(msg='mpi.func.%06d' % i, sleep=1, ranks=i, comm=None)
apps.append(t)
assert [len(app.result()) for app in apps] == list(range(2, n))

0 comments on commit 0e4c709

Please sign in to comment.