eis/eqpalg/utility/eqp_stat.cc
Huamonarch b3932b0af8 Fix: update_cold must not overwrite SHM stat_values and fetch_mark
Mon's update_map_rule() called update_cold() which blindly copied
RuleStatLocal's stat_values (always empty in mon) and fetch_mark
(always false in mon) into SHM, destroying accumulated data and
breaking the mon-cron handshake.

stat_values and fetch_mark are managed exclusively by the
add_stat_value/get_stat_value handshake. The cold sync path only
needs to transport running_time and shear_times.
2026-05-13 11:22:42 +08:00

338 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <eqpalg/table_struct/fv_result_latest.h>
#include <eqpalg/table_struct/t_rule_cfg.h>
#include <eqpalg/table_struct/t_rule_result.h>
#include <eqpalg/table_struct/t_rule_sample_1d.h>
#include <eqpalg/utility/eqp_stat.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
/* mapRuleStat 单例,只存冷数据 */
namespace {
RuleStatShm::MapRuleStat mapRuleStat;
RuleStatShm::RuleStatCold rule_stat_cold;
};
// ═══════════════════════════════════════════════════════════════
// DisplayCache
// ═══════════════════════════════════════════════════════════════
void DisplayCache::update(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &stat) {
std::lock_guard<std::mutex> guard(mtx_);
DisplayEntry &entry = cache_[ruleid];
entry.alarm_value = stat.alarm_value;
entry.current_value = stat.current_value;
entry.limit_up = stat.limit_up;
entry.limit_down = stat.limit_down;
entry.unit = stat.unit;
if (!stat.items.empty()) {
entry.items = stat.items;
}
}
void DisplayCache::update_static(const std::string &ruleid,
const RuleStatShm::RuleStatCold &cold) {
std::lock_guard<std::mutex> guard(mtx_);
mix_cc::json js;
js["running_time"] = cold.limit_precision(cold.running_time);
js["shear_times"] = cold.shear_times;
js["alarm_times"] = cold.alarm_times;
js["last_alarm_time"] = cold.last_alarm_time.c_str();
js["dev_coder"] = cold.dev_coder.c_str();
static_fields_[ruleid] = js.dump();
}
void DisplayCache::remove(const std::string &ruleid) {
std::lock_guard<std::mutex> guard(mtx_);
cache_.erase(ruleid);
static_fields_.erase(ruleid);
}
std::string DisplayCache::get_json() {
std::map<std::string, DisplayEntry> snapshot;
std::map<std::string, std::string> static_snapshot;
{
std::lock_guard<std::mutex> guard(mtx_);
snapshot = cache_;
static_snapshot = static_fields_;
}
mix_cc::json js;
for (const auto &[key, entry] : snapshot) {
mix_cc::json item = entry.to_json();
auto it = static_snapshot.find(key);
if (it != static_snapshot.end() && !it->second.empty()) {
auto static_js = mix_cc::json::parse(it->second);
for (auto &[k, v] : static_js.items()) {
item[k] = v;
}
} else {
// 静态字段尚未缓存时,填充默认值
item["running_time"] = 0;
item["shear_times"] = 0;
item["alarm_times"] = 0;
item["last_alarm_time"] = "无报警";
item["dev_coder"] = "";
}
js[key] = item;
}
return js.dump();
}
// ═══════════════════════════════════════════════════════════════
// EqpStat
// ═══════════════════════════════════════════════════════════════
EqpStat::EqpStat() {
logger_ = std::make_unique<LOG>("EqpStat");
logger_->Debug() << "EqpStat::EqpStat()" << std::endl;
}
EqpStat::~EqpStat() { logger_->Info() << "EqpStat::~EqpStat()" << std::endl; }
void EqpStat::get_cfg_rules() {
T_RULE_CFG t_rule_cfg;
auto sql_statement = select(t_rule_cfg.ruleId()).from(t_rule_cfg);
auto rule_list_maybe = mix_cc::sql::exec<db2_t, T_RULE_CFG>(sql_statement);
if (rule_list_maybe.is_just()) {
auto rule_list = rule_list_maybe.unsafe_get_just();
logger_->Debug() << "cfg_rules_ size:" << cfg_rules_.size() << std::endl;
cfg_rules_.clear();
if (!rule_list.empty()) {
int num = rule_list.size();
logger_->Debug() << "rules num:" << num << std::endl;
for (int i = 0; i < num; i++) {
cfg_rules_.push_back(rule_list[i].ruleId);
}
} else {
logger_->Info() << "no rule in database cfg!" << std::endl;
}
} else {
logger_->Error() << "EqpStat::get_cfg_rules()"
<< ", select查询异常" << std::endl;
}
auto res = this->stat_find_no_ruleid();
if (!res.empty()) {
for (auto item : res) {
this->delete_stat(item);
}
}
}
bool EqpStat::update_display(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &rule_stat) {
try {
display_cache_.update(ruleid, rule_stat);
return true;
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::update_display ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::update_cold(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &rule_stat) {
try {
RuleStatShm::RuleStatCold cold;
// stat_values 和 fetch_mark 由 SHM 的 add_stat_value/get_stat_value 握手管理
cold.running_time = rule_stat.running_time;
cold.shear_times = rule_stat.shear_times;
return mapRuleStat.update_cold_fields(ruleid, cold);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::update_cold ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::update_static(std::string ruleid, bool is_times) {
try {
rule_stat_cold.alarm_times = select_alarm_by_ruleid(ruleid);
rule_stat_cold.dev_coder = select_dev_coder_by_ruleid(ruleid).c_str();
rule_stat_cold.running_time = is_times ? select_running_by_ruleid(ruleid) : 0;
rule_stat_cold.shear_times = is_times ? select_times_by_ruleid(ruleid) : 0;
rule_stat_cold.last_alarm_time =
select_latest_alarm_by_ruleid(ruleid).c_str();
mapRuleStat.update_static_fields(ruleid, rule_stat_cold);
display_cache_.update_static(ruleid, rule_stat_cold);
return true;
} catch (const std::exception &e) {
logger_->Error() << "EqpStat::update_static() ERROR!" << e.what()
<< std::endl;
return false;
}
}
bool EqpStat::update_static() {
try {
bool res = true;
for (auto ruleid : cfg_rules_) {
res = res & this->update_static(ruleid, true);
}
return res;
} catch (const std::exception &e) {
logger_->Error() << "EqpStat::update_static() ERROR!" << e.what()
<< std::endl;
return false;
}
}
bool EqpStat::add_stat_values(std::string ruleid, const double &value) {
try {
return mapRuleStat.add_stat_value(ruleid, value);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::add_stat_values ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::get_stat_values(std::string ruleid,
RuleStatShm::RuleStatLocal &local) {
try {
RuleStatShm::RuleStatCold cold;
if (!mapRuleStat.get_stat_value(ruleid, cold)) {
return false;
}
local.stat_values.assign(cold.stat_values.begin(), cold.stat_values.end());
local.fetch_mark = cold.fetch_mark;
return true;
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::get_stat_values ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::delete_stat(std::string ruleid) {
try {
display_cache_.remove(ruleid);
return mapRuleStat.delete_data(ruleid);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid << "EqpStat::delete_stat ERROR!"
<< e.what() << std::endl;
return false;
}
}
std::string EqpStat::get_stat_json() { return display_cache_.get_json(); }
std::vector<std::string> EqpStat::stat_find_no_ruleid() {
std::vector<std::string> res{};
res = mapRuleStat.find_no_rule_id(cfg_rules_);
return res;
}
int EqpStat::get_stat_size() { return mapRuleStat.size(); }
double EqpStat::select_running_by_ruleid(std::string ruleid) {
T_RULE_SAMPLE_1D trs1a;
auto query_list_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
select(trs1a.X1()).from(trs1a).where(trs1a.RuleId() == ruleid));
if (query_list_maybe.is_just()) {
auto &query_list = query_list_maybe.unsafe_get_just();
if (query_list.empty()) {
return 0;
}
return query_list[0].X1;
} else {
this->logger_->Debug() << "!query_list_maybe.is_just() ruleid:" << ruleid
<< std::endl;
return 0;
}
}
unsigned long EqpStat::select_times_by_ruleid(std::string ruleid) {
T_RULE_SAMPLE_1D trs1a;
auto query_list_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
select(trs1a.Count()).from(trs1a).where(trs1a.RuleId() == ruleid));
if (query_list_maybe.is_just()) {
auto &query_list = query_list_maybe.unsafe_get_just();
if (query_list.empty()) {
return 0;
}
return query_list[0].Count;
} else {
return 0;
}
}
std::string EqpStat::get_ruleid_json() {
mix_cc::json js1;
js1["ruleid"] = this->cfg_rules_;
return js1.dump();
}
int EqpStat::select_alarm_by_ruleid(std::string ruleid) {
T_RULE_RESULT trr;
auto query_list_maybe = exec<db2_t, T_RULE_CFG>(
select(trr.ruleId())
.from(trr)
.where(trr.ruleId() == ruleid, trr.dealResult() == 0));
if (query_list_maybe.is_just()) {
auto &query_list = query_list_maybe.unsafe_get_just();
return query_list.size();
} else {
logger_->Error() << "ruleid:" << ruleid << "select_alarm_by_ruleid ERROR!"
<< ",location:" << BOOST_CURRENT_LOCATION << std::endl;
return 0;
}
}
std::string EqpStat::select_dev_coder_by_ruleid(std::string ruleid) {
T_RULE_CFG t_rule_cfg;
auto sql_statement = select(t_rule_cfg.eqpid())
.from(t_rule_cfg)
.where(t_rule_cfg.ruleId() == ruleid);
auto query_list_maybe = mix_cc::sql::exec<db2_t, T_RULE_CFG>(sql_statement);
if (query_list_maybe.is_just()) {
auto &query_list = query_list_maybe.unsafe_get_just();
return query_list[0].eqpid;
} else {
logger_->Error() << "规则:" << ruleid << ",九位码查询异常!" << std::endl;
return "";
}
}
std::string EqpStat::select_latest_alarm_by_ruleid(std::string ruleid) {
try {
FV_RESULT_LATEST fvrl;
auto query_list_maybe = exec<db2_t, FV_RESULT_LATEST>(
select(fvrl.alarmtime())
.from(fvrl)
.where(fvrl.ruleId() == ruleid, fvrl.rank() == 1));
if (query_list_maybe.is_just()) {
auto &query_list = query_list_maybe.unsafe_get_just();
if (query_list.size() == 0) {
return "无报警";
}
std::string quered_time =
mix_cc::mix_time_t(query_list[0].alarmtime).to_formatted_time();
return quered_time;
} else {
return "无报警";
}
} catch (std::exception &e) {
this->logger_->Error() << "查询最新报警时间,规则:" << ruleid << e.what()
<< std::endl;
}
return "无报警";
}
void EqpStat::init() {
if (!this->cfg_flag) {
get_cfg_rules();
logger_->Debug() << "EqpStat::init()---get_cfg_rules()---" << std::endl;
update_static();
logger_->Debug() << "EqpStat::init()---update_static()---" << std::endl;
this->cfg_flag = true;
last_update_static_time_ = std::chrono::system_clock::now();
}
if (std::chrono::system_clock::now() - last_update_static_time_ >
std::chrono::minutes(5)) {
this->cfg_flag = false;
}
}