Async-ify ExpTimes DB persistence with global singleton worker thread

Add AsyncDbWorker: a persistent background thread with dedup queue that
executes DB2 writes asynchronously, keeping the mon 20ms cycle free of
blocking I/O.

Changes:
- async_db_worker.h/.cc: singleton worker, submit() with rule_id dedup,
  drain_and_stop() for clean shutdown
- eqp_stat.h/.cc: new update_static(ruleid, shear_times, running_time)
  overload that skips redundant DB reads for known values (reduces
  5 SELECTs to 3 per persist cycle)
- exp_times.cc: extract persist_exp_times() as a standalone function,
  update_history_times() snapshots values and submits to worker
  (returns immediately), reset_dev_data() uses direct SHM update
- eqpalg_icei.cpp: alg_mgr_.reset() → drain_and_stop() in destructor
  ensures all algorithm threads are stopped before draining the worker

Risk: re-run cmake .. to pick up the new async_db_worker.cc file.
This commit is contained in:
Huamonarch 2026-05-13 13:32:50 +08:00
parent 6ed178b367
commit f80a917ab7
6 changed files with 211 additions and 43 deletions

View File

@ -1,9 +1,58 @@
#include <eqpalg/algs/exp_times.h>
#include <eqpalg/table_struct/t_rule_sample_1d.h>
#include <eqpalg/utility/async_db_worker.h>
#include <eqpalg/utility/build_alarm_info.h>
#include <eqpalg/utility/eqp_stat.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
extern ProcessType glob_process_type;
namespace {
/// 持久化累积次数/时间到 DB2——可在任意线程调用不依赖 ExpTimes 实例)
int persist_exp_times(const std::string &rule_id, int exp_type,
int64_t now_times, double now_used_time) {
try {
T_RULE_SAMPLE_1D trs1a;
auto query_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
select(trs1a.Flag(), trs1a.Count(), trs1a.X1())
.from(trs1a)
.where(trs1a.RuleId() == rule_id));
if (query_maybe.is_nothing()) {
return -1;
}
auto &query_list = query_maybe.unsafe_get_just();
if (query_list.empty()) {
if (exp_type == ExpType::OccTimesAcc) {
exec<db2_t, size_t>(insert_into(trs1a).set(
trs1a.RuleId() = rule_id, trs1a.X1() = 1,
trs1a.Count() = now_times, trs1a.Flag() = exp_type));
} else {
exec<db2_t, size_t>(insert_into(trs1a).set(
trs1a.RuleId() = rule_id, trs1a.X1() = now_used_time,
trs1a.Count() = 1, trs1a.Flag() = exp_type));
}
} else {
if (exp_type == ExpType::OccTimesAcc) {
exec<db2_t, size_t>(update(trs1a)
.set(trs1a.Count() = now_times, trs1a.X1() = 2,
trs1a.Flag() = exp_type)
.where(trs1a.RuleId() == rule_id));
} else {
exec<db2_t, size_t>(update(trs1a)
.set(trs1a.X1() = now_used_time,
trs1a.Count() = 2,
trs1a.Flag() = exp_type)
.where(trs1a.RuleId() == rule_id));
}
}
return 0;
} catch (const std::exception &e) {
return -1;
}
}
} // namespace
ExpTimes::ExpTimes(const string &name, const mix_cc::json &rule_json,
const string &ruleId, size_t exp_type)
: ExpBase(name, rule_json, ruleId, exp_type) {
@ -250,53 +299,31 @@ int ExpTimes::get_history_times() {
int ExpTimes::update_history_times() {
auto now_times = this->rule_stat_.shear_times;
auto now_used_time = this->rule_stat_.running_time;
this->logger_->Debug() << "-----update db2------"
this->logger_->Debug() << "-----submit db2------"
<< " act_started_:"
<< std::to_string(this->act_started_)
<< ",now_times:" << now_times
<< ",now_used_time:" << now_used_time << std::endl;
int ret = -1;
try {
int hist_ret = get_history_times();
if (hist_ret == -1) {
logger_->Error() << "get_history_times() db2 查询失败,跳过本次持久化"
<< std::endl;
} else if (hist_ret == -2) {
logger_->Debug() << "首次存入!" << endl;
this->insert_history_times(now_times, now_used_time);
ret = 0;
// 投递到后台线程——mon 20ms 周期不阻塞
std::string rule_id = this->rule_id_;
int exp_type = static_cast<int>(this->exp_type_);
std::string rule_name = this->rule_name_;
AsyncDbWorker::instance().submit(rule_id, [=]() {
LOG logger("AsyncDb-" + rule_name, AUTO_CATCH_PID);
logger.Debug() << "persist start, now_times:" << now_times
<< ", now_used_time:" << now_used_time << std::endl;
if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) {
SingletonTemp<EqpStat>::GetInstance().update_static(rule_id, now_times,
now_used_time);
logger.Debug() << "persist done" << std::endl;
} else {
T_RULE_SAMPLE_1D trs1a;
if (exp_type_ == ExpType::OccTimesAcc) {
this->rule_stat_.current_value = now_times;
exec<db2_t, size_t>(update(trs1a)
.set(trs1a.Count() = now_times, trs1a.X1() = 2,
trs1a.Flag() = this->exp_type_)
.where(trs1a.RuleId() == this->rule_id_));
logger_->Debug() << "update_history_times(),update,now_times:"
<< now_times << endl;
} else if (exp_type_ == ExpType::HoldTimeAcc) {
this->rule_stat_.current_value = now_used_time;
exec<db2_t, size_t>(update(trs1a)
.set(trs1a.X1() = now_used_time,
trs1a.Count() = 2,
trs1a.Flag() = this->exp_type_)
.where(trs1a.RuleId() == this->rule_id_));
logger_->Debug() << "update_history_times(),update,now_used_time:"
<< now_used_time << endl;
logger.Error() << "persist failed" << std::endl;
}
ret = 0;
}
if (ret == 0) {
SingletonTemp<EqpStat>::GetInstance().update_static(this->rule_id_, true);
}
} catch (const std::exception &e) {
logger_->Error() << "update_history_times() db2 操作异常: " << e.what()
<< std::endl;
}
this->rule_stat_.shear_times = now_times;
this->rule_stat_.running_time = now_used_time;
return ret;
});
return 0;
}
int ExpTimes::insert_history_times(int64_t now_times, double now_used_time) {
@ -320,8 +347,14 @@ void ExpTimes::reset_dev_data() {
last_load_time_ = now_time_;
this->rule_stat_.shear_times = 0;
this->rule_stat_.running_time = 0;
this->update_history_times();
SingletonTemp<EqpStat>::GetInstance().update_static(this->rule_id_, true);
// SHM 立即更新(不依赖 DB 读回,值已知为 0
SingletonTemp<EqpStat>::GetInstance().update_static(this->rule_id_, 0, 0.0);
// DB 持久化异步投递
std::string rule_id = this->rule_id_;
int exp_type = static_cast<int>(this->exp_type_);
AsyncDbWorker::instance().submit(rule_id, [=]() {
persist_exp_times(rule_id, exp_type, 0, 0.0);
});
logger_->Debug() << rule_name_ << ":ExpTimes::reset_dev_data()" << endl;
this->wait_flag_ = 0;
this->act_started_ = false;

View File

@ -3,6 +3,7 @@
#include <dao/DbStandardDBAX.h>
#include <eqpalg/define/public.h>
#include <eqpalg/eqpalg_icei.h>
#include <eqpalg/utility/async_db_worker.h>
#include <eqpalg/utility/eqp_stat.h>
#include <log4cplus/LOG.h>
#include <shm/SingletonTemp.hpp>
@ -93,6 +94,10 @@ EqpAlgICEI::~EqpAlgICEI() {
mem_cached_thread_->join();
}
}
// 先停掉所有算法线程,确保不再有新的 DB 任务投递
alg_mgr_.reset();
// 排空并停止后台 DB worker
AsyncDbWorker::instance().drain_and_stop();
if (glob_process_type == ProcessType::kTask) {
// TaskData 已改用动态 vector进程退出时自然析构无需 rm -rf
}

View File

@ -0,0 +1,64 @@
#include "async_db_worker.h"
AsyncDbWorker &AsyncDbWorker::instance() {
static AsyncDbWorker inst;
return inst;
}
AsyncDbWorker::AsyncDbWorker() {
worker_ = std::make_unique<std::thread>(&AsyncDbWorker::loop, this);
}
AsyncDbWorker::~AsyncDbWorker() {
if (running_) {
drain_and_stop();
}
}
void AsyncDbWorker::submit(const std::string &rule_id,
std::function<void()> task) {
{
std::lock_guard<std::mutex> guard(mtx_);
pending_[rule_id] = std::move(task); // 去重
}
cv_.notify_one();
}
void AsyncDbWorker::drain_and_stop() {
running_ = false;
cv_.notify_all();
if (worker_ && worker_->joinable()) {
worker_->join();
}
}
void AsyncDbWorker::loop() {
while (running_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() { return !pending_.empty() || !running_; });
if (!running_ && pending_.empty()) break;
if (!pending_.empty()) {
auto it = pending_.begin();
task = std::move(it->second);
pending_.erase(it);
}
}
if (task) {
task();
}
}
// 排空剩余任务
while (true) {
std::function<void()> task;
{
std::lock_guard<std::mutex> guard(mtx_);
if (pending_.empty()) break;
auto it = pending_.begin();
task = std::move(it->second);
pending_.erase(it);
}
if (task) task();
}
}

View File

@ -0,0 +1,44 @@
#pragma once
/**
* @file async_db_worker.h
* @brief 线 DB2 mon 20ms
*
* rule_id submit
* 退 drain_and_stop()
*
* @author your name (you@domain.com)
* @version 0.1
* @date 2026-05-13
*/
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
class AsyncDbWorker {
public:
static AsyncDbWorker &instance();
/// 提交任务;同 rule_id 覆盖旧任务(去重)
void submit(const std::string &rule_id, std::function<void()> task);
/// 排空队列并停止线程(进程退出前调用)
void drain_and_stop();
private:
AsyncDbWorker();
~AsyncDbWorker();
std::unique_ptr<std::thread> worker_;
std::mutex mtx_;
std::condition_variable cv_;
std::unordered_map<std::string, std::function<void()>> pending_;
std::atomic<bool> running_{true};
void loop();
};

View File

@ -163,6 +163,25 @@ bool EqpStat::update_static(std::string ruleid, bool is_times) {
}
}
bool EqpStat::update_static(const std::string &ruleid, int64_t shear_times,
double running_time) {
try {
rule_stat_cold.alarm_times = select_alarm_by_ruleid(ruleid);
rule_stat_cold.dev_coder = select_dev_coder_by_ruleid(ruleid).c_str();
rule_stat_cold.running_time = running_time;
rule_stat_cold.shear_times = shear_times;
rule_stat_cold.last_alarm_time =
select_latest_alarm_by_ruleid(ruleid).c_str();
mapRuleStat.update_static_fields(ruleid, rule_stat_cold);
display_cache_.update_static(ruleid, rule_stat_cold);
return true;
} catch (const std::exception &e) {
logger_->Error() << "EqpStat::update_static(skip-db) ERROR! " << e.what()
<< std::endl;
return false;
}
}
bool EqpStat::update_static() {
try {
bool res = true;

View File

@ -78,6 +78,9 @@ public:
/// 写静态数据到共享内存 + 更新本地展示缓存cron/ExpTimes 调用)
bool update_static(std::string ruleid, bool is_times);
/// 同上,但 running_time / shear_times 由调用方直接传入,跳过 DB 查询
bool update_static(const std::string &ruleid, int64_t shear_times,
double running_time);
bool update_static();
/// mon 攒样本到共享内存