Skip to content

Commit

Permalink
Revert "Fix empty cluster handling in tdigest merge (#16675)" (#16800)
Browse files Browse the repository at this point in the history
This PR reverts #16675, which introduced another bug. Our nightly CI pipeline is failing because of this bug (NVIDIA/spark-rapids#11463). I can reproduce the bug within a libcudf unit test. I will make another PR to fix both the original issue and the new bug.

Authors:
  - Jihoon Son (https://github.com/jihoonson)

Approvers:
  - Alessandro Bellina (https://github.com/abellina)
  - Nghia Truong (https://github.com/ttnghia)
  - Bradley Dice (https://github.com/bdice)

URL: #16800
  • Loading branch information
jihoonson authored Sep 12, 2024
1 parent 1b402df commit 3dbc33a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 162 deletions.
17 changes: 8 additions & 9 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,28 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
rmm::device_async_resource_ref mr);

/**
* @brief Create a tdigest column of empty clusters.
* @brief Create an empty tdigest column.
*
* The column created contains the specified number of rows of empty clusters.
* An empty tdigest column contains a single row of length 0
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns A tdigest column of empty clusters.
* @returns An empty tdigest column.
*/
CUDF_EXPORT
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create a scalar of an empty tdigest cluster.
* @brief Create an empty tdigest scalar.
*
* The returned scalar is a struct_scalar that contains a single row of an empty cluster.
* An empty tdigest scalar is a struct_scalar that contains a single row of length 0
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns A scalar of an empty tdigest cluster.
* @returns An empty tdigest scalar.
*/
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op)
static_cast<column_view>(values).type(), tdigest_gen{}, op, values, delta);

// NOTE: an empty tdigest column still has 1 row.
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}
Expand Down Expand Up @@ -562,12 +562,12 @@ template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
// 3 empty tdigests all in the same group
auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto a = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
std::vector<column_view> cols;
cols.push_back(*a);
cols.push_back(*b);
Expand All @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op)
auto const delta = 1000;
auto result = merge_op(*values, delta);

auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}
Expand Down
23 changes: 11 additions & 12 deletions cpp/src/quantiles/tdigest/tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -292,33 +292,32 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr);
}

std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto offsets = cudf::make_fixed_width_column(
data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr);
data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
offsets->mutable_view().begin<size_type>(),
offsets->mutable_view().end<size_type>(),
0);

auto min_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
auto min_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
min_col->mutable_view().begin<double>(),
min_col->mutable_view().end<double>(),
0);
auto max_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
auto max_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
max_col->mutable_view().begin<double>(),
max_col->mutable_view().end<double>(),
0);

return make_tdigest_column(num_rows,
cudf::make_empty_column(type_id::FLOAT64),
cudf::make_empty_column(type_id::FLOAT64),
return make_tdigest_column(1,
make_empty_column(type_id::FLOAT64),
make_empty_column(type_id::FLOAT64),
std::move(offsets),
std::move(min_col),
std::move(max_col),
Expand All @@ -339,7 +338,7 @@ std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release();
auto contents = make_empty_tdigest_column(stream, mr)->release();
return std::make_unique<struct_scalar>(
std::move(*std::make_unique<table>(std::move(contents.children))), true, stream, mr);
}
Expand Down
70 changes: 25 additions & 45 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ std::unique_ptr<scalar> to_tdigest_scalar(std::unique_ptr<column>&& tdigest,
* @param group_cluster_wl Output. The set of cluster weight limits for each group.
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of its clusters
* @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be
* set to false when there is no empty cluster, true otherwise.
* @param has_nulls Whether or not the input contains nulls
*
*/

template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
Expand All @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
double* group_cluster_wl,
size_type* group_num_clusters,
size_type const* group_cluster_offsets,
bool may_have_empty_clusters)
bool has_nulls)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;

Expand All @@ -399,12 +399,11 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
// a group with nothing in it.
group_num_clusters[group_index] = 0;
if (total_weight <= 0) {
// If the input contains empty clusters, we can potentially have a group that also generates
// empty clusters because -all- of the input values are null or empty cluster. In that case, the
// `reduce_by_key` call in the tdigest generation step will need a location to store the unused
// reduction value for that group of nulls and empty clusters. These "stubs" will be
// postprocessed out afterwards.
if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; }
// if the input contains nulls we can potentially have a group that generates no
// clusters because -all- of the input values are null. in that case, the reduce_by_key call
// in the tdigest generation step will need a location to store the unused reduction value for
// that group of nulls. these "stubs" will be postprocessed out afterwards.
if (has_nulls) { group_num_clusters[group_index] = 1; }
return;
}

