From f80a917ab70eef702b144da36e422bcd10c41967 Mon Sep 17 00:00:00 2001 From: Huamonarch Date: Wed, 13 May 2026 13:32:50 +0800 Subject: [PATCH] Async-ify ExpTimes DB persistence with global singleton worker thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add AsyncDbWorker: a persistent background thread with dedup queue that executes DB2 writes asynchronously, keeping the mon 20ms cycle free of blocking I/O. Changes: - async_db_worker.h/.cc: singleton worker, submit() with rule_id dedup, drain_and_stop() for clean shutdown - eqp_stat.h/.cc: new update_static(ruleid, shear_times, running_time) overload that skips redundant DB reads for known values (reduces 5 SELECTs to 3 per persist cycle) - exp_times.cc: extract persist_exp_times() as a standalone function, update_history_times() snapshots values and submits to worker (returns immediately), reset_dev_data() uses direct SHM update - eqpalg_icei.cpp: alg_mgr_.reset() → drain_and_stop() in destructor ensures all algorithm threads are stopped before draining the worker Risk: re-run cmake .. to pick up the new async_db_worker.cc file. --- eqpalg/algs/exp_times.cc | 119 +++++++++++++++++++----------- eqpalg/eqpalg_icei.cpp | 5 ++ eqpalg/utility/async_db_worker.cc | 64 ++++++++++++++++ eqpalg/utility/async_db_worker.h | 44 +++++++++++ eqpalg/utility/eqp_stat.cc | 19 +++++ eqpalg/utility/eqp_stat.h | 3 + 6 files changed, 211 insertions(+), 43 deletions(-) create mode 100644 eqpalg/utility/async_db_worker.cc create mode 100644 eqpalg/utility/async_db_worker.h diff --git a/eqpalg/algs/exp_times.cc b/eqpalg/algs/exp_times.cc index a04bc57..a94025f 100644 --- a/eqpalg/algs/exp_times.cc +++ b/eqpalg/algs/exp_times.cc @@ -1,9 +1,58 @@ #include #include +#include #include +#include #include #include extern ProcessType glob_process_type; + +namespace { + +/// 持久化累积次数/时间到 DB2——可在任意线程调用(不依赖 ExpTimes 实例) +int persist_exp_times(const std::string &rule_id, int exp_type, + int64_t now_times, double now_used_time) { + try { + T_RULE_SAMPLE_1D trs1a; + auto query_maybe = exec( + select(trs1a.Flag(), trs1a.Count(), trs1a.X1()) + .from(trs1a) + .where(trs1a.RuleId() == rule_id)); + if (query_maybe.is_nothing()) { + return -1; + } + auto &query_list = query_maybe.unsafe_get_just(); + if (query_list.empty()) { + if (exp_type == ExpType::OccTimesAcc) { + exec(insert_into(trs1a).set( + trs1a.RuleId() = rule_id, trs1a.X1() = 1, + trs1a.Count() = now_times, trs1a.Flag() = exp_type)); + } else { + exec(insert_into(trs1a).set( + trs1a.RuleId() = rule_id, trs1a.X1() = now_used_time, + trs1a.Count() = 1, trs1a.Flag() = exp_type)); + } + } else { + if (exp_type == ExpType::OccTimesAcc) { + exec(update(trs1a) + .set(trs1a.Count() = now_times, trs1a.X1() = 2, + trs1a.Flag() = exp_type) + .where(trs1a.RuleId() == rule_id)); + } else { + exec(update(trs1a) + .set(trs1a.X1() = now_used_time, + trs1a.Count() = 2, + trs1a.Flag() = exp_type) + .where(trs1a.RuleId() == rule_id)); + } + } + return 0; + } catch (const std::exception &e) { + return -1; + } +} + +} // namespace ExpTimes::ExpTimes(const string &name, const mix_cc::json &rule_json, const string &ruleId, size_t exp_type) : ExpBase(name, rule_json, ruleId, exp_type) { @@ -250,53 +299,31 @@ int ExpTimes::get_history_times() { int ExpTimes::update_history_times() { auto now_times = this->rule_stat_.shear_times; auto now_used_time = this->rule_stat_.running_time; - this->logger_->Debug() << "-----update db2------" + this->logger_->Debug() << "-----submit db2------" << " act_started_:" << std::to_string(this->act_started_) << ",now_times:" << now_times << ",now_used_time:" << now_used_time << std::endl; - int ret = -1; - try { - int hist_ret = get_history_times(); - if (hist_ret == -1) { - logger_->Error() << "get_history_times() db2 查询失败,跳过本次持久化" - << std::endl; - } else if (hist_ret == -2) { - logger_->Debug() << "首次存入!" << endl; - this->insert_history_times(now_times, now_used_time); - ret = 0; + + // 投递到后台线程——mon 20ms 周期不阻塞 + std::string rule_id = this->rule_id_; + int exp_type = static_cast(this->exp_type_); + std::string rule_name = this->rule_name_; + + AsyncDbWorker::instance().submit(rule_id, [=]() { + LOG logger("AsyncDb-" + rule_name, AUTO_CATCH_PID); + logger.Debug() << "persist start, now_times:" << now_times + << ", now_used_time:" << now_used_time << std::endl; + if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) { + SingletonTemp::GetInstance().update_static(rule_id, now_times, + now_used_time); + logger.Debug() << "persist done" << std::endl; } else { - T_RULE_SAMPLE_1D trs1a; - if (exp_type_ == ExpType::OccTimesAcc) { - this->rule_stat_.current_value = now_times; - exec(update(trs1a) - .set(trs1a.Count() = now_times, trs1a.X1() = 2, - trs1a.Flag() = this->exp_type_) - .where(trs1a.RuleId() == this->rule_id_)); - logger_->Debug() << "update_history_times(),update,now_times:" - << now_times << endl; - } else if (exp_type_ == ExpType::HoldTimeAcc) { - this->rule_stat_.current_value = now_used_time; - exec(update(trs1a) - .set(trs1a.X1() = now_used_time, - trs1a.Count() = 2, - trs1a.Flag() = this->exp_type_) - .where(trs1a.RuleId() == this->rule_id_)); - logger_->Debug() << "update_history_times(),update,now_used_time:" - << now_used_time << endl; - } - ret = 0; + logger.Error() << "persist failed" << std::endl; } - if (ret == 0) { - SingletonTemp::GetInstance().update_static(this->rule_id_, true); - } - } catch (const std::exception &e) { - logger_->Error() << "update_history_times() db2 操作异常: " << e.what() - << std::endl; - } - this->rule_stat_.shear_times = now_times; - this->rule_stat_.running_time = now_used_time; - return ret; + }); + + return 0; } int ExpTimes::insert_history_times(int64_t now_times, double now_used_time) { @@ -320,8 +347,14 @@ void ExpTimes::reset_dev_data() { last_load_time_ = now_time_; this->rule_stat_.shear_times = 0; this->rule_stat_.running_time = 0; - this->update_history_times(); - SingletonTemp::GetInstance().update_static(this->rule_id_, true); + // SHM 立即更新(不依赖 DB 读回,值已知为 0) + SingletonTemp::GetInstance().update_static(this->rule_id_, 0, 0.0); + // DB 持久化异步投递 + std::string rule_id = this->rule_id_; + int exp_type = static_cast(this->exp_type_); + AsyncDbWorker::instance().submit(rule_id, [=]() { + persist_exp_times(rule_id, exp_type, 0, 0.0); + }); logger_->Debug() << rule_name_ << ":ExpTimes::reset_dev_data()" << endl; this->wait_flag_ = 0; this->act_started_ = false; diff --git a/eqpalg/eqpalg_icei.cpp b/eqpalg/eqpalg_icei.cpp index 07865ac..9ad70df 100644 --- a/eqpalg/eqpalg_icei.cpp +++ b/eqpalg/eqpalg_icei.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -93,6 +94,10 @@ EqpAlgICEI::~EqpAlgICEI() { mem_cached_thread_->join(); } } + // 先停掉所有算法线程,确保不再有新的 DB 任务投递 + alg_mgr_.reset(); + // 排空并停止后台 DB worker + AsyncDbWorker::instance().drain_and_stop(); if (glob_process_type == ProcessType::kTask) { // TaskData 已改用动态 vector,进程退出时自然析构,无需 rm -rf } diff --git a/eqpalg/utility/async_db_worker.cc b/eqpalg/utility/async_db_worker.cc new file mode 100644 index 0000000..41723a7 --- /dev/null +++ b/eqpalg/utility/async_db_worker.cc @@ -0,0 +1,64 @@ +#include "async_db_worker.h" + +AsyncDbWorker &AsyncDbWorker::instance() { + static AsyncDbWorker inst; + return inst; +} + +AsyncDbWorker::AsyncDbWorker() { + worker_ = std::make_unique(&AsyncDbWorker::loop, this); +} + +AsyncDbWorker::~AsyncDbWorker() { + if (running_) { + drain_and_stop(); + } +} + +void AsyncDbWorker::submit(const std::string &rule_id, + std::function task) { + { + std::lock_guard guard(mtx_); + pending_[rule_id] = std::move(task); // 去重 + } + cv_.notify_one(); +} + +void AsyncDbWorker::drain_and_stop() { + running_ = false; + cv_.notify_all(); + if (worker_ && worker_->joinable()) { + worker_->join(); + } +} + +void AsyncDbWorker::loop() { + while (running_) { + std::function task; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this]() { return !pending_.empty() || !running_; }); + if (!running_ && pending_.empty()) break; + if (!pending_.empty()) { + auto it = pending_.begin(); + task = std::move(it->second); + pending_.erase(it); + } + } + if (task) { + task(); + } + } + // 排空剩余任务 + while (true) { + std::function task; + { + std::lock_guard guard(mtx_); + if (pending_.empty()) break; + auto it = pending_.begin(); + task = std::move(it->second); + pending_.erase(it); + } + if (task) task(); + } +} diff --git a/eqpalg/utility/async_db_worker.h b/eqpalg/utility/async_db_worker.h new file mode 100644 index 0000000..6b8bbd3 --- /dev/null +++ b/eqpalg/utility/async_db_worker.h @@ -0,0 +1,44 @@ +#pragma once +/** + * @file async_db_worker.h + * @brief 全局单例后台线程,异步执行 DB2 写入,避免阻塞 mon 20ms 周期 + * + * 去重机制:同一 rule_id 重复 submit 时,旧任务被覆盖(只保留最新快照)。 + * 进程退出时必须在其他单例析构前调用 drain_and_stop()。 + * + * @author your name (you@domain.com) + * @version 0.1 + * @date 2026-05-13 + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +class AsyncDbWorker { + public: + static AsyncDbWorker &instance(); + + /// 提交任务;同 rule_id 覆盖旧任务(去重) + void submit(const std::string &rule_id, std::function task); + + /// 排空队列并停止线程(进程退出前调用) + void drain_and_stop(); + + private: + AsyncDbWorker(); + ~AsyncDbWorker(); + + std::unique_ptr worker_; + std::mutex mtx_; + std::condition_variable cv_; + std::unordered_map> pending_; + std::atomic running_{true}; + + void loop(); +}; diff --git a/eqpalg/utility/eqp_stat.cc b/eqpalg/utility/eqp_stat.cc index 9b795c8..9962455 100644 --- a/eqpalg/utility/eqp_stat.cc +++ b/eqpalg/utility/eqp_stat.cc @@ -163,6 +163,25 @@ bool EqpStat::update_static(std::string ruleid, bool is_times) { } } +bool EqpStat::update_static(const std::string &ruleid, int64_t shear_times, + double running_time) { + try { + rule_stat_cold.alarm_times = select_alarm_by_ruleid(ruleid); + rule_stat_cold.dev_coder = select_dev_coder_by_ruleid(ruleid).c_str(); + rule_stat_cold.running_time = running_time; + rule_stat_cold.shear_times = shear_times; + rule_stat_cold.last_alarm_time = + select_latest_alarm_by_ruleid(ruleid).c_str(); + mapRuleStat.update_static_fields(ruleid, rule_stat_cold); + display_cache_.update_static(ruleid, rule_stat_cold); + return true; + } catch (const std::exception &e) { + logger_->Error() << "EqpStat::update_static(skip-db) ERROR! " << e.what() + << std::endl; + return false; + } +} + bool EqpStat::update_static() { try { bool res = true; diff --git a/eqpalg/utility/eqp_stat.h b/eqpalg/utility/eqp_stat.h index 47349ef..6d7b75b 100644 --- a/eqpalg/utility/eqp_stat.h +++ b/eqpalg/utility/eqp_stat.h @@ -78,6 +78,9 @@ public: /// 写静态数据到共享内存 + 更新本地展示缓存(cron/ExpTimes 调用) bool update_static(std::string ruleid, bool is_times); + /// 同上,但 running_time / shear_times 由调用方直接传入,跳过 DB 查询 + bool update_static(const std::string &ruleid, int64_t shear_times, + double running_time); bool update_static(); /// mon 攒样本到共享内存