#include AsyncDbWorker &AsyncDbWorker::instance() { static AsyncDbWorker inst; return inst; } AsyncDbWorker::AsyncDbWorker() { worker_ = std::make_unique(&AsyncDbWorker::loop, this); } AsyncDbWorker::~AsyncDbWorker() { if (running_) { drain_and_stop(); } } void AsyncDbWorker::submit(const std::string &rule_id, std::function task) { { std::lock_guard guard(mtx_); pending_[rule_id] = std::move(task); // 去重 } cv_.notify_one(); } void AsyncDbWorker::drain_and_stop() { running_ = false; cv_.notify_all(); if (worker_ && worker_->joinable()) { worker_->join(); } } void AsyncDbWorker::loop() { while (running_) { std::function task; { std::unique_lock lock(mtx_); cv_.wait(lock, [this]() { return !pending_.empty() || !running_; }); if (!running_ && pending_.empty()) break; if (!pending_.empty()) { auto it = pending_.begin(); task = std::move(it->second); pending_.erase(it); } } if (task) { task(); } } // 排空剩余任务 while (true) { std::function task; { std::lock_guard guard(mtx_); if (pending_.empty()) break; auto it = pending_.begin(); task = std::move(it->second); pending_.erase(it); } if (task) task(); } }