In the midst of fixing tests broken by this check...
peytondmurray committed Dec 2, 2023
1 parent a8aa7b8 commit e147657
142 changes: 104 additions & 38 deletions versioned_hdf5/backend.py
@@ -1,3 +1,5 @@
from typing import Dict

import numpy as np
from numpy.testing import assert_array_equal
from h5py._hl.filters import guess_chunk
@@ -132,40 +134,84 @@ def write_dataset(f, name, data, chunks=None, dtype=None, compression=None,
data_hash = hashtable.hash(data_s)

if data_hash in hashtable:
slices[s] = hashtable[data_hash]
hashed_slice = hashtable[data_hash]
slices[s] = hashed_slice

_verify_new_chunk_reuse(
raw_data=ds,
new_data=data,
data_hash=data_hash,
hashed_slice=hashed_slice,
chunk_being_written=data_s,
slices_to_write=slices_to_write,
)

else:
slices[s] = raw_slice
hashtable[data_hash] = raw_slice
slices_to_write[raw_slice] = s

# Check that the data from the slice in the hashtable matches the
# data we are attempting to write
if raw_slice in slices_to_write:
reused_s = data[slices_to_write[raw_slice].raw]
else:
reused_slice = Tuple(
raw_slice,
*[slice(None, None) for _ in data.shape[1:]]
)
reused_s = ds[reused_slice.raw]

assert_array_equal(
reused_s,
data_s,
err_msg=(
f"Hash {data_hash} of existing data chunk {reused_s} "
f"matches the hash of new data chunk {data_s}, but data "
"does not."
)
)

ds.resize((old_shape[0] + len(slices_to_write)*chunk_size,) + chunks[1:])
for raw_slice, s in slices_to_write.items():
data_s = data[s.raw]
idx = Tuple(raw_slice, *[slice(0, i) for i in data_s.shape[1:]])
ds[idx.raw] = data[s.raw]
return slices
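# A minimal usage sketch (illustrative only), assuming an open h5py.File `f` whose
# versioned structure (the '_version_data' group) has already been initialised and a
# hypothetical dataset name "values": write_dataset() hashes each chunk and reuses an
# existing raw slice whenever the hash matches, so identical chunks map to the same
# raw slice.
#
#     data = np.tile(np.arange(10.0), 2)                  # two identical 10-element chunks
#     slices = write_dataset(f, "values", data, chunks=(10,))
#     assert len(set(slices.values())) == 1               # both chunks share one raw slice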

def _verify_new_chunk_reuse(
raw_data: np.ndarray,
new_data: np.ndarray,
data_hash: bytes,
hashed_slice: Tuple,
chunk_being_written: np.ndarray,
slices_to_write: Dict[Tuple, Tuple],
):
"""Check that the data corresponding to the slice in the hashtable matches the data
that is going to be written.
Raises a ValueError if the data reference by the hashed slice doesn't match the
underlying raw data.
Parameters
----------
raw_data : np.ndarray
Raw data that already exists in the file
new_data : np.ndarray
New data that we are writing
data_hash : bytes
Hash of the new data chunk
hashed_slice : Tuple
Slice that is stored in the hash table for the given data_hash
chunk_being_written : np.ndarray
New data chunk to be written
slices_to_write : Tuple
Dict of slices which will be written
"""
if hashed_slice in slices_to_write:
# The hash table contains a slice we will write but haven't yet; grab the
# chunk from the new data being written
reused_chunk = new_data[slices_to_write[hashed_slice].raw]
else:
# The hash table contains a slice that was written in a previous
# write operation; grab that chunk from the existing raw data
reused_slice = Tuple(
hashed_slice,
*[slice(0, size) for size in new_data.shape[1:]]
)
reused_chunk = raw_data[reused_slice.raw]

assert_array_equal(
reused_chunk,
chunk_being_written,
err_msg=(
f"Hash {data_hash} of existing data chunk {reused_chunk} "
f"matches the hash of new data chunk {chunk_being_written}, "
"but data does not."
)
)
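# A minimal sketch of the check above (illustrative only), assuming ndindex's Slice and
# a 1-D chunk: with nothing pending in slices_to_write, the hashed slice is read back
# from the raw data and compared against the chunk being written. Matching data passes
# silently; a hash collision with differing data raises.
#
#     chunk = np.arange(10.0)
#     raw = np.concatenate([chunk, np.zeros(10)])         # raw data already in the file
#     _verify_new_chunk_reuse(
#         raw_data=raw,
#         new_data=chunk,
#         data_hash=b"<hash>",                            # only used in the error message
#         hashed_slice=Slice(0, 10),                      # slice recorded in the hashtable
#         chunk_being_written=chunk,
#         slices_to_write={},                             # nothing queued, read from raw
#     )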



def write_dataset_chunks(f, name, data_dict, shape=None):
"""
data_dict should be a dictionary mapping chunk_size index to either an
@@ -175,42 +221,62 @@ def write_dataset_chunks(f, name, data_dict, shape=None):
if name not in f['_version_data']:
raise NotImplementedError("Use write_dataset() if the dataset does not yet exist")

ds = f['_version_data'][name]['raw_data']
chunks = tuple(ds.attrs['chunks'])
raw_data = f['_version_data'][name]['raw_data']
chunks = tuple(raw_data.attrs['chunks'])
chunk_size = chunks[0]

if shape is None:
shape = tuple(max(c.args[i].stop for c in data_dict) for i in
range(len(chunks)))
# all_chunks = list(ChunkSize(chunks).indices(shape))
# for c in data_dict:
# if c not in all_chunks:
# raise ValueError(f"data_dict contains extra chunks ({c})")

with Hashtable(f, name) as hashtable:
slices = {i: None for i in data_dict}

# Mapping from slices in the dataset after this write is complete to chunks of
# the new data which will be written
data_to_write = {}
for chunk, data_s in data_dict.items():
if not isinstance(data_s, (slice, tuple, Tuple, Slice)) and data_s.dtype != ds.dtype:
raise ValueError(f"dtypes do not match ({data_s.dtype} != {ds.dtype})")

idx = hashtable.largest_index
# Mapping from slices in the raw dataset after this write is complete to the
# indices of the new data chunks which will be written
slices_to_write = {}
for chunk, data_s in data_dict.items():
if isinstance(data_s, (slice, tuple, Tuple, Slice)):
slices[chunk] = ndindex(data_s)
else:
if data_s.dtype != raw_data.dtype:
raise ValueError(
f"dtypes do not match ({data_s.dtype} != {raw_data.dtype})"
)

idx = hashtable.largest_index
raw_slice = Slice(idx*chunk_size, idx*chunk_size + data_s.shape[0])
data_hash = hashtable.hash(data_s)
raw_slice2 = hashtable.setdefault(data_hash, raw_slice)
if raw_slice2 == raw_slice:

if data_hash in hashtable:
hashed_slice = hashtable[data_hash]
slices[chunk] = hashed_slice

_verify_new_chunk_reuse(
raw_data=raw_data,
new_data=data_s,
data_hash=data_hash,
hashed_slice=hashed_slice,
chunk_being_written=data_s,
slices_to_write=slices_to_write,
)

else:
slices[chunk] = raw_slice
hashtable[data_hash] = raw_slice
data_to_write[raw_slice] = data_s
slices[chunk] = raw_slice2
slices_to_write[raw_slice] = chunk

assert None not in slices.values()
old_shape = ds.shape
ds.resize((old_shape[0] + len(data_to_write)*chunk_size,) + chunks[1:])
old_shape = raw_data.shape
raw_data.resize((old_shape[0] + len(data_to_write)*chunk_size,) + chunks[1:])
for raw_slice, data_s in data_to_write.items():
c = (raw_slice.raw,) + tuple(slice(0, i) for i in data_s.shape[1:])
ds[c] = data_s
raw_data[c] = data_s
return slices
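# A minimal sketch of a data_dict (illustrative only), assuming ndindex's Tuple/Slice, a
# chunk size of 10, an open h5py.File `f`, and a hypothetical existing dataset "values":
# keys are chunk indices into the virtual dataset, and values are either ndarray chunks
# (hashed and possibly deduplicated on write) or slices pointing at existing raw data.
#
#     data_dict = {
#         Tuple(Slice(0, 10)): np.arange(10.0),           # new chunk, hashed on write
#         Tuple(Slice(10, 20)): Slice(0, 10),             # reuse an existing raw slice as-is
#     }
#     slices = write_dataset_chunks(f, "values", data_dict)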

def create_virtual_dataset(f, version_name, name, shape, slices, attrs=None, fillvalue=None):
