Skip to content

Commit

Permalink
remove TODO and try/except after looking at zmq docs
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jan 18, 2025
1 parent 8d4bf58 commit b7c6462
Showing 1 changed file with 5 additions and 14 deletions.
19 changes: 5 additions & 14 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,24 +344,15 @@ def start(self) -> None:

def process_task_incoming(self) -> None:
"""Process incoming task message(s).
TODO: is a poll event here level or edge triggered? (i.e. does this need
to loop until all messages are absorbed? I think it's level triggered,
based on how the other code is written.
"""

if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN:
logger.debug("start task_incoming section")
try:
msg = self.task_incoming.recv_pyobj()
logger.debug("putting message onto pending_task_queue")
self.pending_task_queue.put(msg)
self.task_counter += 1
logger.debug(f"Fetched {self.task_counter} tasks so far")
except zmq.Again:
# We got a poll event but no message
# In some ZMQ situations, this is a legitimate happening
# I'm not exactly sure here.
logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
msg = self.task_incoming.recv_pyobj()
logger.debug("putting message onto pending_task_queue")
self.pending_task_queue.put(msg)
self.task_counter += 1
logger.debug(f"Fetched {self.task_counter} tasks so far")

def process_task_outgoing_incoming(
self,
Expand Down

0 comments on commit b7c6462

Please sign in to comment.