Lightweight 0.20260617.0
Loading...
Searching...
No Matches
ExecutorQueues.hpp
1// SPDX-License-Identifier: Apache-2.0
2#pragma once
3
4#include "../Executor.hpp"
5
6#include <condition_variable>
7#include <cstddef>
8#include <deque>
9#include <mutex>
10#include <utility>
11
12namespace Lightweight::Async::detail
13{
14
15/// Single-consumer blocking FIFO of @ref Work items used by @ref ManualExecutor.
16///
17/// @ref ManualExecutor is a @e resume target, not an offloader: the coroutine continuations it
18/// receives run directly on the thread that pumps it (its @c Run / @c Drain / @c RunOne / @c RunUntil
19/// members), never on stdexec. (Blocking work is offloaded elsewhere, to a @ref ThreadPoolExecutor.)
20/// That pump still needs a tiny thread-safe queue a single owning thread can block on
21/// (@ref WaitAndPop), step (@ref TryPop), and be woken from (@ref Wake) when its stop flag flips.
22/// stdexec's @c run_loop cannot express the predicate-gated @c RunUntil pump that @c SyncWaitPumping
23/// relies on, so this stays hand-rolled.
24///
25/// This is deliberately a focused subset of the former shared @c BlockingQueue — no MPMC
26/// completion flag, no strand bookkeeping — keeping exactly the operations the pump needs.
27class PumpQueue
28{
29 public:
30 /// Enqueues @p work and notifies one blocked waiter.
31 /// @param work The work item to enqueue (consumed).
32 void Push(Work work)
33 {
34 {
35 std::scoped_lock const lock(_mutex);
36 _queue.push_back(std::move(work));
37 }
38 _condition.notify_one();
39 }
40
41 /// Pops the front item without blocking.
42 /// @param out Receives the popped item when one is available.
43 /// @return true if an item was popped; false if the queue was empty.
44 bool TryPop(Work& out)
45 {
46 std::scoped_lock const lock(_mutex);
47 if (_queue.empty())
48 return false;
49 out = std::move(_queue.front());
50 _queue.pop_front();
51 return true;
52 }
53
54 /// Blocks until an item is available or @p wake returns true, then pops the front item.
55 ///
56 /// @tparam Wake A predicate callable returning something contextually convertible to bool.
57 /// @param out Receives the popped item when one is available.
58 /// @param wake Extra wake condition, re-checked under the lock on every notification (e.g. a
59 /// stop flag or an external completion predicate). It must be cheap and must not touch
60 /// this queue's lock. The flag it reads must be published before a paired @ref Wake call.
61 /// @return true if an item was popped; false if @p wake fired with the queue empty.
62 template <typename Wake>
63 bool WaitAndPop(Work& out, Wake wake)
64 {
65 std::unique_lock lock(_mutex);
66 _condition.wait(lock, [&] { return !_queue.empty() || wake(); });
67 if (_queue.empty())
68 return false;
69 out = std::move(_queue.front());
70 _queue.pop_front();
71 return true;
72 }
73
74 /// Wakes all blocked waiters so they re-evaluate their wake condition.
75 ///
76 /// Briefly takes the lock before notifying so a waiter that has already evaluated its predicate
77 /// but not yet blocked cannot miss the notification — the flag that waiter reads must be
78 /// published (e.g. an atomic release store) before this call.
79 void Wake()
80 {
81 {
82 std::scoped_lock const lock(_mutex);
83 }
84 _condition.notify_all();
85 }
86
87 /// @return the number of queued items.
88 [[nodiscard]] std::size_t Size() const
89 {
90 std::scoped_lock const lock(_mutex);
91 return _queue.size();
92 }
93
94 private:
95 mutable std::mutex _mutex;
96 std::condition_variable _condition;
97 std::deque<Work> _queue;
98};
99
100/// Serialized-drain FIFO of @ref Work items used by @ref StrandExecutor.
101///
102/// Guards the pending queue and a single "a drain is scheduled/running" flag under @b one lock so
103/// the push-vs-end-drain race is closed without a second mutex: @ref PushAndClaimDrain enqueues and
104/// atomically decides whether the caller must schedule a fresh drain, and @ref PopOrEndDrain pops
105/// the next item or clears the flag when the queue empties. The drain closure runs on the strand's
106/// underlying (now stdexec-backed) executor, so the strand owns no thread of its own.
107class SerialDrainQueue
108{
109 public:
110 /// Enqueues @p work and, if no drain is currently active, claims the drain.
111 ///
112 /// @param work The work item to enqueue (consumed).
113 /// @return true if the caller should schedule a drain (it just transitioned idle -> draining);
114 /// false if a drain is already active and will pick up this item.
115 bool PushAndClaimDrain(Work work)
116 {
117 std::scoped_lock const lock(_mutex);
118 _queue.push_back(std::move(work));
119 if (_draining)
120 return false;
121 _draining = true;
122 return true;
123 }
124
125 /// Pops the next item, or ends the drain when the queue is empty.
126 ///
127 /// @param out Receives the popped item when one is available.
128 /// @return true if an item was popped; false if the queue was empty (the drain flag is then
129 /// cleared under the lock so the next @ref PushAndClaimDrain re-schedules a drain).
130 bool PopOrEndDrain(Work& out)
131 {
132 std::scoped_lock const lock(_mutex);
133 if (_queue.empty())
134 {
135 _draining = false;
136 return false;
137 }
138 out = std::move(_queue.front());
139 _queue.pop_front();
140 return true;
141 }
142
143 private:
144 std::mutex _mutex;
145 std::deque<Work> _queue;
146 bool _draining = false; ///< true while a single drain closure is scheduled/running.
147};
148
149} // namespace Lightweight::Async::detail