Skip to content

Commit

Permalink
keep trying
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis committed Nov 9, 2023
1 parent 1e23c84 commit b54bbe8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Homepage = "https://github.com/dask-contrib/dask-awkward"

[project.optional-dependencies]
io = [
"pyarrow;python_version<\"3.12\"",
"pyarrow",
]
complete = [
"dask-awkward[io]",
Expand Down
28 changes: 17 additions & 11 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

try:
from distributed.queues import Queue
from distributed.worker import get_worker
except ImportError:
Queue = None # type: ignore
Queue = None
get_worker = None


from dask_awkward.layers.layers import (
Expand Down Expand Up @@ -413,6 +415,7 @@ def to_dataframe(
"""
import dask
from dask.dataframe.core import DataFrame as DaskDataFrame
from dask.dataframe.core import new_dd_object

if optimize_graph:
Expand Down Expand Up @@ -475,23 +478,26 @@ def wrapped(*args, **kwargs):
try:
return fn(*args, **kwargs)
except allowed_exceptions as err:
if Queue is not None:
queue = Queue("dak_returned_empty")
queue.put((args, kwargs, str(err)))
if Queue is not None and get_worker is not None:
try:
_ = get_worker()
queue = Queue("dak_returned_empty")
queue.put((args, kwargs, str(err)))
except ValueError:
pass

return fn.mock_empty(backend)

return wrapped


def returned_empty_report() -> list[Any]:
if Queue is None:
return []
queue = Queue("dak_returned_empty")
things = []
while queue.qsize():
things.append(queue.get())
return things
report = []
if Queue is not None:
queue = Queue("dak_returned_empty")
while queue.qsize():
report.append(queue.get())
return report


def from_map(
Expand Down

0 comments on commit b54bbe8

Please sign in to comment.