Lightweight 0.20260617.0
Loading...
Searching...
No Matches
Sender.hpp
1// SPDX-License-Identifier: Apache-2.0
2#pragma once
3
4// Opt-in bridge: adapt a Lightweight Async::Task<T> into an NVIDIA stdexec sender so library
5// coroutines can flow into std::execution (P2300) sender/receiver pipelines (then, let_value,
6// when_all, sync_wait, ...).
7//
8// This is the ONLY public Lightweight header that includes the stdexec headers directly. It is
9// deliberately NOT pulled in by <Lightweight/Lightweight.hpp> nor by the C++20 module
10// (Lightweight.cppm), so the core library stays stdexec-free behind its pimpl. Include this header
11// only in translation units that already depend on stdexec, and link STDEXEC::stdexec yourself
12// (an installed Lightweight links stdexec PRIVATE and does not re-export it). See docs/async.md.
13
14#include "CancellationToken.hpp"
15#include "Task.hpp"
16
17#include <coroutine>
18#include <exception>
19#include <stdexcept>
20#include <type_traits>
21#include <utility>
22
23#include <stdexec/execution.hpp>
24
25namespace Lightweight::Async
26{
27
28namespace detail
29{
30
31 template <typename Receiver, typename T>
32 class SenderDriver;
33
34 /// Final-suspension awaiter for the sender driver coroutine: delivers the captured outcome to the
35 /// connected receiver once the driver has fully suspended (mirroring @c SyncWaitFinalAwaiter so the
36 /// completion is published from a fully-suspended frame, never re-entrantly mid-body).
37 struct SenderFinalAwaiter
38 {
39 [[nodiscard]] bool await_ready() const noexcept
40 {
41 return false;
42 }
43
44 template <typename Promise>
45 void await_suspend(std::coroutine_handle<Promise> coro) const noexcept
46 {
47 coro.promise().Complete();
48 }
49
50 void await_resume() const noexcept {}
51 };
52
53 /// Promise base for the sender driver coroutine, parameterized on the connected receiver type.
54 ///
55 /// The driver coroutine (see @c MakeSenderDriver) simply @c co_await s the user's Task; this
56 /// promise captures the produced value or exception and, at @c final_suspend, routes it to the
57 /// receiver via stdexec's completion channels:
58 /// - normal value -> @c stdexec::set_value(receiver, value...)
59 /// - @c OperationCancelledError -> @c stdexec::set_stopped(receiver) (parity with the library's
60 /// cooperative cancellation contract, see @ref CancellationToken.hpp)
61 /// - any other exception -> @c stdexec::set_error(receiver, std::exception_ptr)
62 ///
63 /// @tparam Receiver The connected stdexec receiver type.
64 template <typename Receiver>
65 class SenderPromiseBase
66 {
67 public:
68 std::suspend_always initial_suspend() noexcept
69 {
70 return {};
71 }
72
73 SenderFinalAwaiter final_suspend() noexcept
74 {
75 return {};
76 }
77
78 void unhandled_exception() noexcept
79 {
80 _error = std::current_exception();
81 }
82
83 /// Binds the receiver the captured outcome is delivered to. Called by the operation state
84 /// before the driver is started.
85 /// @param receiver The connected receiver (stored by pointer; outlives the operation).
86 void BindReceiver(Receiver& receiver) noexcept
87 {
88 _receiver = &receiver;
89 }
90
91 /// If the Task completed by throwing, delivers that exception to the receiver's stopped
92 /// (cancellation) or error channel and returns @c true; otherwise leaves the receiver untouched
93 /// and returns @c false so the derived promise can deliver its value.
94 /// @return @c true if an exception was delivered, @c false if the Task produced a value.
95 [[nodiscard]] bool TryDeliverError() noexcept
96 {
97 if (!_error)
98 return false;
99 try
100 {
101 std::rethrow_exception(_error);
102 }
103 catch (OperationCancelledError const&)
104 {
105 stdexec::set_stopped(std::move(*_receiver));
106 }
107 catch (...)
108 {
109 stdexec::set_error(std::move(*_receiver), std::current_exception());
110 }
111 return true;
112 }
113
114 /// @return The bound receiver (valid once @ref BindReceiver has been called).
115 [[nodiscard]] Receiver& BoundReceiver() noexcept
116 {
117 return *_receiver;
118 }
119
120 private:
121 Receiver* _receiver = nullptr;
122 std::exception_ptr _error {};
123 };
124
125 /// Sender driver promise for a non-void Task result.
126 ///
127 /// Value/exception storage is delegated to @ref CoroutineResult (the same primitive @c TaskPromise
128 /// uses), so the value channel is fed from @c CoroutineResult::Take rather than a bare
129 /// @c std::optional access — keeping the delivery path free of unchecked-optional reads.
130 template <typename Receiver, typename T>
131 class SenderPromise final: public SenderPromiseBase<Receiver>
132 {
133 public:
134 SenderDriver<Receiver, T> get_return_object() noexcept;
135
136 void return_value(T&& value) noexcept(std::is_nothrow_move_constructible_v<T>)
137 {
138 _result.SetValue(std::move(value));
139 }
140
141 void return_value(T const& value)
142 {
143 _result.SetValue(value);
144 }
145
146 /// Delivers the captured outcome to the receiver (called from @c final_suspend).
147 void Complete() noexcept
148 {
149 if (!this->TryDeliverError())
150 stdexec::set_value(std::move(this->BoundReceiver()), _result.Take());
151 }
152
153 private:
154 CoroutineResult<T> _result;
155 };
156
157 /// Sender driver promise for a @c Task<void> result.
158 template <typename Receiver>
159 class SenderPromise<Receiver, void> final: public SenderPromiseBase<Receiver>
160 {
161 public:
162 SenderDriver<Receiver, void> get_return_object() noexcept;
163
164 void return_void() noexcept {}
165
166 /// Delivers the captured outcome to the receiver (called from @c final_suspend).
167 void Complete() noexcept
168 {
169 if (!this->TryDeliverError())
170 stdexec::set_value(std::move(this->BoundReceiver()));
171 }
172 };
173
174 /// RAII owner of the sender driver coroutine. Lives inside the stdexec operation state.
175 template <typename Receiver, typename T>
176 class SenderDriver
177 {
178 public:
179 using promise_type = SenderPromise<Receiver, T>;
180 using Handle = std::coroutine_handle<promise_type>;
181
182 explicit SenderDriver(Handle handle) noexcept:
183 _handle { handle }
184 {
185 }
186
187 SenderDriver(SenderDriver&& other) noexcept:
188 _handle { std::exchange(other._handle, {}) }
189 {
190 }
191
192 SenderDriver(SenderDriver const&) = delete;
193 SenderDriver& operator=(SenderDriver const&) = delete;
194 SenderDriver& operator=(SenderDriver&&) = delete;
195
196 ~SenderDriver()
197 {
198 if (_handle)
199 _handle.destroy();
200 }
201
202 /// Binds the receiver and resumes the driver, kicking off the Task. Lazy until called.
203 void Start(Receiver& receiver)
204 {
205 _handle.promise().BindReceiver(receiver);
206 _handle.resume();
207 }
208
209 private:
210 Handle _handle;
211 };
212
213 template <typename Receiver, typename T>
214 SenderDriver<Receiver, T> SenderPromise<Receiver, T>::get_return_object() noexcept
215 {
216 return SenderDriver<Receiver, T> { std::coroutine_handle<SenderPromise<Receiver, T>>::from_promise(*this) };
217 }
218
219 template <typename Receiver>
220 SenderDriver<Receiver, void> SenderPromise<Receiver, void>::get_return_object() noexcept
221 {
222 return SenderDriver<Receiver, void> { std::coroutine_handle<SenderPromise<Receiver, void>>::from_promise(*this) };
223 }
224
225 /// The driver coroutine: awaits the user's Task; its promise captures the outcome and delivers it
226 /// to the receiver at @c final_suspend.
227 template <typename Receiver, typename T>
228 SenderDriver<Receiver, T> MakeSenderDriver(Task<T> task)
229 {
230 if constexpr (std::is_void_v<T>)
231 co_await std::move(task);
232 else
233 co_return co_await std::move(task);
234 }
235
236 /// stdexec operation state owning the driver coroutine and the receiver. Created by @c connect.
237 template <typename Receiver, typename T>
238 class TaskOperationState
239 {
240 public:
241 TaskOperationState(Task<T> task, Receiver receiver):
242 _driver { MakeSenderDriver<Receiver, T>(std::move(task)) },
243 _receiver { std::move(receiver) }
244 {
245 }
246
247 TaskOperationState(TaskOperationState const&) = delete;
248 TaskOperationState& operator=(TaskOperationState const&) = delete;
249 TaskOperationState(TaskOperationState&&) = delete;
250 TaskOperationState& operator=(TaskOperationState&&) = delete;
251 ~TaskOperationState() = default;
252
253 /// stdexec start customization: kicks off the Task. The operation state (and therefore the
254 /// driver frame and receiver) is guaranteed by stdexec to outlive the asynchronous operation.
255 void start() & noexcept
256 {
257 _driver.Start(_receiver);
258 }
259
260 private:
261 SenderDriver<Receiver, T> _driver;
262 Receiver _receiver;
263 };
264
265 /// Completion signatures for a @ref TaskSender of value type @c T: a value channel carrying @c T,
266 /// an error channel carrying @c std::exception_ptr, and a stopped channel for cancelled tasks.
267 /// Specialized for @c void because @c set_value_t(void) is ill-formed (a function type may not
268 /// have a @c void parameter); the void value channel carries no argument.
269 template <typename T>
270 struct TaskCompletionSignatures
271 {
272 using type = stdexec::completion_signatures<stdexec::set_value_t(T),
273 stdexec::set_error_t(std::exception_ptr),
274 stdexec::set_stopped_t()>;
275 };
276
277 template <>
278 struct TaskCompletionSignatures<void>
279 {
280 using type = stdexec::completion_signatures<stdexec::set_value_t(),
281 stdexec::set_error_t(std::exception_ptr),
282 stdexec::set_stopped_t()>;
283 };
284
285 /// stdexec sender wrapping a lazy @ref Task. @c connect builds a @ref TaskOperationState.
286 template <typename T>
287 class TaskSender
288 {
289 public:
290 using sender_concept = stdexec::sender_t;
291
292 using completion_signatures = TaskCompletionSignatures<T>::type;
293
294 explicit TaskSender(Task<T> task) noexcept:
295 _task { std::move(task) }
296 {
297 }
298
299 /// stdexec connect customization: pairs the wrapped Task with @p receiver into an operation
300 /// state. Consumes the sender (the Task is move-only), so connect is rvalue-qualified.
301 /// @param receiver The receiver to deliver the Task's completion to.
302 /// @return The operation state to be @c start ed.
303 template <typename Receiver>
304 TaskOperationState<std::decay_t<Receiver>, T> connect(Receiver&& receiver) &&
305 {
306 return TaskOperationState<std::decay_t<Receiver>, T> { std::move(_task), std::forward<Receiver>(receiver) };
307 }
308
309 private:
310 Task<T> _task;
311 };
312
313} // namespace detail
314
315/// Adapts a lazy @ref Task into an stdexec sender, so a Lightweight coroutine result can flow into
316/// any @c std::execution (P2300) sender pipeline — @c stdexec::then, @c let_value, @c when_all,
317/// @c sync_wait, etc.
318///
319/// The Task remains lazy: it does not start until the resulting sender is connected and started by a
320/// receiver (e.g. by @c stdexec::sync_wait or an enclosing sender). On completion the Task's outcome
321/// is mapped onto stdexec's completion channels:
322/// - a produced value completes the value channel (@c set_value);
323/// - a thrown @ref OperationCancelledError completes the stopped channel (@c set_stopped), matching
324/// the library's cooperative-cancellation contract;
325/// - any other exception completes the error channel (@c set_error) carrying a @c std::exception_ptr.
326///
327/// @tparam T The Task's result type (@c void is supported).
328/// @param task The Task to adapt (consumed — it is move-only).
329/// @return An stdexec sender that, when started, drives @p task and reports its result.
330template <typename T>
331[[nodiscard]] auto AsSender(Task<T> task) noexcept
332{
333 return detail::TaskSender<T> { std::move(task) };
334}
335
336} // namespace Lightweight::Async