Skip to content

Commit

Permalink
Batch memcpy the last offsets for output buffers of str and list cols…
Browse files Browse the repository at this point in the history
… in PQ reader (rapidsai#16905)

This PR adds the capability to batch memcpy the last offsets for the output buffers of string and list columns in PQ reader. This reduces the overhead from several `cudaMemcpyAsync` calls when reading wide strings and/or list columns tables. This optimization was found as well as ORC changes were contributed by @vuule. See this [comment](rapidsai#16905 (comment)) for performance improvement data and discussion.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: rapidsai#16905
  • Loading branch information
mhaseeb123 authored Oct 3, 2024
1 parent 466e379 commit 7ae5360
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 147 deletions.
5 changes: 0 additions & 5 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,6 @@ ConfigureNVBench(JSON_READER_NVBENCH io/json/nested_json.cpp io/json/json_reader
ConfigureNVBench(JSON_READER_OPTION_NVBENCH io/json/json_reader_option.cpp)
ConfigureNVBench(JSON_WRITER_NVBENCH io/json/json_writer.cpp)

# ##################################################################################################
# * multi buffer memset benchmark
# ----------------------------------------------------------------------
ConfigureNVBench(BATCHED_MEMSET_BENCH io/utilities/batched_memset_bench.cpp)

# ##################################################################################################
# * io benchmark ---------------------------------------------------------------------
ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp)
Expand Down
101 changes: 0 additions & 101 deletions cpp/benchmarks/io/utilities/batched_memset_bench.cpp

This file was deleted.

67 changes: 67 additions & 0 deletions cpp/include/cudf/detail/utilities/batched_memcpy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/detail/iterator.cuh>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <cub/device/device_memcpy.cuh>
#include <cuda/functional>
#include <thrust/iterator/constant_iterator.h>

namespace CUDF_EXPORT cudf {
namespace detail {

/**
* @brief A helper function that copies a vector of vectors from source to destination addresses in
* a batched manner.
*
* @tparam SrcIterator **[inferred]** The type of device-accessible source addresses iterator
* @tparam DstIterator **[inferred]** The type of device-accessible destination address iterator
* @tparam SizeIterator **[inferred]** The type of device-accessible buffer size iterator
*
* @param src_iter Device-accessible iterator to source addresses
* @param dst_iter Device-accessible iterator to destination addresses
* @param size_iter Device-accessible iterator to the buffer sizes (in bytes)
* @param num_buffs Number of buffers to be copied
* @param stream CUDA stream to use
*/
template <typename SrcIterator, typename DstIterator, typename SizeIterator>
void batched_memcpy_async(SrcIterator src_iter,
DstIterator dst_iter,
SizeIterator size_iter,
size_t num_buffs,
rmm::cuda_stream_view stream)
{
size_t temp_storage_bytes = 0;
cub::DeviceMemcpy::Batched(
nullptr, temp_storage_bytes, src_iter, dst_iter, size_iter, num_buffs, stream.value());

rmm::device_buffer d_temp_storage{temp_storage_bytes, stream.value()};

cub::DeviceMemcpy::Batched(d_temp_storage.data(),
temp_storage_bytes,
src_iter,
dst_iter,
size_iter,
num_buffs,
stream.value());
}

} // namespace detail
} // namespace CUDF_EXPORT cudf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <thrust/transform.h>

namespace CUDF_EXPORT cudf {
namespace io::detail {
namespace detail {

/**
* @brief A helper function that takes in a vector of device spans and memsets them to the
Expand Down Expand Up @@ -78,5 +78,5 @@ void batched_memset(std::vector<cudf::device_span<T>> const& bufs,
d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);
}

} // namespace io::detail
} // namespace detail
} // namespace CUDF_EXPORT cudf
64 changes: 44 additions & 20 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "orc_gpu.hpp"

#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/utilities/batched_memcpy.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/logger.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
Expand Down Expand Up @@ -1087,37 +1089,42 @@ CUDF_KERNEL void __launch_bounds__(block_size)
/**
* @brief Merge chunked column data into a single contiguous stream
*
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] streams List of encoder chunk streams [column][rowgroup]
* @param[in] strm_desc StripeStream device array [stripe][stream]
* @param[in] streams List of encoder chunk streams [column][rowgroup]
* @param[out] srcs List of source encoder chunk stream data addresses
* @param[out] dsts List of destination StripeStream data addresses
* @param[out] sizes List of stream sizes in bytes
*/
// blockDim {compact_streams_block_size,1,1}
CUDF_KERNEL void __launch_bounds__(compact_streams_block_size)
gpuCompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> streams)
gpuInitBatchedMemcpy(device_2dspan<StripeStream const> strm_desc,
device_2dspan<encoder_chunk_streams> streams,
device_span<uint8_t*> srcs,
device_span<uint8_t*> dsts,
device_span<size_t> sizes)
{
__shared__ __align__(16) StripeStream ss;

auto const stripe_id = blockIdx.x;
auto const stripe_id = cudf::detail::grid_1d::global_thread_id();
auto const stream_id = blockIdx.y;
auto const t = threadIdx.x;
if (stripe_id >= strm_desc.size().first) { return; }

if (t == 0) { ss = strm_desc[stripe_id][stream_id]; }
__syncthreads();
auto const out_id = stream_id * strm_desc.size().first + stripe_id;
StripeStream ss = strm_desc[stripe_id][stream_id];

if (ss.data_ptr == nullptr) { return; }

auto const cid = ss.stream_type;
auto dst_ptr = ss.data_ptr;
for (auto group = ss.first_chunk_id; group < ss.first_chunk_id + ss.num_chunks; ++group) {
auto const out_id = stream_id * streams.size().second + group;
srcs[out_id] = streams[ss.column_id][group].data_ptrs[cid];
dsts[out_id] = dst_ptr;

// Also update the stream here, data will be copied in a separate kernel
streams[ss.column_id][group].data_ptrs[cid] = dst_ptr;

auto const len = streams[ss.column_id][group].lengths[cid];
if (len > 0) {
auto const src_ptr = streams[ss.column_id][group].data_ptrs[cid];
for (uint32_t i = t; i < len; i += blockDim.x) {
dst_ptr[i] = src_ptr[i];
}
__syncthreads();
}
if (t == 0) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
// len is the size (in bytes) of the current stream.
sizes[out_id] = len;
dst_ptr += len;
}
}
Expand Down Expand Up @@ -1325,9 +1332,26 @@ void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
rmm::cuda_stream_view stream)
{
auto const num_rowgroups = enc_streams.size().second;
auto const num_streams = strm_desc.size().second;
auto const num_stripes = strm_desc.size().first;
auto const num_chunks = num_rowgroups * num_streams;
auto srcs = cudf::detail::make_zeroed_device_uvector_async<uint8_t*>(
num_chunks, stream, rmm::mr::get_current_device_resource());
auto dsts = cudf::detail::make_zeroed_device_uvector_async<uint8_t*>(
num_chunks, stream, rmm::mr::get_current_device_resource());
auto lengths = cudf::detail::make_zeroed_device_uvector_async<size_t>(
num_chunks, stream, rmm::mr::get_current_device_resource());

dim3 dim_block(compact_streams_block_size, 1);
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second);
gpuCompactOrcDataStreams<<<dim_grid, dim_block, 0, stream.value()>>>(strm_desc, enc_streams);
dim3 dim_grid(cudf::util::div_rounding_up_unsafe(num_stripes, compact_streams_block_size),
strm_desc.size().second);
gpuInitBatchedMemcpy<<<dim_grid, dim_block, 0, stream.value()>>>(
strm_desc, enc_streams, srcs, dsts, lengths);

// Copy streams in a batched manner.
cudf::detail::batched_memcpy_async(
srcs.begin(), dsts.begin(), lengths.begin(), lengths.size(), stream);
}

std::optional<writer_compression_statistics> CompressOrcDataStreams(
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "page_data.cuh"
#include "page_decode.cuh"

#include <cudf/detail/utilities/batched_memcpy.hpp>

#include <rmm/exec_policy.hpp>

#include <thrust/reduce.h>
Expand Down Expand Up @@ -466,4 +468,28 @@ void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span<PageInfo> pages,
}
}

void WriteFinalOffsets(host_span<size_type const> offsets,
host_span<size_type* const> buff_addrs,
rmm::cuda_stream_view stream)
{
// Copy offsets to device and create an iterator
auto d_src_data = cudf::detail::make_device_uvector_async(
offsets, stream, cudf::get_current_device_resource_ref());
// Iterator for the source (scalar) data
auto src_iter = cudf::detail::make_counting_transform_iterator(
static_cast<std::size_t>(0),
cuda::proclaim_return_type<size_type*>(
[src = d_src_data.begin()] __device__(std::size_t i) { return src + i; }));

// Copy buffer addresses to device and create an iterator
auto d_dst_addrs = cudf::detail::make_device_uvector_async(
buff_addrs, stream, cudf::get_current_device_resource_ref());
// size_iter is simply a constant iterator of sizeof(size_type) bytes.
auto size_iter = thrust::make_constant_iterator(sizeof(size_type));

// Copy offsets to buffers in batched manner.
cudf::detail::batched_memcpy_async(
src_iter, d_dst_addrs.begin(), size_iter, offsets.size(), stream);
}

} // namespace cudf::io::parquet::detail
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,18 @@ void DecodeSplitPageData(cudf::detail::hostdevice_span<PageInfo> pages,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
* @brief Writes the final offsets to the corresponding list and string buffer end addresses in a
* batched manner.
*
* @param offsets Host span of final offsets
* @param buff_addrs Host span of corresponding output col buffer end addresses
* @param stream CUDA stream to use
*/
void WriteFinalOffsets(host_span<size_type const> offsets,
host_span<size_type* const> buff_addrs,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for reading the string column data stored in the pages
*
Expand Down
Loading

0 comments on commit 7ae5360

Please sign in to comment.