Skip to content

Commit

Permalink
Re-enable str-bytes-safe mypy error, and fix uses
Browse files Browse the repository at this point in the history
These changes are all in the interchange. In most cases, this error is
found when a manager id (a byte sequence) is used in a string substitution,
and to make happy, the format string is modified to indicate that we
really do want to represent that byte sequence as a repr.

The only other place is one rendering of a binary message into a log
string, and the fix is the same.

from benc-mypy branch
  • Loading branch information
benclifford committed Nov 1, 2023
1 parent 471fb90 commit b249875
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
1 change: 0 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ plugins = sqlalchemy.ext.mypy.plugin
# str-bytes-safe warns that a byte string is formatted into a string.
# which is commonly done with manager IDs in the parsl
# codebase.
disable_error_code = str-bytes-safe
enable_error_code = ignore-without-code
no_implicit_reexport = True
warn_redundant_casts = True
Expand Down
34 changes: 17 additions & 17 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def _command_server(self):
elif command_req.startswith("HOLD_WORKER"):
cmd, s_manager = command_req.split(';')
manager_id = s_manager.encode('utf-8')
logger.info("Received HOLD_WORKER for {}".format(manager_id))
logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
Expand Down Expand Up @@ -396,9 +396,9 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
msg = json.loads(message[1].decode('utf-8'))
reg_flag = True
except Exception:
logger.warning("Got Exception reading registration message from manager: {}".format(
logger.warning("Got Exception reading registration message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{}\n".format(message[1]))
logger.debug("Message: \n{!r}\n".format(message[1]))
else:
# We set up an entry only if registration works correctly
self._ready_managers[manager_id] = {'last_heartbeat': time.time(),
Expand All @@ -410,15 +410,15 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
'tasks': []}
if reg_flag is True:
interesting_managers.add(manager_id)
logger.info("Adding manager: {} to ready queue".format(manager_id))
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
m = self._ready_managers[manager_id]
m.update(msg)
logger.info("Registration info for manager {}: {}".format(manager_id, msg))
logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {} has incompatible version info with the interchange".format(manager_id))
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
Expand All @@ -431,19 +431,19 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {} has compatible Python version {}".format(manager_id,
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
else:
# Registration has failed.
logger.debug("Suppressing bad registration from manager: {}".format(
logger.debug("Suppressing bad registration from manager: {!r}".format(
manager_id))

else:
tasks_requested = int.from_bytes(message[1], "little")
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
if tasks_requested == HEARTBEAT_CODE:
logger.debug("Manager {} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
else:
logger.error("Unexpected non-heartbeat message received from manager {}")
Expand Down Expand Up @@ -497,9 +497,9 @@ def process_results_incoming(self, interesting_managers, hub_channel):
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {}".format(manager_id))
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id}")
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")

b_messages = []

Expand All @@ -511,7 +511,7 @@ def process_results_incoming(self, interesting_managers, hub_channel):
elif r['type'] == 'monitoring':
hub_channel.send_pyobj(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id} sent heartbeat via results connection")
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
Expand All @@ -523,11 +523,11 @@ def process_results_incoming(self, interesting_managers, hub_channel):
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id}")
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {} with task list {}".format(
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
r['task_id'],
manager_id,
m['tasks']))
Expand All @@ -541,7 +541,7 @@ def process_results_incoming(self, interesting_managers, hub_channel):
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id}: {m['tasks']}")
logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

Expand All @@ -558,7 +558,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel):
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id} - removing manager")
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
Expand Down

0 comments on commit b249875

Please sign in to comment.