diff --git a/CMakeLists.txt b/CMakeLists.txt index 0ec0ecec6..c16fda19e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -197,7 +197,7 @@ target_compile_options(stdexec_executable_flags INTERFACE # Template backtrace limit target_compile_options(stdexec_executable_flags INTERFACE $<$,$>: - $<$:/clang:>-ferror-limit=0 + $<$:/clang:>-ferror-limit=1 $<$:/clang:>-fmacro-backtrace-limit=0 $<$:/clang:>-ftemplate-backtrace-limit=0> $<$,$,23.3.0>>: diff --git a/examples/nvexec/reduce.cpp b/examples/nvexec/reduce.cpp index a6255e276..5d959cca1 100644 --- a/examples/nvexec/reduce.cpp +++ b/examples/nvexec/reduce.cpp @@ -26,14 +26,14 @@ namespace ex = stdexec; int main() { const int n = 2 * 1024; - thrust::device_vector input(n, 1.0f); - float* first = thrust::raw_pointer_cast(input.data()); - float* last = thrust::raw_pointer_cast(input.data()) + input.size(); + thrust::device_vector input(n, 1); + int* first = thrust::raw_pointer_cast(input.data()); + int* last = thrust::raw_pointer_cast(input.data()) + input.size(); nvexec::stream_context stream_ctx{}; auto snd = ex::transfer_just(stream_ctx.get_scheduler(), std::span{first, last}) - | nvexec::reduce(42.0f); + | nvexec::reduce(42); auto [result] = stdexec::sync_wait(std::move(snd)).value(); diff --git a/include/nvexec/stream/algorithm_base.cuh b/include/nvexec/stream/algorithm_base.cuh index d08ca312e..d4b30ba04 100644 --- a/include/nvexec/stream/algorithm_base.cuh +++ b/include/nvexec/stream/algorithm_base.cuh @@ -91,7 +91,18 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun { }; }; - template + // This shouldn't be here. Imo I think algorithm_base should + // have a __data struct that each inheritor is responsible for providing. I put this here to get things to compile. + template + struct __data { + STDEXEC_ATTRIBUTE((no_unique_address)) _InitT __init_; + STDEXEC_ATTRIBUTE((no_unique_address)) _Fun __fun_; + static constexpr auto __mbrs_ = __mliterals<&__data::__init_, &__data::__fun_>(); + }; + template + __data(_InitT, _Fun) -> __data<_InitT, _Fun>; + + template struct sender_t { struct __t : stream_sender_base { using Sender = stdexec::__t; @@ -104,8 +115,13 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun { using _set_value_t = typename DerivedSender::template _set_value_t; Sender sndr_; - STDEXEC_ATTRIBUTE((no_unique_address)) InitT init_; - STDEXEC_ATTRIBUTE((no_unique_address)) Fun fun_; + // why is this called initT, anyway? If other algorithms will use this in the future im not sure initT is a good name + __data data_; + + __t(Sender sndr, InitT init, Fun fun) + : sndr_((Sender&&) sndr) + , data_{(InitT&&) init, (Fun&&) fun} { + } template using completion_signatures = // @@ -124,7 +140,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun { (Receiver&&) rcvr, [&](operation_state_base_t>& stream_provider) -> receiver_t { - return receiver_t(self.init_, self.fun_, stream_provider); + return receiver_t(self.data_.__init_, self.data_.__fun_, stream_provider); }); } @@ -140,3 +156,11 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun { }; }; } + +namespace stdexec::__detail { + // for pretty-printing a stream range algorithm sender + template + extern __mconst<__name_of> + __name_of_v>; +} diff --git a/include/nvexec/stream/common.cuh b/include/nvexec/stream/common.cuh index 4bec0037c..14d8a02fe 100644 --- a/include/nvexec/stream/common.cuh +++ b/include/nvexec/stream/common.cuh @@ -168,9 +168,12 @@ namespace nvexec { struct context_state_t { std::pmr::memory_resource* pinned_resource_{nullptr}; std::pmr::memory_resource* managed_resource_{nullptr}; - stream_pools_t* stream_pools_; + stream_pools_t* stream_pools_{nullptr}; queue::task_hub_t* hub_{nullptr}; - stream_priority priority_; + stream_priority priority_{stream_priority::normal}; + + // BUGBUG remove me + context_state_t() = default; context_state_t( std::pmr::memory_resource* pinned_resource, diff --git a/include/nvexec/stream/reduce.cuh b/include/nvexec/stream/reduce.cuh index 49fb51df4..dd337199a 100644 --- a/include/nvexec/stream/reduce.cuh +++ b/include/nvexec/stream/reduce.cuh @@ -30,19 +30,32 @@ namespace nvexec { namespace STDEXEC_STREAM_DETAIL_NS { namespace reduce_ { - template + + template + struct __connect_fn; + + template + struct __data { + _Init __init_; + STDEXEC_ATTRIBUTE((no_unique_address)) _Fun __fun_; + static constexpr auto __mbrs_ = __mliterals<&__data::__init_, &__data::__fun_>(); + }; + template + __data(_Init, _Fun) -> __data<_Init, _Fun>; + + template struct receiver_t : public __algo_range_init_fun::receiver_t< SenderId, ReceiverId, - InitT, + Init, Fun, - receiver_t> { + receiver_t> { using base = __algo_range_init_fun:: - receiver_t>; + receiver_t>; template - using result_t = typename __algo_range_init_fun::binary_invoke_result_t; + using result_t = typename __algo_range_init_fun::binary_invoke_result_t; template static void set_value_impl(base::__t&& self, Range&& range) noexcept { @@ -110,42 +123,180 @@ namespace nvexec { self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status)); } } + + receiver_t(__data& _data) + : _data_(_data) { + } + + __data& _data_; }; - template - struct sender_t - : public __algo_range_init_fun:: - sender_t> { - template - using receiver_t = - stdexec::__t, InitT, Fun>>; + template + struct __operation { + using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>; + using _Receiver = stdexec::__t<_ReceiverId>; + using __receiver_id = receiver_t<_CvrefSender, _ReceiverId, _Init, _Fun>; + using __receiver_t = stdexec::__t<__receiver_id>; - template - using _set_value_t = completion_signatures>)>; + struct __t : __immovable { + using __id = __operation; + using __data_t = __data<_Init, _Fun>; + + __data<_Init, _Fun> __state_; + _Receiver __rcvr_; + connect_result_t<_CvrefSender, __receiver_t> __op_; + + __t(_CvrefSender&& __sndr, _Receiver __rcvr, __data_t __data) // + noexcept(__nothrow_decay_copyable<_Receiver> // + && __nothrow_decay_copyable<__data_t> // + && __nothrow_connectable<_CvrefSender, __receiver_t>) + : __state_{(__data_t&&) __data} + , __rcvr_{(_Receiver&&) __rcvr} + , __op_(connect((_CvrefSender&&) __sndr, __receiver_t{&__state_})) { + } + + friend void tag_invoke(start_t, __t& __self) noexcept { + start(__self.__op_); + } + }; + }; + + template + struct __connect_fn { + _Receiver& __rcvr_; + + template + using __operation_t = // + __t<__operation< + __cvref_id<_Child>, + __id<_Receiver>, + decltype(_Data::__init_), + decltype(_Data::__fun_)>>; + + template + auto operator()(__ignore, _Data __data, _Child&& __child) const noexcept( + __nothrow_constructible_from<__operation_t<_Child, _Data>, _Child, _Receiver, _Data>) + -> __operation_t<_Child, _Data> { + return __operation_t<_Child, _Data>{ + (_Child&&) __child, (_Receiver&&) __rcvr_, (_Data&&) __data}; + } }; } struct reduce_t { - template - using __sender = - stdexec::__t>, InitT, Fun>>; + // idk if needed + // #if STDEXEC_FRIENDSHIP_IS_LEXICAL() + // private: + // template + // friend struct stdexec::__sexpr; + // #endif + + template < sender Sender, __movable_value Init, __movable_value Fun = cub::Sum> + auto operator()(Sender&& sndr, Init init, Fun fun) const { + auto __domain = __get_early_domain(sndr); + return __domain.transform_sender(__make_sexpr( + reduce_::__data{(Init&&) init, (Fun&&) fun}, (Sender&&) sndr)); + } - template < sender Sender, __movable_value InitT, __movable_value Fun = cub::Sum> - __sender operator()(Sender&& sndr, InitT init, Fun fun) const { - return __sender{{}, (Sender&&) sndr, (InitT&&) init, (Fun&&) fun}; + template _Sender> + static auto get_env(const _Sender&) noexcept { + return empty_env{}; } - template - __binder_back operator()(InitT init, Fun fun = {}) const { + template + static auto get_env(const _Sender&) noexcept { + return empty_env{}; + } + + struct op { + friend void tag_invoke(start_t, op&) noexcept { + } + }; + + template _Sender, receiver _Receiver> + //requires SOME CONSTRAINT HERE + static auto connect(_Sender&& __sndr, _Receiver __rcvr) { + return op{}; // return a dummy operation state to see if it compiles + } + + template + using _set_value_t = completion_signatures&)>; + + template + using __completion_signaturesss = // + __try_make_completion_signatures< + _CvrefSender, + _Env, + completion_signatures, + __mbind_back_q<_set_value_t, _Init, _Fun>>; + + template _Sender, class _Env> + static auto get_completion_signatures(_Sender&& __sndr, _Env&& env) { + // what's the relationship(if it exists) between the lambdas types and the lambda types in `stream_domain::transform_sender` + // apply_sender? + return stdexec::apply_sender( + (_Sender&&) __sndr, [&](reduce_t, _Data, _Child&&) { + using _Init = decltype(_Data::__init_); + using _Fun = decltype(_Data::__fun_); + if constexpr (__mvalid<__completion_signaturesss, _Child, _Env, _Init, _Fun>) { + return __completion_signaturesss< _Child, _Env, _Init, _Fun>(); + } else if constexpr (__decays_to<_Env, std::execution::__no_env>) { + // not sure i need this + return std::execution::dependent_completion_signatures(); + } else { + // BUGBUG improve this error message + return __mexception<_WHAT_<"unknown error in nvexec::reduce"__csz>>(); + } + STDEXEC_UNREACHABLE(); + }); + } + + using _Sender = __1; + using _Init = __nth_member<0>(__0); + using _Fun = __nth_member<1>(__0); + using __legacy_customizations_t = __types< + tag_invoke_t( + reduce_t, + get_completion_scheduler_t(get_env_t(_Sender&)), + _Sender, + _Init, + _Fun), + tag_invoke_t(reduce_t, _Sender, _Init, _Fun)>; + + template _Sender, receiver _Receiver> + static auto connect(_Sender&& __sndr, _Receiver __rcvr) noexcept( + __nothrow_callable< apply_sender_t, _Sender, reduce_::__connect_fn<_Receiver>>) + -> __call_result_t< apply_sender_t, _Sender, reduce_::__connect_fn<_Receiver>> { + return apply_sender((_Sender&&) __sndr, reduce_::__connect_fn<_Receiver>{__rcvr}); + } + + template + __binder_back operator()(Init init, Fun fun = {}) const { return { {}, {}, - {(InitT&&) init, (Fun&&) fun} + {(Init&&) init, (Fun&&) fun} }; } }; + + namespace reduce_ { + // moved this below so i can use reduce_t as a Tag type to algorithm_base sender + template + struct sender_t + : public __algo_range_init_fun:: + sender_t> { + + template + using _set_value_t = completion_signatures&)>; + + template + using receiver_t = + stdexec::__t, Init, Fun>>; + }; + } } inline constexpr STDEXEC_STREAM_DETAIL_NS::reduce_t reduce{}; diff --git a/include/nvexec/stream_context.cuh b/include/nvexec/stream_context.cuh index 56c6d871f..ff727fce7 100644 --- a/include/nvexec/stream_context.cuh +++ b/include/nvexec/stream_context.cuh @@ -46,6 +46,18 @@ namespace nvexec { template using bulk_sender_th = __t>, Shape, Fun>>; + template + using reduce_sender_t = // + stdexec::__t< + reduce_:: + sender_t< stdexec::__id>, InitT, stdexec::__decay_t>>; + + template + requires stdexec::__callable + using reduce_non_throwing = stdexec::__mbool< + stdexec::__nothrow_callable + && noexcept(stdexec::__decayed_tuple(std::declval()...)) >; + template using split_sender_th = __t>>>; @@ -75,6 +87,53 @@ namespace nvexec { template using ensure_started_th = __t>>; + // needed for subsumption purposes + template + concept _non_stream_sender = // + !derived_from<__decay_t, stream_sender_base>; + + struct stream_scheduler; + + template + struct stream_domain { + stream_domain(context_state_t context_state) + : context_state_(context_state) { + } + + // Lazy algorithm customizations require a recursive tree transformation + template + requires _non_stream_sender // no need to transform it a second time + auto transform_sender(Sender&& sndr, const Env& env) const noexcept { + return stdexec::apply_sender( + (Sender&&) sndr, + [&](Tag, Data&& data, Children&&... children) { + return stdexec::transform_sender( + *this, + __make_sexpr( + (Data&&) data, + stdexec::transform_sender(*this, (Children&&) children, env)...) + /*, env*/); // no env here!! + }); + } + + // reduce senders get a special transformation + template Sender, class Env> + requires _non_stream_sender // no need to transform it a second time + auto transform_sender(Sender&& sndr, const Env& env) const noexcept { + return stdexec::apply_sender( + (Sender&&) sndr, + [&](Tag, Data&& data, Child&& child) { + auto [init, fun] = (Data&&) data; + auto next = stdexec::transform_sender(*this, (Child&&) child, env); + return reduce_sender_t( + std::move(next), init, fun); + }); + } + + private: + context_state_t context_state_; + }; + struct stream_scheduler { using __t = stream_scheduler; using __id = stream_scheduler; @@ -121,34 +180,42 @@ namespace nvexec { } }; - struct sender_ { - struct __t : stream_sender_base { - using __id = sender_; - using completion_signatures = - completion_signatures< set_value_t(), set_error_t(cudaError_t)>; - - template - friend auto tag_invoke(connect_t, const __t& self, R&& rec) // - noexcept(__nothrow_constructible_from<__decay_t, R>) - -> operation_state_t>> { - return operation_state_t>>( - (R&&) rec, self.env_.context_state_); - } + struct sender_t : stream_sender_base { + using __t = sender_t; + using __id = sender_t; + using completion_signatures = + completion_signatures< set_value_t(), set_error_t(cudaError_t)>; + + template + friend auto tag_invoke(connect_t, const sender_t& self, R&& rec) // + noexcept(__nothrow_constructible_from<__decay_t, R>) + -> operation_state_t>> { + return operation_state_t>>( + (R&&) rec, self.env_.context_state_); + } - friend const env& tag_invoke(get_env_t, const __t& self) noexcept { - return self.env_; - }; + friend const env& tag_invoke(get_env_t, const sender_t& self) noexcept { + return self.env_; + } - STDEXEC_ATTRIBUTE((host, device)) - inline __t(context_state_t context_state) noexcept - : env_{context_state} { - } + STDEXEC_ATTRIBUTE((host, device)) + inline sender_t(context_state_t context_state) noexcept + : env_{context_state} { + } - env env_; - }; + env env_; }; - using sender_t = stdexec::__t; + // BUGBUG for now + // friend stream_domain + // tag_invoke(get_domain_t, const stream_scheduler& sch) noexcept { + // return stream_domain{sch.context_state_}; + // } + + STDEXEC_ATTRIBUTE((host, device)) + friend inline sender_t tag_invoke(schedule_t, const stream_scheduler& self) noexcept { + return {self.context_state_}; + } template friend schedule_from_sender_th @@ -156,6 +223,18 @@ namespace nvexec { return schedule_from_sender_th(sch.context_state_, (S&&) sndr); } + friend forward_progress_guarantee + tag_invoke(get_forward_progress_guarantee_t, const stream_scheduler&) noexcept { + return forward_progress_guarantee::weakly_parallel; + } + + friend std::true_type tag_invoke( + __has_algorithm_customizations_t, // + const stream_scheduler& self) noexcept { + return {}; + } + + // TODO: convert these to transform_sender member functions template friend bulk_sender_th tag_invoke(bulk_t, const stream_scheduler& sch, S&& sndr, Shape shape, Fn fun) // @@ -163,6 +242,13 @@ namespace nvexec { return bulk_sender_th{{}, (S&&) sndr, shape, (Fn&&) fun}; } + template + friend reduce_sender_t + tag_invoke(reduce_t, const stream_scheduler& sch, S&& sndr, InitT initT, Fn fun) // + noexcept { + return reduce_sender_t((S&&) sndr, initT, (Fn&&) fun); + } + template friend then_sender_th tag_invoke(then_t, const stream_scheduler& sch, S&& sndr, Fn fun) noexcept { @@ -236,24 +322,9 @@ namespace nvexec { return split_sender_th(sch.context_state_, (S&&) sndr); } - STDEXEC_ATTRIBUTE((host, device)) - friend inline sender_t tag_invoke(schedule_t, const stream_scheduler& self) noexcept { - return {self.context_state_}; - } - - friend std::true_type - tag_invoke(__has_algorithm_customizations_t, const stream_scheduler& self) noexcept { - return {}; - } - template - friend auto tag_invoke(sync_wait_t, const stream_scheduler& self, S&& sndr) { - return _sync_wait::sync_wait_t{}(self.context_state_, (S&&) sndr); - } - - friend forward_progress_guarantee - tag_invoke(get_forward_progress_guarantee_t, const stream_scheduler&) noexcept { - return forward_progress_guarantee::weakly_parallel; + friend auto tag_invoke(sync_wait_t, const stream_scheduler& sch, S&& sndr) { + return _sync_wait::sync_wait_t{}(sch.context_state_, (S&&) sndr); } bool operator==(const stream_scheduler& other) const noexcept { @@ -290,17 +361,6 @@ namespace nvexec { into_variant((Senders&&) sndrs)... }; } - - template - upon_error_sender_th tag_invoke(upon_error_t, S&& sndr, Fn fun) noexcept { - return upon_error_sender_th{{}, (S&&) sndr, (Fn&&) fun}; - } - - template - upon_stopped_sender_th tag_invoke(upon_stopped_t, S&& sndr, Fn fun) noexcept { - return upon_stopped_sender_th{{}, (S&&) sndr, (Fn&&) fun}; - } - } // namespace STDEXEC_STREAM_DETAIL_NS using STDEXEC_STREAM_DETAIL_NS::stream_scheduler; diff --git a/include/stdexec/__detail/__basic_sender.hpp b/include/stdexec/__detail/__basic_sender.hpp index 513e4ec84..9ca4bda87 100644 --- a/include/stdexec/__detail/__basic_sender.hpp +++ b/include/stdexec/__detail/__basic_sender.hpp @@ -484,7 +484,8 @@ namespace stdexec { noexcept( __nothrow_callable<__detail::__impl_of<_Sender>, __copy_cvref_fn<_Sender>, _ApplyFn>) // -> __call_result_t<__detail::__impl_of<_Sender>, __copy_cvref_fn<_Sender>, _ApplyFn> { // - return ((_Sender&&) __sndr).__impl_(__copy_cvref_fn<_Sender>(), (_ApplyFn&&) __fun); // + + return ((_Sender&&) __sndr).__impl_(__copy_cvref_fn<_Sender>(), (_ApplyFn&&) __fun); // } template _Self> @@ -634,10 +635,10 @@ namespace stdexec { // The __name_of utility defined below is used to pretty-print the type names of // senders in compiler diagnostics. namespace __detail { - struct __basic_sender_name { + struct __sexpr_name { template using __f = // - __call_result_t<__sexpr_apply_result_t<_Sender, __basic_sender_name>>; + __call_result_t<__sexpr_apply_result_t<_Sender, __sexpr_name>>; template auto operator()(_Tag, _Data&&, _Child&&...) const // @@ -659,7 +660,7 @@ namespace stdexec { extern __mcompose<__cpclr, __name_of_fn<_Sender>> __name_of_v; template - extern __basic_sender_name __name_of_v<__sexpr<_Impl>>; + extern __sexpr_name __name_of_v<__sexpr<_Impl>>; template <__has_id _Sender> requires(!same_as<__id<_Sender>, _Sender>) diff --git a/test/exec/test_any_sender.cpp b/test/exec/test_any_sender.cpp index 83124cab3..ce05b29fb 100644 --- a/test/exec/test_any_sender.cpp +++ b/test/exec/test_any_sender.cpp @@ -723,4 +723,5 @@ namespace { } CHECK(counting_scheduler::count == 0); } + } diff --git a/test/nvexec/CMakeLists.txt b/test/nvexec/CMakeLists.txt index 445fd9314..4a1d89ed1 100644 --- a/test/nvexec/CMakeLists.txt +++ b/test/nvexec/CMakeLists.txt @@ -30,7 +30,7 @@ set(nvexec_test_sources let_value.cpp test_main.cpp then.cpp - reduce.cpp + $<$>:reduce.cpp> split.cpp start_detached.cpp transfer.cpp