From d9508522d184dc75d7d3e1009bcc4928f4a6f3bb Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 14:26:42 +0100 Subject: [PATCH 01/15] Add SLURMRunner from jacobtomlinson/dask-hpc-runners --- dask_jobqueue/runner.py | 239 ++++++++++++++++++++++ dask_jobqueue/slurm.py | 86 +++++++- dask_jobqueue/tests/slurm_runner/basic.py | 8 + dask_jobqueue/tests/test_slurm.py | 23 +++ 4 files changed, 355 insertions(+), 1 deletion(-) create mode 100644 dask_jobqueue/runner.py create mode 100644 dask_jobqueue/tests/slurm_runner/basic.py diff --git a/dask_jobqueue/runner.py b/dask_jobqueue/runner.py new file mode 100644 index 00000000..641f29e7 --- /dev/null +++ b/dask_jobqueue/runner.py @@ -0,0 +1,239 @@ +import asyncio +import sys +import os +import signal +from contextlib import suppress +from enum import Enum +from typing import Dict +import warnings +from tornado.ioloop import IOLoop + +from distributed.core import CommClosedError, Status, rpc +from distributed.scheduler import Scheduler +from distributed.utils import LoopRunner, import_term, SyncMethodMixin +from distributed.worker import Worker + + +# Close gracefully when receiving a SIGINT +signal.signal(signal.SIGINT, lambda *_: sys.exit()) + + +class Role(Enum): + """ + This Enum contains the various roles processes can be. + """ + + worker = "worker" + scheduler = "scheduler" + client = "client" + + +class BaseRunner(SyncMethodMixin): + """Superclass for runner objects. + + This class contains common functionality for Dask cluster runner classes. + + To implement this class, you must provide + + 1. A ``get_role`` method which returns a role from the ``Role`` enum. + 2. A ``set_scheduler_address`` method for the scheduler process to communicate its address. + 3. A ``get_scheduler_address`` method for all other processed to recieve the scheduler address. + 4. Optionally, a ``get_worker_name`` to provide a platform specific name to the workers. + 5. Optionally, a ``before_scheduler_start`` to perform any actions before the scheduler is created. + 6. Optionally, a ``before_worker_start`` to perform any actions before the worker is created. + 7. Optionally, a ``before_client_start`` to perform any actions before the client code continues. + 8. Optionally, a ``on_scheduler_start`` to perform anything on the scheduler once it has started. + 9. Optionally, a ``on_worker_start`` to perform anything on the worker once it has started. + + For that, you should get the following: + + A context manager and object which can be used within a script that is run in parallel to decide which processes + run the scheduler, workers and client code. + + """ + + __loop: IOLoop | None = None + + def __init__( + self, + scheduler: bool = True, + scheduler_options: Dict = None, + worker_class: str = None, + worker_options: Dict = None, + client: bool = True, + asynchronous: bool = False, + loop: asyncio.BaseEventLoop = None, + ): + self.status = Status.created + self.scheduler = scheduler + self.scheduler_address = None + self.scheduler_comm = None + self.client = client + if self.client and not self.scheduler: + raise RuntimeError("Cannot run client code without a scheduler.") + self.scheduler_options = ( + scheduler_options if scheduler_options is not None else {} + ) + self.worker_class = ( + Worker if worker_class is None else import_term(worker_class) + ) + self.worker_options = worker_options if worker_options is not None else {} + self.role = None + self.__asynchronous = asynchronous + self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) + + if not self.__asynchronous: + self._loop_runner.start() + self.sync(self._start) + + async def get_role(self) -> str: + raise NotImplementedError() + + async def set_scheduler_address(self, scheduler: Scheduler) -> None: + raise NotImplementedError() + + async def get_scheduler_address(self) -> str: + raise NotImplementedError() + + async def get_worker_name(self) -> str: + return None + + async def before_scheduler_start(self) -> None: + return None + + async def before_worker_start(self) -> None: + return None + + async def before_client_start(self) -> None: + return None + + async def on_scheduler_start(self, scheduler: Scheduler) -> None: + return None + + async def on_worker_start(self, worker: Worker) -> None: + return None + + @property + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + if value is None: + raise ValueError("expected an IOLoop, got None") + self.__loop = value + + def __await__(self): + async def _await(): + if self.status != Status.running: + await self._start() + return self + + return _await().__await__() + + async def __aenter__(self): + await self + return self + + async def __aexit__(self, *args): + await self._close() + + def __enter__(self): + return self.sync(self.__aenter__) + + def __exit__(self, typ, value, traceback): + return self.sync(self.__aexit__) + + def __del__(self): + with suppress(AttributeError, RuntimeError): # during closing + self.loop.add_callback(self.close) + + async def _start(self) -> None: + self.role = await self.get_role() + if self.role == Role.scheduler: + await self.start_scheduler() + os.kill( + os.getpid(), signal.SIGINT + ) # Shutdown with a signal to give the event loop time to close + elif self.role == Role.worker: + await self.start_worker() + os.kill( + os.getpid(), signal.SIGINT + ) # Shutdown with a signal to give the event loop time to close + elif self.role == Role.client: + self.scheduler_address = await self.get_scheduler_address() + if self.scheduler_address: + self.scheduler_comm = rpc(self.scheduler_address) + await self.before_client_start() + self.status = Status.running + + async def start_scheduler(self) -> None: + await self.before_scheduler_start() + async with Scheduler(**self.scheduler_options) as scheduler: + await self.set_scheduler_address(scheduler) + await self.on_scheduler_start(scheduler) + await scheduler.finished() + + async def start_worker(self) -> None: + if ( + "scheduler_file" not in self.worker_options + and "scheduler_ip" not in self.worker_options + ): + self.worker_options["scheduler_ip"] = await self.get_scheduler_address() + worker_name = await self.get_worker_name() + await self.before_worker_start() + async with self.worker_class(name=worker_name, **self.worker_options) as worker: + await self.on_worker_start(worker) + await worker.finished() + + async def _close(self) -> None: + if self.status == Status.running: + if self.scheduler_comm: + with suppress(CommClosedError): + await self.scheduler_comm.terminate() + self.status = Status.closed + + def close(self) -> None: + return self.sync(self._close) + + +class AsyncCommWorld: + def __init__(self): + self.roles = {"scheduler": None, "client": None} + self.role_lock = asyncio.Lock() + self.scheduler_address = None + + +class AsyncRunner(BaseRunner): + def __init__(self, commworld: AsyncCommWorld, *args, **kwargs): + self.commworld = commworld + super().__init__(*args, **kwargs) + + async def get_role(self) -> str: + async with self.commworld.role_lock: + if self.commworld.roles["scheduler"] is None and self.scheduler: + self.commworld.roles["scheduler"] = self + return Role.scheduler + elif self.commworld.roles["client"] is None and self.client: + self.commworld.roles["client"] = self + return Role.client + else: + return Role.worker + + async def set_scheduler_address(self, scheduler: Scheduler) -> None: + self.commworld.scheduler_address = scheduler.address + + async def get_scheduler_address(self) -> str: + while self.commworld.scheduler_address is None: + await asyncio.sleep(0.1) + return self.commworld.scheduler_address diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 8e6c1a07..35cf9e23 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -1,10 +1,16 @@ import logging import math import warnings +import asyncio +import json +import os +from pathlib import Path import dask +from dask.distributed import Scheduler from .core import Job, JobQueueCluster, job_parameters, cluster_parameters +from .runner import Role, BaseRunner logger = logging.getLogger(__name__) @@ -26,7 +32,7 @@ def __init__( job_cpu=None, job_mem=None, config_name=None, - **base_class_kwargs + **base_class_kwargs, ): super().__init__( scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs @@ -177,3 +183,81 @@ class SLURMCluster(JobQueueCluster): job=job_parameters, cluster=cluster_parameters ) job_cls = SLURMJob + + +class WorldTooSmallException(RuntimeError): + """Not enough Slurm tasks to start all required processes.""" + + +class SLURMRunner(BaseRunner): + def __init__(self, *args, scheduler_file="scheduler-{job_id}.json", **kwargs): + try: + self.proc_id = int(os.environ["SLURM_PROCID"]) + self.world_size = self.n_workers = int(os.environ["SLURM_NTASKS"]) + self.job_id = int(os.environ["SLURM_JOB_ID"]) + except KeyError as e: + raise RuntimeError( + "SLURM_PROCID, SLURM_NTASKS, and SLURM_JOB_ID must be present in the environment." + ) from e + if not scheduler_file: + scheduler_file = kwargs.get("scheduler_options", {}).get("scheduler_file") + + if not scheduler_file: + raise RuntimeError( + "scheduler_file must be specified in either the " + "scheduler_options or as keyword argument to SlurmRunner." + ) + + # Encourage filename uniqueness by inserting the job ID + scheduler_file = scheduler_file.format(job_id=self.job_id) + scheduler_file = Path(scheduler_file) + + if isinstance(kwargs.get("scheduler_options"), dict): + kwargs["scheduler_options"]["scheduler_file"] = scheduler_file + else: + kwargs["scheduler_options"] = {"scheduler_file": scheduler_file} + if isinstance(kwargs.get("worker_options"), dict): + kwargs["worker_options"]["scheduler_file"] = scheduler_file + else: + kwargs["worker_options"] = {"scheduler_file": scheduler_file} + + self.scheduler_file = scheduler_file + + super().__init__(*args, **kwargs) + + async def get_role(self) -> str: + if self.scheduler and self.client and self.world_size < 3: + raise WorldTooSmallException( + f"Not enough Slurm tasks to start cluster, found {self.world_size}, " + "needs at least 3, one each for the scheduler, client and a worker." + ) + elif self.scheduler and self.world_size < 2: + raise WorldTooSmallException( + f"Not enough Slurm tasks to start cluster, found {self.world_size}, " + "needs at least 2, one each for the scheduler and a worker." + ) + self.n_workers -= int(self.scheduler) + int(self.client) + if self.proc_id == 0 and self.scheduler: + return Role.scheduler + elif self.proc_id == 1 and self.client: + return Role.client + else: + return Role.worker + + async def set_scheduler_address(self, scheduler: Scheduler) -> None: + return + + async def get_scheduler_address(self) -> str: + while not self.scheduler_file or not self.scheduler_file.exists(): + await asyncio.sleep(0.2) + cfg = json.loads(self.scheduler_file.read_text()) + return cfg["address"] + + async def on_scheduler_start(self, scheduler: Scheduler) -> None: + return + + async def get_worker_name(self) -> str: + return self.proc_id + + async def _close(self): + await super()._close() diff --git a/dask_jobqueue/tests/slurm_runner/basic.py b/dask_jobqueue/tests/slurm_runner/basic.py new file mode 100644 index 00000000..127756ec --- /dev/null +++ b/dask_jobqueue/tests/slurm_runner/basic.py @@ -0,0 +1,8 @@ +from dask.distributed import Client +from dask_jobqueue.slurm import SLURMRunner + +with SLURMRunner() as runner: + with Client(runner) as client: + assert client.submit(lambda x: x + 1, 10).result() == 11 + assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21 + print("Test passed") diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 129027e1..2a1e7928 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -1,3 +1,5 @@ +import os +import subprocess import sys from time import sleep, time @@ -12,6 +14,13 @@ from . import QUEUE_WAIT +def slurm_cores(): + "Use sinfo to get the number of available CPU cores" + return int( + subprocess.check_output(["sinfo", "-o", "%C"]).split()[1].decode().split("/")[1] + ) + + def test_header(): with SLURMCluster( walltime="00:02:00", processes=4, cores=8, memory="28GB" @@ -314,3 +323,17 @@ def test_deprecation_project(): ) job_script = job.job_script() assert "project" in job_script + + +@pytest.mark.env("slurm") +@pytest.mark.skipif(slurm_cores() < 4, reason="Need at least 4 CPUs to run this test") +def test_slurm_runner(): + script_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "slurm_runner", "basic.py" + ) + + output = subprocess.check_output( + ["srun", "--mpi=none", "-vvvv", "-n", "4", sys.executable, script_file] + ) + output = output.decode() + assert "Test passed" in output From 89190f0f9e208844d7fcf8ba011f9008255c9b7b Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 15:18:36 +0100 Subject: [PATCH 02/15] Remove unused code --- dask_jobqueue/runner.py | 4 ++-- dask_jobqueue/slurm.py | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/dask_jobqueue/runner.py b/dask_jobqueue/runner.py index 641f29e7..230c1dc5 100644 --- a/dask_jobqueue/runner.py +++ b/dask_jobqueue/runner.py @@ -36,7 +36,7 @@ class BaseRunner(SyncMethodMixin): To implement this class, you must provide 1. A ``get_role`` method which returns a role from the ``Role`` enum. - 2. A ``set_scheduler_address`` method for the scheduler process to communicate its address. + 2. Optionally, a ``set_scheduler_address`` method for the scheduler process to communicate its address. 3. A ``get_scheduler_address`` method for all other processed to recieve the scheduler address. 4. Optionally, a ``get_worker_name`` to provide a platform specific name to the workers. 5. Optionally, a ``before_scheduler_start`` to perform any actions before the scheduler is created. @@ -90,7 +90,7 @@ async def get_role(self) -> str: raise NotImplementedError() async def set_scheduler_address(self, scheduler: Scheduler) -> None: - raise NotImplementedError() + raise None async def get_scheduler_address(self) -> str: raise NotImplementedError() diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 35cf9e23..d31e5562 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -253,11 +253,5 @@ async def get_scheduler_address(self) -> str: cfg = json.loads(self.scheduler_file.read_text()) return cfg["address"] - async def on_scheduler_start(self, scheduler: Scheduler) -> None: - return - async def get_worker_name(self) -> str: return self.proc_id - - async def _close(self): - await super()._close() From db0d01ed0c58ad7e08c908b515b6b9fa19391d0a Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 15:20:03 +0100 Subject: [PATCH 03/15] Add base class test --- dask_jobqueue/tests/test_runner.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 dask_jobqueue/tests/test_runner.py diff --git a/dask_jobqueue/tests/test_runner.py b/dask_jobqueue/tests/test_runner.py new file mode 100644 index 00000000..d6fec891 --- /dev/null +++ b/dask_jobqueue/tests/test_runner.py @@ -0,0 +1,29 @@ +import asyncio +from contextlib import suppress +from time import time + +import pytest + +from dask.distributed import Client + +from dask_jobqueue.runner import AsyncCommWorld, AsyncRunner + + +@pytest.mark.asyncio +@pytest.mark.timeout(10) +async def test_runner(): + commworld = AsyncCommWorld() + + async def run_code(commworld): + with suppress(SystemExit): + async with AsyncRunner(commworld, asynchronous=True) as runner: + async with Client(runner, asynchronous=True) as c: + start = time() + while len(c.scheduler_info()["workers"]) != 2: + assert time() < start + 10 + await asyncio.sleep(0.2) + + assert await c.submit(lambda x: x + 1, 10).result() == 11 + assert await c.submit(lambda x: x + 1, 20).result() == 21 + + await asyncio.gather(*[run_code(commworld) for _ in range(4)]) From e6e8e9ccf930ea6704221a03db889c4921838b6b Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 15:33:07 +0100 Subject: [PATCH 04/15] Fix typing error in older Python versions --- dask_jobqueue/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/runner.py b/dask_jobqueue/runner.py index 230c1dc5..00d32e55 100644 --- a/dask_jobqueue/runner.py +++ b/dask_jobqueue/runner.py @@ -4,7 +4,7 @@ import signal from contextlib import suppress from enum import Enum -from typing import Dict +from typing import Dict, Optional import warnings from tornado.ioloop import IOLoop @@ -114,7 +114,7 @@ async def on_worker_start(self, worker: Worker) -> None: return None @property - def loop(self) -> IOLoop | None: + def loop(self) -> Optional[IOLoop]: loop = self.__loop if loop is None: # If the loop is not running when this is called, the LoopRunner.loop From 0b22db59201d1f7aaea237f19d7a8743894d5340 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 15:38:17 +0100 Subject: [PATCH 05/15] Fix another typing error in older Python versions --- dask_jobqueue/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/runner.py b/dask_jobqueue/runner.py index 00d32e55..60da426e 100644 --- a/dask_jobqueue/runner.py +++ b/dask_jobqueue/runner.py @@ -52,7 +52,7 @@ class BaseRunner(SyncMethodMixin): """ - __loop: IOLoop | None = None + __loop: Optional[IOLoop] = None def __init__( self, From a8e19aebce2b62470c6a9b45f6c3a134a6e51a27 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 15:43:37 +0100 Subject: [PATCH 06/15] If SLURM not installed report 0 cores --- dask_jobqueue/tests/test_slurm.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 2a1e7928..3f8da455 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -16,9 +16,15 @@ def slurm_cores(): "Use sinfo to get the number of available CPU cores" - return int( - subprocess.check_output(["sinfo", "-o", "%C"]).split()[1].decode().split("/")[1] - ) + try: + return int( + subprocess.check_output(["sinfo", "-o", "%C"]) + .split()[1] + .decode() + .split("/")[1] + ) + except FileNotFoundError: + return 0 def test_header(): From fe7121d5efa92c18856d395b7d68bb873e63c426 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Aug 2024 16:30:12 +0100 Subject: [PATCH 07/15] Remove timeout mark --- dask_jobqueue/tests/test_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_jobqueue/tests/test_runner.py b/dask_jobqueue/tests/test_runner.py index d6fec891..a486b654 100644 --- a/dask_jobqueue/tests/test_runner.py +++ b/dask_jobqueue/tests/test_runner.py @@ -10,7 +10,6 @@ @pytest.mark.asyncio -@pytest.mark.timeout(10) async def test_runner(): commworld = AsyncCommWorld() From 220655682c818898e66158fca1b5d155c175b7f4 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Aug 2024 12:39:42 +0100 Subject: [PATCH 08/15] Ensure Slurm workers have dev code to run runner script --- ci/slurm.sh | 4 +++- ci/slurm/docker-compose.yml | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ci/slurm.sh b/ci/slurm.sh index 217ad3e1..3265a736 100644 --- a/ci/slurm.sh +++ b/ci/slurm.sh @@ -28,7 +28,9 @@ function show_network_interfaces { } function jobqueue_install { - docker exec slurmctld conda run -n dask-jobqueue /bin/bash -c "cd /dask-jobqueue; pip install -e ." + for c in slurmctld c1 c2; do + docker exec $c conda run -n dask-jobqueue /bin/bash -c "cd /dask-jobqueue; pip install -e ." + done } function jobqueue_script { diff --git a/ci/slurm/docker-compose.yml b/ci/slurm/docker-compose.yml index cdb9475d..ac06352a 100644 --- a/ci/slurm/docker-compose.yml +++ b/ci/slurm/docker-compose.yml @@ -69,6 +69,7 @@ services: - slurm_jobdir:/data - var_log_slurm:/var/log/slurm - shared_space:/shared_space + - ../..:/dask-jobqueue expose: - "6818" depends_on: @@ -91,6 +92,7 @@ services: - slurm_jobdir:/data - var_log_slurm:/var/log/slurm - shared_space:/shared_space + - ../..:/dask-jobqueue expose: - "6818" depends_on: From c630c113d05e786baaf379fa60d6e44145f19615 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Aug 2024 14:28:03 +0100 Subject: [PATCH 09/15] Add extra nodes --- ci/slurm/docker-compose.yml | 46 +++++++++++++++++++++++++++++++++++++ ci/slurm/slurm.conf | 4 ++-- ci/slurm/start-slurm.sh | 2 ++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/ci/slurm/docker-compose.yml b/ci/slurm/docker-compose.yml index ac06352a..fe76c9a5 100644 --- a/ci/slurm/docker-compose.yml +++ b/ci/slurm/docker-compose.yml @@ -103,6 +103,52 @@ services: cap_add: - NET_ADMIN + c3: + image: daskdev/dask-jobqueue:slurm + build: . + command: ["slurmd"] + hostname: c3 + container_name: c3 + volumes: + - etc_munge:/etc/munge + - etc_slurm:/etc/slurm + - slurm_jobdir:/data + - var_log_slurm:/var/log/slurm + - shared_space:/shared_space + - ../..:/dask-jobqueue + expose: + - "6818" + depends_on: + - "slurmctld" + networks: + common-network: + ipv4_address: 10.1.1.13 + cap_add: + - NET_ADMIN + + c4: + image: daskdev/dask-jobqueue:slurm + build: . + command: ["slurmd"] + hostname: c4 + container_name: c4 + volumes: + - etc_munge:/etc/munge + - etc_slurm:/etc/slurm + - slurm_jobdir:/data + - var_log_slurm:/var/log/slurm + - shared_space:/shared_space + - ../..:/dask-jobqueue + expose: + - "6818" + depends_on: + - "slurmctld" + networks: + common-network: + ipv4_address: 10.1.1.14 + cap_add: + - NET_ADMIN + volumes: etc_munge: etc_slurm: diff --git a/ci/slurm/slurm.conf b/ci/slurm/slurm.conf index 1af3bdf8..98cad3f7 100644 --- a/ci/slurm/slurm.conf +++ b/ci/slurm/slurm.conf @@ -88,7 +88,7 @@ AccountingStoragePort=6819 #AccountingStorageUser= # # COMPUTE NODES -NodeName=c[1-2] RealMemory=4096 CPUs=2 State=UNKNOWN +NodeName=c[1-4] RealMemory=4096 CPUs=2 State=UNKNOWN # # PARTITIONS -PartitionName=normal Default=yes Nodes=c[1-2] Priority=50 DefMemPerCPU=2048 Shared=NO MaxNodes=2 MaxTime=5-00:00:00 DefaultTime=5-00:00:00 State=UP +PartitionName=normal Default=yes Nodes=c[1-4] Priority=50 DefMemPerCPU=2048 Shared=NO MaxNodes=4 MaxTime=5-00:00:00 DefaultTime=5-00:00:00 State=UP diff --git a/ci/slurm/start-slurm.sh b/ci/slurm/start-slurm.sh index 6b74c2d8..f61fe518 100755 --- a/ci/slurm/start-slurm.sh +++ b/ci/slurm/start-slurm.sh @@ -15,3 +15,5 @@ echo "SLURM properly configured" docker exec slurmctld ip addr add 10.1.1.20/24 dev eth0 label eth0:scheduler docker exec c1 ip addr add 10.1.1.21/24 dev eth0 label eth0:worker docker exec c2 ip addr add 10.1.1.22/24 dev eth0 label eth0:worker +docker exec c3 ip addr add 10.1.1.23/24 dev eth0 label eth0:worker +docker exec c4 ip addr add 10.1.1.24/24 dev eth0 label eth0:worker From 6926bf8373fff9bce6592677d05079762938a25c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Aug 2024 17:19:44 +0100 Subject: [PATCH 10/15] Revert "Add extra nodes" This reverts commit c630c113d05e786baaf379fa60d6e44145f19615. --- ci/slurm/docker-compose.yml | 46 ------------------------------------- ci/slurm/slurm.conf | 4 ++-- ci/slurm/start-slurm.sh | 2 -- 3 files changed, 2 insertions(+), 50 deletions(-) diff --git a/ci/slurm/docker-compose.yml b/ci/slurm/docker-compose.yml index fe76c9a5..ac06352a 100644 --- a/ci/slurm/docker-compose.yml +++ b/ci/slurm/docker-compose.yml @@ -103,52 +103,6 @@ services: cap_add: - NET_ADMIN - c3: - image: daskdev/dask-jobqueue:slurm - build: . - command: ["slurmd"] - hostname: c3 - container_name: c3 - volumes: - - etc_munge:/etc/munge - - etc_slurm:/etc/slurm - - slurm_jobdir:/data - - var_log_slurm:/var/log/slurm - - shared_space:/shared_space - - ../..:/dask-jobqueue - expose: - - "6818" - depends_on: - - "slurmctld" - networks: - common-network: - ipv4_address: 10.1.1.13 - cap_add: - - NET_ADMIN - - c4: - image: daskdev/dask-jobqueue:slurm - build: . - command: ["slurmd"] - hostname: c4 - container_name: c4 - volumes: - - etc_munge:/etc/munge - - etc_slurm:/etc/slurm - - slurm_jobdir:/data - - var_log_slurm:/var/log/slurm - - shared_space:/shared_space - - ../..:/dask-jobqueue - expose: - - "6818" - depends_on: - - "slurmctld" - networks: - common-network: - ipv4_address: 10.1.1.14 - cap_add: - - NET_ADMIN - volumes: etc_munge: etc_slurm: diff --git a/ci/slurm/slurm.conf b/ci/slurm/slurm.conf index 98cad3f7..1af3bdf8 100644 --- a/ci/slurm/slurm.conf +++ b/ci/slurm/slurm.conf @@ -88,7 +88,7 @@ AccountingStoragePort=6819 #AccountingStorageUser= # # COMPUTE NODES -NodeName=c[1-4] RealMemory=4096 CPUs=2 State=UNKNOWN +NodeName=c[1-2] RealMemory=4096 CPUs=2 State=UNKNOWN # # PARTITIONS -PartitionName=normal Default=yes Nodes=c[1-4] Priority=50 DefMemPerCPU=2048 Shared=NO MaxNodes=4 MaxTime=5-00:00:00 DefaultTime=5-00:00:00 State=UP +PartitionName=normal Default=yes Nodes=c[1-2] Priority=50 DefMemPerCPU=2048 Shared=NO MaxNodes=2 MaxTime=5-00:00:00 DefaultTime=5-00:00:00 State=UP diff --git a/ci/slurm/start-slurm.sh b/ci/slurm/start-slurm.sh index f61fe518..6b74c2d8 100755 --- a/ci/slurm/start-slurm.sh +++ b/ci/slurm/start-slurm.sh @@ -15,5 +15,3 @@ echo "SLURM properly configured" docker exec slurmctld ip addr add 10.1.1.20/24 dev eth0 label eth0:scheduler docker exec c1 ip addr add 10.1.1.21/24 dev eth0 label eth0:worker docker exec c2 ip addr add 10.1.1.22/24 dev eth0 label eth0:worker -docker exec c3 ip addr add 10.1.1.23/24 dev eth0 label eth0:worker -docker exec c4 ip addr add 10.1.1.24/24 dev eth0 label eth0:worker From 233cf3d5d9e0731b6cd24f329b59b3752498463c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Aug 2024 17:20:15 +0100 Subject: [PATCH 11/15] Set scheduler file path correctly --- dask_jobqueue/tests/slurm_runner/basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/tests/slurm_runner/basic.py b/dask_jobqueue/tests/slurm_runner/basic.py index 127756ec..dc01b468 100644 --- a/dask_jobqueue/tests/slurm_runner/basic.py +++ b/dask_jobqueue/tests/slurm_runner/basic.py @@ -1,7 +1,7 @@ from dask.distributed import Client from dask_jobqueue.slurm import SLURMRunner -with SLURMRunner() as runner: +with SLURMRunner(scheduler_file="/shared_space/{job_id}.json") as runner: with Client(runner) as client: assert client.submit(lambda x: x + 1, 10).result() == 11 assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21 From ef29d094526ae9705281c19f87c87dab29230536 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Aug 2024 17:26:30 +0100 Subject: [PATCH 12/15] Make debug output less noisy --- dask_jobqueue/tests/test_slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 3f8da455..5b4b166b 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -339,7 +339,7 @@ def test_slurm_runner(): ) output = subprocess.check_output( - ["srun", "--mpi=none", "-vvvv", "-n", "4", sys.executable, script_file] + ["srun", "--mpi=none", "-vv", "-n", "4", sys.executable, script_file] ) output = output.decode() assert "Test passed" in output From 17ef8e5190500d9703acb063e664e0e894a73cb2 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 21 Aug 2024 14:52:27 +0100 Subject: [PATCH 13/15] Refactor docs and add runners section --- docs/requirements-docs.txt | 2 +- ... => clusters-advanced-tips-and-tricks.rst} | 2 +- docs/source/{api.rst => clusters-api.rst} | 6 +- ...st => clusters-configuration-examples.rst} | 0 ...p.rst => clusters-configuration-setup.rst} | 0 ...uration.rst => clusters-configuration.rst} | 2 +- ...s.rst => clusters-example-deployments.rst} | 0 ...howitworks.rst => clusters-howitworks.rst} | 0 ...teractive.rst => clusters-interactive.rst} | 0 docs/source/clusters-overview.rst | 43 ++++++ docs/source/conf.py | 2 +- docs/source/develop.rst | 2 +- docs/source/index.rst | 113 +++++++++++----- docs/source/runners-api.rst | 12 ++ docs/source/runners-implementing-new.rst | 124 ++++++++++++++++++ docs/source/runners-overview.rst | 32 +++++ 16 files changed, 299 insertions(+), 41 deletions(-) rename docs/source/{advanced-tips-and-tricks.rst => clusters-advanced-tips-and-tricks.rst} (99%) rename docs/source/{api.rst => clusters-api.rst} (80%) rename docs/source/{configurations.rst => clusters-configuration-examples.rst} (100%) rename docs/source/{configuration-setup.rst => clusters-configuration-setup.rst} (100%) rename docs/source/{configuration.rst => clusters-configuration.rst} (97%) rename docs/source/{examples.rst => clusters-example-deployments.rst} (100%) rename docs/source/{howitworks.rst => clusters-howitworks.rst} (100%) rename docs/source/{interactive.rst => clusters-interactive.rst} (100%) create mode 100644 docs/source/clusters-overview.rst create mode 100644 docs/source/runners-api.rst create mode 100644 docs/source/runners-implementing-new.rst create mode 100644 docs/source/runners-overview.rst diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index b4923de1..8478b248 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -1,7 +1,7 @@ distributed numpydoc ipython -sphinx +sphinx<5 dask-sphinx-theme>=3.0.0 # FIXME: This workaround is required until we have sphinx>=5, as enabled by # dask-sphinx-theme no longer pinning sphinx-book-theme==0.2.0. This is diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/clusters-advanced-tips-and-tricks.rst similarity index 99% rename from docs/source/advanced-tips-and-tricks.rst rename to docs/source/clusters-advanced-tips-and-tricks.rst index 8100d7f2..257ed098 100644 --- a/docs/source/advanced-tips-and-tricks.rst +++ b/docs/source/clusters-advanced-tips-and-tricks.rst @@ -11,7 +11,7 @@ This page is an attempt to document tips and tricks that are likely to be useful on some clusters (strictly more than one ideally although hard to be sure ...). Skipping unrecognised line in submission script with ``job_directives_skip`` --------------------------------------------------------------------- +---------------------------------------------------------------------------- *Note: the parameter* ``job_directives_skip`` *was named* ``header_skip`` *until version 0.8.0.* ``header_skip`` *can still be used, but is considered deprecated and will be removed in a future version.* diff --git a/docs/source/api.rst b/docs/source/clusters-api.rst similarity index 80% rename from docs/source/api.rst rename to docs/source/clusters-api.rst index 2d963054..1f6f427c 100644 --- a/docs/source/api.rst +++ b/docs/source/clusters-api.rst @@ -1,10 +1,10 @@ -.. _api: +.. _clusters_api: .. currentmodule:: dask_jobqueue -API -=== +Clusters API +============ .. autosummary:: :toctree: generated/ diff --git a/docs/source/configurations.rst b/docs/source/clusters-configuration-examples.rst similarity index 100% rename from docs/source/configurations.rst rename to docs/source/clusters-configuration-examples.rst diff --git a/docs/source/configuration-setup.rst b/docs/source/clusters-configuration-setup.rst similarity index 100% rename from docs/source/configuration-setup.rst rename to docs/source/clusters-configuration-setup.rst diff --git a/docs/source/configuration.rst b/docs/source/clusters-configuration.rst similarity index 97% rename from docs/source/configuration.rst rename to docs/source/clusters-configuration.rst index 8363e224..644ee6c4 100644 --- a/docs/source/configuration.rst +++ b/docs/source/clusters-configuration.rst @@ -71,7 +71,7 @@ recommend using a configuration file like the following: account: my-account walltime: 00:30:00 -See :doc:`Configuration Examples ` for real-world examples. +See :doc:`Configuration Examples ` for real-world examples. If you place this in your ``~/.config/dask/`` directory then Dask-jobqueue will use these values by default. You can then construct a cluster object without diff --git a/docs/source/examples.rst b/docs/source/clusters-example-deployments.rst similarity index 100% rename from docs/source/examples.rst rename to docs/source/clusters-example-deployments.rst diff --git a/docs/source/howitworks.rst b/docs/source/clusters-howitworks.rst similarity index 100% rename from docs/source/howitworks.rst rename to docs/source/clusters-howitworks.rst diff --git a/docs/source/interactive.rst b/docs/source/clusters-interactive.rst similarity index 100% rename from docs/source/interactive.rst rename to docs/source/clusters-interactive.rst diff --git a/docs/source/clusters-overview.rst b/docs/source/clusters-overview.rst new file mode 100644 index 00000000..0c62aaa9 --- /dev/null +++ b/docs/source/clusters-overview.rst @@ -0,0 +1,43 @@ +Overview +======== + +The Dask-jobqueue project provides a convenient interface that is accessible from interactive systems like Jupyter notebooks, or batch jobs. + + +.. _example: + +Example +------- + +.. code-block:: python + + from dask_jobqueue import PBSCluster + cluster = PBSCluster() + cluster.scale(jobs=10) # Deploy ten single-node jobs + + from dask.distributed import Client + client = Client(cluster) # Connect this local process to remote workers + + # wait for jobs to arrive, depending on the queue, this may take some time + + import dask.array as da + x = ... # Dask commands now use these distributed resources + +.. raw:: html + + + +Adaptive Scaling +---------------- + +Dask jobqueue can also adapt the cluster size dynamically based on current +load. This helps to scale up the cluster when necessary but scale it down and +save resources when not actively computing. + +.. code-block:: python + + cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs + cluster.adapt(maximum_memory="10 TB") # or use core/memory limits diff --git a/docs/source/conf.py b/docs/source/conf.py index 27f8079c..16ae5e3c 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -72,7 +72,7 @@ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = "en" # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. diff --git a/docs/source/develop.rst b/docs/source/develop.rst index c3dd5c78..5606264d 100644 --- a/docs/source/develop.rst +++ b/docs/source/develop.rst @@ -62,7 +62,7 @@ Testing without CI scripts You can also manually launch tests with dockerized jobs schedulers (without CI commands), for a better understanding of what is going on. -This is basically a simplified version of what is in the ci/*.sh files. +This is basically a simplified version of what is in the ``ci/*.sh`` files. For example with Slurm:: diff --git a/docs/source/index.rst b/docs/source/index.rst index 01c960b7..de13ce9b 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -4,49 +4,87 @@ Dask-Jobqueue *Easily deploy Dask on job queuing systems like PBS, Slurm, MOAB, SGE, LSF, and HTCondor.* -The Dask-jobqueue project makes it easy to deploy Dask on common job queuing +The ``dask-jobqueue`` project makes it easy to deploy Dask on common job queuing systems typically found in high performance supercomputers, academic research -institutions, and other clusters. It provides a convenient interface that is -accessible from interactive systems like Jupyter notebooks, or batch jobs. +institutions, and other clusters. +There are two common deployment patterns for Dask on HPC, **Dynamic Clusters** and **Batch Runners**, and ``dask-jobqueue`` has support for both. -.. _example: +Dynamic Clusters +---------------- -Example -------- +A **Dynamic Cluster** is a Dask cluster where new workers can be added dynamically while the cluster is running. + +In an HPC environment this generally means that the Dask Scheduler is run in the same location as the client code, +usually on a single compute node. Then workers for the Dask cluster are submitted as additional jobs to the job +queue scheduler. + +This pattern works well on clusters where it is favourable to submit many small jobs. + + +.. code-block:: bash + + srun -n 1 dynamic_workload.py .. code-block:: python - from dask_jobqueue import PBSCluster - cluster = PBSCluster() - cluster.scale(jobs=10) # Deploy ten single-node jobs + # dynamic_workload.py + from dask_jobqueue.slurm import SLURMCluster + cluster = SLURMCluster() + cluster.adapt(minimum=1, maximum=10) # Tells Dask to call `srun -n 1 ...` when it needs new workers from dask.distributed import Client client = Client(cluster) # Connect this local process to remote workers - # wait for jobs to arrive, depending on the queue, this may take some time - import dask.array as da - x = ... # Dask commands now use these distributed resources + x = ... # Dask commands now use these distributed resources + +**Benefits** + +- Clusters can autoscale as a workload progresses. +- Small gaps in the HPC that would be otherwise unused can be backfilled. +- A workload can run slowly with a few workers during busy times and then scale up during quiet times. +- Workloads in intaractive environments can scale up and down as users run code manually. +- You don't need to wait for all nodes to be available before your workload starts, so jobs often start sooner. -.. raw:: html +To learn more see the `Dynamic Cluster documentation `_. - +Batch Runners +------------- -Adaptivity ----------- +A **Batch Runner** is a Dask cluster where the whole workload, including the client code, scheduler and workers +are submitted as a single allocation to the job queue scheduler. All of the processes within the workload coordinate +during startup and then work together to compute the Dask workload. -Dask jobqueue can also adapt the cluster size dynamically based on current -load. This helps to scale up the cluster when necessary but scale it down and -save resources when not actively computing. +This pattern works well where large jobs are prioritised and node locality is important. + +.. code-block:: bash + + srun -n 12 batch_workload.py .. code-block:: python - cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs - cluster.adapt(maximum_memory="10 TB") # or use core/memory limits + # batch_workload.py + from dask_jobqueue.slurm import SLURMRunner + cluster = SLURMRunner() # Boostraps all the processes into a client + scheduler + 10 workers + + # Only the client process will continue past this point + + from dask.distributed import Client + client = Client(cluster) # Connect this client process to remote workers + + import dask.array as da + x = ... # Dask commands now use these distributed resources + +**Benefits** + +- Workers are generally colocated physically on the machine, so communication is faster, expecially with `UCX `_. +- Submitting many small jobs can be frowned upon on some HPCs, submitting a single large job is more typical of other HPC workloads. +- All workers are guaranteed to be available when the job starts which can avoid oversubscribing workers. +- Clusters comprised of one large allocation tends to be more reliable than many small allocations. +- All processes have the same start and wall time. + +To learn more see the `Batch Runner documentation `_. More details ------------ @@ -58,20 +96,29 @@ A good entry point to know more about how to use ``dask-jobqueue`` is :caption: Getting Started install - interactive talks-and-tutorials - howitworks - configuration .. toctree:: :maxdepth: 1 - :caption: Detailed use + :caption: Dynamic Clusters + + clusters-overview + clusters-interactive + clusters-howitworks + clusters-configuration + clusters-configuration-setup + clusters-example-deployments + clusters-configuration-examples + clusters-advanced-tips-and-tricks + clusters-api + +.. toctree:: + :maxdepth: 1 + :caption: Batch Runners - configuration-setup - examples - configurations - advanced-tips-and-tricks - api + runners-overview + runners-implementing-new + runners-api .. toctree:: :maxdepth: 1 diff --git a/docs/source/runners-api.rst b/docs/source/runners-api.rst new file mode 100644 index 00000000..a6e86039 --- /dev/null +++ b/docs/source/runners-api.rst @@ -0,0 +1,12 @@ +.. _runners_api: + +.. currentmodule:: dask_jobqueue + + +Runners API +=========== + +.. autosummary:: + :toctree: generated/ + + slurm.SLURMRunner diff --git a/docs/source/runners-implementing-new.rst b/docs/source/runners-implementing-new.rst new file mode 100644 index 00000000..cbb88dca --- /dev/null +++ b/docs/source/runners-implementing-new.rst @@ -0,0 +1,124 @@ +Writing your own runners +======================== + +This document describes the design of the runners class and how to implement your own Dask runners. + +The core assumption in the design of the runner model is that the same script will be executed many times by a job scheduler. + +.. code-block:: text + + (mpirun|srun|qsub|etc) -n4 myscript.py + ├── [0] myscript.py + ├── [1] myscript.py + ├── [2] myscript.py + └── [3] myscript.py + +Within the script the runner class is created early on in the execution. + +.. code-block:: python + + from dask_jobqueue import SomeRunner + from dask.distributed import Client + + with SomeRunner(**kwargs) as runner: + with Client(runner) as client: + client.wait_for_workers(2) + # Do some Dask work + +This will result in multiple processes runnning on an HPC that are all instantiating the runner class. + +The processes need to coordinate to decide which process should run the Dask Scheduler, which should be Dask Workers +and which should continue running the rest of the client code within the script. This coordination happens during the +``__init__()`` of the runner class. + +The Scheduler and Worker processes exit after they complete to avoid running the client code multiple times. +This means that only one of the processes will continue past the ``__init__()`` of the runner class, the rest will +exit at that point after the work is done. + +Base class +---------- + +In ``dask_jobqueue.runners`` there is a ``BaseRunner`` class that can be used for implementing other runners. + +The minimum required to implement a new runner is the following methods. + +.. code-block:: python + + from dask_jobqueue.runner import BaseRunner + + class MyRunner(BaseRunner): + + async def get_role(self) -> str: + """Figure out whether I am a scheduler, worker or client. + + A common way to do this is by using a process ID. Many job queues give each process + a monotonic index that starts from zero. So we can assume proc 0 is the scheduler, proc 1 + is the client and any other procs are workers. + """ + ... + + async def get_scheduler_address(self) -> str: + """If I am not the scheduler discover the scheduler address. + + A common way to do this is to read a scheduler file from a shared filesystem. + + Alternatively if the scheduler process can broadcast it's address via something like MPI + we can define ``BaseRunner.set_scheduler_address()`` which will be called on the scheduler + and then recieve the broadcast in this method. + """ + ... + +The ``BaseRunner`` class handles starting up Dask once these methods have been implemented. +It also provides many stubbed out hooks to allow you to write code that runs before/after each component is created. +E.g ``BaseRunner.before_scheduler_start()``, ``BaseRunner.before_worker_start()`` and ``BaseRunner.before_client_start()``. + +The runner must know the address of the scheduler so that it can coordinate the clean shutdown of all processes when we +reach the end of the code (either via ``__exit__()`` or a finalizer). This communication happens independently of +any clients that may be created. + +Slurm implementation example +---------------------------- + +As a concrete example you can look at the Slurm implementation. + +In the ``get_role()`` method we use the ``SLURM_PROCID`` environment variable to infer the role. + +We also add a default scheduler option to set the ``scheduler_file="scheduler-{job_id}.json"`` and I look up the +Job ID from the ``SLURM_JOB_ID`` environment variable to ensource uniqueness. This effectively allows us to broadcast +the scheduler address via the shared filesystem. + +Then in the ``get_scheduler_address()`` method we wait for the scheduler file to exist and then open and read the +address from the scheduler file in the same way the ``dask.distributed.Client`` does. + +Here's a cut down example for demonstration purposes. + + +.. code-block:: python + + from dask_jobqueue.runner import BaseRunner + + class SLURMRunner(BaseRunner): + def __init__(self, *args, scheduler_file="scheduler.json", **kwargs): + # Get the current process ID from the environment + self.proc_id = int(os.environ["SLURM_PROCID"]) + + # Tell the scheduler and workers to use a scheduler file on the shared filesystem + self.scheduler_file = scheduler_file + options = {"scheduler_file": self.scheduler_file} + super().__init__(*args, worker_options=options, scheduler_options=options) + + async def get_role(self) -> str: + # Choose the role for this process based on the process ID + if self.proc_id == 0 and self.scheduler: + return Role.scheduler + elif self.proc_id == 1 and self.client: + return Role.client + else: + return Role.worker + + async def get_scheduler_address(self) -> str: + # Wait for the scheduler file to be created and read the address from it + while not self.scheduler_file or not self.scheduler_file.exists(): + await asyncio.sleep(0.2) + cfg = json.loads(self.scheduler_file.read_text()) + return cfg["address"] diff --git a/docs/source/runners-overview.rst b/docs/source/runners-overview.rst new file mode 100644 index 00000000..a542399a --- /dev/null +++ b/docs/source/runners-overview.rst @@ -0,0 +1,32 @@ +Overview +======== + +The batch runner classes are designed to make it simple to write Python scripts that will leverage multi-node jobs in an HPC. + +For example if we write a Python script for a Slurm based system and call it with ``srun -n 6 python myscript.py`` the script will be invoked by Slurm +6 times in parallel on 6 different nodes/cores on the HPC. The Dask Runner class then uses the Slurm process ID environment +variable to decide what role reach process should play and uses the shared filesystem to bootstrap communications with a scheduler file. + +.. code-block:: python + + # myscript.py + from dask.distributed import Client + from dask_jobqueue.slurm import SlurmRunner + + # When entering the SlurmRunner context manager processes will decide if they should be + # the client, schdeduler or a worker. + # Only process ID 1 executes the contents of the context manager. + # All other processes start the Dask components and then block here forever. + with SlurmRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json") as runner: + + # The runner object contains the scheduler address info and can be used to construct a client. + with Client(runner) as client: + + # Wait for all the workers to be ready before continuing. + client.wait_for_workers(runner.n_workers) + + # Then we can submit some work to the Dask scheduler. + assert client.submit(lambda x: x + 1, 10).result() == 11 + assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21 + + # When process ID 1 exits the SlurmRunner context manager it sends a graceful shutdown to the Dask processes. From ad199bde73385d881f4eb3c639b6102cc0c0d35d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 21 Aug 2024 15:04:37 +0100 Subject: [PATCH 14/15] Add redirects --- docs/requirements-docs.txt | 1 + docs/source/conf.py | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index 8478b248..e79e74ae 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -2,6 +2,7 @@ distributed numpydoc ipython sphinx<5 +sphinx-reredirects dask-sphinx-theme>=3.0.0 # FIXME: This workaround is required until we have sphinx>=5, as enabled by # dask-sphinx-theme no longer pinning sphinx-book-theme==0.2.0. This is diff --git a/docs/source/conf.py b/docs/source/conf.py index 16ae5e3c..4a08afb8 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -48,8 +48,21 @@ "sphinx.ext.autosummary", "sphinx.ext.extlinks", "numpydoc", + "sphinx_reredirects", ] +redirects = { + "interactive": "clusters-interactive.html", + "advanced-tips-and-tricks": "clusters-advanced-tips-and-tricks.html", + "configuration": "clusters-configuration.html", + "howitworks": "clusters-howitworks.html", + "api": "clusters-api.html", + "configuration-setup": "clusters-configuration-setup.html", + "interactive": "clusters-interactive.html", + "configurations": "clusters-configuration-examples.html", + "examples": "clusters-example-deployments.html", +} + autosummary_generate = True numpydoc_class_members_toctree = True From 312355edebd5a95ed5226b367dc87498bcbbc0be Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 21 Aug 2024 15:34:42 +0100 Subject: [PATCH 15/15] Fix links --- docs/source/index.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index de13ce9b..0b2b287b 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -47,7 +47,7 @@ This pattern works well on clusters where it is favourable to submit many small - Workloads in intaractive environments can scale up and down as users run code manually. - You don't need to wait for all nodes to be available before your workload starts, so jobs often start sooner. -To learn more see the `Dynamic Cluster documentation `_. +To learn more see the `Dynamic Cluster documentation `_. Batch Runners ------------- @@ -84,7 +84,7 @@ This pattern works well where large jobs are prioritised and node locality is im - Clusters comprised of one large allocation tends to be more reliable than many small allocations. - All processes have the same start and wall time. -To learn more see the `Batch Runner documentation `_. +To learn more see the `Batch Runner documentation `_. More details ------------