diff --git a/cryosparc/column.py b/cryosparc/column.py index 3e84ee69..ee0bb930 100644 --- a/cryosparc/column.py +++ b/cryosparc/column.py @@ -52,7 +52,9 @@ def __new__(cls, field: Field, data: Data): dtype = n.dtype(fielddtype(field)) nrow = data.nrow() shape = (nrow, *dtype.shape) - buffer = data.getbuf(field[0]).memview if nrow else None + buffer = data.getbuf(field[0]) + if buffer is not None: + buffer = buffer.memview obj = super().__new__(cls, shape=shape, dtype=dtype.base, buffer=buffer) # type: ignore # Keep a reference to the data so that it only gets cleaned up when all diff --git a/cryosparc/core.pyi b/cryosparc/core.pyi new file mode 100644 index 00000000..e8f19aca --- /dev/null +++ b/cryosparc/core.pyi @@ -0,0 +1,81 @@ +from enum import Enum +from typing import SupportsBytes + +from numpy.typing import NDArray + +__all__ = ["DsetType", "Stream", "Data"] + +class MemoryView(SupportsBytes): # Note: Supports buffer protocol. + base: "Array" + size: int + itemsize: int + nbytes: int + ndim: int + shape: tuple[int, ...] + strides: tuple[int, ...] + suboffsets: tuple[int, ...] + T: "MemoryView" + + def copy(self) -> "MemoryView": ... + def copy_fortran(self) -> "MemoryView": ... + def is_c_contig(self) -> bool: ... + def is_f_contig(self) -> bool: ... + +class Array: + memview: MemoryView + + def __len__(self) -> int: ... + def __getitem__(self, key: int | slice) -> bytes: ... + def __setitem__(self, key: int | slice, item: bytes): ... + +class DsetType(int, Enum): + T_F32 = ... + T_F64 = ... + T_C32 = ... + T_C64 = ... + T_I8 = ... + T_I16 = ... + T_I32 = ... + T_I64 = ... + T_U8 = ... + T_U16 = ... + T_U32 = ... + T_U64 = ... + T_STR = ... + T_OBJ = ... + +class Data: + def __init__(self, other: "Data" | None = None) -> None: ... + def innerjoin(self, key: str, other: "Data") -> "Data": ... + def totalsz(self) -> int: ... + def ncol(self) -> int: ... + def nrow(self) -> int: ... + def key(self, index: int) -> str: ... + def type(self, field: str) -> int: ... + def has(self, field: str) -> bool: ... + def addrows(self, num: int) -> None: ... + def addcol_scalar(self, field: str, dtype: int) -> None: ... + def addcol_array(self, field: str, dtype: int, shape: tuple[int, ...]) -> None: ... + def getshp(self, colkey: str) -> tuple[int, ...]: ... + def getbuf(self, colkey: str) -> Array | None: ... + def getstr(self, col: str, index: int) -> bytes: ... + def tocstrs(self, col: str) -> bool: ... + def topystrs(self, col: str) -> bool: ... + def stralloc(self, val: str) -> int: ... + def dump(self) -> Array: ... + def dumpstrheap(self) -> Array: ... + def setstrheap(self, heap: bytes) -> None: ... + def defrag(self, realloc_smaller: bool) -> None: ... + def dumptxt(self, dump_data: bool = False) -> None: ... + def handle(self) -> int: ... + +class Stream: + def __init__(self, data: Data) -> None: ... + def cast_objs_to_strs(self) -> None: ... + def stralloc_col(self, col: str) -> Array | None: ... + def compress_col(self, col: str) -> Array: ... + def compress_numpy(self, arr: NDArray) -> Array: ... + def compress(self, arr: Array) -> Array: ... + def decompress_col(self, col: str, data: bytes) -> Array: ... + def decompress_numpy(self, data: bytes, arr: NDArray) -> Array: ... + def decompress(self, data: bytes, outptr: int = 0) -> Array: ... diff --git a/cryosparc/core.pyx b/cryosparc/core.pyx index 384ebb9d..6327cbe0 100644 --- a/cryosparc/core.pyx +++ b/cryosparc/core.pyx @@ -142,7 +142,7 @@ cdef class Data: with nogil: mem = dataset.dset_get(self._handle, colkey_c) size = dataset.dset_getsz(self._handle, colkey_c) - return 0 if size == 0 else mem + return None if size == 0 else mem def getstr(self, str col, size_t index): return dataset.dset_getstr(self._handle, col.encode(), index) # returns bytes diff --git a/cryosparc/dataset.py b/cryosparc/dataset.py index 1030f718..27a8e29e 100644 --- a/cryosparc/dataset.py +++ b/cryosparc/dataset.py @@ -75,8 +75,6 @@ if TYPE_CHECKING: from numpy.typing import ArrayLike, DTypeLike, NDArray - from .core import MemoryView - # Save format options NUMPY_FORMAT = 1 @@ -683,16 +681,17 @@ def _load_stream( descr = filter_descr(header["dtype"], keep_prefixes=prefixes, keep_fields=fields) field_names = {field[0] for field in descr} - # Calling addrows separately to minimizes column-based - # allocations, improves performance by ~20% + # Calling addrows separately to minimize column-based allocations, + # improves performance by ~20% dset = cls.allocate(0, descr) - if header["length"] == 0: - return dset # no more data to load - data = dset._data data.addrows(header["length"]) + + # If a dataset is empty, it won't have anything in its data section. + # Just the string heap at the end. + dtype = [] if header["length"] == 0 else header["dtype"] loader = Stream(data) - for field in header["dtype"]: + for field in dtype: colsize = u32intle(f.read(4)) if field[0] not in field_names: # try to seek instead of read to reduce memory usage @@ -701,8 +700,10 @@ def _load_stream( buffer = f.read(colsize) if field[0] in header["compressed_fields"]: loader.decompress_col(field[0], buffer) - else: - data.getbuf(field[0])[:] = buffer + continue + mem = data.getbuf(field[0]) + assert mem is not None, f"Could not load stream (missing {field[0]} buffer)" + mem[:] = buffer # Read in the string heap (rest of stream) # NOTE: There will be a bug here for long column keys that are @@ -726,16 +727,22 @@ async def from_async_stream(cls, stream: AsyncBinaryIO): dset = cls.allocate(0, header["dtype"]) data = dset._data data.addrows(header["length"]) + + # If a dataset is empty, it won't have anything in its data section. + # Just the string heap at the end. + dtype = [] if header["length"] == 0 else header["dtype"] loader = Stream(data) - for field in header["dtype"]: + for field in dtype: colsize = u32intle(await stream.read(4)) buffer = await stream.read(colsize) if field[0] in header["compressed_fields"]: loader.decompress_col(field[0], buffer) - else: - data.getbuf(field[0])[:] = buffer + continue + mem = data.getbuf(field[0]) + assert mem is not None, f"Could not load stream (missing {field[0]} buffer)" + mem[:] = buffer - heap = stream.read() + heap = await stream.read() data.setstrheap(heap) # Convert C strings to Python strings @@ -803,16 +810,14 @@ def stream(self, compression: Literal["lz4", None] = None) -> Generator[bytes, N yield u32bytesle(len(header)) yield header - if len(self) == 0: - return # empty dataset, don't yield anything - - for f in self: - fielddata: "MemoryView" + fields = [] if len(self) == 0 else self.fields() + for f in fields: if f in compressed_fields: # obj columns added to strheap and loaded as indexes fielddata = stream.compress_col(f) else: fielddata = stream.stralloc_col(f) or data.getbuf(f) + assert fielddata is not None, f"Could not stream dataset (missing {f} buffer)" yield u32bytesle(len(fielddata)) yield bytes(fielddata.memview) @@ -1231,7 +1236,7 @@ def filter_prefix(self, keep_prefix: str, *, rename: Optional[str] = None, copy: if rename and rename != keep_prefix: new_fields = [f"{rename}/{f.split('/', 1)[1]}" for f in keep_fields] - result = type(self)([("uid", self["uid"])] + [(nf, self[f]) for f, nf in zip(keep_fields, new_fields)]) + result = type(self)([("uid", self["uid"])] + [(nf, self[f]) for f, nf in zip(keep_fields, new_fields)]) # type: ignore return result if copy else self._reset(result._data) def drop_fields(self, names: Union[Collection[str], Callable[[str], bool]], *, copy: bool = False): diff --git a/typings/cryosparc/core.pyi b/typings/cryosparc/core.pyi deleted file mode 100644 index 6b1da4bf..00000000 --- a/typings/cryosparc/core.pyi +++ /dev/null @@ -1,45 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -from enum import Enum -from typing import Optional, Tuple - -from numpy.typing import NDArray - -__all__ = ["DsetType", "Stream", "Data"] - -class MemoryView: # Note: Supports buffer protocol. - base: "Array" - size: int - itemsize: int - nbytes: int - ndim: int - shape: Tuple[int, ...] - strides: Tuple[int, ...] - suboffsets: Tuple[int, ...] - T: "MemoryView" - def copy(self) -> "MemoryView": ... - def copy_fortran(self) -> "MemoryView": ... - def is_c_contig(self) -> bool: ... - def is_f_contig(self) -> bool: ... - -class Array: - memview: MemoryView - -class DsetType(int, Enum): - pass - -class Data: - pass - -class Stream: - def __init__(self, data: Data) -> None: ... - def cast_objs_to_strs(self) -> None: ... - def stralloc_col(self, col: str) -> Optional[Array]: ... - def compress_col(self, col: str) -> Array: ... - def compress_numpy(self, arr: NDArray) -> Array: ... - def compress(self, arr: Array) -> Array: ... - def decompress_col(self, col: str, data: bytes) -> Array: ... - def decompress_numpy(self, data: bytes, arr: NDArray) -> Array: ... - def decompress(self, data: bytes, outptr: int = 0) -> Array: ...