11 | 11 | #include <cudf/detail/offsets_iterator_factory.cuh> |
12 | 12 | #include <cudf/detail/utilities/cuda.cuh> |
13 | 13 | #include <cudf/detail/utilities/grid_1d.cuh> |
| 14 | +#include <cudf/detail/utilities/integer_utils.hpp> |
14 | 15 | #include <cudf/strings/detail/strings_children.cuh> |
15 | 16 | #include <cudf/strings/detail/utilities.hpp> |
16 | 17 | #include <cudf/strings/strings_column_view.hpp> |
17 | 18 | #include <cudf/utilities/memory_resource.hpp> |
18 | 19 | #include <cudf/utilities/prefetch.hpp> |
19 | 20 |
20 | 21 | #include <rmm/cuda_stream_view.hpp> |
| 22 | +#include <rmm/device_buffer.hpp> |
21 | 23 | #include <rmm/exec_policy.hpp> |
22 | 24 |
| 25 | +#include <cub/cub.cuh> |
23 | 26 | #include <cuda/functional> |
24 | 27 | #include <cuda/std/iterator> |
25 | 28 | #include <thrust/binary_search.h> |
26 | 29 | #include <thrust/execution_policy.h> |
27 | 30 | #include <thrust/iterator/transform_iterator.h> |
28 | 31 |
| 32 | +#include <cstddef> |
| 33 | + |
29 | 34 | namespace cudf { |
30 | 35 | namespace strings { |
31 | 36 | namespace detail { |
@@ -190,68 +195,6 @@ CUDF_KERNEL void gather_chars_fn_char_parallel(StringIterator strings_begin, |
190 | 195 | } |
191 | 196 | } |
192 | 197 |
193 | | -/** |
194 | | - * @brief Returns a new chars column using the specified indices to select |
195 | | - * strings from the input iterator. |
196 | | - * |
197 | | - * This uses a character-parallel gather CUDA kernel that performs very |
198 | | - * well on a strings column with long strings (e.g. average > 64 bytes). |
199 | | - * |
200 | | - * @tparam StringIterator Iterator should produce `string_view` objects. |
201 | | - * @tparam MapIterator Iterator for retrieving integer indices of the `StringIterator`. |
202 | | - * |
203 | | - * @param strings_begin Start of the iterator to retrieve `string_view` instances. |
204 | | - * @param map_begin Start of index iterator. |
205 | | - * @param map_end End of index iterator. |
206 | | - * @param offsets The offset values to be associated with the output chars column. |
207 | | - * @param chars_bytes The total number of bytes for the output chars column. |
208 | | - * @param stream CUDA stream used for device memory operations and kernel launches. |
209 | | - * @param mr Device memory resource used to allocate the returned column's device memory. |
210 | | - * @return New chars column fit for a strings column. |
211 | | - */ |
212 | | -template <typename StringIterator, typename MapIterator> |
213 | | -rmm::device_uvector<char> gather_chars(StringIterator strings_begin, |
214 | | - MapIterator map_begin, |
215 | | - MapIterator map_end, |
216 | | - cudf::detail::input_offsetalator const offsets, |
217 | | - int64_t chars_bytes, |
218 | | - rmm::cuda_stream_view stream, |
219 | | - rmm::device_async_resource_ref mr) |
220 | | -{ |
221 | | - auto const output_count = std::distance(map_begin, map_end); |
222 | | - if (output_count == 0) return rmm::device_uvector<char>(0, stream, mr); |
223 | | - |
224 | | - auto chars_data = rmm::device_uvector<char>(chars_bytes, stream, mr); |
225 | | - cudf::prefetch::detail::prefetch(chars_data, stream); |
226 | | - auto d_chars = chars_data.data(); |
227 | | - |
228 | | - constexpr int warps_per_threadblock = 4; |
229 | | - // String parallel strategy will be used if average string length is above this threshold. |
230 | | - // Otherwise, char parallel strategy will be used. |
231 | | - constexpr int64_t string_parallel_threshold = 32; |
232 | | - |
233 | | - int64_t const average_string_length = chars_bytes / output_count; |
234 | | - |
235 | | - if (average_string_length > string_parallel_threshold) { |
236 | | - constexpr int max_threadblocks = 65536; |
237 | | - gather_chars_fn_string_parallel<<< |
238 | | - min((static_cast<int>(output_count) + warps_per_threadblock - 1) / warps_per_threadblock, |
239 | | - max_threadblocks), |
240 | | - warps_per_threadblock * cudf::detail::warp_size, |
241 | | - 0, |
242 | | - stream.value()>>>(strings_begin, d_chars, offsets, map_begin, output_count); |
243 | | - } else { |
244 | | - constexpr int strings_per_threadblock = 32; |
245 | | - gather_chars_fn_char_parallel<strings_per_threadblock> |
246 | | - <<<(output_count + strings_per_threadblock - 1) / strings_per_threadblock, |
247 | | - warps_per_threadblock * cudf::detail::warp_size, |
248 | | - 0, |
249 | | - stream.value()>>>(strings_begin, d_chars, offsets, map_begin, output_count); |
250 | | - } |
251 | | -
252 | | - return chars_data; |
253 | | -} |
254 | | -
255 | 198 | /** |
256 | 199 | * @brief Returns a new strings column using the specified indices to select |
257 | 200 | * elements from the `strings` column. |
@@ -299,15 +242,94 @@ std::unique_ptr<cudf::column> gather(strings_column_view const& strings, |
299 | 242 | if (not d_strings.is_valid(idx)) { return 0; } |
300 | 243 | return static_cast<size_type>(d_in_offsets[idx + 1] - d_in_offsets[idx]); |
301 | 244 | })); |
302 | | - auto [out_offsets_column, total_bytes] = cudf::strings::detail::make_offsets_child_column( |
| 245 | + |
| 246 | + auto [out_offsets_column, out_char_bytes] = cudf::strings::detail::make_offsets_child_column( |
303 | 247 | sizes_itr, sizes_itr + output_count, stream, mr); |
304 | 248 |
305 | | - // build chars column |
| 249 | + // build out offset view |
306 | 250 | auto const offsets_view = |
307 | 251 | cudf::detail::offsetalator_factory::make_input_iterator(out_offsets_column->view()); |
308 | 252 | cudf::prefetch::detail::prefetch(strings.chars_begin(stream), strings.chars_size(stream), stream); |
309 | | - auto out_chars_data = gather_chars( |
310 | | - d_strings->begin<string_view>(), begin, end, offsets_view, total_bytes, stream, mr); |
| 253 | + |
| 254 | + // build output char column |
| 255 | + auto out_chars_data = rmm::device_uvector<char>(out_char_bytes, stream, mr); |
| 256 | + cudf::prefetch::detail::prefetch(out_chars_data, stream); |
| 257 | + auto d_out_chars = out_chars_data.data(); |
| 258 | + |
| 259 | + constexpr int warps_per_threadblock = 4; |
| 260 | + // String parallel strategy will be used if average string length is above this threshold. |
| 261 | + // Otherwise, char parallel strategy will be used. |
| 262 | + constexpr int64_t string_parallel_threshold = 32; |
| 263 | + |
| 264 | + int64_t const average_string_length = out_char_bytes / output_count; |
| 265 | + |
| 266 | + if (average_string_length > string_parallel_threshold) { |
| 267 | + constexpr int max_threadblocks = 65536; |
| 268 | + auto const grid_size = |
| 269 | + min(cudf::util::div_rounding_up_safe(static_cast<int64_t>(output_count), |
| 270 | + static_cast<int64_t>(warps_per_threadblock)), |
| 271 | + static_cast<int64_t>(max_threadblocks)); |
| 272 | + gather_chars_fn_string_parallel<<<grid_size, |
| 273 | + warps_per_threadblock * cudf::detail::warp_size, |
| 274 | + 0, |
| 275 | + stream.value()>>>( |
| 276 | + d_strings->begin<string_view>(), d_out_chars, offsets_view, begin, output_count); |
| 277 | + } else { |
| 278 | + // Threshold is based on empirical data on H100. |
| 279 | + // If row count is above this threshold we use the cub::DeviceMemcpy::Batched API, otherwise we |
| 280 | + // use the custom cuDF kernel. |
| 281 | + constexpr int64_t cub_batch_copy_threshold = 1024 * 1024 * 0.5; |
| 282 | + |
| 283 | + if (output_count < cub_batch_copy_threshold) { |
| 284 | + constexpr int strings_per_threadblock = 32; |
| 285 | + auto const grid_size = cudf::util::div_rounding_up_safe( |
| 286 | + static_cast<int64_t>(output_count), static_cast<int64_t>(strings_per_threadblock)); |
| 287 | + gather_chars_fn_char_parallel<strings_per_threadblock> |
| 288 | + <<<grid_size, warps_per_threadblock * cudf::detail::warp_size, 0, stream.value()>>>( |
| 289 | + d_strings->begin<string_view>(), d_out_chars, offsets_view, begin, output_count); |
| 290 | + } else { |
| 291 | + // Iterator over the character column of input strings to gather |
| 292 | + auto in_chars_itr = thrust::make_transform_iterator( |
| 293 | + begin, |
| 294 | + cuda::proclaim_return_type<const char*>([d_strings = *d_strings] __device__(size_type idx) { |
| 295 | + if (NullifyOutOfBounds && (idx < 0 || idx >= d_strings.size())) { |
| 296 | + return static_cast<const char*>(nullptr); |
| 297 | + } |
| 298 | + if (not d_strings.is_valid(idx)) { return static_cast<const char*>(nullptr); } |
| 299 | + return d_strings.element<string_view>(idx).data(); |
| 300 | + })); |
| 301 | + |
| 302 | + // Iterator over the output locations to write the output |
| 303 | + auto out_chars_itr = cudf::detail::make_counting_transform_iterator( |
| 304 | + 0, |
| 305 | + cuda::proclaim_return_type<char*>( |
| 306 | + [d_strings = *d_strings, offsets_view, d_out_chars] __device__(size_type idx) { |
| 307 | + return d_out_chars + offsets_view[idx]; |
| 308 | + })); |
| 309 | + |
| 310 | + // Determine temporary device storage requirements |
| 311 | + size_t temp_storage_bytes = 0; |
| 312 | + cub::DeviceMemcpy::Batched(nullptr, |
| 313 | + temp_storage_bytes, |
| 314 | + in_chars_itr, |
| 315 | + out_chars_itr, |
| 316 | + sizes_itr, |
| 317 | + output_count, |
| 318 | + stream.value()); |
| 319 | + |
| 320 | + // Allocate temporary storage |
| 321 | + auto d_temp_storage = rmm::device_buffer(temp_storage_bytes, stream, mr); |
| 322 | + |
| 323 | + // Run batched copy algorithm |
| 324 | + cub::DeviceMemcpy::Batched(d_temp_storage.data(), |
| 325 | + temp_storage_bytes, |
| 326 | + in_chars_itr, |
| 327 | + out_chars_itr, |
| 328 | + sizes_itr, |
| 329 | + output_count, |
| 330 | + stream.value()); |
| 331 | + } |
| 332 | + } |
311 | 333 |
312 | 334 | return make_strings_column(output_count, |
313 | 335 | std::move(out_offsets_column), |
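
The core change in the small-strings path above is the switch to cub::DeviceMemcpy::Batched, which follows CUB's usual two-phase pattern: a first call with a null workspace pointer only reports the temporary storage size, and a second call performs the batched copy. Below is a minimal standalone sketch of that pattern, not the cuDF code from the diff; the buffers, sizes, and variable names are invented for illustration.

// Minimal sketch of the two-phase cub::DeviceMemcpy::Batched pattern.
// All data and names here are illustrative, not taken from cuDF.
#include <cub/cub.cuh>

#include <thrust/device_vector.h>
#include <thrust/host_vector.h>

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
  // Six source bytes packed into one device allocation, viewed as three
  // variable-length "strings": "a", "bc", "def".
  std::vector<char> h_src = {'a', 'b', 'c', 'd', 'e', 'f'};
  thrust::device_vector<char> src(h_src.begin(), h_src.end());
  thrust::device_vector<char> dst(src.size());

  char* s = thrust::raw_pointer_cast(src.data());
  char* d = thrust::raw_pointer_cast(dst.data());

  // Per-buffer source pointers, destination pointers, and byte counts.
  // In the diff above these three sequences are produced lazily by transform
  // iterators over the gather map and the output offsets instead of being
  // materialized in device memory.
  std::vector<char*> h_srcs = {s, s + 1, s + 3};
  std::vector<char*> h_dsts = {d, d + 1, d + 3};
  std::vector<std::uint32_t> h_sizes = {1, 2, 3};
  thrust::device_vector<char*> srcs(h_srcs.begin(), h_srcs.end());
  thrust::device_vector<char*> dsts(h_dsts.begin(), h_dsts.end());
  thrust::device_vector<std::uint32_t> sizes(h_sizes.begin(), h_sizes.end());

  // Phase 1: query the required temporary storage with a null workspace.
  std::size_t temp_storage_bytes = 0;
  cub::DeviceMemcpy::Batched(nullptr,
                             temp_storage_bytes,
                             thrust::raw_pointer_cast(srcs.data()),
                             thrust::raw_pointer_cast(dsts.data()),
                             thrust::raw_pointer_cast(sizes.data()),
                             static_cast<std::uint32_t>(h_sizes.size()));

  // Phase 2: allocate the workspace and run the batched copy.
  thrust::device_vector<unsigned char> temp_storage(temp_storage_bytes);
  cub::DeviceMemcpy::Batched(thrust::raw_pointer_cast(temp_storage.data()),
                             temp_storage_bytes,
                             thrust::raw_pointer_cast(srcs.data()),
                             thrust::raw_pointer_cast(dsts.data()),
                             thrust::raw_pointer_cast(sizes.data()),
                             static_cast<std::uint32_t>(h_sizes.size()));
  cudaDeviceSynchronize();

  thrust::host_vector<char> h_dst = dst;
  std::printf("%.*s\n", static_cast<int>(h_dst.size()), h_dst.data());  // prints "abcdef"
  return 0;
}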