[InMemoryDataset redesign] Legacy engine cleanup #395

Merged 2 commits on Dec 5, 2024
1 change: 0 additions & 1 deletion versioned_hdf5/meson.build
@@ -34,7 +34,6 @@ py.extension_module(
subdir: 'versioned_hdf5',
dependencies: compiled_deps,
include_directories: [npy_include_path],
override_options : ['cython_language=cpp'],
)

cython_args = [
96 changes: 0 additions & 96 deletions versioned_hdf5/slicetools.pyx
@@ -1,4 +1,3 @@
# distutils: language=c++
import sys
from functools import lru_cache

@@ -18,7 +17,6 @@ from numpy.typing import ArrayLike

from libc.stddef cimport ptrdiff_t, size_t
from libc.stdio cimport FILE, fclose
from libcpp.vector cimport vector

from versioned_hdf5.cytools import np_hsize_t
from versioned_hdf5.cytools cimport ceil_a_over_b, count2stop, hsize_t, stop2count
@@ -145,100 +143,6 @@ def hyperslab_to_slice(start, stride, count, block):
return Slice(start, end, stride)


@cython.infer_types(True)
cdef _spaceid_to_slice(space_id: hid_t):
"""
Helper function to read the data for `space_id` selection and
convert it to a Tuple of slices.
"""
sel_type = H5Sget_select_type(space_id)

if sel_type == H5S_sel_type.H5S_SEL_ALL:
return Tuple()
elif sel_type == H5S_sel_type.H5S_SEL_HYPERSLABS:
slices = []

rank = H5Sget_simple_extent_ndims(space_id)
if rank < 0:
raise HDF5Error()
start_array = vector[hsize_t](rank)
stride_array = vector[hsize_t](rank)
count_array = vector[hsize_t](rank)
block_array = vector[hsize_t](rank)

if H5Sget_regular_hyperslab(
space_id,
start_array.data(),
stride_array.data(),
count_array.data(),
block_array.data(),
) < 0:
raise HDF5Error()

for i in range(rank):
start = start_array[i]
stride = stride_array[i]
count = count_array[i]
block = block_array[i]
if not (block == 1 or count == 1):
raise NotImplementedError("Nontrivial blocks are not yet supported")
end = start + (stride * (count - 1) + 1) * block
stride = stride if block == 1 else 1
slices.append(Slice(start, end, stride))

return Tuple(*slices)
elif sel_type == H5S_sel_type.H5S_SEL_NONE:
return Tuple(Slice(0, 0))
else:
raise NotImplementedError("Point selections are not yet supported")
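
For reference, a quick sanity check of the `end`/`stride` arithmetic in the removed hyperslab branch above, using illustrative values:

```python
# block == 1: the hyperslab (start=2, stride=5, count=3, block=1) selects points 2, 7, 12.
start, stride, count, block = 2, 5, 3, 1
end = start + (stride * (count - 1) + 1) * block  # 2 + (5*2 + 1)*1 == 13
step = stride if block == 1 else 1                # 5
# -> Slice(2, 13, 5), i.e. indices 2, 7, 12

# count == 1: the hyperslab (start=2, stride=5, count=1, block=4) selects the contiguous run 2..5.
start, stride, count, block = 2, 5, 1, 4
end = start + (stride * (count - 1) + 1) * block  # 2 + 1*4 == 6
step = stride if block == 1 else 1                # 1
# -> Slice(2, 6, 1)
```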


@cython.infer_types(True)
cpdef build_data_dict(dcpl, raw_data_name: str):
"""
Function to build the "data_dict" of a versioned virtual dataset.

All virtual datasets created by versioned-hdf5 should have chunks in
exactly one raw dataset `raw_data_name` in the same file.
This function blindly assumes this is the case.

:param dcpl: the dataset creation property list of the versioned dataset
:param raw_data_name: the name of the corresponding raw dataset
:return: a dictionary mapping the `Tuple` of the virtual dataset chunk
to a `Slice` in the raw dataset.
"""
data_dict = {}

with phil:
dcpl_id: hid_t = dcpl.id
virtual_count: size_t = dcpl.get_virtual_count()

for j in range(virtual_count):
vspace_id = H5Pget_virtual_vspace(dcpl_id, j)
if vspace_id == H5I_INVALID_HID:
raise HDF5Error()
try:
vspace_slice_tuple = _spaceid_to_slice(vspace_id)
finally:
if H5Sclose(vspace_id) < 0:
raise HDF5Error()

srcspace_id = H5Pget_virtual_srcspace(dcpl_id, j)
if srcspace_id == H5I_INVALID_HID:
raise HDF5Error()
try:
srcspace_slice_tuple = _spaceid_to_slice(srcspace_id)
finally:
if H5Sclose(srcspace_id) < 0:
raise HDF5Error()

# the slice into the raw_data (srcspace_slice_tuple) is only
# on the first axis
data_dict[vspace_slice_tuple] = srcspace_slice_tuple.args[0]

return data_dict
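
A minimal sketch of the mapping this removed helper produced; the chunk and raw-slab offsets below are made up for illustration:

```python
from ndindex import Slice, Tuple

# Keys select one chunk of a (hypothetical) 2-D virtual dataset with 10x10 chunks;
# values are the matching slab along the first axis of the raw dataset.
data_dict = {
    Tuple(Slice(0, 10, 1), Slice(0, 10, 1)): Slice(30, 40, 1),
    Tuple(Slice(0, 10, 1), Slice(10, 20, 1)): Slice(80, 90, 1),
}
```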


@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
1 change: 0 additions & 1 deletion versioned_hdf5/subchunk_map.pxd
@@ -10,7 +10,6 @@ cdef class IndexChunkMapper:
cdef readonly hsize_t n_chunks
cdef readonly hsize_t last_chunk_size

cpdef tuple[object, object, object] chunk_submap(self, hsize_t chunk_idx)
cpdef tuple[object, object | None] read_many_slices_params(self)

cpdef object chunks_indexer(self)
174 changes: 2 additions & 172 deletions versioned_hdf5/subchunk_map.py
@@ -5,23 +5,15 @@

import abc
import enum
import itertools
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

import cython
import numpy as np
from cython import bint, ssize_t
from ndindex import ChunkSize, Slice, Tuple, ndindex
from ndindex import ChunkSize, Tuple, ndindex
from numpy.typing import NDArray

from .cytools import (
ceil_a_over_b,
count2stop,
np_hsize_t,
smallest_step_after,
stop2count,
)
from .cytools import ceil_a_over_b, np_hsize_t, smallest_step_after, stop2count
from .tools import asarray

if TYPE_CHECKING: # pragma: nocover
@@ -48,15 +40,6 @@
)


class DropAxis(enum.Enum):
_drop_axis = 0


# Returned instead of an AnySlicer. Signals that the axis should be removed when
# aggregated into an AnySlicerND.
DROP_AXIS = DropAxis._drop_axis


# This is an abstract class, which should inherit from abc.ABC
# or have metaclass=abc.ABCMeta. Neither are supported by Cython though.
@cython.cclass
@@ -104,29 +87,6 @@ def _chunk_start_stop(self, chunk_idx: hsize_t) -> tuple[hsize_t, hsize_t]:
stop = min(start + self.chunk_size, self.dset_size)
return start, stop

@cython.ccall
@abc.abstractmethod
def chunk_submap(
self, chunk_idx: hsize_t
) -> tuple[Slice, AnySlicer | DropAxis, AnySlicer]:
"""Given a chunk index, return a tuple of

data_dict key
key of the data_dict (see build_data_dict())
value_subidx
the slicer selecting the points within the sliced array
(the return value for __getitem__, the value parameter for __setitem__)
chunk_subidx
the slicer selecting the points within the input chunks.

In other words, in the simplified one-dimensional case:

_, value_subidx, chunk_subidx = mapper.chunk_submap(i)
chunk_view = base_arr[chunk_idx * chunk_size:(chunk_idx + 1) * chunk_size]
return_value[value_subidx] = chunk_view[chunk_subidx] # __getitem__
chunk_view[chunk_subidx] = value_param[value_subidx] # __setitem__
"""

@cython.ccall
@abc.abstractmethod
def read_many_slices_params(
@@ -362,31 +322,6 @@ def __init__(

super().__init__(chunk_indices, dset_size, chunk_size)

@cython.ccall
def chunk_submap(self, chunk_idx: hsize_t) -> tuple[Slice, slice, slice]:
chunk_start, chunk_stop = self._chunk_start_stop(chunk_idx)
sel_start = self.start
sel_stop = self.stop
sel_step = self.step

abs_start = max(chunk_start, sel_start)
# Get the smallest lcm multiple of common that is >= start
abs_start = smallest_step_after(abs_start, sel_start % sel_step, sel_step)

# shift start so that it is relative to index
value_sub_start = (abs_start - sel_start) // sel_step
value_sub_stop = ceil_a_over_b(min(chunk_stop, sel_stop) - sel_start, sel_step)

chunk_sub_start = abs_start - chunk_start
count = value_sub_stop - value_sub_start
chunk_sub_stop = count2stop(chunk_sub_start, count, sel_step)

return (
Slice(chunk_start, chunk_stop, 1),
slice(value_sub_start, value_sub_stop, 1),
slice(chunk_sub_start, chunk_sub_stop, sel_step),
)
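
A hand trace of the removed slice path above, with illustrative numbers (a 25-row dataset, chunks of 10, selection `arr[3:23:4]`); the return values shown are derived by following the arithmetic, not produced by running the method:

```python
import numpy as np

# Selection arr[3:23:4] picks rows 3, 7, 11, 15, 19.
arr = np.arange(25)
out = np.empty(5, dtype=arr.dtype)

# Tracing chunk_idx=1 (rows 10..19) through the arithmetic above gives
#   data_dict key -> Slice(10, 20, 1)
#   value_subidx  -> slice(2, 5, 1)
#   chunk_subidx  -> slice(1, 10, 4)
chunk = arr[10:20]
out[2:5] = chunk[1:10:4]                 # output positions 2..4 receive rows 11, 15, 19
assert (out[2:5] == [11, 15, 19]).all()
```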

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
@@ -465,12 +400,6 @@ def __init__(self, idx: hsize_t, dset_size: hsize_t, chunk_size: hsize_t):
chunk_indices = np.array([idx // chunk_size], dtype=np_hsize_t)
super().__init__(chunk_indices, dset_size, chunk_size)

@cython.ccall
def chunk_submap(self, chunk_idx: hsize_t) -> tuple[Slice, DropAxis, int]:
chunk_start, chunk_stop = self._chunk_start_stop(chunk_idx)
chunk_sub_idx = self.idx - chunk_start
return Slice(chunk_start, chunk_stop, 1), DROP_AXIS, chunk_sub_idx

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
@@ -527,14 +456,6 @@ def __init__(self, other: IndexChunkMapper):
self._chunks_indexer = other.chunks_indexer()
super().__init__(other.chunk_indices, other.dset_size, other.chunk_size)

@cython.ccall
def chunk_submap(
self, chunk_idx: hsize_t
) -> tuple[Slice, AnySlicer | DropAxis, AnySlicer]:
raise NotImplementedError( # pragma: nocover
"not used in legacy as_subchunk_map"
)

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
@@ -589,28 +510,6 @@ def __init__(
chunk_indices = np.unique(idx // chunk_size)
super().__init__(chunk_indices, dset_size, chunk_size)

@cython.ccall
def chunk_submap(
self, chunk_idx: hsize_t
) -> tuple[Slice, NDArray[np_hsize_t] | slice, NDArray[np_hsize_t] | slice]:
chunk_start, chunk_stop = self._chunk_start_stop(chunk_idx)

if self.is_ascending:
# O(n*logn)
start_idx, stop_idx = np.searchsorted(self.idx, [chunk_start, chunk_stop])
mask = slice(int(start_idx), int(stop_idx), 1)
# TODO optimize monotonic descending
else:
# O(n^2)
mask = (chunk_start <= self.idx) & (self.idx < chunk_stop)
mask = _maybe_array_idx_to_slice(mask)

return (
Slice(chunk_start, chunk_stop, 1),
mask,
_maybe_array_idx_to_slice(self.idx[mask] - chunk_start),
)
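
Similarly, a quick hand trace of the ascending (`searchsorted`) branch of the removed fancy-index submap, with illustrative values:

```python
import numpy as np

idx = np.array([2, 7, 11, 15])       # ascending fancy index; chunk covering rows 0..9
chunk_start, chunk_stop = 0, 10
start_idx, stop_idx = np.searchsorted(idx, [chunk_start, chunk_stop])
assert (int(start_idx), int(stop_idx)) == (0, 2)   # points 2 and 7 fall inside this chunk
# value_subidx -> slice(0, 2, 1)
# chunk_subidx -> idx[0:2] - chunk_start == [2, 7]
#                 (possibly collapsed to an equivalent slice by _maybe_array_idx_to_slice)
```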

@cython.ccall
def read_many_slices_params(
self,
@@ -1206,72 +1105,3 @@ def read_many_slices_params_nd(
assert outs == nslices

return ndslices


def as_subchunk_map(
idx: Any,
shape: tuple[int, ...],
chunk_size: tuple[int, ...] | ChunkSize,
) -> Iterator[
tuple[
Tuple,
AnySlicerND,
AnySlicerND,
]
]:
"""Computes the chunk selection assignment. In particular, given a `chunk_size`
it returns triple (chunk_idx, value_sub_idx, chunk_sub_idx) such that for a
chunked Dataset `ds` we can translate selections like

>> value = ds[idx]

into selecting from the individual chunks of `ds` as

>> value = np.empty(output_shape)
>> for chunk_idx, value_sub_idx, chunk_sub_idx in as_subchunk_map(
.. idx, ds.shape, ds.chunk_size
.. ):
.. value[value_sub_idx] = ds.data_dict[chunk_idx][chunk_sub_idx]

Similarly, assignments like

>> ds[idx] = value

can be translated into

>> for chunk_idx, value_sub_idx, chunk_sub_idx in as_subchunk_map(
.. idx, ds.shape, ds.chunk_size
.. ):
.. ds.data_dict[chunk_idx][chunk_sub_idx] = value[value_sub_idx]

:param idx: the "index" to read from / write to the Dataset
:param shape: the shape of the Dataset
:param chunk_size: the `ChunkSize` of the Dataset
:return: a generator of `(chunk_idx, value_sub_idx, chunk_sub_idx)` tuples
"""
idx, mappers = index_chunk_mappers(idx, shape, chunk_size)
if not mappers:
return
idx_len = len(idx.args)

mapper: IndexChunkMapper # noqa: F842
chunk_subindexes = [
[mapper.chunk_submap(chunk_idx) for chunk_idx in mapper.chunk_indices]
for mapper in mappers
]

# Now combine the chunk_slices and subindexes for each dimension into tuples
# across all dimensions.
for p in itertools.product(*chunk_subindexes):
chunk_slices, value_subidxs, chunk_subidxs = zip(*p)

# skip dimensions which were sliced away
value_subidxs = tuple(
value_subidx
for value_subidx in value_subidxs
if value_subidx is not DROP_AXIS
)
# skip suffix dimensions
chunk_subidxs = chunk_subidxs[:idx_len]

yield Tuple(*chunk_slices), value_subidxs, chunk_subidxs
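
As a concrete counterpart to the docstring above, a minimal sketch of how this removed generator could be driven, with a plain NumPy array standing in for a chunked dataset (the import path assumes the pre-cleanup package; chunk size and index are arbitrary):

```python
import numpy as np

from versioned_hdf5.subchunk_map import as_subchunk_map  # removed by this PR

arr = np.arange(10)
idx = np.s_[2:9]
out = np.empty(7, dtype=arr.dtype)

for chunk_idx, value_sub_idx, chunk_sub_idx in as_subchunk_map(idx, arr.shape, (4,)):
    chunk = arr[chunk_idx.raw]            # slice the chunk out of the "dataset"
    out[value_sub_idx] = chunk[chunk_sub_idx]

assert (out == arr[2:9]).all()
```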