Skip to content

Commit

Permalink
feat: update to latest awkward==2.5.0 (#407)
Browse files Browse the repository at this point in the history
* feat: add attrs and highlevel

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: use new public API

* refactor: clean up type hints

* fix: more attrs work

* fix: more attrs

* refactor: more docs / change signatures

* test: fix anticipated error message

* test: fix test?

* test: don't modify fixture

* fix: propagate attrs

* fix: bump Awkward lower bound

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
agoose77 and pre-commit-ci[bot] authored Nov 16, 2023
1 parent 5bd87d0 commit 5b2580b
Show file tree
Hide file tree
Showing 11 changed files with 568 additions and 245 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ classifiers = [
"Topic :: Software Development",
]
dependencies = [
"awkward >=2.4.5",
"awkward >=2.5.0",
"dask >=2023.04.0",
"typing_extensions >=4.8.0",
]
Expand Down
46 changes: 32 additions & 14 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import operator
import sys
import warnings
from collections.abc import Callable, Hashable, Sequence
from collections.abc import Callable, Hashable, Mapping, Sequence
from enum import IntEnum
from functools import cached_property, partial, wraps
from numbers import Number
Expand All @@ -17,13 +17,14 @@
import dask.config
import numpy as np
from awkward._do import remove_structure as ak_do_remove_structure
from awkward._nplikes.typetracer import (
from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
from awkward.typetracer import (
MaybeNone,
OneOf,
TypeTracerArray,
create_unknown_scalar,
is_unknown_scalar,
)
from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
from dask.base import (
DaskMethodsMixin,
dont_optimize,
Expand Down Expand Up @@ -140,7 +141,9 @@ def key(self) -> Key:
return (self._name, 0)

def _check_meta(self, m: Any) -> Any | None:
if isinstance(m, (MaybeNone, OneOf)) or is_unknown_scalar(m):
if m is None:
return m
elif isinstance(m, (MaybeNone, OneOf)) or is_unknown_scalar(m):
return m
elif isinstance(m, ak.Array) and len(m) == 1:
return m
Expand Down Expand Up @@ -348,12 +351,9 @@ def new_scalar_object(dsk: HighLevelGraph, name: str, *, meta: Any) -> Scalar:
Resulting collection.
"""
if meta is None:
meta = ak.Array(TypeTracerArray._new(dtype=np.dtype(None), shape=()))

if isinstance(meta, MaybeNone):
meta = ak.Array(meta.content)
else:
elif meta is not None:
try:
if ak.backend(meta) != "typetracer":
raise TypeError(
Expand Down Expand Up @@ -411,9 +411,7 @@ def new_known_scalar(
dtype = np.dtype(dtype)
llg = AwkwardMaterializedLayer({(name, 0): s}, previous_layer_names=[])
hlg = HighLevelGraph.from_collections(name, llg, dependencies=())
return Scalar(
hlg, name, meta=TypeTracerArray._new(dtype=dtype, shape=()), known_value=s
)
return Scalar(hlg, name, meta=create_unknown_scalar(dtype), known_value=s)


class Record(Scalar):
Expand Down Expand Up @@ -819,7 +817,14 @@ def layout(self) -> Content:
raise ValueError("This collection's meta is None; unknown layout.")

@property
def behavior(self) -> dict:
def attrs(self) -> dict:
"""awkward Array attrs dictionary."""
if self._meta is not None:
return self._meta.attrs
raise ValueError("This collection's meta is None; no attrs property available.")

@property
def behavior(self) -> Mapping:
"""awkward Array behavior dictionary."""
if self._meta is not None:
return self._meta.behavior
Expand Down Expand Up @@ -1486,7 +1491,8 @@ def new_array_object(
name: str,
*,
meta: ak.Array | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
npartitions: int | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
) -> Array:
Expand All @@ -1505,6 +1511,10 @@ def new_array_object(
typetracer for the new Array. If the configuration option
``awkward.compute-unknown-meta`` is set to ``False``,
undefined `meta` will be assigned an empty typetracer.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.
npartitions : int, optional
Total number of partitions; if used `divisions` will be a
tuple of length `npartitions` + 1 with all elements ``None``.
Expand Down Expand Up @@ -1548,6 +1558,8 @@ def new_array_object(

if behavior is not None:
actual_meta.behavior = behavior
if attrs is not None:
actual_meta.attrs = attrs

out = Array(dsk, name, actual_meta, divs)
if actual_meta.__doc__ != actual_meta.__class__.__doc__:
Expand Down Expand Up @@ -1877,6 +1889,8 @@ def non_trivial_reduction(
keepdims: bool,
mask_identity: bool,
reducer: Callable,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
combiner: Callable | None = None,
token: str | None = None,
dtype: Any | None = None,
Expand Down Expand Up @@ -2225,7 +2239,11 @@ def typetracer_array(a: ak.Array | Array) -> ak.Array:
if isinstance(a, Array):
return a._meta
elif isinstance(a, ak.Array):
return ak.Array(a.layout.to_typetracer(forget_length=True))
return ak.Array(
a.layout.to_typetracer(forget_length=True),
behavior=a._behavior,
attrs=a._attrs,
)
else:
msg = (
"`a` should be an awkward array or a Dask awkward collection.\n"
Expand Down
65 changes: 49 additions & 16 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import logging
import math
from collections.abc import Callable, Iterable
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -65,8 +65,9 @@ def __call__(self, start: int, stop: int, **kwargs: Any) -> ak.Array:
def from_awkward(
source: ak.Array,
npartitions: int,
behavior: dict | None = None,
behavior: Mapping | None = None,
label: str | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a concrete :class:`awkward.Array` object.
Expand All @@ -76,8 +77,12 @@ def from_awkward(
The concrete awkward array.
npartitions : int
The total number of partitions for the collection.
behavior : dict, optional
Custom ak.behavior for the output array.
label : str, optional
Label for the task.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand Down Expand Up @@ -112,25 +117,35 @@ def from_awkward(
divisions=locs,
meta=meta,
behavior=behavior,
attrs=attrs,
)


class _FromListsFn:
def __init__(self, behavior: dict | None = None):
def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None):
self.behavior = behavior
self.attrs = attrs

def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior)
return ak.Array(x, behavior=self.behavior, attrs=self.attrs)


def from_lists(source: list) -> Array:
def from_lists(
source: list,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a list of lists.
Parameters
----------
source : list[list[Any]]
List of lists, each outer list will become a partition in the
collection.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand All @@ -152,9 +167,9 @@ def from_lists(source: list) -> Array:
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
return from_map(
_FromListsFn(),
_FromListsFn(behavior=behavior, attrs=attrs),
lists,
meta=typetracer_array(ak.Array(lists[0])),
meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)),
divisions=divs,
label="from-lists",
)
Expand All @@ -163,9 +178,10 @@ def from_lists(source: list) -> Array:
def from_delayed(
source: list[Delayed] | Delayed,
meta: ak.Array | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
prefix: str = "from-delayed",
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a set of :class:`~dask.delayed.Delayed` objects.
Expand All @@ -179,10 +195,14 @@ def from_delayed(
Metadata (typetracer array) if known, if ``None`` the first
partition (first element of the list of ``Delayed`` objects)
will be computed to determine the metadata.
behavior : dict, optional
Custom ak.behavior for the output array.
divisions : tuple[int | None, ...], optional
Partition boundaries (if known).
prefix : str
Prefix for the keys in the task graph.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand All @@ -206,11 +226,7 @@ def from_delayed(
raise ValueError("divisions must be a tuple of length len(source) + 1")
hlg = HighLevelGraph.from_collections(name, dsk, dependencies=parts)
return new_array_object(
hlg,
name=name,
meta=meta,
behavior=behavior,
divisions=divs,
hlg, name=name, meta=meta, behavior=behavior, divisions=divs, attrs=attrs
)


Expand Down Expand Up @@ -346,13 +362,21 @@ def to_dask_array(
return new_da_object(graph, name, meta=None, chunks=chunks, dtype=dtype)


def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
def from_dask_array(
array: DaskArray,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Convert a Dask Array collection to a Dask Awkward Array collection.
Parameters
----------
array : dask.array.Array
Array to convert.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.
Returns
-------
Expand Down Expand Up @@ -389,11 +413,18 @@ def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array])
if np.any(np.isnan(array.chunks)):
return new_array_object(
hlg, name, npartitions=array.npartitions, meta=meta, behavior=behavior
hlg,
name,
npartitions=array.npartitions,
meta=meta,
behavior=behavior,
attrs=attrs,
)
else:
divs = (0, *np.cumsum(array.chunks))
return new_array_object(hlg, name, divisions=divs, meta=meta, behavior=behavior)
return new_array_object(
hlg, name, divisions=divs, meta=meta, behavior=behavior, attrs=attrs
)


def to_dataframe(
Expand Down Expand Up @@ -522,6 +553,8 @@ def from_map(
number of partitions in the output collection (only one
element of each iterable will be passed to `func` for each
partition).
args : tuple
Tuple of positional arguments to append after mapped arguments.
label : str, optional
String to use as the function-name label in the output
collection-key names.
Expand Down
13 changes: 7 additions & 6 deletions src/dask_awkward/lib/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc
import logging
import math
from collections.abc import Callable
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Any, Literal, overload

import awkward as ak
Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
self.compression = compression
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -125,7 +125,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -163,7 +163,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -432,7 +432,8 @@ def from_json(
initial: int = 1024,
resize: float = 8,
highlevel: bool = True,
behavior: dict | None = None,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
blocksize: int | str | None = None,
delimiter: bytes | None = None,
compression: str | None = "infer",
Expand Down
Loading

0 comments on commit 5b2580b

Please sign in to comment.