diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index af92b7ceaf5..9dabe4e8800 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -461,6 +461,7 @@ add_library( src/hash/sha256_hash.cu src/hash/sha384_hash.cu src/hash/sha512_hash.cu + src/hash/xxhash_32.cu src/hash/xxhash_64.cu src/interop/dlpack.cpp src/interop/arrow_utilities.cpp diff --git a/cpp/include/cudf/hashing.hpp b/cpp/include/cudf/hashing.hpp index 307a52cd242..88034b4f804 100644 --- a/cpp/include/cudf/hashing.hpp +++ b/cpp/include/cudf/hashing.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -166,6 +166,26 @@ std::unique_ptr sha512( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); +/** + * @brief Computes the XXHash_32 hash value of each row in the given table + * + * This function computes the hash of each column using the `seed` for the first column + * and the resulting hash as a seed for the next column and so on. + * The result is a uint32 value for each row. 
+ * + * @param input The table of columns to hash + * @param seed Optional seed value to use for the hash function + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @returns A column where each row is the hash of a row from the input + */ +std::unique_ptr xxhash_32( + table_view const& input, + uint32_t seed = DEFAULT_HASH_SEED, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + /** * @brief Computes the XXHash_64 hash value of each row in the given table * diff --git a/cpp/include/cudf/hashing/detail/hashing.hpp b/cpp/include/cudf/hashing/detail/hashing.hpp index 7cb80081a95..f796ff4526e 100644 --- a/cpp/include/cudf/hashing/detail/hashing.hpp +++ b/cpp/include/cudf/hashing/detail/hashing.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,11 @@ std::unique_ptr sha512(table_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +std::unique_ptr xxhash_32(table_view const& input, + uint32_t seed, + rmm::cuda_stream_view, + rmm::device_async_resource_ref mr); + std::unique_ptr xxhash_64(table_view const& input, uint64_t seed, rmm::cuda_stream_view, diff --git a/cpp/include/cudf/hashing/detail/xxhash_32.cuh b/cpp/include/cudf/hashing/detail/xxhash_32.cuh new file mode 100644 index 00000000000..bb6e7f18fbc --- /dev/null +++ b/cpp/include/cudf/hashing/detail/xxhash_32.cuh @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace cudf::hashing::detail { + +template +struct XXHash_32 { + using result_type = std::uint32_t; + + CUDF_HOST_DEVICE constexpr XXHash_32(uint32_t seed = cudf::DEFAULT_HASH_SEED) : _impl{seed} {} + + __device__ constexpr result_type operator()(Key const& key) const { return this->_impl(key); } + + __device__ constexpr result_type compute_bytes(cuda::std::byte const* bytes, + std::uint64_t size) const + { + return this->_impl.compute_hash(bytes, size); + } + + private: + template + __device__ constexpr result_type compute(T const& key) const + { + return this->compute_bytes(reinterpret_cast(&key), sizeof(T)); + } + + cuco::xxhash_32 _impl; +}; + +template <> +XXHash_32::result_type __device__ inline XXHash_32::operator()(bool const& key) const +{ + return this->compute(static_cast(key)); +} + +template <> +XXHash_32::result_type __device__ inline XXHash_32::operator()(float const& key) const +{ + return this->compute(normalize_nans_and_zeros(key)); +} + +template <> +XXHash_32::result_type __device__ inline XXHash_32::operator()( + double const& key) const +{ + return this->compute(normalize_nans_and_zeros(key)); +} + +template <> +XXHash_32::result_type + __device__ inline XXHash_32::operator()(cudf::string_view const& key) const +{ + return this->compute_bytes(reinterpret_cast(key.data()), + key.size_bytes()); +} + +template <> +XXHash_32::result_type + __device__ inline XXHash_32::operator()(numeric::decimal32 const& key) 
const +{ + return this->compute(key.value()); +} + +template <> +XXHash_32::result_type + __device__ inline XXHash_32::operator()(numeric::decimal64 const& key) const +{ + return this->compute(key.value()); +} + +template <> +XXHash_32::result_type + __device__ inline XXHash_32::operator()(numeric::decimal128 const& key) const +{ + return this->compute(key.value()); +} + +template <> +XXHash_32::result_type __device__ inline XXHash_32::operator()( + cudf::list_view const& key) const +{ + CUDF_UNREACHABLE("List column hashing is not supported"); +} + +template <> +XXHash_32::result_type + __device__ inline XXHash_32::operator()(cudf::struct_view const& key) const +{ + CUDF_UNREACHABLE("Direct hashing of struct_view is not supported"); +} + +} // namespace cudf::hashing::detail diff --git a/cpp/src/hash/xxhash_32.cu b/cpp/src/hash/xxhash_32.cu new file mode 100644 index 00000000000..40503f7f911 --- /dev/null +++ b/cpp/src/hash/xxhash_32.cu @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace cudf { +namespace hashing { +namespace detail { + +namespace { + +/** + * @brief Computes the hash value of a row in the given table. + * + * @tparam Nullate A cudf::nullate type describing whether to check for nulls. 
+ */ +template +class device_row_hasher { + public: + device_row_hasher(Nullate nulls, table_device_view const& t, hash_value_type seed) + : _check_nulls(nulls), _table(t), _seed(seed) + { + } + + __device__ auto operator()(size_type row_index) const noexcept + { + return cudf::detail::accumulate( + _table.begin(), + _table.end(), + _seed, + [row_index, nulls = _check_nulls] __device__(auto hash, auto column) { + return cudf::type_dispatcher( + column.type(), element_hasher_adapter{}, column, row_index, nulls, hash); + }); + } + + /** + * @brief Computes the hash value of an element in the given column. + */ + class element_hasher_adapter { + public: + template ())> + __device__ hash_value_type operator()(column_device_view const& col, + size_type const row_index, + Nullate const _check_nulls, + hash_value_type const _seed) const noexcept + { + if (_check_nulls && col.is_null(row_index)) { + return cuda::std::numeric_limits::max(); + } + auto const hasher = XXHash_32{_seed}; + return hasher(col.element(row_index)); + } + + template ())> + __device__ hash_value_type operator()(column_device_view const&, + size_type const, + Nullate const, + hash_value_type const) const noexcept + { + CUDF_UNREACHABLE("Unsupported type for XXHash_32"); + } + }; + + Nullate const _check_nulls; + table_device_view const _table; + hash_value_type const _seed; +}; + +} // namespace + +std::unique_ptr xxhash_32(table_view const& input, + uint32_t seed, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto output = make_numeric_column(data_type(type_to_id()), + input.num_rows(), + mask_state::UNALLOCATED, + stream, + mr); + + // Return early if there's nothing to hash + if (input.num_columns() == 0 || input.num_rows() == 0) { return output; } + + bool const nullable = has_nulls(input); + auto const input_view = table_device_view::create(input, stream); + auto output_view = output->mutable_view(); + + // Compute the hash value for each row + 
thrust::tabulate(rmm::exec_policy(stream), + output_view.begin(), + output_view.end(), + device_row_hasher(nullable, *input_view, seed)); + + return output; +} + +} // namespace detail + +std::unique_ptr xxhash_32(table_view const& input, + uint32_t seed, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::xxhash_32(input, seed, stream, mr); +} + +} // namespace hashing +} // namespace cudf diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 7facc6497ed..469f933f918 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ #include #include +#include #include #include diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 1572b7246c0..1f84d1f81dc 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -132,6 +132,177 @@ struct orcdec_state_s { } vals; }; +/** + * @brief Manage caching of the first run of TIMESTAMP's DATA stream for a row group. + * + * This class is used to address a special case, where the first run of the DATA stream spans two + * adjacent row groups and its length is greater than the maximum length allowed to be consumed. + * This limit is imposed by the decoder when processing the SECONDARY stream. This class shall be + * instantiated in the shared memory, and be used to cache the DATA stream with a decoded data type + * of `int64_t`. 
As an optimization, the actual cache is implemented in the cache_helper class as a + * local variable and does not reside in the shared memory. + */ +class run_cache_manager { + private: + enum class status : uint8_t { + DISABLED, ///< Run cache manager is disabled. No caching will be performed. If the special case + ///< happens, the run cache manager will be set to this status after the cache read + ///< is completed. This status also applies when the special case does not happen. + CAN_WRITE_TO_CACHE, ///< Run cache manager is ready for write. If the special case happens, the + ///< run cache manager will be set to this status. + CAN_READ_FROM_CACHE, ///< Run cache manager is ready for read. If the special case happens, the + ///< run cache manager will be set to this status after the cache write is + ///< completed. + }; + + public: + /** + * @brief Initialize the run cache manager. + * + * @param[in] s ORC decoder state. + */ + __device__ void initialize(orcdec_state_s* s) + { + _status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP) + ? status::CAN_WRITE_TO_CACHE + : status::DISABLED; + _reusable_length = 0; + _run_length = 0; + } + + private: + status _status; ///< The status of the run cache manager. + uint32_t + _reusable_length; ///< The number of data to be cached and reused later. For example, if a run + ///< has a length of 512 but the maximum length allowed to be consumed is + ///< capped at 162, then 350 (512-162) data will be cached. + uint32_t _run_length; ///< The length of the run, 512 in the above example. + friend class cache_helper; +}; + +/** + * @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for + * a row group. + * + * The run_cache_manager is intended to be stored in the shared memory, whereas the actual cache is + * in the local storage (as an optimization). If a function is to use run_cache_manager, both the + * manager and the cache objects need to be passed. 
This class is introduced to simplify the + * function call, so that only a single cache_helper object needs to be passed. To that end, public + * methods originally belonging to run_cache_manager have been moved to this class. + */ +class cache_helper { + public: + /** + * @brief Constructor. + * + * @param[in] run_cache_manager_inst An instance of run_cache_manager. + */ + __device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst) + : _manager(run_cache_manager_inst) + { + } + + /** + * @brief Set the reusable length object. + * + * @param[in] run_length The length of the first run (spanning two adjacent row groups) of the + * DATA stream. + * @param[in] max_length The maximum length allowed to be consumed. This limit is imposed + * by the decoder when processing the SECONDARY stream. + */ + __device__ void set_reusable_length(uint32_t run_length, uint32_t max_length) + { + if (_manager._status == run_cache_manager::status::CAN_WRITE_TO_CACHE) { + _manager._run_length = run_length; + _manager._reusable_length = + (_manager._run_length > max_length) ? (_manager._run_length - max_length) : 0; + } + } + + /** + * @brief Adjust the maximum length allowed to be consumed when the length of the first run is + * greater than it. + * + * @param[in] max_length The maximum length allowed to be consumed for the DATA stream. + * @return A new maximum length. + */ + [[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length) + { + auto new_max_length{max_length}; + if (_manager._status == run_cache_manager::status::CAN_READ_FROM_CACHE) { + new_max_length -= _manager._reusable_length; + } + return new_max_length; + } + + /** + * @brief Copy the excess data from the intermediate buffer for the DATA stream to the cache. + * + * @param[in] src Intermediate buffer for the DATA stream. 
+ */ + __device__ void write_to_cache(int64_t* src) + { + if (_manager._status != run_cache_manager::status::CAN_WRITE_TO_CACHE) { return; } + + auto const tid = threadIdx.x; + + __syncthreads(); + + // All threads in the block always take a uniform code path for the following branches. + // _reusable_length ranges between [0, 512]. + if (_manager._reusable_length > 0) { + auto const length_to_skip = _manager._run_length - _manager._reusable_length; + if (tid < _manager._reusable_length) { + auto const src_idx = tid + length_to_skip; + _storage = src[src_idx]; + } + if (tid == 0) { _manager._status = run_cache_manager::status::CAN_READ_FROM_CACHE; } + } else { + if (tid == 0) { _manager._status = run_cache_manager::status::DISABLED; } + } + + __syncthreads(); + } + + /** + * @brief Copy the cached data to the intermediate buffer for the DATA stream. + * + * @param[in,out] dst Intermediate buffer for the DATA stream. + * @param[in,out] rle Run length decoder state object. + */ + __device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle) + { + if (_manager._status != run_cache_manager::status::CAN_READ_FROM_CACHE) { return; } + + auto const tid = threadIdx.x; + + // First, shift the data up + auto const dst_idx = tid + _manager._reusable_length; + auto const v = (dst_idx < rle->num_vals + _manager._reusable_length) ? dst[tid] : 0; + __syncthreads(); + + if (dst_idx < rle->num_vals + _manager._reusable_length) { dst[dst_idx] = v; } + __syncthreads(); + + // Second, insert the cached data + if (tid < _manager._reusable_length) { dst[tid] = _storage; } + __syncthreads(); + + if (tid == 0) { + // Disable the run cache manager, since cache write-and-read happens at most once per row + // group. + _manager._status = run_cache_manager::status::DISABLED; + rle->num_vals += _manager._reusable_length; + } + + __syncthreads(); + } + + private: + run_cache_manager& _manager; ///< An instance of run_cache_manager. + int64_t _storage; ///< Per-thread cache storage. 
+}; + /** * @brief Initializes byte stream, modifying length and start position to keep the read pointer * 8-byte aligned. @@ -631,6 +802,8 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = { * @param[in] maxvals maximum number of values to decode * @param[in] t thread id * @param[in] has_buffered_values If true, means there are already buffered values + * @param[in] cache_helper_inst If non-null, the run cache manager will be used to manage + * caching of the first run of the DATA stream. * * @return number of values decoded */ @@ -640,9 +813,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, T* vals, uint32_t maxvals, int t, - bool has_buffered_values = false) + bool has_buffered_values = false, + cache_helper* cache_helper_inst = nullptr) { if (t == 0) { + if (cache_helper_inst != nullptr) { maxvals = cache_helper_inst->adjust_max_length(maxvals); } uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u)); uint32_t lastpos = bs->pos; auto numvals = 0; @@ -685,6 +860,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, l += deltapos; } } + + if (cache_helper_inst != nullptr) { cache_helper_inst->set_reusable_length(n, maxvals); } + if ((numvals != 0) and (numvals + n > maxvals)) break; // case where there are buffered values and can't consume a whole chunk // from decoded values, so skip adding any more to buffer, work on buffered values and then @@ -866,6 +1044,17 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, __syncwarp(); } __syncthreads(); + // Currently run_cache_manager is only designed to fix the TIMESTAMP's DATA stream bug where the + // data type is int64_t. + if constexpr (cuda::std::is_same_v) { + if (cache_helper_inst != nullptr) { + // Run cache is read from during the 2nd iteration of the top-level while loop in + // gpuDecodeOrcColumnData(). + cache_helper_inst->read_from_cache(vals, rle); + // Run cache is written to during the 1st iteration of the loop. 
+ cache_helper_inst->write_to_cache(vals); + } + } return rle->num_vals; } @@ -1401,6 +1590,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) // Struct doesn't have any data in itself, so skip bool const is_valid = s->chunk.type_kind != STRUCT; size_t const max_num_rows = s->chunk.column_num_rows; + __shared__ run_cache_manager run_cache_manager_inst; + cache_helper cache_helper_inst(run_cache_manager_inst); if (t == 0 and is_valid) { // If we have an index, seek to the initial run and update row positions if (num_rowgroups > 0) { @@ -1443,6 +1634,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]); bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]); + + run_cache_manager_inst.initialize(s); } __syncthreads(); @@ -1602,7 +1795,13 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (is_rlev1(s->chunk.encoding_kind)) { numvals = Integer_RLEv1(bs, &s->u.rlev1, s->vals.i64, numvals, t); } else { - numvals = Integer_RLEv2(bs, &s->u.rlev2, s->vals.i64, numvals, t); + numvals = Integer_RLEv2(bs, + &s->u.rlev2, + s->vals.i64, + numvals, + t, + false /**has_buffered_values */, + &cache_helper_inst); } if (s->chunk.type_kind == DECIMAL) { // If we're using an index, we may have to drop values from the initial run diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index b5f9b894c46..0d40a1f7b1b 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -18,6 +18,7 @@ #include #include +#include #include #include diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh index 4f75908fe72..37c5698f654 100644 --- a/cpp/src/join/join_common_utils.cuh +++ b/cpp/src/join/join_common_utils.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ #include #include #include +#include #include #include diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e5c29314203..344979e1288 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2024, NVIDIA CORPORATION. +# Copyright (c) 2018-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -192,6 +192,7 @@ ConfigureTest( hashing/sha256_test.cpp hashing/sha384_test.cpp hashing/sha512_test.cpp + hashing/xxhash_32_test.cpp hashing/xxhash_64_test.cpp ) diff --git a/cpp/tests/hashing/xxhash_32_test.cpp b/cpp/tests/hashing/xxhash_32_test.cpp new file mode 100644 index 00000000000..9e3c66b0d0b --- /dev/null +++ b/cpp/tests/hashing/xxhash_32_test.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +class XXHash_32_Test : public cudf::test::BaseFixture {}; + +TEST_F(XXHash_32_Test, TestInteger) +{ + auto col1 = cudf::test::fixed_width_column_wrapper{{0, 42, 825}}; + auto constexpr seed = 0u; + auto const output = cudf::hashing::xxhash_32(cudf::table_view({col1}), seed); + + // Expected results were generated with the reference implementation: + // https://github.com/Cyan4973/xxHash/blob/dev/xxhash.h + auto expected = + cudf::test::fixed_width_column_wrapper({148298089u, 1161967057u, 1066694813u}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(output->view(), expected); +} + +TEST_F(XXHash_32_Test, TestDouble) +{ + auto col1 = cudf::test::fixed_width_column_wrapper{{-8., 25., 90.}}; + auto constexpr seed = 42u; + + auto const output = cudf::hashing::xxhash_32(cudf::table_view({col1}), seed); + + // Expected results were generated with the reference implementation: + // https://github.com/Cyan4973/xxHash/blob/dev/xxhash.h + auto expected = + cudf::test::fixed_width_column_wrapper({2276435783u, 3120212431u, 3454197470u}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(output->view(), expected); +} + +TEST_F(XXHash_32_Test, StringType) +{ + auto col1 = cudf::test::strings_column_wrapper({"I", "am", "AI"}); + auto constexpr seed = 825u; + + auto output = cudf::hashing::xxhash_32(cudf::table_view({col1}), seed); + + // Expected results were generated with the reference implementation: + // https://github.com/Cyan4973/xxHash/blob/dev/xxhash.h + auto expected = + cudf::test::fixed_width_column_wrapper({320624298u, 
1612654309u, 1409499009u}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(output->view(), expected); +} diff --git a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java index 53af52eff07..5e544e92a77 100644 --- a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java +++ b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,12 +62,13 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File f * @param filePath Full path of the input Parquet file to read. */ public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, File filePath) { - handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), - filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId()); - + long[] handles = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), + filePath.getAbsolutePath(), null, opts.timeUnit().typeId.getNativeId()); + handle = handles[0]; if (handle == 0) { throw new IllegalStateException("Cannot create native chunked Parquet reader object."); } + multiHostBufferSourceHandle = handles[1]; } /** @@ -100,12 +101,41 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMe public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, HostMemoryBuffer buffer, long offset, long len) { - handle = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null, - buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId()); + long[] addrsSizes = new long[]{ buffer.getAddress() + 
offset, len }; + long[] handles = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null, + addrsSizes, opts.timeUnit().typeId.getNativeId()); + handle = handles[0]; + if (handle == 0) { + throw new IllegalStateException("Cannot create native chunked Parquet reader object."); + } + multiHostBufferSourceHandle = handles[1]; + } + /** + * Construct the reader instance from a read limit and data in host memory buffers. + * + * @param chunkSizeByteLimit Limit on total number of bytes to be returned per read, + * or 0 if there is no limit. + * @param passReadLimit Limit on the amount of memory used for reading and decompressing data or + * 0 if there is no limit + * @param opts The options for Parquet reading. + * @param buffers Array of buffers containing the file data. The buffers are logically + * concatenated to construct the file being read. + */ + public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, + ParquetOptions opts, HostMemoryBuffer... 
buffers) { + long[] addrsSizes = new long[buffers.length * 2]; + for (int i = 0; i < buffers.length; i++) { + addrsSizes[i * 2] = buffers[i].getAddress(); + addrsSizes[(i * 2) + 1] = buffers[i].getLength(); + } + long[] handles = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null, + addrsSizes, opts.timeUnit().typeId.getNativeId()); + handle = handles[0]; if (handle == 0) { throw new IllegalStateException("Cannot create native chunked Parquet reader object."); } + multiHostBufferSourceHandle = handles[1]; } /** @@ -181,6 +211,10 @@ public void close() { DataSourceHelper.destroyWrapperDataSource(dataSourceHandle); dataSourceHandle = 0; } + if (multiHostBufferSourceHandle != 0) { + destroyMultiHostBufferSource(multiHostBufferSourceHandle); + multiHostBufferSourceHandle = 0; + } } @@ -196,6 +230,8 @@ public void close() { private long dataSourceHandle = 0; + private long multiHostBufferSourceHandle = 0; + /** * Create a native chunked Parquet reader object on heap and return its memory address. * @@ -206,13 +242,12 @@ public void close() { * @param filterColumnNames Name of the columns to read, or an empty array if we want to read all. * @param binaryToString Whether to convert the corresponding column to String if it is binary. * @param filePath Full path of the file to read, or given as null if reading from a buffer. - * @param bufferAddrs The address of a buffer to read from, or 0 if we are not using that buffer. - * @param length The length of the buffer to read from. + * @param bufferAddrsSizes The address and size pairs of buffers to read from, or null if we are not using buffers. * @param timeUnit Return type of time unit for timestamps. 
*/ - private static native long create(long chunkSizeByteLimit, long passReadLimit, - String[] filterColumnNames, boolean[] binaryToString, - String filePath, long bufferAddrs, long length, int timeUnit); + private static native long[] create(long chunkSizeByteLimit, long passReadLimit, + String[] filterColumnNames, boolean[] binaryToString, + String filePath, long[] bufferAddrsSizes, int timeUnit); private static native long createWithDataSource(long chunkedSizeByteLimit, String[] filterColumnNames, boolean[] binaryToString, int timeUnit, long dataSourceHandle); @@ -222,4 +257,6 @@ private static native long createWithDataSource(long chunkedSizeByteLimit, private static native long[] readChunk(long handle); private static native void close(long handle); + + private static native void destroyMultiHostBufferSource(long handle); } diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index b01ce31b1f3..298f2cff6f3 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -313,12 +313,11 @@ private static native long readAndInferJSON(long address, long length, * all of them * @param binaryToString whether to convert this column to String if binary * @param filePath the path of the file to read, or null if no path should be read. - * @param address the address of the buffer to read from or 0 if we should not. - * @param length the length of the buffer to read from. + * @param addrsAndSizes the address and size pairs for every buffer or null for no buffers. 
* @param timeUnit return type of TimeStamp in units */ private static native long[] readParquet(String[] filterColumnNames, boolean[] binaryToString, String filePath, - long address, long length, int timeUnit) throws CudfException; + long[] addrsAndSizes, int timeUnit) throws CudfException; private static native long[] readParquetFromDataSource(String[] filterColumnNames, boolean[] binaryToString, int timeUnit, @@ -1357,7 +1356,7 @@ public static Table readParquet(File path) { */ public static Table readParquet(ParquetOptions opts, File path) { return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), - path.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId())); + path.getAbsolutePath(), null, opts.timeUnit().typeId.getNativeId())); } /** @@ -1402,6 +1401,14 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, } } + /** + * Read parquet formatted data. + * @param opts various parquet parsing options. + * @param buffer raw parquet formatted bytes. + * @param offset the starting offset into buffer. + * @param len the number of bytes to parse. + * @return the data parsed as a table on the GPU. + */ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, long len) { return readParquet(opts, buffer, offset, len, DefaultHostMemoryAllocator.get()); } @@ -1422,10 +1429,35 @@ public static Table readParquet(ParquetOptions opts, HostMemoryBuffer buffer, assert len > 0; assert len <= buffer.getLength() - offset; assert offset >= 0 && offset < buffer.length; + long[] addrsSizes = new long[]{ buffer.getAddress() + offset, len }; + return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), + null, addrsSizes, opts.timeUnit().typeId.getNativeId())); + } + + /** + * Read parquet formatted data. + * @param opts various parquet parsing options. + * @param buffers Buffers containing the Parquet data. 
The buffers are logically concatenated + * in order to construct the file being read. + * @return the data parsed as a table on the GPU. + */ + public static Table readParquet(ParquetOptions opts, HostMemoryBuffer... buffers) { + assert buffers.length > 0; + long[] addrsSizes = new long[buffers.length * 2]; + for (int i = 0; i < buffers.length; i++) { + addrsSizes[i * 2] = buffers[i].getAddress(); + addrsSizes[(i * 2) + 1] = buffers[i].getLength(); + } return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), - null, buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId())); + null, addrsSizes, opts.timeUnit().typeId.getNativeId())); } + /** + * Read parquet formatted data. + * @param opts various parquet parsing options. + * @param ds custom datasource to provide the Parquet file data + * @return the data parsed as a table on the GPU. + */ public static Table readParquet(ParquetOptions opts, DataSource ds) { long dataSourceHandle = DataSourceHelper.createWrapperDataSource(ds); try { diff --git a/java/src/main/native/CMakeLists.txt b/java/src/main/native/CMakeLists.txt index 9ff43feeac6..bd1714aa476 100644 --- a/java/src/main/native/CMakeLists.txt +++ b/java/src/main/native/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2019-2024, NVIDIA CORPORATION. +# Copyright (c) 2019-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. 
You may obtain a copy of the License at @@ -156,8 +156,9 @@ add_library( src/ScalarJni.cpp src/TableJni.cpp src/aggregation128_utils.cu - src/maps_column_view.cu src/check_nvcomp_output_sizes.cu + src/maps_column_view.cu + src/multi_host_buffer_source.cpp ) # Disable NVTX if necessary diff --git a/java/src/main/native/include/multi_host_buffer_source.hpp b/java/src/main/native/include/multi_host_buffer_source.hpp new file mode 100644 index 00000000000..2aedb2321e4 --- /dev/null +++ b/java/src/main/native/include/multi_host_buffer_source.hpp @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "jni_utils.hpp" + +#include + +#include + +namespace cudf { +namespace jni { + +/** + * @brief A custom datasource providing data from an array of host memory buffers. 
+ */ +class multi_host_buffer_source : public cudf::io::datasource { + std::vector addrs_; + std::vector offsets_; + + size_t locate_offset_index(size_t offset); + + public: + explicit multi_host_buffer_source(native_jlongArray const& addrs_sizes); + std::unique_ptr host_read(size_t offset, size_t size) override; + size_t host_read(size_t offset, size_t size, uint8_t* dst) override; + bool supports_device_read() const override { return true; } + bool is_device_read_preferred(size_t size) const override { return true; } + std::unique_ptr device_read(size_t offset, + size_t size, + rmm::cuda_stream_view stream) override; + size_t device_read(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override; + std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override; + size_t size() const override { return offsets_.back(); } +}; + +} // namespace jni +} // namespace cudf diff --git a/java/src/main/native/src/ChunkedReaderJni.cpp b/java/src/main/native/src/ChunkedReaderJni.cpp index cf04a87262f..4967e0b2b04 100644 --- a/java/src/main/native/src/ChunkedReaderJni.cpp +++ b/java/src/main/native/src/ChunkedReaderJni.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ #include "cudf_jni_apis.hpp" #include "jni_utils.hpp" +#include "multi_host_buffer_source.hpp" #include #include @@ -36,7 +37,7 @@ extern "C" { // This function should take all the parameters that `Table.readParquet` takes, // plus one more parameter `long chunkSizeByteLimit`. 
-JNIEXPORT jlong JNICALL +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env, jclass, jlong chunk_read_limit, @@ -44,27 +45,26 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env, jobjectArray filter_col_names, jbooleanArray j_col_binary_read, jstring inp_file_path, - jlong buffer, - jlong buffer_length, + jlongArray addrs_sizes, jint unit) { - JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0); + JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", nullptr); bool read_buffer = true; - if (buffer == 0) { - JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0); + if (addrs_sizes == nullptr) { + JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", nullptr); read_buffer = false; } else if (inp_file_path != nullptr) { - JNI_THROW_NEW( - env, cudf::jni::ILLEGAL_ARG_CLASS, "Cannot pass in both a buffer and an inp_file_path", 0); - } else if (buffer_length <= 0) { - JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0); + JNI_THROW_NEW(env, + cudf::jni::ILLEGAL_ARG_CLASS, + "Cannot pass in both buffers and an inp_file_path", + nullptr); } try { cudf::jni::auto_set_device(env); cudf::jni::native_jstring filename(env, inp_file_path); if (!read_buffer && filename.is_empty()) { - JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", 0); + JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", nullptr); } cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names); @@ -75,9 +75,15 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env, cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read); (void)n_col_binary_read; - auto const source = read_buffer ? 
cudf::io::source_info(reinterpret_cast(buffer), - static_cast(buffer_length)) - : cudf::io::source_info(filename.get()); + cudf::jni::native_jlongArray n_addrs_sizes(env, addrs_sizes); + std::unique_ptr multi_buffer_source; + cudf::io::source_info source; + if (read_buffer) { + multi_buffer_source.reset(new cudf::jni::multi_host_buffer_source(n_addrs_sizes)); + source = cudf::io::source_info(multi_buffer_source.get()); + } else { + source = cudf::io::source_info(filename.get()); + } auto opts_builder = cudf::io::parquet_reader_options::builder(source); if (n_filter_col_names.size() > 0) { @@ -86,13 +92,18 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env, auto const read_opts = opts_builder.convert_strings_to_categories(false) .timestamp_type(cudf::data_type(static_cast(unit))) .build(); - - return reinterpret_cast( + n_addrs_sizes.cancel(); + n_col_binary_read.cancel(); + auto reader_handle = reinterpret_cast( new cudf::io::chunked_parquet_reader(static_cast(chunk_read_limit), static_cast(pass_read_limit), read_opts)); + cudf::jni::native_jlongArray result(env, 2); + result[0] = reader_handle; + result[1] = cudf::jni::release_as_jlong(multi_buffer_source); + return result.get_jArray(); } - CATCH_STD(env, 0); + CATCH_STD(env, nullptr); } JNIEXPORT jlong JNICALL @@ -177,6 +188,17 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* en CATCH_STD(env, ); } +JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_destroyMultiHostBufferSource( + JNIEnv* env, jclass, jlong handle) +{ + JNI_NULL_CHECK(env, handle, "handle is null", ); + + try { + delete reinterpret_cast(handle); + } + CATCH_STD(env, ); +} + // // Chunked ORC reader JNI // diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index ed35f35794d..a6c7ae9ba18 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -19,6 +19,7 @@ #include "jni_compiled_expr.hpp" #include 
"jni_utils.hpp" #include "jni_writer_data_sink.hpp" +#include "multi_host_buffer_source.hpp" #include #include @@ -2071,20 +2072,17 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv* env, jobjectArray filter_col_names, jbooleanArray j_col_binary_read, jstring inputfilepath, - jlong buffer, - jlong buffer_length, + jlongArray addrs_and_sizes, jint unit) { JNI_NULL_CHECK(env, j_col_binary_read, "null col_binary_read", 0); bool read_buffer = true; - if (buffer == 0) { + if (addrs_and_sizes == nullptr) { JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL); read_buffer = false; } else if (inputfilepath != NULL) { JNI_THROW_NEW( env, cudf::jni::ILLEGAL_ARG_CLASS, "cannot pass in both a buffer and an inputfilepath", NULL); - } else if (buffer_length <= 0) { - JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", NULL); } try { @@ -2096,10 +2094,15 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv* env, cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names); cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read); - - auto source = read_buffer ? 
cudf::io::source_info(reinterpret_cast(buffer), - static_cast(buffer_length)) - : cudf::io::source_info(filename.get()); + cudf::jni::native_jlongArray n_addrs_sizes(env, addrs_and_sizes); + std::unique_ptr multi_buffer_source; + cudf::io::source_info source; + if (read_buffer) { + multi_buffer_source.reset(new cudf::jni::multi_host_buffer_source(n_addrs_sizes)); + source = cudf::io::source_info(multi_buffer_source.get()); + } else { + source = cudf::io::source_info(filename.get()); + } auto builder = cudf::io::parquet_reader_options::builder(source); if (n_filter_col_names.size() > 0) { @@ -2110,7 +2113,10 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv* env, builder.convert_strings_to_categories(false) .timestamp_type(cudf::data_type(static_cast(unit))) .build(); - return convert_table_for_return(env, cudf::io::read_parquet(opts).tbl); + auto tbl = cudf::io::read_parquet(opts).tbl; + n_col_binary_read.cancel(); + n_addrs_sizes.cancel(); + return convert_table_for_return(env, tbl); } CATCH_STD(env, NULL); } diff --git a/java/src/main/native/src/multi_host_buffer_source.cpp b/java/src/main/native/src/multi_host_buffer_source.cpp new file mode 100644 index 00000000000..c577fc680ba --- /dev/null +++ b/java/src/main/native/src/multi_host_buffer_source.cpp @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "multi_host_buffer_source.hpp" + +#include +#include +#include +#include + +namespace cudf { +namespace jni { + +multi_host_buffer_source::multi_host_buffer_source(native_jlongArray const& addrs_sizes) +{ + if (addrs_sizes.size() % 2 != 0) { + throw std::logic_error("addrs_sizes length not a multiple of 2"); + } + auto count = addrs_sizes.size() / 2; + addrs_.reserve(count); + offsets_.reserve(count + 1); + size_t total_size = 0; + for (int i = 0; i < addrs_sizes.size(); i += 2) { + addrs_.push_back(reinterpret_cast(addrs_sizes[i])); + offsets_.push_back(total_size); + total_size += addrs_sizes[i + 1]; + } + offsets_.push_back(total_size); +} + +size_t multi_host_buffer_source::locate_offset_index(size_t offset) +{ + if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); } + auto start = offsets_.begin(); + auto it = std::upper_bound(start, offsets_.end(), offset); + return (it - start) - 1; +} + +std::unique_ptr multi_host_buffer_source::host_read(size_t offset, + size_t size) +{ + if (size == 0) { return 0; } + if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); } + auto const end_offset = offset + size; + if (end_offset > offsets_.back()) { throw std::runtime_error("read past end of file"); } + auto buffer_index = locate_offset_index(offset); + auto next_offset = offsets_[buffer_index + 1]; + if (end_offset <= next_offset) { + // read range hits only a single buffer, so return a zero-copy view of the data + auto src = addrs_[buffer_index] + offset - offsets_[buffer_index]; + return std::make_unique(src, size); + } + auto buf = std::vector(size); + auto bytes_read = host_read(offset, size, buf.data()); + if (bytes_read != size) { + std::stringstream ss; + ss << "Expected host read of " << size << " found " << bytes_read; + throw std::logic_error(ss.str()); + } + return std::make_unique>>(std::move(buf)); +} + +size_t multi_host_buffer_source::host_read(size_t offset, size_t size, 
uint8_t* dst) +{ + if (size == 0) { return 0; } + if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); } + if (offset + size > offsets_.back()) { throw std::runtime_error("read past end of file"); } + auto buffer_index = locate_offset_index(offset); + auto bytes_left = size; + while (bytes_left > 0) { + auto next_offset = offsets_[buffer_index + 1]; + auto buffer_left = next_offset - offset; + auto buffer_offset = offset - offsets_[buffer_index]; + auto src = addrs_[buffer_index] + buffer_offset; + auto copy_size = std::min(buffer_left, bytes_left); + std::memcpy(dst, src, copy_size); + offset += copy_size; + dst += copy_size; + bytes_left -= copy_size; + ++buffer_index; + } + return size; +} + +std::unique_ptr multi_host_buffer_source::device_read( + size_t offset, size_t size, rmm::cuda_stream_view stream) +{ + rmm::device_buffer buf(size, stream); + auto dst = static_cast(buf.data()); + auto bytes_read = device_read(offset, size, dst, stream); + if (bytes_read != size) { + std::stringstream ss; + ss << "Expected device read of " << size << " found " << bytes_read; + throw std::logic_error(ss.str()); + } + return std::make_unique>(std::move(buf)); +} + +size_t multi_host_buffer_source::device_read(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) +{ + if (size == 0) { return 0; } + if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); } + if (offset + size > offsets_.back()) { throw std::runtime_error("read past end of file"); } + auto buffer_index = locate_offset_index(offset); + auto bytes_left = size; + while (bytes_left > 0) { + auto next_offset = offsets_[buffer_index + 1]; + auto buffer_left = next_offset - offset; + auto buffer_offset = offset - offsets_[buffer_index]; + auto src = addrs_[buffer_index] + buffer_offset; + auto copy_size = std::min(buffer_left, bytes_left); + CUDF_CUDA_TRY(cudaMemcpyAsync(dst, src, copy_size, cudaMemcpyHostToDevice, 
stream.value())); + offset += copy_size; + dst += copy_size; + bytes_left -= copy_size; + ++buffer_index; + } + return size; +} + +std::future multi_host_buffer_source::device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) +{ + std::promise p; + p.set_value(device_read(offset, size, dst, stream)); + return p.get_future(); +} + +} // namespace jni +} // namespace cudf diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index c7fcb1756b6..7eb32892bad 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,8 +47,11 @@ import java.math.BigInteger; import java.math.RoundingMode; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.StandardOpenOption; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -1714,6 +1717,42 @@ void testChunkedReadParquet() { } } + @Test + void testChunkedReadParquetHostBuffers() throws Exception { + long size = TEST_PARQUET_FILE_CHUNKED_READ.length(); + java.nio.file.Path path = TEST_PARQUET_FILE_CHUNKED_READ.toPath(); + try (HostMemoryBuffer buf1 = HostMemoryBuffer.allocate(size / 2); + HostMemoryBuffer buf2 = HostMemoryBuffer.allocate(size - buf1.getLength())) { + try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ)) { + ByteBuffer bb1 = buf1.asByteBuffer(); + while (bb1.hasRemaining()) { + if (channel.read(bb1) == -1) { + throw new EOFException("error reading first buffer"); + } + } + ByteBuffer 
bb2 = buf2.asByteBuffer(); + while (bb2.hasRemaining()) { + if (channel.read(bb2) == -1) { + throw new EOFException("error reading second buffer"); + } + } + } + ParquetOptions opts = ParquetOptions.DEFAULT; + try (ParquetChunkedReader reader = new ParquetChunkedReader(240000, 0, opts, buf1, buf2)) { + int numChunks = 0; + long totalRows = 0; + while(reader.hasNext()) { + ++numChunks; + try(Table chunk = reader.readChunk()) { + totalRows += chunk.getRowCount(); + } + } + assertEquals(2, numChunks); + assertEquals(40000, totalRows); + } + } + } + @Test void testChunkedReadParquetFromDataSource() throws IOException { try (MultiBufferDataSource source = sourceFrom(TEST_PARQUET_FILE_CHUNKED_READ); diff --git a/python/cudf/cudf/_lib/scalar.pyx b/python/cudf/cudf/_lib/scalar.pyx index 40bd50acf16..fd6d0257940 100644 --- a/python/cudf/cudf/_lib/scalar.pyx +++ b/python/cudf/cudf/_lib/scalar.pyx @@ -260,26 +260,3 @@ cdef class DeviceScalar: self._dtype = PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[ (cdtype_id) ] - - -def as_device_scalar(val, dtype=None): - if isinstance(val, (cudf.Scalar, DeviceScalar)): - if dtype == val.dtype or dtype is None: - if isinstance(val, DeviceScalar): - return val - else: - return val.device_value - else: - raise TypeError("Can't update dtype of existing GPU scalar") - else: - return cudf.Scalar(val, dtype=dtype).device_value - - -def _is_null_host_scalar(slr): - if cudf.utils.utils.is_na_like(slr): - return True - elif (isinstance(slr, (np.datetime64, np.timedelta64)) and np.isnat(slr)) or \ - slr is pd.NaT: - return True - else: - return False diff --git a/python/cudf/cudf/core/column/categorical.py b/python/cudf/cudf/core/column/categorical.py index b10b8dfe207..d705b4d4c21 100644 --- a/python/cudf/cudf/core/column/categorical.py +++ b/python/cudf/cudf/core/column/categorical.py @@ -621,7 +621,7 @@ def ordered(self) -> bool: def __setitem__(self, key, value): if cudf.api.types.is_scalar( value - ) and cudf._lib.scalar._is_null_host_scalar(value): + 
) and cudf.utils.utils._is_null_host_scalar(value): to_add_categories = 0 else: if cudf.api.types.is_scalar(value): diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 31efe267c96..24b657f1c32 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -25,7 +25,6 @@ import cudf from cudf import _lib as libcudf from cudf._lib.column import Column -from cudf._lib.scalar import as_device_scalar from cudf._lib.types import dtype_to_pylibcudf_type, size_type_dtype from cudf.api.types import ( _is_non_decimal_numeric_dtype, @@ -71,7 +70,7 @@ min_signed_type, min_unsigned_type, ) -from cudf.utils.utils import _array_ufunc, mask_dtype +from cudf.utils.utils import _array_ufunc, _is_null_host_scalar, mask_dtype if TYPE_CHECKING: import builtins @@ -777,9 +776,7 @@ def fillna( if not self.has_nulls(include_nan=True): return self.copy() elif method is None: - if is_scalar(fill_value) and libcudf.scalar._is_null_host_scalar( - fill_value - ): + if is_scalar(fill_value) and _is_null_host_scalar(fill_value): return self.copy() else: fill_value = self._validate_fillna_value(fill_value) @@ -1984,12 +1981,12 @@ def as_column( column = Column.from_pylibcudf( plc.filling.sequence( len(arbitrary), - as_device_scalar( + cudf.Scalar( arbitrary.start, dtype=np.dtype(np.int64) - ).c_value, - as_device_scalar( + ).device_value.c_value, + cudf.Scalar( arbitrary.step, dtype=np.dtype(np.int64) - ).c_value, + ).device_value.c_value, ) ) if cudf.get_option("default_integer_bitwidth") and dtype is None: diff --git a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py index 3d9440cdf21..6283e498842 100644 --- a/python/cudf/cudf/core/column/lists.py +++ b/python/cudf/cudf/core/column/lists.py @@ -236,7 +236,7 @@ def from_sequences( # Build Data, Mask & Offsets for data in arbitrary: - if cudf._lib.scalar._is_null_host_scalar(data): + if cudf.utils.utils._is_null_host_scalar(data): 
mask_col.append(False) offset_vals.append(offset) else: diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index 4405e153b0c..8fe5299fcdd 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -151,7 +151,7 @@ def __setitem__(self, key: Any, value: Any): cudf.Scalar( value, dtype=self.dtype - if cudf._lib.scalar._is_null_host_scalar(value) + if cudf.utils.utils._is_null_host_scalar(value) else None, ) if is_scalar(value) @@ -789,7 +789,7 @@ def _normalize_find_and_replace_input( ) # Scalar case if len(col_to_normalize) == 1: - if cudf._lib.scalar._is_null_host_scalar(col_to_normalize[0]): + if cudf.utils.utils._is_null_host_scalar(col_to_normalize[0]): return normalized_column.astype(input_column_dtype) if np.isinf(col_to_normalize[0]): return normalized_column diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 3334b57ce1b..b2121511a14 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -92,7 +92,11 @@ min_signed_type, ) from cudf.utils.performance_tracking import _performance_tracking -from cudf.utils.utils import GetAttrGetItemMixin, _external_only_api +from cudf.utils.utils import ( + GetAttrGetItemMixin, + _external_only_api, + _is_null_host_scalar, +) if TYPE_CHECKING: from cudf._typing import ColumnLike, Dtype, NotImplementedType @@ -3371,7 +3375,7 @@ def _insert(self, loc, name, value, nan_as_null=None, ignore_index=True): if isinstance(value, (np.ndarray, cupy.ndarray)): dtype = value.dtype value = value.item() - if libcudf.scalar._is_null_host_scalar(value): + if _is_null_host_scalar(value): dtype = "str" value = as_column( value, diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 6854cb02aa5..e9ed74f804b 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -1,4 +1,4 @@ -# Copyright (c) 
2021-2024, NVIDIA CORPORATION. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. """Base class for Frame types that have an index.""" from __future__ import annotations @@ -2836,16 +2836,22 @@ def hash_values( Parameters ---------- - method : {'murmur3', 'md5', 'xxhash64'}, default 'murmur3' + method : {'murmur3', 'xxhash32', 'xxhash64', 'md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'}, default 'murmur3' Hash function to use: * murmur3: MurmurHash3 hash function - * md5: MD5 hash function + * xxhash32: xxHash32 hash function * xxhash64: xxHash64 hash function + * md5: MD5 hash function + * sha1: SHA-1 hash function + * sha224: SHA-224 hash function + * sha256: SHA-256 hash function + * sha384: SHA-384 hash function + * sha512: SHA-512 hash function seed : int, optional Seed value to use for the hash function. This parameter is only - supported for 'murmur3' and 'xxhash64'. + supported for 'murmur3', 'xxhash32', and 'xxhash64'. Returns @@ -2900,7 +2906,7 @@ def hash_values( 2 fe061786ea286a515b772d91b0dfcd70 dtype: object """ - seed_hash_methods = {"murmur3", "xxhash64"} + seed_hash_methods = {"murmur3", "xxhash32", "xxhash64"} if seed is None: seed = 0 elif method not in seed_hash_methods: @@ -2914,6 +2920,8 @@ def hash_values( ) if method == "murmur3": plc_column = plc.hashing.murmurhash3_x86_32(plc_table, seed) + elif method == "xxhash32": + plc_column = plc.hashing.xxhash_32(plc_table, seed) elif method == "xxhash64": plc_column = plc.hashing.xxhash_64(plc_table, seed) elif method == "md5": diff --git a/python/cudf/cudf/core/scalar.py b/python/cudf/cudf/core/scalar.py index 80dd0921f9c..7d246960cc9 100644 --- a/python/cudf/cudf/core/scalar.py +++ b/python/cudf/cudf/core/scalar.py @@ -178,13 +178,13 @@ def dtype(self): def is_valid(self): if not self._is_host_value_current: self._device_value_to_host() - return not cudf._lib.scalar._is_null_host_scalar(self._host_value) + return not cudf.utils.utils._is_null_host_scalar(self._host_value) def 
_device_value_to_host(self): self._host_value = self._device_value._to_host_scalar() def _preprocess_host_value(self, value, dtype): - valid = not cudf._lib.scalar._is_null_host_scalar(value) + valid = not cudf.utils.utils._is_null_host_scalar(value) if isinstance(value, list): if dtype is not None: diff --git a/python/cudf/cudf/tests/data/orc/TestOrcFile.timestamp.desynced.snappy.RLEv2.orc b/python/cudf/cudf/tests/data/orc/TestOrcFile.timestamp.desynced.snappy.RLEv2.orc new file mode 100644 index 00000000000..a0ea4fbbfc2 Binary files /dev/null and b/python/cudf/cudf/tests/data/orc/TestOrcFile.timestamp.desynced.snappy.RLEv2.orc differ diff --git a/python/cudf/cudf/tests/data/orc/TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc b/python/cudf/cudf/tests/data/orc/TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc new file mode 100644 index 00000000000..8a7969cdbbb Binary files /dev/null and b/python/cudf/cudf/tests/data/orc/TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc differ diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index 11a9b398b50..f3cf8e36a5b 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2024, NVIDIA CORPORATION. +# Copyright (c) 2018-2025, NVIDIA CORPORATION. 
import array as arr import contextlib @@ -1440,6 +1440,7 @@ def test_assign_callable(mapping): "sha256", "sha384", "sha512", + "xxhash32", "xxhash64", ], ) @@ -1447,6 +1448,7 @@ def test_assign_callable(mapping): def test_dataframe_hash_values(nrows, method, seed): warning_expected = seed is not None and method not in { "murmur3", + "xxhash32", "xxhash64", } potential_warning = ( @@ -1472,6 +1474,7 @@ def test_dataframe_hash_values(nrows, method, seed): "sha256": object, "sha384": object, "sha512": object, + "xxhash32": np.uint32, "xxhash64": np.uint64, } assert out.dtype == expected_dtypes[method] @@ -1486,7 +1489,7 @@ def test_dataframe_hash_values(nrows, method, seed): assert_eq(gdf["a"].hash_values(method=method, seed=seed), out_one) -@pytest.mark.parametrize("method", ["murmur3", "xxhash64"]) +@pytest.mark.parametrize("method", ["murmur3", "xxhash32", "xxhash64"]) def test_dataframe_hash_values_seed(method): gdf = cudf.DataFrame() data = np.arange(10) @@ -1500,6 +1503,34 @@ def test_dataframe_hash_values_seed(method): assert_neq(out_one, out_two) +def test_dataframe_hash_values_xxhash32(): + # xxhash32 has no built-in implementation in Python and we don't want to + # add a testing dependency, so we use regression tests against known good + # values. 
+ gdf = cudf.DataFrame({"a": [0.0, 1.0, 2.0, np.inf, np.nan]}) + gdf["b"] = -gdf["a"] + out_a = gdf["a"].hash_values(method="xxhash32", seed=0) + expected_a = cudf.Series( + [3736311059, 2307980487, 2906647130, 746578903, 4294967295], + dtype=np.uint32, + ) + assert_eq(out_a, expected_a) + + out_b = gdf["b"].hash_values(method="xxhash32", seed=42) + expected_b = cudf.Series( + [1076387279, 2261349915, 531498073, 650869264, 4294967295], + dtype=np.uint32, + ) + assert_eq(out_b, expected_b) + + out_df = gdf.hash_values(method="xxhash32", seed=0) + expected_df = cudf.Series( + [1223721700, 2885793241, 1920811472, 1146715602, 4294967295], + dtype=np.uint32, + ) + assert_eq(out_df, expected_df) + + def test_dataframe_hash_values_xxhash64(): # xxhash64 has no built-in implementation in Python and we don't want to # add a testing dependency, so we use regression tests against known good diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index c4b4ef60184..fe143e66407 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2024, NVIDIA CORPORATION. +# Copyright (c) 2019-2025, NVIDIA CORPORATION. import datetime import decimal @@ -1970,3 +1970,25 @@ def test_row_group_alignment(datadir): got = cudf.read_orc(buffer) assert_eq(expected, got) + + +@pytest.mark.parametrize( + "inputfile", + [ + "TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc", + "TestOrcFile.timestamp.desynced.snappy.RLEv2.orc", + ], +) +def test_orc_reader_desynced_timestamp(datadir, inputfile): + # Test a special case where the DATA stream (second) in a TIMESTAMP column + # is progressed faster than the SECONDARY stream (nanosecond) at the start of a row + # group. In this case, the "run cache manager" in the decoder kernel is used to + # orchestrate the dual-stream processing. + # For more information, see https://github.com/rapidsai/cudf/issues/17155. 
+ + path = datadir / inputfile + + expect = pd.read_orc(path) + got = cudf.read_orc(path) + + assert_frame_equal(cudf.from_pandas(expect), got) diff --git a/python/cudf/cudf/utils/dtypes.py b/python/cudf/cudf/utils/dtypes.py index ca8f9cac2d0..31a8f4de3b3 100644 --- a/python/cudf/cudf/utils/dtypes.py +++ b/python/cudf/cudf/utils/dtypes.py @@ -198,7 +198,7 @@ def to_cudf_compatible_scalar(val, dtype=None): If `val` is None, returns None. """ - if cudf._lib.scalar._is_null_host_scalar(val) or isinstance( + if cudf.utils.utils._is_null_host_scalar(val) or isinstance( val, cudf.Scalar ): return val diff --git a/python/cudf/cudf/utils/utils.py b/python/cudf/cudf/utils/utils.py index c83c1cbe895..0adaaa60654 100644 --- a/python/cudf/cudf/utils/utils.py +++ b/python/cudf/cudf/utils/utils.py @@ -341,6 +341,15 @@ def is_na_like(obj): return obj is None or obj is cudf.NA or obj is cudf.NaT +def _is_null_host_scalar(slr) -> bool: + # slr is NA like or NaT like + return ( + is_na_like(slr) + or (isinstance(slr, (np.datetime64, np.timedelta64)) and np.isnat(slr)) + or slr is pd.NaT + ) + + def _warn_no_dask_cudf(fn): @functools.wraps(fn) def wrapper(self): diff --git a/python/pylibcudf/pylibcudf/hashing.pxd b/python/pylibcudf/pylibcudf/hashing.pxd index 2d070ddda69..fbd478f963f 100644 --- a/python/pylibcudf/pylibcudf/hashing.pxd +++ b/python/pylibcudf/pylibcudf/hashing.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. 
from libc.stdint cimport uint32_t, uint64_t @@ -16,6 +16,10 @@ cpdef Table murmurhash3_x64_128( uint64_t seed=* ) +cpdef Column xxhash_32( + Table input, + uint32_t seed=* +) cpdef Column xxhash_64( Table input, diff --git a/python/pylibcudf/pylibcudf/hashing.pyi b/python/pylibcudf/pylibcudf/hashing.pyi index a849f5d0729..d535d842a18 100644 --- a/python/pylibcudf/pylibcudf/hashing.pyi +++ b/python/pylibcudf/pylibcudf/hashing.pyi @@ -9,6 +9,7 @@ LIBCUDF_DEFAULT_HASH_SEED: Final[int] def murmurhash3_x86_32(input: Table, seed: int = ...) -> Column: ... def murmurhash3_x64_128(input: Table, seed: int = ...) -> Table: ... +def xxhash_32(input: Table, seed: int = ...) -> Column: ... def xxhash_64(input: Table, seed: int = ...) -> Column: ... def md5(input: Table) -> Column: ... def sha1(input: Table) -> Column: ... diff --git a/python/pylibcudf/pylibcudf/hashing.pyx b/python/pylibcudf/pylibcudf/hashing.pyx index 548cffc0ce8..1f093b20c6b 100644 --- a/python/pylibcudf/pylibcudf/hashing.pyx +++ b/python/pylibcudf/pylibcudf/hashing.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. from libc.stdint cimport uint32_t, uint64_t from libcpp.memory cimport unique_ptr from libcpp.utility cimport move @@ -13,6 +13,7 @@ from pylibcudf.libcudf.hash cimport ( sha256 as cpp_sha256, sha384 as cpp_sha384, sha512 as cpp_sha512, + xxhash_32 as cpp_xxhash_32, xxhash_64 as cpp_xxhash_64, ) from pylibcudf.libcudf.table.table cimport table @@ -30,6 +31,7 @@ __all__ = [ "sha256", "sha384", "sha512", + "xxhash_32", "xxhash_64", ] @@ -95,6 +97,37 @@ cpdef Table murmurhash3_x64_128( return Table.from_libcudf(move(c_result)) +cpdef Column xxhash_32( + Table input, + uint32_t seed=DEFAULT_HASH_SEED +): + """Computes the xxHash 32-bit hash value of each row in the given table. + + For details, see :cpp:func:`xxhash_32`. 
+ + Parameters + ---------- + input : Table + The table of columns to hash + seed : uint32_t + Optional seed value to use for the hash function + + Returns + ------- + pylibcudf.Column + A column where each row is the hash of a row from the input + """ + + cdef unique_ptr[column] c_result + with nogil: + c_result = cpp_xxhash_32( + input.view(), + seed + ) + + return Column.from_libcudf(move(c_result)) + + cpdef Column xxhash_64( Table input, uint64_t seed=DEFAULT_HASH_SEED diff --git a/python/pylibcudf/pylibcudf/libcudf/hash.pxd b/python/pylibcudf/pylibcudf/libcudf/hash.pxd index 4e8a01b41a5..46fdf62cd6b 100644 --- a/python/pylibcudf/pylibcudf/libcudf/hash.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/hash.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. +# Copyright (c) 2020-2025, NVIDIA CORPORATION. from libc.stdint cimport uint32_t, uint64_t from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector @@ -44,6 +44,11 @@ cdef extern from "cudf/hashing.hpp" namespace "cudf::hashing" nogil: const table_view& input ) except +libcudf_exception_handler + cdef unique_ptr[column] xxhash_32( + const table_view& input, + const uint32_t seed + ) except +libcudf_exception_handler + cdef unique_ptr[column] xxhash_64( const table_view& input, const uint64_t seed diff --git a/python/pylibcudf/pylibcudf/tests/test_hashing.py b/python/pylibcudf/pylibcudf/tests/test_hashing.py index 83fb50fa4ef..7096dbe14ff 100644 --- a/python/pylibcudf/pylibcudf/tests/test_hashing.py +++ b/python/pylibcudf/pylibcudf/tests/test_hashing.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. 
import hashlib import struct @@ -34,7 +34,9 @@ def hash_single_uint32(val, seed=0): def hash_combine_32(lhs, rhs): - return np.uint32(lhs ^ (rhs + 0x9E3779B9 + (lhs << 6) + (lhs >> 2))) + return np.uint32( + int((lhs ^ (rhs + 0x9E3779B9 + (lhs << 6) + (lhs >> 2)))) % 2**32 + ) def uint_hash_combine_32(lhs, rhs): @@ -80,22 +82,6 @@ def list_struct_table(): return data -def python_hash_value(x, method): - if method == "murmurhash3_x86_32": - return libcudf_mmh3_x86_32(x) - elif method == "murmurhash3_x64_128": - hasher = mmh3.mmh3_x64_128(seed=plc.hashing.LIBCUDF_DEFAULT_HASH_SEED) - hasher.update(x) - # libcudf returns a tuple of two 64-bit integers - return hasher.utupledigest() - elif method == "xxhash_64": - return xxhash.xxh64( - x, seed=plc.hashing.LIBCUDF_DEFAULT_HASH_SEED - ).intdigest() - else: - return getattr(hashlib, method)(x).hexdigest() - - @pytest.mark.parametrize( "method", ["sha1", "sha224", "sha256", "sha384", "sha512", "md5"] ) @@ -115,6 +101,23 @@ def py_hasher(val): assert_column_eq(got, expect) +def test_hash_column_xxhash32(pa_scalar_input_column, plc_scalar_input_tbl): + def py_hasher(val): + return xxhash.xxh32( + scalar_to_binary(val), seed=plc.hashing.LIBCUDF_DEFAULT_HASH_SEED + ).intdigest() + + expect = pa.array( + [py_hasher(val) for val in pa_scalar_input_column.to_pylist()], + type=pa.uint32(), + ) + got = plc.hashing.xxhash_32( + plc_scalar_input_tbl, plc.hashing.LIBCUDF_DEFAULT_HASH_SEED + ) + + assert_column_eq(got, expect) + + def test_hash_column_xxhash64(pa_scalar_input_column, plc_scalar_input_tbl): def py_hasher(val): return xxhash.xxh64( @@ -125,7 +128,9 @@ def py_hasher(val): [py_hasher(val) for val in pa_scalar_input_column.to_pylist()], type=pa.uint64(), ) - got = plc.hashing.xxhash_64(plc_scalar_input_tbl, 0) + got = plc.hashing.xxhash_64( + plc_scalar_input_tbl, plc.hashing.LIBCUDF_DEFAULT_HASH_SEED + ) assert_column_eq(got, expect)