Skip to content

Commit

Permalink
Merge branch 'master' into benc-mypy-executor-shutdown-type
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Nov 6, 2023
2 parents 3a6219b + 3e58c8b commit c860f77
Show file tree
Hide file tree
Showing 24 changed files with 95 additions and 99 deletions.
3 changes: 1 addition & 2 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[flake8]
# E124: closing bracket does not match visual indentation
# E126: continuation line over-indented for hanging indent
# This one is bad. Sometimes ordering matters, conditional imports
# setting env vars necessary etc.
Expand All @@ -8,7 +7,7 @@
# https://github.com/PyCQA/pycodestyle/issues/386
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E124, E126, E402, E129, W504
ignore = E126, E402, E129, W504
max-line-length = 158
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CCTOOLS_INSTALL := /tmp/cctools
MPICH=mpich
OPENMPI=openmpi
export PATH := $(CCTOOLS_INSTALL)/bin/:$(PATH)
export CCTOOLS_VERSION=7.7.1
export CCTOOLS_VERSION=7.7.2
export HYDRA_LAUNCHER=fork
export OMPI_MCA_rmaps_base_oversubscribe=yes
MPI=$(MPICH)
Expand Down
6 changes: 2 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ disallow_untyped_defs = True
disallow_any_expr = True

[mypy-parsl.executors.high_throughput.interchange.*]
check_untyped_defs = True

[mypy-parsl.executors.extreme_scale.*]
ignore_errors = True
disallow_untyped_defs = True
warn_unreachable = True

[mypy-parsl.monitoring.*]
disallow_untyped_decorators = True
Expand Down
36 changes: 12 additions & 24 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,21 @@


class Channel(metaclass=ABCMeta):
"""For certain resources such as campus clusters or supercomputers at
"""Channels are abstractions that enable ExecutionProviders to talk to
resource managers of remote compute facilities.
For certain resources such as campus clusters or supercomputers at
research laboratories, resource requirements may require authentication.
For instance some resources may allow access to their job schedulers from
only their login-nodes which require you to authenticate on through SSH,
GSI-SSH and sometimes even require two factor authentication. Channels are
simple abstractions that enable the ExecutionProvider component to talk to
the resource managers of compute facilities. The simplest Channel,
*LocalChannel*, simply executes commands locally on a shell, while the
*SshChannel* authenticates you to remote systems.
Channels are usually called via the execute_wait function.
For channels that execute remotely, a push_file function allows you to copy over files.
.. code:: python
+------------------
|
cmd, wtime ------->| execute_wait
(ec, stdout, stderr)<-|---+
|
src, dst_dir ------->| push_file
dst_path <--------|----+
|
dst_script_dir <------| script_dir
|
+-------------------
only their login-nodes which require you to authenticate through SSH, or
require two factor authentication.
The simplest Channel, *LocalChannel*, executes commands locally in a
shell, while the *SSHChannel* authenticates you to remote systems.
Channels provide the ability to execute commands remotely, using the
execute_wait method, and manipulate the remote file system using methods
such as push_file, pull_file and makedirs.
Channels should ensure that each launched command runs in a new process
group, so that providers (such as AdHocProvider and LocalProvider) which
Expand Down
4 changes: 2 additions & 2 deletions parsl/configs/ad_hoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
{'username': 'YOUR_USERNAME',
'script_dir': 'YOUR_SCRIPT_DIR',
'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
}
}
}


config = Config(
Expand All @@ -26,7 +26,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
Expand Down
3 changes: 1 addition & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ def submit(self,
'joins': None,
'try_id': 0,
'id': task_id,
'task_launch_lock': threading.Lock(),
'time_invoked': datetime.datetime.now(),
'time_returned': None,
'try_time_launched': None,
Expand Down Expand Up @@ -1029,8 +1030,6 @@ def submit(self,
task_record['func_name'],
waiting_message))

task_record['task_launch_lock'] = threading.Lock()

app_fu.add_done_callback(partial(self.handle_app_update, task_record))
self.update_task_state(task_record, States.pending)
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,10 @@ def _start_local_interchange_process(self):
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO
},
},
daemon=True,
name="HTEX-Interchange"
)
)
self.interchange_proc.start()
try:
(self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120)
Expand Down
71 changes: 39 additions & 32 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python
import multiprocessing
import zmq
import os
import sys
Expand All @@ -13,7 +14,7 @@
import threading
import json

