Skip to content

Commit

Permalink
Limit the number of keys to calculate column sizes and page starts in…
Browse files Browse the repository at this point in the history
… PQ reader to 1B (rapidsai#17059)

This PR limits the number of keys to use at a time to calculate column `sizes` and `page_start_values` to 1B averting possible OOM and UB from implicit typecasting of `size_t` iterator to `size_type` iterators in `thrust::reduce_by_key`.

Closes rapidsai#16985
Closes rapidsai#17086 

## Resolved
- [x] Add tests
- [x] Debug with fingerprinting structs table for a possible bug in PQ writer (nothing seems wrong with the writer as pyarrow is able to read the written parquet files).

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)

URL: rapidsai#17059
  • Loading branch information
mhaseeb123 authored Oct 17, 2024
1 parent 920a5f6 commit 00feb82
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 28 deletions.
89 changes: 61 additions & 28 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <thrust/unique.h>

#include <bitset>
#include <limits>
#include <numeric>

namespace cudf::io::parquet::detail {
Expand Down Expand Up @@ -1592,36 +1593,68 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
auto const d_cols_info = cudf::detail::make_device_uvector_async(
h_cols_info, _stream, cudf::get_current_device_resource_ref());

auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
// size iterator. indexes pages by sorted order
rmm::device_uvector<size_type> size_input{num_keys, _stream};
thrust::transform(
rmm::exec_policy(_stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(num_keys),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
auto const reduction_keys =
cudf::detail::make_counting_transform_iterator(0, get_reduction_key{subpass.pages.size()});
// Vector to store page sizes for each column at each depth
cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};

// find the size of each column
thrust::reduce_by_key(rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin());

// for nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(
rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
start_offset_output_iterator{
subpass.pages.device_begin(), 0, d_cols_info.data(), max_depth, subpass.pages.size()});
// Total number of keys to process
auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();

// Maximum 1 billion keys processed per iteration
auto constexpr max_keys_per_iter =
static_cast<size_t>(std::numeric_limits<size_type>::max() / 2);

// Number of keys for per each column
auto const num_keys_per_col = max_depth * subpass.pages.size();

// The largest multiple of `num_keys_per_col` that is <= `num_keys`
auto const num_keys_per_iter =
num_keys <= max_keys_per_iter
? num_keys
: num_keys_per_col * std::max<size_t>(1, max_keys_per_iter / num_keys_per_col);

// Size iterator. Indexes pages by sorted order
rmm::device_uvector<size_type> size_input{num_keys_per_iter, _stream};

// To keep track of the starting key of an iteration
size_t key_start = 0;
// Loop until all keys are processed
while (key_start < num_keys) {
// Number of keys processed in this iteration
auto const num_keys_this_iter = std::min<size_t>(num_keys_per_iter, num_keys - key_start);
thrust::transform(
rmm::exec_policy_nosync(_stream),
thrust::make_counting_iterator<size_t>(key_start),
thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});

// Manually create a int64_t `key_start` compatible counting_transform_iterator to avoid
// implicit casting to size_type.
auto const reduction_keys = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_t>(key_start), get_reduction_key{subpass.pages.size()});

// Find the size of each column
thrust::reduce_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin() + (key_start / subpass.pages.size()));

// For nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
start_offset_output_iterator{subpass.pages.device_begin(),
key_start,
d_cols_info.data(),
max_depth,
subpass.pages.size()});
// Increment the key_start
key_start += num_keys_this_iter;
}

sizes.device_to_host_sync(_stream);
for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
Expand Down
35 changes: 35 additions & 0 deletions cpp/tests/io/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2724,3 +2724,38 @@ TYPED_TEST(ParquetReaderPredicatePushdownTest, FilterTyped)
EXPECT_EQ(result_table.num_columns(), expected->num_columns());
CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result_table);
}

TEST_F(ParquetReaderTest, ListsWideTable)
{
auto constexpr num_rows = 2;
auto constexpr num_cols = 26'755; // for slightly over 2B keys
auto constexpr seed = 0xceed;

std::mt19937 engine{seed};

auto list_list = make_parquet_list_list_col<int32_t>(0, num_rows, 1, 1, false);
auto list_list_nulls = make_parquet_list_list_col<int32_t>(0, num_rows, 1, 1, true);

// switch between nullable and non-nullable
std::vector<cudf::column_view> cols(num_cols);
bool with_nulls = false;
std::generate_n(cols.begin(), num_cols, [&]() {
auto const view = with_nulls ? list_list_nulls->view() : list_list->view();
with_nulls = not with_nulls;
return view;
});

cudf::table_view expected(cols);

// Use a host buffer for faster I/O
std::vector<char> buffer;
auto const out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected).build();
cudf::io::write_parquet(out_opts);

cudf::io::parquet_reader_options default_in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size()));
auto const [result, _] = cudf::io::read_parquet(default_in_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result->view());
}

0 comments on commit 00feb82

Please sign in to comment.