From 8d4bf582bf7d01c310e7fc7bc72068c7b7000bfa Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 18 Jan 2025 14:37:11 +0000 Subject: [PATCH 1/2] remove tasks incoming thread, process in main thread performance notes: parsl-perf -t 30, my laptop, no logging before this PR, 2320 post this PR, 2344 --- .../executors/high_throughput/interchange.py | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 12d3e07f31..5f16a4626b 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -132,6 +132,11 @@ def __init__(self, self.hub_zmq_port = hub_zmq_port self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6) + + # count of tasks that have been received from the submit side + self.task_counter = 0 + + # count of tasks that have been sent out to worker pools self.count = 0 self.worker_ports = worker_ports @@ -201,28 +206,6 @@ def get_tasks(self, count: int) -> Sequence[dict]: return tasks - @wrap_with_logs(target="interchange") - def task_puller(self) -> NoReturn: - """Pull tasks from the incoming tasks zmq pipe onto the internal - pending task queue - """ - logger.info("Starting") - task_counter = 0 - - while True: - logger.debug("launching recv_pyobj") - try: - msg = self.task_incoming.recv_pyobj() - except zmq.Again: - # We just timed out while attempting to receive - logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize())) - continue - - logger.debug("putting message onto pending_task_queue") - self.pending_task_queue.put(msg) - task_counter += 1 - logger.debug(f"Fetched {task_counter} tasks so far") - def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None: if monitoring_radio: logger.info("Sending message {} to MonitoringHub".format(manager)) @@ -326,11 +309,6 @@ def start(self) -> None: start = time.time() - self._task_puller_thread = threading.Thread(target=self.task_puller, - name="Interchange-Task-Puller", - daemon=True) - self._task_puller_thread.start() - self._command_thread = threading.Thread(target=self._command_server, name="Interchange-Command", daemon=True) @@ -341,6 +319,7 @@ def start(self) -> None: poller = zmq.Poller() poller.register(self.task_outgoing, zmq.POLLIN) poller.register(self.results_incoming, zmq.POLLIN) + poller.register(self.task_incoming, zmq.POLLIN) # These are managers which we should examine in an iteration # for scheduling a job (or maybe any other attention?). @@ -351,6 +330,7 @@ def start(self) -> None: while not kill_event.is_set(): self.socks = dict(poller.poll(timeout=poll_period)) + self.process_task_incoming() self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event) self.process_results_incoming(interesting_managers, monitoring_radio) self.expire_bad_managers(interesting_managers, monitoring_radio) @@ -362,6 +342,27 @@ def start(self) -> None: logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") + def process_task_incoming(self) -> None: + """Process incoming task message(s). + TODO: is a poll event here level or edge triggered? (i.e. does this need + to loop until all messages are absorbed? I think it's level triggered, + based on how the other code is written. + """ + + if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN: + logger.debug("start task_incoming section") + try: + msg = self.task_incoming.recv_pyobj() + logger.debug("putting message onto pending_task_queue") + self.pending_task_queue.put(msg) + self.task_counter += 1 + logger.debug(f"Fetched {self.task_counter} tasks so far") + except zmq.Again: + # We got a poll event but no message + # In some ZMQ situations, this is a legitimate happening + # I'm not exactly sure here. + logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize())) + def process_task_outgoing_incoming( self, interesting_managers: Set[bytes], From b7c6462238b7d76cfc00516d7bcf8224e37a7400 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 18 Jan 2025 15:22:49 +0000 Subject: [PATCH 2/2] remove TODO and try/except after looking at zmq docs --- .../executors/high_throughput/interchange.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 5f16a4626b..0dfbc1ba33 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -344,24 +344,15 @@ def start(self) -> None: def process_task_incoming(self) -> None: """Process incoming task message(s). - TODO: is a poll event here level or edge triggered? (i.e. does this need - to loop until all messages are absorbed? I think it's level triggered, - based on how the other code is written. """ if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN: logger.debug("start task_incoming section") - try: - msg = self.task_incoming.recv_pyobj() - logger.debug("putting message onto pending_task_queue") - self.pending_task_queue.put(msg) - self.task_counter += 1 - logger.debug(f"Fetched {self.task_counter} tasks so far") - except zmq.Again: - # We got a poll event but no message - # In some ZMQ situations, this is a legitimate happening - # I'm not exactly sure here. - logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize())) + msg = self.task_incoming.recv_pyobj() + logger.debug("putting message onto pending_task_queue") + self.pending_task_queue.put(msg) + self.task_counter += 1 + logger.debug(f"Fetched {self.task_counter} tasks so far") def process_task_outgoing_incoming( self,