diff --git a/docs/staged_changes.rst b/docs/staged_changes.rst new file mode 100644 index 00000000..7df34f90 --- /dev/null +++ b/docs/staged_changes.rst @@ -0,0 +1,428 @@ +Staging changes in memory +========================= +This section explains the low level design of ``InMemoryDataset``. + +When a ``InMemoryDataset`` is modified in any way, all changed chunks are held in memory +until they are ready to be committed to disk. To do so, ``InMemoryDataset`` wraps around +a ``StagedChangesArray`` object, which is a numpy.ndarray-like object. + +The ``StagedChangesArray`` holds its data in *slabs*, which is a list of array-like +objects built as the concatenation of chunks along axis 0, typically with shape +``(n*chunk_size[0], *chunk_size[1:])``, where n is the number of chunks on the slab. +This is the same layout as the ``raw_data`` dataset on disk. + +The list of slabs is made of three parts: + +- The *full slab* is a special slab that is always present and contains exactly one + read-only chunk, a broadcasted numpy array full of the fill_value. + It's always at index 0. +- The *base slabs* are array-likes that are treated as read-only. At the moment of + writing, when the ``InMemoryDataset`` creates the ``StagedChangesArray``, it passes to + it only one base slab, that is the ``raw_data`` ``h5py.Dataset`` - which, + conveniently, is a numpy-like object. This slab is typically found at index 1, but may + be missing for a dataset that's completely empty. +- The *staged slabs* are writeable numpy arrays that are created automatically by + the ``StagedChangesArray`` whenever there is need to modify a chunk that lies on + either the full slab or a base slab. + +Two numpy arrays of metadata are used to keep track of the chunks: + +- ``slab_indices`` is an array of integers, with the same dimensionality as the virtual + array represented by the ``StagedChangesArray`` and one point per chunk, which + contains the index of the slab in the ``slabs`` list that contains the chunk. +- ``slab_offsets`` is an array of identical shape to ``slab_indices`` that contains the + offset of the chunk within the slab along axis 0. + +e.g.:: + + chunk_size[0] = 10 + + slab_indices slab_offsets slabs[1] slabs[2] + (0 = fill_value) + + 1 1 0 30 10 0 0:10 (unreferenced) 0:10 (unreferenced) + 0 2 0 0 20 0 10:20 10:20 + 0 2 0 0 10 0 20:30 (unreferenced) 20:30 + 30:40 + + virtual array (* = chunk completely covered in fill_value) + + slabs[1][30:40] slabs[1][10:20] slabs[0][0:10]* + slabs[0][ 0:10]* slabs[2][20:30] slabs[0][0:10]* + slabs[0][ 0:10]* slabs[2][10:20] slabs[0][0:10]* + +In order to be performant, operations are chained together in a way that minimizes +Python interaction and is heavily optimized with Cython. To this extent, each user +request (``__getitem__``, ``__setitem__``, ``resize()``, or ``load()``) is broken down +into a series of slice pairs, each covering at most one chunk, that are encoded +in one numpy array per pair of slabs (source and destination) involved in the transfer. +Each slice pair consists of + +- an n-dimensional *source start* index, e.g. the coordinates of the top-left corner to + read from the source array-like; +- an n-dimensional *destination start* index, e.g. the coordinates of the top-left + corner to write to in the destination array; +- an n-dimensional *count*, e.g. the number of points to read from the source array-like + and write to the destination array (they always match, so there's only one count); +- an n-dimensional *source stride*, a.k.a. 
step, with 1 meaning contiguous (not to be
+  confused with numpy's strides, which are the number of bytes between points along an
+  axis);
+- an n-dimensional *destination stride*.
+
+Those familiar with the HDF5 C API may have recognized that this is a direct mapping to
+the ``start``, ``stride``, and ``count`` parameters of the two `H5Sselect_hyperslab`_
+function calls - one for the source and one for the destination spaces - that the
+library needs to issue before a `H5Dread`_ call.
+
+The ``StagedChangesArray`` is completely agnostic of the underlying storage - anything
+will work as long as it offers a basic numpy-like API. Once the data is ready to be
+transferred between two slabs, the ``StagedChangesArray`` calls the
+``read_many_slices()`` function, which checks whether the source slab is an
+``h5py.Dataset`` or a numpy array and dispatches to one of two implementations to
+execute the transfer - in the case of ``h5py.Dataset``, a C-level for loop that calls
+the underlying HDF5 C library directly: `H5Sselect_hyperslab`_ (source dataset) ➞
+`H5Sselect_hyperslab`_ (destination numpy array) ➞ `H5Dread`_.
+
+The source array-like can be one of:
+
+- a base slab (the ``raw_data`` ``h5py.Dataset``); all source start indices along
+  axis 0 need to be shifted by the value indicated in ``slab_offsets``;
+- a staged slab (a numpy array in memory), again shifted by ``slab_offsets``;
+- the full slab (a broadcasted numpy array);
+- the value parameter of the ``__setitem__`` method.
+
+The destination array (always an actual ``numpy.ndarray``) can be either:
+
+- a staged slab, shifted by ``slab_offsets``;
+- the return value of the ``__getitem__`` method, which is created empty at the
+  beginning of the method call and then progressively filled slice by slice.
+
+
+Plans
+-----
+To encapsulate the complex decision-making logic of the ``StagedChangesArray`` methods,
+the actual methods of the class are designed as fairly dumb wrappers that
+
+1. create a ``*Plan`` object with all the information needed to execute the operation
+   (``GetItemPlan`` for ``__getitem__()``, ``SetItemPlan`` for ``__setitem__()``, etc.);
+2. consume the plan, implementing its decisions - chiefly by calling the
+   ``read_many_slices()`` function for each pair of slabs involved in the transfer;
+3. discard the plan.
+
+The ``*Plan`` classes are agnostic to data types, never access the actual data (slabs,
+``__getitem__`` return value, or ``__setitem__`` value parameter), and exclusively deal
+in shapes, chunks, and indices.
+
+For debugging purposes, these classes can be generated without executing the method that
+consumes them by calling the ``StagedChangesArray._*_plan()`` methods; this allows
+pretty-printing the list of their instructions, e.g. in a Jupyter notebook.
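+
+In code, consuming a plan boils down to a loop over its ``TransferPlan`` objects. The
+sketch below is illustrative rather than literal - the real methods also append the new
+slabs requested by the plan and adopt its updated ``slab_indices`` and
+``slab_offsets``::
+
+    plan = arr._setitem_plan(idx)            # pure bookkeeping; no data is touched
+    for tplan in plan.transfers:             # one TransferPlan per (source, destination) pair
+        src = arr.slabs[tplan.src_slab_idx]  # or the __setitem__ value parameter
+        dst = arr.slabs[tplan.dst_slab_idx]
+        tplan.transfer(src, dst)             # calls read_many_slices()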
+
+For example, in order to debug what will happen when you call ``dset[2:5, 3:6] = 42``,
+where ``dset`` is a staged versioned_hdf5 dataset, you can run::
+
+    >>> dset.dataset.staged_changes._setitem_plan((slice(2, 5), slice(3, 6)))
+
+    SetItemPlan
+        slabs.append(np.empty((6, 2)))  # slabs[2]
+        slabs.append(np.empty((2, 2)))  # slabs[3]
+        # 3 transfers from slabs[1] to slabs[2]
+        slabs[2][0:2, 0:2] = slabs[1][10:12, 0:2]
+        slabs[2][2:4, 0:2] = slabs[1][18:20, 0:2]
+        slabs[2][4:6, 0:2] = slabs[1][20:22, 0:2]
+        # 1 transfers from value to slabs[3]
+        slabs[3][0:2, 0:2] = value[0:2, 1:3]
+        # 3 transfers from value to slabs[2]
+        slabs[2][0:2, 1:2] = value[0:2, 0:1]
+        slabs[2][2:3, 1:2] = value[2:3, 0:1]
+        slabs[2][4:5, 0:2] = value[2:3, 1:3]
+    slab_indices:
+    [[1 1 1 1]
+     [1 2 3 1]
+     [1 2 2 1]
+     [1 1 1 1]]
+    slab_offsets:
+    [[ 0  2  4  6]
+     [ 8  0  0 14]
+     [16  2  4 22]
+     [24 26 28 30]]
+
+
+General plans algorithm
+-----------------------
+All plans share a similar workflow:
+
+1. Preprocess the index, passed by the user as a parameter to ``__getitem__`` and
+   ``__setitem__``, into a list of ``IndexChunkMapper`` objects (one per axis).
+
+2. Query the ``IndexChunkMapper`` objects to convert the index of points provided by the
+   user to an index of chunks along each axis, then use the indices of chunks to slice
+   the ``slab_indices`` and ``slab_offsets`` arrays to obtain the metadata of only the
+   chunks that are impacted by the selection.
+
+3. Further refine the above selection on a chunk-by-chunk basis using a mask, depending
+   on the value of the matching point of ``slab_indices``. Different masking functions,
+   which depend on the specific use case, select/deselect the full slab, the base slabs,
+   or the staged slabs. For example, the ``load()`` method - which ensures that
+   everything is loaded into memory - will only select the chunks that lie on the base
+   slabs.
+
+4. You now have three aligned flattened lists:
+
+   - the n-dimensional chunk indices that were selected at both steps 2 and 3;
+   - the corresponding points of ``slab_indices``; and
+   - the corresponding points of ``slab_offsets``.
+
+5. Sort by ``slab_indices`` and partition along them. This breaks the rest of the
+   algorithm into separate calls to ``read_many_slices()``, one per pair of source and
+   destination slabs. Note that a transfer operation is always from N slabs to 1 slab
+   or to the ``__getitem__`` return value, or from 1 slab or the ``__setitem__`` value
+   parameter to N slabs, and that ``slab_indices`` can mean either source or destination
+   depending on context.
+
+6. For each *(chunk index, slab index, slab offset)* triplet from the above lists, query
+   the ``IndexChunkMapper`` objects again, independently for each axis, to convert the
+   global n-dimensional index of points that was originally provided by the user to a
+   local index that only impacts the chunk. For each axis, this will return:
+
+   - exactly one 1-dimensional slice pair, in the case of basic indices (scalars or
+     slices);
+   - one or more 1-dimensional slice pairs, in the case of advanced indices (arrays of
+     indices or arrays of bools).
+
+7. Put the lists of 1-dimensional slices in pseudo-cartesian product to produce a list
+   of n-dimensional slices, one for each chunk impacted by the selection (or more than
+   one per chunk in the presence of advanced indices).
+   It is pseudo-cartesian because at step 3 we have been cherry-picking points in the
+   hyperspace of chunks; if we hadn't done that, limiting ourselves only to the
+   selection along each axis at step 2, it would be a true cartesian product.
+
+8. If the destination array is a new slab, update ``slab_indices`` and ``slab_offsets``
+   to reflect the new position of the chunks.
+
+9. Feed the list of n-dimensional slices to the ``read_many_slices()`` function, which
+   will actually transfer the data.
+
+10. Go back to step 6 and repeat for the next pair of source and destination
+    arrays/slabs.
+
+
+``__getitem__`` algorithm
+-------------------------
+``GetItemPlan`` is one of the simplest plans once you have encapsulated the general
+algorithm described in the previous paragraphs.
+It makes no distinction between full, base, or staged slabs and there is no per-chunk
+masking. It figures out the shape of the return value, creates it with ``numpy.empty``,
+and then transfers from each slab into it.
+
+**There is no cache on read**: calling the same index twice will result in two separate
+reads from the base slabs, which typically translates to two calls to
+``h5py.Dataset.__getitem__`` and two disk accesses. However, note that the HDF5 C
+library does perform *some* caching of its own.
+
+For this reason, this method never modifies the state of the ``StagedChangesArray``.
+
+
+``__setitem__`` algorithm
+-------------------------
+``SetItemPlan`` is substantially more complex than ``GetItemPlan`` because it needs to
+handle the following use cases:
+
+1. The index *completely* covers a chunk that lies either on the full slab or on a base
+   slab. The chunk must be replaced with a brand new one in a new staged slab, which is
+   filled with a copy of the contents of the ``__setitem__`` value parameter.
+   ``slab_indices`` and ``slab_offsets`` are updated to reflect the new position of the
+   chunk on a staged slab. The original full or base slab is never accessed.
+2. The index *partially* covers a chunk that lies on the full slab or on a base slab.
+   The chunk is first copied over from the full or base slab to a brand new staged slab,
+   which is then updated with the contents of the ``__setitem__`` value parameter.
+   ``slab_indices`` and ``slab_offsets`` are updated to reflect the new position of the
+   chunk on a staged slab.
+3. The index covers a chunk that is already lying on a staged slab. The slab is
+   updated in place; ``slab_indices`` and ``slab_offsets`` are not modified.
+
+To help handle the first two use cases, the ``IndexChunkMapper`` objects have the
+concept of *selected chunks*, which are chunks that contain at least one point of the
+index along one axis, and *whole chunks*, which are chunks where *all* points of the
+chunk are covered by the index along one axis.
+
+Moving to the n-dimensional space,
+
+- a chunk is selected when it's caught by the intersection of the selected chunk indices
+  along all axes;
+- a chunk is *wholly* selected when it's caught by the intersection of the whole chunk
+  indices along all axes;
+- a chunk is *partially* selected if it's selected along all axes, but not wholly
+  selected along at least one axis.
+
+**Example**
+
+>>> arr.shape
+(30, 50)
+>>> arr.chunk_size
+(10, 10)
+>>> arr[5:20, 30:] = 42
+
+The above example partially selects chunks (0, 3) and (0, 4) and wholly selects chunks
+(1, 3) and (1, 4)::
+
+      01234
+    0 ...pp
+    1 ...ww
+    2 .....
+
+The ``SetItemPlan`` thus runs the general algorithm twice:
+
+1. With a mask that picks the chunks that lie either on the full or on a base slab,
+   intersected with the mask of partially selected chunks. These chunks are moved to
+   the staged slabs.
+2. Without any mask, as by now all selected chunks lie on staged slabs. These chunks
+   are copied from the ``__setitem__`` value parameter.
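+
+The following is a minimal, self-contained sketch of that classification for the example
+above. It uses plain Python on hypothetical variables rather than the actual
+``IndexChunkMapper`` machinery, and it ignores edge chunks trimmed by the array shape
+(which the real code also treats as whole when the selection reaches the array edge)::
+
+    import math
+
+    chunk_size = (10, 10)
+    start, stop = (5, 30), (20, 50)  # arr[5:20, 30:] on a (30, 50) array
+
+    selected, whole = [], []
+    for sta, sto, c in zip(start, stop, chunk_size):
+        selected.append(set(range(sta // c, math.ceil(sto / c))))  # >= 1 point covered
+        whole.append(set(range(math.ceil(sta / c), sto // c)))     # all points covered
+
+    wholly = {(i, j) for i in whole[0] for j in whole[1]}
+    partially = {(i, j) for i in selected[0] for j in selected[1]} - wholly
+    print(sorted(wholly))     # [(1, 3), (1, 4)]
+    print(sorted(partially))  # [(0, 3), (0, 4)]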
+
+
+``resize()`` algorithm
+----------------------
+
+``ResizePlan`` iterates along all axes and resizes the array independently for each axis
+that changed shape. This typically causes the ``slab_indices`` and ``slab_offsets``
+arrays to change shape too.
+
+Special attention needs to be paid to *edge chunks*, that is, the last row or column of
+chunks along an axis, whose size may not be an exact multiple of the ``chunk_size``
+before and/or after the resize operation.
+
+Shrinking is trivial: if an edge chunk needs to be reduced in size along one or more
+axes, it doesn't need to be actually modified on the slabs. The ``StagedChangesArray``
+simply knows that, from this moment on, everything beyond the edge of the chunk on the
+slab is to be treated as uninitialized memory.
+
+Creating brand new chunks when enlarging is also trivial: their entries in both
+``slab_indices`` and ``slab_offsets`` are simply set to 0, to represent that they lie
+on the full slab. They won't exist on the staged slabs until someone writes to them
+with ``__setitem__``.
+
+Enlarging edge chunks that don't lie on the full slab is more involved, as they need to
+be physically filled with the fill_value:
+
+1. If a chunk lies on a base slab, it first needs to be transferred over to a staged
+   slab, which is created brand new for the occasion;
+2. then, there is a transfer from the full slab to the staged slab for the extra area
+   that needs to be filled with the fill_value.
+
+
+``load()`` algorithm
+--------------------
+
+``LoadPlan`` ensures that all chunks are either on the full slab or on a staged slab. It
+selects all chunks that lie on a base slab and transfers them to a brand new staged
+slab.
+
+
+Reclaiming memory
+-----------------
+Each plan that mutates the state - ``SetItemPlan``, ``ResizePlan``, and ``LoadPlan`` -
+may stop needing a chunk on a particular slab, either because that chunk does not exist
+anymore (``resize()`` to shrink the shape) or because it's been moved from a base slab
+to a staged slab (``__setitem__``, ``resize()``, or ``load()``).
+
+When a chunk leaves a slab, it leaves an empty area in the old slab. This is normal and
+fine when the slab is disk-backed (the ``raw_data`` ``h5py.Dataset`` that serves as a
+base slab), but it results in memory fragmentation and potentially a perceived "memory
+leak" from the final user's point of view when the slab is in memory (a staged slab).
+For the sake of simplicity, the freed surface is never reused; later operations just
+create new slabs.
+
+In practice, fragmentation should not be a problem, as it only happens if someone
+updates a chunk with ``__setitem__`` and later drops that very same chunk with
+``resize()`` - which is obviously wasteful, so it should not be part of a typical
+workflow. Additionally, slabs are cleaned up as soon as the staged version is committed.
+
+If a slab becomes completely empty, however - in other words, it no longer appears in
+``slab_indices`` - it *may* be dropped. This is guaranteed to happen for staged slabs
+and *may* happen for base slabs too (if computationally cheap to determine). Note that
+nothing particular happens today when the ``raw_data`` base slab, which is an HDF5
+dataset, is dereferenced by the ``StagedChangesArray``.
+
+When a slab is dropped, it is replaced by None in the ``slabs`` list, which dereferences
+it. Replacing it with None, rather than removing it from the list, avoids having to
+shift all the following slab indices after the operation.
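+
+As a toy illustration of that bookkeeping (the arrays below are made up), dropping a
+slab releases its memory without invalidating the indices of the slabs that follow it::
+
+    import numpy as np
+
+    slabs = [
+        np.broadcast_to(np.float64(0), (10, 10)),  # slabs[0]: the full slab
+        np.ones((20, 10)),                         # slabs[1]: pretend base slab
+        np.ones((30, 10)),                         # slabs[2]: staged slab, no longer referenced
+        np.ones((10, 10)),                         # slabs[3]: staged slab, still in use
+    ]
+    slabs[2] = None        # dereference; the buffer is freed once nothing else points to it
+    assert slabs[3] is not None  # slabs[3] keeps its index, so slab_indices stays valid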
+The full slab is never dropped, as it may be needed later by ``resize()`` to create new +chunks or partially fill existing edge chunks. + + +API interaction +--------------- +.. graphviz:: + + digraph { + node [shape=box]; + + user [shape=ellipse; label="Final user"] + + subgraph cluster_0 { + label="wrappers.py"; + + DatasetWrapper -> InMemoryDataset; + DatasetWrapper -> InMemorySparseDataset; + DatasetWrapper -> InMemoryArrayDataset; + } + + subgraph cluster_1 { + label="staged_changes.py"; + + StagedChangesArray; + Plans [ + label="GetItemPlan\nSetItemPlan\nResizePlan\nLoadPlan\nChangesPlan"; + ] + + StagedChangesArray -> Plans -> TransferPlan; + } + + subgraph cluster_2 { + label="subchunk_map.py"; + + read_many_slices_param_nd -> IndexChunkMapper; + } + + subgraph cluster_3 { + label="slicetools.pyx"; + + build_slab_indices_and_offsets; + read_many_slices; + hdf5_c [label="libhdf5 C API (via Cython)"]; + build_slab_indices_and_offsets -> hdf5_c; + read_many_slices -> hdf5_c; + } + + subgraph cluster_4 { + label="versions.py"; + commit_version; + } + + user -> DatasetWrapper; + + InMemoryDataset -> StagedChangesArray; + InMemoryDataset -> build_slab_indices_and_offsets; + InMemorySparseDataset -> StagedChangesArray; + Plans -> IndexChunkMapper; + TransferPlan -> read_many_slices; + TransferPlan -> read_many_slices_param_nd; + + InMemorySparseDataset -> commit_version; + InMemoryArrayDataset -> commit_version; + InMemoryDataset -> commit_version; + + h5py; + commit_version -> h5py; + hdf5_file [label="HDF5 file"; shape=cylinder]; + h5py -> hdf5_file; + hdf5_c -> hdf5_file; + } + +**Notes** + +- ``read_many_slices_param_nd`` has API awareness of ``read_many_slices`` to craft its + input parameters, but no API integration. +- Likewise, ``build_slab_indices_and_offsets`` knows about the format of the + ``slab_indices`` and ``slab_offsets`` of ``StagedChangesArray``, but does not directly + interact with it. + + +.. _H5Sselect_hyperslab: https://support.hdfgroup.org/documentation/hdf5/latest/group___h5_s.html#ga6adfdf1b95dc108a65bf66e97d38536d +.. _H5DRead: https://support.hdfgroup.org/documentation/hdf5/latest/group___h5_d.html#ga8287d5a7be7b8e55ffeff68f7d26811c diff --git a/versioned_hdf5/meson.build b/versioned_hdf5/meson.build index e9d7e543..cc4f7aaa 100644 --- a/versioned_hdf5/meson.build +++ b/versioned_hdf5/meson.build @@ -80,3 +80,12 @@ py.extension_module( dependencies: compiled_deps, cython_args: cython_args, ) + +py.extension_module( + 'staged_changes', + 'staged_changes.pyx', + install: true, + subdir: 'versioned_hdf5', + dependencies: compiled_deps, + cython_args: cython_args, +) diff --git a/versioned_hdf5/staged_changes.py b/versioned_hdf5/staged_changes.py new file mode 100644 index 00000000..51072f73 --- /dev/null +++ b/versioned_hdf5/staged_changes.py @@ -0,0 +1,1743 @@ +# Note: this entire module is compiled by cython with wraparound=False +# See meson.build for details + +# Read docs/staged_changes.rst for high-level documentation. 
+ +from __future__ import annotations + +import copy +import itertools +from collections.abc import Callable, Iterator, Sequence +from dataclasses import dataclass, field +from typing import Any, Literal, MutableMapping, TypeVar, cast + +import cython +import numpy as np +from cython import bint, ssize_t +from numpy.typing import ArrayLike, DTypeLike, NDArray + +from .cytools import ceil_a_over_b, count2stop, np_hsize_t +from .slicetools import read_many_slices +from .subchunk_map import ( + EntireChunksMapper, + IndexChunkMapper, + TransferType, + index_chunk_mappers, + read_many_slices_params_nd, +) +from .tools import asarray, format_ndindex, ix_with_slices + +if cython.compiled: # pragma: nocover + from cython.cimports.versioned_hdf5.cytools import ( # type: ignore + ceil_a_over_b, + count2stop, + hsize_t, + ) + from cython.cimports.versioned_hdf5.subchunk_map import ( # type: ignore + IndexChunkMapper, + read_many_slices_params_nd, + ) + + +T = TypeVar("T", bound=np.generic) + + +class StagedChangesArray(MutableMapping[Any, T]): + """Writeable numpy array-like, a.k.a. virtual array, which wraps around a + sequence of read-only array-likes of chunks, known as the base slabs. + + The base slabs must be concatenations of chunks along axis 0, typically with shape + (n*chunk_size[0], *chunk_size[1:]), where n is the number of chunks on the slab. + + All changes to the data or the shape are stored in memory into additional slabs + backed by plain numpy arrays, which are appended after the full slab and the base + slabs. Nothing writes back to the base slabs. + + In order to build a StagedChangesArray, in addition to the base slabs you must + provide a mapping from each chunk of the virtual array to + - the index in the sequence of base slabs, off by 1: element 0 in the sequence of + base slabs is indexed as 1 etc. Index 0 refers to a special slab, a.k.a. the full + slab, which is created internally and contains exactly 1 chunk full of the + fill_value. + - the offset along axis 0 of the referenced base slab that contains the chunk. + Chunks full of the fill_value must always have offset 0. + + slabs = [full slab] + base_slabs + staged_slabs + + These two mappings are two arrays of integers, slab_indices and slab_offsets, with + the same dimensionality as the virtual array and one point per chunk. + + e.g.:: + + chunk_size[0] = 10 + + slab_indices slab_offsets slabs[1] slabs[2] + (0 = fill_value) + + 1 1 0 30 10 0 0:10 (unreferenced) 0:10 (unreferenced) + 0 2 0 0 20 0 10:20 10:20 + 0 2 0 0 10 0 20:30 (unreferenced) 20:30 + 30:40 + + virtual array (* = chunk completely covered in fill_value) + + slabs[1][30:40] slabs[1][10:20] slabs[0][0:10]* + slabs[0][ 0:10]* slabs[2][20:30] slabs[0][0:10]* + slabs[0][ 0:10]* slabs[2][10:30] slabs[0][0:10]* + + The base slabs may not cover the whole surface of the virtual array, which thus + becomes a sparse array with missing chunks filled with a fill_value. Chunks are + individually dense and setting a single point in a chunk materializes the whole + chunk, so this differs from e.g. sparse.COO which is sparse at the single point + level. + + Likewise, the slab_indices and slab_offsets mapping may point to the same chunk + multiple times for multiple virtual slabs. This allows effectively implementing + copy-on-write (CoW) system. + + High level documentation on how the class works internally: :doc:`staged_changes`. 
+ + Parameters + ---------- + shape: + The shape of the presented array + chunk_size: + The shape of each chunk + base_slabs: + Sequence of zero or more read-only numpy-like objects containing the baseline + data, with the chunks concatenated along axis 0. They will typically be shaped + (n*chunk_size[0], *chunk_size[1:]), with n sufficiently large to accomodate all + unique chunks, but may be larger than that. + slab_indices: + Numpy array of integers with shape equal to the number of chunks along each + axis, set to 0 for chunks that are full of fill_value and to i+1 for chunks that + lie on base_slabs[i]. + It will be modified in place. + slab_offsets: + Numpy array of integers with shape matching slab_indices, mapping each chunk to + the offset along axis 0 of its base slab. The offset of full chunks is always 0. + It will be modified in place. + fill_value: optional + Value to fill chunks with where slab_indices=1. Defaults to 0. + """ + + #: current shape of the StagedChangesArray, e.g. downstream of resize() + shape: tuple[int, ...] + + #: True if the user called resize() to alter the shape of the array; False otherwise + _resized: bool + + #: Map from each chunk to the index of the corresponding slab in the slabs list + slab_indices: NDArray[np_hsize_t] + + #: Offset of each chunk within the corresponding slab along axis 0 + slab_offsets: NDArray[np_hsize_t] + + #: Slabs of data, each containing one or more chunk stacked on top of each other + #: along axis 0. + #: + #: slabs[0] contains exactly one chunk full of fill_value. + #: It's broadcasted to the chunk_size and read-only. + #: slabs[1:n_base_slabs + 1] are the slabs containing the original data. + #: They can be any read-only numpy-like object. + #: slabs[n_base_slabs + 1:] contain the staged chunks. + #: + #: Edge slabs that don't fully cover the chunk_size are padded with uninitialized + #: cells; the shape of each staged slab is always + #: (n*chunk_size[0], *chunk_size[1:]). + #: + #: When a slab no longer holds any current chunk, it is dereferenced and replaced + #: with None in this list. This happens: + #: 1. after modifying every chunk of a base slab, so that they now live entirely + #: in the staged slabs, and/or + #: 2. when shrinking the array with resize(). + #: slabs[0] is never dereferenced. + slabs: list[NDArray[T] | None] + + #: Number of base slabs in the slabs list. + #: Staged slabs start at index n_base_slabs + 1. 
+ n_base_slabs: int + + __slots__ = tuple(__annotations__) + + def __init__( + self, + shape: tuple[int, ...], + chunk_size: tuple[int, ...], + base_slabs: Sequence[NDArray[T]], # actually any numpy-compatible object + slab_indices: ArrayLike, + slab_offsets: ArrayLike, + fill_value: Any | None = None, + ): + ndim = len(shape) + if len(chunk_size) != ndim: + raise ValueError("shape and chunk_size must have the same length") + if any(s < 0 for s in shape): + raise ValueError("shape must be non-negative") + if any(c <= 0 for c in chunk_size): + raise ValueError("chunk_size must be strictly positive") + + self.shape = shape + self._resized = False + + dtype = base_slabs[0].dtype if base_slabs else None + if fill_value is None: + # Unlike 0.0, this works for weird dtypes such as np.void + fill_value = np.zeros((), dtype=dtype) + else: + fill_value = np.array(fill_value, dtype=dtype, copy=True) + if fill_value.ndim != 0: + raise ValueError("fill_value must be a scalar") + assert fill_value.base is None + + for base_slab in base_slabs: + if base_slab.dtype != fill_value.dtype: + raise ValueError( + f"Mismatched dtypes {base_slab.dtype} != {fill_value.dtype}" + ) + # Testing the minimum viable base_slab.shape would require a lot of nuance + # and a full scan of the slab_indices and slab_offsets arrays, so it would + # be overkill for a health check. + if base_slab.ndim != ndim: + raise ValueError( + "base_slabs must have the same dimensionality as shape" + ) + + self.slabs = [np.broadcast_to(fill_value, chunk_size)] + self.slabs.extend(base_slabs) + self.n_base_slabs = len(base_slabs) + + self.slab_indices = asarray(slab_indices, dtype=np_hsize_t) + self.slab_offsets = asarray(slab_offsets, dtype=np_hsize_t) + + n_chunks = self.n_chunks + if self.slab_indices.shape != n_chunks: + raise ValueError(f"{self.slab_indices.shape=}; expected {n_chunks}") + if self.slab_offsets.shape != n_chunks: + raise ValueError(f"{self.slab_offsets.shape=}; expected {n_chunks}") + + @property + def full_slab(self) -> NDArray[T]: + return cast(NDArray[T], self.slabs[0]) + + @property + def fill_value(self) -> NDArray[T]: + """Return array with ndim=0""" + return cast(NDArray[T], self.full_slab.base) + + @property + def dtype(self) -> np.dtype[T]: + return self.full_slab.dtype + + @property + def chunk_size(self) -> tuple[int, ...]: + """Size of the tiles that will be modified at once. A write to + less than a whole chunk will cause the remainder of the chunk + to be read from the underlying array. + """ + return self.full_slab.shape + + @property + def itemsize(self) -> int: + return self.full_slab.itemsize + + @property + def ndim(self) -> int: + return len(self.shape) + + @property + def size(self) -> int: + return int(np.prod(self.shape)) + + @property + def nbytes(self) -> int: + """Size in bytes of this array if it were completely loaded. Actual used memory + can be less or more depending on duplication between base and staged slabs, + on full chunks, on chunks referenced multiple times, and on slab fragmentation. 
+ """ + return self.size * self.itemsize + + @property + def n_chunks(self) -> tuple[int, ...]: + """Number of chunks on each axis""" + return tuple(ceil_a_over_b(s, c) for s, c in zip(self.shape, self.chunk_size)) + + @property + def n_slabs(self) -> int: + """Total number of slabs""" + return len(self.slabs) + + @property + def n_staged_slabs(self) -> int: + """Number of staged slabs containing modified chunks""" + return self.n_slabs - self.n_base_slabs - 1 + + @property + def base_slabs(self) -> Sequence[NDArray[T] | None]: + return self.slabs[1 : self.n_base_slabs + 1] + + @property + def staged_slabs(self) -> Sequence[NDArray[T] | None]: + return self.slabs[self.n_base_slabs + 1 :] + + @property + def has_changes(self) -> bool: + """Return True if any chunks have been modified or if a lazy transformation + took place; False otherwise. + """ + return self.n_staged_slabs > 0 or self._resized + + def __array__(self, dtype=None, copy=None) -> NDArray[T]: + if copy is False: + raise ValueError("Cannot return a ndarray view of a StagedChangesArray") + return np.asarray(self[()], dtype=dtype) + + def __len__(self) -> int: + return self.shape[0] + + def __iter__(self) -> Iterator[NDArray[T]]: + c = self.chunk_size[0] + for start in range(0, self.shape[0], c): + yield from self[start : start + c] + + def __delitem__(self, idx: Any) -> None: + raise ValueError("Cannot delete array elements") + + def __repr__(self) -> str: + return ( + f"StagedChangesArray" + f"\nslab_indices:\n{self.slab_indices}" + f"\nslab_offsets:\n{self.slab_offsets}" + ) + + # *Plan class factories. These are kept separate from the methods that consume them + # for debugging purposes. Note that all plan feature a __repr__ method that is + # intended to be used during demo or debugging sessions, e.g. in a Jupyter notebook. + + def _changes_plan(self) -> ChangesPlan: + """Formulate a plan to export all the staged and unchanged chunks.""" + return ChangesPlan( + shape=self.shape, + chunk_size=self.chunk_size, + slab_indices=self.slab_indices, + slab_offsets=self.slab_offsets, + ) + + def _getitem_plan(self, idx: Any) -> GetItemPlan: + """Formulate a plan to get a slice of the array.""" + return GetItemPlan( + idx, + shape=self.shape, + chunk_size=self.chunk_size, + slab_indices=self.slab_indices, + slab_offsets=self.slab_offsets, + ) + + def _setitem_plan(self, idx: Any, copy: bool = True) -> SetItemPlan: + """Formulate a plan to set a slice of the array. + + Parameters + ---------- + idx: + First argument of __setitem__. Note that the plan doesn't need to know about + the value parameter. + copy: + False + Generating the plan may update alter the state of the + StagedChangesArray, so can only be done once without resulting in a + broken state. + This is faster, particularly when the impacted number of chunks is much + smaller than the total, but you must follow through with the plan. + True (default) + Generating the plan won't alter the state of the StagedChangesArray + until it's consumed by __setitem__. + This is useful for debugging and testing. 
+ """ + return SetItemPlan( + idx, + shape=self.shape, + chunk_size=self.chunk_size, + slab_indices=self.slab_indices.copy() if copy else self.slab_indices, + slab_offsets=self.slab_offsets.copy() if copy else self.slab_offsets, + n_slabs=self.n_slabs, + n_base_slabs=self.n_base_slabs, + ) + + def _resize_plan(self, shape: tuple[int, ...], copy: bool = True) -> ResizePlan: + """Formulate a plan to resize the array.""" + return ResizePlan( + old_shape=self.shape, + new_shape=shape, + chunk_size=self.chunk_size, + slab_indices=self.slab_indices.copy() if copy else self.slab_indices, + slab_offsets=self.slab_offsets.copy() if copy else self.slab_offsets, + n_slabs=self.n_slabs, + n_base_slabs=self.n_base_slabs, + ) + + def _load_plan(self, copy: bool = True) -> LoadPlan: + """Formulate a plan to load all chunks from the base slabs.""" + return LoadPlan( + shape=self.shape, + chunk_size=self.chunk_size, + slab_indices=self.slab_indices.copy() if copy else self.slab_indices, + slab_offsets=self.slab_offsets.copy() if copy else self.slab_offsets, + n_slabs=self.n_slabs, + n_base_slabs=self.n_base_slabs, + ) + + def _get_slab( + self, idx: int | None, default: NDArray[T] | None = None + ) -> NDArray[T]: + slab = self.slabs[idx] if idx is not None else default + assert slab is not None + assert slab.dtype == self.dtype, (slab.dtype, self.dtype) + assert slab.ndim == self.ndim + return slab + + def changes( + self, + ) -> Iterator[tuple[tuple[slice, ...], int, NDArray[T] | tuple[slice, ...]]]: + """Yield all the changed chunks so far, as tuples of + + - slice index in the base virtual array + - index of the slab in the slabs list + - chunk value array, if a staged slab, or slice of the base slab otherwise + + Chunks that are completely full of the fill_value are not yielded. + + This lets you update the base virtual array: + + >> for idx, _, value in staged_array.changes(): + .. if isinstance(value, np.ndarray): + .. virtual_base[idx] = value + """ + plan = self._changes_plan() + + for base_slice, slab_idx, slab_slice in plan.chunks: + assert slab_idx > 0 # No full chunks + if slab_idx <= self.n_base_slabs: + yield base_slice, slab_idx, slab_slice + else: + slab = self._get_slab(slab_idx) + chunk = slab[slab_slice] + yield base_slice, slab_idx, chunk + + def __getitem__(self, idx: Any): + """Get a slice of data from the array. This reads from the staged slabs + in memory when available and from either the base slab or the fill_value + otherwise. 
+ + Returns + ------- + T if idx selects a scalar, otherwise NDArray[T] + """ + plan = self._getitem_plan(idx) + + out = np.empty(plan.output_shape, dtype=self.dtype) + out_view = out[plan.output_view] + + for tplan in plan.transfers: + src_slab = self._get_slab(tplan.src_slab_idx) + assert tplan.dst_slab_idx is None + tplan.transfer(src_slab, out_view) + + # Return scalar value instead of a 0D array + return out[()] if out.ndim == 0 else out + + def _apply_mutating_plan( + self, plan: MutatingPlan, default_slab: NDArray[T] | None = None + ) -> None: + """Implement common workflow of __setitem__, resize, and load.""" + for shape in plan.append_slabs: + self.slabs.append(np.empty(shape, dtype=self.dtype)) + + for tplan in plan.transfers: + src_slab = self._get_slab(tplan.src_slab_idx, default_slab) + + assert tplan.dst_slab_idx is not None + assert tplan.dst_slab_idx > self.n_base_slabs + dst_slab = self._get_slab(tplan.dst_slab_idx) + + tplan.transfer(src_slab, dst_slab) + + for slab_idx in plan.drop_slabs: + assert slab_idx != 0 # Never drop the full slab + self.slabs[slab_idx] = None + + # Even if we pass the indices arrays by reference to the *Plan + # constructors, they may nonetheless be copied internally. + self.slab_indices = plan.slab_indices + self.slab_offsets = plan.slab_offsets + + def __setitem__(self, idx: Any, value: ArrayLike) -> None: + """Update the slabs containing the chunks selected by the index. + + The full slab (slabs[0]) and the base slabs are read-only. If the selected + chunks lie on any of them, append a new empty slab with the necessary space to + hold all such chunks, then copy the chunks from the full slab or a base slab to + the new slab, and finally update the new slab from the value parameter. + """ + plan = self._setitem_plan(idx, copy=False) + if not plan.mutates: + return + + # Preprocess value parameter + # Avoid double deep-copy of array-like objects that support the __array_* + # interface (e.g. sparse arrays). + value = cast(NDArray[T], asarray(value, self.dtype)) + + if plan.value_shape != value.shape: + value = np.broadcast_to(value, plan.value_shape) + value_view = value[plan.value_view] + + self._apply_mutating_plan(plan, value_view) + + def resize(self, shape: tuple[int, ...]) -> None: + """Change the array shape in place and fill new elements with self.fill_value. + + When enlarging, edge chunks which are not exactly divisible by chunk size are + partially filled with fill_value. This is a transfer from slabs[0] to a staged + slab. + + If such slabs are not already in memory, they are first loaded. + Just like in __setitem__, this appends a new empty slab to the slabs list, then + transfers from a base slab to the new slab, and finally transfers from slabs[0] + to the new slab. + + Slabs that are no longer needed are dereferenced; their location in the slabs + list is replaced with None. + This may cause base slabs to be dereferenced, but never the full slab. + """ + plan = self._resize_plan(shape, copy=False) + + # A resize may change the slab_indices and slab_offsets, but won't necessarily + # impact any chunks. In such cases, this is a no-op. 
+ self._apply_mutating_plan(plan) + + self.shape = shape + self._resized = True + + def load(self) -> None: + """Load all chunks that are not yet in memory from the base array.""" + if all(slab is None for slab in self.slabs[1 : self.n_base_slabs + 1]): + return + plan = self._load_plan(copy=False) + self._apply_mutating_plan(plan) # This may be a no-op + + def copy(self, deep: bool = True) -> StagedChangesArray[T]: + """Return a copy of self. If deep=True, slabs are deep-copied. + If deep=False, the copy gets read-only views of the slabs and attempts to + change them will fail. + + Read-only full slab and base slabs are never copied. + """ + out = copy.copy(self) + out.slab_indices = self.slab_indices.copy() + out.slab_offsets = self.slab_offsets.copy() + out.slabs = self.slabs[: self.n_base_slabs + 1] + for slab in self.staged_slabs: + if slab is not None: + if deep: + slab = slab.copy() + else: + slab = slab[()] + slab.flags.writeable = False + out.slabs.append(slab) + return out + + def astype(self, dtype: DTypeLike, casting: Any = "unsafe") -> StagedChangesArray: + """Return a new StagedChangesArray with a different dtype. + + Chunks that are not yet staged are loaded from the base slabs. + """ + if self.dtype == dtype: + return self.copy(deep=True) + + out = self.copy(deep=False) + out.load() # Create new slabs and set all the base slabs to None + out.slabs[0] = np.broadcast_to( + out.fill_value.astype(dtype, casting=casting), + out.chunk_size, + ) + out.slabs[1:] = [ + slab.astype(dtype, casting=casting) if slab is not None else None + for slab in out.slabs[1:] + ] + return out + + def refill(self, fill_value: Any) -> StagedChangesArray[T]: + """Create a copy of self with changed fill_value. + + TODO this method is implemented very naively, as it loads all chunks from the + base slabs into the staged slabs, even when they don't contain any points set + to the fill_value. + """ + fill_value = np.asarray(fill_value, self.dtype) + if fill_value.ndim != 0: + raise ValueError("fill_value must be a scalar") + + out = self.copy(deep=True) + if fill_value != self.fill_value: + out.load() + out.slabs[0] = np.broadcast_to(fill_value, out.chunk_size) + for slab in out.staged_slabs: + if slab is not None: + slab[slab == self.fill_value] = fill_value + + return out + + @staticmethod + def full( + shape: tuple[int, ...], + chunk_size: tuple[int, ...], + fill_value: Any | None = None, + dtype: DTypeLike | None = None, + ) -> StagedChangesArray: + """Create a new StagedChangesArray with all chunks already in memory and + full of fill_value. + It won't consume any significant amounts of memory until it's modified. + """ + # ceil_a_over_b coerces these to unsigned integers. The interpreter will + # fail with MemoryError if either shape is negative or chunk_size is zero. 
+ if any(s < 0 for s in shape): + raise ValueError("shape must be non-negative") + if any(c <= 0 for c in chunk_size): + raise ValueError("chunk_size must be strictly positive") + n_chunks = tuple(ceil_a_over_b(s, c) for s, c in zip(shape, chunk_size)) + + # StagedChangesArray.__init__ reads the dtype from fill_value + if fill_value is not None: + fill_value = np.array(fill_value, dtype=dtype) + else: + fill_value = np.zeros((), dtype=dtype) + + return StagedChangesArray( + shape=shape, + chunk_size=chunk_size, + base_slabs=[], + slab_indices=np.zeros(n_chunks, dtype=np_hsize_t), + slab_offsets=np.zeros(n_chunks, dtype=np_hsize_t), + fill_value=fill_value, + ) + + @staticmethod + def from_array( + arr: ArrayLike, + chunk_size: tuple[int, ...], + fill_value: Any | None = None, + ) -> StagedChangesArray: + """Create a new StagedChangesArray where the base slabs are read-only views + (if supported, otherwise deep-copies) of an existing array-like. + """ + # Don't deep-copy array-like objects, as long as they allow for views + arr = cast(np.ndarray, asarray(arr)) + + out = StagedChangesArray.full(arr.shape, chunk_size, fill_value, arr.dtype) + if out.size == 0: + return out + + # Iterate on all dimensions beyond the first. For each chunk, create a base slab + # that is a view of arr of full length along axis 0 and 1 chunk in size along + # all other axes. + n_chunks = out.n_chunks + slab_indices = np.arange(1, np.prod(n_chunks[1:]) + 1, dtype=np_hsize_t) + out.slab_indices[()] = slab_indices.reshape(n_chunks[1:])[None] + slab_offsets = np.arange(0, arr.shape[0], step=chunk_size[0]) + out.slab_offsets[()] = slab_offsets[(slice(None),) + (None,) * (out.ndim - 1)] + + for chunk_idx in itertools.product(*[list(range(c)) for c in n_chunks[1:]]): + view_idx = (slice(None),) + tuple( + slice(start := c * s, start + s) + for c, s in zip(chunk_idx, chunk_size[1:]) + ) + # Note: if the backend of arr doesn't support views + # (e.g. h5py.Dataset), this is a deep-copy + view = arr[view_idx] + if isinstance(view, np.ndarray): + view.flags.writeable = False + out.slabs.append(view) + + out.n_base_slabs = out.n_slabs - 1 + return out + + +@cython.cclass +@dataclass(init=False, eq=False, repr=False) +class TransferPlan: + """Instructions to transfer data: + + - from a slab to the return value of __getitem__, or + - from the value parameter of __setitem__ to a slab, or + - between two slabs. + + Parameters + ---------- + mappers: + List of IndexChunkMapper objects, one for each axis, defining the selection. + src_slab_idx: + Index of the source slab in StagedChangesArray.slabs. + During __setitem__, it must be None to indicate the value parameter. + dst_slab_idx: + Index of the destination slab in StagedChangesArray.slabs. + During __getitem__, it must be None to indicate the output array. + chunk_idxidx: + 2D array with shape (nchunks, ndim), one row per chunk being transferred and one + columns per dimension. chunk_idxidx[i, j] represents the address on + mappers[j].chunk_indices for the i-th chunk; in other words, + mappers[j].chunk_indices[chunk_idxidx[i, j]] * chunk_size[j] is the address + along axis j of the top-left corner of the chunk in the virtual dataset. + src_slab_offsets: + Offset of each chunk within the source slab along axis 0. + Ignored during __setitem__. + dst_slab_offsets: + Offset of each chunk within the destination slab along axis 0. + Ignored during __getitem__. + slab_indices: optional + StagedChangesArray.slab_indices. It will be updated in place. + Ignored during __getitem__. 
+ slab_offsets: optional + StagedChangesArray.slab_offsets. It will be updated in place. + Ignored during __getitem__. + """ + + #: Index of the source slab in StagedChangesArray.slabs. + #: During __setitem__, it can be None to indicate the value parameter. + src_slab_idx: int | None + + #: Index of the destination slab in StagedChangesArray.slabs. + #: During __getitem__, it's always None to indicate the output array. + dst_slab_idx: int | None + + #: Parameters for read_many_slices(). + src_start: hsize_t[:, :] + dst_start: hsize_t[:, :] + count: hsize_t[:, :] + src_stride: hsize_t[:, :] + dst_stride: hsize_t[:, :] + + def __init__( + self, + mappers: list[IndexChunkMapper], + *, + src_slab_idx: int | None, + dst_slab_idx: int | None, + chunk_idxidx: hsize_t[:, :], + src_slab_offsets: hsize_t[:], + dst_slab_offsets: hsize_t[:], + slab_indices: NDArray[np_hsize_t] | None = None, + slab_offsets: NDArray[np_hsize_t] | None = None, + ): + self.src_slab_idx = src_slab_idx + self.dst_slab_idx = dst_slab_idx + nchunks = chunk_idxidx.shape[0] + ndim = chunk_idxidx.shape[1] + + assert src_slab_idx != dst_slab_idx + assert dst_slab_idx is None or dst_slab_idx > 0 + if src_slab_idx is None: + transfer_type = TransferType.setitem + elif dst_slab_idx is None: + transfer_type = TransferType.getitem + else: # slab-to-slab + transfer_type = TransferType.slab_to_slab + + # Generate slices for each axis, then put them in + # pseudo cartesian product following chunk_idxidx. + slices_nd = read_many_slices_params_nd( + transfer_type, + mappers, + chunk_idxidx, + src_slab_offsets, + dst_slab_offsets, + ) + # Refer to subchunk_map.pxd::ReadManySlicesNDColumn for column indices + self.src_start = slices_nd[:, 0, :] + self.dst_start = slices_nd[:, 1, :] + self.count = slices_nd[:, 2, :] + self.src_stride = slices_nd[:, 3, :] + self.dst_stride = slices_nd[:, 4, :] + + # Finally, update slab_indices and slab_offsets in place + if slab_indices is not None: + assert slab_offsets is not None + + chunk_indices = np.empty((ndim, nchunks), dtype=np_hsize_t) + chunk_indices_v: hsize_t[:, :] = chunk_indices + for j in range(ndim): + mapper: IndexChunkMapper = mappers[j] + for i in range(nchunks): + chunk_indices_v[j, i] = mapper.chunk_indices[chunk_idxidx[i, j]] + chunk_indices_tup = tuple(chunk_indices) + slab_indices[chunk_indices_tup] = dst_slab_idx + slab_offsets[chunk_indices_tup] = np.asarray(dst_slab_offsets) + + @cython.ccall + def transfer(self, src: NDArray[T], dst: NDArray[T]): + """Call read_many_slices() to transfer slices of data from src to dst""" + read_many_slices( + src, + dst, + self.src_start, + self.dst_start, + self.count, + self.src_stride, + self.dst_stride, + ) + + def __len__(self) -> int: + """Return number of slices that will be transferred""" + return self.src_start.shape[0] + + @cython.cfunc + def _repr_idx(self, i: ssize_t, start: hsize_t[:, :], stride: hsize_t[:, :]) -> str: + """Return a string representation of the i-th row""" + ndim = start.shape[1] + idx = [] + for j in range(ndim): + start_ij = start[i, j] + count_ij = self.count[i, j] + stride_ij = stride[i, j] + stop_ij = count2stop(start_ij, count_ij, stride_ij) + idx.append(slice(start_ij, stop_ij, stride_ij)) + return format_ndindex(tuple(idx)) + + def __repr__(self) -> str: + """This is meant to be incorporated by the __repr__ method of the + other *Plan classes + """ + src = f"slabs[{i}]" if (i := self.src_slab_idx) is not None else "value" + dst = f"slabs[{i}]" if (i := self.dst_slab_idx) is not None else "out" + nslices = 
self.src_start.shape[0] + s = f"\n # {nslices} transfers from {src} to {dst}" + for i in range(nslices): + src_idx = self._repr_idx(i, self.src_start, self.src_stride) + dst_idx = self._repr_idx(i, self.dst_start, self.dst_stride) + s += f"\n {dst}[{dst_idx}] = {src}[{src_idx}]" + + return s + + +@cython.cclass +@dataclass(init=False, repr=False) +class GetItemPlan: + """Instructions to execute StagedChangesArray.__getitem__""" + + #: Shape of the array returned by __getitem__ + output_shape: tuple[int, ...] + + #: Index to slice the output array to add extra dimensions to ensure it's got + #: the same dimensionality as the base array + output_view: tuple[slice | None, ...] + + transfers: list[TransferPlan] + + def __init__( + self, + idx: Any, + shape: tuple[int, ...], + chunk_size: tuple[int, ...], + slab_indices: NDArray[np_hsize_t], + slab_offsets: NDArray[np_hsize_t], + ): + """Generate instructions to execute StagedChangesArray.__getitem__ + + Parameters + ---------- + idx: + Arbitrary numpy fancy index passed as a parameter to __getitem__ + + All other parameters are the matching attributes of StagedChangesArray + """ + idx, mappers = index_chunk_mappers(idx, shape, chunk_size) + self.output_shape = idx.newshape(shape) + self.output_view = tuple(mapper.value_view_idx for mapper in mappers) + self.transfers = [] + + if not mappers: + # Empty selection + return + + chunks = _chunks_in_selection( + slab_indices, + slab_offsets, + mappers, + idxidx=True, + ) + self.transfers.extend( + _make_transfer_plans( + mappers, + chunks, + src_slab_idx="chunks", + dst_slab_idx=None, + ) + ) + + @property + def head(self) -> str: + ntransfers = sum(len(tplan) for tplan in self.transfers) + return ( + f"GetItemPlan" + ) + + def __repr__(self) -> str: + return self.head + "".join(str(tplan) for tplan in self.transfers) + + +@cython.cclass +@dataclass(repr=False) +class MutatingPlan: + """Common ancestor of all plans that mutate StagedChangesArray""" + + #: Metadata arrays of StagedChangesArray. The parameters passed to __init__ may + #: either be updated in place or replaced while formulating the plan. + slab_indices: NDArray[np_hsize_t] + slab_offsets: NDArray[np_hsize_t] + + #: Create new uninitialized slabs with the given shapes + #: and append them StagedChangesArray.slabs + append_slabs: list[tuple[int, ...]] = field(init=False, default_factory=list) + + #: data transfers between slabs or from the __setitem__ value to a slab. + #: dst_slab_idx can include the slabs just created by append_slabs. + transfers: list[TransferPlan] = field(init=False, default_factory=list) + + #: indices of StagedChangesArray.slabs to replace with None, + #: thus dereferencing the slab. This must happen *after* the transfers. 
+ drop_slabs: list[int] = field(init=False, default_factory=list) + + @property + def mutates(self) -> bool: + """True if this plan alters the state of the StagedChangesArray""" + return bool(self.transfers or self.drop_slabs) + + @property + def head(self) -> str: + """This is meant to be incorporated by the head() property of the subclasses""" + ntransfers = sum(len(tplan) for tplan in self.transfers) + return ( + f"append {len(self.append_slabs)} empty slabs, " + f"{ntransfers} slice transfers among {len(self.transfers)} slab pairs, " + f"drop {len(self.drop_slabs)} slabs>" + ) + + def __repr__(self) -> str: + """This is meant to be incorporated by the __repr__ method of the subclasses""" + s = self.head + + if self.append_slabs: + max_slab_idx = self.slab_indices.max() + slab_start_idx = int(max_slab_idx) - len(self.append_slabs) + 1 + assert slab_start_idx > 0 + for slab_idx, shape in enumerate(self.append_slabs, slab_start_idx): + s += f"\n slabs.append(np.empty({shape})) # slabs[{slab_idx}]" + + s += "".join(str(tplan) for tplan in self.transfers) + for slab_idx in self.drop_slabs: + s += f"\n slabs[{slab_idx}] = None" + s += f"\nslab_indices:\n{self.slab_indices}" + s += f"\nslab_offsets:\n{self.slab_offsets}" + return s + + +@cython.cclass +@dataclass(init=False, repr=False) +class SetItemPlan(MutatingPlan): + """Instructions to execute StagedChangesArray.__setitem__""" + + #: Shape the value parameter must be broadcasted to + value_shape: tuple[int, ...] + + #: Index to slice the value parameter array to add extra dimensions to ensure it's + #: got the same dimensionality as the base array + value_view: tuple[slice | None, ...] + + def __init__( + self, + idx: Any, + shape: tuple[int, ...], + chunk_size: tuple[int, ...], + slab_indices: NDArray[np_hsize_t], + slab_offsets: NDArray[np_hsize_t], + n_slabs: int, + n_base_slabs: int, + ): + """Generate instructions to execute StagedChangesArray.__setitem__. + + Parameters + ---------- + idx: + Arbitrary numpy fancy index passed as a parameter to __setitem__ + + All other parameters are the matching attributes and properties of + StagedChangesArray. + + **Implementation notes** + There are three use cases to cover: + + 1. A chunk is full of fill_value or lies on a base slab, + and is only partially covered by the selection. We need a 4-pass process: + a. create a new empty slab; + b. transfer the chunk from the base or full slab to the new slab; + c. update slab_indices and slab_offsets to reflect the transfer; + d. potentially dereference the base slab if it contains no other chunks; + e. update the new slab with the __setitem__ value. + + 2. A chunk is full of fill_value or lies on a base slab, + and is wholly covered by the selection: + a. create a new empty slab; + b. update the new slab with the __setitem__ value; + c. update slab_indices and slab_offsets to reflect that the chunk is no + longer on the base or full slab; + d. potentially dereference the base slab if it contains no other chunks. + + 3. A chunk is on a staged slab: + a. update the staged slab with the __setitem__ value. 
+ """ + super().__init__(slab_indices, slab_offsets) + + ndim = len(shape) + idx, mappers = index_chunk_mappers(idx, shape, chunk_size) + self.value_shape = idx.newshape(shape) + self.value_view = tuple(mapper.value_view_idx for mapper in mappers) + + # We'll deep-copy later, only if needed + if not mappers: + # Empty selection + return + + # Use case 1 + # Find all chunks that lie on a base or full slab and are only partially + # covered by the selection. + partial_chunks = _chunks_in_selection( + slab_indices, + slab_offsets, + mappers, + filter=lambda slab_idx: slab_idx <= n_base_slabs, + idxidx=True, + only_partial=True, + ) + n_partial_chunks = partial_chunks.shape[0] + if n_partial_chunks > 0: + # Create a new empty slab and copy the chunks entirely from the base or full + # slab. Then, update slab_indices and slab_offsets. + + self.append_slabs.append( + (n_partial_chunks * chunk_size[0],) + chunk_size[1:] + ) + entire_chunks_mappers: list[IndexChunkMapper] = [ + EntireChunksMapper(mapper) for mapper in mappers + ] + self.transfers.extend( + _make_transfer_plans( + entire_chunks_mappers, + partial_chunks, + src_slab_idx="chunks", + dst_slab_idx=n_slabs, + slab_indices=slab_indices, # Modified in place + slab_offsets=slab_offsets, # Modified in place + ) + ) + + # Now all chunks are either on staged slabs or are wholly covered by the + # selection. Effectively use case 1 has been simplified to use case 3: as long + # as a chunk already lies on a staged slab, we don't care if the selection is + # whole or partial. + setitem_chunks = _chunks_in_selection( + slab_indices, + slab_offsets, + mappers, + idxidx=True, + ) + n_setitem_chunks = setitem_chunks.shape[0] + + # Break the above into use cases 2 and 3 + cut: ssize_t = int(np.searchsorted(setitem_chunks[:, ndim], n_base_slabs + 1)) + if cut > 0: + # Use case 2. These chunks lie on a read-only full or base slab, so we can't + # overwrite them with the __setitem__ value parameter. + + self.append_slabs.append((int(cut) * chunk_size[0],) + chunk_size[1:]) + self.transfers.extend( + _make_transfer_plans( + mappers, + setitem_chunks[:cut], + src_slab_idx=None, + dst_slab_idx=n_slabs + (n_partial_chunks > 0), + slab_indices=slab_indices, # Modified in place + slab_offsets=slab_offsets, # Modified in place + ) + ) + + if cut < n_setitem_chunks: + # Use case 3. These chunks are on staged slabs and can be updated in place + # with the __setitem__ value parameter. + # Note that this includes chunks that were previously in use case 1. + # slab_indices and slab_offsets don't need to be modified. + self.transfers.extend( + _make_transfer_plans( + mappers, + setitem_chunks[cut:], + src_slab_idx=None, + dst_slab_idx="chunks", + ) + ) + + # DO NOT perform a full scan of self.slab_indices in order to populate + # self.drop_slabs. Everything so far has been O(selected chunks), do not + # introduce an operation that is O(all chunks)! + + @property + def head(self) -> str: + return ( + f"SetItemPlan 0: + self.append_slabs.append((nchunks * chunk_size[0],) + chunk_size[1:]) + + self.transfers.extend( + _make_transfer_plans( + mappers, + chunks, + src_slab_idx="chunks", + dst_slab_idx=n_slabs, + slab_indices=slab_indices, # Modified in place + slab_offsets=slab_offsets, # Modified in place + ) + ) + + # Also drop slabs that were previously loaded by SetItemPlan or ResizePlan, but + # which were not in SetItemPlan.drop_slabs because of performance reasons. 
+ self.drop_slabs.extend(range(1, n_base_slabs + 1)) + + @property + def head(self) -> str: + return "LoadPlan<" + super().head + + +@cython.cclass +@dataclass(init=False, repr=False) +class ChangesPlan: + """Instructions to execute StagedChangesArray.changes().""" + + #: List of all chunks that aren't full of the fill_value. + #: + #: List of tuples of + #: - index to slice the base array with + #: - index of StagedChangesArray.slabs + #: - index to slice the slab to retrieve the chunk value + chunks: list[tuple[tuple[slice, ...], int, tuple[slice, ...]]] + + def __init__( + self, + shape: tuple[int, ...], + chunk_size: tuple[int, ...], + slab_indices: NDArray[np_hsize_t], + slab_offsets: NDArray[np_hsize_t], + ): + """Generate instructions to execute StagedChangesArray.changes(). + + All parameters are the matching attributes of StagedChangesArray. + """ + self.chunks = [] + + _, mappers = index_chunk_mappers((), shape, chunk_size) + if not mappers: + return # size 0 + + # Build rulers of slices for each axis + dst_slices: list[list[slice]] = [] # Slices in the represented array + slab_slices: list[list[slice]] = [[]] # Slices in the slab (except axis 0) + + mapper: IndexChunkMapper + for mapper in mappers: + dst_slices_ix = [] + a: hsize_t = 0 + assert mapper.n_chunks > 0 # not size 0 + for i in range(mapper.n_chunks - 1): + b = a + mapper.chunk_size + dst_slices_ix.append(slice(a, b, 1)) + a = b + b = a + mapper.last_chunk_size + dst_slices_ix.append(slice(a, b, 1)) + dst_slices.append(dst_slices_ix) + + # slab slices on axis 0 must be built on the fly for each chunk, + # as each chunk has a different slab offset + mapper = mappers[0] + axis0_chunk_sizes: hsize_t[:] = np.full( + mapper.n_chunks, mapper.chunk_size, dtype=np_hsize_t + ) + axis0_chunk_sizes[mapper.n_chunks - 1] = mapper.last_chunk_size + + # slab slices on the other axes can be built with a ruler + # (and they'll be all the same except for the last chunk) + for mapper in mappers[1:]: + slab_slices.append( + [slice(0, mapper.chunk_size, 1)] * (mapper.n_chunks - 1) + + [slice(0, mapper.last_chunk_size, 1)] + ) + + # Find all non-full chunks + chunks = _chunks_in_selection( + slab_indices, + slab_offsets, + mappers, + filter=lambda slab_idx: slab_idx > 0, + idxidx=False, + sort_by_slab=False, + ) + nchunks = chunks.shape[0] + ndim = chunks.shape[1] - 2 + + for i in range(nchunks): + dst_ndslice = [] + for j in range(ndim): + chunk_idx = chunks[i, j] + dst_ndslice.append(dst_slices[j][chunk_idx]) + + chunk_idx = chunks[i, 0] + slab_idx = chunks[i, ndim] # slab_indices[chunk_idx] + start = chunks[i, ndim + 1] # slab_offsets[chunk_idx] + stop = start + axis0_chunk_sizes[chunk_idx] + slab_ndslice = [slice(start, stop, 1)] + for j in range(1, ndim): + chunk_idx = chunks[i, j] + slab_ndslice.append(slab_slices[j][chunk_idx]) + + self.chunks.append((tuple(dst_ndslice), slab_idx, tuple(slab_ndslice))) + + @property + def head(self) -> str: + return f"ChangesPlan<{len(self.chunks)} chunks>" + + def __repr__(self) -> str: + s = self.head + fmt = format_ndindex + for base_slice, slab_idx, slab_slice in self.chunks: + s += f"\n base[{fmt(base_slice)}] = slabs[{slab_idx}][{fmt(slab_slice)}]" + return s + + +@cython.cclass +@dataclass(init=False, repr=False) +class ResizePlan(MutatingPlan): + """Instructions to execute StagedChangesArray.resize()""" + + def __init__( + self, + old_shape: tuple[int, ...], + new_shape: tuple[int, ...], + chunk_size: tuple[int, ...], + slab_indices: NDArray[np_hsize_t], + slab_offsets: NDArray[np_hsize_t], + 
n_slabs: int,
+        n_base_slabs: int,
+    ):
+        """Generate instructions to execute StagedChangesArray.resize().
+
+        Parameters
+        ----------
+        old_shape:
+            StagedChangesArray.shape before the resize operation
+        new_shape:
+            StagedChangesArray.shape after the resize operation
+
+        All other parameters are the matching attributes of StagedChangesArray.
+        """
+        if len(new_shape) != len(old_shape):
+            raise ValueError(
+                f"Can't change dimensionality from {old_shape} to {new_shape}"
+            )
+        if any(s < 0 for s in new_shape):
+            raise ValueError("shape must be non-negative")
+
+        super().__init__(slab_indices, slab_offsets)
+        if old_shape == new_shape:
+            return
+
+        # Shrinking along an axis can't alter chunks on the slabs.
+        # However, it can reduce the amount of chunks impacted by enlarges on other
+        # axes, so it should be done first.
+        # It can also cause slabs to be dropped.
+        shrunk_shape = tuple(min(o, n) for o, n in zip(old_shape, new_shape))
+        if shrunk_shape != old_shape:
+            chunks_slice = tuple(
+                slice(ceil_a_over_b(s, c)) for s, c in zip(shrunk_shape, chunk_size)
+            )
+            # Just a view. This won't change the shape of the arrays when shrinking the
+            # edge chunks without reducing the number of chunks.
+            self.slab_indices = self.slab_indices[chunks_slice]
+            self.slab_offsets = self.slab_offsets[chunks_slice]
+        chunks_dropped = self.slab_indices.size < slab_indices.size
+
+        if shrunk_shape != new_shape:
+            # Enlarging along one or more axes. This is more involved than shrinking,
+            # as we may need to load and then update edge chunks.
+
+            # If we're actually adding chunks, and not just resizing the edge chunks,
+            # we need to enlarge slab_indices and slab_offsets too.
+            pad_width = [
+                (0, ceil_a_over_b(s, c) - n)
+                for s, c, n in zip(new_shape, chunk_size, self.slab_indices.shape)
+            ]
+            # np.pad is a deep-copy; skip if unnecessary.
+            if any(p != (0, 0) for p in pad_width):
+                self.slab_indices = np.pad(self.slab_indices, pad_width)
+                self.slab_offsets = np.pad(self.slab_offsets, pad_width)
+
+            # No need to transfer anything if there are only full chunks
+            if n_slabs > 1:
+                prev_shape = shrunk_shape
+                for axis in range(len(new_shape)):
+                    next_shape = new_shape[: axis + 1] + prev_shape[axis + 1 :]
+                    if next_shape != prev_shape:
+                        self._enlarge_along_axis(
+                            prev_shape,
+                            next_shape,
+                            chunk_size,
+                            axis,
+                            n_slabs + len(self.append_slabs),
+                            n_base_slabs,
+                        )
+                        prev_shape = next_shape
+
+        # Shrinking may drop any slab. Crucially, they may be staged slabs, and
+        # dereferencing them means releasing memory.
+        # Enlarging may only drop base slabs. Let's not do that, for the same
+        # reason we don't do it in __setitem__: an all-too-common pattern is to
+        # enlarge over and over again by one or a few points.
+        if chunks_dropped:
+            # This may set to None again slabs that were already None. That's fine.
+            # On the upside, it also cleans up slabs dereferenced by __setitem__
+            # or by enlarge operations.
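+            # Keep only the slabs that are still referenced by slab_indices:
+            # anything in range(1, n_slabs) that no longer appears there can be
+            # dereferenced.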
+            self.drop_slabs = np.setdiff1d(
+                np.arange(1, n_slabs, dtype=np_hsize_t),  # Never drop the full slab
+                np.unique(self.slab_indices),  # fairly expensive
+                assume_unique=True,
+            ).tolist()
+
+    @cython.cfunc
+    def _enlarge_along_axis(
+        self,
+        old_shape: tuple[int, ...],
+        new_shape: tuple[int, ...],
+        chunk_size: tuple[int, ...],
+        axis: ssize_t,
+        n_slabs: int,
+        n_base_slabs: int,
+    ):
+        """Enlarge along a single axis"""
+        old_size = old_shape[axis]
+
+        # Old size, rounded down or up to the nearest chunk
+        old_floor_size = old_size - old_size % chunk_size[axis]
+        if old_floor_size == old_size:
+            return  # Everything we're doing is adding extra empty chunks
+
+        new_size = min(new_shape[axis], old_floor_size + chunk_size[axis])
+
+        # Two steps:
+        # 1. Find edge chunks on base slabs that were partial and became full, or
+        #    vice versa, or remain partial but larger.
+        #    Load them into a new slab at slab_idx=n_slabs.
+        # 2. Find edge chunks that need filling with fill_value
+        #    and transfer from slabs[0] (the full chunk) into them.
+        #    This includes chunks we just transferred in the previous step.
+
+        # Step 1
+        if n_base_slabs > 0:
+            idx = (slice(None),) * axis + (slice(old_floor_size, old_size),)
+            _, mappers = index_chunk_mappers(idx, new_shape, chunk_size)
+            if not mappers:
+                return  # Resizing from size 0
+
+            chunks = _chunks_in_selection(
+                self.slab_indices,
+                self.slab_offsets,
+                mappers,
+                filter=lambda slab_idx: (0 < slab_idx) & (slab_idx <= n_base_slabs),
+                idxidx=True,
+            )
+            nchunks = chunks.shape[0]
+            if nchunks > 0:
+                self.append_slabs.append((nchunks * chunk_size[0],) + chunk_size[1:])
+                self.transfers.extend(
+                    _make_transfer_plans(
+                        mappers,
+                        chunks,
+                        src_slab_idx="chunks",
+                        dst_slab_idx=n_slabs,
+                        slab_indices=self.slab_indices,  # Modified in place
+                        slab_offsets=self.slab_offsets,  # Modified in place
+                    )
+                )
+
+        # Step 2
+        idx = (slice(None),) * axis + (slice(old_size, new_size),)
+        _, mappers = index_chunk_mappers(idx, new_shape, chunk_size)
+        if not mappers:
+            return  # Resizing from size 0
+
+        chunks = _chunks_in_selection(
+            self.slab_indices,
+            self.slab_offsets,
+            mappers,
+            filter=lambda slab_idx: slab_idx > 0,
+            idxidx=True,
+        )
+        nchunks = chunks.shape[0]
+
+        if nchunks > 0:
+            ndim = chunks.shape[1] - 2
+            assert chunks[0, ndim] > n_base_slabs
+            self.transfers.extend(
+                _make_transfer_plans(
+                    mappers,
+                    chunks,
+                    src_slab_idx=0,
+                    dst_slab_idx="chunks",
+                )
+            )
+
+    @property
+    def head(self) -> str:
+        return "ResizePlan<" + super().head
+
+
+@cython.ccall
+def _chunks_in_selection(
+    slab_indices: NDArray[np_hsize_t],
+    slab_offsets: NDArray[np_hsize_t],
+    mappers: list[IndexChunkMapper],
+    filter: Callable[[NDArray[np_hsize_t]], NDArray[np.bool_]] | None = None,
+    idxidx: bint = False,
+    sort_by_slab: bint = True,
+    only_partial: bint = False,
+) -> hsize_t[:, :]:
+    """Find all chunks within a selection
+
+    Parameters
+    ----------
+    slab_indices:
+        StagedChangesArray.slab_indices
+    slab_offsets:
+        StagedChangesArray.slab_offsets
+    mappers:
+        Output of index_chunk_mappers()
+    filter: optional
+        Function to apply to a (selection of) slab_indices that returns a boolean mask.
+        Chunks where the mask is False won't be returned.
+    only_partial: optional
+        If True, only return the chunks which are only partially covered by the
+        selection along one or more axes.
+    sort_by_slab: optional
+        True (default)
+            The chunks are returned sorted by slab index first and slab offset next.
+            Sorting by slab index allows later cutting the result into separate
+            calls to read_many_slices(), one per slab. Sorting by slab offset within
+            each slab improves access performance on disk-backed storage, where
+            accessing two adjacent chunks will typically avoid a costly seek()
+            syscall and may benefit from glibc-level and/or OS-level buffering.
+        False
+            The chunks are returned sorted by chunk index
+    idxidx: optional
+        If set to True, return the indices along chunk_indices instead of the values
+
+    Returns
+    -------
+    Flattened sequence of chunks that match both the selection performed by the mappers
+    along each axis and the mask performed by the filter on each individual chunk.
+
+    The returned object is a 2D view of indices with one row per chunk
+    and ndim+2 columns:
+
+    - columns 0:ndim are the chunk indices,
+      or the indices to chunk_indices if idxidx=True.
+      Multiply by chunk_size to get the top-left corner of the chunk
+      within the virtual array.
+    - column ndim is the slab index
+      (point of slab_indices; the index within StagedChangesArray.slabs)
+    - column ndim+1 is the slab offset
+      (point of slab_offsets, the offset on axis 0 within the slab)
+
+    **Examples**
+
+    >>> index = (slice(5, 20), slice(15, 45))
+    >>> chunk_size = (10, 10)
+    >>> shape = (30, 60)
+    >>> slab_indices = np.array(
+    ...     [[0, 0, 1, 2, 0, 0],
+    ...      [0, 0, 0, 0, 1, 0],
+    ...      [0, 0, 2, 0, 1, 0]],
+    ...     dtype=np_hsize_t
+    ... )
+    >>> slab_offsets = np.array(
+    ...     [[0, 0, 50, 0, 0, 0],
+    ...      [0, 0, 0, 0, 30, 0],
+    ...      [0, 0, 60, 0, 0, 0]],
+    ...     dtype=np_hsize_t
+    ... )
+    >>> _, mappers = index_chunk_mappers(index, shape, chunk_size)
+    >>> tuple(list(m.chunk_indices) for m in mappers)
+    ([0, 1], [1, 2, 3, 4])
+    >>> tuple(m.chunks_indexer() for m in mappers)
+    (slice(0, 2, 1), slice(1, 5, 1))
+    >>> tuple(m.whole_chunks_idxidx() for m in mappers)
+    (slice(1, 2, 1), slice(1, 3, 1))
+    >>> np.asarray(_chunks_in_selection(
+    ...     slab_indices, slab_offsets, mappers, sort_by_slab=False
+    ... ))
+    array([[ 0,  1,  0,  0],
+           [ 0,  2,  1, 50],
+           [ 0,  3,  2,  0],
+           [ 0,  4,  0,  0],
+           [ 1,  1,  0,  0],
+           [ 1,  2,  0,  0],
+           [ 1,  3,  0,  0],
+           [ 1,  4,  1, 30]], dtype=uint64)
+    >>> np.asarray(_chunks_in_selection(
+    ...     slab_indices, slab_offsets, mappers, sort_by_slab=False, only_partial=True
+    ... ))
+    array([[ 0,  1,  0,  0],
+           [ 0,  2,  1, 50],
+           [ 0,  3,  2,  0],
+           [ 0,  4,  0,  0],
+           [ 1,  1,  0,  0],
+           [ 1,  4,  1, 30]], dtype=uint64)
+
+    chunk_indices        = ([0, 1], [1, 2, 3, 4])
+    whole chunks_indices = ([   1], [   2, 3   ])
+
+    A chunk is partially selected if its coordinates are in the intersection of the
+    indices across all axes, but they are not wholly selected along all axes.
+
+    slab_indices  slab_offsets    selection  only_partial=False  only_partial=True
+    001200        0 0 50 0  0 0   .pppp.     .xxxx.              .xxxx.
+    000010        0 0  0 0 30 0   .pwwp.     .xxxx.              .x..x.
+    002010        0 0 60 0  0 0   ......     ......              ......
+ + p=partial (0, 1) [0][ 0:] (0, 1) [0][ 0:] + w=whole (0, 2) [1][50:] (0, 2) [1][50:] + (0, 3) [2][ 0:] (0, 3) [2][ 0:] + (0, 4) [0][ 0:] (0, 4) [0][ 0:] + (1, 0) [0][ 0:] + (1, 1) [0][ 0:] (1, 1) [0][ 0:] + (1, 2) [0][ 0:] + (1, 3) [1][30:] (1, 3) [1][30:] + """ + mapper: IndexChunkMapper # noqa: F841 + ndim = len(mappers) + assert ndim > 0 + + if only_partial: + # A partial chunk is a chunk that is selected by chunks_indexer() along all + # axes, but it is not selected by whole_chunks_idxidx() along at least one axis + any_partial = False + whole_chunks_idxidx = [] + for mapper in mappers: + wcidx = mapper.whole_chunks_idxidx() + whole_chunks_idxidx.append(wcidx) + if not isinstance(wcidx, slice): + any_partial = True + elif wcidx != slice(0, len(mapper.chunk_indices), 1): + any_partial = True + + if not any_partial: + # All chunks are wholly selected + return np.empty((0, ndim + 2), dtype=np_hsize_t) + + # Slice slab_indices and slab_offsets with mappers + indexers = ix_with_slices( + *[mapper.chunks_indexer() for mapper in mappers], shape=slab_indices.shape + ) + slab_indices = slab_indices[indexers] + slab_offsets = slab_offsets[indexers] + + if filter: + mask = filter(slab_indices) + else: + mask = np.broadcast_to(True, slab_indices.shape) + + if only_partial: + whole_chunks_idxidx_tup = ix_with_slices(*whole_chunks_idxidx, shape=mask.shape) + if not filter: + mask = mask.copy() # broadcasted + mask[whole_chunks_idxidx_tup] = False + + # Apply mask and flatten + nz = np.nonzero(mask) + slab_indices = slab_indices[nz] + slab_offsets = slab_offsets[nz] + + if idxidx: + # Don't copy when converting from np.intp to uint64 on 64-bit platforms + columns = [asarray(nz_i, np_hsize_t) for nz_i in nz] + else: + columns = [ + np.asarray(mapper.chunk_indices)[nz_i] for mapper, nz_i in zip(mappers, nz) + ] + columns += [slab_indices, slab_offsets] + + stacked = np.empty((slab_indices.size, ndim + 2), dtype=np_hsize_t) + + if sort_by_slab: + sort_idx = np.lexsort((slab_offsets, slab_indices)) + for i, col in enumerate(columns): + col.take(sort_idx, axis=0, out=stacked[:, i]) + else: + for i, col in enumerate(columns): + stacked[:, i] = col + + return stacked + + +def _make_transfer_plans( + mappers: list[IndexChunkMapper], + chunks: hsize_t[:, :], + *, + src_slab_idx: Literal[0, "chunks", None], + dst_slab_idx: int | Literal["chunks", None], + slab_indices: NDArray[np_hsize_t] | None = None, + slab_offsets: NDArray[np_hsize_t] | None = None, +) -> Iterator[TransferPlan]: + """Generate one or more TransferPlan, one for each pair of source and destination + slabs. + + Parameters + ---------- + slab_indices: + StagedChangesArray.slab_indices. It will be updated in place. + Ignored if dst_slab_idx is None. + slab_offsets: + StagedChangesArray.slab_offsets. It will be updated in place. + Ignored if dst_slab_idx is None. + mappers: + List of IndexChunkMapper objects, one for each axis, defining the selection. + chunks: + Output of _chunks_in_selection(..., idxidx=True). + One row per chunk to transfer. + The first ndim columns are the indices to mappers[j].chunk_indices. + for the i-th chunk; in other words, + mappers[j].chunk_indices[chunk_idxidx[i, j]] * chunk_size[j] is the address + along axis j of the top-left corner of the chunk in the virtual dataset. + + The last two columns are the slab_idx and slab offset, either source or + destination depending on the other parameters. + src_slab_idx: + Index of the source slab in StagedChangesArray.slabs. 
+ int + All transfers are from a single slab with this index. + "chunks" + Transfers are from the slabs pointed to by the slab_idx column in chunks. + None + Transfers are from the value parameter of __setitem__. + dst_slab_idx: + Index of the destination slab in StagedChangesArray.slabs. + int + All transfers are to a single slab with this index. + This is used when filling a brand new slab. + "chunks" + Transfers are to the slabs pointed to by the slab_idx column in chunks. + src_slab_idx can't be "chunks". + None + Transfers are to the return value of __getitem__. + src_slab_idx can't be None. + """ + nchunks = chunks.shape[0] + ndim = chunks.shape[1] - 2 + chunk_size = mappers[0].chunk_size + + src_slab_offsets_v: hsize_t[:] + dst_slab_offsets_v: hsize_t[:] + + if src_slab_idx is None: # __setitem__ + src_slab_offsets_v = chunks[:0, 0] # dummy; ignored + elif src_slab_idx == 0: # fill_value + src_slab_offsets_v = np.zeros(nchunks, dtype=np_hsize_t) + elif src_slab_idx == "chunks": + src_slab_offsets_v = chunks[:, ndim + 1] + else: # pragma: nocover + raise ValueError(f"Invalid {src_slab_idx=}") + + if dst_slab_idx is None: # __getitem__ + assert src_slab_idx is not None + dst_slab_offsets_v = chunks[:0, 0] # dummy; ignored + elif isinstance(dst_slab_idx, int): # new slab + assert dst_slab_idx > 0 + dst_slab_offsets_v = np.arange( + 0, nchunks * chunk_size, chunk_size, dtype=np_hsize_t + ) + elif dst_slab_idx == "chunks": + assert src_slab_idx != "chunks" + dst_slab_offsets_v = chunks[:, ndim + 1] + else: # pragma: nocover + raise ValueError(f"Invalid {dst_slab_idx=}") + + split_indices: ssize_t[:] + if src_slab_idx == "chunks" or dst_slab_idx == "chunks": + # Either multiple source slabs and single destination slab, or vice versa + # Note that _chunks_in_selection returns chunks sorted by slab_idx. 
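+        # np.diff() is non-zero at the rows where the slab index changes;
+        # adding 1 turns those positions into the start row of each per-slab group.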
+ split_indices = np.flatnonzero(np.diff(chunks[:, ndim])) + 1 + else: + # Single source and destination slabs + split_indices = np.empty(0, dtype=np.intp) + + nsplits = len(split_indices) + for i in range(nsplits + 1): + start = 0 if i == 0 else split_indices[i - 1] + stop = nchunks if i == nsplits else split_indices[i] + + slab_idx_i = int(chunks[start, ndim]) + src_slab_idx_i = slab_idx_i if src_slab_idx == "chunks" else src_slab_idx + dst_slab_idx_i = slab_idx_i if dst_slab_idx == "chunks" else dst_slab_idx + chunk_idx_group = chunks[start:stop, :ndim] + src_slab_offsets_group = src_slab_offsets_v[start:stop] + dst_slab_offsets_group = dst_slab_offsets_v[start:stop] + + yield TransferPlan( + mappers, + src_slab_idx=src_slab_idx_i, + dst_slab_idx=dst_slab_idx_i, + chunk_idxidx=chunk_idx_group, + src_slab_offsets=src_slab_offsets_group, + dst_slab_offsets=dst_slab_offsets_group, + slab_indices=slab_indices, + slab_offsets=slab_offsets, + ) diff --git a/versioned_hdf5/staged_changes.pyx b/versioned_hdf5/staged_changes.pyx new file mode 120000 index 00000000..cecf5030 --- /dev/null +++ b/versioned_hdf5/staged_changes.pyx @@ -0,0 +1 @@ +staged_changes.py \ No newline at end of file diff --git a/versioned_hdf5/tests/test_staged_changes.py b/versioned_hdf5/tests/test_staged_changes.py new file mode 100644 index 00000000..869fef6b --- /dev/null +++ b/versioned_hdf5/tests/test_staged_changes.py @@ -0,0 +1,705 @@ +from __future__ import annotations + +import time +from collections.abc import Mapping +from typing import Any, Literal + +import hypothesis +import numpy as np +import pytest +from hypothesis import given +from hypothesis import strategies as st +from numpy.testing import assert_array_equal + +from ..staged_changes import StagedChangesArray +from .test_subchunk_map import idx_st, shape_chunks_st + +max_examples = 10_000 + +NP_GE_200 = np.lib.NumpyVersion(np.__version__) >= "2.0.0" + + +def action_st(shape: tuple[int, ...], max_size: int = 20): + resize_st = st.tuples(*(st.integers(0, max_size) for _ in shape)) + return st.one_of( + st.tuples(st.just("getitem"), idx_st(shape)), + st.tuples(st.just("setitem"), idx_st(shape)), + st.tuples(st.just("resize"), resize_st), + ) + + +@st.composite +def staged_array_st( + draw, max_ndim: int = 4, max_size: int = 20, max_actions: int = 6 +) -> tuple[ + Literal["full", "from_array"], # how to create the base array + tuple[int, ...], # initial shape + tuple[int, ...], # chunk size + list[tuple[Literal["getitem", "setitem", "resize"], Any]], # 0 or more actions +]: + base, (shape, chunks), n_actions = draw( + st.tuples( + st.one_of(st.just("full"), st.just("from_array")), + shape_chunks_st(max_ndim=max_ndim, min_size=0, max_size=max_size), + st.integers(0, max_actions), + ) + ) + + orig_shape = shape + actions = [] + for _ in range(n_actions): + label, arg = draw(action_st(shape, max_size=max_size)) + actions.append((label, arg)) + if label == "resize": + shape = arg + + return base, orig_shape, chunks, actions + + +@pytest.mark.slow +@given(staged_array_st()) +@hypothesis.settings(max_examples=max_examples, deadline=None) +def test_staged_array(args): + base, shape, chunks, actions = args + + rng = np.random.default_rng(0) + fill_value = 42 + + if base == "full": + base = np.full(shape, fill_value, dtype="u4") + arr = StagedChangesArray.full(shape, chunks, fill_value, dtype=base.dtype) + assert arr.n_base_slabs == 0 + elif base == "from_array": + base = rng.integers(2**32, size=shape, dtype="u4") + # copy base to detect bugs where the 
StagedChangesArray + # accidentally writes back to the base slabs + arr = StagedChangesArray.from_array(base.copy(), chunks, fill_value) + if base.size == 0: + assert arr.n_base_slabs == 0 + else: + assert arr.n_base_slabs > 0 + else: + raise AssertionError("unreachable") + + assert arr.dtype == base.dtype + assert arr.fill_value.shape == () + assert arr.fill_value == fill_value + assert arr.fill_value.dtype == base.dtype + assert arr.slabs[0].dtype == base.dtype + assert arr.slabs[0].shape == chunks + assert arr.itemsize == 4 + assert arr.size == np.prod(shape) + assert arr.nbytes == np.prod(shape) * 4 + assert arr.shape == shape + assert len(arr) == shape[0] + assert arr.ndim == len(shape) + assert arr.chunk_size == chunks + assert len(arr.n_chunks) == len(shape) + for n, s, c in zip(arr.n_chunks, shape, chunks): + assert n == s // c + (s % c > 0) + assert arr.n_staged_slabs == 0 + assert arr.n_base_slabs + 1 == arr.n_slabs == len(arr.slabs) + assert not arr.has_changes + + expect = base.copy() + + for label, arg in actions: + if label == "getitem": + assert_array_equal(arr[arg], expect[arg], strict=True) + + elif label == "setitem": + value = rng.integers(2**32, size=expect[arg].shape, dtype="u4") + expect[arg] = value + arr[arg] = value + assert_array_equal(arr, expect, strict=True) + + elif label == "resize": + # Can't use np.resize(), which works differently as it reflows the data + new = np.full(arg, fill_value, dtype="u4") + common_idx = tuple(slice(min(o, n)) for o, n in zip(arr.shape, arg)) + new[common_idx] = expect[common_idx] + expect = new + arr.resize(arg) + assert arr.shape == arg + assert_array_equal(arr, expect, strict=True) + + else: + raise AssertionError("unreachable") + + # Test has_changes property. + # Edge cases would be complicated to test: + # - a __setitem__ with empty index or a resize with the same shape won't + # flip has_changes + # - a round-trip where an array is created empty, resized, filled, wiped by + # resize(0), and then # resized to the original shape will have has_changes=True + # even if identical to the original. 
+ if expect.shape != base.shape or (expect != base).any(): + assert arr.has_changes + elif all(label == "getitem" for label, _ in actions): + assert not arr.has_changes + + # One final __getitem__ of everything + assert_array_equal(arr, expect, strict=True) + + # Reconstruct the array starting from the base + changes + final = np.full(expect.shape, fill_value, dtype=base.dtype) + shapes = [base.shape] + [arg for label, arg in actions if label == "resize"] + common_idx = tuple(slice(min(sizes)) for sizes in zip(*shapes)) + final[common_idx] = base[common_idx] + for value_idx, _, chunk in arr.changes(): + if not isinstance(chunk, tuple): + assert chunk.dtype == base.dtype + final[value_idx] = chunk + assert_array_equal(final, expect, strict=True) + + # Test __iter__ + assert_array_equal(list(arr), list(expect), strict=True) + + +class MyArray(Mapping): + """A minimal numpy-like read-only array""" + + def __init__(self, arr): + self.arr = np.asarray(arr) + + @property + def shape(self): + return self.arr.shape + + @property + def ndim(self): + return self.arr.ndim + + @property + def itemsize(self): + return self.arr.itemsize + + @property + def dtype(self): + return self.arr.dtype + + def astype(self, dtype): + return MyArray(self.arr.astype(dtype)) + + def __array__(self, dtype=None, copy=None): + kwargs = {"copy": copy} if copy is not None else {} # Requires numpy >=2 + return np.asarray(self.arr, dtype=dtype, **kwargs) + + def __getitem__(self, idx): + return MyArray(self.arr[idx]) + + def __iter__(self): + return (MyArray(row) for row in self.arr) + + def __len__(self): + return len(self.arr) + + +def test_array_like_setitem(): + arr = StagedChangesArray.full((3, 3), (3, 1), dtype="f4") + arr[:2, :2] = MyArray([[1, 2], [3, 4]]) + assert all(isinstance(slab, np.ndarray) for slab in arr.slabs) + assert_array_equal(arr, np.array([[1, 2, 0], [3, 4, 0], [0, 0, 0]], dtype="f4")) + + +def test_array_like_from_array(): + orig = MyArray(np.arange(9).reshape(3, 3)) + arr = StagedChangesArray.from_array(orig, (2, 2)) + assert isinstance(arr.full_slab, np.ndarray) + assert arr.n_base_slabs == 2 + + # Because MyArray supports views (see MyArray.__getitem__), then the base slabs + # must be views of the original array. 
+ for slab in arr.base_slabs: + assert isinstance(slab, MyArray) + assert slab.arr.base is orig.arr.base + + assert_array_equal(arr, orig.arr, strict=True) + + +def test_array_like_from_slabs(): + base_slab = MyArray(np.arange(9).reshape(3, 3)) + arr = StagedChangesArray( + shape=(3, 3), + chunk_size=(2, 3), + base_slabs=[base_slab], + slab_indices=[[1], [1]], + slab_offsets=[[0], [2]], + ) + assert arr.slabs[1] is base_slab + assert_array_equal(arr, base_slab.arr, strict=True) + + +def test_asarray(): + arr = StagedChangesArray.full((2, 2), (2, 2)) + assert isinstance(np.asarray(arr), np.ndarray) + assert_array_equal(np.asarray(arr), arr) + + assert isinstance(np.asarray(arr, dtype="i1"), np.ndarray) + assert np.asarray(arr, dtype="i1").dtype == "i1" + + if NP_GE_200: + assert isinstance(np.asarray(arr, copy=True), np.ndarray) + with pytest.raises(ValueError): + np.asarray(arr, copy=False) + + +def test_load(): + arr = StagedChangesArray.from_array(np.arange(4).reshape(2, 2), (2, 1), 42) + arr.resize((3, 2)) + assert len(arr.slabs) == 3 + assert_array_equal(arr.slab_indices, [[1, 2], [0, 0]]) + assert_array_equal(arr, np.array([[0, 1], [2, 3], [42, 42]])) + + arr.load() + assert len(arr.slabs) == 4 + assert_array_equal(arr.slab_indices, [[3, 3], [0, 0]]) + assert_array_equal(arr, np.array([[0, 1], [2, 3], [42, 42]])) + + # Base slabs were dereferenced; full slab was not + assert_array_equal(arr.slabs[0], [[42], [42]]) + assert arr.slabs[1] is None + assert arr.slabs[2] is None + + # No-op + arr.load() + assert len(arr.slabs) == 4 # Didn't append a new slab with size 0 + assert_array_equal(arr.slab_indices, [[3, 3], [0, 0]]) + assert_array_equal(arr, np.array([[0, 1], [2, 3], [42, 42]])) + + # Edge case where __setitem__ stops using a base slab but doesn't dereference it for + # performance reasons, so there are no transfers to do but you still should drop + # the base slab + arr = StagedChangesArray.from_array(np.arange(2), (2,)) + arr[0] = 2 + assert_array_equal(arr.slab_indices, [2]) + assert arr.slabs[1] is not None + arr.load() + assert_array_equal(arr.slab_indices, [2]) + assert len(arr.slabs) == 3 # Didn't append a new slab with size 0 + assert arr.slabs[1] is None + + +def test_shrinking_dereferences_slabs(): + arr = StagedChangesArray.from_array([[1, 2, 3, 4, 5, 6, 7]], (1, 2), fill_value=42) + arr[0, -1] = 8 + arr.resize((1, 10)) + assert_array_equal(arr.slab_indices, [[1, 2, 3, 5, 0]]) + assert_array_equal(arr, [[1, 2, 3, 4, 5, 6, 8, 42, 42, 42]]) + arr.resize((1, 3)) + assert_array_equal(arr.slab_indices, [[1, 2]]) + assert arr.slabs[3:] == [None, None, None] + assert_array_equal(arr, [[1, 2, 3]]) + + arr.resize((0, 0)) + assert_array_equal(arr.slab_indices, np.empty((0, 0))) + # The base slab is never dereferenced + assert_array_equal(arr.slabs[0], [[42, 42]]) + assert arr.slabs[1:] == [None, None, None, None, None] + + +def test_copy(): + a = StagedChangesArray.from_array([[1, 2]], chunk_size=(1, 1), fill_value=42) + a.resize((1, 3)) + a[0, 1] = 4 + assert_array_equal(a, [[1, 4, 42]]) + assert_array_equal(a.slab_indices, [[1, 3, 0]]) + + b = a.copy(deep=True) + assert b.slabs[0] is a.slabs[0] # full slab is shared + assert b.slabs[1] is a.slabs[1] # base slabs are shared + # staged slabs are deep-copied + b[0, 1] = 5 + assert_array_equal(b.slab_indices, [[1, 3, 0]]) + assert_array_equal(a, [[1, 4, 42]]) + assert_array_equal(b, [[1, 5, 42]]) + + c = a.copy(deep=False) + assert b.slabs[0] is a.slabs[0] # full slab is shared + assert c.slabs[1] is a.slabs[1] # base slabs are 
shared + # staged slabs are turned into read-only views + assert c.slabs[2].base is a.slabs[2].base + assert not c.slabs[2].flags.writeable + with pytest.raises(ValueError): + c[0, 1] = 5 + + +def test_astype(): + a = StagedChangesArray.full((2, 2), (2, 2), dtype="i1") + a[0, 0] = 1 + + # astype() with no type change deep-copies + b = a.astype("i1") + b[0, 0] = 2 + b.refill(123) + assert a[0, 0] == 1 + assert a[0, 1] == 0 + + # Create array with base slabs, full chunks, and staged slabs + a = StagedChangesArray.from_array( + np.asarray([[1, 2, 3]], dtype="f4"), chunk_size=(1, 1) + ) + a.resize((1, 2)) # dereference a slab + a.resize((1, 3)) + a[0, 1] = 4 + assert a.n_base_slabs == 3 + assert_array_equal(a.slab_indices, [[1, 4, 0]]) + + b = a.astype("i2") + + assert_array_equal(a.slab_indices, [[1, 4, 0]]) + # all slabs have been loaded + assert_array_equal(b.slab_indices, [[5, 4, 0]]) + + assert a.dtype == "f4" + for slab in a.slabs: + assert slab is None or slab.dtype == "f4" + + assert b.dtype == "i2" + for slab in b.slabs: + assert slab is None or slab.dtype == "i2" + + assert_array_equal(a, np.array([[1, 4, 0]], dtype="f4"), strict=True) + assert_array_equal(b, np.array([[1, 4, 0]], dtype="i2"), strict=True) + + +def test_refill(): + # Actual base slabs can entirely or partially contain the fill_value + a = StagedChangesArray.from_array([[1], [42]], chunk_size=(2, 1), fill_value=42) + a.resize((2, 3)) # Create full chunks full of 42 + a[0, 2] = 2 # Create staged slab with a non-42 and a 42 + assert_array_equal(a, [[1, 42, 2], [42, 42, 42]]) + assert_array_equal(a.slab_indices, [[1, 0, 2]]) + + b = a.refill(99) + # a is unchanged + assert_array_equal(a, [[1, 42, 2], [42, 42, 42]]) + assert_array_equal(a.slab_indices, [[1, 0, 2]]) + + # full slabs -> still full slabs, but slabs[0] has changed value + # base slabs -> staged slabs with 42 points replaced with 99 + # staged slabs -> same slab index, but 42 points have been replaced with 99 + assert_array_equal(b, [[1, 99, 2], [99, 99, 99]]) + assert_array_equal(b.slab_indices, [[3, 0, 2]]) + + +def test_big_O_performance(): + """Test that __getitem__ and __setitem__ performance is + O(selected number of chunks) and not O(total number of chunks). + + Note: resize() is, by necessity, O(total number of chunks in the array) when + adding/removing chunks and O(total number of chunks along resized axis) when + enlarging the edge chunks without changing the number of chunks. + """ + + def benchmark(shape): + arr = StagedChangesArray.full(shape, (1, 2)) + # Don't measure page faults on first access to slab_indices and slab_offsets. + # In the 10 chunks use case, it's almost certainly reused memory. + _ = arr[0, -3:] + + t0 = time.thread_time() + # Let's access the end, just in case there's something that + # performs a full scan which stops when the selection ends. + + # Update only part of a chunk. This triggers a whole + # extra section worth of logic in __setitem__. + arr[0, -1] = 42 + assert arr[0, -1] == 42 + assert arr[0, -2] == 0 + t1 = time.thread_time() + return t1 - t0 + + # trivially sized baseline: 5 chunks + a = benchmark((1, 10)) + + # 5 million chunks, small rulers + # Test will trip if __getitem__ or __setitem__ perform a + # full scan of slab_indices and/or slab_offsets anywhere + b = benchmark((2_500, 4_000)) + np.testing.assert_allclose(b, a, rtol=0.2) + + # 5 million chunks, long rulers + # Test will trip if __getitem__ or __setitem__ construct or iterate upon a ruler as + # long as the number of chunks along one axis, e.g. 
np.arange(mapper.n_chunks) + c = benchmark((1, 10_000_000)) + np.testing.assert_allclose(c, a, rtol=0.2) + + +def test_dont_load_wholly_selected_chunks(): + """Test that __setitem__ loads from the base slabs only the chunks that are + partially selected by the index + """ + a = StagedChangesArray.from_array([[1, 2], [3, 4]], chunk_size=(2, 1)) + assert a.n_slabs == 3 # [full, base column 0, base column 1] + + # Selection wholly covers chunk (0, 0) + plan = a._setitem_plan((slice(None), 0)) + assert len(plan.transfers) == 1 + assert plan.transfers[0].src_slab_idx is None # __setitem__ parameter + assert plan.transfers[0].dst_slab_idx == 3 # new slab + + # Selection partially covers chunk (0, 0) + plan = a._setitem_plan((0, 0)) + assert len(plan.transfers) == 2 + assert plan.transfers[0].src_slab_idx == 1 + assert plan.transfers[0].dst_slab_idx == 3 + assert plan.transfers[1].src_slab_idx is None + assert plan.transfers[1].dst_slab_idx == 3 + + +def test_weird_dtypes(): + a = StagedChangesArray.from_array( + np.array(["aaa", "bbb", "ccc"], dtype="U3"), + chunk_size=(2,), + fill_value="zzz", + ) + assert a.dtype == "U3" + a[0] = "ddd" + a.resize((5,)) + assert_array_equal(a, ["ddd", "bbb", "ccc", "zzz", "zzz"], strict=True) + for slab in a.slabs: + assert slab.dtype == "U3" + + a = StagedChangesArray.full((3,), (2,), fill_value="zzz", dtype="U3") + assert a.dtype == "U3" + a[0] = "aaa" + assert_array_equal(a, ["aaa", "zzz", "zzz"], strict=True) + for slab in a.slabs: + assert slab.dtype == "U3" + + a = StagedChangesArray.full((1,), (1,), dtype="U3") + assert a.dtype == "U3" + assert_array_equal(a, np.array([""], dtype="U3"), strict=True) + + a = StagedChangesArray.full((1,), (1,), dtype="|V3") + assert a.dtype == "|V3" + assert_array_equal(a, np.array([b""], dtype="|V3"), strict=True) + + # Test edge case where we need to generate a fill_value, but + # np.array(0, dtype=dtype) would crash + a = StagedChangesArray( + shape=(1,), + chunk_size=(1,), + base_slabs=[np.array([b"123"], dtype="|V3")], + slab_indices=[1], + slab_offsets=[0], + ) + a.resize((2,)) + assert a.dtype == "|V3" + assert_array_equal(a, np.array([b"123", b""], dtype="|V3"), strict=True) + + +def test_dtype_priority_init(): + """In StagedChangesArray.__init__, dtype from base slabs + takes precedence over the one from fill_value + """ + a = StagedChangesArray( + shape=(1,), + chunk_size=(1,), + base_slabs=[np.array([1], dtype="u1")], + slab_indices=[1], + slab_offsets=[0], + fill_value=np.float32(3.5), + ) + assert a.dtype == "u1" + assert a.fill_value == 3 + assert_array_equal(a, np.array([1], dtype="u1"), strict=True) + + +def test_dtype_priority_from_array(): + """In StagedChangesArray.from_array, dtype from base array takes + precedence over the one from fill_value + """ + a = StagedChangesArray.from_array( + np.array([1], dtype="u1"), + chunk_size=(1,), + fill_value=np.float32(3.5), + ) + assert a.dtype == "u1" + assert a.fill_value == 3 + assert_array_equal(a, np.array([1], dtype="u1"), strict=True) + + +def test_dtype_priority_full(): + """In StagedChangesArray.full, explicitly declared dtype takes precedence over the + one from fill_value. However, if omitted, always respect fill_value's dtype. 
+ """ + a = StagedChangesArray.full( + shape=(1,), + chunk_size=(1,), + fill_value=np.float32(3.5), + dtype="u1", + ) + assert a.dtype == "u1" + assert a.fill_value == 3 + assert_array_equal(a, np.array([3], dtype="u1"), strict=True) + + a = StagedChangesArray.full( + shape=(1,), + chunk_size=(1,), + fill_value=np.uint16(3), + ) + assert a.dtype == "u2" + assert a.fill_value == 3 + assert_array_equal(a, np.array([3], dtype="u2"), strict=True) + + +def test_setitem_broadcast(): + a = StagedChangesArray.full((2, 2), (2, 2), fill_value=42) + a[()] = 1 + assert_array_equal(a, [[1, 1], [1, 1]]) + a[0] = [2, 3] + assert_array_equal(a, [[2, 3], [1, 1]]) + + +def test_repr(): + a = StagedChangesArray.from_array( + np.array([[1, 2], [3, 4]], dtype="u4"), (2, 1), fill_value=42 + ) + a.resize((2, 3)) + a[0, 0] = 5 + r = repr(a) + assert "shape=(2, 3)" in r, r + assert "chunk_size=(2, 1)" in r, r + assert "dtype=uint32" in r, r + assert "fill_value=42" in r, r + assert "2 base slabs" in r, r + assert "1 staged slab" in r, r + assert "[[3 2 0]]" in r, r # slab_indices + assert "[[0 0 0]]" in r, r # slab_offsets + + r = repr(a._getitem_plan((0, 0))) + assert "1 slice transfer" in r, r + assert "out[0:1, 0:1] = slabs[3][0:1, 0:1]" in r, r + + r = repr(a._setitem_plan((0, 2))) + assert "append 1 empty slab" in r, r + assert "2 slice transfers among 2 slab pairs" in r, r + assert "slabs.append(np.empty((2, 1))) # slabs[4]" in r, r + assert "slabs[4][0:2, 0:1] = slabs[0][0:2, 0:1]" in r, r + assert "slabs[4][0:1, 0:1] = value[0:1, 0:1]" in r, r + assert "[[3 2 4]]" in r, r # updated slab_indices + + a = StagedChangesArray.from_array( + np.array([[1, 2], [3, 4]], dtype="u4"), (1, 4), fill_value=42 + ) + r = repr(a._resize_plan((1, 5))) + assert "2 slice transfers among 2 slab pairs" in r, r + assert "drop 1 slabs" in r, r + assert "slabs[2][0:1, 0:2] = slabs[1][0:1, 0:2]" in r, r + + r = repr(a._load_plan()) + assert "append 1 empty slab" in r, r + assert "2 slice transfers among 1 slab pair" in r, r + assert "drop 1 slab" in r, r + assert "slabs.append(np.empty((2, 4))) # slabs[2]" in r, r + assert "slabs[2][0:1, 0:2] = slabs[1][0:1, 0:2]" in r, r + assert "slabs[1] = None" in r, r + + a[0, 0] = 5 + r = repr(a._changes_plan()) + assert "2 chunks" in r, r + assert "base[0:1, 0:2] = slabs[2][0:1, 0:2]" in r, r + assert "base[1:2, 0:2] = slabs[1][1:2, 0:2]" in r, r + + +def test_invalid_parameters(): + with pytest.raises(ValueError, match="chunk_size"): + StagedChangesArray( + shape=(1, 2), + chunk_size=(2,), + base_slabs=[], + slab_indices=[0], + slab_offsets=[0], + ) + with pytest.raises(ValueError, match="shape"): + StagedChangesArray( + shape=(-1,), + chunk_size=(2,), + base_slabs=[], + slab_indices=[0], + slab_offsets=[0], + ) + with pytest.raises(ValueError, match="shape"): + StagedChangesArray.full((-1,), (2,)) + with pytest.raises(ValueError, match="chunk_size"): + StagedChangesArray( + shape=(2,), + chunk_size=(0,), + base_slabs=[], + slab_indices=[0], + slab_offsets=[0], + ) + with pytest.raises(ValueError, match="chunk_size"): + StagedChangesArray.full((2,), (0,)) + with pytest.raises(ValueError, match="fill_value"): + StagedChangesArray( + shape=(2,), + chunk_size=(2,), + base_slabs=[], + slab_indices=[0], + slab_offsets=[0], + fill_value=np.zeros(5), + ) + with pytest.raises(ValueError, match="dtype"): + StagedChangesArray( + shape=(2,), + chunk_size=(1,), + base_slabs=[np.array([0], dtype="i1"), np.array([0], dtype="u1")], + slab_indices=[0, 0], + slab_offsets=[0, 0], + ) + with 
pytest.raises(ValueError, match="dimensionality"): + StagedChangesArray( + shape=(2,), + chunk_size=(2,), + base_slabs=[np.array([[0]])], + slab_indices=[0], + slab_offsets=[0], + ) + with pytest.raises(ValueError, match="slab_indices"): + StagedChangesArray( + shape=(2,), + chunk_size=(2,), + base_slabs=[], + slab_indices=[[0]], + slab_offsets=[0], + ) + with pytest.raises(ValueError, match="slab_indices"): + StagedChangesArray( + shape=(2,), + chunk_size=(2,), + base_slabs=[], + slab_indices=[0, 0], + slab_offsets=[0], + ) + with pytest.raises(ValueError, match="slab_offsets"): + StagedChangesArray( + shape=(2,), + chunk_size=(2,), + base_slabs=[], + slab_indices=[0], + slab_offsets=[[0]], + ) + with pytest.raises(ValueError, match="slab_offsets"): + StagedChangesArray( + shape=(2,), + chunk_size=(2,), + base_slabs=[], + slab_indices=[0], + slab_offsets=[0, 0], + ) + + a = StagedChangesArray.full((2,), (2,)) + with pytest.raises(ValueError): + del a[0] + with pytest.raises(ValueError, match="scalar"): + a.refill(np.zeros(2)) + with pytest.raises(ValueError, match="dimensionality"): + a.resize((2, 2)) + with pytest.raises(ValueError, match="non-negative"): + a.resize((-1,)) + if NP_GE_200: + with pytest.raises(ValueError): + np.asarray(a, copy=False)