Skip to content

Commit

Permalink
Use logical types in Parquet reader (rapidsai#15365)
Browse files Browse the repository at this point in the history
Closes rapidsai#15224. The reader now uses the logical type exclusively rather than the deprecated converted type.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - MithunR (https://github.com/mythrocks)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: rapidsai#15365
  • Loading branch information
etseidl authored Mar 27, 2024
1 parent 747c8e1 commit a7ceede
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 193 deletions.
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ __device__ inline void gpuDecodeValues(
constexpr int max_batch_size = num_warps * cudf::detail::warp_size;

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
int const dtype = s->col.data_type & 7;
int const dtype = s->col.physical_type;

// decode values
int pos = start;
Expand All @@ -187,7 +187,7 @@ __device__ inline void gpuDecodeValues(
uint32_t dtype_len = s->dtype_len;
void* dst =
nesting_info_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
if (s->col.converted_type == DECIMAL) {
if (s->col.logical_type.has_value() && s->col.logical_type->type == LogicalType::DECIMAL) {
switch (dtype) {
case INT32: gpuOutputFast(s, sb, src_pos, static_cast<uint32_t*>(dst)); break;
case INT64: gpuOutputFast(s, sb, src_pos, static_cast<uint2*>(dst)); break;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
// we only need to preprocess hierarchies with repetition in them (ie, hierarchies
// containing lists anywhere within).
compute_string_sizes =
compute_string_sizes && ((s->col.data_type & 7) == BYTE_ARRAY && s->dtype_len != 4);
compute_string_sizes && s->col.physical_type == BYTE_ARRAY && !s->col.is_strings_to_cat;

// early out optimizations:

Expand Down
18 changes: 10 additions & 8 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
if (s->dict_base) {
out_thread0 = (s->dict_bits > 0) ? 64 : 32;
} else {
switch (s->col.data_type & 7) {
switch (s->col.physical_type) {
case BOOLEAN: [[fallthrough]];
case BYTE_ARRAY: [[fallthrough]];
case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break;
Expand Down Expand Up @@ -123,16 +123,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
// be needed in the other DecodeXXX kernels.
if (s->dict_base) {
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, t & 0x1f).first;
} else if ((s->col.data_type & 7) == BOOLEAN) {
} else if (s->col.physical_type == BOOLEAN) {
src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f);
} else if ((s->col.data_type & 7) == BYTE_ARRAY or
(s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
} else if (s->col.physical_type == BYTE_ARRAY or
s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
}
if (t == 32) { s->dict_pos = src_target_pos; }
} else {
// WARP1..WARP3: Decode values
int const dtype = s->col.data_type & 7;
int const dtype = s->col.physical_type;
src_pos += t - out_thread0;

// the position in the output column/buffer
Expand Down Expand Up @@ -166,10 +166,12 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
uint32_t dtype_len = s->dtype_len;
void* dst =
nesting_info_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
auto const is_decimal =
s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL;
if (dtype == BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
if (is_decimal) {
auto const [ptr, len] = gpuGetStringData(s, sb, val_src_pos);
auto const decimal_precision = s->col.decimal_precision;
auto const decimal_precision = s->col.logical_type->precision();
if (decimal_precision <= MAX_DECIMAL32_PRECISION) {
gpuOutputByteArrayAsInt(ptr, len, static_cast<int32_t*>(dst));
} else if (decimal_precision <= MAX_DECIMAL64_PRECISION) {
Expand All @@ -182,7 +184,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
} else if (dtype == BOOLEAN) {
gpuOutputBoolean(sb, val_src_pos, static_cast<uint8_t*>(dst));
} else if (s->col.converted_type == DECIMAL) {
} else if (is_decimal) {
switch (dtype) {
case INT32: gpuOutputFast(s, sb, val_src_pos, static_cast<uint32_t*>(dst)); break;
case INT64: gpuOutputFast(s, sb, val_src_pos, static_cast<uint2*>(dst)); break;
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/io/parquet/page_data.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ template <typename state_buf>
inline __device__ void gpuOutputString(page_state_s* s, state_buf* sb, int src_pos, void* dstv)
{
auto [ptr, len] = gpuGetStringData(s, sb, src_pos);
// make sure to only hash `BYTE_ARRAY` when specified with the output type size
if (s->dtype_len == 4 and (s->col.data_type & 7) == BYTE_ARRAY) {
if (s->col.is_strings_to_cat and s->col.physical_type == BYTE_ARRAY) {
// Output hash. This hash value is used if the option to convert strings to
// categoricals is enabled. The seed value is chosen arbitrarily.
uint32_t constexpr hash_seed = 33;
Expand Down
58 changes: 30 additions & 28 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int ta

while (pos < target_pos) {
int len = 0;
if ((s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
if (k < dict_size) { len = s->dtype_len_in; }
} else {
if (k + 4 <= dict_size) {
Expand Down Expand Up @@ -1144,11 +1144,11 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
if (s->page.num_input_values > 0) {
uint8_t* cur = s->page.page_data;
uint8_t* end = cur + s->page.uncompressed_page_size;

uint32_t dtype_len_out = s->col.data_type >> 3;
s->ts_scale = 0;
s->ts_scale = 0;
// Validate data type
auto const data_type = s->col.data_type & 7;
auto const data_type = s->col.physical_type;
auto const is_decimal =
s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL;
switch (data_type) {
case BOOLEAN:
s->dtype_len = 1; // Boolean are stored as 1 byte on the output
Expand All @@ -1159,13 +1159,15 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
if (s->col.ts_clock_rate) {
int32_t units = 0;
// Duration types are not included because no scaling is done when reading
if (s->col.converted_type == TIMESTAMP_MILLIS) {
units = cudf::timestamp_ms::period::den;
} else if (s->col.converted_type == TIMESTAMP_MICROS) {
units = cudf::timestamp_us::period::den;
} else if (s->col.logical_type.has_value() and
s->col.logical_type->is_timestamp_nanos()) {
units = cudf::timestamp_ns::period::den;
if (s->col.logical_type.has_value()) {
auto const& lt = s->col.logical_type.value();
if (lt.is_timestamp_millis()) {
units = cudf::timestamp_ms::period::den;
} else if (lt.is_timestamp_micros()) {
units = cudf::timestamp_us::period::den;
} else if (lt.is_timestamp_nanos()) {
units = cudf::timestamp_ns::period::den;
}
}
if (units and units != s->col.ts_clock_rate) {
s->ts_scale = (s->col.ts_clock_rate < units) ? -(units / s->col.ts_clock_rate)
Expand All @@ -1176,8 +1178,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
case DOUBLE: s->dtype_len = 8; break;
case INT96: s->dtype_len = 12; break;
case BYTE_ARRAY:
if (s->col.converted_type == DECIMAL) {
auto const decimal_precision = s->col.decimal_precision;
if (is_decimal) {
auto const decimal_precision = s->col.logical_type->precision();
s->dtype_len = [decimal_precision]() {
if (decimal_precision <= MAX_DECIMAL32_PRECISION) {
return sizeof(int32_t);
Expand All @@ -1192,14 +1194,14 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}
break;
default: // FIXED_LEN_BYTE_ARRAY:
s->dtype_len = dtype_len_out;
s->dtype_len = s->col.type_length;
if (s->dtype_len <= 0) { s->set_error_code(decode_error::INVALID_DATA_TYPE); }
break;
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
if (data_type == FIXED_LEN_BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
if (is_decimal) {
s->dtype_len = [dtype_len = s->dtype_len]() {
if (dtype_len <= sizeof(int32_t)) {
return sizeof(int32_t);
Expand All @@ -1213,17 +1215,17 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dtype_len = sizeof(string_index_pair);
}
} else if (data_type == INT32) {
if (dtype_len_out == 1) {
// INT8 output
s->dtype_len = 1;
} else if (dtype_len_out == 2) {
// INT16 output
s->dtype_len = 2;
} else if (s->col.converted_type == TIME_MILLIS) {
// INT64 output
s->dtype_len = 8;
// check for smaller bitwidths
if (s->col.logical_type.has_value()) {
auto const& lt = s->col.logical_type.value();
if (lt.type == LogicalType::INTEGER) {
s->dtype_len = lt.bit_width() / 8;
} else if (lt.is_time_millis()) {
// cudf outputs as INT64
s->dtype_len = 8;
}
}
} else if (data_type == BYTE_ARRAY && dtype_len_out == 4) {
} else if (data_type == BYTE_ARRAY && s->col.is_strings_to_cat) {
s->dtype_len = 4; // HASH32 output
} else if (data_type == INT96) {
s->dtype_len = 8; // Convert to 64-bit timestamp
Expand Down Expand Up @@ -1298,7 +1300,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// RLE-packed dictionary indices, first byte indicates index length in bits
if (((s->col.data_type & 7) == BYTE_ARRAY) && (s->col.str_dict_index)) {
if (s->col.physical_type == BYTE_ARRAY && s->col.str_dict_index != nullptr) {
// String dictionary: use index
s->dict_base = reinterpret_cast<uint8_t const*>(s->col.str_dict_index);
s->dict_size = s->col.dict_page->num_input_values * sizeof(string_index_pair);
Expand All @@ -1316,7 +1318,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
case Encoding::PLAIN:
s->dict_size = static_cast<int32_t>(end - cur);
s->dict_val = 0;
if ((s->col.data_type & 7) == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; }
if (s->col.physical_type == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; }
break;
case Encoding::RLE: {
// first 4 bytes are length of RLE data
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ __device__ inline bool is_nested(ColumnChunkDesc const& chunk)

__device__ inline bool is_byte_array(ColumnChunkDesc const& chunk)
{
return (chunk.data_type & 7) == BYTE_ARRAY;
return chunk.physical_type == BYTE_ARRAY;
}

__device__ inline bool is_boolean(ColumnChunkDesc const& chunk)
{
return (chunk.data_type & 7) == BOOLEAN;
return chunk.physical_type == BOOLEAN;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPage
auto const start_value = pp->start_val;

// if data size is known, can short circuit here
if ((chunks[pp->chunk_idx].data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (chunks[pp->chunk_idx].physical_type == FIXED_LEN_BYTE_ARRAY) {
if (t == 0) {
pp->str_bytes = pp->num_valids * s->dtype_len_in;

Expand Down Expand Up @@ -881,7 +881,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
auto const& col = s->col;
size_t str_bytes = 0;
// short circuit for FIXED_LEN_BYTE_ARRAY
if ((col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (col.physical_type == FIXED_LEN_BYTE_ARRAY) {
str_bytes = pp->num_valids * s->dtype_len_in;
} else {
// now process string info in the range [start_value, end_value)
Expand Down
41 changes: 23 additions & 18 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,32 +370,32 @@ struct ColumnChunkDesc {
explicit ColumnChunkDesc(size_t compressed_size_,
uint8_t* compressed_data_,
size_t num_values_,
uint16_t datatype_,
uint16_t datatype_length_,
Type datatype_,
int32_t datatype_length_,
size_t start_row_,
uint32_t num_rows_,
int16_t max_definition_level_,
int16_t max_repetition_level_,
int16_t max_nesting_depth_,
uint8_t def_level_bits_,
uint8_t rep_level_bits_,
int8_t codec_,
int8_t converted_type_,
Compression codec_,
thrust::optional<LogicalType> logical_type_,
int8_t decimal_precision_,
int32_t ts_clock_rate_,
int32_t src_col_index_,
int32_t src_col_schema_,
column_chunk_info const* chunk_info_,
float list_bytes_per_row_est_)
float list_bytes_per_row_est_,
bool strings_to_categorical_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
start_row(start_row_),
num_rows(num_rows_),
max_level{max_definition_level_, max_repetition_level_},
max_nesting_depth{max_nesting_depth_},
data_type(datatype_ | (datatype_length_ << 3)),
type_length(datatype_length_),
physical_type(datatype_),
level_bits{def_level_bits_, rep_level_bits_},
num_data_pages(0),
num_dict_pages(0),
Expand All @@ -405,14 +405,13 @@ struct ColumnChunkDesc {
column_data_base{nullptr},
column_string_base{nullptr},
codec(codec_),
converted_type(converted_type_),
logical_type(logical_type_),
decimal_precision(decimal_precision_),
ts_clock_rate(ts_clock_rate_),
src_col_index(src_col_index_),
src_col_schema(src_col_schema_),
h_chunk_info(chunk_info_),
list_bytes_per_row_est(list_bytes_per_row_est_)
list_bytes_per_row_est(list_bytes_per_row_est_),
is_strings_to_cat(strings_to_categorical_)
{
}

Expand All @@ -423,7 +422,8 @@ struct ColumnChunkDesc {
uint32_t num_rows{}; // number of rows in this chunk
int16_t max_level[level_type::NUM_LEVEL_TYPES]{}; // max definition/repetition level
int16_t max_nesting_depth{}; // max nesting depth of the output
uint16_t data_type{}; // basic column data type, ((type_length << 3) | // parquet::Type)
int32_t type_length{}; // type length from schema (for FLBA only)
Type physical_type{}; // parquet physical data type
uint8_t
level_bits[level_type::NUM_LEVEL_TYPES]{}; // bits to encode max definition/repetition levels
int32_t num_data_pages{}; // number of data pages
Expand All @@ -433,10 +433,8 @@ struct ColumnChunkDesc {
bitmask_type** valid_map_base{}; // base pointers of valid bit map for this column
void** column_data_base{}; // base pointers of column data
void** column_string_base{}; // base pointers of column string data
int8_t codec{}; // compressed codec enum
int8_t converted_type{}; // converted type enum
Compression codec{}; // compressed codec enum
thrust::optional<LogicalType> logical_type{}; // logical type
int8_t decimal_precision{}; // Decimal precision
int32_t ts_clock_rate{}; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)

int32_t src_col_index{}; // my input column index
Expand All @@ -446,6 +444,8 @@ struct ColumnChunkDesc {
column_chunk_info const* h_chunk_info{};

float list_bytes_per_row_est{}; // for LIST columns, an estimate on number of bytes per row

bool is_strings_to_cat{}; // convert strings to hashes
};

/**
Expand Down Expand Up @@ -615,11 +615,16 @@ struct EncPage {
*/
constexpr bool is_string_col(ColumnChunkDesc const& chunk)
{
auto const not_converted_to_decimal = chunk.converted_type != DECIMAL;
// return true for non-hashed byte_array and fixed_len_byte_array that isn't representing
// a decimal.
if (chunk.logical_type.has_value() and chunk.logical_type->type == LogicalType::DECIMAL) {
return false;
}

auto const non_hashed_byte_array =
(chunk.data_type & 7) == BYTE_ARRAY and (chunk.data_type >> 3) != 4;
auto const fixed_len_byte_array = (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY;
return not_converted_to_decimal and (non_hashed_byte_array or fixed_len_byte_array);
chunk.physical_type == BYTE_ARRAY and not chunk.is_strings_to_cat;
auto const fixed_len_byte_array = chunk.physical_type == FIXED_LEN_BYTE_ARRAY;
return non_hashed_byte_array or fixed_len_byte_array;
}

/**
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@

namespace cudf::io::parquet::detail {

namespace {
// Decides whether a FIXED_LEN_BYTE_ARRAY column carrying the given logical type should be
// handled as a string column. A column with no logical type is treated as a string; of the
// logical types, only DECIMAL currently gets special (non-string) handling. Possible future
// additions are UUID (which would still be treated as a string) and FLOAT16 (which, for now,
// would also be treated as a string).
inline bool is_treat_fixed_length_as_string(thrust::optional<LogicalType> const& logical_type)
{
  // Short-circuits on an empty optional, so `->type` is only read when a value is present.
  return not logical_type.has_value() or logical_type->type != LogicalType::DECIMAL;
}

} // namespace

void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
{
auto& pass = *_pass_itm_data;
Expand Down Expand Up @@ -66,7 +79,8 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
// TODO: we could probably dummy up size stats for FLBA data since we know the width
auto const has_flba =
std::any_of(pass.chunks.begin(), pass.chunks.end(), [](auto const& chunk) {
return (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY && chunk.converted_type != DECIMAL;
return chunk.physical_type == FIXED_LEN_BYTE_ARRAY and
is_treat_fixed_length_as_string(chunk.logical_type);
});

if (!_has_page_index || uses_custom_row_bounds || has_flba) {
Expand Down
Loading

0 comments on commit a7ceede

Please sign in to comment.