Skip to content

Commit

Permalink
Merge branch 'Parsl:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
AymenFJA authored Nov 1, 2023
2 parents d3ab39d + c7a3918 commit 34f25bf
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 38 deletions.
8 changes: 0 additions & 8 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
[mypy]
plugins = sqlalchemy.ext.mypy.plugin

# globally disabled error codes:
# str-bytes-safe warns that a byte string is formatted into a string.
# which is commonly done with manager IDs in the parsl
# codebase.
disable_error_code = str-bytes-safe
enable_error_code = ignore-without-code
no_implicit_reexport = True
warn_redundant_casts = True
Expand Down Expand Up @@ -80,9 +75,6 @@ disallow_any_expr = True
[mypy-parsl.executors.high_throughput.interchange.*]
check_untyped_defs = True

[mypy-parsl.executors.extreme_scale.*]
ignore_errors = True

[mypy-parsl.monitoring.*]
disallow_untyped_decorators = True
check_untyped_defs = True
Expand Down
3 changes: 1 addition & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ def submit(self,
'joins': None,
'try_id': 0,
'id': task_id,
'task_launch_lock': threading.Lock(),
'time_invoked': datetime.datetime.now(),
'time_returned': None,
'try_time_launched': None,
Expand Down Expand Up @@ -1029,8 +1030,6 @@ def submit(self,
task_record['func_name'],
waiting_message))

task_record['task_launch_lock'] = threading.Lock()

app_fu.add_done_callback(partial(self.handle_app_update, task_record))
self.update_task_state(task_record, States.pending)
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
Expand Down
41 changes: 20 additions & 21 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def get_tasks(self, count):
eg. [{'task_id':<x>, 'buffer':<buf>} ... ]
"""
tasks = []
for i in range(0, count):
for _ in range(0, count):
try:
x = self.pending_task_queue.get(block=False)
except queue.Empty:
Expand Down Expand Up @@ -305,7 +305,7 @@ def _command_server(self):
elif command_req.startswith("HOLD_WORKER"):
cmd, s_manager = command_req.split(';')
manager_id = s_manager.encode('utf-8')
logger.info("Received HOLD_WORKER for {}".format(manager_id))
logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
Expand Down Expand Up @@ -396,9 +396,9 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
msg = json.loads(message[1].decode('utf-8'))
reg_flag = True
except Exception:
logger.warning("Got Exception reading registration message from manager: {}".format(
logger.warning("Got Exception reading registration message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{}\n".format(message[1]))
logger.debug("Message: \n{!r}\n".format(message[1]))
else:
# We set up an entry only if registration works correctly
self._ready_managers[manager_id] = {'last_heartbeat': time.time(),
Expand All @@ -410,15 +410,15 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
'tasks': []}
if reg_flag is True:
interesting_managers.add(manager_id)
logger.info("Adding manager: {} to ready queue".format(manager_id))
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
m = self._ready_managers[manager_id]
m.update(msg)
logger.info("Registration info for manager {}: {}".format(manager_id, msg))
logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {} has incompatible version info with the interchange".format(manager_id))
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
Expand All @@ -431,19 +431,18 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
else:
# Registration has failed.
logger.debug("Suppressing bad registration from manager: {}".format(
manager_id))
logger.debug("Suppressing bad registration from manager: {!r}".format(manager_id))

else:
tasks_requested = int.from_bytes(message[1], "little")
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
if tasks_requested == HEARTBEAT_CODE:
logger.debug("Manager {} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
else:
logger.error("Unexpected non-heartbeat message received from manager {}")
Expand Down Expand Up @@ -497,9 +496,9 @@ def process_results_incoming(self, interesting_managers, hub_channel):
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {}".format(manager_id))
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id}")
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")

b_messages = []

Expand All @@ -511,23 +510,23 @@ def process_results_incoming(self, interesting_managers, hub_channel):
elif r['type'] == 'monitoring':
hub_channel.send_pyobj(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id} sent heartbeat via results connection")
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))

got_result = False
m = self._ready_managers[manager_id]
for (b_message, r) in b_messages:
for (_, r) in b_messages:
assert 'type' in r, f"Message is missing type entry: {r}"
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id}")
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {} with task list {}".format(
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
r['task_id'],
manager_id,
m['tasks']))
Expand All @@ -541,7 +540,7 @@ def process_results_incoming(self, interesting_managers, hub_channel):
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id}: {m['tasks']}")
logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

Expand All @@ -558,7 +557,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel):
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id} - removing manager")
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/high_throughput/zmq_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def run(self, message, max_retries=3):
"""
reply = '__PARSL_ZMQ_PIPES_MAGIC__'
with self._lock:
for i in range(max_retries):
for _ in range(max_retries):
try:
self.zmq_socket.send_pyobj(message, copy=True)
reply = self.zmq_socket.recv_pyobj()
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def scale_out(self, blocks: int = 1) -> List[str]:
raise ScalingFailed(self, "No execution provider available")
block_ids = []
logger.info(f"Scaling out by {blocks} blocks")
for i in range(blocks):
for _ in range(blocks):
block_id = str(self._block_id_counter.get_id())
logger.info(f"Allocated block ID {block_id}")
try:
Expand Down
1 change: 0 additions & 1 deletion parsl/tests/test_python_apps/test_lifted.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def test_returns_a_class_instance_no_underscores():
returns_a_class_instance()._nosuchattribute.result()


@pytest.mark.skip("returning classes is not supported in WorkQueue or Task Vine - see issue #2908")
def test_returns_a_class():

# precondition that returns_a_class behaves
Expand Down
8 changes: 4 additions & 4 deletions parsl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from contextlib import contextmanager
from types import TracebackType
from typing import Any, Callable, List, Tuple, Union, Generator, IO, AnyStr, Dict, Optional
from typing import Any, Callable, List, Sequence, Tuple, Union, Generator, IO, AnyStr, Dict, Optional

import typeguard
from typing_extensions import Type
Expand Down Expand Up @@ -47,7 +47,7 @@ def get_version() -> str:


@typeguard.typechecked
def get_all_checkpoints(rundir: str = "runinfo") -> List[str]:
def get_all_checkpoints(rundir: str = "runinfo") -> Sequence[str]:
"""Finds the checkpoints from all runs in the rundir.
Kwargs:
Expand Down Expand Up @@ -76,7 +76,7 @@ def get_all_checkpoints(rundir: str = "runinfo") -> List[str]:


@typeguard.typechecked
def get_last_checkpoint(rundir: str = "runinfo") -> List[str]:
def get_last_checkpoint(rundir: str = "runinfo") -> Sequence[str]:
"""Finds the checkpoint from the last run, if one exists.
Note that checkpoints are incremental, and this helper will not find
Expand Down Expand Up @@ -128,7 +128,7 @@ def get_std_fname_mode(

@contextmanager
def wait_for_file(path: str, seconds: int = 10) -> Generator[None, None, None]:
for i in range(0, int(seconds * 100)):
for _ in range(0, int(seconds * 100)):
time.sleep(seconds / 100.)
if os.path.exists(path):
break
Expand Down

0 comments on commit 34f25bf

Please sign in to comment.