Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark column chunks in a PQ reader pass as large strings when the cumulative offsets exceeds the large strings threshold. #17207

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,36 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
_stream);
}

// column string sizes for this subpass
col_string_sizes = calculate_page_string_offsets();

// ensure cumulative column string sizes have been initialized
if (pass.cumulative_col_string_sizes.empty()) {
pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0);
}

// add to cumulative column string sizes from this subpass
std::transform(pass.cumulative_col_string_sizes.begin(),
pass.cumulative_col_string_sizes.end(),
col_string_sizes.begin(),
pass.cumulative_col_string_sizes.begin(),
vuule marked this conversation as resolved.
Show resolved Hide resolved
std::plus<>{});

// check for overflow
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
col_string_sizes.cend(),
auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(),
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
pass.cumulative_col_string_sizes.cend(),
[=](std::size_t sz) { return sz > threshold; });
if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}

// mark any chunks that are large string columns
// Mark any chunks for which the cumulative string columns size has exceeded the large strings
// threshold
if (has_large_strings) {
for (auto& chunk : pass.chunks) {
auto const idx = chunk.src_col_index;
if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
}
}
}
Expand Down Expand Up @@ -195,7 +209,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
// only do string buffer for leaf
if (idx == max_depth - 1 and out_buf.string_size() == 0 and
col_string_sizes[pass.chunks[c].src_col_index] > 0) {
out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index], _stream);
out_buf.create_string_data(
col_string_sizes[pass.chunks[c].src_col_index],
pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] >
static_cast<size_t>(strings::detail::get_offset64_threshold()),
_stream);
}
if (has_strings) { str_data[idx] = out_buf.string_data(); }
out_buf.user_data |=
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ struct pass_intermediate_data {
rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()};
rmm::device_uvector<string_index_pair> str_dict_index{0, cudf::get_default_stream()};

// cumulative strings column sizes.
std::vector<size_t> cumulative_col_string_sizes{};

int level_type_size{0};

// skip_rows / num_rows for this pass.
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ void cudf::io::detail::inline_column_buffer::allocate_strings_data(bool memset_d
}

void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes,
bool is_large_strings_col,
rmm::cuda_stream_view stream)
{
_string_data = rmm::device_buffer(num_bytes, stream, _mr);
_is_large_strings_col = is_large_strings_col;
_string_data = rmm::device_buffer(num_bytes, stream, _mr);
}

namespace {
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,17 @@ class inline_column_buffer : public column_buffer_base<inline_column_buffer> {
[[nodiscard]] size_t data_size_impl() const { return _data.size(); }
std::unique_ptr<column> make_string_column_impl(rmm::cuda_stream_view stream);

void create_string_data(size_t num_bytes, rmm::cuda_stream_view stream);
void create_string_data(size_t num_bytes,
bool is_large_strings_col,
rmm::cuda_stream_view stream);
void* string_data() { return _string_data.data(); }
[[nodiscard]] void const* string_data() const { return _string_data.data(); }
[[nodiscard]] size_t string_size() const { return _string_data.size(); }
[[nodiscard]] bool is_large_strings_column() const { return _is_large_strings_col; }

private:
rmm::device_buffer _string_data{};
bool _is_large_strings_col{};
};

using column_buffer = gather_column_buffer;
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/io/utilities/column_buffer_strings.cu
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_colu
{
// if the size of _string_data is over the threshold for 64bit size_type, _data will contain
// sizes rather than offsets. need special handling for that case.
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
if (_string_data.size() > threshold) {
if (is_large_strings_column()) {
if (not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}
Expand Down
55 changes: 55 additions & 0 deletions cpp/tests/large_strings/parquet_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <cudf_test/table_utilities.hpp>

#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>
Expand Down Expand Up @@ -69,3 +70,57 @@ TEST_F(ParquetStringsTest, ReadLargeStrings)
// go back to normal threshold
unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
}

TEST_F(ParquetStringsTest, ChunkedReadLargeStrings)
{
// Construct a table with one large strings column > 2GB
auto const wide = this->wide_column();
auto input = cudf::concatenate(std::vector<cudf::column_view>(120000, wide)); ///< 230MB

int constexpr multiplier = 12;
std::vector<cudf::column_view> input_cols(multiplier, input->view());
auto col0 = cudf::concatenate(input_cols); ///< 2.70GB
Copy link
Member Author

@mhaseeb123 mhaseeb123 Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same column from GTest: CaseTest.ToLower


// Expected table
auto const expected = cudf::table_view{{col0->view()}};
auto expected_metadata = cudf::io::table_input_metadata{expected};
expected_metadata.column_metadata[0].set_encoding(
cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

// Write to Parquet
auto const filepath = g_temp_env->get_temp_filepath("ChunkedReadLargeStrings.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.compression(cudf::io::compression_type::ZSTD)
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
.stats_level(cudf::io::STATISTICS_NONE)
.metadata(expected_metadata);
cudf::io::write_parquet(out_opts);

// Read with chunked_parquet_reader
size_t constexpr pass_read_limit =
size_t{8} * 1024 * 1024 *
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
1024; ///< Set to 8GB so we read almost entire table (>2GB string) in the first subpass
///< and only a small amount in the second subpass.
cudf::io::parquet_reader_options default_in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto reader = cudf::io::chunked_parquet_reader(0, pass_read_limit, default_in_opts);

auto tables = std::vector<std::unique_ptr<cudf::table>>{};
while (reader.has_next()) {
tables.emplace_back(reader.read_chunk().tbl);
}
auto table_views = std::vector<cudf::table_view>{};
std::transform(tables.begin(), tables.end(), std::back_inserter(table_views), [](auto& tbl) {
return tbl->view();
});
auto result = cudf::concatenate(table_views);
auto const result_view = result->view();

// Verify
for (auto cv : result_view) {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
auto const offsets = cudf::strings_column_view(cv).offsets();
EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64});
}
EXPECT_EQ(tables.size(), 2);
CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected);
}
Loading