73 _dm { std::move(dm) },
85 _dm { std::move(other._dm) },
89 PooledDataMapper& operator=(PooledDataMapper
const&) =
delete;
90 PooledDataMapper& operator=(PooledDataMapper&&) =
delete;
91 ~PooledDataMapper() noexcept
112 void ReturnToPool() noexcept
114 _pool.Return(std::move(_dm));
118 std::unique_ptr<DataMapper> _dm;
133 static void DropAsyncBackend(DataMapper& dm)
noexcept
135 dm.Connection().DisableAsync();
139 void Return(std::unique_ptr<DataMapper> dm)
noexcept
140 requires(Config.growthStrategy == GrowthStrategy::UnboundedGrow)
142 DropAsyncBackend(*dm);
143 std::scoped_lock lock(_mutex);
144 _idleDataMappers.push_back(std::move(dm));
149 void Return(std::unique_ptr<DataMapper> dm)
noexcept
150 requires(Config.growthStrategy == GrowthStrategy::BoundedWait)
152 DropAsyncBackend(*dm);
153 std::shared_ptr<WaiterNode> toResume;
155 std::scoped_lock
const lock(_mutex);
156 toResume = ReturnLocked(std::move(dm));
160 toResume->resume->Resume(toResume->handle);
170 std::shared_ptr<WaiterNode> ReturnLocked(std::unique_ptr<DataMapper> dm)
noexcept
171 requires(Config.growthStrategy == GrowthStrategy::BoundedWait)
173 while (!_waiters.empty())
175 auto node = _waiters.front();
176 _waiters.pop_front();
179 if (node->state != WaiterNode::State::Parked)
181 node->state = WaiterNode::State::Fulfilled;
182 node->mapper = std::move(dm);
183 if (node->kind == WaiterNode::Kind::Async)
185 node->cv.notify_one();
188 _idleDataMappers.push_back(std::move(dm));
194 void Return(std::unique_ptr<DataMapper> dm)
noexcept
195 requires(Config.growthStrategy == GrowthStrategy::BoundedOverflow)
197 DropAsyncBackend(*dm);
198 std::scoped_lock lock(_mutex);
199 if (_idleDataMappers.size() < Config.maxSize)
200 _idleDataMappers.push_back(std::move(dm));
208 _idleDataMappers.reserve(Config.initialSize);
209 for ([[maybe_unused]]
auto const _: std::views::iota(0U, Config.initialSize))
210 _idleDataMappers.push_back(std::make_unique<DataMapper>());
222 if (!_waiters.empty())
224 "Pool destroyed while acquirers are still waiting on it (coroutines parked in AcquireAsync "
225 "and/or threads blocked in Acquire); the pool must outlive every acquirer (drive each "
226 "AcquireAsync task to completion or destroy it first, and never destroy the pool while a "
227 "thread is blocked in Acquire). This is undefined behavior.");
228 assert(_waiters.empty() &&
"Pool destroyed while acquirers are still waiting on it");
232 Pool& operator=(
Pool const&) =
delete;
240 requires(Config.growthStrategy == GrowthStrategy::BoundedWait)
242 std::unique_lock lock(_mutex);
243 if (!_idleDataMappers.empty())
246 auto dm = std::move(_idleDataMappers.back());
247 _idleDataMappers.pop_back();
251 if (_checkedOut < Config.maxSize)
260 auto node = std::make_shared<WaiterNode>(WaiterNode::Kind::Sync);
261 _waiters.push_back(node);
262 node->cv.wait(lock, [&node] {
return node->state == WaiterNode::State::Fulfilled; });
270 requires(Config.growthStrategy != GrowthStrategy::BoundedWait)
272 std::scoped_lock lock(_mutex);
273 if (_idleDataMappers.empty())
280 auto dm = std::move(_idleDataMappers.back());
281 _idleDataMappers.pop_back();
298 return AcquireAsyncImpl(&dbWorkers, &resume);
317 _asyncDbWorkers = &dbWorkers;
318 _asyncResume = &resume;
331 if (!_asyncDbWorkers || !_asyncResume)
332 throw std::logic_error {
333 "Pool::AcquireAsync(): no async executors configured; call Pool::SetAsyncExecutors(...) first "
334 "or use the explicit AcquireAsync(dbWorkers, resume) overload."
336 return AcquireAsyncImpl(_asyncDbWorkers, _asyncResume);
339#if defined(BUILD_TESTS)
340 [[nodiscard]]
size_t IdleCount() noexcept
342 std::scoped_lock lock(_mutex);
343 return _idleDataMappers.size();
348 [[nodiscard]]
size_t WaiterCount() noexcept
350 std::scoped_lock lock(_mutex);
351 return _waiters.size();
366 enum class Kind : std::uint8_t
373 enum class State : std::uint8_t
381 State state = State::Parked;
382 std::unique_ptr<DataMapper> mapper {};
385 std::coroutine_handle<> handle {};
386 Async::IResumeScheduler* resume =
nullptr;
390 std::condition_variable cv {};
392 explicit WaiterNode(Kind nodeKind)
noexcept:
403 struct AsyncAcquireAwaitable
406 Async::IResumeScheduler& resume;
407 std::unique_ptr<DataMapper> acquired {};
408 std::shared_ptr<WaiterNode> node {};
410 AsyncAcquireAwaitable(
Pool& poolRef, Async::IResumeScheduler& resumeRef)
noexcept:
416 AsyncAcquireAwaitable(AsyncAcquireAwaitable
const&) =
delete;
417 AsyncAcquireAwaitable& operator=(AsyncAcquireAwaitable
const&) =
delete;
418 AsyncAcquireAwaitable(AsyncAcquireAwaitable&&) =
delete;
419 AsyncAcquireAwaitable& operator=(AsyncAcquireAwaitable&&) =
delete;
432 ~AsyncAcquireAwaitable()
436 std::shared_ptr<WaiterNode> toResume;
438 std::scoped_lock
const lock(pool._mutex);
441 case WaiterNode::State::Parked:
444 node->state = WaiterNode::State::Abandoned;
445 std::erase(pool._waiters, node);
447 case WaiterNode::State::Fulfilled:
450 node->state = WaiterNode::State::Abandoned;
451 if constexpr (Config.growthStrategy == GrowthStrategy::BoundedWait)
454 toResume = pool.ReturnLocked(std::move(node->mapper));
457 case WaiterNode::State::Abandoned:
462 toResume->resume->Resume(toResume->handle);
465 [[nodiscard]]
bool await_ready() const noexcept
470 bool await_suspend(std::coroutine_handle<> handle)
472 std::scoped_lock
const lock(pool._mutex);
473 if (!pool._idleDataMappers.empty())
475 acquired = std::move(pool._idleDataMappers.back());
476 pool._idleDataMappers.pop_back();
477 if constexpr (Config.growthStrategy == GrowthStrategy::BoundedWait)
484 if constexpr (Config.growthStrategy == GrowthStrategy::BoundedWait)
486 if (pool._checkedOut >= Config.maxSize)
488 node = std::make_shared<WaiterNode>(WaiterNode::Kind::Async);
489 node->handle = handle;
490 node->resume = &resume;
491 pool._waiters.push_back(node);
496 acquired = std::make_unique<DataMapper>();
500 std::unique_ptr<DataMapper> await_resume() noexcept
506 return std::move(node->mapper);
507 return std::move(acquired);
511 Async::Task<PooledDataMapper> AcquireAsyncImpl(Async::IExecutor* dbWorkers, Async::IResumeScheduler* resume)
513 auto dm =
co_await AsyncAcquireAwaitable { *
this, *resume };
517 auto pooled = PooledDataMapper(*
this, std::move(dm));
518 pooled->Connection().EnableAsync(*dbWorkers, *resume);
519 co_return std::move(pooled);
523 std::vector<std::unique_ptr<DataMapper>> _idleDataMappers;
524 size_t _checkedOut {};
527 Async::IExecutor* _asyncDbWorkers =
nullptr;
528 Async::IResumeScheduler* _asyncResume =
nullptr;
531 std::deque<std::shared_ptr<WaiterNode>> _waiters;