From b249875a58aed77acc52718f6debfb219a415490 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 1 Nov 2023 14:19:55 +0000 Subject: [PATCH] Re-enable str-bytes-safe mypy error, and fix uses These changes are all in the interchange. In most cases, this error is found when a manager id (a byte sequence) is used in a string substitution, and to make happy, the format string is modified to indicate that we really do want to represent that byte sequence as a repr. The only other place is one rendering of a binary message into a log string, and the fix is the same. from benc-mypy branch --- mypy.ini | 1 - .../executors/high_throughput/interchange.py | 34 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/mypy.ini b/mypy.ini index 22acd55173..028ab618e7 100644 --- a/mypy.ini +++ b/mypy.ini @@ -5,7 +5,6 @@ plugins = sqlalchemy.ext.mypy.plugin # str-bytes-safe warns that a byte string is formatted into a string. # which is commonly done with manager IDs in the parsl # codebase. -disable_error_code = str-bytes-safe enable_error_code = ignore-without-code no_implicit_reexport = True warn_redundant_casts = True diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 70a42ff344..5746893e03 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -305,7 +305,7 @@ def _command_server(self): elif command_req.startswith("HOLD_WORKER"): cmd, s_manager = command_req.split(';') manager_id = s_manager.encode('utf-8') - logger.info("Received HOLD_WORKER for {}".format(manager_id)) + logger.info("Received HOLD_WORKER for {!r}".format(manager_id)) if manager_id in self._ready_managers: m = self._ready_managers[manager_id] m['active'] = False @@ -396,9 +396,9 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill msg = json.loads(message[1].decode('utf-8')) reg_flag = True except Exception: - logger.warning("Got Exception reading registration message from manager: {}".format( + logger.warning("Got Exception reading registration message from manager: {!r}".format( manager_id), exc_info=True) - logger.debug("Message: \n{}\n".format(message[1])) + logger.debug("Message: \n{!r}\n".format(message[1])) else: # We set up an entry only if registration works correctly self._ready_managers[manager_id] = {'last_heartbeat': time.time(), @@ -410,15 +410,15 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill 'tasks': []} if reg_flag is True: interesting_managers.add(manager_id) - logger.info("Adding manager: {} to ready queue".format(manager_id)) + logger.info("Adding manager: {!r} to ready queue".format(manager_id)) m = self._ready_managers[manager_id] m.update(msg) - logger.info("Registration info for manager {}: {}".format(manager_id, msg)) + logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) self._send_monitoring_info(hub_channel, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): - logger.error("Manager {} has incompatible version info with the interchange".format(manager_id)) + logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id)) logger.debug("Setting kill event") kill_event.set() e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0], @@ -431,19 +431,19 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill self.results_outgoing.send(pkl_package) logger.error("Sent failure reports, shutting down interchange") else: - logger.info("Manager {} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) - logger.info("Manager {} has compatible Python version {}".format(manager_id, + logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) + logger.info("Manager {!r} has compatible Python version {}".format(manager_id, msg['python_v'].rsplit(".", 1)[0])) else: # Registration has failed. - logger.debug("Suppressing bad registration from manager: {}".format( + logger.debug("Suppressing bad registration from manager: {!r}".format( manager_id)) else: tasks_requested = int.from_bytes(message[1], "little") self._ready_managers[manager_id]['last_heartbeat'] = time.time() if tasks_requested == HEARTBEAT_CODE: - logger.debug("Manager {} sent heartbeat via tasks connection".format(manager_id)) + logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) else: logger.error("Unexpected non-heartbeat message received from manager {}") @@ -497,9 +497,9 @@ def process_results_incoming(self, interesting_managers, hub_channel): logger.debug("entering results_incoming section") manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: - logger.warning("Received a result from a un-registered manager: {}".format(manager_id)) + logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id)) else: - logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id}") + logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}") b_messages = [] @@ -511,7 +511,7 @@ def process_results_incoming(self, interesting_managers, hub_channel): elif r['type'] == 'monitoring': hub_channel.send_pyobj(r['payload']) elif r['type'] == 'heartbeat': - logger.debug(f"Manager {manager_id} sent heartbeat via results connection") + logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") b_messages.append((p_message, r)) else: logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type'])) @@ -523,11 +523,11 @@ def process_results_incoming(self, interesting_managers, hub_channel): if r['type'] == 'result': got_result = True try: - logger.debug(f"Removing task {r['task_id']} from manager record {manager_id}") + logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}") m['tasks'].remove(r['task_id']) except Exception: # If we reach here, there's something very wrong. - logger.exception("Ignoring exception removing task_id {} for manager {} with task list {}".format( + logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format( r['task_id'], manager_id, m['tasks'])) @@ -541,7 +541,7 @@ def process_results_incoming(self, interesting_managers, hub_channel): self.results_outgoing.send_multipart(b_messages_to_send) logger.debug("Sent messages on results_outgoing") - logger.debug(f"Current tasks on manager {manager_id}: {m['tasks']}") + logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}") if len(m['tasks']) == 0 and m['idle_since'] is None: m['idle_since'] = time.time() @@ -558,7 +558,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel): time.time() - m['last_heartbeat'] > self.heartbeat_threshold] for (manager_id, m) in bad_managers: logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time())) - logger.warning(f"Too many heartbeats missed for manager {manager_id} - removing manager") + logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager") if m['active']: m['active'] = False self._send_monitoring_info(hub_channel, m)