from typing import cast, Any, Dict, Set, Optional
from typing import cast, Any, Dict, NoReturn, Sequence, Set, Optional, Tuple

from parsl.utils import setproctitle
from parsl.version import VERSION as PARSL_VERSION
Expand All @@ -36,23 +37,23 @@ class ManagerLost(Exception):
''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
have been missed.
'''
def __init__(self, manager_id, hostname):
def __init__(self, manager_id: bytes, hostname: str) -> None:
self.manager_id = manager_id
self.tstamp = time.time()
self.hostname = hostname

def __str__(self):
def __str__(self) -> str:
return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname)


class VersionMismatch(Exception):
''' Manager and Interchange versions do not match
'''
def __init__(self, interchange_version, manager_version):
def __init__(self, interchange_version: str, manager_version: str):
self.interchange_version = interchange_version
self.manager_version = manager_version

def __str__(self):
def __str__(self) -> str:
return "Manager version info {} does not match interchange version info {}, causing a critical failure".format(
self.manager_version,
self.interchange_version)
Expand All @@ -67,18 +68,18 @@ class Interchange:
4. Service single and batch requests from workers
"""
def __init__(self,
client_address="127.0.0.1",
client_address: str = "127.0.0.1",
interchange_address: Optional[str] = None,
client_ports=(50055, 50056, 50057),
worker_ports=None,
worker_port_range=(54000, 55000),
hub_address=None,
hub_port=None,
heartbeat_threshold=60,
logdir=".",
logging_level=logging.INFO,
poll_period=10,
) -> None:
client_ports: Tuple[int, int, int] = (50055, 50056, 50057),
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Tuple[int, int] = (54000, 55000),
hub_address: Optional[str] = None,
hub_port: Optional[int] = None,
heartbeat_threshold: int = 60,
logdir: str = ".",
logging_level: int = logging.INFO,
poll_period: int = 10,
) -> None:
"""
Parameters
----------
Expand Down Expand Up @@ -191,7 +192,7 @@ def __init__(self,

logger.info("Platform info: {}".format(self.current_platform))

def get_tasks(self, count):
def get_tasks(self, count: int) -> Sequence[dict]:
""" Obtains a batch of tasks from the internal pending_task_queue
Parameters
Expand All @@ -216,7 +217,7 @@ def get_tasks(self, count):
return tasks

@wrap_with_logs(target="interchange")
def task_puller(self):
def task_puller(self) -> NoReturn:
"""Pull tasks from the incoming tasks zmq pipe onto the internal
pending task queue
"""
Expand All @@ -237,7 +238,7 @@ def task_puller(self):
task_counter += 1
logger.debug(f"Fetched {task_counter} tasks so far")

def _create_monitoring_channel(self):
def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
if self.hub_address and self.hub_port:
logger.info("Connecting to monitoring")
hub_channel = self.context.socket(zmq.DEALER)
Expand All @@ -248,7 +249,7 @@ def _create_monitoring_channel(self):
else:
return None

def _send_monitoring_info(self, hub_channel, manager: ManagerRecord):
def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
if hub_channel:
logger.info("Sending message {} to hub".format(manager))

Expand All @@ -259,7 +260,7 @@ def _send_monitoring_info(self, hub_channel, manager: ManagerRecord):
hub_channel.send_pyobj((MessageType.NODE_INFO, d))

@wrap_with_logs(target="interchange")
def _command_server(self):
def _command_server(self) -> NoReturn:
""" Command server to run async command to the interchange
"""
logger.debug("Command Server Starting")
Expand Down Expand Up @@ -326,7 +327,7 @@ def _command_server(self):
continue

@wrap_with_logs
def start(self):
def start(self) -> None:
""" Start the interchange
"""

Expand Down Expand Up @@ -382,7 +383,7 @@ def start(self):
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.warning("Exiting")

def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill_event):
def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
# Listen for requests for work
if self.task_outgoing in self.socks and self.socks[self.task_outgoing] == zmq.POLLIN:
logger.debug("starting task_outgoing section")
Expand Down Expand Up @@ -425,7 +426,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
self.current_platform['parsl_v']),
"py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0],
msg['parsl_v'])
)
)
result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)}
pkl_package = pickle.dumps(result_package)
self.results_outgoing.send(pkl_package)
Expand All @@ -448,7 +449,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
logger.error("Unexpected non-heartbeat message received from manager {}")
logger.debug("leaving task_outgoing section")

def process_tasks_to_send(self, interesting_managers):
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# If we had received any requests, check if there are tasks that could be passed

logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
Expand All @@ -474,14 +475,14 @@ def process_tasks_to_send(self, interesting_managers):
tids = [t['task_id'] for t in tasks]
m['tasks'].extend(tids)
m['idle_since'] = None
logger.debug("Sent tasks: {} to manager {}".format(tids, manager_id))
logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id))
# recompute real_capacity after sending tasks
real_capacity = m['max_capacity'] - tasks_inflight
if real_capacity > 0:
logger.debug("Manager {} has free capacity {}".format(manager_id, real_capacity))
logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity))
# ... so keep it in the interesting_managers list
else:
logger.debug("Manager {} is now saturated".format(manager_id))
logger.debug("Manager {!r} is now saturated".format(manager_id))
interesting_managers.remove(manager_id)
else:
interesting_managers.remove(manager_id)
Expand All @@ -490,7 +491,7 @@ def process_tasks_to_send(self, interesting_managers):
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

def process_results_incoming(self, interesting_managers, hub_channel):
def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
# Receive any results and forward to client
if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
logger.debug("entering results_incoming section")
Expand All @@ -508,6 +509,12 @@ def process_results_incoming(self, interesting_managers, hub_channel):
# process this for task ID and forward to executor
b_messages.append((p_message, r))
elif r['type'] == 'monitoring':
# the monitoring code makes the assumption that no
# monitoring messages will be received if monitoring
# is not configured, and that hub_channel will only
# be None when monitoring is not configured.
assert hub_channel is not None

hub_channel.send_pyobj(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
Expand Down Expand Up @@ -552,7 +559,7 @@ def process_results_incoming(self, interesting_managers, hub_channel):
interesting_managers.add(manager_id)
logger.debug("leaving results_incoming section")

def expire_bad_managers(self, interesting_managers, hub_channel):
def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
Expand All @@ -576,7 +583,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel):
interesting_managers.remove(manager_id)


def start_file_logger(filename, level=logging.DEBUG, format_string=None):
def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: Optional[str] = None) -> None:
"""Add a stream log handler.
Parameters
Expand Down Expand Up @@ -608,7 +615,7 @@ def start_file_logger(filename, level=logging.DEBUG, format_string=None):


@wrap_with_logs(target="interchange")
def starter(comm_q, *args, **kwargs):
def starter(comm_q: multiprocessing.Queue, *args: Any, **kwargs: Any) -> None:
"""Start the interchange process
The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def create_reg_message(self):
'dir': os.getcwd(),
'cpu_count': psutil.cpu_count(logical=False),
'total_memory': psutil.virtual_memory().total,
}
}
b_msg = json.dumps(msg).encode('utf-8')
return b_msg

Expand Down Expand Up @@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue
logger.exception("Caught exception while trying to pickle the result package")
pkl_package = pickle.dumps({'type': 'result', 'task_id': tid,
'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))
})
})

result_queue.put(pkl_package)
tasks_in_progress.pop(worker_id)
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/taskvine/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config):
else:
factory = Factory(batch_type=factory_config.batch_type,
manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}",
)
)
except Exception as e:
raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}')

Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
input_files = []
output_files = []

# Determine the input and output files that will exist at the workes:
# Determine the input and output files that will exist at the workers:
input_files += [self._register_file(f) for f in kwargs.get("inputs", []) if isinstance(f, File)]
output_files += [self._register_file(f) for f in kwargs.get("outputs", []) if isinstance(f, File)]

Expand Down
Loading

0 comments on commit c860f77

Please sign in to comment.