From 4f8eecd828ad3333e3f783600e0a61575c63ba5e Mon Sep 17 00:00:00 2001 From: Huamonarch Date: Fri, 15 May 2026 14:21:36 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=8F=90=E5=8F=96=20StatCollector?= =?UTF-8?q?=20=E7=BB=9F=E8=AE=A1=E5=AD=A6=E4=B9=A0=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将 ExpBase::cron_proc() 中的 DAA::STA 统计学习逻辑提取到独立的 StatCollector 工具类,统一管理分布统计的生命周期(样本累积、分布 初始化、DB2 持久化、置信区间更新)。同时将 exec_task() 与 task_mon_pro() 中的任务相关 STA 操作也委托给 StatCollector。 新增: - eqpalg/utility/stat_collector.h -- StatCollector 接口 - eqpalg/utility/stat_collector.cpp -- StatCollector 实现 修改: - eqpalg/algs/exp_base.h -- 替换 sta_ptr_ 为 stat_collector_ - eqpalg/algs/exp_base.cpp -- cron_proc/reload_ci_dist/reset_dev_data/ exec_task/task_mon_pro 委托给 StatCollector --- eqpalg/algs/exp_base.cpp | 96 +++++-------------------- eqpalg/algs/exp_base.h | 5 +- eqpalg/utility/stat_collector.cpp | 112 ++++++++++++++++++++++++++++++ eqpalg/utility/stat_collector.h | 94 +++++++++++++++++++++++++ 4 files changed, 228 insertions(+), 79 deletions(-) create mode 100644 eqpalg/utility/stat_collector.cpp create mode 100644 eqpalg/utility/stat_collector.h diff --git a/eqpalg/algs/exp_base.cpp b/eqpalg/algs/exp_base.cpp index 68e67ca..758fb40 100644 --- a/eqpalg/algs/exp_base.cpp +++ b/eqpalg/algs/exp_base.cpp @@ -49,6 +49,7 @@ int ExpBase::init() { ret += this->reload_config_up_down(); /*7.上下限*/ this->reload_ci_dist(); this->last_load_time_ = chrono::system_clock::now(); + stat_collector_.configure(rule_id_, rule_name_, dist_mode_, is_learning_); } if (exp_type_ == ExpType::BoundHoldTime) { ret += this->reload_config_up_down_hold_time(); /*8.上下限-保持时间*/ @@ -169,10 +170,7 @@ std::vector ExpBase::exec_task(mix_cc::time_range_t time_range) { << mix_cc::mix_time_t(time_range.get_right()).to_formatted_time() << endl; std::string sample_id = this->get_id(time_range); - if (sta_ptr_ == nullptr) { - logger_->Debug() << rule_name_ << " sta_ptr_ == nullptr" << endl; - sta_ptr_ = std::make_unique(rule_id_, rule_name_); - } + stat_collector_.ensureInitialized(); logger_->Debug() << "reset_data()------1" << endl; if ((time_range.get_right() - time_range.get_left()) < @@ -192,7 +190,7 @@ std::vector ExpBase::exec_task(mix_cc::time_range_t time_range) { } } logger_->Debug() << "reset_data()------1" << endl; - this->sta_ptr_->reset_data(); /*参数重置*/ + stat_collector_.resetData(); /*参数重置*/ logger_->Debug() << "reset_data()------2" << endl; logger_->Debug() << "task_seq:" << task_seq << std::endl; auto &data_record = TaskShm::TaskRecordPtr.get() @@ -202,12 +200,12 @@ std::vector ExpBase::exec_task(mix_cc::time_range_t time_range) { logger_->Debug() << "dataSize:" << data_record.size() << std::endl; for (size_t j = 0; j < data_record.size(); j++) { double dataJ = data_record[j]; - this->sta_ptr_->dist_add(dataJ); + stat_collector_.distAdd(dataJ); } logger_->Debug() << "reset_data()------3" << endl; - auto store_res = this->sta_ptr_->task_store_db2(sample_id); + auto store_res = stat_collector_.taskStoreDb2(sample_id); logger_->Debug() << "reset_data()------4" << endl; - this->sample_result_ = this->sta_ptr_->get_sample_stat_str(); + this->sample_result_ = stat_collector_.getSampleStatStr(); this->update_t_sample_mag(store_res); this->alarm_poster_.zmqp_send(912, this->sample_result_); TaskShm::TaskRecordPtr.get()->erase(exp_type_ * 1000 + task_seq); @@ -489,50 +487,13 @@ int ExpBase::cron_proc() { if (exp_type_ == ExpType::Bound || exp_type_ == ExpType::CondBound || exp_type_ == ExpType::BoundHoldTime) { /*只保存有上下限的,2-监控变量-上下限;4-动作反馈-上下限;5-监控变量-上下限-持续*/ - if (sta_ptr_ == nullptr) { - logger_->Debug() << rule_name_ << " sta_ptr_ == nullptr" << endl; - sta_ptr_ = std::make_unique(rule_id_, rule_name_); - this->sta_ptr_->update_ci_dist(); - last_load_time_ = chrono::system_clock::now(); - } - - if (now_time_ - last_load_time_ > hours(CronUpdateDelay)) { - this->sta_ptr_->update_ci_dist(); - last_load_time_ = now_time_; - } this->rule_stat_.stat_values.clear(); SingletonTemp::GetInstance().get_stat_values(this->rule_id_, this->rule_stat_); if (!this->rule_stat_.stat_values.empty()) { - size_data = this->rule_stat_.stat_values.size(); - logger_->Debug() << rule_name_ << ",this->rule_stat_ size:" << size_data - << endl; - if (sta_ptr_->is_init()) { - for (int i = 0; i < size_data; i++) { - sta_ptr_->dist_add(this->rule_stat_.stat_values[i]); - } - } else { - double max_data = - *(std::max_element(this->rule_stat_.stat_values.begin(), - this->rule_stat_.stat_values.end())); - double min_data = - *(std::min_element(this->rule_stat_.stat_values.begin(), - this->rule_stat_.stat_values.end())); - double range = (max_data - min_data) / double(DAA::STA_SIZE_MIN); - logger_->Debug() << "max:" << max_data << ",min:" << min_data - << ",range:" << range << endl; - if (range < 0.1) { - range = 0.1; - } - if (sta_ptr_->init(range, min_data)) { - for (int i = 0; i < size_data; i++) { - sta_ptr_->dist_add(this->rule_stat_.stat_values[i]); - } - } - } - logger_->Debug() << rule_name_ << ",sta_ptr_ size:" << sta_ptr_->size() - << endl; - sta_ptr_->store_db2(); + size_data = stat_collector_.processCron( + this->rule_stat_.stat_values, now_time_, + CronUpdateDelay, last_load_time_); } this->rule_stat_.stat_values.clear(); this->rule_stat_.stat_values.shrink_to_fit(); @@ -803,31 +764,17 @@ int ExpBase::reload_ci_dist() { this->exp_type_ != ExpType::BoundHoldTime) { return 0; } - last_load_time_ = now_time_; - if (this->dist_mode_ != 0) { - mix_cc::float_range_t dist_range; - if (this->dist_mode_ == DistMode::Online) { - dist_range = DAA::STA::select_from_t_rule_feature(this->rule_id_); - } else if (this->dist_mode_ == DistMode::Offline) { - dist_range = DAA::STA::select_from_t_sample_mag(this->rule_id_); - } - - if (dist_range.get_distance() == 0 || - dist_range.get_left() > dist_range.get_right()) { - logger_->Error() << "区间不合法![" << dist_range.get_left() << "," - << dist_range.get_right() << "]" << endl; - return -1; - } - this->limit_down_ = dist_range.get_left(); - this->limit_up_ = dist_range.get_right(); - bound_checker_.setLimits(limit_down_, limit_up_); + double newDown, newUp; + if (stat_collector_.reloadCiDist(newDown, newUp, now_time_, last_load_time_)) { + this->limit_down_ = newDown; + this->limit_up_ = newUp; + bound_checker_.setLimits(newDown, newUp); this->rule_stat_.limit_down = limit_down_; this->rule_stat_.limit_up = limit_up_; - this->rule_stat_.current_value = - (this->limit_down_ + this->limit_up_) / 2; + this->rule_stat_.current_value = (limit_down_ + limit_up_) / 2; logger_->Info() - << "更新置信区间,[" << dist_range.get_left() << "," - << dist_range.get_right() << "]" + << "更新置信区间,[" << newDown << "," + << newUp << "]" << ",type[0-手动设置的区间;1-在线更新的区间;2-离线分析的区间]:" << this->dist_mode_ << endl; } @@ -873,7 +820,7 @@ void ExpBase::task_mon_pro() { -> operator[](exp_type_ * 1000 + task_seq) .data_record.push_back(res.value); - this->sta_ptr_->running_stat_add(res.value); + stat_collector_.runningStatAdd(res.value); } } } @@ -1034,12 +981,7 @@ void ExpBase::reset_dev_data() { if (glob_process_type == ProcessType::kMon) { if (exp_type_ == ExpType::Bound || exp_type_ == ExpType::CondBound || exp_type_ == ExpType::BoundHoldTime) { - int res = DAA::STA::delete_statistics_data(this->rule_id_); - if (res != 0) { - logger_->Debug() << "DAA::STA::delete_statistics_data(this->rule_id_) " - "存在删除失败的表!" - << endl; - } + stat_collector_.reset(rule_id_); this->reload_config_up_down(); /*7.上下限*/ this->reload_ci_dist(); this->last_load_time_ = chrono::system_clock::now(); diff --git a/eqpalg/algs/exp_base.h b/eqpalg/algs/exp_base.h index 6add062..48a8a40 100644 --- a/eqpalg/algs/exp_base.h +++ b/eqpalg/algs/exp_base.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -95,7 +96,7 @@ public: * 删除 * 四张表(T_RULE_SAMPLE_1D,T_RULE_SAMPLE_1D_INFO,T_SAMPLE_RECORD,T_SAMPLE_RECORD,T_RULE_SAMPLE_FEATURE)rule_id_对应数据 * cron---- - * 重置 sta_ptr_(sta_ptr_.reset()) + * 重置 stat_collector_(stat_collector_.reset()) */ virtual void reset_dev_data() override; @@ -171,7 +172,7 @@ protected: TimeDur hold_time_ = 0ms; - std::unique_ptr sta_ptr_; + StatCollector stat_collector_; std::map> hold_times_; diff --git a/eqpalg/utility/stat_collector.cpp b/eqpalg/utility/stat_collector.cpp new file mode 100644 index 0000000..dddfbec --- /dev/null +++ b/eqpalg/utility/stat_collector.cpp @@ -0,0 +1,112 @@ +// eqpalg/utility/stat_collector.cpp +#include +#include +#include +#include +#include + +StatCollector::~StatCollector() = default; + +void StatCollector::configure(const std::string& ruleId, + const std::string& ruleName, + int distMode, bool isLearning) { + rule_id_ = ruleId; + rule_name_ = ruleName; + dist_mode_ = distMode; + is_learning_ = isLearning; + configured_ = true; +} + +int StatCollector::processCron(const std::vector& statValues, + TimePoint now, int cronDelayHours, + TimePoint& lastLoadTime) { + if (statValues.empty()) return 0; + + int size_data = static_cast(statValues.size()); + + if (sta_ptr_ == nullptr) { + sta_ptr_ = std::make_unique(rule_id_, rule_name_); + sta_ptr_->update_ci_dist(); + lastLoadTime = now; + } + + if (now - lastLoadTime > std::chrono::hours(cronDelayHours)) { + sta_ptr_->update_ci_dist(); + lastLoadTime = now; + } + + if (sta_ptr_->is_init()) { + for (int i = 0; i < size_data; i++) { + sta_ptr_->dist_add(statValues[i]); + } + } else { + double max_data = *std::max_element(statValues.begin(), statValues.end()); + double min_data = *std::min_element(statValues.begin(), statValues.end()); + double range = (max_data - min_data) / static_cast(DAA::STA_SIZE_MIN); + if (range < 0.1) range = 0.1; + if (sta_ptr_->init(range, min_data)) { + for (int i = 0; i < size_data; i++) { + sta_ptr_->dist_add(statValues[i]); + } + } + } + + sta_ptr_->store_db2(); + return size_data; +} + +bool StatCollector::reloadCiDist(double& limitDown, double& limitUp, + TimePoint now, TimePoint& lastLoadTime) { + if (dist_mode_ == 0) return false; // 手动模式,不自动加载 + + lastLoadTime = now; + + mix_cc::float_range_t dist_range; + if (dist_mode_ == DistMode::Online) { + dist_range = DAA::STA::select_from_t_rule_feature(rule_id_); + } else if (dist_mode_ == DistMode::Offline) { + dist_range = DAA::STA::select_from_t_sample_mag(rule_id_); + } else { + return false; + } + + if (dist_range.get_distance() == 0 || + dist_range.get_left() > dist_range.get_right()) { + return false; + } + + limitDown = dist_range.get_left(); + limitUp = dist_range.get_right(); + return true; +} + +void StatCollector::reset(const std::string& ruleId) { + DAA::STA::delete_statistics_data(ruleId); + sta_ptr_.reset(); +} + +void StatCollector::ensureInitialized() { + if (sta_ptr_ == nullptr) { + sta_ptr_ = std::make_unique(rule_id_, rule_name_); + } +} + +void StatCollector::resetData() { + sta_ptr_->reset_data(); +} + +void StatCollector::distAdd(double value) { + sta_ptr_->dist_add(value); +} + +bool StatCollector::taskStoreDb2(const std::string& sampleId) { + return sta_ptr_->task_store_db2(sampleId); +} + +std::string StatCollector::getSampleStatStr() { + return sta_ptr_->get_sample_stat_str(); +} + +void StatCollector::runningStatAdd(double value) { + sta_ptr_->running_stat_add(value); +} diff --git a/eqpalg/utility/stat_collector.h b/eqpalg/utility/stat_collector.h new file mode 100644 index 0000000..4e8e5a5 --- /dev/null +++ b/eqpalg/utility/stat_collector.h @@ -0,0 +1,94 @@ +// eqpalg/utility/stat_collector.h +#pragma once + +#include +#include +#include +#include + +// 前向声明 +namespace DAA { class STA; } + +using TimePoint = std::chrono::system_clock::time_point; + +/** + * @brief 统计学习收集器 + * + * 管理 DAA::STA 分布统计的生命周期: + * - 样本累积(dist_add) + * - 分布初始化与 DB2 持久化 + * - 置信区间更新 + */ +class StatCollector { +public: + StatCollector() = default; + ~StatCollector(); + + /** + * @brief 配置统计收集器 + * @param ruleId 规则 ID + * @param ruleName 规则名称 + * @param distMode 分布模式(0=手动, 1=在线, 2=离线) + * @param isLearning 是否启用自学习 + */ + void configure(const std::string& ruleId, const std::string& ruleName, + int distMode, bool isLearning); + + /** + * @brief cron 进程:从共享内存获取累积样本,更新分布并持久化 + * @param statValues 累积的样本值 + * @param now 当前时间 + * @param cronDelayHours cron 更新延迟(小时) + * @param lastLoadTime [in/out] 上次加载时间 + * @return 处理的样本数量 + */ + int processCron(const std::vector& statValues, + TimePoint now, int cronDelayHours, + TimePoint& lastLoadTime); + + /** + * @brief 加载置信区间到上下限 + * @param limitDown [out] 新的下限 + * @param limitUp [out] 新的上限 + * @param now 当前时间 + * @param lastLoadTime [out] 更新时间 + * @return true 成功加载新区间 + */ + bool reloadCiDist(double& limitDown, double& limitUp, + TimePoint now, TimePoint& lastLoadTime); + + /** + * @brief 删除统计数据和重置 + */ + void reset(const std::string& ruleId); + + bool isConfigured() const { return configured_; } + + // ---- task 进程专用接口 ---- + + /** @brief 确保 STA 实例已初始化(惰性初始化) */ + void ensureInitialized(); + + /** @brief 重置累积数据,供新 task 使用 */ + void resetData(); + + /** @brief 向分布添加单个值 */ + void distAdd(double value); + + /** @brief 将 task 结果存储到 DB2 (T_SAMPLE_STAT) */ + bool taskStoreDb2(const std::string& sampleId); + + /** @brief 获取样本统计字符串,用于 T_SAMPLE_MAG */ + std::string getSampleStatStr(); + + /** @brief 向运行统计中添加值 */ + void runningStatAdd(double value); + +private: + std::unique_ptr sta_ptr_; + std::string rule_id_; + std::string rule_name_; + int dist_mode_ = 0; + bool is_learning_ = true; + bool configured_ = false; +};