// BlockingQueue.hpp (Lightweight 0.20260617.0)
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

namespace Lightweight::detail
{

/// Thread-safe blocking FIFO primitive backing @c ThreadSafeQueue.
///
/// It encapsulates the `std::mutex + std::condition_variable + std::deque<T>` dance so the
/// locking/wait logic lives in exactly one place: @ref Push enqueues and wakes a consumer, @ref
/// WaitAndPop blocks until an item is available or the queue is finished, and @ref MarkFinished
/// drains-then-stops. (The async executors deliberately do not share this primitive — they own
/// the smaller, purpose-built queues in @c Async/detail/ExecutorQueues.hpp.)
///
/// Every member function takes the internal mutex for the duration of its access to the
/// underlying deque or the finished flag. The finished flag is one-way: nothing in this class
/// resets it once @ref MarkFinished has been called.
///
/// @tparam T The element type stored in the queue. It is moved into the queue by @ref Push and
///           move-assigned out of it by @ref WaitAndPop.
template <typename T>
class BlockingQueue
{
  public:
    /// Enqueues @p item at the back of the queue and notifies one blocked waiter.
    ///
    /// The finished flag is not consulted here: an item pushed after @ref MarkFinished is still
    /// enqueued and will still be handed out by @ref WaitAndPop.
    ///
    /// @param item The element to enqueue; taken by value and moved into the queue.
    void Push(T item)
    {
        {
            std::scoped_lock const lock(_mutex);
            _queue.push_back(std::move(item));
        }
        // Notify after the lock is released so the woken waiter can acquire the mutex right away.
        _condition.notify_one();
    }

    /// Blocks until an item is available or the queue is marked finished and empty, then pops.
    ///
    /// @param out Receives the popped item when one is available; left untouched otherwise.
    /// @return true if an item was popped; false if the queue is finished and empty.
    bool WaitAndPop(T& out)
    {
        std::unique_lock lock(_mutex);
        // The predicate form re-checks the condition on every wake-up, so spurious wake-ups
        // cannot fall through with an empty, unfinished queue.
        _condition.wait(lock, [&] { return !_queue.empty() || _finished; });

        // Woken with nothing queued: that can only be because the queue was marked finished.
        if (_queue.empty())
            return false;

        out = std::move(_queue.front());
        _queue.pop_front();
        return true;
    }

    /// Signals that no more items will be added; @ref WaitAndPop returns false once the queue is
    /// empty. All blocked waiters are woken so each can observe the new state.
    void MarkFinished()
    {
        {
            std::scoped_lock const lock(_mutex);
            _finished = true;
        }
        _condition.notify_all();
    }

    /// @return true if the queue is currently empty. The result is a snapshot taken under the
    ///         lock; another thread may change the queue immediately afterwards.
    [[nodiscard]] bool Empty() const
    {
        std::scoped_lock const lock(_mutex);
        return _queue.empty();
    }

    /// @return the number of queued items. The result is a snapshot taken under the lock;
    ///         another thread may change the queue immediately afterwards.
    [[nodiscard]] std::size_t Size() const
    {
        std::scoped_lock const lock(_mutex);
        return _queue.size();
    }

  private:
    mutable std::mutex _mutex;          ///< Guards _queue and _finished; mutable for const queries.
    std::condition_variable _condition; ///< Signalled by Push() and MarkFinished().
    std::deque<T> _queue;               ///< Pending items, oldest at the front.
    bool _finished = false;             ///< Set by MarkFinished(); makes WaitAndPop drain-then-stop.
};

} // namespace Lightweight::detail