diff --git a/eqpalg/algs/exp_times.cc b/eqpalg/algs/exp_times.cc index a04bc57..a94025f 100644 --- a/eqpalg/algs/exp_times.cc +++ b/eqpalg/algs/exp_times.cc @@ -1,9 +1,58 @@ #include #include +#include #include +#include #include #include extern ProcessType glob_process_type; + +namespace { + +/// 持久化累积次数/时间到 DB2——可在任意线程调用(不依赖 ExpTimes 实例) +int persist_exp_times(const std::string &rule_id, int exp_type, + int64_t now_times, double now_used_time) { + try { + T_RULE_SAMPLE_1D trs1a; + auto query_maybe = exec( + select(trs1a.Flag(), trs1a.Count(), trs1a.X1()) + .from(trs1a) + .where(trs1a.RuleId() == rule_id)); + if (query_maybe.is_nothing()) { + return -1; + } + auto &query_list = query_maybe.unsafe_get_just(); + if (query_list.empty()) { + if (exp_type == ExpType::OccTimesAcc) { + exec(insert_into(trs1a).set( + trs1a.RuleId() = rule_id, trs1a.X1() = 1, + trs1a.Count() = now_times, trs1a.Flag() = exp_type)); + } else { + exec(insert_into(trs1a).set( + trs1a.RuleId() = rule_id, trs1a.X1() = now_used_time, + trs1a.Count() = 1, trs1a.Flag() = exp_type)); + } + } else { + if (exp_type == ExpType::OccTimesAcc) { + exec(update(trs1a) + .set(trs1a.Count() = now_times, trs1a.X1() = 2, + trs1a.Flag() = exp_type) + .where(trs1a.RuleId() == rule_id)); + } else { + exec(update(trs1a) + .set(trs1a.X1() = now_used_time, + trs1a.Count() = 2, + trs1a.Flag() = exp_type) + .where(trs1a.RuleId() == rule_id)); + } + } + return 0; + } catch (const std::exception &e) { + return -1; + } +} + +} // namespace ExpTimes::ExpTimes(const string &name, const mix_cc::json &rule_json, const string &ruleId, size_t exp_type) : ExpBase(name, rule_json, ruleId, exp_type) { @@ -250,53 +299,31 @@ int ExpTimes::get_history_times() { int ExpTimes::update_history_times() { auto now_times = this->rule_stat_.shear_times; auto now_used_time = this->rule_stat_.running_time; - this->logger_->Debug() << "-----update db2------" + this->logger_->Debug() << "-----submit db2------" << " act_started_:" << std::to_string(this->act_started_) << ",now_times:" << now_times << ",now_used_time:" << now_used_time << std::endl; - int ret = -1; - try { - int hist_ret = get_history_times(); - if (hist_ret == -1) { - logger_->Error() << "get_history_times() db2 查询失败,跳过本次持久化" - << std::endl; - } else if (hist_ret == -2) { - logger_->Debug() << "首次存入!" << endl; - this->insert_history_times(now_times, now_used_time); - ret = 0; + + // 投递到后台线程——mon 20ms 周期不阻塞 + std::string rule_id = this->rule_id_; + int exp_type = static_cast(this->exp_type_); + std::string rule_name = this->rule_name_; + + AsyncDbWorker::instance().submit(rule_id, [=]() { + LOG logger("AsyncDb-" + rule_name, AUTO_CATCH_PID); + logger.Debug() << "persist start, now_times:" << now_times + << ", now_used_time:" << now_used_time << std::endl; + if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) { + SingletonTemp::GetInstance().update_static(rule_id, now_times, + now_used_time); + logger.Debug() << "persist done" << std::endl; } else { - T_RULE_SAMPLE_1D trs1a; - if (exp_type_ == ExpType::OccTimesAcc) { - this->rule_stat_.current_value = now_times; - exec(update(trs1a) - .set(trs1a.Count() = now_times, trs1a.X1() = 2, - trs1a.Flag() = this->exp_type_) - .where(trs1a.RuleId() == this->rule_id_)); - logger_->Debug() << "update_history_times(),update,now_times:" - << now_times << endl; - } else if (exp_type_ == ExpType::HoldTimeAcc) { - this->rule_stat_.current_value = now_used_time; - exec(update(trs1a) - .set(trs1a.X1() = now_used_time, - trs1a.Count() = 2, - trs1a.Flag() = this->exp_type_) - .where(trs1a.RuleId() == this->rule_id_)); - logger_->Debug() << "update_history_times(),update,now_used_time:" - << now_used_time << endl; - } - ret = 0; + logger.Error() << "persist failed" << std::endl; } - if (ret == 0) { - SingletonTemp::GetInstance().update_static(this->rule_id_, true); - } - } catch (const std::exception &e) { - logger_->Error() << "update_history_times() db2 操作异常: " << e.what() - << std::endl; - } - this->rule_stat_.shear_times = now_times; - this->rule_stat_.running_time = now_used_time; - return ret; + }); + + return 0; } int ExpTimes::insert_history_times(int64_t now_times, double now_used_time) { @@ -320,8 +347,14 @@ void ExpTimes::reset_dev_data() { last_load_time_ = now_time_; this->rule_stat_.shear_times = 0; this->rule_stat_.running_time = 0; - this->update_history_times(); - SingletonTemp::GetInstance().update_static(this->rule_id_, true); + // SHM 立即更新(不依赖 DB 读回,值已知为 0) + SingletonTemp::GetInstance().update_static(this->rule_id_, 0, 0.0); + // DB 持久化异步投递 + std::string rule_id = this->rule_id_; + int exp_type = static_cast(this->exp_type_); + AsyncDbWorker::instance().submit(rule_id, [=]() { + persist_exp_times(rule_id, exp_type, 0, 0.0); + }); logger_->Debug() << rule_name_ << ":ExpTimes::reset_dev_data()" << endl; this->wait_flag_ = 0; this->act_started_ = false; diff --git a/eqpalg/eqpalg_icei.cpp b/eqpalg/eqpalg_icei.cpp index 07865ac..9ad70df 100644 --- a/eqpalg/eqpalg_icei.cpp +++ b/eqpalg/eqpalg_icei.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -93,6 +94,10 @@ EqpAlgICEI::~EqpAlgICEI() { mem_cached_thread_->join(); } } + // 先停掉所有算法线程,确保不再有新的 DB 任务投递 + alg_mgr_.reset(); + // 排空并停止后台 DB worker + AsyncDbWorker::instance().drain_and_stop(); if (glob_process_type == ProcessType::kTask) { // TaskData 已改用动态 vector,进程退出时自然析构,无需 rm -rf } diff --git a/eqpalg/utility/async_db_worker.cc b/eqpalg/utility/async_db_worker.cc new file mode 100644 index 0000000..41723a7 --- /dev/null +++ b/eqpalg/utility/async_db_worker.cc @@ -0,0 +1,64 @@ +#include "async_db_worker.h" + +AsyncDbWorker &AsyncDbWorker::instance() { + static AsyncDbWorker inst; + return inst; +} + +AsyncDbWorker::AsyncDbWorker() { + worker_ = std::make_unique(&AsyncDbWorker::loop, this); +} + +AsyncDbWorker::~AsyncDbWorker() { + if (running_) { + drain_and_stop(); + } +} + +void AsyncDbWorker::submit(const std::string &rule_id, + std::function task) { + { + std::lock_guard guard(mtx_); + pending_[rule_id] = std::move(task); // 去重 + } + cv_.notify_one(); +} + +void AsyncDbWorker::drain_and_stop() { + running_ = false; + cv_.notify_all(); + if (worker_ && worker_->joinable()) { + worker_->join(); + } +} + +void AsyncDbWorker::loop() { + while (running_) { + std::function task; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this]() { return !pending_.empty() || !running_; }); + if (!running_ && pending_.empty()) break; + if (!pending_.empty()) { + auto it = pending_.begin(); + task = std::move(it->second); + pending_.erase(it); + } + } + if (task) { + task(); + } + } + // 排空剩余任务 + while (true) { + std::function task; + { + std::lock_guard guard(mtx_); + if (pending_.empty()) break; + auto it = pending_.begin(); + task = std::move(it->second); + pending_.erase(it); + } + if (task) task(); + } +} diff --git a/eqpalg/utility/async_db_worker.h b/eqpalg/utility/async_db_worker.h new file mode 100644 index 0000000..6b8bbd3 --- /dev/null +++ b/eqpalg/utility/async_db_worker.h @@ -0,0 +1,44 @@ +#pragma once +/** + * @file async_db_worker.h + * @brief 全局单例后台线程,异步执行 DB2 写入,避免阻塞 mon 20ms 周期 + * + * 去重机制:同一 rule_id 重复 submit 时,旧任务被覆盖(只保留最新快照)。 + * 进程退出时必须在其他单例析构前调用 drain_and_stop()。 + * + * @author your name (you@domain.com) + * @version 0.1 + * @date 2026-05-13 + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +class AsyncDbWorker { + public: + static AsyncDbWorker &instance(); + + /// 提交任务;同 rule_id 覆盖旧任务(去重) + void submit(const std::string &rule_id, std::function task); + + /// 排空队列并停止线程(进程退出前调用) + void drain_and_stop(); + + private: + AsyncDbWorker(); + ~AsyncDbWorker(); + + std::unique_ptr worker_; + std::mutex mtx_; + std::condition_variable cv_; + std::unordered_map> pending_; + std::atomic running_{true}; + + void loop(); +}; diff --git a/eqpalg/utility/eqp_stat.cc b/eqpalg/utility/eqp_stat.cc index 9b795c8..9962455 100644 --- a/eqpalg/utility/eqp_stat.cc +++ b/eqpalg/utility/eqp_stat.cc @@ -163,6 +163,25 @@ bool EqpStat::update_static(std::string ruleid, bool is_times) { } } +bool EqpStat::update_static(const std::string &ruleid, int64_t shear_times, + double running_time) { + try { + rule_stat_cold.alarm_times = select_alarm_by_ruleid(ruleid); + rule_stat_cold.dev_coder = select_dev_coder_by_ruleid(ruleid).c_str(); + rule_stat_cold.running_time = running_time; + rule_stat_cold.shear_times = shear_times; + rule_stat_cold.last_alarm_time = + select_latest_alarm_by_ruleid(ruleid).c_str(); + mapRuleStat.update_static_fields(ruleid, rule_stat_cold); + display_cache_.update_static(ruleid, rule_stat_cold); + return true; + } catch (const std::exception &e) { + logger_->Error() << "EqpStat::update_static(skip-db) ERROR! " << e.what() + << std::endl; + return false; + } +} + bool EqpStat::update_static() { try { bool res = true; diff --git a/eqpalg/utility/eqp_stat.h b/eqpalg/utility/eqp_stat.h index 47349ef..6d7b75b 100644 --- a/eqpalg/utility/eqp_stat.h +++ b/eqpalg/utility/eqp_stat.h @@ -78,6 +78,9 @@ public: /// 写静态数据到共享内存 + 更新本地展示缓存(cron/ExpTimes 调用) bool update_static(std::string ruleid, bool is_times); + /// 同上,但 running_time / shear_times 由调用方直接传入,跳过 DB 查询 + bool update_static(const std::string &ruleid, int64_t shear_times, + double running_time); bool update_static(); /// mon 攒样本到共享内存