From b54bbe84b4fe08f64a1a0ed418efe937360d7746 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 9 Nov 2023 13:01:53 -0600 Subject: [PATCH] keep trying --- pyproject.toml | 2 +- src/dask_awkward/lib/io/io.py | 28 +++++++++++++++++----------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f5e4d29b..3d50be57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ Homepage = "https://github.com/dask-contrib/dask-awkward" [project.optional-dependencies] io = [ - "pyarrow;python_version<\"3.12\"", + "pyarrow", ] complete = [ "dask-awkward[io]", diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 2b2ac8c2..1112bb73 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -17,8 +17,10 @@ try: from distributed.queues import Queue + from distributed.worker import get_worker except ImportError: - Queue = None # type: ignore + Queue = None + get_worker = None from dask_awkward.layers.layers import ( @@ -413,6 +415,7 @@ def to_dataframe( """ import dask + from dask.dataframe.core import DataFrame as DaskDataFrame from dask.dataframe.core import new_dd_object if optimize_graph: @@ -475,9 +478,13 @@ def wrapped(*args, **kwargs): try: return fn(*args, **kwargs) except allowed_exceptions as err: - if Queue is not None: - queue = Queue("dak_returned_empty") - queue.put((args, kwargs, str(err))) + if Queue is not None and get_worker is not None: + try: + _ = get_worker() + queue = Queue("dak_returned_empty") + queue.put((args, kwargs, str(err))) + except ValueError: + pass return fn.mock_empty(backend) @@ -485,13 +492,12 @@ def wrapped(*args, **kwargs): def returned_empty_report() -> list[Any]: - if Queue is None: - return [] - queue = Queue("dak_returned_empty") - things = [] - while queue.qsize(): - things.append(queue.get()) - return things + report = [] + if Queue is not None: + queue = Queue("dak_returned_empty") + while queue.qsize(): + report.append(queue.get()) + return report def from_map(