Merged
Changes from 9 commits
4 changes: 3 additions & 1 deletion cpp/benchmarks/string/copy.cpp
@@ -60,5 +60,7 @@ NVBENCH_BENCH(bench_copy)
.set_name("copy")
.add_int64_axis("min_width", {0})
.add_int64_axis("max_width", {32, 64, 128, 256})
.add_int64_axis("num_rows", {32768, 262144, 2097152})
.add_int64_axis(
"num_rows",
{32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, 16777216, 33554432})
.add_string_axis("api", {"gather", "scatter"});
150 changes: 84 additions & 66 deletions cpp/include/cudf/strings/detail/gather.cuh
@@ -18,8 +18,10 @@
#include <cudf/utilities/prefetch.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>

#include <cub/cub.cuh>
#include <cuda/functional>
#include <cuda/std/iterator>
#include <thrust/binary_search.h>
@@ -190,68 +192,6 @@ CUDF_KERNEL void gather_chars_fn_char_parallel(StringIterator strings_begin,
}
}

/**
* @brief Returns a new chars column using the specified indices to select
* strings from the input iterator.
*
* This uses a character-parallel gather CUDA kernel that performs very
* well on a strings column with long strings (e.g. average > 64 bytes).
*
* @tparam StringIterator Iterator should produce `string_view` objects.
* @tparam MapIterator Iterator for retrieving integer indices of the `StringIterator`.
*
* @param strings_begin Start of the iterator to retrieve `string_view` instances.
* @param map_begin Start of index iterator.
* @param map_end End of index iterator.
* @param offsets The offset values to be associated with the output chars column.
* @param chars_bytes The total number of bytes for the output chars column.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return New chars column fit for a strings column.
*/
template <typename StringIterator, typename MapIterator>
rmm::device_uvector<char> gather_chars(StringIterator strings_begin,
MapIterator map_begin,
MapIterator map_end,
cudf::detail::input_offsetalator const offsets,
int64_t chars_bytes,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto const output_count = std::distance(map_begin, map_end);
if (output_count == 0) return rmm::device_uvector<char>(0, stream, mr);

auto chars_data = rmm::device_uvector<char>(chars_bytes, stream, mr);
cudf::prefetch::detail::prefetch(chars_data, stream);
auto d_chars = chars_data.data();

constexpr int warps_per_threadblock = 4;
// String parallel strategy will be used if average string length is above this threshold.
// Otherwise, char parallel strategy will be used.
constexpr int64_t string_parallel_threshold = 32;

int64_t const average_string_length = chars_bytes / output_count;

if (average_string_length > string_parallel_threshold) {
constexpr int max_threadblocks = 65536;
gather_chars_fn_string_parallel<<<
min((static_cast<int>(output_count) + warps_per_threadblock - 1) / warps_per_threadblock,
max_threadblocks),
warps_per_threadblock * cudf::detail::warp_size,
0,
stream.value()>>>(strings_begin, d_chars, offsets, map_begin, output_count);
} else {
constexpr int strings_per_threadblock = 32;
gather_chars_fn_char_parallel<strings_per_threadblock>
<<<(output_count + strings_per_threadblock - 1) / strings_per_threadblock,
warps_per_threadblock * cudf::detail::warp_size,
0,
stream.value()>>>(strings_begin, d_chars, offsets, map_begin, output_count);
}

return chars_data;
}

/**
* @brief Returns a new strings column using the specified indices to select
* elements from the `strings` column.
@@ -299,15 +239,93 @@ std::unique_ptr<cudf::column> gather(strings_column_view const& strings,
if (not d_strings.is_valid(idx)) { return 0; }
return static_cast<size_type>(d_in_offsets[idx + 1] - d_in_offsets[idx]);
}));
auto [out_offsets_column, total_bytes] = cudf::strings::detail::make_offsets_child_column(

auto [out_offsets_column, out_char_bytes] = cudf::strings::detail::make_offsets_child_column(
sizes_itr, sizes_itr + output_count, stream, mr);

// build chars column
// build out offset view
auto const offsets_view =
cudf::detail::offsetalator_factory::make_input_iterator(out_offsets_column->view());
cudf::prefetch::detail::prefetch(strings.chars_begin(stream), strings.chars_size(stream), stream);
auto out_chars_data = gather_chars(
d_strings->begin<string_view>(), begin, end, offsets_view, total_bytes, stream, mr);

// build output char column
auto out_chars_data = rmm::device_uvector<char>(out_char_bytes, stream, mr);
cudf::prefetch::detail::prefetch(out_chars_data, stream);
auto d_out_chars = out_chars_data.data();

constexpr int warps_per_threadblock = 4;
// String parallel strategy will be used if average string length is above this threshold.
// Otherwise, char parallel strategy will be used.
constexpr int64_t string_parallel_threshold = 32;

int64_t const average_string_length = out_char_bytes / output_count;

if (average_string_length > string_parallel_threshold) {
constexpr int max_threadblocks = 65536;
gather_chars_fn_string_parallel<<<
min((static_cast<int>(output_count) + warps_per_threadblock - 1) / warps_per_threadblock,
max_threadblocks),
warps_per_threadblock * cudf::detail::warp_size,
0,
stream.value()>>>(
d_strings->begin<string_view>(), d_out_chars, offsets_view, begin, output_count);
} else {
// Threshold is based on empirical data on H100.
// If row count is above this threshold we use the cub::DeviceMemcpy::Batched API, otherwise we
// use the custom cuDF kernel.
constexpr int64_t cub_batch_copy_threshold = 1024 * 1024 * 0.5;

if (output_count < cub_batch_copy_threshold) {
constexpr int strings_per_threadblock = 32;
gather_chars_fn_char_parallel<strings_per_threadblock>
<<<(output_count + strings_per_threadblock - 1) / strings_per_threadblock,
warps_per_threadblock * cudf::detail::warp_size,
0,
stream.value()>>>(
d_strings->begin<string_view>(), d_out_chars, offsets_view, begin, output_count);
} else {
// Iterator over the character column of input strings to gather
auto in_chars_itr = thrust::make_transform_iterator(
begin,
cuda::proclaim_return_type<const char*>([d_strings = *d_strings] __device__(size_type idx) {
if (NullifyOutOfBounds && (idx < 0 || idx >= d_strings.size())) {
return static_cast<const char*>(nullptr);
}
if (not d_strings.is_valid(idx)) { return static_cast<const char*>(nullptr); }
return d_strings.element<string_view>(idx).data();
}));

// Iterator over the output locations to write the output
auto out_chars_itr = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<char*>(
[d_strings = *d_strings, offsets_view, d_out_chars] __device__(size_type idx) {
return d_out_chars + offsets_view[idx];
}));

// Determine temporary device storage requirements
size_t temp_storage_bytes = 0;
cub::DeviceMemcpy::Batched(nullptr,
temp_storage_bytes,
in_chars_itr,
out_chars_itr,
sizes_itr,
output_count,
stream.value());

// Allocate temporary storage
auto d_temp_storage = rmm::device_buffer(temp_storage_bytes, stream, mr);

// Run batched copy algorithm
cub::DeviceMemcpy::Batched(d_temp_storage.data(),
temp_storage_bytes,
in_chars_itr,
out_chars_itr,
sizes_itr,
output_count,
stream.value());
}
}

return make_strings_column(output_count,
std::move(out_offsets_column),
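To summarize the new dispatch in `gather()`: the output chars buffer is filled by one of three strategies. If the average output string length (`out_char_bytes / output_count`) exceeds 32 bytes, the warp-per-string kernel `gather_chars_fn_string_parallel` is launched. Otherwise, below roughly 524,288 output rows the block-per-32-strings kernel `gather_chars_fn_char_parallel` is used, and at or above that row count the copy is delegated to `cub::DeviceMemcpy::Batched`, which is driven by three iterators (source pointer, destination pointer, and byte count per output row) and, as with most CUB device algorithms, is called twice: once with a null temp-storage pointer to query the scratch size, then again to perform the copy. Null and out-of-bounds rows contribute a null source pointer paired with a zero byte count, so nothing is copied for them. Below is a minimal standalone sketch of that two-phase `cub::DeviceMemcpy::Batched` pattern using plain device buffers; it assumes a CUB version that provides `DeviceMemcpy` (CUB 2.x, recent CUDA Toolkits), and the buffer contents, sizes, and names are illustrative only rather than taken from the cuDF sources.

```cpp
#include <cub/cub.cuh>

#include <thrust/device_vector.h>

#include <cstddef>
#include <cstdint>
#include <vector>

int main()
{
  // Six characters packed into one device buffer, treated as three
  // variable-length "strings": "ab", "c", "def".
  thrust::device_vector<char> d_src(std::vector<char>{'a', 'b', 'c', 'd', 'e', 'f'});
  thrust::device_vector<char> d_dst(6);

  char const* src = thrust::raw_pointer_cast(d_src.data());
  char* dst       = thrust::raw_pointer_cast(d_dst.data());

  // Per-buffer source pointers, destination pointers, and byte counts.
  thrust::device_vector<char const*> d_in(std::vector<char const*>{src, src + 2, src + 3});
  thrust::device_vector<char*> d_out(std::vector<char*>{dst, dst + 2, dst + 3});
  thrust::device_vector<std::uint32_t> d_sizes(std::vector<std::uint32_t>{2, 1, 3});

  // Phase 1: query how much temporary storage the algorithm needs.
  std::size_t temp_bytes = 0;
  cub::DeviceMemcpy::Batched(nullptr,
                             temp_bytes,
                             thrust::raw_pointer_cast(d_in.data()),
                             thrust::raw_pointer_cast(d_out.data()),
                             thrust::raw_pointer_cast(d_sizes.data()),
                             3);

  // Phase 2: allocate the scratch space and perform the batched copy.
  thrust::device_vector<std::uint8_t> d_temp(temp_bytes);
  cub::DeviceMemcpy::Batched(thrust::raw_pointer_cast(d_temp.data()),
                             temp_bytes,
                             thrust::raw_pointer_cast(d_in.data()),
                             thrust::raw_pointer_cast(d_out.data()),
                             thrust::raw_pointer_cast(d_sizes.data()),
                             3);

  cudaDeviceSynchronize();  // d_dst now holds "abcdef", copied buffer by buffer
  return 0;
}
```

In the PR itself the byte counts come from the same `sizes_itr` transform iterator that builds the offsets column, and the destination pointers are computed from the freshly built output offsets, so no extra size or pointer arrays need to be materialized.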
57 changes: 56 additions & 1 deletion cpp/tests/copying/gather_str_tests.cpp
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2020-2024, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2020-2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#include <cudf_test/base_fixture.hpp>
@@ -14,6 +14,8 @@
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <random>

class GatherTestStr : public cudf::test::BaseFixture {};

TEST_F(GatherTestStr, StringColumn)
@@ -145,3 +147,56 @@ TEST_F(GatherTestStr, GatherZeroSizeStringsColumn)
cudf::get_current_device_resource_ref());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, results->get_column(0).view());
}

TEST_F(GatherTestStr, GatherRandomStringsColumn)
{
constexpr int num_total_strings = 512;
constexpr int num_gathered_strings = 128;

std::mt19937 rng(12345);
std::uniform_int_distribution<int> len_dist(0, 20);
std::uniform_int_distribution<int> ch_dist(97, 122); // 'a'..'z'

// Generate random strings
std::vector<std::string> host_strings;
host_strings.reserve(num_total_strings);
for (int i = 0; i < num_total_strings; ++i) {
int len = len_dist(rng);
std::string s;
s.reserve(len);
for (int j = 0; j < len; ++j) {
s.push_back(static_cast<char>(ch_dist(rng)));
}
host_strings.push_back(std::move(s));
}

std::vector<char const*> h_ptrs;
h_ptrs.reserve(num_total_strings);
for (auto& s : host_strings) {
h_ptrs.push_back(s.c_str());
}

cudf::test::strings_column_wrapper strings(h_ptrs.begin(), h_ptrs.end());
cudf::table_view source_table({strings});

// Generate random string indices to gather
std::uniform_int_distribution<int> idx_dist(0, num_total_strings - 1);
std::vector<int32_t> h_map;
h_map.reserve(num_gathered_strings);
for (int i = 0; i < num_gathered_strings; ++i) {
h_map.push_back(static_cast<int32_t>(idx_dist(rng)));
}

// Gather strings
cudf::test::fixed_width_column_wrapper<int32_t> gather_map(h_map.begin(), h_map.end());
auto result = cudf::gather(source_table, gather_map);

std::vector<char const*> h_expected;
h_expected.reserve(num_gathered_strings);
for (auto idx : h_map) {
h_expected.push_back(h_ptrs[static_cast<size_t>(idx)]);
}
cudf::test::strings_column_wrapper expected(h_expected.begin(), h_expected.end());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(result->view().column(0), expected);
}
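One adjacent note: the `NullifyOutOfBounds` template parameter visible in the gather.cuh changes is exposed on the public `cudf::gather` API as an out-of-bounds policy. A hedged variation of the test above (not part of this PR) could exercise it as follows; the map values are arbitrary examples:

```cpp
// Hypothetical follow-on check, not part of this PR: with NULLIFY, indices
// outside [0, num_total_strings) yield null rows instead of undefined behavior.
cudf::test::fixed_width_column_wrapper<int32_t> oob_map({0, 5, 1000000});
auto nullified =
  cudf::gather(source_table, oob_map, cudf::out_of_bounds_policy::NULLIFY);
EXPECT_EQ(nullified->get_column(0).null_count(), 1);
```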