[InMemoryDataset redesign] Integration
crusaderky committed Nov 19, 2024
1 parent a82060d commit 8750d92
Showing 6 changed files with 116 additions and 320 deletions.
5 changes: 5 additions & 0 deletions docs/design.md
@@ -274,6 +274,11 @@ The ndindex library was initially created for versioned-hdf5, in order to make
index manipulation possible as well as allowing code that passes indices around
to become much cleaner.

`InMemoryDataset` is implemented as a fairly thin wrapper around
`StagedChangesArray`, which is abstracted from h5py and holds all the modified
chunks in memory, presenting them as a numpy-like array, until they are ready to
be flushed to disk. For details, read [Staging changes in memory](staged_changes).

These wrapper objects all try to emulate the h5py API as closely as possible,
so that the user can use them just as they would the real h5py objects. Any
discrepancy between h5py and versioned-hdf5 semantics should be considered a
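
To illustrate the idea in the paragraph added to design.md above, here is a minimal sketch of a numpy-like array that keeps modified chunks in memory and writes them back only on flush. It is not the real `StagedChangesArray` API; the class, attribute, and method names below are illustrative assumptions.

import numpy as np

class TinyStagedArray:
    """Numpy-like view over base data plus modified chunks held in memory."""

    def __init__(self, base, chunk_size):
        self.base = base              # data already on "disk"; never mutated here
        self.chunk_size = chunk_size
        self.staged = {}              # chunk index -> modified in-memory copy

    def _n_chunks(self):
        return -(-self.base.size // self.chunk_size)   # ceiling division

    def _chunk(self, i):
        """Current contents of chunk i: the staged copy if one exists, else the base data."""
        if i in self.staged:
            return self.staged[i]
        return self.base[i * self.chunk_size : (i + 1) * self.chunk_size]

    def __getitem__(self, idx):
        return np.concatenate([self._chunk(i) for i in range(self._n_chunks())])[idx]

    def __setitem__(self, idx, value):
        # Copy-on-write: apply the update, then stage every chunk that changed.
        full = self[...]
        full[idx] = value
        for i in range(self._n_chunks()):
            sl = slice(i * self.chunk_size, (i + 1) * self.chunk_size)
            if not np.array_equal(full[sl], self._chunk(i)):
                self.staged[i] = full[sl].copy()

    def flush(self, dest):
        """Write only the modified chunks back; everything else stays untouched."""
        for i, chunk in self.staged.items():
            dest[i * self.chunk_size : (i + 1) * self.chunk_size] = chunk
        self.staged.clear()

For example, after `arr = TinyStagedArray(np.arange(10.0), chunk_size=4)` and `arr[2:6] = 0.0`, only chunks 0 and 1 are staged in memory, and `arr.flush(dest)` writes just those two chunks to `dest`.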
1 change: 1 addition & 0 deletions docs/index.rst
@@ -12,6 +12,7 @@ Versioned HDF5 provides a versioned abstraction on top of `h5py <https://www.h5p
performance
reference
design
staged_changes
changelog
releasing

16 changes: 6 additions & 10 deletions versioned_hdf5/replay.py
@@ -5,7 +5,6 @@
import posixpath
import sys
from collections.abc import Iterable
from copy import deepcopy
from typing import Any

import numpy as np
@@ -130,10 +129,7 @@ def recreate_dataset(f, name, newf, callback=None):
# hash table has the raw data in the same locations, even if the
# data is unchanged).
if isinstance(dataset, (InMemoryDataset, InMemorySparseDataset)):
for c, index in dataset.data_dict.copy().items():
if isinstance(index, Slice):
dataset[c.raw]
assert not isinstance(dataset.data_dict[c], Slice)
dataset.staged_changes.load()
slices = write_dataset_chunks(newf, name, dataset.data_dict)
else:
slices = write_dataset(newf, name, dataset)
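
The single call to `dataset.staged_changes.load()` replaces the explicit loop above, which forced each chunk still referenced as a `Slice` into memory by indexing the dataset. A hedged sketch of what such a load step amounts to, assuming a mapping of chunk keys to either in-memory arrays or on-disk `Slice` references (attribute and helper names are illustrative, not the real layout):

def load(self):
    """Materialise every chunk that is currently only a reference into the raw dataset."""
    for key, value in list(self.staged.items()):
        if isinstance(value, Slice):                       # still backed by data on disk
            self.staged[key] = self._read_raw_chunk(value)  # hypothetical helper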
@@ -669,7 +665,7 @@ def callback(dataset, version_name):
new_dataset = InMemoryArrayDataset(
name, dataset[()], tmp_parent, fillvalue=_fillvalue, chunks=_chunks
)
if _fillvalue:
if _fillvalue is not None:
new_dataset[new_dataset == dataset.fillvalue] = _fillvalue
elif isinstance(dataset, InMemorySparseDataset):
new_dataset = InMemorySparseDataset(
@@ -680,10 +676,10 @@ def callback(dataset, version_name):
chunks=_chunks,
fillvalue=_fillvalue,
)
new_dataset.data_dict = deepcopy(dataset.data_dict)
if _fillvalue:
for a in new_dataset.data_dict.values():
a[a == dataset.fillvalue] = _fillvalue
if _fillvalue is not None:
new_dataset.staged_changes = dataset.staged_changes.refill(_fillvalue)
else:
new_dataset.staged_changes = dataset.staged_changes.copy()
else:
raise NotImplementedError(type(dataset))

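Two things change in the callback above. First, the guard is now `if _fillvalue is not None:` rather than `if _fillvalue:`, so a falsy fill value such as 0 still triggers the rewrite. Second, the deepcopy-and-rewrite loop over `data_dict` is replaced by `staged_changes.refill(...)` / `.copy()`. A minimal sketch of what a refill operation amounts to, assuming staged chunks are plain numpy arrays in a `staged` mapping (illustrative names; the real method is more involved):

def refill(self, new_fillvalue):
    """Return a copy where occurrences of the old fill value are replaced by the new one."""
    new = self.copy()
    for chunk in new.staged.values():
        chunk[chunk == self.fillvalue] = new_fillvalue
    new.fillvalue = new_fillvalue
    return new
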
61 changes: 0 additions & 61 deletions versioned_hdf5/tests/test_api.py
@@ -2697,67 +2697,6 @@ def test_make_empty_dataset(tmp_path):
assert_equal(cv["values"][:], np.array([]))


@mark.parametrize(
("chunk_size", "iterations"), itertools.product([3, 5, 10], [5, 20, 50, 100])
)
def test_resize_performance(tmp_path, chunk_size, iterations):
"""Test that resizing an InMemoryDataset only calls InMemoryDatasetID.can_read_direct once per iteration.
InMemoryDatasetID.can_read_direct iterates through every chunk in the InMemoryDataset.data_dict. If this
is inadvertently called inside a resize operation, it can lead to quadratic performance. This test checks
that `can_read_direct` is only called once per resize operation. See
https://github.com/deshaw/versioned-hdf5/issues/325 for more information.
"""
path = tmp_path / "tmp.h5"
with h5py.File(path, "w") as f:
vf = VersionedHDF5File(f)
with vf.stage_version("r0") as sv:
sv.create_dataset("values", data=np.array([1, 2, 3]), chunks=(chunk_size,))

with mock.patch(
"versioned_hdf5.wrappers.InMemoryDatasetID.can_read_direct",
new_callable=mock.PropertyMock,
) as mock_crd:
mock_crd.can_read_direct.return_value = True
for i in range(iterations):
with vf.stage_version(f"r{i+1}") as sv:
# Grow the dataset
sv["values"].resize((i + 3,))

assert mock_crd.call_count == iterations


@mark.parametrize(
("can_read_direct", "expected_calls"),
[
[True, 0],
[False, 0],
[None, 1],
],
)
def test_dataset_getitem_can_read_direct(tmp_path, can_read_direct, expected_calls):
"""Check that InMemoryDataset.get_index only reads directly when expected."""
chunk_size = 10

path = tmp_path / "tmp.h5"
with h5py.File(path, "w") as f:
vf = VersionedHDF5File(f)
with vf.stage_version("r0") as sv:
sv.create_dataset("values", data=np.array([1, 2, 3]), chunks=(chunk_size,))

with mock.patch(
"versioned_hdf5.wrappers.InMemoryDatasetID.can_read_direct",
new_callable=mock.PropertyMock,
) as mock_crd:
mock_crd.can_read_direct.return_value = can_read_direct
with vf.stage_version("r1") as sv:
sv["values"].get_index(
slice(None, None), can_read_direct=can_read_direct
)

assert mock_crd.call_count == expected_calls


def test_insert_in_middle_multi_dim(tmp_path):
"""
Test we correctly handle inserting into a multi-dimensional Dataset
4 changes: 2 additions & 2 deletions versioned_hdf5/versions.py
@@ -130,7 +130,7 @@ def commit_version(
shape = None
if isinstance(data, InMemoryDataset):
shape = data.shape
if data.id._data_dict is None:
if not data.staged_changes.has_changes:
# The virtual dataset was not changed from the previous
# version. Just copy it to the new version directly.
assert data.name.startswith(prev_version.name + "/")
@@ -142,7 +142,7 @@
for k, v in data.attrs.items():
data_copy.attrs[k] = v
continue
data = data.id._data_dict
data = data.data_dict
if isinstance(data, dict):
if chunks[name] is not None:
raise NotImplementedError("Specifying chunk size with dict data")
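
The commit path now asks `data.staged_changes.has_changes` instead of peeking at the private `_data_dict`; when nothing was staged, the virtual dataset from the previous version is copied over verbatim. A sketch of how such a property could be defined, assuming modified chunks live in an in-memory mapping (illustrative name only):

@property
def has_changes(self) -> bool:
    """True if at least one chunk was modified since the dataset was opened."""
    return bool(self.staged)      # hypothetical dict of staged (modified) chunks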