fix(spm): improved dataset type checking #107

Merged · 4 commits · Dec 16, 2024
4 changes: 3 additions & 1 deletion cryosparc/column.py
@@ -52,7 +52,9 @@ def __new__(cls, field: Field, data: Data):
         dtype = n.dtype(fielddtype(field))
         nrow = data.nrow()
         shape = (nrow, *dtype.shape)
-        buffer = data.getbuf(field[0]).memview if nrow else None
+        buffer = data.getbuf(field[0])
+        if buffer is not None:
+            buffer = buffer.memview
         obj = super().__new__(cls, shape=shape, dtype=dtype.base, buffer=buffer)  # type: ignore

         # Keep a reference to the data so that it only gets cleaned up when all
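
For context on why the one-liner was unsafe: the old expression keyed the check on `nrow` rather than on the buffer itself, so any path where `getbuf` returns `None` (which it now does for zero-size columns, per the `core.pyx` change below) would raise `AttributeError`. A minimal sketch of the failure mode and the fix, using hypothetical stand-ins for the Cython `Array` and `Data` types:

```python
from typing import Dict, Optional

class FakeArray:
    """Hypothetical stand-in for the Cython Array wrapper in cryosparc.core."""
    def __init__(self, raw: bytes):
        self.memview = memoryview(raw)

class FakeData:
    """Hypothetical stand-in for cryosparc.core.Data; getbuf returns None
    for zero-size columns instead of 0."""
    def __init__(self, buffers: Dict[str, bytes]):
        self._buffers = buffers

    def getbuf(self, key: str) -> Optional[FakeArray]:
        raw = self._buffers.get(key, b"")
        return FakeArray(raw) if raw else None

data = FakeData({"uid": b"\x01\x02\x03\x04", "empty/field": b""})

# Old pattern: data.getbuf("empty/field").memview raises AttributeError
# whenever getbuf returns None, regardless of any row-count check.

# New pattern, mirroring the patched Column.__new__:
buffer = data.getbuf("empty/field")
if buffer is not None:
    buffer = buffer.memview  # only dereference when a buffer exists
assert buffer is None
```
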
81 changes: 81 additions & 0 deletions cryosparc/core.pyi
@@ -0,0 +1,81 @@
from enum import Enum
from typing import SupportsBytes

from numpy.typing import NDArray

__all__ = ["DsetType", "Stream", "Data"]

class MemoryView(SupportsBytes): # Note: Supports buffer protocol.
base: "Array"
size: int
itemsize: int
nbytes: int
ndim: int
shape: tuple[int, ...]
strides: tuple[int, ...]
suboffsets: tuple[int, ...]
T: "MemoryView"

def copy(self) -> "MemoryView": ...
def copy_fortran(self) -> "MemoryView": ...
def is_c_contig(self) -> bool: ...
def is_f_contig(self) -> bool: ...

class Array:
memview: MemoryView

def __len__(self) -> int: ...
def __getitem__(self, key: int | slice) -> bytes: ...
def __setitem__(self, key: int | slice, item: bytes): ...

class DsetType(int, Enum):
T_F32 = ...
T_F64 = ...
T_C32 = ...
T_C64 = ...
T_I8 = ...
T_I16 = ...
T_I32 = ...
T_I64 = ...
T_U8 = ...
T_U16 = ...
T_U32 = ...
T_U64 = ...
T_STR = ...
T_OBJ = ...

class Data:
def __init__(self, other: "Data" | None = None) -> None: ...
def innerjoin(self, key: str, other: "Data") -> "Data": ...
def totalsz(self) -> int: ...
def ncol(self) -> int: ...
def nrow(self) -> int: ...
def key(self, index: int) -> str: ...
def type(self, field: str) -> int: ...
def has(self, field: str) -> bool: ...
def addrows(self, num: int) -> None: ...
def addcol_scalar(self, field: str, dtype: int) -> None: ...
def addcol_array(self, field: str, dtype: int, shape: tuple[int, ...]) -> None: ...
def getshp(self, colkey: str) -> tuple[int, ...]: ...
def getbuf(self, colkey: str) -> Array | None: ...
def getstr(self, col: str, index: int) -> bytes: ...
def tocstrs(self, col: str) -> bool: ...
def topystrs(self, col: str) -> bool: ...
def stralloc(self, val: str) -> int: ...
def dump(self) -> Array: ...
def dumpstrheap(self) -> Array: ...
def setstrheap(self, heap: bytes) -> None: ...
def defrag(self, realloc_smaller: bool) -> None: ...
def dumptxt(self, dump_data: bool = False) -> None: ...
def handle(self) -> int: ...

class Stream:
def __init__(self, data: Data) -> None: ...
def cast_objs_to_strs(self) -> None: ...
def stralloc_col(self, col: str) -> Array | None: ...
def compress_col(self, col: str) -> Array: ...
def compress_numpy(self, arr: NDArray) -> Array: ...
def compress(self, arr: Array) -> Array: ...
def decompress_col(self, col: str, data: bytes) -> Array: ...
def decompress_numpy(self, data: bytes, arr: NDArray) -> Array: ...
def decompress(self, data: bytes, outptr: int = 0) -> Array: ...
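
The stubs above describe the compiled extension's surface. A short sketch of how they might be exercised from Python, assuming the stubs match the extension's runtime behavior; the column name and row count are illustrative, not from the PR:

```python
from cryosparc.core import Data, DsetType, Stream

data = Data()
data.addcol_scalar("value", DsetType.T_F32)  # DsetType is an int Enum
data.addrows(8)

buf = data.getbuf("value")  # typed as Array | None under the new stubs
if buf is not None:
    view = buf.memview  # MemoryView supports the buffer protocol
    print(view.nbytes, view.shape)

stream = Stream(data)
compressed = stream.compress_col("value")  # returns an Array per the stub
```
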
2 changes: 1 addition & 1 deletion cryosparc/core.pyx
@@ -142,7 +142,7 @@ cdef class Data:
         with nogil:
             mem = dataset.dset_get(self._handle, colkey_c)
             size = dataset.dset_getsz(self._handle, colkey_c)
-        return 0 if size == 0 else <unsigned char [:size]> mem
+        return None if size == 0 else <unsigned char [:size]> mem

     def getstr(self, str col, size_t index):
         return dataset.dset_getstr(self._handle, col.encode(), index)  # returns bytes
45 changes: 25 additions & 20 deletions cryosparc/dataset.py
@@ -75,8 +75,6 @@
 if TYPE_CHECKING:
     from numpy.typing import ArrayLike, DTypeLike, NDArray

-    from .core import MemoryView
-

 # Save format options
 NUMPY_FORMAT = 1

@@ -683,16 +681,17 @@ def _load_stream(
         descr = filter_descr(header["dtype"], keep_prefixes=prefixes, keep_fields=fields)
         field_names = {field[0] for field in descr}

-        # Calling addrows separately to minimizes column-based
-        # allocations, improves performance by ~20%
+        # Calling addrows separately to minimize column-based allocations,
+        # improves performance by ~20%
         dset = cls.allocate(0, descr)
-        if header["length"] == 0:
-            return dset  # no more data to load
-
         data = dset._data
         data.addrows(header["length"])
+
+        # If a dataset is empty, it won't have anything in its data section.
+        # Just the string heap at the end.
+        dtype = [] if header["length"] == 0 else header["dtype"]
         loader = Stream(data)
-        for field in header["dtype"]:
+        for field in dtype:
             colsize = u32intle(f.read(4))
             if field[0] not in field_names:
                 # try to seek instead of read to reduce memory usage

@@ -701,8 +700,10 @@
             buffer = f.read(colsize)
             if field[0] in header["compressed_fields"]:
                 loader.decompress_col(field[0], buffer)
-            else:
-                data.getbuf(field[0])[:] = buffer
+                continue
+            mem = data.getbuf(field[0])
+            assert mem is not None, f"Could not load stream (missing {field[0]} buffer)"
+            mem[:] = buffer

         # Read in the string heap (rest of stream)
         # NOTE: There will be a bug here for long column keys that are

@@ -726,16 +727,22 @@ async def from_async_stream(cls, stream: AsyncBinaryIO):
         dset = cls.allocate(0, header["dtype"])
         data = dset._data
         data.addrows(header["length"])
+
+        # If a dataset is empty, it won't have anything in its data section.
+        # Just the string heap at the end.
+        dtype = [] if header["length"] == 0 else header["dtype"]
         loader = Stream(data)
-        for field in header["dtype"]:
+        for field in dtype:
             colsize = u32intle(await stream.read(4))
             buffer = await stream.read(colsize)
             if field[0] in header["compressed_fields"]:
                 loader.decompress_col(field[0], buffer)
-            else:
-                data.getbuf(field[0])[:] = buffer
+                continue
+            mem = data.getbuf(field[0])
+            assert mem is not None, f"Could not load stream (missing {field[0]} buffer)"
+            mem[:] = buffer

-        heap = stream.read()
+        heap = await stream.read()
         data.setstrheap(heap)

         # Convert C strings to Python strings

@@ -803,16 +810,14 @@ def stream(self, compression: Literal["lz4", None] = None) -> Generator[bytes, N
         yield u32bytesle(len(header))
         yield header

-        if len(self) == 0:
-            return  # empty dataset, don't yield anything
-
-        for f in self:
-            fielddata: "MemoryView"
+        fields = [] if len(self) == 0 else self.fields()
+        for f in fields:
             if f in compressed_fields:
                 # obj columns added to strheap and loaded as indexes
                 fielddata = stream.compress_col(f)
             else:
                 fielddata = stream.stralloc_col(f) or data.getbuf(f)
+            assert fielddata is not None, f"Could not stream dataset (missing {f} buffer)"
             yield u32bytesle(len(fielddata))
             yield bytes(fielddata.memview)
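
The writer-side change makes field iteration explicit via `self.fields()`, routes the empty case through the same loop, and asserts that every field resolves to a buffer before its length-prefixed chunk is emitted. A hedged consumer sketch, assuming `dset` is a `cryosparc.dataset.Dataset`:

```python
from cryosparc.dataset import Dataset

def serialize(dset: Dataset) -> bytes:
    """Collect the length-prefixed chunks that Dataset.stream() yields."""
    blob = bytearray()
    # "lz4" matches the Literal["lz4", None] compression parameter above.
    for chunk in dset.stream(compression="lz4"):
        blob.extend(chunk)
    return bytes(blob)
```
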

@@ -1231,7 +1236,7 @@ def filter_prefix(self, keep_prefix: str, *, rename: Optional[str] = None, copy:
         if rename and rename != keep_prefix:
             new_fields = [f"{rename}/{f.split('/', 1)[1]}" for f in keep_fields]

-        result = type(self)([("uid", self["uid"])] + [(nf, self[f]) for f, nf in zip(keep_fields, new_fields)])
+        result = type(self)([("uid", self["uid"])] + [(nf, self[f]) for f, nf in zip(keep_fields, new_fields)])  # type: ignore
         return result if copy else self._reset(result._data)

     def drop_fields(self, names: Union[Collection[str], Callable[[str], bool]], *, copy: bool = False):
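
Taken together, the two loader changes treat an empty dataset as a header followed directly by the string heap, with no per-column blocks in between, and fail loudly if a column buffer is missing. A condensed sketch of the resulting read loop; the wire layout is inferred from this diff rather than a format spec, and `read_columns` is a hypothetical helper:

```python
import struct

def read_columns(f, header, data, loader, field_names):
    """Hypothetical condensation of the per-column loop in _load_stream."""
    # An empty dataset has no data section, just the string heap at the end.
    dtype = [] if header["length"] == 0 else header["dtype"]
    for field in dtype:
        # Each column block is prefixed with its little-endian u32 size
        # (u32intle in the real code).
        colsize = struct.unpack("<I", f.read(4))[0]
        if field[0] not in field_names:
            f.seek(colsize, 1)  # skip filtered-out columns without reading
            continue
        buffer = f.read(colsize)
        if field[0] in header["compressed_fields"]:
            loader.decompress_col(field[0], buffer)
            continue
        mem = data.getbuf(field[0])
        assert mem is not None, f"Could not load stream (missing {field[0]} buffer)"
        mem[:] = buffer
    data.setstrheap(f.read())  # the remainder of the stream is the string heap
```
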
45 changes: 0 additions & 45 deletions typings/cryosparc/core.pyi

This file was deleted.
