Move htex interchange tasks incoming thread into main thread #3752

Open
wants to merge 2 commits into base: master
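This PR removes the dedicated Interchange-Task-Puller thread, which blocked on recv_pyobj() and fed the internal pending task queue, and instead handles the task_incoming socket directly on the main loop's poller. For context, below is a minimal standalone sketch of the thread-per-socket pattern being removed; the socket type, inproc address, and thread name are illustrative assumptions, not the actual Parsl wiring.

# Minimal sketch of the pattern this PR removes: a dedicated thread blocks on
# recv_pyobj() with a timeout and feeds an internal queue.
# Socket type, address, and names are illustrative, not the Parsl identifiers.
import queue
import threading

import zmq

context = zmq.Context()
task_incoming = context.socket(zmq.PULL)
task_incoming.setsockopt(zmq.RCVTIMEO, 10)   # milliseconds; recv raises zmq.Again on timeout
task_incoming.bind("inproc://tasks")

pending_task_queue: "queue.Queue[object]" = queue.Queue()

def task_puller() -> None:
    while True:
        try:
            msg = task_incoming.recv_pyobj()
        except zmq.Again:
            continue                          # timed out with nothing to receive; try again
        pending_task_queue.put(msg)

threading.Thread(target=task_puller, name="Task-Puller", daemon=True).start()
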
46 changes: 19 additions & 27 deletions parsl/executors/high_throughput/interchange.py
@@ -132,6 +132,11 @@ def __init__(self,
self.hub_zmq_port = hub_zmq_port

self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)

# count of tasks that have been received from the submit side
self.task_counter = 0

# count of tasks that have been sent out to worker pools
self.count = 0

self.worker_ports = worker_ports
@@ -201,28 +206,6 @@ def get_tasks(self, count: int) -> Sequence[dict]:

return tasks

@wrap_with_logs(target="interchange")
def task_puller(self) -> NoReturn:
"""Pull tasks from the incoming tasks zmq pipe onto the internal
pending task queue
"""
logger.info("Starting")
task_counter = 0

while True:
logger.debug("launching recv_pyobj")
try:
msg = self.task_incoming.recv_pyobj()
except zmq.Again:
# We just timed out while attempting to receive
logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
continue

logger.debug("putting message onto pending_task_queue")
self.pending_task_queue.put(msg)
task_counter += 1
logger.debug(f"Fetched {task_counter} tasks so far")

def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
if monitoring_radio:
logger.info("Sending message {} to MonitoringHub".format(manager))
@@ -326,11 +309,6 @@ def start(self) -> None:

start = time.time()

self._task_puller_thread = threading.Thread(target=self.task_puller,
name="Interchange-Task-Puller",
daemon=True)
self._task_puller_thread.start()

self._command_thread = threading.Thread(target=self._command_server,
name="Interchange-Command",
daemon=True)
@@ -341,6 +319,7 @@
poller = zmq.Poller()
poller.register(self.task_outgoing, zmq.POLLIN)
poller.register(self.results_incoming, zmq.POLLIN)
poller.register(self.task_incoming, zmq.POLLIN)

# These are managers which we should examine in an iteration
# for scheduling a job (or maybe any other attention?).
@@ -351,6 +330,7 @@
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_task_incoming()
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
self.expire_bad_managers(interesting_managers, monitoring_radio)
@@ -362,6 +342,18 @@
logger.info(f"Processed {self.count} tasks in {delta} seconds")
logger.warning("Exiting")

def process_task_incoming(self) -> None:
"""Process incoming task message(s).
"""

if self.task_incoming in self.socks and self.socks[self.task_incoming] == zmq.POLLIN:
logger.debug("start task_incoming section")
msg = self.task_incoming.recv_pyobj()
logger.debug("putting message onto pending_task_queue")
self.pending_task_queue.put(msg)
self.task_counter += 1
logger.debug(f"Fetched {self.task_counter} tasks so far")

def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
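After this change the task_incoming socket is registered on the same zmq.Poller as task_outgoing and results_incoming, and process_task_incoming() receives one task message per main-loop iteration whenever the poller reports POLLIN on that socket. Below is a minimal standalone sketch of that poller-driven pattern; the addresses and socket types are illustrative assumptions, not the actual Parsl wiring.

# Minimal sketch of the poller-driven replacement: the main loop polls all
# sockets together and drains task_incoming only when POLLIN is reported,
# so no separate puller thread is needed. Names/addresses are illustrative.
import queue

import zmq

context = zmq.Context()
task_incoming = context.socket(zmq.PULL)
task_incoming.bind("inproc://tasks")
results_incoming = context.socket(zmq.PULL)
results_incoming.bind("inproc://results")

poller = zmq.Poller()
poller.register(task_incoming, zmq.POLLIN)
poller.register(results_incoming, zmq.POLLIN)

pending_task_queue: "queue.Queue[object]" = queue.Queue()
task_counter = 0

while True:
    socks = dict(poller.poll(timeout=10))    # milliseconds

    if socks.get(task_incoming) == zmq.POLLIN:
        msg = task_incoming.recv_pyobj()
        pending_task_queue.put(msg)
        task_counter += 1

    if socks.get(results_incoming) == zmq.POLLIN:
        results_incoming.recv_multipart()    # result handling would go here

zmq polling is level-triggered, so if more than one task message is queued, POLLIN is reported again on the next iteration and the remaining messages are picked up one per loop.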