From 28c3521417d5f2e503d1a68d1cc282c03489d699 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 12:05:05 -0500 Subject: [PATCH 1/9] Move task_launch_lock into initial TaskRecord (#2930) There's no reason for it to be omitted and added later, and this style is more defensive. --- parsl/dataflow/dflow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index c494346533..69d278b402 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -984,6 +984,7 @@ def submit(self, 'joins': None, 'try_id': 0, 'id': task_id, + 'task_launch_lock': threading.Lock(), 'time_invoked': datetime.datetime.now(), 'time_returned': None, 'try_time_launched': None, @@ -1029,8 +1030,6 @@ def submit(self, task_record['func_name'], waiting_message)) - task_record['task_launch_lock'] = threading.Lock() - app_fu.add_done_callback(partial(self.handle_app_update, task_record)) self.update_task_state(task_record, States.pending) logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu'])) From 77ae4f7b03eb392089bd2becaabecdf852388b39 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 12:40:48 -0500 Subject: [PATCH 2/9] Remove mypy reference to Extreme Scale Executor (#2932) This executor was removed in PR #2747 --- mypy.ini | 3 --- 1 file changed, 3 deletions(-) diff --git a/mypy.ini b/mypy.ini index 531337257b..8e10a0659e 100644 --- a/mypy.ini +++ b/mypy.ini @@ -75,9 +75,6 @@ disallow_any_expr = True [mypy-parsl.executors.high_throughput.interchange.*] check_untyped_defs = True -[mypy-parsl.executors.extreme_scale.*] -ignore_errors = True - [mypy-parsl.monitoring.*] disallow_untyped_decorators = True check_untyped_defs = True From c7a3918e2c75194059c57095e3078bdd1a7bbee5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 13:07:34 -0500 Subject: [PATCH 3/9] Re-enable test that was skipped because of issue #2908 (#2928) This should 
have been fixed by PR #2916 --- parsl/tests/test_python_apps/test_lifted.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsl/tests/test_python_apps/test_lifted.py b/parsl/tests/test_python_apps/test_lifted.py index 644792205a..52fba7943f 100644 --- a/parsl/tests/test_python_apps/test_lifted.py +++ b/parsl/tests/test_python_apps/test_lifted.py @@ -93,7 +93,6 @@ def test_returns_a_class_instance_no_underscores(): returns_a_class_instance()._nosuchattribute.result() -@pytest.mark.skip("returning classes is not supported in WorkQueue or Task Vine - see issue #2908") def test_returns_a_class(): # precondition that returns_a_class behaves From 0cbf96db28e0ee6bd67ab5b34d5d26e5325d7d82 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 13:40:57 -0500 Subject: [PATCH 4/9] Make pytest fail when tasks are running after a test (#2927) When pressing ctrl-c in pytest, prior to this PR, it would hang as wait_for_task_completion would be called in the unwinding of the fixture stack. However, because ctrl-c was pressed, tasks wouldn't be expected to all complete. This PR changes that to assert that all tasks are completed, rather than *waiting* for all tasks to complete. A non-finished task now gives a test error - which is arguably better anyway because it more aggressively flushes out tests that do not perform a complete shutdown. This means that pressing ctrl-C in a pytest leads to an assertion error; when previously it led to a hang. One recently introduced test is fixed to comply. This is part of safe-shutdown work. 
--- parsl/tests/conftest.py | 8 ++++++-- parsl/tests/test_python_apps/test_lifted.py | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index a35493810e..fcfd2a594e 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -243,7 +243,7 @@ def setup_data(tmpd_cwd): @pytest.fixture(autouse=True, scope='function') -def wait_for_task_completion(pytestconfig): +def assert_no_outstanding_tasks(pytestconfig): """If we're in a config-file based mode, wait for task completion between each test. This will detect early on (by hanging) if particular test tasks are not finishing, rather than silently falling off the end of @@ -254,7 +254,11 @@ def wait_for_task_completion(pytestconfig): config = pytestconfig.getoption('config')[0] yield if config != 'local': - parsl.dfk().wait_for_current_tasks() + logger.info("Checking no outstanding tasks") + for task_record in parsl.dfk().tasks.values(): + fut = task_record['app_fu'] + assert fut.done(), f"Incomplete task found, task id {task_record['id']}" + logger.info("No outstanding tasks found") def pytest_make_collect_report(collector): diff --git a/parsl/tests/test_python_apps/test_lifted.py b/parsl/tests/test_python_apps/test_lifted.py index 52fba7943f..7d22530a08 100644 --- a/parsl/tests/test_python_apps/test_lifted.py +++ b/parsl/tests/test_python_apps/test_lifted.py @@ -89,8 +89,10 @@ def test_returns_a_class_instance(): def test_returns_a_class_instance_no_underscores(): # test that _underscore attribute references are not lifted + f = returns_a_class_instance() with pytest.raises(AttributeError): - returns_a_class_instance()._nosuchattribute.result() + f._nosuchattribute.result() + f.exception() # wait for f to complete before the test ends def test_returns_a_class(): From d7bc4b9139e1a42188f27495e1127ef413a57539 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 14:13:45 -0500 Subject: [PATCH 5/9] Fix all instances of flake8 E124 
indentation warnings and re-enable (#2937) --- .flake8 | 3 +-- parsl/configs/ad_hoc.py | 4 ++-- parsl/executors/high_throughput/executor.py | 4 ++-- parsl/executors/high_throughput/interchange.py | 4 ++-- .../executors/high_throughput/process_worker_pool.py | 4 ++-- parsl/executors/taskvine/factory.py | 2 +- parsl/monitoring/monitoring.py | 12 ++++++------ parsl/monitoring/remote.py | 2 +- .../visualization/plots/default/workflow_plots.py | 8 ++++---- .../plots/default/workflow_resource_plots.py | 4 ++-- parsl/tests/configs/ad_hoc_cluster_htex.py | 6 +++--- parsl/tests/configs/htex_ad_hoc_cluster.py | 2 +- parsl/tests/configs/local_threads_monitoring.py | 2 +- parsl/tests/scaling_tests/vineex_condor.py | 2 +- parsl/tests/scaling_tests/vineex_local.py | 2 +- parsl/tests/scaling_tests/wqex_condor.py | 2 +- parsl/tests/scaling_tests/wqex_local.py | 2 +- 17 files changed, 32 insertions(+), 33 deletions(-) diff --git a/.flake8 b/.flake8 index f2ac7f4e9a..951ea6fe5f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,4 @@ [flake8] -# E124: closing bracket does not match visual indentation # E126: continuation line over-indented for hanging indent # This one is bad. Sometimes ordering matters, conditional imports # setting env vars necessary etc. 
@@ -8,7 +7,7 @@ # https://github.com/PyCQA/pycodestyle/issues/386 # W504: line break after binary operator # (Raised by flake8 even when it is followed) -ignore = E124, E126, E402, E129, W504 +ignore = E126, E402, E129, W504 max-line-length = 158 exclude = test_import_fail.py, parsl/executors/workqueue/parsl_coprocess.py diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py index f92768f7f4..a289fb9457 100644 --- a/parsl/configs/ad_hoc.py +++ b/parsl/configs/ad_hoc.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] + } } -} config = Config( @@ -26,7 +26,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 2ae6dab314..94d0f135a4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -479,10 +479,10 @@ def _start_local_interchange_process(self): "heartbeat_threshold": self.heartbeat_threshold, "poll_period": self.poll_period, "logging_level": logging.DEBUG if self.worker_debug else logging.INFO - }, + }, daemon=True, name="HTEX-Interchange" - ) + ) self.interchange_proc.start() try: (self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 6c4ca961ec..f8f1da0f56 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -78,7 +78,7 @@ def __init__(self, logdir=".", logging_level=logging.INFO, poll_period=10, - ) -> None: + ) -> None: """ Parameters ---------- @@ -425,7 +425,7 @@ def process_task_outgoing_incoming(self, 
interesting_managers, hub_channel, kill self.current_platform['parsl_v']), "py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0], msg['parsl_v']) - ) + ) result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)} pkl_package = pickle.dumps(result_package) self.results_outgoing.send(pkl_package) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index f65ec560e5..b95a979221 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -234,7 +234,7 @@ def create_reg_message(self): 'dir': os.getcwd(), 'cpu_count': psutil.cpu_count(logical=False), 'total_memory': psutil.virtual_memory().total, - } + } b_msg = json.dumps(msg).encode('utf-8') return b_msg @@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue logger.exception("Caught exception while trying to pickle the result package") pkl_package = pickle.dumps({'type': 'result', 'task_id': tid, 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info())) - }) + }) result_queue.put(pkl_package) tasks_in_progress.pop(worker_id) diff --git a/parsl/executors/taskvine/factory.py b/parsl/executors/taskvine/factory.py index 614413705d..484877d1bd 100644 --- a/parsl/executors/taskvine/factory.py +++ b/parsl/executors/taskvine/factory.py @@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config): else: factory = Factory(batch_type=factory_config.batch_type, manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}", - ) + ) except Exception as e: raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}') diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 783ee8d098..94d33c27d1 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -194,10 +194,10 @@ def start(self, run_id: str, run_dir: str) 
-> int: "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "run_id": run_id - }, + }, name="Monitoring-Router-Process", daemon=True, - ) + ) self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, @@ -205,10 +205,10 @@ def start(self, run_id: str, run_dir: str) -> int: kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, - }, + }, name="Monitoring-DBM-Process", daemon=True, - ) + ) self.dbm_proc.start() self.logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid)) @@ -216,7 +216,7 @@ def start(self, run_id: str, run_dir: str) -> int: args=(self.logdir, self.resource_msgs, run_dir), name="Monitoring-Filesystem-Process", daemon=True - ) + ) self.filesystem_proc.start() self.logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") @@ -359,7 +359,7 @@ def __init__(self, run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3 # in seconds - ): + ): """ Initializes a monitoring configuration class. 
Parameters diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d42e0079b4..7861891d74 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -143,7 +143,7 @@ def send_first_last_message(try_id: int, 'first_msg': not is_last, 'last_msg': is_last, 'timestamp': datetime.datetime.now() - }) + }) radio.send(msg) return diff --git a/parsl/monitoring/visualization/plots/default/workflow_plots.py b/parsl/monitoring/visualization/plots/default/workflow_plots.py index 4cf876b188..ac5ae47285 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_plots.py @@ -22,7 +22,7 @@ 'exec_done': 'rgb(0, 200, 0)', 'memo_done': 'rgb(64, 200, 64)', 'fail_retryable': 'rgb(200, 128,128)' - } + } def task_gantt_plot(df_task, df_status, time_completed=None): @@ -50,7 +50,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': status['timestamp'], 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) last_status = status @@ -60,7 +60,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None): 'Start': last_status['timestamp'], 'Finish': time_completed, 'Resource': last_status['task_status_name'] - } + } parsl_tasks.extend([last_status_bar]) fig = ff.create_gantt(parsl_tasks, @@ -205,7 +205,7 @@ def y_axis_setup(value): "fail_retryable": (8, 'rgb(200, 128,128)'), "joining": (9, 'rgb(128, 128, 255)'), "running_ended": (10, 'rgb(64, 64, 255)') - } + } def workflow_dag_plot(df_tasks, group_by_apps=True): diff --git a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py index 44e7d31a22..44ffe8ed98 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py @@ -164,7 +164,7 @@ def 
worker_efficiency(task, node): y=[total_workers] * (end - start + 1), name='Total of workers in whole run', ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title='Number of workers'), @@ -230,7 +230,7 @@ def resource_efficiency(resource, node, label): y=[total] * (end - start + 1), name=name2, ) - ], + ], layout=go.Layout(xaxis=dict(autorange=True, title='Time (seconds)'), yaxis=dict(title=yaxis), diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py index 83df8e2991..5c90d27918 100644 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ b/parsl/tests/configs/ad_hoc_cluster_htex.py @@ -9,8 +9,8 @@ {'username': 'YOUR_USERNAME', 'script_dir': 'YOUR_SCRIPT_DIR', 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } -} # type: Dict[str, Dict[str, Any]] + } + } # type: Dict[str, Dict[str, Any]] config = Config( executors=[ @@ -25,7 +25,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py index 2e319ed541..80949f1d1e 100644 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ b/parsl/tests/configs/htex_ad_hoc_cluster.py @@ -20,7 +20,7 @@ channels=[SSHChannel(hostname=m, username=user_opts['adhoc']['username'], script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] + ) for m in user_opts['adhoc']['remote_hostnames']] ) ) ], diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 3ab4305c74..81b9095285 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -8,4 +8,4 @@ hub_port=55055, resource_monitoring_interval=3, ) - ) + ) 
diff --git a/parsl/tests/scaling_tests/vineex_condor.py b/parsl/tests/scaling_tests/vineex_condor.py index 62277374d3..59f8e7b07d 100644 --- a/parsl/tests/scaling_tests/vineex_condor.py +++ b/parsl/tests/scaling_tests/vineex_condor.py @@ -6,5 +6,5 @@ config = Config( executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=50055), provider=CondorProvider(), - )] + )] ) diff --git a/parsl/tests/scaling_tests/vineex_local.py b/parsl/tests/scaling_tests/vineex_local.py index e3fc9bba2f..4ade645f31 100644 --- a/parsl/tests/scaling_tests/vineex_local.py +++ b/parsl/tests/scaling_tests/vineex_local.py @@ -7,5 +7,5 @@ executors=[TaskVineExecutor(label='VineExec', worker_launch_method='factory', manager_config=TaskVineManagerConfig(port=50055), - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_condor.py b/parsl/tests/scaling_tests/wqex_condor.py index 91ec66e1ba..67123e7713 100644 --- a/parsl/tests/scaling_tests/wqex_condor.py +++ b/parsl/tests/scaling_tests/wqex_condor.py @@ -8,5 +8,5 @@ provider=CondorProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) diff --git a/parsl/tests/scaling_tests/wqex_local.py b/parsl/tests/scaling_tests/wqex_local.py index 320fedcded..4cf154e993 100644 --- a/parsl/tests/scaling_tests/wqex_local.py +++ b/parsl/tests/scaling_tests/wqex_local.py @@ -8,5 +8,5 @@ provider=LocalProvider(), # init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh; # echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log', - )] + )] ) From f8b18366269c308138e2eeadbc00a4dceb63bf01 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 14:50:05 -0500 Subject: [PATCH 6/9] Refresh docstring for base Channel class (#2931) This removes many uses of the word "simple" and makes the first sentence summarise the class (see PEP-257). 
--- parsl/channels/base.py | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 0069ba34ff..8c50a6efcf 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -4,33 +4,21 @@ class Channel(metaclass=ABCMeta): - """For certain resources such as campus clusters or supercomputers at + """Channels are abstractions that enable ExecutionProviders to talk to + resource managers of remote compute facilities. + + For certain resources such as campus clusters or supercomputers at research laboratories, resource requirements may require authentication. For instance some resources may allow access to their job schedulers from - only their login-nodes which require you to authenticate on through SSH, - GSI-SSH and sometimes even require two factor authentication. Channels are - simple abstractions that enable the ExecutionProvider component to talk to - the resource managers of compute facilities. The simplest Channel, - *LocalChannel*, simply executes commands locally on a shell, while the - *SshChannel* authenticates you to remote systems. - - Channels are usually called via the execute_wait function. - For channels that execute remotely, a push_file function allows you to copy over files. - - .. code:: python - - +------------------ - | - cmd, wtime ------->| execute_wait - (ec, stdout, stderr)<-|---+ - | - src, dst_dir ------->| push_file - dst_path <--------|----+ - | - dst_script_dir <------| script_dir - | - +------------------- + only their login-nodes which require you to authenticate through SSH, or + require two factor authentication. + + The simplest Channel, *LocalChannel*, executes commands locally in a + shell, while the *SSHChannel* authenticates you to remote systems. 
+ Channels provide the ability to execute commands remotely, using the + execute_wait method, and manipulate the remote file system using methods + such as push_file, pull_file and makedirs. Channels should ensure that each launched command runs in a new process group, so that providers (such as AdHocProvider and LocalProvider) which From 746b91ed467723bb8a6ebdf5e4c48046a58a8166 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 5 Nov 2023 22:56:30 -0600 Subject: [PATCH 7/9] Fix a typo in Work Queue code comment (#2944) --- parsl/executors/workqueue/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 230b074ff0..80559bf5cc 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -449,7 +449,7 @@ def submit(self, func, resource_specification, *args, **kwargs): input_files = [] output_files = [] - # Determine the input and output files that will exist at the workes: + # Determine the input and output files that will exist at the workers: input_files += [self._register_file(f) for f in kwargs.get("inputs", []) if isinstance(f, File)] output_files += [self._register_file(f) for f in kwargs.get("outputs", []) if isinstance(f, File)] From 74e63ec605182a2ca9ee65cd48274f512c02c006 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 5 Nov 2023 23:44:28 -0600 Subject: [PATCH 8/9] Require type annotations throughout the interchange (#2940) Co-authored-by: Kevin Hunter Kesling --- mypy.ini | 3 +- .../executors/high_throughput/interchange.py | 67 ++++++++++--------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/mypy.ini b/mypy.ini index 8e10a0659e..2597ffaa4e 100644 --- a/mypy.ini +++ b/mypy.ini @@ -73,7 +73,8 @@ disallow_untyped_defs = True disallow_any_expr = True [mypy-parsl.executors.high_throughput.interchange.*] -check_untyped_defs = True +disallow_untyped_defs = True +warn_unreachable = True 
[mypy-parsl.monitoring.*] disallow_untyped_decorators = True diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index f8f1da0f56..67b70aa78d 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +import multiprocessing import zmq import os import sys @@ -13,7 +14,7 @@ import threading import json -from typing import cast, Any, Dict, Set, Optional +from typing import cast, Any, Dict, NoReturn, Sequence, Set, Optional, Tuple from parsl.utils import setproctitle from parsl.version import VERSION as PARSL_VERSION @@ -36,23 +37,23 @@ class ManagerLost(Exception): ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats have been missed. ''' - def __init__(self, manager_id, hostname): + def __init__(self, manager_id: bytes, hostname: str) -> None: self.manager_id = manager_id self.tstamp = time.time() self.hostname = hostname - def __str__(self): + def __str__(self) -> str: return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname) class VersionMismatch(Exception): ''' Manager and Interchange versions do not match ''' - def __init__(self, interchange_version, manager_version): + def __init__(self, interchange_version: str, manager_version: str): self.interchange_version = interchange_version self.manager_version = manager_version - def __str__(self): + def __str__(self) -> str: return "Manager version info {} does not match interchange version info {}, causing a critical failure".format( self.manager_version, self.interchange_version) @@ -67,17 +68,17 @@ class Interchange: 4. 
Service single and batch requests from workers """ def __init__(self, - client_address="127.0.0.1", + client_address: str = "127.0.0.1", interchange_address: Optional[str] = None, - client_ports=(50055, 50056, 50057), - worker_ports=None, - worker_port_range=(54000, 55000), - hub_address=None, - hub_port=None, - heartbeat_threshold=60, - logdir=".", - logging_level=logging.INFO, - poll_period=10, + client_ports: Tuple[int, int, int] = (50055, 50056, 50057), + worker_ports: Optional[Tuple[int, int]] = None, + worker_port_range: Tuple[int, int] = (54000, 55000), + hub_address: Optional[str] = None, + hub_port: Optional[int] = None, + heartbeat_threshold: int = 60, + logdir: str = ".", + logging_level: int = logging.INFO, + poll_period: int = 10, ) -> None: """ Parameters @@ -191,7 +192,7 @@ def __init__(self, logger.info("Platform info: {}".format(self.current_platform)) - def get_tasks(self, count): + def get_tasks(self, count: int) -> Sequence[dict]: """ Obtains a batch of tasks from the internal pending_task_queue Parameters @@ -216,7 +217,7 @@ def get_tasks(self, count): return tasks @wrap_with_logs(target="interchange") - def task_puller(self): + def task_puller(self) -> NoReturn: """Pull tasks from the incoming tasks zmq pipe onto the internal pending task queue """ @@ -237,7 +238,7 @@ def task_puller(self): task_counter += 1 logger.debug(f"Fetched {task_counter} tasks so far") - def _create_monitoring_channel(self): + def _create_monitoring_channel(self) -> Optional[zmq.Socket]: if self.hub_address and self.hub_port: logger.info("Connecting to monitoring") hub_channel = self.context.socket(zmq.DEALER) @@ -248,7 +249,7 @@ def _create_monitoring_channel(self): else: return None - def _send_monitoring_info(self, hub_channel, manager: ManagerRecord): + def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None: if hub_channel: logger.info("Sending message {} to hub".format(manager)) @@ -259,7 +260,7 @@ def 
_send_monitoring_info(self, hub_channel, manager: ManagerRecord): hub_channel.send_pyobj((MessageType.NODE_INFO, d)) @wrap_with_logs(target="interchange") - def _command_server(self): + def _command_server(self) -> NoReturn: """ Command server to run async command to the interchange """ logger.debug("Command Server Starting") @@ -326,7 +327,7 @@ def _command_server(self): continue @wrap_with_logs - def start(self): + def start(self) -> None: """ Start the interchange """ @@ -382,7 +383,7 @@ def start(self): logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) logger.warning("Exiting") - def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill_event): + def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None: # Listen for requests for work if self.task_outgoing in self.socks and self.socks[self.task_outgoing] == zmq.POLLIN: logger.debug("starting task_outgoing section") @@ -448,7 +449,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill logger.error("Unexpected non-heartbeat message received from manager {}") logger.debug("leaving task_outgoing section") - def process_tasks_to_send(self, interesting_managers): + def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # If we had received any requests, check if there are tasks that could be passed logger.debug("Managers count (interesting/total): {interesting}/{total}".format( @@ -474,14 +475,14 @@ def process_tasks_to_send(self, interesting_managers): tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {}".format(tids, manager_id)) + logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {} has free 
capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity)) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {} is now saturated".format(manager_id)) + logger.debug("Manager {!r} is now saturated".format(manager_id)) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) @@ -490,7 +491,7 @@ def process_tasks_to_send(self, interesting_managers): else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") - def process_results_incoming(self, interesting_managers, hub_channel): + def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: # Receive any results and forward to client if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN: logger.debug("entering results_incoming section") @@ -508,6 +509,12 @@ def process_results_incoming(self, interesting_managers, hub_channel): # process this for task ID and forward to executor b_messages.append((p_message, r)) elif r['type'] == 'monitoring': + # the monitoring code makes the assumption that no + # monitoring messages will be received if monitoring + # is not configured, and that hub_channel will only + # be None when monitoring is not configured. 
+ assert hub_channel is not None + hub_channel.send_pyobj(r['payload']) elif r['type'] == 'heartbeat': logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") @@ -552,7 +559,7 @@ def process_results_incoming(self, interesting_managers, hub_channel): interesting_managers.add(manager_id) logger.debug("leaving results_incoming section") - def expire_bad_managers(self, interesting_managers, hub_channel): + def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if time.time() - m['last_heartbeat'] > self.heartbeat_threshold] for (manager_id, m) in bad_managers: @@ -576,7 +583,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel): interesting_managers.remove(manager_id) -def start_file_logger(filename, level=logging.DEBUG, format_string=None): +def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: Optional[str] = None) -> None: """Add a stream log handler. Parameters @@ -608,7 +615,7 @@ def start_file_logger(filename, level=logging.DEBUG, format_string=None): @wrap_with_logs(target="interchange") -def starter(comm_q, *args, **kwargs): +def starter(comm_q: multiprocessing.Queue, *args: Any, **kwargs: Any) -> None: """Start the interchange process The executor is expected to call this function. 
The args, kwargs match that of the Interchange.__init__ From 3e58c8b4179e4ce8cecd84d344e144b13100a879 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 6 Nov 2023 06:22:24 -0600 Subject: [PATCH 9/9] Upgrade cctools to bring in more bugfixes (#2939) See https://github.com/Parsl/parsl/issues/2914#issuecomment-1783121300 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index f8c3c4866f..78c7f59fba 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ CCTOOLS_INSTALL := /tmp/cctools MPICH=mpich OPENMPI=openmpi export PATH := $(CCTOOLS_INSTALL)/bin/:$(PATH) -export CCTOOLS_VERSION=7.7.1 +export CCTOOLS_VERSION=7.7.2 export HYDRA_LAUNCHER=fork export OMPI_MCA_rmaps_base_oversubscribe=yes MPI=$(MPICH)