Fix regression: shrinking corrupts the hash table
crusaderky committed Jan 14, 2025
1 parent 1a22450 commit 53a15ca
Showing 3 changed files with 184 additions and 41 deletions.
152 changes: 114 additions & 38 deletions versioned_hdf5/staged_changes.py
@@ -1292,10 +1292,12 @@ def __init__(
         if old_shape == new_shape:
             return
 
-        # Shrinking along an axis can't alter chunks on the slabs.
-        # However, it can reduce the amount of chunks impacted by enlarges on other
-        # axes, so it should be done first.
-        # It can also causes slabs to drop.
+        # Shrinking along an axis can't alter chunks on the slabs; however, partial
+        # edge chunks should be loaded into memory to avoid ending up with partially
+        # overlapping chunks on disk, e.g. [10:19] vs. [10:17].
+        # It can also reduce the number of chunks impacted if we are also enlarging
+        # along other axes, so it should be done first.
+        # Finally, it can cause slabs to drop.
         shrunk_shape = tuple(min(o, n) for o, n in zip(old_shape, new_shape))
         if shrunk_shape != old_shape:
             chunks_slice = tuple(
@@ -1305,6 +1307,28 @@ def __init__(
             # edge chunks without reducing the number of chunks.
             self.slab_indices = self.slab_indices[chunks_slice]
             self.slab_offsets = self.slab_offsets[chunks_slice]
+
+            # Load partial edge chunks into memory to avoid ending up with partially
+            # overlapping chunks on disk, e.g. [10:19] vs. [10:17].
+            load_edge_axes = [
+                axis
+                for axis, (old_size, shrunk_size, c) in enumerate(
+                    zip(old_shape, shrunk_shape, chunk_size)
+                )
+                if old_size > shrunk_size and shrunk_size % c != 0
+            ]
+            if n_base_slabs > 0 and load_edge_axes:
+                self.slab_indices = self.slab_indices.copy()
+                self.slab_offsets = self.slab_offsets.copy()
+                for axis in load_edge_axes:
+                    self._shrink_along_axis(
+                        new_shape=new_shape,
+                        chunk_size=chunk_size,
+                        axis=axis,
+                        n_slabs=n_slabs + len(self.append_slabs),
+                        n_base_slabs=n_base_slabs,
+                    )
 
         chunks_dropped = self.slab_indices.size < slab_indices.size
 
         if shrunk_shape != new_shape:
@@ -1323,18 +1347,18 @@ def __init__(
             self.slab_offsets = np.pad(self.slab_offsets, pad_width)
 
             # No need to transfer anything if there are only full chunks
-            if n_slabs > 1:
+            if n_slabs + len(self.append_slabs) > 1:
                 prev_shape = shrunk_shape
                 for axis in range(len(new_shape)):
                     next_shape = new_shape[: axis + 1] + prev_shape[axis + 1 :]
                     if next_shape != prev_shape:
                         self._enlarge_along_axis(
-                            prev_shape,
-                            next_shape,
-                            chunk_size,
-                            axis,
-                            n_slabs + len(self.append_slabs),
-                            n_base_slabs,
+                            old_shape=prev_shape,
+                            new_shape=next_shape,
+                            chunk_size=chunk_size,
+                            axis=axis,
+                            n_slabs=n_slabs + len(self.append_slabs),
+                            n_base_slabs=n_base_slabs,
                         )
                     prev_shape = next_shape
 
@@ -1353,6 +1377,34 @@ def __init__(
             assume_unique=True,
         ).tolist()
 
+    @cython.cfunc
+    def _shrink_along_axis(
+        self,
+        new_shape: tuple[int, ...],
+        chunk_size: tuple[int, ...],
+        axis: ssize_t,
+        n_slabs: int,
+        n_base_slabs: int,
+    ):
+        """Shrink along a single axis.
+
+        Load partial edge chunks into memory to avoid ending up with partially
+        overlapping chunks on disk, e.g. [10:19] vs. [10:17].
+        """
+        new_size = new_shape[axis]
+        new_floor_size = new_size - new_size % chunk_size[axis]
+        assert new_floor_size < new_size
+
+        self._load_edge_chunks_along_axis(
+            shape=new_shape,
+            chunk_size=chunk_size,
+            axis=axis,
+            floor_size=new_floor_size,
+            size=new_size,
+            n_slabs=n_slabs,
+            n_base_slabs=n_base_slabs,
+        )
+
     @cython.cfunc
     def _enlarge_along_axis(
         self,
@@ -1366,7 +1418,7 @@ def _enlarge_along_axis(
         """Enlarge along a single axis"""
         old_size = old_shape[axis]
 
-        # Old size, rounded down or up to the nearest chunk
+        # Old size, rounded down to the nearest chunk
         old_floor_size = old_size - old_size % chunk_size[axis]
         if old_floor_size == old_size:
             return  # Everything we're doing is adding extra empty chunks
@@ -1382,32 +1434,15 @@ def _enlarge_along_axis(
         # This includes chunks we just transferred in the previous step)
 
         # Step 1
-        if n_base_slabs > 0:
-            idx = (slice(None),) * axis + (slice(old_floor_size, old_size),)
-            _, mappers = index_chunk_mappers(idx, new_shape, chunk_size)
-            if not mappers:
-                return  # Resizing from size 0
-
-            chunks = _chunks_in_selection(
-                self.slab_indices,
-                self.slab_offsets,
-                mappers,
-                filter=lambda slab_idx: (0 < slab_idx) & (slab_idx <= n_base_slabs),
-                idxidx=True,
-            )
-            nchunks = chunks.shape[0]
-            if nchunks > 0:
-                self.append_slabs.append((nchunks * chunk_size[0],) + chunk_size[1:])
-                self.transfers.extend(
-                    _make_transfer_plans(
-                        mappers,
-                        chunks,
-                        src_slab_idx="chunks",
-                        dst_slab_idx=n_slabs,
-                        slab_indices=self.slab_indices,  # Modified in place
-                        slab_offsets=self.slab_offsets,  # Modified in place
-                    )
-                )
+        self._load_edge_chunks_along_axis(
+            shape=new_shape,
+            chunk_size=chunk_size,
+            axis=axis,
+            floor_size=old_floor_size,
+            size=old_size,
+            n_slabs=n_slabs,
+            n_base_slabs=n_base_slabs,
+        )
 
         # Step 2
         idx = (slice(None),) * axis + (slice(old_size, new_size),)
@@ -1436,6 +1471,47 @@ def _enlarge_along_axis(
                 )
             )
 
+    @cython.cfunc
+    def _load_edge_chunks_along_axis(
+        self,
+        shape: tuple[int, ...],
+        chunk_size: tuple[int, ...],
+        axis: int,
+        floor_size: int,
+        size: int,  # Not necessarily shape[axis]
+        n_slabs: int,
+        n_base_slabs: int,
+    ):
+        """Load the edge chunks along an axis from the base slabs into a new slab."""
+        if n_base_slabs == 0:
+            return
+
+        idx = (slice(None),) * axis + (slice(floor_size, size),)
+        _, mappers = index_chunk_mappers(idx, shape, chunk_size)
+        if not mappers:
+            return  # Resizing to size 0
+
+        chunks = _chunks_in_selection(
+            self.slab_indices,
+            self.slab_offsets,
+            mappers,
+            filter=lambda slab_idx: (0 < slab_idx) & (slab_idx <= n_base_slabs),
+            idxidx=True,
+        )
+        nchunks = chunks.shape[0]
+        if nchunks > 0:
+            self.append_slabs.append((nchunks * chunk_size[0],) + chunk_size[1:])
+            self.transfers.extend(
+                _make_transfer_plans(
+                    mappers,
+                    chunks,
+                    src_slab_idx="chunks",
+                    dst_slab_idx=n_slabs,
+                    slab_indices=self.slab_indices,  # Modified in place
+                    slab_offsets=self.slab_offsets,  # Modified in place
+                )
+            )
+
     @property
     def head(self) -> str:
         return "ResizePlan<" + super().head
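To make the new shrink path concrete, here is a standalone sketch (not part of the commit) that traces the arithmetic above with hypothetical shapes: a single axis with chunks of 10 elements, shrinking from 26 to 17, the same geometry as the tests below.

old_shape, new_shape, chunk_size = (26,), (17,), (10,)

shrunk_shape = tuple(min(o, n) for o, n in zip(old_shape, new_shape))
assert shrunk_shape == (17,)

# An axis needs its edge chunk loaded iff it shrank and the new size does not
# land on a chunk boundary, which would leave a partial edge chunk on disk.
load_edge_axes = [
    axis
    for axis, (old_size, shrunk_size, c) in enumerate(
        zip(old_shape, shrunk_shape, chunk_size)
    )
    if old_size > shrunk_size and shrunk_size % c != 0
]
assert load_edge_axes == [0]  # 17 % 10 == 7, so the edge chunk is partial

# Within _shrink_along_axis, floor_size:size delimits the partial edge chunk
# that _load_edge_chunks_along_axis copies off the base slabs.
new_size = new_shape[0]
new_floor_size = new_size - new_size % chunk_size[0]
assert (new_floor_size, new_size) == (10, 17)  # i.e. chunk [10:17] is loaded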
45 changes: 44 additions & 1 deletion versioned_hdf5/tests/test_replay.py
@@ -7,6 +7,7 @@
 import h5py
 import numpy as np
 import pytest
+from ndindex import Slice
 from packaging.version import Version
 
 from versioned_hdf5 import VersionedHDF5File
@@ -913,7 +914,7 @@ def test_delete_versions_current_version(vfile):
     np.testing.assert_equal(vfile[cv]["bar"][:], np.arange(17))
 
 
-def test_variable_length_strings(vfile):
+def test_delete_variable_length_strings(vfile):
     with vfile.stage_version("r0") as sv:
         g = sv.create_group("data")
         dt = h5py.string_dtype(encoding="ascii")
@@ -1045,6 +1046,48 @@ def test_delete_versions_speed(vfile):
     assert mock_get_parent.call_count == 90
 
 
+def test_delete_versions_after_shrinking(vfile):
+    """Test that if you shrink a dataset so that an edge chunk contains the same data
+    as the previous edge chunk on disk, but trimmed to the new size, then you end up
+    with a full copy of the edge chunk and you can safely delete the previous, larger
+    version of it.
+
+    See Also
+    --------
+    https://github.com/deshaw/versioned-hdf5/issues/411
+    test_staged_changes.py::test_shrinking_does_not_reuse_partial_chunks
+    """
+    with vfile.stage_version("r1") as sv:
+        sv.create_dataset("values", data=np.arange(26), chunks=(10,))
+    with vfile.stage_version("r2") as sv:
+        sv["values"].resize((17,))
+
+    ht_before = Hashtable(vfile.f, "values").inverse()
+    assert ht_before.keys() == {
+        Slice(0, 10, 1),  # r1
+        Slice(10, 20, 1),  # r1
+        Slice(20, 26, 1),  # r1
+        # Shrinking the r1[10:20] chunk triggered a deep copy of the remaining
+        # [10:17], which now has its own hash key and a non-overlapping slice,
+        # even though the shared area is identical.
+        Slice(30, 37, 1),  # r2
+    }
+    assert (ht_before[Slice(10, 20, 1)] != ht_before[Slice(30, 37, 1)]).any()
+    raw_data = vfile.f["_version_data/values/raw_data"][:]
+    np.testing.assert_equal(raw_data[30:37], raw_data[10:17])
+
+    delete_versions(vfile, ["r1"])
+    np.testing.assert_equal(vfile["r2"]["values"], np.arange(17))
+
+    ht_after = Hashtable(vfile.f, "values").inverse()
+    assert ht_after.keys() == {
+        Slice(0, 10, 1),  # Same as before the delete
+        Slice(10, 17, 1),  # Was Slice(30, 37, 1)
+    }
+    np.testing.assert_equal(ht_after[Slice(0, 10, 1)], ht_before[Slice(0, 10, 1)])
+    np.testing.assert_equal(ht_after[Slice(10, 17, 1)], ht_before[Slice(30, 37, 1)])
+
+
 @pytest.mark.parametrize(
     ("obj", "metadata_opts"),
     [
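For context, a minimal end-to-end sketch of the regression from issue #411 that this commit fixes. It uses only the public API exercised by the test above; the file name is illustrative, and delete_versions is assumed importable from versioned_hdf5.replay as in the test suite.

import h5py
import numpy as np

from versioned_hdf5 import VersionedHDF5File
from versioned_hdf5.replay import delete_versions

with h5py.File("example.h5", "w") as f:
    vfile = VersionedHDF5File(f)
    with vfile.stage_version("r1") as sv:
        sv.create_dataset("values", data=np.arange(26), chunks=(10,))
    with vfile.stage_version("r2") as sv:
        sv["values"].resize((17,))  # r2's edge chunk becomes a full copy

    # Before the fix, r2's edge chunk could partially overlap r1's [10:20]
    # chunk in raw_data, so deleting r1 corrupted the hash table; now r2
    # owns a non-overlapping copy and survives the delete intact.
    delete_versions(vfile, ["r1"])
    np.testing.assert_equal(vfile["r2"]["values"], np.arange(17))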
28 changes: 26 additions & 2 deletions versioned_hdf5/tests/test_staged_changes.py
@@ -281,15 +281,19 @@ def test_load():
 
 
 def test_shrinking_dereferences_slabs():
+    """Test that shrinking a StagedChangesArray dereferences the slabs that are no
+    longer referenced by any chunks.
+    """
     arr = StagedChangesArray.from_array([[1, 2, 3, 4, 5, 6, 7]], (1, 2), fill_value=42)
     arr[0, -1] = 8
     arr.resize((1, 10))
     assert_array_equal(arr.slab_indices, [[1, 2, 3, 5, 0]])
     assert_array_equal(arr, [[1, 2, 3, 4, 5, 6, 8, 42, 42, 42]])
-    arr.resize((1, 3))
+
+    arr.resize((1, 4))
     assert_array_equal(arr.slab_indices, [[1, 2]])
     assert arr.slabs[3:] == [None, None, None]
-    assert_array_equal(arr, [[1, 2, 3]])
+    assert_array_equal(arr, [[1, 2, 3, 4]])
 
     arr.resize((0, 0))
     assert_array_equal(arr.slab_indices, np.empty((0, 0)))
@@ -298,6 +302,26 @@ def test_shrinking_dereferences_slabs():
     assert arr.slabs[1:] == [None, None, None, None, None]
 
 
+@pytest.mark.parametrize("starting_size", (6, 5))
+def test_shrinking_does_not_reuse_partial_chunks(starting_size):
+    """Test that if a partial chunk is created or resized when shrinking, and that
+    chunk was on a base slab, it is loaded into memory to keep different keys in
+    the hash table from pointing to the same data.
+
+    See Also
+    --------
+    https://github.com/deshaw/versioned-hdf5/issues/411
+    test_replay.py::test_delete_versions_after_shrinking
+    """
+    arr = StagedChangesArray.from_array(
+        list(range(starting_size)), chunk_size=(3,), fill_value=4
+    )
+    assert_array_equal(arr.slab_indices, [1, 1])
+    arr.resize((4,))
+    assert_array_equal(arr.slab_indices, [1, 2])
+    assert_array_equal(arr, [0, 1, 2, 3])
+
+
 def test_copy():
     a = StagedChangesArray.from_array([[1, 2]], chunk_size=(1, 1), fill_value=42)
     a.resize((1, 3))
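As a companion to the test above, a short sketch of how the slab_indices bookkeeping reads. The slab semantics here are inferred from this diff (index 0 holds the fill_value, indices 1 through n_base_slabs are on-disk base slabs, higher indices are in-memory slabs), and the boundary-aligned contrast case at the end is not part of the commit.

import numpy as np
from numpy.testing import assert_array_equal

from versioned_hdf5.staged_changes import StagedChangesArray

arr = StagedChangesArray.from_array(list(range(6)), chunk_size=(3,), fill_value=4)
assert_array_equal(arr.slab_indices, [1, 1])  # both chunks on base slab 1

arr.resize((4,))  # chunk [3:6] shrinks to the partial edge chunk [3:4]
# The partial chunk is deep-copied off the base slab into a new in-memory
# slab (index 2), so two hash table keys can never alias the same raw data.
assert_array_equal(arr.slab_indices, [1, 2])
assert_array_equal(arr, [0, 1, 2, 3])

# Contrast: shrinking to an exact chunk boundary leaves base chunks alone.
arr2 = StagedChangesArray.from_array(list(range(6)), chunk_size=(3,), fill_value=4)
arr2.resize((3,))  # 3 % 3 == 0: no partial edge chunk is created
assert_array_equal(arr2.slab_indices, [1])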
