diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 5f16a4626b..0dfbc1ba33 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -344,24 +344,15 @@ def start(self) -> None: def process_task_incoming(self) -> None: """Process incoming task message(s). - TODO: is a poll event here level or edge triggered? (i.e. does this need - to loop until all messages are absorbed? I think it's level triggered, - based on how the other code is written. """ if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN: logger.debug("start task_incoming section") - try: - msg = self.task_incoming.recv_pyobj() - logger.debug("putting message onto pending_task_queue") - self.pending_task_queue.put(msg) - self.task_counter += 1 - logger.debug(f"Fetched {self.task_counter} tasks so far") - except zmq.Again: - # We got a poll event but no message - # In some ZMQ situations, this is a legitimate happening - # I'm not exactly sure here. - logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize())) + msg = self.task_incoming.recv_pyobj() + logger.debug("putting message onto pending_task_queue") + self.pending_task_queue.put(msg) + self.task_counter += 1 + logger.debug(f"Fetched {self.task_counter} tasks so far") def process_task_outgoing_incoming( self,