Skip to content

Commit

Permalink
fix tests and ignore tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AymenFJA committed Nov 8, 2023
1 parent dcb2c41 commit 2dc02bc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config

.PHONY: radical_local_test
radical_local_test:
LOCAL_SANDBOX=True pytest parsl/tests/ -k "not cleannet and not dynamically_loaded_module and not test_check_importlib_file_function and not test_apptimeout and not test_parallel_for and not test_file_staging" --config parsl/tests/configs/local_radical.py --random-order --durations 10
LOCAL_SANDBOX=True pytest parsl/tests/ -k "not cleannet and not issue363 and not dynamically_loaded_module and not test_check_importlib_file_function and not test_apptimeout and not test_parallel_for and not test_staging_https and not test_python_walltime and not test_fail and not test_join" --config parsl/tests/configs/local_radical.py --durations 10

.PHONY: config_local_test
config_local_test:
Expand Down
60 changes: 47 additions & 13 deletions parsl/executors/radical/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import queue
import logging
import inspect
import requests
import typeguard
import threading as mt

Expand All @@ -14,22 +15,27 @@

from functools import partial
from typing import Optional, Dict
from globus_sdk import GlobusError # noqa: F401
from pathlib import Path, PosixPath
from concurrent.futures import Future

from .rpex_resources import ResourceConfig

from parsl.app.errors import * # noqa: F403
from parsl.dataflow.errors import * # noqa: F403
from parsl.executors.errors import * # noqa: F403
from radical.pilot import PythonTask
from .rpex_resources import ResourceConfig
from parsl.data_provider.files import File
from parsl.utils import RepresentationMixin
from parsl.executors.base import ParslExecutor
from parsl.app.errors import AppException, BashExitFailure


RPEX = 'RPEX'
BASH = 'bash'
CWD = os.getcwd()
PYTHON = 'python'

os.environ["RADICAL_REPORT"] = "False"

CWD = os.getcwd()
PWD = os.path.abspath(os.path.dirname(__file__))

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -108,6 +114,9 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):
project : Optional[str]
The project name for resource allocation. Default is None.
working_dir : str
The working dir to be used by the executor.
For more information: https://radicalpilot.readthedocs.io/en/stable/
"""

Expand All @@ -121,6 +130,7 @@ def __init__(self,
bulk_mode: bool = False,
project: Optional[str] = None,
partition: Optional[str] = None,
working_dir: Optional[str] = None,
access_schema: Optional[str] = None,
rpex_cfg: Optional[ResourceConfig] = None):

Expand All @@ -134,6 +144,7 @@ def __init__(self,
self.walltime = walltime
self.label = label
self.future_tasks: Dict[str, Future] = {}
self.working_dir = working_dir
self.cores = cores
self.gpus = gpus
self.run_dir = '.'
Expand Down Expand Up @@ -167,10 +178,11 @@ def task_state_cb(self, task, state):
if not task.description.get('use_mpi'):
return_value = rp.utils.deserialize_obj(eval(task.return_value))
parsl_task.set_result(return_value)
parsl_task.set_result(task.return_value)
else:
parsl_task.set_result(task.return_value)

elif state == rp.CANCELED:
parsl_task.set_exception(AppException(rp.CANCELED))
parsl_task.cancel()

elif state == rp.FAILED:
if task.description['mode'] in [rp.TASK_EXEC,
Expand All @@ -180,10 +192,13 @@ def task_state_cb(self, task, state):
if task.exit_code == 0:
parsl_task.set_result(int(task.exit_code))
else:
parsl_task.set_exception(BashExitFailure(task.name,
parsl_task.set_exception(BashExitFailure(task.name, # noqa: F405
task.exit_code))
else:
parsl_task.set_exception(eval(task.exception))
if task.exception:
parsl_task.set_exception(eval(task.exception))
else:
parsl_task.set_exception('unknow failure for this task')

def start(self):
"""Create the Pilot component and pass it.
Expand Down Expand Up @@ -379,6 +394,8 @@ def task_translate(self, tid, func, args, kwargs):
task.output_staging = self._stage_files(kwargs.get("outputs", []),
mode='out')

task.input_staging.extend(self._stage_files(list(args), mode='in'))

self._set_stdout_stderr(task, kwargs)

task.timeout = kwargs.get('walltime', 0.0)
Expand Down Expand Up @@ -427,23 +444,40 @@ def _set_stdout_stderr(self, task, kwargs):

def _stage_files(self, files, mode):
"""
a function to stage list of input/output a
a function to stage list of input/output
files between two locations.
"""
to_stage = []
files = [f for f in files if isinstance(f, File)]
for file in files:
if mode == 'in':
# a workaround RP not supportting
# staging https file
if file.scheme == 'https':
r = requests.get(file.url)
p = CWD + '/' + file.filename
with open(p, 'wb') as ff:
ff.write(r.content)
file = File(p)

f = {'source': file.url,
'action': rp.TRANSFER}
to_stage.append(f)

elif mode == 'out':
f = {'source': file.filename,
'target': file.url,
'action': rp.TRANSFER}
# this indicates that the user
# did not provided a specific
# output file and RP will stage out
# the task.output from pilot://task_folder
# to the CWD
if '/' not in file.url:
f = {'source': file.filename,
'target': file.url,
'action': rp.TRANSFER}
to_stage.append(f)
else:
raise ValueError('unknown staging mode')

to_stage.append(f)
return to_stage

def _bulk_collector(self):
Expand Down
4 changes: 0 additions & 4 deletions parsl/executors/radical/rpex_worker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import radical.pilot as rp


# ------------------------------------------------------------------------------
#
class DefaultWorker(rp.raptor.DefaultWorker):

def _dispatch_func(self, task):
Expand All @@ -13,8 +11,6 @@ def _dispatch_func(self, task):
return out, err, ret, str(ser_val), exc


# ------------------------------------------------------------------------------
#
class MPIWorker(rp.raptor.MPIWorker):

def _dispatch_func(self, task):
Expand Down

0 comments on commit 2dc02bc

Please sign in to comment.