diff --git a/.flake8 b/.flake8 index f2ac7f4e9a..951ea6fe5f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,4 @@ [flake8] -# E124: closing bracket does not match visual indentation # E126: continuation line over-indented for hanging indent # This one is bad. Sometimes ordering matters, conditional imports # setting env vars necessary etc. @@ -8,7 +7,7 @@ # https://github.com/PyCQA/pycodestyle/issues/386 # W504: line break after binary operator # (Raised by flake8 even when it is followed) -ignore = E124, E126, E402, E129, W504 +ignore = E126, E402, E129, W504 max-line-length = 158 exclude = test_import_fail.py, parsl/executors/workqueue/parsl_coprocess.py diff --git a/Makefile b/Makefile index f8c3c4866f..78c7f59fba 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ CCTOOLS_INSTALL := /tmp/cctools MPICH=mpich OPENMPI=openmpi export PATH := $(CCTOOLS_INSTALL)/bin/:$(PATH) -export CCTOOLS_VERSION=7.7.1 +export CCTOOLS_VERSION=7.7.2 export HYDRA_LAUNCHER=fork export OMPI_MCA_rmaps_base_oversubscribe=yes MPI=$(MPICH) diff --git a/mypy.ini b/mypy.ini index 531337257b..2597ffaa4e 100644 --- a/mypy.ini +++ b/mypy.ini @@ -73,10 +73,8 @@ disallow_untyped_defs = True disallow_any_expr = True [mypy-parsl.executors.high_throughput.interchange.*] -check_untyped_defs = True - -[mypy-parsl.executors.extreme_scale.*] -ignore_errors = True +disallow_untyped_defs = True +warn_unreachable = True [mypy-parsl.monitoring.*] disallow_untyped_decorators = True diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 0069ba34ff..8c50a6efcf 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -4,33 +4,21 @@ class Channel(metaclass=ABCMeta): - """For certain resources such as campus clusters or supercomputers at + """Channels are abstractions that enable ExecutionProviders to talk to + resource managers of remote compute facilities. + + For certain resources such as campus clusters or supercomputers at research laboratories, resource requirements may require authentication. For instance some resources may allow access to their job schedulers from - only their login-nodes which require you to authenticate on through SSH, - GSI-SSH and sometimes even require two factor authentication. Channels are - simple abstractions that enable the ExecutionProvider component to talk to - the resource managers of compute facilities. The simplest Channel, - *LocalChannel*, simply executes commands locally on a shell, while the - *SshChannel* authenticates you to remote systems. - - Channels are usually called via the execute_wait function. - For channels that execute remotely, a push_file function allows you to copy over files. - - .. code:: python - - +------------------ - | - cmd, wtime ------->| execute_wait - (ec, stdout, stderr)<-|---+ - | - src, dst_dir ------->| push_file - dst_path <--------|----+ - | - dst_script_dir <------| script_dir - | - +------------------- + only their login-nodes which require you to authenticate through SSH, or + require two factor authentication. + + The simplest Channel, *LocalChannel*, executes commands locally in a + shell, while the *SSHChannel* authenticates you to remote systems. + Channels provide the ability to execute commands remotely, using the + execute_wait method, and manipulate the remote file system using methods + such as push_file, pull_file and makedirs. Channels should ensure that each launched command runs in a new process group, so that providers (such as AdHocProvider and LocalProvider) which diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py index f92768f7f4..a289fb9457 100644 --- a/parsl/configs/ad_hoc.py +++ b/parsl/configs/ad_hoc.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] + } } -} config = Config( @@ -26,7 +26,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index c494346533..69d278b402 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -984,6 +984,7 @@ def submit(self, 'joins': None, 'try_id': 0, 'id': task_id, + 'task_launch_lock': threading.Lock(), 'time_invoked': datetime.datetime.now(), 'time_returned': None, 'try_time_launched': None, @@ -1029,8 +1030,6 @@ def submit(self, task_record['func_name'], waiting_message)) - task_record['task_launch_lock'] = threading.Lock() - app_fu.add_done_callback(partial(self.handle_app_update, task_record)) self.update_task_state(task_record, States.pending) logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu'])) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 2ae6dab314..94d0f135a4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -479,10 +479,10 @@ def _start_local_interchange_process(self): "heartbeat_threshold": self.heartbeat_threshold, "poll_period": self.poll_period, "logging_level": logging.DEBUG if self.worker_debug else logging.INFO - }, + }, daemon=True, name="HTEX-Interchange" - ) + ) self.interchange_proc.start() try: (self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 6c4ca961ec..67b70aa78d 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +import multiprocessing import zmq import os import sys @@ -13,7 +14,7 @@ import threading import json -from typing import cast, Any, Dict, Set, Optional +from typing import cast, Any, Dict, NoReturn, Sequence, Set, Optional, Tuple from parsl.utils import setproctitle from parsl.version import VERSION as PARSL_VERSION @@ -36,23 +37,23 @@ class ManagerLost(Exception): ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats have been missed. ''' - def __init__(self, manager_id, hostname): + def __init__(self, manager_id: bytes, hostname: str) -> None: self.manager_id = manager_id self.tstamp = time.time() self.hostname = hostname - def __str__(self): + def __str__(self) -> str: return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname) class VersionMismatch(Exception): ''' Manager and Interchange versions do not match ''' - def __init__(self, interchange_version, manager_version): + def __init__(self, interchange_version: str, manager_version: str): self.interchange_version = interchange_version self.manager_version = manager_version - def __str__(self): + def __str__(self) -> str: return "Manager version info {} does not match interchange version info {}, causing a critical failure".format( self.manager_version, self.interchange_version) @@ -67,18 +68,18 @@ class Interchange: 4. Service single and batch requests from workers """ def __init__(self, - client_address="127.0.0.1", + client_address: str = "127.0.0.1", interchange_address: Optional[str] = None, - client_ports=(50055, 50056, 50057), - worker_ports=None, - worker_port_range=(54000, 55000), - hub_address=None, - hub_port=None, - heartbeat_threshold=60, - logdir=".", - logging_level=logging.INFO, - poll_period=10, - ) -> None: + client_ports: Tuple[int, int, int] = (50055, 50056, 50057), + worker_ports: Optional[Tuple[int, int]] = None, + worker_port_range: Tuple[int, int] = (54000, 55000), + hub_address: Optional[str] = None, + hub_port: Optional[int] = None, + heartbeat_threshold: int = 60, + logdir: str = ".", + logging_level: int = logging.INFO, + poll_period: int = 10, + ) -> None: """ Parameters ---------- @@ -191,7 +192,7 @@ def __init__(self, logger.info("Platform info: {}".format(self.current_platform)) - def get_tasks(self, count): + def get_tasks(self, count: int) -> Sequence[dict]: """ Obtains a batch of tasks from the internal pending_task_queue Parameters @@ -216,7 +217,7 @@ def get_tasks(self, count): return tasks @wrap_with_logs(target="interchange") - def task_puller(self): + def task_puller(self) -> NoReturn: """Pull tasks from the incoming tasks zmq pipe onto the internal pending task queue """ @@ -237,7 +238,7 @@ def task_puller(self): task_counter += 1 logger.debug(f"Fetched {task_counter} tasks so far") - def _create_monitoring_channel(self): + def _create_monitoring_channel(self) -> Optional[zmq.Socket]: if self.hub_address and self.hub_port: logger.info("Connecting to monitoring") hub_channel = self.context.socket(zmq.DEALER) @@ -248,7 +249,7 @@ def _create_monitoring_channel(self): else: return None - def _send_monitoring_info(self, hub_channel, manager: ManagerRecord): + def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None: if hub_channel: logger.info("Sending message {} to hub".format(manager)) @@ -259,7 +260,7 @@ def _send_monitoring_info(self, hub_channel, manager: ManagerRecord): hub_channel.send_pyobj((MessageType.NODE_INFO, d)) @wrap_with_logs(target="interchange") - def _command_server(self): + def _command_server(self) -> NoReturn: """ Command server to run async command to the interchange """ logger.debug("Command Server Starting") @@ -326,7 +327,7 @@ def _command_server(self): continue @wrap_with_logs - def start(self): + def start(self) -> None: """ Start the interchange """ @@ -382,7 +383,7 @@ def start(self): logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) logger.warning("Exiting") - def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill_event): + def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None: # Listen for requests for work if self.task_outgoing in self.socks and self.socks[self.task_outgoing] == zmq.POLLIN: logger.debug("starting task_outgoing section") @@ -425,7 +426,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill self.current_platform['parsl_v']), "py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0], msg['parsl_v']) - ) + ) result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)} pkl_package = pickle.dumps(result_package) self.results_outgoing.send(pkl_package) @@ -448,7 +449,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill logger.error("Unexpected non-heartbeat message received from manager {}") logger.debug("leaving task_outgoing section") - def process_tasks_to_send(self, interesting_managers): + def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # If we had received any requests, check if there are tasks that could be passed logger.debug("Managers count (interesting/total): {interesting}/{total}".format( @@ -474,14 +475,14 @@ def process_tasks_to_send(self, interesting_managers): tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {}".format(tids, manager_id)) + logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {} has free capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity)) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {} is now saturated".format(manager_id)) + logger.debug("Manager {!r} is now saturated".format(manager_id)) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) @@ -490,7 +491,7 @@ def process_tasks_to_send(self, interesting_managers): else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") - def process_results_incoming(self, interesting_managers, hub_channel): + def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: # Receive any results and forward to client if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN: logger.debug("entering results_incoming section") @@ -508,6 +509,12 @@ def process_results_incoming(self, interesting_managers, hub_channel): # process this for task ID and forward to executor b_messages.append((p_message, r)) elif r['type'] == 'monitoring': + # the monitoring code makes the assumption that no + # monitoring messages will be received if monitoring + # is not configured, and that hub_channel will only + # be None when monitoring is not configurated. + assert hub_channel is not None + hub_channel.send_pyobj(r['payload']) elif r['type'] == 'heartbeat': logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") @@ -552,7 +559,7 @@ def process_results_incoming(self, interesting_managers, hub_channel): interesting_managers.add(manager_id) logger.debug("leaving results_incoming section") - def expire_bad_managers(self, interesting_managers, hub_channel): + def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if time.time() - m['last_heartbeat'] > self.heartbeat_threshold] for (manager_id, m) in bad_managers: @@ -576,7 +583,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel): interesting_managers.remove(manager_id) -def start_file_logger(filename, level=logging.DEBUG, format_string=None): +def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: Optional[str] = None) -> None: """Add a stream log handler. Parameters @@ -608,7 +615,7 @@ def start_file_logger(filename, level=logging.DEBUG, format_string=None): @wrap_with_logs(target="interchange") -def starter(comm_q, *args, **kwargs): +def starter(comm_q: multiprocessing.Queue, *args: Any, **kwargs: Any) -> None: """Start the interchange process The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__ diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index f65ec560e5..b95a979221 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -234,7 +234,7 @@ def create_reg_message(self): 'dir': os.getcwd(), 'cpu_count': psutil.cpu_count(logical=False), 'total_memory': psutil.virtual_memory().total, - } + } b_msg = json.dumps(msg).encode('utf-8') return b_msg @@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue logger.exception("Caught exception while trying to pickle the result package") pkl_package = pickle.dumps({'type': 'result', 'task_id': tid, 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info())) - }) + }) result_queue.put(pkl_package) tasks_in_progress.pop(worker_id) diff --git a/parsl/executors/taskvine/factory.py b/parsl/executors/taskvine/factory.py index 614413705d..484877d1bd 100644 --- a/parsl/executors/taskvine/factory.py +++ b/parsl/executors/taskvine/factory.py @@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config): else: factory = Factory(batch_type=factory_config.batch_type, manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}", - ) + ) except Exception as e: raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}') diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 243b1c911a..8eb544a7c2 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -449,7 +449,7 @@ def submit(self, func, resource_specification, *args, **kwargs): input_files = [] output_files = [] - # Determine the input and output files that will exist at the workes: + # Determine the input and output files that will exist at the workers: input_files += [self._register_file(f) for f in kwargs.get("inputs", []) if isinstance(f, File)] output_files += [self._register_file(f) for f in kwargs.get("outputs", []) if isinstance(f, File)] diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 783ee8d098..94d33c27d1 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -194,10 +194,10 @@ def start(self, run_id: str, run_dir: str) -> int: "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "run_id": run_id - }, + }, name="Monitoring-Router-Process", daemon=True, - ) + ) self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, @@ -205,10 +205,10 @@ def start(self, run_id: str, run_dir: str) -> int: kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, - }, + }, name="Monitoring-DBM-Process", daemon=True, - ) + ) self.dbm_proc.start() self.logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid)) @@ -216,7 +216,7 @@ def start(self, run_id: str, run_dir: str) -> int: args=(self.logdir, self.resource_msgs, run_dir), name="Monitoring-Filesystem-Process", daemon=True - ) + ) self.filesystem_proc.start() self.logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") @@ -359,7 +359,7 @@ def __init__(self, run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3 # in seconds - ): + ): """ Initializes a monitoring configuration class. Parameters diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d42e0079b4..7861891d74 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -143,7 +143,7 @@ def send_first_last_message(try_id: int, 'first_msg': not is_last, 'last_msg': is_last, 'timestamp': datetime.datetime.now() - }) + }) radio.send(msg) return diff --git a/parsl/monitoring/visualization/plots/default/workflow_plots.py b/parsl/monitoring/visualization/plots/default/workflow_plots.py index 4cf876b188..ac5ae47285 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_plots.py @@ -22,7 +22,7 @@ 'exec_done': 'rgb(0, 200, 0)', 'memo_done': 'rgb(64, 200, 64)', 'fail_retryable': 'rgb(200, 128,128)' - } + } def task_gantt_plot(df_task, df_status, time_completed=None): @@ -50,7 +50,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': status['timestamp'], 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) last_status = status @@ -60,7 +60,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': time_completed, 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) fig = ff.create_gantt(parsl_tasks, @@ -205,7 +205,7 @@ def y_axis_setup(value): "fail_retryable": (8, 'rgb(200, 128,128)'), "joining": (9, 'rgb(128, 128, 255)'), "running_ended": (10, 'rgb(64, 64, 255)') - } + } def workflow_dag_plot(df_tasks, group_by_apps=True): diff --git a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py index 44e7d31a22..44ffe8ed98 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py @@ -164,7 +164,7 @@ def worker_efficiency(task, node): y=[total_workers] * (end - start + 1), name='Total of workers in whole run', ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title='Number of workers'), @@ -230,7 +230,7 @@ def resource_efficiency(resource, node, label): y=[total] * (end - start + 1), name=name2, ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title=yaxis), diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py index 83df8e2991..5c90d27918 100644 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ b/parsl/tests/configs/ad_hoc_cluster_htex.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } -} # type: Dict[str, Dict[str, Any]] + } + } # type: Dict[str, Dict[str, Any]] config = Config( executors=[ @@ -25,7 +25,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py index 2e319ed541..80949f1d1e 100644 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ b/parsl/tests/configs/htex_ad_hoc_cluster.py @@ -20,7 +20,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 3ab4305c74..81b9095285 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -8,4 +8,4 @@ hub_port=55055, resource_monitoring_interval=3, ) - ) + ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index a35493810e..fcfd2a594e 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -243,7 +243,7 @@ def setup_data(tmpd_cwd): @pytest.fixture(autouse=True, scope='function') -def wait_for_task_completion(pytestconfig): +def assert_no_outstanding_tasks(pytestconfig): """If we're in a config-file based mode, wait for task completion between each test. This will detect early on (by hanging) if particular test tasks are not finishing, rather than silently falling off the end of @@ -254,7 +254,11 @@ def wait_for_task_completion(pytestconfig): config = pytestconfig.getoption('config')[0] yield if config != 'local': - parsl.dfk().wait_for_current_tasks() + logger.info("Checking no outstanding tasks") + for task_record in parsl.dfk().tasks.values(): + fut = task_record['app_fu'] + assert fut.done(), f"Incomplete task found, task id {task_record['id']}" + logger.info("No outstanding tasks found") def pytest_make_collect_report(collector): diff --git a/parsl/tests/scaling_tests/vineex_condor.py b/parsl/tests/scaling_tests/vineex_condor.py index 62277374d3..59f8e7b07d 100644 --- a/parsl/tests/scaling_tests/vineex_condor.py +++ b/parsl/tests/scaling_tests/vineex_condor.py @@ -6,5 +6,5 @@ config = Config( executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=50055), provider=CondorProvider(), - )] + )] ) diff --git a/parsl/tests/scaling_tests/vineex_local.py b/parsl/tests/scaling_tests/vineex_local.py index e3fc9bba2f..4ade645f31 100644 --- a/parsl/tests/scaling_tests/vineex_local.py +++ b/parsl/tests/scaling_tests/vineex_local.py @@ -7,5 +7,5 @@ executors=[TaskVineExecutor(label='VineExec', worker_launch_method='factory', manager_config=TaskVineManagerConfig(port=50055), - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_condor.py b/parsl/tests/scaling_tests/wqex_condor.py index 91ec66e1ba..67123e7713 100644 --- a/parsl/tests/scaling_tests/wqex_condor.py +++ b/parsl/tests/scaling_tests/wqex_condor.py @@ -8,5 +8,5 @@ provider=CondorProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_local.py b/parsl/tests/scaling_tests/wqex_local.py index 320fedcded..4cf154e993 100644 --- a/parsl/tests/scaling_tests/wqex_local.py +++ b/parsl/tests/scaling_tests/wqex_local.py @@ -8,5 +8,5 @@ provider=LocalProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) diff --git a/parsl/tests/test_python_apps/test_lifted.py b/parsl/tests/test_python_apps/test_lifted.py index 644792205a..7d22530a08 100644 --- a/parsl/tests/test_python_apps/test_lifted.py +++ b/parsl/tests/test_python_apps/test_lifted.py @@ -89,11 +89,12 @@ def test_returns_a_class_instance(): def test_returns_a_class_instance_no_underscores(): # test that _underscore attribute references are not lifted + f = returns_a_class_instance() with pytest.raises(AttributeError): - returns_a_class_instance()._nosuchattribute.result() + f._nosuchattribute.result() + f.exception() # wait for f to complete before the test ends -@pytest.mark.skip("returning classes is not supported in WorkQueue or Task Vine - see issue #2908") def test_returns_a_class(): # precondition that returns_a_class behaves