Skip to content

Commit

Permalink
Add pluggable_sort()
Browse files Browse the repository at this point in the history
  • Loading branch information
alugowski committed Jan 14, 2024
1 parent 036171e commit ec6045b
Show file tree
Hide file tree
Showing 5 changed files with 597 additions and 11 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ All in `std::` namespace.
### Other
* [`poolstl::iota_iter`](include/poolstl/iota_iter.hpp) - Iterate over integers. Same as iterating over output of [`std::iota`](https://en.cppreference.com/w/cpp/algorithm/iota) but without materializing anything. Iterator version of [`std::ranges::iota_view`](https://en.cppreference.com/w/cpp/ranges/iota_view).
* `poolstl::for_each_chunk` - Like `std::for_each`, but explicitly splits the input range into chunks then exposes the chunked parallelism. A user-specified chunk constructor is called for each parallel chunk then its output is passed to each loop iteration. Useful for workloads that need an expensive workspace that can be reused between iterations, but not simultaneously by all iterations in parallel.
* `poolstl::pluggable_sort` - Like `std::sort`, but allows specification of sequential sort and merge methods. To parallelize [pdqsort](https://github.com/orlp/pdqsort): `pluggable_sort(par, v.begin(), v.end(), pdqsort)`.
## Usage
Expand Down
40 changes: 38 additions & 2 deletions include/poolstl/algorithm
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ namespace std {
}

poolstl::internal::parallel_sort(std::forward<ExecPolicy>(policy), first, last, comp,
std::sort<RandIt, Compare>);
std::sort<RandIt, Compare>, std::inplace_merge<RandIt, Compare>);
}

/**
Expand All @@ -249,7 +249,7 @@ namespace std {
}

poolstl::internal::parallel_sort(std::forward<ExecPolicy>(policy), first, last, comp,
std::stable_sort<RandIt, Compare>);
std::stable_sort<RandIt, Compare>, std::inplace_merge<RandIt, Compare>);
}

/**
Expand Down Expand Up @@ -370,6 +370,42 @@ namespace poolstl {
for_each_chunk <RandIt, ChunkConstructor, UnaryFunction>,
(void*)nullptr, 1, construct, f);
}

/**
* NOTE: Iterators are expected to be random access.
*
* Like `std::sort`, but allows specifying the sequential sort and merge methods. These methods must have the
* same signature as the comparator versions of `std::sort` and `std::inplace_merge`, respectively.
*/
template <class ExecPolicy, class RandIt, class Compare>
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp,
void (sort_func)(RandIt, RandIt, Compare) = std::sort,
void (merge_func)(RandIt, RandIt, RandIt, Compare) = std::inplace_merge) {
if (poolstl::internal::is_seq<ExecPolicy>(policy)) {
sort_func(first, last, comp);
return;
}

poolstl::internal::parallel_sort(std::forward<ExecPolicy>(policy), first, last, comp, sort_func, merge_func);
}

/**
* NOTE: Iterators are expected to be random access.
*
* Like `std::sort`, but allows specifying the sequential sort and merge methods. These methods must have the
* same signature as the comparator versions of `std::sort` and `std::inplace_merge`, respectively.
*/
template <class ExecPolicy, class RandIt>
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last,
void (sort_func)(RandIt, RandIt,
std::less<typename std::iterator_traits<RandIt>::value_type>) = std::sort,
void (merge_func)(RandIt, RandIt, RandIt,
std::less<typename std::iterator_traits<RandIt>::value_type>) = std::inplace_merge){
using T = typename std::iterator_traits<RandIt>::value_type;
pluggable_sort(std::forward<ExecPolicy>(policy), first, last, std::less<T>(), sort_func, merge_func);
}
}

#endif
19 changes: 11 additions & 8 deletions include/poolstl/internal/ttp_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,20 @@ namespace poolstl {
/**
* Sort a range in parallel.
*
* @param stable Whether to use std::stable_sort or std::sort
* @param sort_func Sequential sort method, like std::sort or std::stable_sort
* @param merge_func Sequential merge method, like std::inplace_merge
*/
template <class ExecPolicy, class RandIt, class Compare, class SortFunc>
void parallel_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp, SortFunc sortfunc) {
template <class ExecPolicy, class RandIt, class Compare, class SortFunc, class MergeFunc>
void parallel_sort(ExecPolicy &&policy, RandIt first, RandIt last,
Compare comp, SortFunc sort_func, MergeFunc merge_func) {
if (first == last) {
return;
}

// Sort chunks in parallel
auto futures = parallel_chunk_for_gen(std::forward<ExecPolicy>(policy), first, last,
[&comp, sortfunc] (RandIt chunk_first, RandIt chunk_last) {
sortfunc(chunk_first, chunk_last, comp);
[&comp, sort_func] (RandIt chunk_first, RandIt chunk_last) {
sort_func(chunk_first, chunk_last, comp);
return std::make_pair(chunk_first, chunk_last);
});

Expand All @@ -186,9 +188,10 @@ namespace poolstl {
// pair up and merge
auto& lhs = subranges[i];
auto& rhs = subranges[i + 1];
futures.emplace_back(task_pool.submit([&comp] (RandIt chunk_first, RandIt chunk_middle,
RandIt chunk_last) {
std::inplace_merge(chunk_first, chunk_middle, chunk_last, comp);
futures.emplace_back(task_pool.submit([&comp, merge_func] (RandIt chunk_first,
RandIt chunk_middle,
RandIt chunk_last) {
merge_func(chunk_first, chunk_middle, chunk_last, comp);
return std::make_pair(chunk_first, chunk_last);
}, lhs.first, lhs.second, rhs.second));
++i;
Expand Down
16 changes: 15 additions & 1 deletion tests/poolstl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

#include "utils.hpp"

// for testing pluggable sort
#include "thirdparty/pdqsort.h"

namespace ttp = task_thread_pool;
using poolstl::iota_iter;

Expand Down Expand Up @@ -294,7 +297,7 @@ TEST_CASE("sort", "[alg][algorithm]") {
default: break;
}

for (auto which_impl : {0, 1}) {
for (auto which_impl : {0, 1, 2, 3, 4, 5}) {
std::vector<int> dest1(source);
std::vector<int> dest2(source);

Expand All @@ -306,6 +309,17 @@ TEST_CASE("sort", "[alg][algorithm]") {
case 1:
std::sort(poolstl::par.on(pool), dest2.begin(), dest2.end());
break;
case 2:
poolstl::pluggable_sort(poolstl::par_if(false), dest2.begin(), dest2.end(), std::sort, std::inplace_merge);
break;
case 3:
poolstl::pluggable_sort(poolstl::par.on(pool), dest2.begin(), dest2.end(), pdqsort);
break;
case 4:
poolstl::pluggable_sort(poolstl::par.on(pool), dest2.begin(), dest2.end(), std::less<int>(), pdqsort_branchless);
break;
case 5:
poolstl::pluggable_sort(poolstl::par.on(pool), dest2.begin(), dest2.end(), std::less<int>(), pdqsort_branchless, std::inplace_merge);
default:
break;
}
Expand Down
Loading

0 comments on commit ec6045b

Please sign in to comment.