Lightweight 0.20260617.0
Loading...
Searching...
No Matches
Async.hpp
1// SPDX-License-Identifier: Apache-2.0
2#pragma once
3
5#include "Executor.hpp"
6#include "Task.hpp"
7
8#include <exception>
9#include <optional>
10#include <stdexcept>
11#include <stop_token>
12#include <type_traits>
13#include <utility>
14#include <variant>
15
16namespace Lightweight::Async
17{
18
19namespace detail
20{
21
/// Type obtained by invoking an lvalue of the callable type @p F with no arguments.
template <typename F>
using OffloadResult = typename std::invoke_result<F&>::type;
24
25 /// Awaitable that runs a blocking callable on an executor and resumes elsewhere.
26 ///
27 /// Lives in the awaiting coroutine's frame for the duration of the suspension, so the
28 /// worker thread may safely write the result/exception into it before resuming.
29 template <typename F>
30 class OffloadAwaitable
31 {
32 public:
33 using Result = OffloadResult<F>;
34
35 OffloadAwaitable(IExecutor& offload, IResumeScheduler& resume, F fn, std::stop_token token):
36 _offload { offload },
37 _resume { resume },
38 _fn { std::move(fn) },
39 _token { std::move(token) }
40 {
41 }
42
43 OffloadAwaitable(OffloadAwaitable&&) = default;
44 OffloadAwaitable(OffloadAwaitable const&) = delete;
45 OffloadAwaitable& operator=(OffloadAwaitable const&) = delete;
46 OffloadAwaitable& operator=(OffloadAwaitable&&) = delete;
47 ~OffloadAwaitable() = default;
48
49 [[nodiscard]] bool await_ready() const noexcept
50 {
51 return false;
52 }
53
54 void await_suspend(std::coroutine_handle<> awaiting)
55 {
56 // Honor cancellation *before* dispatching to the offload executor, so a pre-cancelled
57 // operation never occupies a DB worker at all (matching the "checked before the step is
58 // dispatched" contract). The cancellation is reported through the resume scheduler exactly
59 // as a normal completion would be, so the awaiting coroutine still resumes on the app thread.
60 if (_token.stop_requested())
61 {
62 _error = std::make_exception_ptr(OperationCancelledError {});
63 _resume.Resume(awaiting);
64 return;
65 }
66 _offload.Post([this, awaiting]() mutable {
67 Run();
68 _resume.Resume(awaiting);
69 });
70 }
71
72 Result await_resume()
73 {
74 if (_error)
75 std::rethrow_exception(_error);
76 if constexpr (!std::is_void_v<Result>)
77 {
78 // Run() always either emplaces _value or stores into _error; reaching here with a
79 // disengaged optional would be a broken invariant. The explicit check both documents
80 // that invariant and lets static analysis see the access as guarded.
81 if (!_value.has_value())
82 throw std::logic_error { "OffloadAwaitable: result missing without an error" };
83 return std::move(*_value);
84 }
85 }
86
87 private:
88 void Run() noexcept
89 {
90 try
91 {
92 if (_token.stop_requested())
93 throw OperationCancelledError {};
94 if constexpr (std::is_void_v<Result>)
95 _fn();
96 else
97 _value.emplace(_fn());
98 }
99 catch (...)
100 {
101 _error = std::current_exception();
102 }
103 }
104
105 IExecutor& _offload;
106 IResumeScheduler& _resume;
107 F _fn;
108 std::stop_token _token;
109 std::conditional_t<std::is_void_v<Result>, std::monostate, std::optional<Result>> _value {};
110 std::exception_ptr _error {};
111 };
112
113 /// Drives a by-value offload awaitable to completion.
114 ///
115 /// Taking the awaitable @b by value (rather than the executors by reference) keeps this
116 /// coroutine free of reference parameters; the executor references live inside the
117 /// awaitable, which is stored in this coroutine's frame for the duration of the await.
118 template <typename Awaitable>
119 Task<typename Awaitable::Result> RunOffloadTask(Awaitable awaitable)
120 {
121 if constexpr (std::is_void_v<typename Awaitable::Result>)
122 co_await awaitable;
123 else
124 co_return co_await awaitable;
125 }
126
127} // namespace detail
128
129/// Offloads a blocking callable to an executor and resumes the awaiting coroutine elsewhere.
130///
131/// @p fn runs on @p offload (typically a connection strand over the DB worker pool). When it
132/// finishes, the awaiting coroutine is resumed via @p resume (typically the app's run loop),
133/// so coroutine logic continues on the app thread while only the blocking call ran on a
134/// worker. Exceptions thrown by @p fn are captured and rethrown on resume (parity with the
135/// synchronous, throwing API). If cancellation is already requested when the work is about to
136/// run, the operation completes with @ref OperationCancelledError.
137///
138/// @tparam F A callable invocable with no arguments.
139/// @param offload The executor to run @p fn on.
140/// @param resume The scheduler used to resume the awaiting coroutine.
141/// @param fn The blocking callable (consumed).
142/// @param token Optional cancellation token (a default-constructed @c std::stop_token is non-cancellable).
143/// @return A Task producing @p fn's result.
144template <typename F>
145[[nodiscard]] Task<detail::OffloadResult<F>> Async(IExecutor& offload,
146 IResumeScheduler& resume,
147 F fn,
148 std::stop_token token = {})
149{
150 return detail::RunOffloadTask(detail::OffloadAwaitable<F> { offload, resume, std::move(fn), std::move(token) });
151}
152
153} // namespace Lightweight::Async