[DO NOT MERGE] Remove unnecessary synchronization (mis-sync) during Parquet reading #18968
base: branch-25.08
Changes from all commits
bc8a196
9bb5399
14721d0
02cf20e
a040bb8
6bb4263
7a75b63
40c35f0
bfee8ce
de2c750
5316923
8c0ca98
ad2bd94
c11a00e
c4bfa96
c9ac9fb
75bd459
59983c8
bdc814f
87a55b2
b109204
24f02c6
9b3996f
e05f53e
993d16e
506739e
a99869f
1ec74c9
ca71572
---

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -71,6 +71,26 @@ void cuda_memcpy_async(host_span<T> dst, device_span<T const> src, rmm::cuda_str
                          stream);
 }

/**
 * @brief Asynchronously copies data from host memory to host memory.
 *
 * Implementation may use different strategies depending on the size and type of host data.
 *
 * @param dst Destination host memory
 * @param src Source host memory
 * @param stream CUDA stream used for the copy
 */
template <typename T>
void cuda_memcpy_async(host_span<T> dst, host_span<T const> src, rmm::cuda_stream_view stream)
{
  CUDF_EXPECTS(dst.size() == src.size(), "Mismatched sizes in cuda_memcpy_async");
  cuda_memcpy_async_impl(dst.data(),
                         src.data(),
                         src.size_bytes(),
                         host_memory_kind::PAGEABLE,  // use copy_pageable for host-to-host copy
                         stream);
}

/**
 * @brief Synchronously copies data from host to device memory.
 *

Member: Using a …

Contributor: It kind of depends, right? Consider if the …

Contributor (author): @wence- Thanks. It is exactly what I have in mind. In my opinion, …
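To make the new overload's intended use concrete, here is a minimal sketch (illustrative, not part of the diff): staging an ordinary pageable buffer into pinned memory. It assumes the `make_pinned_vector(size_t, stream)` factory referenced elsewhere in this PR and the pointer/size span constructors; the function name `stage_to_pinned` is hypothetical.

```cpp
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <vector>

// Copy from an ordinary (pageable) std::vector into a pinned host buffer using
// the new host-to-host overload; a later host->device copy from the pinned
// buffer can then be truly asynchronous.
void stage_to_pinned(rmm::cuda_stream_view stream)
{
  std::vector<int> pageable(1024, 42);  // pageable host memory
  auto pinned = cudf::detail::make_pinned_vector<int>(pageable.size(), stream);

  cudf::detail::cuda_memcpy_async(
    cudf::host_span<int>{pinned.data(), pinned.size()},
    cudf::host_span<int const>{pageable.data(), pageable.size()},
    stream);

  stream.synchronize();  // `pinned` now holds the staged data
}
```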
@@ -103,5 +123,21 @@ void cuda_memcpy(host_span<T> dst, device_span<T const> src, rmm::cuda_stream_vi
   stream.synchronize();
 }

/**
 * @brief Synchronously copies data from host memory to host memory.
 *
 * Implementation may use different strategies depending on the size and type of host data.
 *
 * @param dst Destination host memory
 * @param src Source host memory
 * @param stream CUDA stream used for the copy
 */
template <typename T>
void cuda_memcpy(host_span<T> dst, host_span<T const> src, rmm::cuda_stream_view stream)
{
  cuda_memcpy_async(dst, src, stream);
  stream.synchronize();
}

}  // namespace detail
}  // namespace CUDF_EXPORT cudf
---
@@ -354,7 +354,7 @@ make_device_uvector_sync(Container const& c,
  * @note This function does not synchronize `stream` after the copy.
  *
  * @tparam T The type of the data to copy
- * @param source_data The device data to copy
+ * @param source_data The device_span of data to copy
  * @param stream The stream on which to perform the copy
  * @return The data copied to the host
  */

@@ -396,7 +396,7 @@ std::vector<typename Container::value_type> make_std_vector_async(Container cons
  * @note This function does a synchronize on `stream` after the copy.
  *
  * @tparam T The type of the data to copy
- * @param source_data The device data to copy
+ * @param source_data The device_span of data to copy
  * @param stream The stream on which to perform the copy
  * @return The data copied to the host
  */

@@ -499,7 +499,7 @@ host_vector<T> make_empty_host_vector(size_t capacity, rmm::cuda_stream_view str
  * using a pinned memory resource.
  *
  * @tparam T The type of the data to copy
- * @param source_data The device data to copy
+ * @param source_data The device_span of data to copy
  * @param stream The stream on which to perform the copy
  * @return The data copied to the host
  */

@@ -542,7 +542,7 @@ host_vector<typename Container::value_type> make_host_vector_async(Container con
  * using a pinned memory resource.
  *
  * @tparam T The type of the data to copy
- * @param source_data The device data to copy
+ * @param source_data The device_span of data to copy
  * @param stream The stream on which to perform the copy
  * @return The data copied to the host
  */
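For readers new to these factories, a brief contextual sketch of the two async host-side factories whose docs are touched here. It relies on what the surrounding doc comments state: `make_std_vector_async` returns a pageable `std::vector`, while `make_host_vector_async` returns a `cudf::detail::host_vector` backed by the pinned memory resource.

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

// Both copies are asynchronous; they differ in the kind of host memory backing
// the result (pageable std::vector vs pinned host_vector).
void host_copies(rmm::device_uvector<int> const& d_vals, rmm::cuda_stream_view stream)
{
  auto pageable = cudf::detail::make_std_vector_async(d_vals, stream);   // std::vector<int>
  auto pinned   = cudf::detail::make_host_vector_async(d_vals, stream);  // cudf::detail::host_vector<int>
  stream.synchronize();  // wait before reading either copy on the host
}
```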
@@ -637,6 +637,113 @@ host_vector<T> make_pinned_vector(size_t size, rmm::cuda_stream_view stream)
   return result;
 }

/**
 * @brief Asynchronously construct a `cudf::detail::host_vector` containing a copy of data from a
 * `device_span`
 *
 * @note This function does not synchronize `stream` after the copy. The returned vector uses
 * a pinned memory resource.
 *
 * @tparam T The type of the data to copy
 * @param source_data The device_span of data to copy
 * @param stream The stream on which to perform the copy
 * @return The data copied to the host
 */
template <typename T>
host_vector<T> make_pinned_vector_async(device_span<T const> v, rmm::cuda_stream_view stream)
{
  auto result = make_pinned_vector<T>(v.size(), stream);
  cuda_memcpy_async<T>(result, v, stream);
  return result;
}
/**
 * @brief Asynchronously construct a `cudf::detail::host_vector` containing a copy of data from a
 * device container
 *
 * @note This function does not synchronize `stream` after the copy. The returned vector uses
 * a pinned memory resource.
 *
 * @tparam Container The type of the container to copy from
 * @tparam T The type of the data to copy
 * @param c The input device container from which to copy
 * @param stream The stream on which to perform the copy
 * @return The data copied to the host
 */
template <
  typename Container,
  std::enable_if_t<
    std::is_convertible_v<Container, device_span<typename Container::value_type const>>>* = nullptr>
host_vector<typename Container::value_type> make_pinned_vector_async(Container const& c,
                                                                     rmm::cuda_stream_view stream)
{
  return make_pinned_vector_async(device_span<typename Container::value_type const>{c}, stream);
}

Member: Let's use …

Member: Update: We have moved to using …
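The review comments above appear to concern how this overload is constrained. Purely as a hedged illustration (not necessarily what was proposed), the same restriction could be expressed with a C++20 requires-clause instead of `std::enable_if_t`, inside the same header context:

```cpp
// Illustrative alternative only: constrain the container overload with a
// requires-clause rather than std::enable_if_t (behavior is unchanged).
template <typename Container>
  requires std::is_convertible_v<Container,
                                 device_span<typename Container::value_type const>>
host_vector<typename Container::value_type> make_pinned_vector_async(Container const& c,
                                                                     rmm::cuda_stream_view stream)
{
  return make_pinned_vector_async(device_span<typename Container::value_type const>{c}, stream);
}
```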
/**
 * @brief Asynchronously construct a `cudf::detail::host_vector` containing a copy of data from a
 * `host_span`
 *
 * @note This function does not synchronize `stream` after the copy. The returned vector uses
 * a pinned memory resource.
 *
 * @tparam T The type of the data to copy
 * @param source_data The host_span of data to copy
 * @param stream The stream on which to perform the copy
 * @return The data copied to the host
 */
template <typename T>
host_vector<T> make_pinned_vector_async(host_span<T const> v, rmm::cuda_stream_view stream)
{
  auto result = make_pinned_vector<T>(v.size(), stream);
  cuda_memcpy_async<T>(result, v, stream);
  return result;
}
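A one-call staging example for the `host_span` overload (a sketch; the function name `stage` is hypothetical and the pageable source is assumed):

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <vector>

// Stage pageable host data into pinned memory in a single call. The copy is
// asynchronous, so callers must synchronize `stream` before reading the result.
cudf::detail::host_vector<int> stage(std::vector<int> const& pageable,
                                     rmm::cuda_stream_view stream)
{
  return cudf::detail::make_pinned_vector_async(
    cudf::host_span<int const>{pageable.data(), pageable.size()}, stream);
}
```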
/**
 * @brief Synchronously construct a `cudf::detail::host_vector` containing a copy of data from a
 * `device_span`
 *
 * @note This function does a synchronize on `stream` after the copy. The returned vector uses
 * a pinned memory resource.
 *
 * @tparam T The type of the data to copy
 * @param source_data The device_span of data to copy
 * @param stream The stream on which to perform the copy
 * @return The data copied to the host
 */
template <typename T>
host_vector<T> make_pinned_vector(device_span<T const> v, rmm::cuda_stream_view stream)
{
  auto result = make_pinned_vector_async(v, stream);
  stream.synchronize();
  return result;
}
/**
 * @brief Synchronously construct a `cudf::detail::host_vector` containing a copy of data from a
 * device container
 *
 * @note This function synchronizes `stream` after the copy. The returned vector uses
 * a pinned memory resource.
 *
 * @tparam Container The type of the container to copy from
 * @tparam T The type of the data to copy
 * @param c The input device container from which to copy
 * @param stream The stream on which to perform the copy
 * @return The data copied to the host
 */
template <
  typename Container,
  std::enable_if_t<
    std::is_convertible_v<Container, device_span<typename Container::value_type const>>>* = nullptr>
host_vector<typename Container::value_type> make_pinned_vector(Container const& c,
                                                               rmm::cuda_stream_view stream)
{
  return make_pinned_vector(device_span<typename Container::value_type const>{c}, stream);
}

Member: Same here.

/**
 * @copydoc cudf::detail::make_pinned_vector(size_t size, rmm::cuda_stream_view stream)
 *
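A usage sketch of the container overloads added above (the helper name `to_pinned_host` is illustrative): pull a `device_uvector`'s contents into pinned host memory without an intermediate pageable buffer.

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

// The container overload forwards to the device_span overload added above.
cudf::detail::host_vector<int> to_pinned_host(rmm::device_uvector<int> const& d_vals,
                                              rmm::cuda_stream_view stream)
{
  auto h_vals = cudf::detail::make_pinned_vector_async(d_vals, stream);
  stream.synchronize();  // or call the synchronous make_pinned_vector(...) instead
  return h_vals;
}
```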
---
@@ -67,7 +67,16 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
 {
   auto const binary_op     = cudf::detail::cast_functor<OutputType>(op.get_binary_op());
   auto const initial_value = init.value_or(op.template get_identity<OutputType>());
-  auto dev_result          = rmm::device_scalar<OutputType>{initial_value, stream, mr};
+  auto host_scalar =
+    cudf::detail::make_pinned_vector_async<OutputType>(1, stream);  // as host pinned memory
+  CUDF_CUDA_TRY(cudaMemcpyAsync(
+    host_scalar.data(), &initial_value, sizeof(OutputType), cudaMemcpyHostToHost, stream.value()));
+  rmm::device_scalar<OutputType> dev_result{stream, mr};
+  CUDF_CUDA_TRY(cudaMemcpyAsync(dev_result.data(),
+                                host_scalar.data(),
+                                sizeof(OutputType),
+                                cudaMemcpyHostToDevice,
+                                stream.value()));  // device <- host pinned

   // Allocate temporary storage
   rmm::device_buffer d_temp_storage;

Member: Since this is just one value being copied to device, is there any performance advantage from first creating a pinned vector of size 1 and then using that to copy to device?

Contributor (author): The main goal of the code changes is to get rid of any cudaMemcpy that uses pageable memory. Here, the old code did a copy from the stack (pageable) to the device, which can cause "mis-sync" by interfering with other running CUDA streams. Sorry, I didn't explain this better up front!

Comment on lines +72 to +73

Contributor (author): Host-to-host copy.
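For readers following the thread, a self-contained sketch of the pattern this change follows. The helper name `to_device_via_pinned`, the direct host store into the staging buffer (the diff instead uses a stream-ordered host-to-host `cudaMemcpyAsync`), and the final synchronize are illustrative assumptions, not part of the diff:

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/resource_ref.hpp>

#include <cuda_runtime_api.h>

// Stage a single host value in pinned memory, then issue a pinned -> device
// copy on the target stream, so no pageable memory is involved in the
// host-to-device transfer.
template <typename T>
rmm::device_scalar<T> to_device_via_pinned(T const& value,
                                           rmm::cuda_stream_view stream,
                                           rmm::device_async_resource_ref mr)
{
  auto staging = cudf::detail::make_pinned_vector_async<T>(1, stream);
  staging[0]   = value;  // plain host store into pinned memory

  rmm::device_scalar<T> result{stream, mr};
  CUDF_CUDA_TRY(cudaMemcpyAsync(
    result.data(), staging.data(), sizeof(T), cudaMemcpyHostToDevice, stream.value()));
  stream.synchronize();  // `staging` is local; wait before it is destroyed
  return result;
}
```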
@@ -231,6 +240,76 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
   return std::unique_ptr<scalar>(result);
 }

/**
 * @brief Compute the specified by-key reduction over the input range of elements.
 *
 * @param[in] d_keys_in the begin iterator of input keys
 * @param[out] d_unique_out the begin iterator of output keys (one key per run)
 * @param[in] d_values_in the begin iterator of input values
 * @param[out] d_aggregates_out the begin iterator of output aggregated values (one aggregate
 * per run)
 * @param[out] d_num_runs_out the pointer to the total number of runs encountered (i.e., the length
 * of d_unique_out)
 * @param[in] op the reduction operator
 * @param[in] num_items the number of key+value pairs (i.e., the length of d_keys_in and
 * d_values_in)
 * @param[in] stream CUDA stream used for device memory operations and kernel launches
 * @param[in] mr Device memory resource used to allocate the temporary storage
 *
 * @tparam Op the reduction operator with device binary operator
 * @tparam KeysInputIteratorT the input keys iterator
 * @tparam UniqueOutputIteratorT the output keys iterator
 * @tparam ValuesInputIteratorT the input values iterator
 * @tparam AggregatesOutputIteratorT the output values iterator
 * @tparam OutputType the output type of reduction
 */
template <typename Op,
          typename KeysInputIteratorT,
          typename UniqueOutputIteratorT,
          typename ValuesInputIteratorT,
          typename AggregatesOutputIteratorT,
          typename OutputType = cuda::std::iter_value_t<KeysInputIteratorT>,
          std::enable_if_t<is_fixed_width<OutputType>() &&
                           not cudf::is_fixed_point<OutputType>()>* = nullptr>
void reduce_by_key(KeysInputIteratorT d_keys_in,
                   UniqueOutputIteratorT d_unique_out,
                   ValuesInputIteratorT d_values_in,
                   AggregatesOutputIteratorT d_aggregates_out,
                   cudf::size_type* d_num_runs_out,
                   op::simple_op<Op> op,
                   cudf::size_type num_items,
                   rmm::cuda_stream_view stream,
                   rmm::device_async_resource_ref mr)
{
  auto const binary_op = cudf::detail::cast_functor<OutputType>(op.get_binary_op());
  // Allocate temporary storage
  rmm::device_buffer d_temp_storage;
  size_t temp_storage_bytes = 0;
  cub::DeviceReduce::ReduceByKey(d_temp_storage.data(),
                                 temp_storage_bytes,
                                 d_keys_in,
                                 d_unique_out,
                                 d_values_in,
                                 d_aggregates_out,
                                 d_num_runs_out,
                                 binary_op,
                                 num_items,
                                 stream.value());
  d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream, mr};

  // Run reduction
  cub::DeviceReduce::ReduceByKey(d_temp_storage.data(),
                                 temp_storage_bytes,
                                 d_keys_in,
                                 d_unique_out,
                                 d_values_in,
                                 d_aggregates_out,
                                 d_num_runs_out,
                                 binary_op,
                                 num_items,
                                 stream.value());
}
}  // namespace detail
}  // namespace reduction
}  // namespace cudf
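Finally, a standalone usage sketch of the two-phase CUB pattern that the new `reduce_by_key` helper wraps. The example data, the `cuda::std::plus` operator, and the function name `sum_by_key_example` are assumptions for illustration; inside cudf the helper is invoked with the library's own iterators and reduction operators.

```cpp
#include <cub/device/device_reduce.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/resource_ref.hpp>

#include <thrust/fill.h>
#include <thrust/sequence.h>

#include <cuda/std/functional>

#include <cstddef>

// Two-phase CUB call: the first ReduceByKey only reports how much temporary
// storage it needs; the second performs the by-key (segmented) reduction.
void sum_by_key_example(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)
{
  int constexpr num_items = 8;
  rmm::device_uvector<int> keys(num_items, stream, mr);
  rmm::device_uvector<int> values(num_items, stream, mr);
  thrust::sequence(rmm::exec_policy(stream), keys.begin(), keys.end());     // 0, 1, ..., 7
  thrust::fill(rmm::exec_policy(stream), values.begin(), values.end(), 1);  // all ones

  rmm::device_uvector<int> unique_keys(num_items, stream, mr);
  rmm::device_uvector<int> aggregates(num_items, stream, mr);
  rmm::device_uvector<int> num_runs(1, stream, mr);

  auto launch = [&](void* d_temp, std::size_t& bytes) {
    cub::DeviceReduce::ReduceByKey(d_temp,
                                   bytes,
                                   keys.begin(),
                                   unique_keys.begin(),
                                   values.begin(),
                                   aggregates.begin(),
                                   num_runs.begin(),
                                   cuda::std::plus<int>{},
                                   num_items,
                                   stream.value());
  };

  std::size_t temp_bytes = 0;
  launch(nullptr, temp_bytes);                      // size query only
  rmm::device_buffer temp{temp_bytes, stream, mr};
  launch(temp.data(), temp_bytes);                  // actual reduction
}
```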