Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update to latest awkward==2.5.0 #407

Merged
merged 15 commits into from
Nov 16, 2023
Merged
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ classifiers = [
"Topic :: Software Development",
]
dependencies = [
"awkward >=2.4.5",
"awkward >=2.5.0",
"dask >=2023.04.0",
"typing_extensions >=4.8.0",
]
Expand Down
46 changes: 32 additions & 14 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import operator
import sys
import warnings
from collections.abc import Callable, Hashable, Sequence
from collections.abc import Callable, Hashable, Mapping, Sequence
from enum import IntEnum
from functools import cached_property, partial, wraps
from numbers import Number
Expand All @@ -17,13 +17,14 @@
import dask.config
import numpy as np
from awkward._do import remove_structure as ak_do_remove_structure
from awkward._nplikes.typetracer import (
from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
from awkward.typetracer import (
MaybeNone,
OneOf,
TypeTracerArray,
create_unknown_scalar,
is_unknown_scalar,
)
from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern
from dask.base import (
DaskMethodsMixin,
dont_optimize,
Expand Down Expand Up @@ -140,7 +141,9 @@ def key(self) -> Key:
return (self._name, 0)

def _check_meta(self, m: Any) -> Any | None:
if isinstance(m, (MaybeNone, OneOf)) or is_unknown_scalar(m):
if m is None:
return m
elif isinstance(m, (MaybeNone, OneOf)) or is_unknown_scalar(m):
return m
elif isinstance(m, ak.Array) and len(m) == 1:
return m
Expand Down Expand Up @@ -348,12 +351,9 @@ def new_scalar_object(dsk: HighLevelGraph, name: str, *, meta: Any) -> Scalar:
Resulting collection.

"""
if meta is None:
meta = ak.Array(TypeTracerArray._new(dtype=np.dtype(None), shape=()))

if isinstance(meta, MaybeNone):
meta = ak.Array(meta.content)
else:
elif meta is not None:
try:
if ak.backend(meta) != "typetracer":
raise TypeError(
Expand Down Expand Up @@ -411,9 +411,7 @@ def new_known_scalar(
dtype = np.dtype(dtype)
llg = AwkwardMaterializedLayer({(name, 0): s}, previous_layer_names=[])
hlg = HighLevelGraph.from_collections(name, llg, dependencies=())
return Scalar(
hlg, name, meta=TypeTracerArray._new(dtype=dtype, shape=()), known_value=s
)
return Scalar(hlg, name, meta=create_unknown_scalar(dtype), known_value=s)


class Record(Scalar):
Expand Down Expand Up @@ -819,7 +817,14 @@ def layout(self) -> Content:
raise ValueError("This collection's meta is None; unknown layout.")

@property
def behavior(self) -> dict:
def attrs(self) -> dict:
"""awkward Array attrs dictionary."""
if self._meta is not None:
return self._meta.attrs
raise ValueError("This collection's meta is None; no attrs property available.")

@property
def behavior(self) -> Mapping:
"""awkward Array behavior dictionary."""
if self._meta is not None:
return self._meta.behavior
Expand Down Expand Up @@ -1486,7 +1491,8 @@ def new_array_object(
name: str,
*,
meta: ak.Array | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
npartitions: int | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
) -> Array:
Expand All @@ -1505,6 +1511,10 @@ def new_array_object(
typetracer for the new Array. If the configuration option
``awkward.compute-unknown-meta`` is set to ``False``,
undefined `meta` will be assigned an empty typetracer.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : dict, optional
Custom attributes for the output array.
npartitions : int, optional
Total number of partitions; if used `divisions` will be a
tuple of length `npartitions` + 1 with all elements``None``.
Expand Down Expand Up @@ -1548,6 +1558,8 @@ def new_array_object(

if behavior is not None:
actual_meta.behavior = behavior
if attrs is not None:
actual_meta.attrs = attrs

out = Array(dsk, name, actual_meta, divs)
if actual_meta.__doc__ != actual_meta.__class__.__doc__:
Expand Down Expand Up @@ -1877,6 +1889,8 @@ def non_trivial_reduction(
keepdims: bool,
mask_identity: bool,
reducer: Callable,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
combiner: Callable | None = None,
token: str | None = None,
dtype: Any | None = None,
Expand Down Expand Up @@ -2225,7 +2239,11 @@ def typetracer_array(a: ak.Array | Array) -> ak.Array:
if isinstance(a, Array):
return a._meta
elif isinstance(a, ak.Array):
return ak.Array(a.layout.to_typetracer(forget_length=True))
return ak.Array(
a.layout.to_typetracer(forget_length=True),
behavior=a._behavior,
attrs=a._attrs,
)
else:
msg = (
"`a` should be an awkward array or a Dask awkward collection.\n"
Expand Down
65 changes: 49 additions & 16 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import logging
import math
from collections.abc import Callable, Iterable
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -65,8 +65,9 @@ def __call__(self, start: int, stop: int, **kwargs: Any) -> ak.Array:
def from_awkward(
source: ak.Array,
npartitions: int,
behavior: dict | None = None,
behavior: Mapping | None = None,
label: str | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a concrete :class:`awkward.Array` object.

Expand All @@ -76,8 +77,12 @@ def from_awkward(
The concrete awkward array.
npartitions : int
The total number of partitions for the collection.
behavior : dict, optional
Custom ak.behavior for the output array.
label : str, optional
Label for the task.
attrs : mapping, optional
Custom attributes for the output array.

Returns
-------
Expand Down Expand Up @@ -112,25 +117,35 @@ def from_awkward(
divisions=locs,
meta=meta,
behavior=behavior,
attrs=attrs,
)


class _FromListsFn:
def __init__(self, behavior: dict | None = None):
def __init__(self, behavior: Mapping | None, attrs: Mapping[str, Any] | None):
self.behavior = behavior
self.attrs = attrs

def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior)
return ak.Array(x, behavior=self.behavior, attrs=self.attrs)


def from_lists(source: list) -> Array:
def from_lists(
source: list,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a list of lists.

Parameters
----------
source : list[list[Any]]
List of lists, each outer list will become a partition in the
collection.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.

Returns
-------
Expand All @@ -152,9 +167,9 @@ def from_lists(source: list) -> Array:
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
return from_map(
_FromListsFn(),
_FromListsFn(behavior=behavior, attrs=attrs),
lists,
meta=typetracer_array(ak.Array(lists[0])),
meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@douglasdavis hmm we might need to add a mechanism for passing a type object and/or enforcing this meta - lists could have any type in lists[1], etc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I see what you're getting at here. from_lists is meant to be a pretty simple entry point to instantiating a dask-awkward array from non-disk data. I originally wrote it as a simple testing ground when adding the from_map interface and I felt like it wouldn't be a bad addition to the IO API. My thoughts here are to keep it simple and perhaps add a note to the docstring describing the function as expecting the same type for each list.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can make it a runtime error if the subsequent partitions don't have the anticipated meta?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems reasonable to me

divisions=divs,
label="from-lists",
)
Expand All @@ -163,9 +178,10 @@ def from_lists(source: list) -> Array:
def from_delayed(
source: list[Delayed] | Delayed,
meta: ak.Array | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
prefix: str = "from-delayed",
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Create an Array collection from a set of :class:`~dask.delayed.Delayed` objects.

Expand All @@ -179,10 +195,14 @@ def from_delayed(
Metadata (typetracer array) if known, if ``None`` the first
partition (first element of the list of ``Delayed`` objects)
will be computed to determine the metadata.
behavior : dict, optional
Custom ak.behavior for the output array.
divisions : tuple[int | None, ...], optional
Partition boundaries (if known).
prefix : str
Prefix for the keys in the task graph.
attrs : mapping, optional
Custom attributes for the output array.

Returns
-------
Expand All @@ -206,11 +226,7 @@ def from_delayed(
raise ValueError("divisions must be a tuple of length len(source) + 1")
hlg = HighLevelGraph.from_collections(name, dsk, dependencies=parts)
return new_array_object(
hlg,
name=name,
meta=meta,
behavior=behavior,
divisions=divs,
hlg, name=name, meta=meta, behavior=behavior, divisions=divs, attrs=attrs
)


Expand Down Expand Up @@ -346,13 +362,21 @@ def to_dask_array(
return new_da_object(graph, name, meta=None, chunks=chunks, dtype=dtype)


def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
def from_dask_array(
array: DaskArray,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
"""Convert a Dask Array collection to a Dask Awkard Array collection.

Parameters
----------
array : dask.array.Array
Array to convert.
behavior : dict, optional
Custom ak.behavior for the output array.
attrs : mapping, optional
Custom attributes for the output array.

Returns
-------
Expand Down Expand Up @@ -389,11 +413,18 @@ def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:
hlg = HighLevelGraph.from_collections(name, layer, dependencies=[array])
if np.any(np.isnan(array.chunks)):
return new_array_object(
hlg, name, npartitions=array.npartitions, meta=meta, behavior=behavior
hlg,
name,
npartitions=array.npartitions,
meta=meta,
behavior=behavior,
attrs=attrs,
)
else:
divs = (0, *np.cumsum(array.chunks))
return new_array_object(hlg, name, divisions=divs, meta=meta, behavior=behavior)
return new_array_object(
hlg, name, divisions=divs, meta=meta, behavior=behavior, attrs=attrs
)


def to_dataframe(
Expand Down Expand Up @@ -522,6 +553,8 @@ def from_map(
number of partitions in the output collection (only one
element of each iterable will be passed to `func` for each
partition).
args : tuple
Tuple of positional arguments to append after mapped arguments.
label : str, optional
String to use as the function-name label in the output
collection-key names.
Expand Down
13 changes: 7 additions & 6 deletions src/dask_awkward/lib/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc
import logging
import math
from collections.abc import Callable
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Any, Literal, overload

import awkward as ak
Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
self.compression = compression
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -125,7 +125,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -163,7 +163,7 @@ def __init__(
form: Form,
compression: str | None = None,
schema: str | dict | list | None = None,
behavior: dict | None = None,
behavior: Mapping | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand Down Expand Up @@ -432,7 +432,8 @@ def from_json(
initial: int = 1024,
resize: float = 8,
highlevel: bool = True,
behavior: dict | None = None,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
blocksize: int | str | None = None,
delimiter: bytes | None = None,
compression: str | None = "infer",
Expand Down
Loading
Loading