Feature/cuda reduce #1064

Draft: wants to merge 34 commits into main from feature/cuda_reduce

Changes from all commits (34 commits)
21c7d35
SAVE WORK SQUASH COMMIT
ZelboK Aug 23, 2023
f909b9d
save
ZelboK Aug 25, 2023
17c4592
getting closer
ericniebler Aug 29, 2023
71cd446
save work before merge main squash
ZelboK Sep 1, 2023
201b0ab
Revert "save work before merge main squash"
ZelboK Sep 1, 2023
217e5d1
save before change branch squash later pls
ZelboK Sep 1, 2023
6c10550
Revert "save before change branch squash later pls"
ZelboK Sep 1, 2023
e6a37e3
Merge branch 'main' into feature/cuda_reduce
ZelboK Sep 1, 2023
ae4297a
commit before chage branch squash later
ZelboK Sep 1, 2023
2aa2d9a
push upwards for second opinions
ZelboK Sep 4, 2023
1e517a3
remove noise
ZelboK Sep 4, 2023
b024e09
demonstrate recursive sender transformation
ericniebler Sep 4, 2023
6a6aafa
add test for static_thread_pool bulk concurrency; cleanup
ericniebler Sep 5, 2023
fcd7747
__reconstitute is just make_sender, rename apply_sender
ericniebler Sep 5, 2023
5fb2849
Merge remote-tracking branch 'origin/main' into feature/cuda_reduce
ericniebler Sep 5, 2023
f0dc533
Merge branch 'on-redux' into feature/cuda_reduce
ericniebler Sep 11, 2023
b94bd4d
Merge remote-tracking branch 'origin/main' into feature/cuda_reduce
ericniebler Sep 15, 2023
8722f36
fix cycle in type system in stream scheduler concepts
ericniebler Sep 15, 2023
8350038
Merge pull request #1 from ericniebler/feature/cuda_reduce
ZelboK Sep 15, 2023
76f863d
Merge remote-tracking branch 'origin/main' into tmp_cuda_reduce
ericniebler Sep 23, 2023
b240e36
Merge remote-tracking branch 'origin/main' into HEAD
ericniebler Sep 25, 2023
6dce716
Merge branch 'main' into HEAD
ericniebler Sep 30, 2023
d97b5df
Merge remote-tracking branch 'origin/main' into cuda_reduce
ericniebler Oct 9, 2023
82657c4
Merge remote-tracking branch 'origin/main' into cuda_reduce
ericniebler Oct 18, 2023
6f55541
merge main
ZelboK Oct 22, 2023
b5a09b8
Merge branch 'main' of github.com:NVIDIA/stdexec into feature/cuda_re…
trxcllnt Jan 8, 2024
7931851
update type names
trxcllnt Jan 8, 2024
298cae0
use int instead of float
trxcllnt Jan 8, 2024
bdd9307
clean up include
trxcllnt Jan 9, 2024
a73f622
Merge branch 'main' of github.com:NVIDIA/stdexec into feature/cuda_re…
trxcllnt Jan 9, 2024
8e62305
update aws-actions/configure-aws-credentials version
trxcllnt Jan 9, 2024
ed370d6
update aws-actions/configure-aws-credentials version
trxcllnt Jan 9, 2024
200fa34
change CI CPU workflow trigger
trxcllnt Jan 9, 2024
7ec74dd
Merge branch 'main' into feature/cuda_reduce
trxcllnt Jan 9, 2024
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -197,7 +197,7 @@ target_compile_options(stdexec_executable_flags INTERFACE
# Template backtrace limit
target_compile_options(stdexec_executable_flags INTERFACE
$<$<OR:$<CXX_COMPILER_ID:Clang>,$<CXX_COMPILER_ID:AppleClang>>:
$<$<STREQUAL:${CMAKE_CXX_COMPILER_FRONTEND_VARIANT},MSVC>:/clang:>-ferror-limit=0
$<$<STREQUAL:${CMAKE_CXX_COMPILER_FRONTEND_VARIANT},MSVC>:/clang:>-ferror-limit=1
$<$<STREQUAL:${CMAKE_CXX_COMPILER_FRONTEND_VARIANT},MSVC>:/clang:>-fmacro-backtrace-limit=0
$<$<STREQUAL:${CMAKE_CXX_COMPILER_FRONTEND_VARIANT},MSVC>:/clang:>-ftemplate-backtrace-limit=0>
$<$<AND:$<CXX_COMPILER_ID:NVHPC>,$<VERSION_GREATER:$<CXX_COMPILER_VERSION>,23.3.0>>:
8 changes: 4 additions & 4 deletions examples/nvexec/reduce.cpp
@@ -26,14 +26,14 @@ namespace ex = stdexec;

int main() {
const int n = 2 * 1024;
thrust::device_vector<float> input(n, 1.0f);
float* first = thrust::raw_pointer_cast(input.data());
float* last = thrust::raw_pointer_cast(input.data()) + input.size();
thrust::device_vector<int> input(n, 1);
int* first = thrust::raw_pointer_cast(input.data());
int* last = thrust::raw_pointer_cast(input.data()) + input.size();

nvexec::stream_context stream_ctx{};

auto snd = ex::transfer_just(stream_ctx.get_scheduler(), std::span{first, last})
| nvexec::reduce(42.0f);
| nvexec::reduce(42);

auto [result] = stdexec::sync_wait(std::move(snd)).value();

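
With the switch to int, the example sums 2 * 1024 ones on top of the initial value 42, so result should be 2090. For contrast, below is a hedged sketch of the same pipeline with a different reduction operator — it assumes this PR's nvexec::reduce and CUB's cub::Max functor, and is not part of the diff:

    // Illustrative only -- same setup as examples/nvexec/reduce.cpp above.
    auto max_snd = ex::transfer_just(stream_ctx.get_scheduler(), std::span{first, last})
                 | nvexec::reduce(-1, cub::Max{}); // init -1; max over n ones yields 1

    auto [max_result] = stdexec::sync_wait(std::move(max_snd)).value(); // max_result == 1
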
32 changes: 28 additions & 4 deletions include/nvexec/stream/algorithm_base.cuh
@@ -91,7 +91,18 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun {
};
};

template <class SenderId, class InitT, class Fun, class DerivedSender>
// This shouldn't be here. algorithm_base should have a __data struct that each
// inheritor is responsible for providing; it lives here for now to get things compiling.
template <class _InitT, class _Fun>
struct __data {
STDEXEC_ATTRIBUTE((no_unique_address)) _InitT __init_;
STDEXEC_ATTRIBUTE((no_unique_address)) _Fun __fun_;
static constexpr auto __mbrs_ = __mliterals<&__data::__init_, &__data::__fun_>();
};
template <class _InitT, class _Fun>
__data(_InitT, _Fun) -> __data<_InitT, _Fun>;

template <class Tag, class SenderId, class InitT, class Fun, class DerivedSender>
struct sender_t {
struct __t : stream_sender_base {
using Sender = stdexec::__t<SenderId>;
@@ -104,8 +115,13 @@
using _set_value_t = typename DerivedSender::template _set_value_t<Range>;

Sender sndr_;
STDEXEC_ATTRIBUTE((no_unique_address)) InitT init_;
STDEXEC_ATTRIBUTE((no_unique_address)) Fun fun_;
// Why is this called InitT, anyway? If other algorithms are to reuse this in the future, InitT may not be a good name.
__data<InitT, Fun> data_;

__t(Sender sndr, InitT init, Fun fun)
: sndr_((Sender&&) sndr)
, data_{(InitT&&) init, (Fun&&) fun} {
}

template <class Self, class Env>
using completion_signatures = //
@@ -124,7 +140,7 @@
(Receiver&&) rcvr,
[&](operation_state_base_t<stdexec::__id<Receiver>>& stream_provider)
-> receiver_t<Receiver> {
return receiver_t<Receiver>(self.init_, self.fun_, stream_provider);
return receiver_t<Receiver>(self.data_.__init_, self.data_.__fun_, stream_provider);
});
}

@@ -140,3 +156,11 @@
};
};
}

namespace stdexec::__detail {
// for pretty-printing a stream range algorithm sender
template <class Tag, class SenderId, class InitT, class Fun, class DerivedSender>
extern __mconst<__name_of<DerivedSender>>
__name_of_v<nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun::
sender_t<Tag, SenderId, InitT, Fun, DerivedSender>>;
}
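
The __data aggregate added above carries the algorithm's state together with __mbrs_, a compile-time list of pointers-to-members (stdexec's __mliterals), so generic sender machinery can enumerate and forward the members without knowing the concrete type. Below is a self-contained sketch of that idea; member_list, data, and add are illustrative stand-ins, not stdexec's actual API:

    #include <cstdio>
    #include <utility>

    // Stand-in for __mliterals: a compile-time list of pointers-to-members.
    template <auto... Mbrs>
    struct member_list {
      // Visit every listed member of an object with a single callable.
      template <class Obj, class Fn>
      static constexpr decltype(auto) apply(Obj& obj, Fn&& fn) {
        return std::forward<Fn>(fn)((obj.*Mbrs)...);
      }
    };

    // Mirrors the shape of __data: an init value and a function object.
    struct data {
      int init_;
      int (*fun_)(int, int);
      static constexpr auto mbrs_ = member_list<&data::init_, &data::fun_>{};
    };

    int add(int a, int b) { return a + b; }

    int main() {
      data d{42, add};
      // Generic code can unpack d's members without naming them:
      int r = data::mbrs_.apply(d, [](int init, auto fn) { return fn(init, 8); });
      std::printf("%d\n", r); // prints 50
    }
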
7 changes: 5 additions & 2 deletions include/nvexec/stream/common.cuh
@@ -168,9 +168,12 @@ namespace nvexec {
struct context_state_t {
std::pmr::memory_resource* pinned_resource_{nullptr};
std::pmr::memory_resource* managed_resource_{nullptr};
stream_pools_t* stream_pools_;
stream_pools_t* stream_pools_{nullptr};
queue::task_hub_t* hub_{nullptr};
stream_priority priority_;
stream_priority priority_{stream_priority::normal};

// BUGBUG remove me
context_state_t() = default;

context_state_t(
std::pmr::memory_resource* pinned_resource,
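
One more piece of context before the reduce.cuh diff below: the sender and receiver there reuse the __algo_range_init_fun bases from algorithm_base.cuh, which take the derived type as a template parameter (CRTP), so the base owns the completion plumbing and dispatches to the derived class's set_value_impl. A minimal sketch of that shape; receiver_base and print_receiver are illustrative stand-ins, not stdexec types:

    #include <cstdio>
    #include <utility>

    // Stand-in for __algo_range_init_fun::receiver_t: the base implements the
    // receiver protocol and forwards each completion to Derived::set_value_impl.
    template <class Derived>
    struct receiver_base {
      template <class Value>
      void set_value(Value&& value) noexcept {
        Derived::set_value_impl(static_cast<Derived&>(*this), std::forward<Value>(value));
      }
    };

    // Stand-in for reduce_::receiver_t: only the algorithm-specific step is written.
    struct print_receiver : receiver_base<print_receiver> {
      static void set_value_impl(print_receiver&, int value) noexcept {
        std::printf("received %d\n", value);
      }
    };

    int main() {
      print_receiver rcvr;
      rcvr.set_value(42); // dispatches through the base to set_value_impl
    }
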
201 changes: 176 additions & 25 deletions include/nvexec/stream/reduce.cuh
@@ -30,19 +30,32 @@
namespace nvexec {
namespace STDEXEC_STREAM_DETAIL_NS {
namespace reduce_ {
template <class SenderId, class ReceiverId, class InitT, class Fun>

template <class _Receiver>
struct __connect_fn;

template <class _Init, class _Fun>
struct __data {
_Init __init_;
STDEXEC_ATTRIBUTE((no_unique_address)) _Fun __fun_;
static constexpr auto __mbrs_ = __mliterals<&__data::__init_, &__data::__fun_>();
};
template <class _Init, class _Fun>
__data(_Init, _Fun) -> __data<_Init, _Fun>;

template <class SenderId, class ReceiverId, class Init, class Fun>
struct receiver_t
: public __algo_range_init_fun::receiver_t<
SenderId,
ReceiverId,
InitT,
Init,
Fun,
receiver_t<SenderId, ReceiverId, InitT, Fun>> {
receiver_t<SenderId, ReceiverId, Init, Fun>> {
using base = __algo_range_init_fun::
receiver_t<SenderId, ReceiverId, InitT, Fun, receiver_t<SenderId, ReceiverId, InitT, Fun>>;
receiver_t<SenderId, ReceiverId, Init, Fun, receiver_t<SenderId, ReceiverId, Init, Fun>>;

template <class Range>
using result_t = typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>;
using result_t = typename __algo_range_init_fun::binary_invoke_result_t<Range, Init, Fun>;

template <class Range>
static void set_value_impl(base::__t&& self, Range&& range) noexcept {
@@ -110,42 +123,180 @@
self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
}
}

receiver_t(__data<Init, Fun>& _data)
: _data_(_data) {
}

__data<Init, Fun>& _data_;
};

template <class SenderId, class InitT, class Fun>
struct sender_t
: public __algo_range_init_fun::
sender_t<SenderId, InitT, Fun, sender_t<SenderId, InitT, Fun>> {
template <class Receiver>
using receiver_t =
stdexec::__t<reduce_::receiver_t< SenderId, stdexec::__id<Receiver>, InitT, Fun>>;
template <class _CvrefSenderId, class _ReceiverId, class _Init, class _Fun>
struct __operation {
using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
using _Receiver = stdexec::__t<_ReceiverId>;
using __receiver_id = receiver_t<_CvrefSender, _ReceiverId, _Init, _Fun>;
using __receiver_t = stdexec::__t<__receiver_id>;

template <class Range>
using _set_value_t = completion_signatures<set_value_t(
::std::add_lvalue_reference_t<
typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;
struct __t : __immovable {
using __id = __operation;
using __data_t = __data<_Init, _Fun>;

__data<_Init, _Fun> __state_;
_Receiver __rcvr_;
connect_result_t<_CvrefSender, __receiver_t> __op_;

__t(_CvrefSender&& __sndr, _Receiver __rcvr, __data_t __data) //
noexcept(__nothrow_decay_copyable<_Receiver> //
&& __nothrow_decay_copyable<__data_t> //
&& __nothrow_connectable<_CvrefSender, __receiver_t>)
: __state_{(__data_t&&) __data}
, __rcvr_{(_Receiver&&) __rcvr}
, __op_(connect((_CvrefSender&&) __sndr, __receiver_t{&__state_})) {
}

friend void tag_invoke(start_t, __t& __self) noexcept {
start(__self.__op_);
}
};
};

template <class _Receiver>
struct __connect_fn {
_Receiver& __rcvr_;

template <class _Child, class _Data>
using __operation_t = //
__t<__operation<
__cvref_id<_Child>,
__id<_Receiver>,
decltype(_Data::__init_),
decltype(_Data::__fun_)>>;

template <class _Data, class _Child>
auto operator()(__ignore, _Data __data, _Child&& __child) const noexcept(
__nothrow_constructible_from<__operation_t<_Child, _Data>, _Child, _Receiver, _Data>)
-> __operation_t<_Child, _Data> {
return __operation_t<_Child, _Data>{
(_Child&&) __child, (_Receiver&&) __rcvr_, (_Data&&) __data};
}
};
}

struct reduce_t {
template <class Sender, class InitT, class Fun>
using __sender =
stdexec::__t<reduce_::sender_t<stdexec::__id<__decay_t<Sender>>, InitT, Fun>>;
// unclear whether this is needed:
// #if STDEXEC_FRIENDSHIP_IS_LEXICAL()
// private:
// template <class...>
// friend struct stdexec::__sexpr;
// #endif

template < sender Sender, __movable_value Init, __movable_value Fun = cub::Sum>
auto operator()(Sender&& sndr, Init init, Fun fun) const {
auto __domain = __get_early_domain(sndr);
return __domain.transform_sender(__make_sexpr<reduce_t>(
reduce_::__data{(Init&&) init, (Fun&&) fun}, (Sender&&) sndr));
}

template < sender Sender, __movable_value InitT, __movable_value Fun = cub::Sum>
__sender<Sender, InitT, Fun> operator()(Sender&& sndr, InitT init, Fun fun) const {
return __sender<Sender, InitT, Fun>{{}, (Sender&&) sndr, (InitT&&) init, (Fun&&) fun};
template <sender_expr_for<reduce_t> _Sender>
static auto get_env(const _Sender&) noexcept {
return empty_env{};
}

template <class InitT, class Fun = cub::Sum>
__binder_back<reduce_t, InitT, Fun> operator()(InitT init, Fun fun = {}) const {
template <class _Sender>
static auto get_env(const _Sender&) noexcept {
return empty_env{};
}

struct op {
friend void tag_invoke(start_t, op&) noexcept {
}
};

template <sender_expr_for<reduce_t> _Sender, receiver _Receiver>
// requires: TODO -- add the appropriate constraint here
static auto connect(_Sender&& __sndr, _Receiver __rcvr) {
return op{}; // dummy operation state, to check that this compiles
}

template <class Range, class Init, class Fun>
using _set_value_t = completion_signatures<set_value_t(
__algo_range_init_fun::binary_invoke_result_t<Range, Init, Fun>&)>;

template <class _CvrefSender, class _Env, class _Init, class _Fun>
using __completion_signaturesss = //
__try_make_completion_signatures<
_CvrefSender,
_Env,
completion_signatures<set_stopped_t()>,
__mbind_back_q<_set_value_t, _Init, _Fun>>;

template <sender_expr_for<reduce_t> _Sender, class _Env>
static auto get_completion_signatures(_Sender&& __sndr, _Env&& env) {
// What's the relationship (if any) between these lambda types and the lambda types in
// `stream_domain::transform_sender` / apply_sender?
return stdexec::apply_sender(
(_Sender&&) __sndr, [&]<class _Data, class _Child>(reduce_t, _Data, _Child&&) {
using _Init = decltype(_Data::__init_);
using _Fun = decltype(_Data::__fun_);
if constexpr (__mvalid<__completion_signaturesss, _Child, _Env, _Init, _Fun>) {
return __completion_signaturesss< _Child, _Env, _Init, _Fun>();
} else if constexpr (__decays_to<_Env, std::execution::__no_env>) {
// not sure this branch is needed
return std::execution::dependent_completion_signatures<std::execution::__no_env>();
} else {
// BUGBUG improve this error message
return __mexception<_WHAT_<"unknown error in nvexec::reduce"__csz>>();
}
STDEXEC_UNREACHABLE();
});
}

using _Sender = __1;
using _Init = __nth_member<0>(__0);
using _Fun = __nth_member<1>(__0);
using __legacy_customizations_t = __types<
tag_invoke_t(
reduce_t,
get_completion_scheduler_t<set_value_t>(get_env_t(_Sender&)),
_Sender,
_Init,
_Fun),
tag_invoke_t(reduce_t, _Sender, _Init, _Fun)>;

template <sender_expr_for<reduce_t> _Sender, receiver _Receiver>
static auto connect(_Sender&& __sndr, _Receiver __rcvr) noexcept(
__nothrow_callable< apply_sender_t, _Sender, reduce_::__connect_fn<_Receiver>>)
-> __call_result_t< apply_sender_t, _Sender, reduce_::__connect_fn<_Receiver>> {
return apply_sender((_Sender&&) __sndr, reduce_::__connect_fn<_Receiver>{__rcvr});
}

template <class Init, class Fun = cub::Sum>
__binder_back<reduce_t, Init, Fun> operator()(Init init, Fun fun = {}) const {
return {
{},
{},
{(InitT&&) init, (Fun&&) fun}
{(Init&&) init, (Fun&&) fun}
};
}
};

namespace reduce_ {
// Moved below so that reduce_t can be used as the Tag type for the algorithm_base sender.
template <class SenderId, class Init, class Fun>
struct sender_t
: public __algo_range_init_fun::
sender_t<reduce_t, SenderId, Init, Fun, sender_t<SenderId, Init, Fun>> {

template <class Range>
using _set_value_t = completion_signatures<set_value_t(
__algo_range_init_fun::binary_invoke_result_t<Range, Init, Fun>&)>;

template <class Receiver>
using receiver_t =
stdexec::__t<reduce_::receiver_t< SenderId, stdexec::__id<Receiver>, Init, Fun>>;
};
}
}

inline constexpr STDEXEC_STREAM_DETAIL_NS::reduce_t reduce{};
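
To summarize the new connect path in reduce.cuh: connect() uses apply_sender to decompose the sender expression into its (tag, data, child) parts, and __connect_fn builds an operation state that owns the data, the receiver, and the connected child operation. Below is a self-contained toy of that flow; every name in it (sexpr, apply_sender, connect_fn, op_state) is an illustrative stand-in, not stdexec's real API:

    #include <cstdio>
    #include <utility>

    // Toy sender expression: a tag, algorithm data, and a child computation.
    template <class Tag, class Data, class Child>
    struct sexpr {
      Data data;
      Child child;
    };

    // Toy apply_sender: explode the expression into parts for a callable.
    template <class Tag, class Data, class Child, class Fn>
    auto apply_sender(sexpr<Tag, Data, Child>&& sndr, Fn fn) {
      return fn(Tag{}, std::move(sndr.data), std::move(sndr.child));
    }

    // Toy __connect_fn: receives the parts and returns the operation state.
    template <class Receiver>
    struct connect_fn {
      Receiver rcvr;

      template <class Tag, class Data, class Child>
      auto operator()(Tag, Data data, Child child) {
        // The operation state owns everything it needs; a real implementation
        // stores the *connected* child operation state rather than the child.
        struct op_state {
          Data data;
          Child child;
          Receiver rcvr;
          void start() noexcept { rcvr(child(data.init)); }
        };
        return op_state{std::move(data), std::move(child), std::move(rcvr)};
      }
    };

    struct reduce_tag { };
    struct reduce_data { int init; };

    int main() {
      auto child = [](int init) { return init + 2048; }; // stands in for the child sender
      sexpr<reduce_tag, reduce_data, decltype(child)> sndr{{42}, child};

      auto rcvr = [](int value) { std::printf("%d\n", value); }; // stands in for the receiver
      auto op = apply_sender(std::move(sndr), connect_fn<decltype(rcvr)>{rcvr});
      op.start(); // prints 2090, matching the reduce example above
    }
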