From a11cb2b064665a5274a667ac1664768951e27ca6 Mon Sep 17 00:00:00 2001
From: Abel Aoun
Date: Wed, 27 Dec 2023 10:50:41 +0100
Subject: [PATCH] DOC: Improve readability of dask doc

---
 doc/source/how_to/dask.rst | 417 ++++++++++++++++++++-----------------
 1 file changed, 224 insertions(+), 193 deletions(-)

diff --git a/doc/source/how_to/dask.rst b/doc/source/how_to/dask.rst
index e07bcc36..02ba4b6e 100644
--- a/doc/source/how_to/dask.rst
+++ b/doc/source/how_to/dask.rst
@@ -10,43 +10,50 @@ You can configure dask to limit its memory footprint and CPU usage by instantiat
 dask.config options.
 A configuration working well for small to medium dataset and simple climate indices could be:

->>> import dask
->>> from distributed import Client
->>> client = Client(memory_limit="10GB", n_workers=1, threads_per_worker=8)
->>> dask.config.set({"array.slicing.split_large_chunks": False})
->>> dask.config.set({"array.chunk-size": "100 MB"})
->>> icclim.index(in_files="data.nc", indice_name="SU", out_file="output.nc")
+.. code-block:: python
+
+    import dask
+    from distributed import Client
+    client = Client(memory_limit="10GB", n_workers=1, threads_per_worker=8)
+    dask.config.set({"array.slicing.split_large_chunks": False})
+    dask.config.set({"array.chunk-size": "100 MB"})
+    icclim.index(in_files="data.nc", indice_name="SU", out_file="output.nc")

 ------------------------------------------------------------------------------------------------

-icclim uses xarray to manipulate data and xarray provides multiple backends to handle in-memory data.
-By default, xarray uses numpy, this is overwritten in icclim to use dask whenever a path to a file is provided as input.
-Numpy is fast and reliable for small dataset but may exhaust the memory on large dataset.
-The other backend possibility of xarray is dask. dask can divide data into small chunk to minimize the memory footprint
-of computations. This chunking also enable parallel computation by running the calculation on each chunk concurrently.
-This parallelization can speed up the computation but because each parallel thread will need multiple chunks to be
-in-memory at once, it can bring back memory issues that chunking was meant to avoid.
+| icclim uses xarray to manipulate data, and xarray provides multiple backends to handle in-memory data.
+| By default, xarray uses numpy; icclim overrides this to use dask whenever a path to a file is provided as input.
+| Numpy is fast and reliable for small datasets but may exhaust the memory on large datasets.
+| The other backend available in xarray is dask. Dask can divide data into small chunks to minimize the memory footprint
+  of computations. This chunking also enables parallel computation by running the calculation on each chunk concurrently.
+| This parallelization can speed up the computation but, because each parallel thread needs multiple chunks to be
+  in memory at once, it can bring back the memory issues that chunking was meant to avoid.

-In this document we first explain some concepts around dask, parallelization and performances, then we propose multiple
-dask configurations to be used with icclim.
-You will also find a few dask pitfall dodging instructions which might help understanding why your computation doesn't run.
-Each configuration aims to answer a specific scenario. For you own data, you will likely need to customize these
-configurations to your needs.
+| In this document, we first explain some concepts around dask, parallelization and performance, then we propose multiple
+  dask configurations to be used with icclim.
+| You will also find a few instructions for dodging common dask pitfalls, which might help you understand why your computation doesn't run.
+| Each configuration aims to answer a specific scenario. For your own data, you will likely need to customize these
+  configurations to your needs.

 The art of chunking
 -------------------

-Dask proposes a way to divide large dataset into multiple smaller chunks that fit in memory.
-This process is known as chunking and, with icclim there are 2 ways to control it.
-First, you can open your dataset with xarray and do your own chunking:
->>> import xarray
->>> import icclim
->>> ds = xarray.open_dataset("data.nc")
->>> ds = ds.chunk({"time": 10, "lat": 20, "lon": 20})
+| Dask proposes a way to divide large datasets into multiple smaller chunks that fit in memory.
+| This process is known as chunking and, with icclim, there are two ways to control it.
+| First, you can open your dataset with xarray and do your own chunking:
+
+.. code-block:: python
+
+    import xarray
+    import icclim
+    ds = xarray.open_dataset("data.nc")
+    ds = ds.chunk({"time": 10, "lat": 20, "lon": 20})

 And then use ``ds`` dataset object as input for icclim.

->>> icclim.index(in_files=ds, indice_name="SU", out_file="output.nc")
+.. code-block:: python
+
+    icclim.index(in_files=ds, indice_name="SU", out_file="output.nc")

 In that case, icclim will not re-chunk ``ds`` data. It is left to you to find the proper chunking.
 For more information on how to properly chunk see:
@@ -58,9 +65,11 @@ Another option is to leave dask find the best chunking for each dimension.
 This is the recommended way and the default behavior of icclim when using file a path in `in_files` parameter.
 In this case, chunking can still be controlled by limiting the size of each individual chunk:
->>> import dask
->>> dask.config.set({"array.chunk-size": "50 MB"})
->>> icclim.su(in_files="data.nc")
+.. code-block:: python
+
+    import dask
+    dask.config.set({"array.chunk-size": "50 MB"})
+    icclim.su(in_files="data.nc")

 By default, the dask chunk-size is around 100MB.
 You can also use ``with`` python keyword if you don't want this configuration to spread globally.
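+
+For example (a minimal sketch; the 50 MB value is only illustrative), ``dask.config.set`` can also be used
+as a context manager, so the chunk-size option only applies inside the ``with`` block and is restored afterwards:
+
+.. code-block:: python
+
+    import dask
+    import icclim
+
+    # Only the code inside the block sees the smaller chunk-size.
+    with dask.config.set({"array.chunk-size": "50 MB"}):
+        icclim.su(in_files="data.nc")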

@@ -72,7 +81,9 @@ to their initial value but re-chunking is costly and we sometimes prefer to gene
 performances.
 If you wish to avoid this large chunking behavior, you can try the following dask configuration:

->>> dask.config.set({"array.slicing.split_large_chunks": True})
+.. code-block:: python
+
+    dask.config.set({"array.slicing.split_large_chunks": True})

 Create an optimized chunking on disk
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -104,23 +115,23 @@ In that case, you can use ``icclim.create_optimized_zarr_store`` to first create
 .. code-block:: python

-   import icclim
-
-   ref_period = [datetime.datetime(1980, 1, 1), datetime.datetime(2009, 12, 31)]
-   with icclim.create_optimized_zarr_store(
-       in_files="netcdf_files/tas.nc",
-       var_names="tas",
-       target_zarr_store_name="opti.zarr",
-       keep_target_store=False,
-       chunking={"time": -1, "lat": "auto", "lon": "auto"},
-   ) as opti_tas:
-       icclim.index(
-           index_name="TG90p",
-           in_files=opti_tas,
-           slice_mode="YS",
-           base_period_time_range=ref_period,
-           out_file="netcdf_files/output/tg90p.nc",
-       )
+    import icclim
+
+    ref_period = [datetime.datetime(1980, 1, 1), datetime.datetime(2009, 12, 31)]
+    with icclim.create_optimized_zarr_store(
+        in_files="netcdf_files/tas.nc",
+        var_names="tas",
+        target_zarr_store_name="opti.zarr",
+        keep_target_store=False,
+        chunking={"time": -1, "lat": "auto", "lon": "auto"},
+    ) as opti_tas:
+        icclim.index(
+            index_name="TG90p",
+            in_files=opti_tas,
+            slice_mode="YS",
+            base_period_time_range=ref_period,
+            out_file="netcdf_files/output/tg90p.nc",
+        )

 Actually this `chunking={"time": -1, "lat":"auto", "lon":"auto" }`, which avoid chunking on time is the default
 behavior of the function. `chunking` parameter could be omitted in the above example.
@@ -140,39 +151,41 @@ because the smaller chunk are, the greater dask overhead is.
 By overhead, we mean here the necessary python code running to move around and handle each independent chunk.

-Another important aspect of dask to consider for performances is the generated task graph.
-Dask creates a graph of all the actions (tasks) it must accomplish to compute the calculation.
-This graph, created before the computation, shows for each chunk the route to follow in order to compute the climat index.
-This allows some nice optimizations, for example if some spatial or time selections are done within icclim/xclim, it
-will only read and load in-memory the necessary data.
-However, each task also adds some overhead and, most of the time a small graph will compute faster than a larger.
+| Another important aspect of dask to consider for performance is the generated task graph.
+| Dask creates a graph of all the actions (tasks) it must accomplish to compute the calculation.
+| This graph, created before the computation, shows for each chunk the route to follow in order to compute the climate index.
+| This allows some nice optimizations: for example, if some spatial or time selections are done within icclim/xclim, it
+  will only read and load in memory the necessary data.
+| However, each task also adds some overhead and, most of the time, a small graph will compute faster than a larger one.

-In this graph each chunk has it's own route of all the intermediary transformation it goes though.
-The more there are chunks the more routes are created.
-In extreme cases, when there are a lot of chunks, the graph may take eons to create and the computation may never start.
-This means that configuring a small chunk size leads to a potentially large graph.
+| In this graph, each chunk has its own route of all the intermediary transformations it goes through.
+  The more chunks there are, the more routes are created.
+| In extreme cases, when there are a lot of chunks, the graph may take eons to create and the computation may never start.
+| This means that configuring a small chunk size leads to a potentially large graph.

-The graph is also dependant of the actual calculation. A climate index like "SU" (count of days above 25°C)
-will obviously create a much simpler graph than WSDI (longest spell of at least 6 consecutive days where maximum daily
-temperature is above the 90th daily percentile).
-Finally the resampling may also play a role in the graph complexity. In icclim we control it with ``slice_mode`` parameter.
-A yearly slice_mode sampling result in a simpler graph than a monthly sampling.
+| The graph is also dependent on the actual calculation. A climate index like "SU" (count of days above 25°C)
+  will obviously create a much simpler graph than WSDI (longest spell of at least 6 consecutive days where maximum
+  daily temperature is above the 90th daily percentile).
+| Finally, the resampling may also play a role in the graph complexity. In icclim, we control it with the ``slice_mode`` parameter.
+| A yearly slice_mode sampling results in a simpler graph than a monthly sampling.

-Beside, when starting dask on a limited system (e.g a laptop) it's quite easy to exhaust all available memory.
-In that case, dask has multiple safety mechanism and can even kill the computing process (a.k.a the worker) once it
-reaches a memory limit (default is 95% of memory).
-Even before this limit, the performances can deteriorate when dask measures a high memory use of a worker.
-When a worker uses around 60% of its memory, dask will ask it to write to disk the intermediary results it has computed.
-These i/o operations are much slower than in RAM manipulation, even on recent SSD disk.
+| Besides, when starting dask on a limited system (e.g. a laptop), it's quite easy to exhaust all available memory.
+| In that case, dask has multiple safety mechanisms and can even kill the computing process (a.k.a. the worker) once it
+  reaches a memory limit (default is 95% of memory).
+| Even before this limit, performance can deteriorate when dask measures a high memory use on a worker.
+| When a worker uses around 60% of its memory, dask will ask it to write to disk the intermediary results it has computed.
+| These i/o operations are much slower than in-RAM manipulation, even on recent SSD disks.

 Hence, there are multiple things to consider to maximize performances.
-First, if your data (and the intermediary computation) fits in memory, it might be better to use Numpy backend directly.
-To do so, simply provide the opened dataset to icclim:
+| First, if your data (and the intermediary computation) fits in memory, it might be better to use the Numpy backend directly.
+| To do so, simply provide the opened dataset to icclim:

->>> ds = xarray.open_dataset("data.nc")
->>> icclim.index(in_files=ds, indice_name="SU", out_file="output.nc")
+.. code-block:: python
+
+    ds = xarray.open_dataset("data.nc")
+    icclim.index(in_files=ds, indice_name="SU", out_file="output.nc")

 There will be no parallelization but, on small dataset it's unnecessary.
 Numpy is natively really fast and dask overhead may slow it downs.

@@ -193,18 +206,18 @@ This scheduler runs everything in the existing python process and will spawn mul
 to concurrently compute climate indices.
 You can find more information on the default scheduler here: https://docs.dask.org/en/stable/scheduling.html#local-threads

-This can work on most cases for small to medium datasets and may yield the best performances.
-However some percentiles based temperature indices (T_90p and T_10p families) may use a lot of memory even on medium datasets.
-This memory footprint is caused by the bootstrapping of percentiles, an algorithm used to correct statistical biais.
-This bootstrapping use a Monte Carlo simulation, which inherently use a lot of resources.
-The longer the bootstrap period is, the more resources are necessary. The bootstrap period is the overlapping years
-between the period where percentile are computed (a.k.a "in base") and the period where the climate index is computed
-(a.k.a "out of base").
+| This can work in most cases for small to medium datasets and may yield the best performance.
+| However, some percentile-based temperature indices (T_90p and T_10p families) may use a lot of memory even on medium datasets.
+| This memory footprint is caused by the bootstrapping of percentiles, an algorithm used to correct statistical bias.
+| This bootstrapping uses a Monte Carlo simulation, which inherently uses a lot of resources.
+| The longer the bootstrap period is, the more resources are necessary. The bootstrap period is the overlapping years
+  between the period where percentiles are computed (a.k.a. "in base") and the period where the climate index is computed
+  (a.k.a. "out of base").

 .. note::

-    To control the "in base" period, ``icclim.index`` provides the ``base_period_time_range`` parameter.
-    To control the "out of base" period, ``icclim.index`` provides the ``time_range`` parameter.
+    * To control the "in base" period, ``icclim.index`` provides the ``base_period_time_range`` parameter.
+    * To control the "out of base" period, ``icclim.index`` provides the ``time_range`` parameter.
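+
+As an illustration (a minimal sketch; the index, file names and periods below are only examples), the two
+parameters can be combined in a single ``icclim.index`` call:
+
+.. code-block:: python
+
+    import datetime
+    import icclim
+
+    # Percentiles ("in base") computed on 1981-2010, index ("out of base") on 1991-2020;
+    # the overlapping 1991-2010 years are bootstrapped.
+    in_base = [datetime.datetime(1981, 1, 1), datetime.datetime(2010, 12, 31)]
+    out_of_base = [datetime.datetime(1991, 1, 1), datetime.datetime(2020, 12, 31)]
+    icclim.index(
+        index_name="Tx90p",
+        in_files="tasmax.nc",
+        slice_mode="YS",
+        base_period_time_range=in_base,
+        time_range=out_of_base,
+        out_file="tx90p.nc",
+    )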

 For these percentile based indices, we recommend to use one of the following configuration.

@@ -228,21 +241,26 @@ This ``Client`` object creates both a ``LocalCluster`` and a web application to
 This web dashboard is very powerful and helps to understand where are the computation bottlenecks as well as to
 visualize how dask is working.
 By default it runs on ``localhost:8787``, you can print the client object to see on which port it runs.

->>> from distributed import Client
->>> client = Client()
->>> print(client)
+.. code-block:: python
+
+    from distributed import Client
+    client = Client()
+    print(client)

 By default dask creates a ``LocalCluster`` with 1 worker (process), CPU count threads and a memory limit up to the
 system available memory.

 .. note::
-    You can see how dask counts CPU here: https://github.com/dask/dask/blob/main/dask/system.py
-    How dask measures available memory here: https://github.com/dask/distributed/blob/main/distributed/worker.py#L4237
-    Depending on your OS, these values are not exactly computed the same way.
+
+    * You can see how dask counts CPU here: https://github.com/dask/dask/blob/main/dask/system.py
+    * How dask measures available memory here: https://github.com/dask/distributed/blob/main/distributed/worker.py#L4237
+    * Depending on your OS, these values are not exactly computed the same way.

 The cluster can be configured directly through Client arguments.

->>> client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=8)
+.. code-block:: python
+
+    client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=8)

 A few notes:

@@ -265,14 +283,18 @@ You should configure your local cluster to use not too many threads and processe
 each process (worker) has available.
 On my 4 cores, 16GB of RAM laptop I would consider:

->>> client = Client(memory_limit="10GB", n_workers=1, threads_per_worker=4)
+.. code-block:: python
+
+    client = Client(memory_limit="10GB", n_workers=1, threads_per_worker=4)

 Eventually, to reduce the amount of i/o on disk we can also increase dask memory thresholds:

->>> dask.config.set({"distributed.worker.memory.target": "0.8"})
->>> dask.config.set({"distributed.worker.memory.spill": "0.9"})
->>> dask.config.set({"distributed.worker.memory.pause": "0.95"})
->>> dask.config.set({"distributed.worker.memory.terminate": "0.98"})
+.. code-block:: python
+
+    dask.config.set({"distributed.worker.memory.target": "0.8"})
+    dask.config.set({"distributed.worker.memory.spill": "0.9"})
+    dask.config.set({"distributed.worker.memory.pause": "0.95"})
+    dask.config.set({"distributed.worker.memory.terminate": "0.98"})

 These thresholds are fractions of memory_limit used by dask to take a decision.

@@ -292,7 +314,9 @@ If you want to have the result as quickly as possible it's a good idea to give d
 This may render your computer "laggy" thought.
 On my 4 cores (8 CPU threads), 16GB of RAM laptop I would consider:

->>> client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=8)
+.. code-block:: python
+
+    client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=8)

 On this kind of configuration, it can be useful to add 1 or 2 workers in case a lot of i/o is necessary.
 If there are multiple workers ``memory_limit`` should be reduced accordingly.

@@ -309,7 +333,9 @@ probably much slower.
 And if you run out of swap, your computer will likely crash.
 To roll the dices use the following configuration ``memory_limit='0'`` in :

->>> client = Client(memory_limit="0")
+.. code-block:: python
+
+    client = Client(memory_limit="0")

 Dask will spawn a worker with multiple threads without any memory limits.

@@ -335,13 +361,15 @@ Real example
 On CMIP6 data, when computing the percentile based indices Tx90p for 20 years and, bootstrapping on 19 years we use:

->>> client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=2)
->>> dask.config.set({"array.slicing.split_large_chunks": False})
->>> dask.config.set({"array.chunk-size": "100 MB"})
->>> dask.config.set({"distributed.worker.memory.target": "0.8"})
->>> dask.config.set({"distributed.worker.memory.spill": "0.9"})
->>> dask.config.set({"distributed.worker.memory.pause": "0.95"})
->>> dask.config.set({"distributed.worker.memory.terminate": "0.98"})
+.. code-block:: python
+
+    client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=2)
+    dask.config.set({"array.slicing.split_large_chunks": False})
+    dask.config.set({"array.chunk-size": "100 MB"})
+    dask.config.set({"distributed.worker.memory.target": "0.8"})
+    dask.config.set({"distributed.worker.memory.spill": "0.9"})
+    dask.config.set({"distributed.worker.memory.pause": "0.95"})
+    dask.config.set({"distributed.worker.memory.terminate": "0.98"})

 Troubleshooting and dashboard analysis
@@ -363,117 +391,120 @@ Keep in mind that:

 * You can reduce memory footprint by using smaller chunks.
 * Each thread may load multiple chunks in memory at once.

-To solve this issue, you must either increase available memory per worker or reduce the quantity of memory used by the computation.
-You can increase memory_limit up to your physical memory available (RAM) with ``Client(memory_limit="16GB")``.
-This increase can also speed up computation by reducing writes and reads on disk.
-You can reduce the number of concurrently running threads (and workers) in the distributed Client configuration with
-``Client(n_workers=1, threads_per_worker=1)``. This may slow down computation.
-You can reduce the size of each chunk with ``dask.config.set({"array.chunk-size": "50 MB"})``, default is around 100MB.
-This may slow down computation as well.
-Or you can combine the three solutions above.
-You can read more on this issue here: http://distributed.dask.org/en/stable/killed.html
+| To solve this issue, you must either increase the available memory per worker or reduce the quantity of memory used by the computation.
+| You can increase ``memory_limit`` up to the available physical memory (RAM) with ``Client(memory_limit="16GB")``.
+| This increase can also speed up computation by reducing writes and reads on disk.
+| You can reduce the number of concurrently running threads (and workers) in the distributed Client configuration with
+  ``Client(n_workers=1, threads_per_worker=1)``. This may slow down computation.
+| You can reduce the size of each chunk with ``dask.config.set({"array.chunk-size": "50 MB"})``; the default is around 100MB.
+| This may slow down computation as well.
+| Or you can combine the three solutions above.
+| You can read more on this issue here: http://distributed.dask.org/en/stable/killed.html
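+
+As a starting point (a sketch only; the values are illustrative and should be adapted to your machine), the
+three remedies can be combined:
+
+.. code-block:: python
+
+    import dask
+    from distributed import Client
+
+    dask.config.set({"array.chunk-size": "50 MB"})  # smaller chunks
+    client = Client(
+        memory_limit="16GB",        # more memory per worker
+        n_workers=1,
+        threads_per_worker=1,       # fewer concurrent threads
+    )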

 Garbage collection "wasting" CPU time
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The warning would be: ``distributed.utils_perf - WARNING - full garbage collections took xx% CPU time recently (threshold: 10%)``
-This is usually accompanied by: ``distributed.worker - WARNING - gc.collect() took 1.259s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.``
-Python runs on a virtual machine (VM) which handles the memory allocations for us.
-This means the VM sometimes needs to cleanup garbage objects that aren't referenced anymore.
-This operation takes some CPU resource but free the RAM for other uses.
-In our dask context, the warning may be raised when icclim/xclim has created large chunks which takes longer to be
-garbage collected.
-This warning means some CPU is wasted but the computation is still running.
-It might help to re-chunk into smaller chunk.
+| The warning would be: ``distributed.utils_perf - WARNING - full garbage collections took xx% CPU time recently (threshold: 10%)``
+| This is usually accompanied by: ``distributed.worker - WARNING - gc.collect() took 1.259s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.``
+| Python runs on a virtual machine (VM) which handles the memory allocations for us.
+| This means the VM sometimes needs to clean up garbage objects that aren't referenced anymore.
+| This operation takes some CPU resources but frees the RAM for other uses.
+| In our dask context, the warning may be raised when icclim/xclim has created large chunks, which take longer to be
+  garbage collected.
+| This warning means some CPU is wasted but the computation is still running.
+  It might help to re-chunk into smaller chunks.

 Internal re-chunking
 ~~~~~~~~~~~~~~~~~~~~
-The warning would be: ``PerformanceWarning: Increasing number of chunks by factor of xx``.
-This warning is usually raised when computing percentiles.
-In percentiles calculation step, the intermediary data generated to compute percentiles is much larger than the initial data.
-First, because of the rolling window used to retrieve all values of each day, the analysed data is multiplied by
-window size (usually by 5).
-Then, on temperatures indices such as Tx90p, we compute percentiles for each day of year (doy).
-This means we must read almost all chunks of time dimension.
-To avoid consuming all RAM at once with these, icclim/xclim internally re-chunk data that's why dask warns
-that many chunks are being created.
-In that case this warning can be ignored.
-
-Computation never start
-~~~~~~~~~~~~~~~~~~~~~~~
-The error raised can be ``CancelledError``.
-We can also acknowledge this by looking at dask dashboard and not seeing any task being schedule.
-This usually means dask graph is too big and the scheduler has trouble creating it.
-If your memory allows it, you can try to increase the chunk-size with ``dask.config.set({"array.chunk-size": "200 MB"})``.
-This will reduce the amount of task created on dask graph.
-To compensate, you may need to reduce the number of running threads with ``Client(n_workers=1, threads_per_worker=2)``.
-This should help limit the memory footprint of the computation.
-
-.. Note::
+| The warning would be: ``PerformanceWarning: Increasing number of chunks by factor of xx``.
+| This warning is usually raised when computing percentiles.
+| In the percentile calculation step, the intermediary data generated to compute percentiles is much larger than the initial data.
+| First, because of the rolling window used to retrieve all values of each day, the analysed data is multiplied by the
+  window size (usually by 5).
+| Then, on temperature indices such as Tx90p, we compute percentiles for each day of year (doy).
+| This means we must read almost all chunks of the time dimension.
+| To avoid consuming all the RAM at once, icclim/xclim internally re-chunks the data; that's why dask warns
+  that many chunks are being created.
+| In that case, this warning can be ignored.
+
+Computation never starts
+~~~~~~~~~~~~~~~~~~~~~~~~
+| The error raised can be ``CancelledError``.
+| We can also detect this by looking at the dask dashboard and not seeing any task being scheduled.
+| This usually means the dask graph is too big and the scheduler has trouble creating it.
+| If your memory allows it, you can try to increase the chunk-size with ``dask.config.set({"array.chunk-size": "200 MB"})``.
+| This will reduce the number of tasks created in the dask graph.
+| To compensate, you may need to reduce the number of running threads with ``Client(n_workers=1, threads_per_worker=2)``.
+| This should help limit the memory footprint of the computation.
+
+.. note::
     Beware, if the computation is fast or if the client is not started in the same python process as icclim, the
    dashboard may also look empty but the computation is actually running.

 Disk read and write analysis - Dashboard
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-When poorly configured, the computation can spend most of its CPU time reading and writing chunks on disk.
-You can visualize this case by opening dask dashboard, usually on ``localhost:8787``.
-In the status page, you can see in the right panel each task dynamically being added.
-In these the colourful boxes, each color represent a specific task.
-I/O on disk is displayed as orange transparent boxes. You should also see all other threads of the same worker
-stopping when one thread is reading or writing on disk.
-If there is a lot of i/o you may need to reconfigure dask.
-The solution to this are similar to the memory overload described above.
-You can increase total available memory with ``Client(memory_limit="16GB")``.
-You can decrease memory pressure by reducing chunk size with ``dask.config.set({"array.chunk-size": "50 MB"})`` or
-by reducing number of threads with ``Client(n_workers=1, threads_per_worker=2)``.
-Beside, you can also benefit from using multiple worker in this case.
-Each worker is a separate non blocking process thus they are not locking each other when one of them need to write or
-read on disk. They are however slower than threads to share memory, this can result in the "chatterbox" issue presented
-below.
-
-.. Note::
-
-    - Don't instantiate multiple client with different configurations, put everything in the same Client constructor call.
-    - Beware, as of icclim 5.0.0, the bootstrapping of percentiles is known to produce **a lot** of i/o.
+| When poorly configured, the computation can spend most of its CPU time reading and writing chunks on disk.
+| You can visualize this case by opening the dask dashboard, usually on ``localhost:8787``.
+| On the status page, you can see each task being dynamically added in the right panel.
+| In these colourful boxes, each color represents a specific task.
+| I/O on disk is displayed as orange transparent boxes. You should also see all other threads of the same worker
+  stopping when one thread is reading or writing on disk.
+| If there is a lot of i/o, you may need to reconfigure dask.
+| The solutions are similar to those for the memory overload described above:
+
+  * You can increase the total available memory with ``Client(memory_limit="16GB")``.
+  * You can decrease memory pressure by reducing the chunk size with ``dask.config.set({"array.chunk-size": "50 MB"})`` or
+    by reducing the number of threads with ``Client(n_workers=1, threads_per_worker=2)``.
+
+| Besides, you can also benefit from using multiple workers in this case.
+| Each worker is a separate non-blocking process, thus they do not lock each other when one of them needs to write or
+  read on disk. They are, however, slower than threads at sharing memory; this can result in the "chatterbox" issue
+  presented below.
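+
+For instance (an illustrative sketch only; adapt the numbers to your machine, keeping in mind that
+``memory_limit`` applies to each worker), a 16GB machine could be split into two workers instead of one:
+
+.. code-block:: python
+
+    from distributed import Client
+
+    # Two processes, each with half of the memory and half of the threads.
+    client = Client(n_workers=2, threads_per_worker=4, memory_limit="8GB")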
+
+.. note::
+
+    * Don't instantiate multiple clients with different configurations; put everything in the same Client constructor call.
+    * Beware, as of icclim 5.0.0, the bootstrapping of percentiles is known to produce **a lot** of i/o.

 Worker chatterbox syndrome - Dashboard
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-In all this document, we mainly recommend to use a single worker with multiple threads.
-Most of the code icclim runs is relying on dask and numpy, and both release the python GLI (More details on GIL here: https://realpython.com/python-gil/).
-This means we can benefit from multi threading and that's why we usually recommend to use a single process (worker) with
-multiple threads.
-However, some configuration can benefit from spawning multiple processes (workers).
-In dask dashboard, you will see red transparent boxes representing the workers communicating between each other.
-If you see a lot of these and if they do not overlap much with other tasks, it means the workers are
-spending most of their CPU times exchanging data.
-This can be caused by either:
-
-# Too many workers are spawned for the amount of work.
-# The load balancer has a lot of work to do.
-
-In the first case, the solution is simply to reduce the number of workers and eventually to increase the number of
-threads per worker.
-For the second case, when a worker has been given too many task to do, the load balancer is charged of
-redistributing these task to other worker. It can happen when some task take significant time to be processed.
-In icclim/xclim this is for example the case of the ``cal_perc`` function used to compute percentiles.
-There is no easy solution for this case, letting the load balancer do its job seems necessary.
+| Throughout this document, we mainly recommend using a single worker with multiple threads.
+| Most of the code icclim runs relies on dask and numpy, and both release the python GIL (more details on the GIL here: https://realpython.com/python-gil/).
+| This means we can benefit from multithreading, and that's why we usually recommend using a single process (worker) with
+  multiple threads.
+| However, some configurations can benefit from spawning multiple processes (workers).
+| In the dask dashboard, you will see red transparent boxes representing the workers communicating with each other.
+| If you see a lot of these and if they do not overlap much with other tasks, it means the workers are
+  spending most of their CPU time exchanging data.
+| This can be caused by either:
+
+  * Too many workers are spawned for the amount of work.
+  * The load balancer has a lot of work to do.
+
+| In the first case, the solution is simply to reduce the number of workers and possibly to increase the number of
+  threads per worker.
+| For the second case, when a worker has been given too many tasks to do, the load balancer is in charge of
+  redistributing these tasks to other workers. This can happen when some tasks take a significant time to be processed.
+| In icclim/xclim, this is for example the case of the ``cal_perc`` function used to compute percentiles.
+| There is no easy solution for this case; letting the load balancer do its job seems necessary.

 Idle threads
 ~~~~~~~~~~~~
-When looking at dask dashboard, the task timelines should be full of colors.
-If you see a lot of emptiness between colored boxes, it means the threads are not doing anything.
-It could be caused by a blocking operation in progress (e.g i/o on disk).
-Read `Disk read and write analysis - Dashboard`_ above in that case.
-It could also be because you have set too many threads and the work cannot be properly divided between each thread.
-In that case, you can simply reduce the number of thread in Client configuration with ``Client(n_workers=1, threads_per_worker=4)``.
+| When looking at the dask dashboard, the task timelines should be full of colors.
+| If you see a lot of emptiness between colored boxes, it means the threads are not doing anything.
+| It could be caused by a blocking operation in progress (e.g. i/o on disk).
+  Read `Disk read and write analysis - Dashboard`_ above in that case.
+| It could also be because you have set too many threads and the work cannot be properly divided between each thread.
+| In that case, you can simply reduce the number of threads in the Client configuration with ``Client(n_workers=1, threads_per_worker=4)``.

 Conclusion
 ----------
-We can't provide a single configuration which fits all possible datasets and climate indices.
-In this document we tried to summarize the few configurations we found useful while developing icclim.
-You still need to tailor these configuration to your own needs.
+| We can't provide a single configuration which fits all possible datasets and climate indices.
+| In this document, we tried to summarize the few configurations we found useful while developing icclim.
+| You still need to tailor these configurations to your own needs.
+
+.. note::

-.. Note::
-    This document has been rewritten in january 2022 and the whole stack under icclim is evolving rapidly.
-    Some information presented here might become outdated quickly.
+
+    * This document has been rewritten in January 2022 and the whole stack under icclim is evolving rapidly.
+    * Some information presented here might become outdated quickly.