Expand Down Expand Up @@ -503,8 +502,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
* stream that falls before our current cluster limit
* @param group_info A functor which returns the info for the specified group (total weight,
* size and start offset)
* @param may_have_empty_clusters Whether or not there could be empty clusters. It should be
* set to false only when there is no empty cluster.
* @param has_nulls Whether or not the input data contains nulls
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
Expand All @@ -518,7 +516,7 @@ generate_group_cluster_info(int delta,
NearestWeight nearest_weight,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
bool may_have_empty_clusters,
bool has_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -537,7 +535,7 @@ generate_group_cluster_info(int delta,
nullptr,
group_num_clusters.begin(),
nullptr,
may_have_empty_clusters);
has_nulls);

// generate group cluster offsets (where the clusters for a given group start and end)
auto group_cluster_offsets = cudf::make_numeric_column(
Expand Down Expand Up @@ -569,7 +567,7 @@ generate_group_cluster_info(int delta,
group_cluster_wl.begin(),
group_num_clusters.begin(),
group_cluster_offsets->view().begin<size_type>(),
may_have_empty_clusters);
has_nulls);

return {std::move(group_cluster_wl),
std::move(group_cluster_offsets),
Expand All @@ -582,7 +580,7 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
std::unique_ptr<column>&& offsets,
std::unique_ptr<column>&& min_col,
std::unique_ptr<column>&& max_col,
bool may_have_empty_clusters,
bool has_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -597,7 +595,7 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; };

size_type const num_stubs = [&]() {
if (!may_have_empty_clusters) { return 0; }
if (!has_nulls) { return 0; }
auto iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<size_type>(is_stub_digest));
return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows);
Expand Down Expand Up @@ -663,10 +661,6 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
mr);
}

/**
* @brief A functor which returns the cluster index within a group that the value at
* the given value index falls into.
*/
template <typename CumulativeWeight>
struct compute_tdigests_keys_fn {
int const delta;
Expand Down Expand Up @@ -712,17 +706,16 @@ struct compute_tdigests_keys_fn {
* boundaries.
*
* @param delta tdigest compression level
* @param centroids_begin Beginning of the range of centroids.
* @param centroids_end End of the range of centroids.
* @param values_begin Beginning of the range of input values.
* @param values_end End of the range of input values.
* @param cumulative_weight Functor which returns cumulative weight and group information for
* an absolute input value index.
* @param min_col Column containing the minimum value per group.
* @param max_col Column containing the maximum value per group.
* @param group_cluster_wl Cluster weight limits for each group.
* @param group_cluster_offsets R-value reference of offsets into the cluster weight limits.
* @param total_clusters Total number of clusters in all groups.
* @param may_have_empty_clusters Whether or not there could be empty clusters. It should be
* set to false only when there is no empty cluster.
* @param has_nulls Whether or not the input contains nulls
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
Expand All @@ -738,7 +731,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
rmm::device_uvector<double> const& group_cluster_wl,
std::unique_ptr<column>&& group_cluster_offsets,
size_type total_clusters,
bool may_have_empty_clusters,
bool has_nulls,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -757,9 +750,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
// double // max
// }
//
if (total_clusters == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}
if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }

// each input group represents an individual tdigest. within each tdigest, we want the keys
// to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall
Expand Down Expand Up @@ -802,7 +793,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
std::move(group_cluster_offsets),
std::move(min_col),
std::move(max_col),
may_have_empty_clusters,
has_nulls,
stream,
mr);
}
Expand Down Expand Up @@ -1154,13 +1145,8 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
auto merged =
cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref());

auto merged_weights = merged->get_column(1).view();
// If there are no values, we can simply return a column that has only empty tdigests.
if (merged_weights.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr);
}

// generate cumulative weights
auto merged_weights = merged->get_column(1).view();
auto cumulative_weights = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
auto keys = cudf::detail::make_counting_transform_iterator(
Expand All @@ -1175,10 +1161,6 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,

auto const delta = max_centroids;

// We do not know whether there is any empty cluster in the input without actually reading the
// data, which could be expensive. So, we just assume that there could be empty clusters.
auto const may_have_empty_clusters = true;

// generate cluster info
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
Expand All @@ -1195,7 +1177,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
may_have_empty_clusters,
false,
stream,
mr);

Expand All @@ -1220,7 +1202,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_cluster_wl,
std::move(group_cluster_offsets),
total_clusters,
may_have_empty_clusters,
false,
stream,
mr);
}
Expand Down Expand Up @@ -1285,9 +1267,7 @@ std::unique_ptr<column> group_tdigest(column_view const& col,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (col.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}
if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }

auto const delta = max_centroids;
return cudf::type_dispatcher(col.type(),
Expand All @@ -1313,7 +1293,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
tdigest_column_view tdv(input);

if (num_groups == 0 || input.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr);
}

// bring group offsets back to the host
Expand Down
Loading

0 comments on commit 3dbc33a

Please sign in to comment.