Skip to content

Commit

Permalink
remove tasks incoming thread, process in main thread
Browse files Browse the repository at this point in the history
performance notes:

parsl-perf -t 30, my laptop, no logging

before this PR, 2320
post this PR, 2344
  • Loading branch information
benclifford committed Jan 18, 2025
1 parent 6ac930e commit 8d4bf58
Showing 1 changed file with 28 additions and 27 deletions.
55 changes: 28 additions & 27 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ def __init__(self,
self.hub_zmq_port = hub_zmq_port

self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)

# count of tasks that have been received from the submit side
self.task_counter = 0

# count of tasks that have been sent out to worker pools
self.count = 0

self.worker_ports = worker_ports
Expand Down Expand Up @@ -201,28 +206,6 @@ def get_tasks(self, count: int) -> Sequence[dict]:

return tasks

@wrap_with_logs(target="interchange")
def task_puller(self) -> NoReturn:
    """Move task messages from the incoming ZMQ socket to the pending queue.

    Loops forever: each iteration attempts one ``recv_pyobj()`` on
    ``self.task_incoming``. A received message is appended to
    ``self.pending_task_queue`` and counted; a ``zmq.Again`` from the
    receive is logged and the receive is simply attempted again.
    """
    logger.info("Starting")
    task_counter = 0

    while True:
        logger.debug("launching recv_pyobj")
        try:
            msg = self.task_incoming.recv_pyobj()
        except zmq.Again:
            # The receive gave up without delivering a message; report the
            # current backlog and go round the loop again.
            backlog = self.pending_task_queue.qsize()
            logger.debug("zmq.Again with {} tasks in internal queue".format(backlog))
        else:
            logger.debug("putting message onto pending_task_queue")
            self.pending_task_queue.put(msg)
            task_counter += 1
            logger.debug(f"Fetched {task_counter} tasks so far")

def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
if monitoring_radio:
logger.info("Sending message {} to MonitoringHub".format(manager))
Expand Down Expand Up @@ -326,11 +309,6 @@ def start(self) -> None:

start = time.time()

self._task_puller_thread = threading.Thread(target=self.task_puller,
name="Interchange-Task-Puller",
daemon=True)
self._task_puller_thread.start()

self._command_thread = threading.Thread(target=self._command_server,
name="Interchange-Command",
daemon=True)
Expand All @@ -341,6 +319,7 @@ def start(self) -> None:
poller = zmq.Poller()
poller.register(self.task_outgoing, zmq.POLLIN)
poller.register(self.results_incoming, zmq.POLLIN)
poller.register(self.task_incoming, zmq.POLLIN)

# These are managers which we should examine in an iteration
# for scheduling a job (or maybe any other attention?).
Expand All @@ -351,6 +330,7 @@ def start(self) -> None:
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_task_incoming()
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
self.expire_bad_managers(interesting_managers, monitoring_radio)
Expand All @@ -362,6 +342,27 @@ def start(self) -> None:
logger.info(f"Processed {self.count} tasks in {delta} seconds")
logger.warning("Exiting")

def process_task_incoming(self) -> None:
    """Move at most one task message from ``task_incoming`` to the pending queue.

    Acts only when the latest poll result stored in ``self.socks`` reports
    ``zmq.POLLIN`` for ``self.task_incoming``; otherwise returns immediately.

    Only one message is consumed per call. ``zmq_poll`` is documented as
    level-triggered, so any messages still waiting on the socket are
    reported again by the next poll and do not need to be drained here.

    ``self.pending_task_queue`` is bounded, and ``Queue.put()`` blocks while
    the queue is full. Blocking here would stall the loop that calls this
    method (presumably the same loop that drains the queue -- confirm
    against the dispatch code), so when the queue is full the message is
    left on the socket and picked up on a later call.
    """
    if self.task_incoming not in self.socks or self.socks[self.task_incoming] != zmq.POLLIN:
        return

    logger.debug("start task_incoming section")

    if self.pending_task_queue.full():
        # Do not receive what cannot be stored: the message stays queued
        # inside ZMQ and the next poll will report it again.
        logger.debug("pending_task_queue is full; leaving message on task_incoming")
        return

    try:
        msg = self.task_incoming.recv_pyobj()
    except zmq.Again:
        # A poll event was reported but no message could be received.
        logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
        return

    logger.debug("putting message onto pending_task_queue")
    self.pending_task_queue.put(msg)
    self.task_counter += 1
    logger.debug(f"Fetched {self.task_counter} tasks so far")

def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
Expand Down

0 comments on commit 8d4bf58

Please sign in to comment.