Skip to content

Commit

Permalink
Remove "legacy" Dask DataFrame support from Dask cuDF (#17558)
Browse files Browse the repository at this point in the history
The legacy Dask DataFrame API is deprecated. We should remove it for 25.02 to reduce maintenance burden.

**Blockers**:

- [x] rapidsai/dask-cuda#1417

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - James Lamb (https://github.com/jameslamb)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Matthew Roeschke (https://github.com/mroeschke)

URL: #17558
  • Loading branch information
rjzamora authored Jan 7, 2025
1 parent 4e97cd4 commit 30c6caa
Show file tree
Hide file tree
Showing 35 changed files with 864 additions and 3,311 deletions.
13 changes: 3 additions & 10 deletions ci/test_python_other.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Copyright (c) 2022-2025, NVIDIA CORPORATION.

# Support invoking test_python_other.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../
Expand All @@ -24,8 +24,8 @@ EXITCODE=0
trap "EXITCODE=1" ERR
set +e

rapids-logger "pytest dask_cudf (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \
rapids-logger "pytest dask_cudf"
./ci/run_dask_cudf_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \
--numprocesses=8 \
--dist=worksteal \
Expand All @@ -34,13 +34,6 @@ DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cudf-coverage.xml" \
--cov-report=term

rapids-logger "pytest dask_cudf (legacy)"
DASK_DATAFRAME__QUERY_PLANNING=False ./ci/run_dask_cudf_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \
--numprocesses=8 \
--dist=worksteal \
.

rapids-logger "pytest cudf_kafka"
./ci/run_cudf_kafka_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cudf-kafka.xml"
Expand Down
16 changes: 3 additions & 13 deletions ci/test_wheel_dask_cudf.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
# Copyright (c) 2023-2025, NVIDIA CORPORATION.

set -eou pipefail

Expand Down Expand Up @@ -30,21 +30,11 @@ RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/
mkdir -p "${RAPIDS_TESTS_DIR}"

# Run tests in dask_cudf/tests and dask_cudf/io/tests
rapids-logger "pytest dask_cudf (dask-expr)"
rapids-logger "pytest dask_cudf"
pushd python/dask_cudf/dask_cudf
DASK_DATAFRAME__QUERY_PLANNING=True python -m pytest \
python -m pytest \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \
--numprocesses=8 \
--dist=worksteal \
.
popd

# Run tests in dask_cudf/tests and dask_cudf/io/tests (legacy)
rapids-logger "pytest dask_cudf (legacy)"
pushd python/dask_cudf/dask_cudf
DASK_DATAFRAME__QUERY_PLANNING=False python -m pytest \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \
--numprocesses=8 \
--dist=worksteal \
.
popd
56 changes: 15 additions & 41 deletions python/dask_cudf/dask_cudf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
# Copyright (c) 2018-2024, NVIDIA CORPORATION.

import warnings
from importlib import import_module
# Copyright (c) 2018-2025, NVIDIA CORPORATION.

import dask.dataframe as dd
from dask import config
from dask.dataframe import from_delayed

import cudf

from . import backends # noqa: F401
from . import backends, io # noqa: F401
from ._expr.expr import _patch_dask_expr
from ._version import __git_commit__, __version__ # noqa: F401
from .core import DataFrame, Index, Series, concat, from_cudf
from .core import DataFrame, Index, Series, _deprecated_api, concat, from_cudf

QUERY_PLANNING_ON = dd.DASK_EXPR_ENABLED
if not (QUERY_PLANNING_ON := dd._dask_expr_enabled()):
raise ValueError(
"The legacy DataFrame API is not supported in dask_cudf>24.12. "
"Please enable query-planning, or downgrade to dask_cudf<=24.12"
)


def read_csv(*args, **kwargs):
Expand All @@ -36,46 +38,18 @@ def read_parquet(*args, **kwargs):
return dd.read_parquet(*args, **kwargs)


def _deprecated_api(old_api, new_api=None, rec=None):
    """Build a stand-in callable for a deprecated or removed API.

    Parameters
    ----------
    old_api : str
        Name of the deprecated API; only used to build messages.
    new_api : str, optional
        Dotted path (``"package.module.attr"``) of the replacement
        callable. When given, the returned callable warns and then
        forwards its arguments to the replacement.
    rec : str, optional
        Recommendation appended to the warning or error message. When
        ``new_api`` is given and ``rec`` is omitted, a default
        "Please use <new_api> instead." sentence is used.

    Returns
    -------
    callable
        A function accepting arbitrary positional and keyword arguments.
        With ``new_api`` it emits a ``FutureWarning`` and returns the
        result of the replacement; without it, calling the function
        raises ``NotImplementedError``.
    """

    def inner_func(*args, **kwargs):
        if new_api:
            # Use alternative
            msg = f"{old_api} is now deprecated. "
            msg += rec or f"Please use {new_api} instead."
            # stacklevel=2 attributes the warning to the code that called
            # the deprecated API rather than to this wrapper.
            warnings.warn(msg, FutureWarning, stacklevel=2)
            # The replacement is resolved at call time, so the target
            # module is only imported if the deprecated API is used.
            module_name, _, attr_name = new_api.rpartition(".")
            module = import_module(module_name)
            return getattr(module, attr_name)(*args, **kwargs)

        # No alternative - raise an error
        raise NotImplementedError(
            f"{old_api} is no longer supported. " + (rec or "")
        )

    return inner_func


if QUERY_PLANNING_ON:
from . import io
from ._expr.expr import _patch_dask_expr

groupby_agg = _deprecated_api("dask_cudf.groupby_agg")
read_text = DataFrame.read_text
_patch_dask_expr()

else:
from . import io # noqa: F401
from ._legacy.groupby import groupby_agg # noqa: F401
from ._legacy.io import read_text # noqa: F401


groupby_agg = _deprecated_api("dask_cudf.groupby_agg")
read_text = DataFrame.read_text
to_orc = _deprecated_api(
"dask_cudf.to_orc",
new_api="dask_cudf._legacy.io.to_orc",
new_api="dask_cudf.io.to_orc",
rec="Please use DataFrame.to_orc instead.",
)


_patch_dask_expr()


__all__ = [
"DataFrame",
"Index",
Expand Down
36 changes: 16 additions & 20 deletions python/dask_cudf/dask_cudf/_expr/collection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.

import warnings
from functools import cached_property
Expand All @@ -15,19 +15,11 @@

from dask import config
from dask.dataframe.core import is_dataframe_like
from dask.dataframe.dispatch import get_parallel_type
from dask.typing import no_default

import cudf

_LEGACY_WORKAROUND = (
"To enable the 'legacy' dask-cudf API, set the "
"global 'dataframe.query-planning' config to "
"`False` before dask is imported. This can also "
"be done by setting an environment variable: "
"`DASK_DATAFRAME__QUERY_PLANNING=False` "
)


##
## Custom collection classes
##
Expand Down Expand Up @@ -103,9 +95,8 @@ def set_index(
divisions = None
warnings.warn(
"Ignoring divisions='quantile'. This option is now "
"deprecated. Please use the legacy API and raise an "
"issue on github if this feature is necessary."
f"\n{_LEGACY_WORKAROUND}",
"deprecated. Please raise an issue on github if this "
"feature is necessary.",
FutureWarning,
)

Expand Down Expand Up @@ -135,9 +126,7 @@ def groupby(

if kwargs.pop("as_index") is not True:
raise NotImplementedError(
f"{msg} Please reset the index after aggregating, or "
"use the legacy API if `as_index=False` is required.\n"
f"{_LEGACY_WORKAROUND}"
f"{msg} Please reset the index after aggregating."
)
else:
warnings.warn(msg, FutureWarning)
Expand All @@ -153,15 +142,15 @@ def groupby(
)

def to_orc(self, *args, **kwargs):
from dask_cudf._legacy.io import to_orc
from dask_cudf.io.orc import to_orc as to_orc_impl

return to_orc(self, *args, **kwargs)
return to_orc_impl(self, *args, **kwargs)

@staticmethod
def read_text(*args, **kwargs):
from dask_cudf._legacy.io.text import read_text as legacy_read_text
from dask_cudf.io.text import read_text as read_text_impl

return legacy_read_text(*args, **kwargs)
return read_text_impl(*args, **kwargs)

def clip(self, lower=None, upper=None, axis=1):
if axis not in (None, 1):
Expand Down Expand Up @@ -197,6 +186,13 @@ class Index(DXIndex, CudfFrameBase):
pass # Same as pandas (for now)


# dask.dataframe dispatch
get_parallel_type.register(cudf.DataFrame, lambda _: DataFrame)
get_parallel_type.register(cudf.Series, lambda _: Series)
get_parallel_type.register(cudf.BaseIndex, lambda _: Index)


# dask_expr dispatch (might go away?)
get_collection_type.register(cudf.DataFrame, lambda _: DataFrame)
get_collection_type.register(cudf.Series, lambda _: Series)
get_collection_type.register(cudf.BaseIndex, lambda _: Index)
Expand Down
8 changes: 4 additions & 4 deletions python/dask_cudf/dask_cudf/_expr/expr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
import functools

import dask_expr._shuffle as _shuffle_module
Expand All @@ -7,13 +7,13 @@
from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns
from dask_expr._reductions import Reduction, Var

from dask.dataframe.core import (
is_dataframe_like,
from dask.dataframe.dispatch import (
is_categorical_dtype,
make_meta,
meta_nonempty,
)
from dask.dataframe.dispatch import is_categorical_dtype
from dask.typing import no_default
from dask.utils import is_dataframe_like

import cudf

Expand Down
Loading

0 comments on commit 30c6caa

Please sign in to comment.