From 973921fc4b793308abb220fdc129c590bfddbe98 Mon Sep 17 00:00:00 2001 From: Huamonarch Date: Tue, 12 May 2026 15:46:01 +0800 Subject: [PATCH] Split RuleStat display from cold data paths to reduce lock contention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Display data (alarm_value, current_value, limit_up/down, items, unit) now goes to a local-memory DisplayCache and is serialized to JSON without any shared memory lock. Cold data (stat_values, running_time, shear_times, etc.) stays in shared memory for mon-cron IPC, protected by a real interprocess mutex (boost::interprocess::interprocess_mutex) instead of the broken process-local std::mutex. AlgBase::rule_stat_ is now RuleStatLocal with standard types — zero changes to algorithm subclass code. --- eqpalg/alg_base.cpp | 5 +- eqpalg/alg_base.h | 2 +- eqpalg/utility/eqp_stat.cc | 193 +++++++++++++------ eqpalg/utility/eqp_stat.h | 202 ++++++++------------ shm/RuleStatShm.h | 368 +++++++++++++------------------------ 5 files changed, 347 insertions(+), 423 deletions(-) diff --git a/eqpalg/alg_base.cpp b/eqpalg/alg_base.cpp index 9e6d4d9..925b31a 100644 --- a/eqpalg/alg_base.cpp +++ b/eqpalg/alg_base.cpp @@ -358,8 +358,11 @@ int AlgBase::refresh_now_time() { bool AlgBase::update_map_rule() { try { std::lock_guard guard(lm); - return SingletonTemp::GetInstance().update_dynamic( + SingletonTemp::GetInstance().update_display( this->rule_id_, this->rule_stat_); + SingletonTemp::GetInstance().update_cold( + this->rule_id_, this->rule_stat_); + return true; } catch (...) { gb_logger_->log_error(this->rule_name_ + "update_map_rule()"); return false; diff --git a/eqpalg/alg_base.h b/eqpalg/alg_base.h index d2d96fd..24bc417 100644 --- a/eqpalg/alg_base.h +++ b/eqpalg/alg_base.h @@ -93,7 +93,7 @@ protected: "0"; std::mutex lm; - RuleStatShm::RuleStat rule_stat_; + RuleStatShm::RuleStatLocal rule_stat_; std::string error_message_str_; diff --git a/eqpalg/utility/eqp_stat.cc b/eqpalg/utility/eqp_stat.cc index 417a6f1..61ed936 100644 --- a/eqpalg/utility/eqp_stat.cc +++ b/eqpalg/utility/eqp_stat.cc @@ -5,17 +5,87 @@ #include #include #include -#include -/*mapRuleStat 单例*/ + +/* mapRuleStat 单例,只存冷数据 */ namespace { -std::mutex up_mutex{}; RuleStatShm::MapRuleStat mapRuleStat; -RuleStatShm::RuleStat rule_stat; +RuleStatShm::RuleStatCold rule_stat_cold; }; +// ═══════════════════════════════════════════════════════════════ +// DisplayCache +// ═══════════════════════════════════════════════════════════════ + +void DisplayCache::update(const std::string &ruleid, + const RuleStatShm::RuleStatLocal &stat) { + std::lock_guard guard(mtx_); + DisplayEntry &entry = cache_[ruleid]; + entry.alarm_value = stat.alarm_value; + entry.current_value = stat.current_value; + entry.limit_up = stat.limit_up; + entry.limit_down = stat.limit_down; + entry.unit = stat.unit; + if (!stat.items.empty()) { + entry.items = stat.items; + } +} + +void DisplayCache::update_static(const std::string &ruleid, + const RuleStatShm::RuleStatCold &cold) { + std::lock_guard guard(mtx_); + mix_cc::json js; + js["running_time"] = cold.limit_precision(cold.running_time); + js["shear_times"] = cold.shear_times; + js["alarm_times"] = cold.alarm_times; + js["last_alarm_time"] = cold.last_alarm_time.c_str(); + js["dev_coder"] = cold.dev_coder.c_str(); + static_fields_[ruleid] = js.dump(); +} + +void DisplayCache::remove(const std::string &ruleid) { + std::lock_guard guard(mtx_); + cache_.erase(ruleid); + static_fields_.erase(ruleid); +} + +std::string DisplayCache::get_json() { + std::map snapshot; + std::map static_snapshot; + { + std::lock_guard guard(mtx_); + snapshot = cache_; + static_snapshot = static_fields_; + } + + mix_cc::json js; + for (const auto &[key, entry] : snapshot) { + mix_cc::json item = entry.to_json(); + auto it = static_snapshot.find(key); + if (it != static_snapshot.end() && !it->second.empty()) { + auto static_js = mix_cc::json::parse(it->second); + for (auto &[k, v] : static_js.items()) { + item[k] = v; + } + } else { + // 静态字段尚未缓存时,填充默认值 + item["running_time"] = 0; + item["shear_times"] = 0; + item["alarm_times"] = 0; + item["last_alarm_time"] = "无报警"; + item["dev_coder"] = "无"; + } + js[key] = item; + } + return js.dump(); +} + +// ═══════════════════════════════════════════════════════════════ +// EqpStat +// ═══════════════════════════════════════════════════════════════ + EqpStat::EqpStat() { logger_ = std::make_unique("EqpStat"); - logger_->Debug() << "EqpStat::EqpStat()" << endl; + logger_->Debug() << "EqpStat::EqpStat()" << std::endl; } EqpStat::~EqpStat() { logger_->Info() << "EqpStat::~EqpStat()" << std::endl; } @@ -25,11 +95,11 @@ void EqpStat::get_cfg_rules() { auto rule_list_maybe = mix_cc::sql::exec(sql_statement); if (rule_list_maybe.is_just()) { auto rule_list = rule_list_maybe.unsafe_get_just(); - logger_->Debug() << "cfg_rules_ size:" << cfg_rules_.size() << endl; + logger_->Debug() << "cfg_rules_ size:" << cfg_rules_.size() << std::endl; cfg_rules_.clear(); if (!rule_list.empty()) { int num = rule_list.size(); - logger_->Debug() << "rules num:" << num << endl; + logger_->Debug() << "rules num:" << num << std::endl; for (int i = 0; i < num; i++) { cfg_rules_.push_back(rule_list[i].ruleId); } @@ -48,26 +118,45 @@ void EqpStat::get_cfg_rules() { } } -bool EqpStat::update_dynamic(std::string ruleid, - const RuleStatShm::RuleStat &rule_stat) { +bool EqpStat::update_display(const std::string &ruleid, + const RuleStatShm::RuleStatLocal &rule_stat) { try { - return mapRuleStat.update_dynamic(ruleid, rule_stat, true); + display_cache_.update(ruleid, rule_stat); + return true; } catch (const std::exception &e) { - logger_->Error() << "ruleid:" << ruleid << "EqpStat::update_dynamic ERROR!" - << e.what() << std::endl; + logger_->Error() << "ruleid:" << ruleid + << "EqpStat::update_display ERROR!" << e.what() << std::endl; + return false; + } +} + +bool EqpStat::update_cold(const std::string &ruleid, + const RuleStatShm::RuleStatLocal &rule_stat) { + try { + RuleStatShm::RuleStatCold cold; + cold.stat_values.assign(rule_stat.stat_values.begin(), + rule_stat.stat_values.end()); + cold.fetch_mark = rule_stat.fetch_mark; + cold.running_time = rule_stat.running_time; + cold.shear_times = rule_stat.shear_times; + return mapRuleStat.update_cold_fields(ruleid, cold); + } catch (const std::exception &e) { + logger_->Error() << "ruleid:" << ruleid + << "EqpStat::update_cold ERROR!" << e.what() << std::endl; return false; } } bool EqpStat::update_static(std::string ruleid, bool is_times) { try { - std::lock_guard guard(up_mutex); - rule_stat.alarm_times = select_alarm_by_ruleid(ruleid); - rule_stat.dev_coder = select_dev_coder_by_ruleid(ruleid); - rule_stat.running_time = is_times ? select_running_by_ruleid(ruleid) : 0; - rule_stat.shear_times = is_times ? select_times_by_ruleid(ruleid) : 0; - rule_stat.last_alarm_time = select_latest_alarm_by_ruleid(ruleid); - mapRuleStat.update_dynamic(ruleid, rule_stat, false); + rule_stat_cold.alarm_times = select_alarm_by_ruleid(ruleid); + rule_stat_cold.dev_coder = select_dev_coder_by_ruleid(ruleid).c_str(); + rule_stat_cold.running_time = is_times ? select_running_by_ruleid(ruleid) : 0; + rule_stat_cold.shear_times = is_times ? select_times_by_ruleid(ruleid) : 0; + rule_stat_cold.last_alarm_time = + select_latest_alarm_by_ruleid(ruleid).c_str(); + mapRuleStat.update_static_fields(ruleid, rule_stat_cold); + display_cache_.update_static(ruleid, rule_stat_cold); return true; } catch (const std::exception &e) { logger_->Error() << "EqpStat::update_static() ERROR!" << e.what() @@ -90,39 +179,36 @@ bool EqpStat::update_static() { } } -bool EqpStat::update_stat(std::string ruleid, - const RuleStatShm::RuleStat &rule_stat) { +bool EqpStat::add_stat_values(std::string ruleid, const double &value) { try { - return mapRuleStat.update(ruleid, rule_stat); + return mapRuleStat.add_stat_value(ruleid, value); } catch (const std::exception &e) { - logger_->Error() << "ruleid:" << ruleid << "EqpStat::update_stat ERROR!" - << e.what() << std::endl; + logger_->Error() << "ruleid:" << ruleid + << "EqpStat::add_stat_values ERROR!" << e.what() << std::endl; return false; } } -bool EqpStat::add_stat_values(std::string ruleid, const double &rule_stat) { - try { - return mapRuleStat.add_stat_value(ruleid, rule_stat); - } catch (const std::exception &e) { - logger_->Error() << "ruleid:" << ruleid << "EqpStat::add_stat_values ERROR!" - << e.what() << std::endl; - return false; - } -} bool EqpStat::get_stat_values(std::string ruleid, - RuleStatShm::RuleStat &rule_stat) { + RuleStatShm::RuleStatLocal &local) { try { - return mapRuleStat.get_stat_value(ruleid, rule_stat); + RuleStatShm::RuleStatCold cold; + if (!mapRuleStat.get_stat_value(ruleid, cold)) { + return false; + } + local.stat_values.assign(cold.stat_values.begin(), cold.stat_values.end()); + local.fetch_mark = cold.fetch_mark; + return true; } catch (const std::exception &e) { - logger_->Error() << "ruleid:" << ruleid << "EqpStat::get_stat_values ERROR!" - << e.what() << std::endl; + logger_->Error() << "ruleid:" << ruleid + << "EqpStat::get_stat_values ERROR!" << e.what() << std::endl; return false; } } bool EqpStat::delete_stat(std::string ruleid) { try { + display_cache_.remove(ruleid); return mapRuleStat.delete_data(ruleid); } catch (const std::exception &e) { logger_->Error() << "ruleid:" << ruleid << "EqpStat::delete_stat ERROR!" @@ -131,7 +217,7 @@ bool EqpStat::delete_stat(std::string ruleid) { } } -std::string EqpStat::get_stat_json() { return mapRuleStat.GetDataJson(); } +std::string EqpStat::get_stat_json() { return display_cache_.get_json(); } std::vector EqpStat::stat_find_no_ruleid() { std::vector res{}; @@ -141,7 +227,7 @@ std::vector EqpStat::stat_find_no_ruleid() { int EqpStat::get_stat_size() { return mapRuleStat.size(); } -double EqpStat::select_running_by_ruleid(string ruleid) { +double EqpStat::select_running_by_ruleid(std::string ruleid) { T_RULE_SAMPLE_1D trs1a; auto query_list_maybe = exec( select(trs1a.X1()).from(trs1a).where(trs1a.RuleId() == ruleid)); @@ -153,11 +239,12 @@ double EqpStat::select_running_by_ruleid(string ruleid) { return query_list[0].X1; } else { this->logger_->Debug() << "!query_list_maybe.is_just() ruleid:" << ruleid - << endl; + << std::endl; return 0; } } -unsigned long EqpStat::select_times_by_ruleid(string ruleid) { + +unsigned long EqpStat::select_times_by_ruleid(std::string ruleid) { T_RULE_SAMPLE_1D trs1a; auto query_list_maybe = exec( select(trs1a.Count()).from(trs1a).where(trs1a.RuleId() == ruleid)); @@ -172,13 +259,13 @@ unsigned long EqpStat::select_times_by_ruleid(string ruleid) { } } -string EqpStat::get_ruleid_json() { +std::string EqpStat::get_ruleid_json() { mix_cc::json js1; js1["ruleid"] = this->cfg_rules_; return js1.dump(); } -int EqpStat::select_alarm_by_ruleid(string ruleid) { +int EqpStat::select_alarm_by_ruleid(std::string ruleid) { T_RULE_RESULT trr; auto query_list_maybe = exec( select(trr.ruleId()) @@ -189,12 +276,12 @@ int EqpStat::select_alarm_by_ruleid(string ruleid) { return query_list.size(); } else { logger_->Error() << "ruleid:" << ruleid << "select_alarm_by_ruleid ERROR!" - << ",location:" << BOOST_CURRENT_LOCATION << endl; + << ",location:" << BOOST_CURRENT_LOCATION << std::endl; return 0; } } -std::string EqpStat::select_dev_coder_by_ruleid(string ruleid) { +std::string EqpStat::select_dev_coder_by_ruleid(std::string ruleid) { T_RULE_CFG t_rule_cfg; auto sql_statement = select(t_rule_cfg.eqpid()) .from(t_rule_cfg) @@ -209,7 +296,7 @@ std::string EqpStat::select_dev_coder_by_ruleid(string ruleid) { } } -string EqpStat::select_latest_alarm_by_ruleid(string ruleid) { +std::string EqpStat::select_latest_alarm_by_ruleid(std::string ruleid) { try { FV_RESULT_LATEST fvrl; @@ -222,7 +309,7 @@ string EqpStat::select_latest_alarm_by_ruleid(string ruleid) { if (query_list.size() == 0) { return "无报警"; } - string quered_time = + std::string quered_time = mix_cc::mix_time_t(query_list[0].alarmtime).to_formatted_time(); return quered_time; @@ -239,14 +326,14 @@ string EqpStat::select_latest_alarm_by_ruleid(string ruleid) { void EqpStat::init() { if (!this->cfg_flag) { get_cfg_rules(); - logger_->Debug() << "EqpStat::init()---get_cfg_rules()---" << endl; + logger_->Debug() << "EqpStat::init()---get_cfg_rules()---" << std::endl; update_static(); - logger_->Debug() << "EqpStat::init()---update_static()---" << endl; + logger_->Debug() << "EqpStat::init()---update_static()---" << std::endl; this->cfg_flag = true; - last_update_static_time_ = chrono::system_clock::now(); + last_update_static_time_ = std::chrono::system_clock::now(); } - if (chrono::system_clock::now() - last_update_static_time_ > - chrono::minutes(5)) { + if (std::chrono::system_clock::now() - last_update_static_time_ > + std::chrono::minutes(5)) { this->cfg_flag = false; } -} \ No newline at end of file +} diff --git a/eqpalg/utility/eqp_stat.h b/eqpalg/utility/eqp_stat.h index 8cb3edb..47349ef 100644 --- a/eqpalg/utility/eqp_stat.h +++ b/eqpalg/utility/eqp_stat.h @@ -1,154 +1,108 @@ #pragma once /** * @file eqp_stat.h - * @brief 处理与页面交互的shm数据 + * @brief 处理与页面交互的展示数据 + mon↔cron 冷数据交换 + * + * 展示数据:本地缓存 → get_stat_json() 拼 JSON(无共享内存锁) + * 冷数据:共享内存 map,boost::interprocess::interprocess_mutex 同步 + * * @author your name (you@domain.com) - * @version 0.1 + * @version 0.2 * @date 2023-12-21 * * Copyright: Baosight Co. Ltd. * DO NOT COPY/USE WITHOUT PERMISSION - * */ #include #include +#include #include +#include +#include #include + +/// 展示数据条目(本地缓存用) +struct DisplayEntry { + double alarm_value = 0; + double current_value = 0; + double limit_up = 0; + double limit_down = 0; + std::string unit; + std::vector items; + + double limit_precision(double data, int precision = 2) const { + double factor = std::pow(10, precision); + return std::round(data * factor) / factor; + } + + mix_cc::json to_json() const { + mix_cc::json js; + js["alarm_value"] = limit_precision(alarm_value); + js["current_value"] = limit_precision(current_value); + js["limit_up"] = limit_precision(limit_up); + js["limit_down"] = limit_precision(limit_down); + js["unit"] = unit; + js["items"] = items; + return js; + } +}; + +class DisplayCache { +public: + void update(const std::string &ruleid, const RuleStatShm::RuleStatLocal &stat); + void update_static(const std::string &ruleid, const RuleStatShm::RuleStatCold &cold); + void remove(const std::string &ruleid); + std::string get_json(); + +private: + std::map cache_; + std::map static_fields_; + std::mutex mtx_; +}; + class EqpStat { - public: +public: EqpStat(); ~EqpStat(); - public: - /** - * @brief 静态数据维护 - */ +public: void init(); - /** - * @brief 更新指定key的动态数据 - * @param ruleid 规则id - * @param rule_stat RuleStatShm::RuleStat数据 引用 - * @return true - * @return false - */ - bool update_dynamic(std::string ruleid, - const RuleStatShm::RuleStat& rule_stat); + /// 写展示数据到本地缓存(mon 高频调用,线程锁,无共享内存操作) + bool update_display(const std::string &ruleid, + const RuleStatShm::RuleStatLocal &rule_stat); - /** - * @brief 更新指定key的静态数据 弃用 - * @param ruleid 规则id - * @param rule_stat RuleStatShm::RuleStat数据 引用 - * @return true - * @return false - */ - /** - * @brief 更新指定key的 静态数据 - * 内含当前变量 rule_stat_statstic_ - * @param ruleid 规则id - * @param is_times 是否为运行时间/出现次数 - * @return true - * @return false - */ + /// 写冷数据到共享内存(mon 高频调用,进程间锁) + bool update_cold(const std::string &ruleid, + const RuleStatShm::RuleStatLocal &rule_stat); + + /// 写静态数据到共享内存 + 更新本地展示缓存(cron/ExpTimes 调用) bool update_static(std::string ruleid, bool is_times); - /** - * @brief 更新所有 循环 - * 内含当前变量 rule_stat_ - * @return true - * @return false - */ bool update_static(); - /** - * @brief 更新指定key的所有数据 - * @param ruleid My Param doc - * @param rule_stat My Param doc - * @return true - * @return false - */ - bool update_stat(std::string ruleid, const RuleStatShm::RuleStat& rule_stat); - /** - * @brief 添加数据到shm - * @param ruleid My Param doc - * @param rule_stat My Param doc - * @return true - * @return false - */ - bool add_stat_values(std::string ruleid, const double& rule_stat); - /** - * @brief 获取shm的数据 - * @param ruleid 规则id - * @param rule_stat 接收的数据类 - * @return true - * @return false - */ - bool get_stat_values(std::string ruleid, RuleStatShm::RuleStat& rule_stat); - /** - * @brief 删除指定key的数据 - * @param ruleid My Param doc - * @return true - * @return false - */ + + /// mon 攒样本到共享内存 + bool add_stat_values(std::string ruleid, const double &value); + + /// cron 从共享内存取样本 + bool get_stat_values(std::string ruleid, RuleStatShm::RuleStatLocal &local); + bool delete_stat(std::string ruleid); - /** - * @brief map 转 json 的 string - * @return std::string - */ std::string get_stat_json(); - /** - * @brief 从 cfg查询 所有规则,从map中找到cfg不存在的 - * @return std::vector - */ std::vector stat_find_no_ruleid(); - /** - * @brief map的size - * @return int - */ int get_stat_size(); - /** - * @brief 查T_RULE_CFG表 - */ void get_cfg_rules(); + std::string get_ruleid_json(); -/** - * @brief Get the ruleid json object - * @return string {"ruleid":[xxx,xxx,……]} - */ - string get_ruleid_json(); - - private: +private: std::unique_ptr logger_; - std::vector cfg_rules_; + DisplayCache display_cache_; + std::vector cfg_rules_; bool cfg_flag = false; - chrono::system_clock::time_point - last_update_static_time_; - /** - * @brief 运行时间 - * @param ruleid My Param doc - * @return double - */ - double select_running_by_ruleid(string ruleid); - /** - * @brief 出现次数 - * @param ruleid My Param doc - * @return unsigned long - */ - unsigned long select_times_by_ruleid(string ruleid); - /** - * @brief 报警次数 - * @param ruleid My Param doc - * @return int - */ - int select_alarm_by_ruleid(string ruleid); - /** - * @brief 设备编码 - * @param ruleid My Param doc - * @return std::string - */ - std::string select_dev_coder_by_ruleid(string ruleid); - /** - * @brief 最新报警时间 - * @param ruleid My Param doc - * @return string - */ - string select_latest_alarm_by_ruleid(string ruleid); -}; \ No newline at end of file + std::chrono::system_clock::time_point last_update_static_time_; + + double select_running_by_ruleid(std::string ruleid); + unsigned long select_times_by_ruleid(std::string ruleid); + int select_alarm_by_ruleid(std::string ruleid); + std::string select_dev_coder_by_ruleid(std::string ruleid); + std::string select_latest_alarm_by_ruleid(std::string ruleid); +}; diff --git a/shm/RuleStatShm.h b/shm/RuleStatShm.h index 80d14d7..9ae0941 100644 --- a/shm/RuleStatShm.h +++ b/shm/RuleStatShm.h @@ -1,29 +1,31 @@ #pragma once /** * @file RuleStatShm.h - * @brief 共享内存 map,用于将后台规则数据提供给页面使用 + * @brief 共享内存 map,用于 mon↔cron 进程间冷数据交换 + * + * 热数据(display)改走本地缓存 → Memcached,不经过共享内存。 + * 冷数据(stat_values/running_time/shear_times 等)保留在共享内存中, + * 使用 boost::interprocess::interprocess_mutex 做真正的进程间同步。 + * * @author your name (you@domain.com) - * @version 0.1 + * @version 0.2 * @date 2023-10-18 * * Copyright: Baosight Co. Ltd. * DO NOT COPY/USE WITHOUT PERMISSION - * */ -// #include +#include +#include #include "shm_header.h" -#include -#include // C++17 -#include + namespace RuleStatShm { using namespace ShmHeader; namespace { -std::mutex llmtx{}; ///<共享锁 -// 1. 改为读写锁 -// std::shared_mutex local_rwlock; +using ColdMutex = bipc::interprocess_mutex; +ColdMutex llmtx{}; ///< 进程间互斥锁 const static std::string dir_path = "/users/dsc/shm"; const static std::string shm_file = "MapRuleStat"; ///<映射文件名 @@ -40,12 +42,10 @@ static void_allocator default_allocator(obj_mapped_file.get_segment_manager()); ///<默认分配器 static vec_allocator_s items_allocator(obj_mapped_file.get_segment_manager()); ///< vector_s分配器 -// static char_string key_object("", default_allocator); -// static char_string key_delete("", default_allocator); -// 2. 改为线程局部存储 + thread_local static char_string tl_key_object("", default_allocator); thread_local static char_string tl_key_delete("", default_allocator); -// 3. 辅助函数:安全设置线程局部key + char_string &get_thread_local_key(const std::string &key) { tl_key_object = key.c_str(); return tl_key_object; @@ -57,83 +57,43 @@ char_string &get_thread_local_delete_key(const std::string &key) { } } // namespace -///<定义数据 -struct RuleStat { - /** 动态数据 规则触发更新 **/ - double alarm_value = 0; ///<最新报警值 - double current_value = 0; ///<当前值 - double limit_up = 0; ///<上限 - double limit_down = 0; ///<下限 - vector_s items; ///<数据项 - vector_d stat_values; ///<统计值,用于cron - bool fetch_mark = false; ///<取数据标记 - /** 静态数据 定时更新 **/ - double running_time = 0; ///<累积的运行时间 - int64_t shear_times = 0; ///<剪切次数 - int64_t alarm_times = 0; ///<报警次数 +/// 本地完整数据(AlgBase 使用,标准分配器) +struct RuleStatLocal { + double alarm_value = 0; + double current_value = 0; + double limit_up = 0; + double limit_down = 0; + std::vector items; + std::vector stat_values; + bool fetch_mark = false; + double running_time = 0; + int64_t shear_times = 0; + int64_t alarm_times = 0; + std::string last_alarm_time = "无报警"; + std::string dev_coder = "无"; + std::string unit; +}; + +/// 共享内存冷数据(mon↔cron 交换用) +struct RuleStatCold { + vector_d stat_values; ///<统计值,mon攒样本,cron消费 + bool fetch_mark = false; ///<取数据标记 + double running_time = 0; ///<累积的运行时间 + int64_t shear_times = 0; ///<剪切次数 + int64_t alarm_times = 0; ///<报警次数 bipc::string last_alarm_time = "无报警"; ///<上次报警时间 bipc::string dev_coder = "无"; ///<设备编码 - bipc::string unit = ""; ///<数据单位(比如℃) - RuleStat() - : items(items_allocator), stat_values(default_allocator) { - } ///<共享内存的内存分配 - /** 数据操作 **/ - mix_cc::json invert2json() const { - mix_cc::json js1; - /** 动态数据 规则触发更新 **/ - js1["alarm_value"] = alarm_value; ///<最新报警值 - js1["current_value"] = limit_precision(current_value); ///<当前值 - js1["limit_up"] = limit_precision(limit_up); ///<上限阈值 - js1["limit_down"] = limit_precision(limit_down); ///<下限阈值 - js1["items"] = items; ///< tag点 - /** 静态数据 定时更新 **/ - js1["running_time"] = running_time; ///<统计的运行时间 - js1["shear_times"] = shear_times; ///<剪切次数 - js1["alarm_times"] = alarm_times; ///<报警次数 - js1["last_alarm_time"] = last_alarm_time; ///<上次报警时间 - js1["dev_coder"] = dev_coder; ///<设备编码 - js1["unit"] = unit; ///<数据单位 - return js1; - } - /** - * @brief 浮点精度控制,默认2小数点后位 - * @param data My Param doc - * @param precision My Param doc - * @return double - */ + RuleStatCold() + : stat_values(default_allocator) {} + double limit_precision(double data, int precision = 2) const { double factor = std::pow(10, precision); return std::round(data * factor) / factor; } - /** - * @brief 动态数据更新 - * @param value My Param doc - * @return true - * @return false - */ - bool update_dynamic(const RuleStat &value) { - try { - items = value.items; - alarm_value = value.alarm_value; - current_value = value.current_value; - limit_up = value.limit_up; - limit_down = value.limit_down; - unit = value.unit; - } catch (...) { - cout << "update_dynamic ERROR" << endl; - return false; - } - return true; - } - /** - * @brief 静态数据更新 - * @param value My Param doc - * @return true - * @return false - */ - bool update_static(const RuleStat &value) { + + bool update_static(const RuleStatCold &value) { try { running_time = value.running_time; shear_times = value.shear_times; @@ -145,117 +105,40 @@ struct RuleStat { } return true; } + + bool update_cold(const RuleStatCold &value) { + try { + stat_values = value.stat_values; + fetch_mark = value.fetch_mark; + running_time = value.running_time; + shear_times = value.shear_times; + } catch (...) { + return false; + } + return true; + } }; ///< key 是string的情况 -typedef std::pair pair_s; ///< key-value +typedef std::pair pair_s; typedef bipc::node_allocator - allocator_s; ///<映射文件 + allocator_s; typedef std::less less_s; -typedef bipc::map MapRuleStat_s; +typedef bipc::map MapRuleStat_s; typedef MapRuleStat_s::iterator map_iter_s; -///<定义数据操作 +/// 共享内存 map 操作(仅冷数据) struct MapRuleStat { private: MapRuleStat_s *p_msg_map = obj_mapped_file.find_or_construct( shm_file.c_str())(less_s(), obj_mapped_file.get_segment_manager()); public: - /** - * @brief 更新 key对应的整个value - * @param key My Param doc - * @param value My Param doc - * @return true - * @return false - */ - bool update(const std::string &key, const RuleStat &value) { - std::lock_guard guard(llmtx); - // std::unique_lock lock(local_rwlock); // 写锁 - try { - // key_object = key.c_str(); - p_msg_map->operator[](get_thread_local_key(key)) = value; - return true; - } catch (const std::exception &e) { - return false; - } - } - /** - * @brief 更新 key对应value的动态 或 静态数据 - * @param key - * @param value - * @param is_dynamic 是否更新动态数据 - * @return true - * @return false - */ - bool update_dynamic(const std::string &key, const RuleStat &value, - bool is_dynamic = true) { - std::lock_guard guard(llmtx); - // std::unique_lock lock(local_rwlock); // 写锁 - try { - // key_object = key.c_str(); - if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) { - if (is_dynamic) { - p_msg_map->operator[](get_thread_local_key(key)) - .update_dynamic(value); - return true; - } else { - p_msg_map->operator[](get_thread_local_key(key)).update_static(value); - return true; - } - } - p_msg_map->operator[](get_thread_local_key(key)) = value; - return true; - } catch (const std::exception &e) { - cout << e.what() << endl; - return false; - } - } + // ── mon 高频调用 ── - /** - * @brief 删除 key-value - * @param key - * @return true - * @return false - */ - bool delete_data(const std::string &key) { - std::lock_guard guard(llmtx); - // std::unique_lock lock(local_rwlock); // 写锁 - try { - // key_delete = key.c_str(); - if (p_msg_map->find(get_thread_local_delete_key(key)) != - p_msg_map->end()) { - p_msg_map->erase(get_thread_local_delete_key(key)); - } - return true; - } catch (const std::exception &e) { - return false; - } - } - - size_t size() { - std::lock_guard guard(llmtx); - // std::shared_lock lock(local_rwlock); // 读锁 - return p_msg_map->size(); - } - - bool empty() { - std::lock_guard guard(llmtx); - // std::shared_lock lock(local_rwlock); // 读锁 - return p_msg_map->empty(); - } - /** - * @brief 统计数据存储 - * @param key My Param doc - * @param value My Param doc - * @return true - * @return false - */ bool add_stat_value(const std::string &key, const double &value) { try { - std::lock_guard guard(llmtx); - // std::unique_lock lock(local_rwlock); // 写锁 - // key_object = key.c_str(); + bipc::scoped_lock guard(llmtx); if (p_msg_map->operator[](get_thread_local_key(key)).fetch_mark) { p_msg_map->operator[](get_thread_local_key(key)).stat_values.clear(); p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = false; @@ -270,17 +153,42 @@ public: return false; } } - /** - * @brief 获取统计数据 - * @param key ruleid - * @param value 数据结果 - * @return bool - */ - bool get_stat_value(const std::string &key, RuleStat &value) { + + /// mon 写入累积的冷字段(stat_values, running_time, shear_times) + bool update_cold_fields(const std::string &key, const RuleStatCold &value) { + bipc::scoped_lock guard(llmtx); try { - std::lock_guard guard(llmtx); - // key_object = key.c_str(); - // std::unique_lock lock(local_rwlock); // 写锁 + if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) { + p_msg_map->operator[](get_thread_local_key(key)).update_cold(value); + } else { + p_msg_map->operator[](get_thread_local_key(key)) = value; + } + return true; + } catch (const std::exception &e) { + return false; + } + } + + /// cron 写入静态字段(running_time, alarm_times, last_alarm_time, dev_coder 等) + bool update_static_fields(const std::string &key, const RuleStatCold &value) { + bipc::scoped_lock guard(llmtx); + try { + if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) { + p_msg_map->operator[](get_thread_local_key(key)).update_static(value); + return true; + } + p_msg_map->operator[](get_thread_local_key(key)) = value; + return true; + } catch (const std::exception &e) { + return false; + } + } + + // ── cron 调用 ── + + bool get_stat_value(const std::string &key, RuleStatCold &value) { + try { + bipc::scoped_lock guard(llmtx); if (!p_msg_map->operator[](get_thread_local_key(key)).fetch_mark && p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() > stat_size_min) { @@ -295,15 +203,34 @@ public: } } - /** find 共享内存数据中, vector 存在的数据 - * @brief - * @param ruleid My Param doc - * @return std::vector - */ + // ── 管理操作 ── + + bool delete_data(const std::string &key) { + bipc::scoped_lock guard(llmtx); + try { + if (p_msg_map->find(get_thread_local_delete_key(key)) != + p_msg_map->end()) { + p_msg_map->erase(get_thread_local_delete_key(key)); + } + return true; + } catch (const std::exception &e) { + return false; + } + } + + size_t size() { + bipc::scoped_lock guard(llmtx); + return p_msg_map->size(); + } + + bool empty() { + bipc::scoped_lock guard(llmtx); + return p_msg_map->empty(); + } + std::vector find_rule_id(const std::vector &ruleid) { - std::lock_guard guard(llmtx); - // std::shared_lock lock(local_rwlock); // 读锁 + bipc::scoped_lock guard(llmtx); std::vector res; if (p_msg_map->empty()) { return {}; @@ -317,15 +244,9 @@ public: return res; } - /** find 共享内存数据中 vector 不存在的数据 - * @brief - * @param ruleid My Param doc - * @return std::vector - */ std::vector find_no_rule_id(const std::vector &ruleid) { - std::lock_guard guard(llmtx); - // std::shared_lock lock(local_rwlock); // 读锁 + bipc::scoped_lock guard(llmtx); std::vector res; if (p_msg_map->empty()) { return {}; @@ -338,47 +259,6 @@ public: } return res; } - - /** - * @brief 将数据转换成 string - * @return std::string;失败为 "ERROR" - */ - std::string GetDataJson() { - // std::lock_guard guard(local_mutext); - // try { - // mix_cc::json js1; - // for (auto iter = p_msg_map->begin(); iter != p_msg_map->end(); iter++) - // { - // js1[iter->first] = iter->second.invert2json(); - // } - // return js1.dump(); - // } catch (const std::exception &e) { - // return "ERROR"; - // } - - // 第一步:复制数据(加锁) - auto runs_t1 = std::chrono::steady_clock::now(); - std::vector> snapshot; - { - - std::lock_guard guard(llmtx); - // std::shared_lock lock(local_rwlock); // 读锁 - snapshot.reserve(p_msg_map->size()); - for (const auto & [ key, value ] : *p_msg_map) { - snapshot.emplace_back(key.c_str(), value); // 复制RuleStat - } - - } // 锁只在这几毫秒内持有! - auto runs_t2 = std::chrono::steady_clock::now(); - int64_t cost_time = (runs_t2 - runs_t1).count() / 1000000; - // 第二步:序列化(不加锁) - mix_cc::json js1; - js1["cost_time"] = cost_time; - for (const auto & [ key, value ] : snapshot) { - js1[key] = value.invert2json(); // 耗时操作,但无锁 - } - return js1.dump(); - } }; -} // namespace RuleStatShm \ No newline at end of file +} // namespace RuleStatShm