[InMemoryDataset redesign] Legacy engine cleanup
crusaderky committed Nov 21, 2024
1 parent 63558d8 commit e5a0b0a
Showing 5 changed files with 2 additions and 321 deletions.
1 change: 0 additions & 1 deletion versioned_hdf5/meson.build
@@ -34,7 +34,6 @@ py.extension_module(
subdir: 'versioned_hdf5',
dependencies: compiled_deps,
include_directories: [npy_include_path],
override_options : ['cython_language=cpp'],
)

cython_args = [
96 changes: 0 additions & 96 deletions versioned_hdf5/slicetools.pyx
@@ -1,4 +1,3 @@
# distutils: language=c++
import sys
from functools import lru_cache

@@ -18,7 +17,6 @@ from numpy.typing import ArrayLike

from libc.stddef cimport ptrdiff_t, size_t
from libc.stdio cimport FILE, fclose
from libcpp.vector cimport vector

from versioned_hdf5.cytools import np_hsize_t
from versioned_hdf5.cytools cimport ceil_a_over_b, count2stop, hsize_t, stop2count
@@ -145,100 +143,6 @@ def hyperslab_to_slice(start, stride, count, block):
return Slice(start, end, stride)


@cython.infer_types(True)
cdef _spaceid_to_slice(space_id: hid_t):
"""
Helper function to read the data for `space_id` selection and
convert it to a Tuple of slices.
"""
sel_type = H5Sget_select_type(space_id)

if sel_type == H5S_sel_type.H5S_SEL_ALL:
return Tuple()
elif sel_type == H5S_sel_type.H5S_SEL_HYPERSLABS:
slices = []

rank = H5Sget_simple_extent_ndims(space_id)
if rank < 0:
raise HDF5Error()
start_array = vector[hsize_t](rank)
stride_array = vector[hsize_t](rank)
count_array = vector[hsize_t](rank)
block_array = vector[hsize_t](rank)

if H5Sget_regular_hyperslab(
space_id,
start_array.data(),
stride_array.data(),
count_array.data(),
block_array.data(),
) < 0:
raise HDF5Error()

for i in range(rank):
start = start_array[i]
stride = stride_array[i]
count = count_array[i]
block = block_array[i]
if not (block == 1 or count == 1):
raise NotImplementedError("Nontrivial blocks are not yet supported")
end = start + (stride * (count - 1) + 1) * block
stride = stride if block == 1 else 1
slices.append(Slice(start, end, stride))

return Tuple(*slices)
elif sel_type == H5S_sel_type.H5S_SEL_NONE:
return Tuple(Slice(0, 0))
else:
raise NotImplementedError("Point selections are not yet supported")


@cython.infer_types(True)
cpdef build_data_dict(dcpl, raw_data_name: str):
"""
Function to build the "data_dict" of a versioned virtual dataset.
All virtual datasets created by versioned-hdf5 should have chunks in
exactly one raw dataset `raw_data_name` in the same file.
This function blindly assumes this is the case.
:param dcpl: the dataset creation property list of the versioned dataset
:param raw_data_name: the name of the corresponding raw dataset
:return: a dictionary mapping the `Tuple` of the virtual dataset chunk
to a `Slice` in the raw dataset.
"""
data_dict = {}

with phil:
dcpl_id: hid_t = dcpl.id
virtual_count: size_t = dcpl.get_virtual_count()

for j in range(virtual_count):
vspace_id = H5Pget_virtual_vspace(dcpl_id, j)
if vspace_id == H5I_INVALID_HID:
raise HDF5Error()
try:
vspace_slice_tuple = _spaceid_to_slice(vspace_id)
finally:
if H5Sclose(vspace_id) < 0:
raise HDF5Error()

srcspace_id = H5Pget_virtual_srcspace(dcpl_id, j)
if srcspace_id == H5I_INVALID_HID:
raise HDF5Error()
try:
srcspace_slice_tuple = _spaceid_to_slice(srcspace_id)
finally:
if H5Sclose(srcspace_id) < 0:
raise HDF5Error()

# the slice into the raw_data (srcspace_slice_tuple) is only
# on the first axis
data_dict[vspace_slice_tuple] = srcspace_slice_tuple.args[0]

return data_dict
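
For context, a minimal sketch (not part of this diff) of the shape of the mapping that build_data_dict produced; the concrete numbers below are hypothetical and assume a 1-D virtual dataset with chunk size 10:

    from ndindex import Slice, Tuple

    # Keys address a chunk of the virtual dataset; values are the slice of
    # the first axis of the raw dataset that backs that chunk.
    data_dict = {
        Tuple(Slice(0, 10, 1)): Slice(0, 10, 1),    # virtual rows 0:10  -> raw rows 0:10
        Tuple(Slice(10, 20, 1)): Slice(30, 40, 1),  # virtual rows 10:20 -> raw rows 30:40
    }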


@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
1 change: 0 additions & 1 deletion versioned_hdf5/subchunk_map.pxd
@@ -10,7 +10,6 @@ cdef class IndexChunkMapper:
cdef readonly hsize_t n_chunks
cdef readonly hsize_t last_chunk_size

cpdef tuple[object, object, object] chunk_submap(self, hsize_t chunk_idx)
cpdef tuple[object, object | None] read_many_slices_params(self)

cpdef object chunks_indexer(self)
174 changes: 2 additions & 172 deletions versioned_hdf5/subchunk_map.py
@@ -5,24 +5,16 @@

import abc
import enum
import itertools
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

import cython
import numpy as np
from cython import bint, ssize_t
from ndindex import ChunkSize, Slice, Tuple, ndindex
from ndindex import ChunkSize, Tuple, ndindex
from ndindex.ellipsis import ellipsis
from numpy.typing import NDArray

from .cytools import (
ceil_a_over_b,
count2stop,
np_hsize_t,
smallest_step_after,
stop2count,
)
from .cytools import ceil_a_over_b, np_hsize_t, smallest_step_after, stop2count
from .tools import asarray

if TYPE_CHECKING: # pragma: nocover
@@ -49,15 +41,6 @@
)


class DropAxis(enum.Enum):
_drop_axis = 0


# Returned instead of an AnySlicer. Signals that the axis should be removed when
# aggregated into an AnySlicerND.
DROP_AXIS = DropAxis._drop_axis


# This is an abstract class, which should inherit from abc.ABC
# or have metaclass=abc.ABCMeta. Neither are supported by Cython though.
@cython.cclass
@@ -105,29 +88,6 @@ def _chunk_start_stop(self, chunk_idx: hsize_t) -> tuple[hsize_t, hsize_t]:
stop = min(start + self.chunk_size, self.dset_size)
return start, stop

@cython.ccall
@abc.abstractmethod
def chunk_submap(
self, chunk_idx: hsize_t
) -> tuple[Slice, AnySlicer | DropAxis, AnySlicer]:
"""Given a chunk index, return a tuple of
data_dict key
key of the data_dict (see build_data_dict())
value_subidx
the slicer selecting the points within the sliced array
(the return value for __getitem__, the value parameter for __setitem__)
chunk_subidx
the slicer selecting the points within the input chunks.
In other words, in the simplified one-dimensional case:
_, value_subidx, chunk_subidx = mapper.chunk_submap(i)
chunk_view = base_arr[chunk_idx * chunk_size:(chunk_idx + 1) * chunk_size]
return_value[value_subidx] = chunk_view[chunk_subidx] # __getitem__
chunk_view[chunk_subidx] = value_param[value_subidx] # __setitem__
"""

@cython.ccall
@abc.abstractmethod
def read_many_slices_params(
@@ -363,31 +323,6 @@ def __init__(

super().__init__(chunk_indices, dset_size, chunk_size)

@cython.ccall
def chunk_submap(self, chunk_idx: hsize_t) -> tuple[Slice, slice, slice]:
chunk_start, chunk_stop = self._chunk_start_stop(chunk_idx)
sel_start = self.start
sel_stop = self.stop
sel_step = self.step

abs_start = max(chunk_start, sel_start)
# Get the smallest lcm multiple of common that is >= start
abs_start = smallest_step_after(abs_start, sel_start % sel_step, sel_step)

# shift start so that it is relative to index
value_sub_start = (abs_start - sel_start) // sel_step
value_sub_stop = ceil_a_over_b(min(chunk_stop, sel_stop) - sel_start, sel_step)

chunk_sub_start = abs_start - chunk_start
count = value_sub_stop - value_sub_start
chunk_sub_stop = count2stop(chunk_sub_start, count, sel_step)

return (
Slice(chunk_start, chunk_stop, 1),
slice(value_sub_start, value_sub_stop, 1),
slice(chunk_sub_start, chunk_sub_stop, sel_step),
)
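
As a worked illustration (not part of this diff), the arithmetic above for a hypothetical dataset with dset_size=10, chunk_size=4 and the selection ds[1:9:2]: for chunk_idx=1 (dataset rows 4:8) the method returned (Slice(4, 8, 1), slice(2, 4, 1), slice(1, 4, 2)), which plugs into the read pattern from the chunk_submap docstring as:

    import numpy as np

    base = np.arange(10)                 # stand-in for the chunked dataset
    out = np.empty(4, dtype=base.dtype)  # len(range(1, 9, 2)) == 4
    chunk_view = base[4:8]               # the chunk addressed by Slice(4, 8, 1)
    out[2:4] = chunk_view[1:4:2]         # out[value_subidx] = chunk[chunk_subidx]
    assert (out[2:4] == base[1:9:2][2:4]).all()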

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
@@ -466,12 +401,6 @@ def __init__(self, idx: hsize_t, dset_size: hsize_t, chunk_size: hsize_t):
chunk_indices = np.array([idx // chunk_size], dtype=np_hsize_t)
super().__init__(chunk_indices, dset_size, chunk_size)

@cython.ccall
def chunk_submap(self, chunk_idx: hsize_t) -> tuple[Slice, DropAxis, int]:
chunk_start, chunk_stop = self._chunk_start_stop(chunk_idx)
chunk_sub_idx = self.idx - chunk_start
return Slice(chunk_start, chunk_stop, 1), DROP_AXIS, chunk_sub_idx
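
A short sketch (not part of this diff) of what the integer-index variant above computed: a scalar index such as ds[7] with chunk_size=4 lands in chunk 1 (rows 4:8) at offset 7 - 4 == 3, and DROP_AXIS signalled that the axis disappears from the output:

    import numpy as np

    chunk_view = np.arange(4, 8)  # the chunk addressed by Slice(4, 8, 1)
    assert chunk_view[3] == 7     # chunk_sub_idx == self.idx - chunk_start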

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
@@ -528,14 +457,6 @@ def __init__(self, other: IndexChunkMapper):
self._chunks_indexer = other.chunks_indexer()
super().__init__(other.chunk_indices, other.dset_size, other.chunk_size)

@cython.ccall
def chunk_submap(
self, chunk_idx: hsize_t
) -> tuple[Slice, AnySlicer | DropAxis, AnySlicer]:
raise NotImplementedError( # pragma: nocover
"not used in legacy as_subchunk_map"
)

@cython.cfunc
@cython.nogil
@cython.exceptval(check=False)
@@ -590,28 +511,6 @@ def __init__(
chunk_indices = np.unique(idx // chunk_size)
super().__init__(chunk_indices, dset_size, chunk_size)

@cython.ccall
def chunk_submap(
self, chunk_idx: hsize_t
) -> tuple[Slice, NDArray[np_hsize_t] | slice, NDArray[np_hsize_t] | slice]:
chunk_start, chunk_stop = self._chunk_start_stop(chunk_idx)

if self.is_ascending:
# O(n*logn)
start_idx, stop_idx = np.searchsorted(self.idx, [chunk_start, chunk_stop])
mask = slice(int(start_idx), int(stop_idx), 1)
# TODO optimize monotonic descending
else:
# O(n^2)
mask = (chunk_start <= self.idx) & (self.idx < chunk_stop)
mask = _maybe_array_idx_to_slice(mask)

return (
Slice(chunk_start, chunk_stop, 1),
mask,
_maybe_array_idx_to_slice(self.idx[mask] - chunk_start),
)
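
A small sketch (not part of this diff) of the ascending fancy-index fast path above: for a sorted point selection, np.searchsorted finds the contiguous run of points that fall inside a chunk in O(log n), avoiding the boolean-mask scan; the numbers below are hypothetical:

    import numpy as np

    idx = np.array([1, 3, 8, 9])                       # sorted point selection
    start_i, stop_i = np.searchsorted(idx, [4, 8])     # chunk covering rows 4:8
    assert (start_i, stop_i) == (2, 2)                 # no selected points in this chunk
    start_i, stop_i = np.searchsorted(idx, [8, 12])    # chunk covering rows 8:12
    assert (start_i, stop_i) == (2, 4)                 # points 8 and 9 fall in this chunk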

@cython.ccall
def read_many_slices_params(
self,
@@ -1197,72 +1096,3 @@ def read_many_slices_params_nd(
assert outs == nslices

return ndslices


def as_subchunk_map(
idx: Any,
shape: tuple[int, ...],
chunk_size: tuple[int, ...] | ChunkSize,
) -> Iterator[
tuple[
Tuple,
AnySlicerND,
AnySlicerND,
]
]:
"""Computes the chunk selection assignment. In particular, given a `chunk_size`
it returns triple (chunk_idx, value_sub_idx, chunk_sub_idx) such that for a
chunked Dataset `ds` we can translate selections like
>> value = ds[idx]
into selecting from the individual chunks of `ds` as
>> value = np.empty(output_shape)
>> for chunk_idx, value_sub_idx, chunk_sub_idx in as_subchunk_map(
.. idx, ds.shape, ds.chunk_size
.. ):
.. value[value_sub_idx] = ds.data_dict[chunk_idx][chunk_sub_idx]
Similarly, assignments like
>> ds[idx] = value
can be translated into
>> for chunk_idx, value_sub_idx, chunk_sub_idx in as_subchunk_map(
.. idx, ds.shape, ds.chunk_size
.. ):
.. ds.data_dict[chunk_idx][chunk_sub_idx] = value[value_sub_idx]
:param idx: the "index" to read from / write to the Dataset
:param shape: the shape of the Dataset
:param chunk_size: the `ChunkSize` of the Dataset
:return: a generator of `(chunk_idx, value_sub_idx, chunk_sub_idx)` tuples
"""
idx, mappers = index_chunk_mappers(idx, shape, chunk_size)
if not mappers:
return
idx_len = len(idx.args)

mapper: IndexChunkMapper # noqa: F842
chunk_subindexes = [
[mapper.chunk_submap(chunk_idx) for chunk_idx in mapper.chunk_indices]
for mapper in mappers
]

# Now combine the chunk_slices and subindexes for each dimension into tuples
# across all dimensions.
for p in itertools.product(*chunk_subindexes):
chunk_slices, value_subidxs, chunk_subidxs = zip(*p)

# skip dimensions which were sliced away
value_subidxs = tuple(
value_subidx
for value_subidx in value_subidxs
if value_subidx is not DROP_AXIS
)
# skip suffix dimensions
chunk_subidxs = chunk_subidxs[:idx_len]

yield Tuple(*chunk_slices), value_subidxs, chunk_subidxs
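
A usage sketch (not part of this diff) of the legacy read pattern that the removed generator supported, following its own docstring; the dict-of-chunks dataset below is hypothetical, with shape (20,) and chunk size (10,):

    import numpy as np
    from ndindex import Slice, Tuple

    data_dict = {
        Tuple(Slice(0, 10, 1)): np.arange(10),
        Tuple(Slice(10, 20, 1)): np.arange(10, 20),
    }
    idx, shape, chunk_size = (slice(3, 17, 2),), (20,), (10,)
    value = np.empty(len(range(3, 17, 2)), dtype=np.int64)
    for chunk_idx, value_sub_idx, chunk_sub_idx in as_subchunk_map(idx, shape, chunk_size):
        value[value_sub_idx] = data_dict[chunk_idx][chunk_sub_idx]
    assert (value == np.arange(3, 17, 2)).all()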