Split RuleStat display from cold data paths to reduce lock contention

Display data (alarm_value, current_value, limit_up/down, items, unit) now
goes to a local-memory DisplayCache and is serialized to JSON without any
shared memory lock. Cold data (stat_values, running_time, shear_times, etc.)
stays in shared memory for mon-cron IPC, protected by a real interprocess
mutex (boost::interprocess::interprocess_mutex) instead of the broken
process-local std::mutex. AlgBase::rule_stat_ is now RuleStatLocal with
standard types — zero changes to algorithm subclass code.
This commit is contained in:
Huamonarch 2026-05-12 15:46:01 +08:00
parent 6a28112cd7
commit 973921fc4b
5 changed files with 347 additions and 423 deletions

View File

@ -358,8 +358,11 @@ int AlgBase::refresh_now_time() {
bool AlgBase::update_map_rule() {
try {
std::lock_guard<std::mutex> guard(lm);
return SingletonTemp<EqpStat>::GetInstance().update_dynamic(
SingletonTemp<EqpStat>::GetInstance().update_display(
this->rule_id_, this->rule_stat_);
SingletonTemp<EqpStat>::GetInstance().update_cold(
this->rule_id_, this->rule_stat_);
return true;
} catch (...) {
gb_logger_->log_error(this->rule_name_ + "update_map_rule()");
return false;

View File

@ -93,7 +93,7 @@ protected:
"0";
std::mutex lm;
RuleStatShm::RuleStat rule_stat_;
RuleStatShm::RuleStatLocal rule_stat_;
std::string error_message_str_;

View File

@ -5,17 +5,87 @@
#include <eqpalg/utility/eqp_stat.h>
#include <mix_cc/sql.h>
#include <mix_cc/sql/database/db2_t.h>
#include <mutex>
/*mapRuleStat 单例*/
/* mapRuleStat 单例,只存冷数据 */
namespace {
std::mutex up_mutex{};
RuleStatShm::MapRuleStat mapRuleStat;
RuleStatShm::RuleStat rule_stat;
RuleStatShm::RuleStatCold rule_stat_cold;
};
// ═══════════════════════════════════════════════════════════════
// DisplayCache
// ═══════════════════════════════════════════════════════════════
void DisplayCache::update(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &stat) {
std::lock_guard<std::mutex> guard(mtx_);
DisplayEntry &entry = cache_[ruleid];
entry.alarm_value = stat.alarm_value;
entry.current_value = stat.current_value;
entry.limit_up = stat.limit_up;
entry.limit_down = stat.limit_down;
entry.unit = stat.unit;
if (!stat.items.empty()) {
entry.items = stat.items;
}
}
void DisplayCache::update_static(const std::string &ruleid,
const RuleStatShm::RuleStatCold &cold) {
std::lock_guard<std::mutex> guard(mtx_);
mix_cc::json js;
js["running_time"] = cold.limit_precision(cold.running_time);
js["shear_times"] = cold.shear_times;
js["alarm_times"] = cold.alarm_times;
js["last_alarm_time"] = cold.last_alarm_time.c_str();
js["dev_coder"] = cold.dev_coder.c_str();
static_fields_[ruleid] = js.dump();
}
void DisplayCache::remove(const std::string &ruleid) {
std::lock_guard<std::mutex> guard(mtx_);
cache_.erase(ruleid);
static_fields_.erase(ruleid);
}
std::string DisplayCache::get_json() {
std::map<std::string, DisplayEntry> snapshot;
std::map<std::string, std::string> static_snapshot;
{
std::lock_guard<std::mutex> guard(mtx_);
snapshot = cache_;
static_snapshot = static_fields_;
}
mix_cc::json js;
for (const auto &[key, entry] : snapshot) {
mix_cc::json item = entry.to_json();
auto it = static_snapshot.find(key);
if (it != static_snapshot.end() && !it->second.empty()) {
auto static_js = mix_cc::json::parse(it->second);
for (auto &[k, v] : static_js.items()) {
item[k] = v;
}
} else {
// 静态字段尚未缓存时,填充默认值
item["running_time"] = 0;
item["shear_times"] = 0;
item["alarm_times"] = 0;
item["last_alarm_time"] = "无报警";
item["dev_coder"] = "";
}
js[key] = item;
}
return js.dump();
}
// ═══════════════════════════════════════════════════════════════
// EqpStat
// ═══════════════════════════════════════════════════════════════
EqpStat::EqpStat() {
logger_ = std::make_unique<LOG>("EqpStat");
logger_->Debug() << "EqpStat::EqpStat()" << endl;
logger_->Debug() << "EqpStat::EqpStat()" << std::endl;
}
EqpStat::~EqpStat() { logger_->Info() << "EqpStat::~EqpStat()" << std::endl; }
@ -25,11 +95,11 @@ void EqpStat::get_cfg_rules() {
auto rule_list_maybe = mix_cc::sql::exec<db2_t, T_RULE_CFG>(sql_statement);
if (rule_list_maybe.is_just()) {
auto rule_list = rule_list_maybe.unsafe_get_just();
logger_->Debug() << "cfg_rules_ size:" << cfg_rules_.size() << endl;
logger_->Debug() << "cfg_rules_ size:" << cfg_rules_.size() << std::endl;
cfg_rules_.clear();
if (!rule_list.empty()) {
int num = rule_list.size();
logger_->Debug() << "rules num:" << num << endl;
logger_->Debug() << "rules num:" << num << std::endl;
for (int i = 0; i < num; i++) {
cfg_rules_.push_back(rule_list[i].ruleId);
}
@ -48,26 +118,45 @@ void EqpStat::get_cfg_rules() {
}
}
bool EqpStat::update_dynamic(std::string ruleid,
const RuleStatShm::RuleStat &rule_stat) {
bool EqpStat::update_display(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &rule_stat) {
try {
return mapRuleStat.update_dynamic(ruleid, rule_stat, true);
display_cache_.update(ruleid, rule_stat);
return true;
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid << "EqpStat::update_dynamic ERROR!"
<< e.what() << std::endl;
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::update_display ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::update_cold(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &rule_stat) {
try {
RuleStatShm::RuleStatCold cold;
cold.stat_values.assign(rule_stat.stat_values.begin(),
rule_stat.stat_values.end());
cold.fetch_mark = rule_stat.fetch_mark;
cold.running_time = rule_stat.running_time;
cold.shear_times = rule_stat.shear_times;
return mapRuleStat.update_cold_fields(ruleid, cold);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::update_cold ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::update_static(std::string ruleid, bool is_times) {
try {
std::lock_guard<std::mutex> guard(up_mutex);
rule_stat.alarm_times = select_alarm_by_ruleid(ruleid);
rule_stat.dev_coder = select_dev_coder_by_ruleid(ruleid);
rule_stat.running_time = is_times ? select_running_by_ruleid(ruleid) : 0;
rule_stat.shear_times = is_times ? select_times_by_ruleid(ruleid) : 0;
rule_stat.last_alarm_time = select_latest_alarm_by_ruleid(ruleid);
mapRuleStat.update_dynamic(ruleid, rule_stat, false);
rule_stat_cold.alarm_times = select_alarm_by_ruleid(ruleid);
rule_stat_cold.dev_coder = select_dev_coder_by_ruleid(ruleid).c_str();
rule_stat_cold.running_time = is_times ? select_running_by_ruleid(ruleid) : 0;
rule_stat_cold.shear_times = is_times ? select_times_by_ruleid(ruleid) : 0;
rule_stat_cold.last_alarm_time =
select_latest_alarm_by_ruleid(ruleid).c_str();
mapRuleStat.update_static_fields(ruleid, rule_stat_cold);
display_cache_.update_static(ruleid, rule_stat_cold);
return true;
} catch (const std::exception &e) {
logger_->Error() << "EqpStat::update_static() ERROR!" << e.what()
@ -90,39 +179,36 @@ bool EqpStat::update_static() {
}
}
bool EqpStat::update_stat(std::string ruleid,
const RuleStatShm::RuleStat &rule_stat) {
bool EqpStat::add_stat_values(std::string ruleid, const double &value) {
try {
return mapRuleStat.update(ruleid, rule_stat);
return mapRuleStat.add_stat_value(ruleid, value);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid << "EqpStat::update_stat ERROR!"
<< e.what() << std::endl;
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::add_stat_values ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::add_stat_values(std::string ruleid, const double &rule_stat) {
try {
return mapRuleStat.add_stat_value(ruleid, rule_stat);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid << "EqpStat::add_stat_values ERROR!"
<< e.what() << std::endl;
return false;
}
}
bool EqpStat::get_stat_values(std::string ruleid,
RuleStatShm::RuleStat &rule_stat) {
RuleStatShm::RuleStatLocal &local) {
try {
return mapRuleStat.get_stat_value(ruleid, rule_stat);
RuleStatShm::RuleStatCold cold;
if (!mapRuleStat.get_stat_value(ruleid, cold)) {
return false;
}
local.stat_values.assign(cold.stat_values.begin(), cold.stat_values.end());
local.fetch_mark = cold.fetch_mark;
return true;
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid << "EqpStat::get_stat_values ERROR!"
<< e.what() << std::endl;
logger_->Error() << "ruleid:" << ruleid
<< "EqpStat::get_stat_values ERROR!" << e.what() << std::endl;
return false;
}
}
bool EqpStat::delete_stat(std::string ruleid) {
try {
display_cache_.remove(ruleid);
return mapRuleStat.delete_data(ruleid);
} catch (const std::exception &e) {
logger_->Error() << "ruleid:" << ruleid << "EqpStat::delete_stat ERROR!"
@ -131,7 +217,7 @@ bool EqpStat::delete_stat(std::string ruleid) {
}
}
std::string EqpStat::get_stat_json() { return mapRuleStat.GetDataJson(); }
std::string EqpStat::get_stat_json() { return display_cache_.get_json(); }
std::vector<std::string> EqpStat::stat_find_no_ruleid() {
std::vector<std::string> res{};
@ -141,7 +227,7 @@ std::vector<std::string> EqpStat::stat_find_no_ruleid() {
int EqpStat::get_stat_size() { return mapRuleStat.size(); }
double EqpStat::select_running_by_ruleid(string ruleid) {
double EqpStat::select_running_by_ruleid(std::string ruleid) {
T_RULE_SAMPLE_1D trs1a;
auto query_list_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
select(trs1a.X1()).from(trs1a).where(trs1a.RuleId() == ruleid));
@ -153,11 +239,12 @@ double EqpStat::select_running_by_ruleid(string ruleid) {
return query_list[0].X1;
} else {
this->logger_->Debug() << "!query_list_maybe.is_just() ruleid:" << ruleid
<< endl;
<< std::endl;
return 0;
}
}
unsigned long EqpStat::select_times_by_ruleid(string ruleid) {
unsigned long EqpStat::select_times_by_ruleid(std::string ruleid) {
T_RULE_SAMPLE_1D trs1a;
auto query_list_maybe = exec<db2_t, T_RULE_SAMPLE_1D>(
select(trs1a.Count()).from(trs1a).where(trs1a.RuleId() == ruleid));
@ -172,13 +259,13 @@ unsigned long EqpStat::select_times_by_ruleid(string ruleid) {
}
}
string EqpStat::get_ruleid_json() {
std::string EqpStat::get_ruleid_json() {
mix_cc::json js1;
js1["ruleid"] = this->cfg_rules_;
return js1.dump();
}
int EqpStat::select_alarm_by_ruleid(string ruleid) {
int EqpStat::select_alarm_by_ruleid(std::string ruleid) {
T_RULE_RESULT trr;
auto query_list_maybe = exec<db2_t, T_RULE_CFG>(
select(trr.ruleId())
@ -189,12 +276,12 @@ int EqpStat::select_alarm_by_ruleid(string ruleid) {
return query_list.size();
} else {
logger_->Error() << "ruleid:" << ruleid << "select_alarm_by_ruleid ERROR!"
<< ",location:" << BOOST_CURRENT_LOCATION << endl;
<< ",location:" << BOOST_CURRENT_LOCATION << std::endl;
return 0;
}
}
std::string EqpStat::select_dev_coder_by_ruleid(string ruleid) {
std::string EqpStat::select_dev_coder_by_ruleid(std::string ruleid) {
T_RULE_CFG t_rule_cfg;
auto sql_statement = select(t_rule_cfg.eqpid())
.from(t_rule_cfg)
@ -209,7 +296,7 @@ std::string EqpStat::select_dev_coder_by_ruleid(string ruleid) {
}
}
string EqpStat::select_latest_alarm_by_ruleid(string ruleid) {
std::string EqpStat::select_latest_alarm_by_ruleid(std::string ruleid) {
try {
FV_RESULT_LATEST fvrl;
@ -222,7 +309,7 @@ string EqpStat::select_latest_alarm_by_ruleid(string ruleid) {
if (query_list.size() == 0) {
return "无报警";
}
string quered_time =
std::string quered_time =
mix_cc::mix_time_t(query_list[0].alarmtime).to_formatted_time();
return quered_time;
@ -239,14 +326,14 @@ string EqpStat::select_latest_alarm_by_ruleid(string ruleid) {
void EqpStat::init() {
if (!this->cfg_flag) {
get_cfg_rules();
logger_->Debug() << "EqpStat::init()---get_cfg_rules()---" << endl;
logger_->Debug() << "EqpStat::init()---get_cfg_rules()---" << std::endl;
update_static();
logger_->Debug() << "EqpStat::init()---update_static()---" << endl;
logger_->Debug() << "EqpStat::init()---update_static()---" << std::endl;
this->cfg_flag = true;
last_update_static_time_ = chrono::system_clock::now();
last_update_static_time_ = std::chrono::system_clock::now();
}
if (chrono::system_clock::now() - last_update_static_time_ >
chrono::minutes(5)) {
if (std::chrono::system_clock::now() - last_update_static_time_ >
std::chrono::minutes(5)) {
this->cfg_flag = false;
}
}
}

View File

@ -1,154 +1,108 @@
#pragma once
/**
* @file eqp_stat.h
* @brief shm数据
* @brief + moncron
*
* get_stat_json() JSON
* mapboost::interprocess::interprocess_mutex
*
* @author your name (you@domain.com)
* @version 0.1
* @version 0.2
* @date 2023-12-21
*
* Copyright: Baosight Co. Ltd.
* DO NOT COPY/USE WITHOUT PERMISSION
*
*/
#include <log4cplus/LOG.h>
#include <shm/RuleStatShm.h>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
/// 展示数据条目(本地缓存用)
struct DisplayEntry {
double alarm_value = 0;
double current_value = 0;
double limit_up = 0;
double limit_down = 0;
std::string unit;
std::vector<std::string> items;
double limit_precision(double data, int precision = 2) const {
double factor = std::pow(10, precision);
return std::round(data * factor) / factor;
}
mix_cc::json to_json() const {
mix_cc::json js;
js["alarm_value"] = limit_precision(alarm_value);
js["current_value"] = limit_precision(current_value);
js["limit_up"] = limit_precision(limit_up);
js["limit_down"] = limit_precision(limit_down);
js["unit"] = unit;
js["items"] = items;
return js;
}
};
class DisplayCache {
public:
void update(const std::string &ruleid, const RuleStatShm::RuleStatLocal &stat);
void update_static(const std::string &ruleid, const RuleStatShm::RuleStatCold &cold);
void remove(const std::string &ruleid);
std::string get_json();
private:
std::map<std::string, DisplayEntry> cache_;
std::map<std::string, std::string> static_fields_;
std::mutex mtx_;
};
class EqpStat {
public:
public:
EqpStat();
~EqpStat();
public:
/**
* @brief
*/
public:
void init();
/**
* @brief key的动态数据
* @param ruleid id
* @param rule_stat RuleStatShm::RuleStat数据
* @return true
* @return false
*/
bool update_dynamic(std::string ruleid,
const RuleStatShm::RuleStat& rule_stat);
/// 写展示数据到本地缓存mon 高频调用,线程锁,无共享内存操作)
bool update_display(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &rule_stat);
/**
* @brief key的静态数据
* @param ruleid id
* @param rule_stat RuleStatShm::RuleStat数据
* @return true
* @return false
*/
/**
* @brief key的
* rule_stat_statstic_
* @param ruleid id
* @param is_times /
* @return true
* @return false
*/
/// 写冷数据到共享内存mon 高频调用,进程间锁)
bool update_cold(const std::string &ruleid,
const RuleStatShm::RuleStatLocal &rule_stat);
/// 写静态数据到共享内存 + 更新本地展示缓存cron/ExpTimes 调用)
bool update_static(std::string ruleid, bool is_times);
/**
* @brief
* rule_stat_
* @return true
* @return false
*/
bool update_static();
/**
* @brief key的所有数据
* @param ruleid My Param doc
* @param rule_stat My Param doc
* @return true
* @return false
*/
bool update_stat(std::string ruleid, const RuleStatShm::RuleStat& rule_stat);
/**
* @brief shm
* @param ruleid My Param doc
* @param rule_stat My Param doc
* @return true
* @return false
*/
bool add_stat_values(std::string ruleid, const double& rule_stat);
/**
* @brief shm的数据
* @param ruleid id
* @param rule_stat
* @return true
* @return false
*/
bool get_stat_values(std::string ruleid, RuleStatShm::RuleStat& rule_stat);
/**
* @brief key的数据
* @param ruleid My Param doc
* @return true
* @return false
*/
/// mon 攒样本到共享内存
bool add_stat_values(std::string ruleid, const double &value);
/// cron 从共享内存取样本
bool get_stat_values(std::string ruleid, RuleStatShm::RuleStatLocal &local);
bool delete_stat(std::string ruleid);
/**
* @brief map json string
* @return std::string
*/
std::string get_stat_json();
/**
* @brief cfg查询 map中找到cfg不存在的
* @return std::vector<std::string>
*/
std::vector<std::string> stat_find_no_ruleid();
/**
* @brief map的size
* @return int
*/
int get_stat_size();
/**
* @brief T_RULE_CFG表
*/
void get_cfg_rules();
std::string get_ruleid_json();
/**
* @brief Get the ruleid json object
* @return string {"ruleid":[xxx,xxx,]}
*/
string get_ruleid_json();
private:
private:
std::unique_ptr<LOG> logger_;
std::vector<string> cfg_rules_;
DisplayCache display_cache_;
std::vector<std::string> cfg_rules_;
bool cfg_flag = false;
chrono::system_clock::time_point
last_update_static_time_;
/**
* @brief
* @param ruleid My Param doc
* @return double
*/
double select_running_by_ruleid(string ruleid);
/**
* @brief
* @param ruleid My Param doc
* @return unsigned long
*/
unsigned long select_times_by_ruleid(string ruleid);
/**
* @brief
* @param ruleid My Param doc
* @return int
*/
int select_alarm_by_ruleid(string ruleid);
/**
* @brief
* @param ruleid My Param doc
* @return std::string
*/
std::string select_dev_coder_by_ruleid(string ruleid);
/**
* @brief
* @param ruleid My Param doc
* @return string
*/
string select_latest_alarm_by_ruleid(string ruleid);
};
std::chrono::system_clock::time_point last_update_static_time_;
double select_running_by_ruleid(std::string ruleid);
unsigned long select_times_by_ruleid(std::string ruleid);
int select_alarm_by_ruleid(std::string ruleid);
std::string select_dev_coder_by_ruleid(std::string ruleid);
std::string select_latest_alarm_by_ruleid(std::string ruleid);
};

View File

@ -1,29 +1,31 @@
#pragma once
/**
* @file RuleStatShm.h
* @brief map使
* @brief map moncron
*
* display Memcached
* stat_values/running_time/shear_times
* 使 boost::interprocess::interprocess_mutex
*
* @author your name (you@domain.com)
* @version 0.1
* @version 0.2
* @date 2023-10-18
*
* Copyright: Baosight Co. Ltd.
* DO NOT COPY/USE WITHOUT PERMISSION
*
*/
// #include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "shm_header.h"
#include <mutex>
#include <shared_mutex> // C++17
#include <thread>
namespace RuleStatShm {
using namespace ShmHeader;
namespace {
std::mutex llmtx{}; ///<共享锁
// 1. 改为读写锁
// std::shared_mutex local_rwlock;
using ColdMutex = bipc::interprocess_mutex;
ColdMutex llmtx{}; ///< 进程间互斥锁
const static std::string dir_path = "/users/dsc/shm";
const static std::string shm_file = "MapRuleStat"; ///<映射文件名
@ -40,12 +42,10 @@ static void_allocator
default_allocator(obj_mapped_file.get_segment_manager()); ///<默认分配器
static vec_allocator_s
items_allocator(obj_mapped_file.get_segment_manager()); ///< vector_s分配器
// static char_string key_object("", default_allocator);
// static char_string key_delete("", default_allocator);
// 2. 改为线程局部存储
thread_local static char_string tl_key_object("", default_allocator);
thread_local static char_string tl_key_delete("", default_allocator);
// 3. 辅助函数安全设置线程局部key
char_string &get_thread_local_key(const std::string &key) {
tl_key_object = key.c_str();
return tl_key_object;
@ -57,83 +57,43 @@ char_string &get_thread_local_delete_key(const std::string &key) {
}
} // namespace
///<定义数据
struct RuleStat {
/** 动态数据 规则触发更新 **/
double alarm_value = 0; ///<最新报警值
double current_value = 0; ///<当前值
double limit_up = 0; ///<上限
double limit_down = 0; ///<下限
vector_s items; ///<数据项
vector_d stat_values; ///<统计值用于cron
bool fetch_mark = false; ///<取数据标记
/** 静态数据 定时更新 **/
double running_time = 0; ///<累积的运行时间
int64_t shear_times = 0; ///<剪切次数
int64_t alarm_times = 0; ///<报警次数
/// 本地完整数据AlgBase 使用,标准分配器)
struct RuleStatLocal {
double alarm_value = 0;
double current_value = 0;
double limit_up = 0;
double limit_down = 0;
std::vector<std::string> items;
std::vector<double> stat_values;
bool fetch_mark = false;
double running_time = 0;
int64_t shear_times = 0;
int64_t alarm_times = 0;
std::string last_alarm_time = "无报警";
std::string dev_coder = "";
std::string unit;
};
/// 共享内存冷数据mon↔cron 交换用)
struct RuleStatCold {
vector_d stat_values; ///<统计值mon攒样本cron消费
bool fetch_mark = false; ///<取数据标记
double running_time = 0; ///<累积的运行时间
int64_t shear_times = 0; ///<剪切次数
int64_t alarm_times = 0; ///<报警次数
bipc::string last_alarm_time = "无报警"; ///<上次报警时间
bipc::string dev_coder = ""; ///<设备编码
bipc::string unit = ""; ///<数据单位(比如℃)
RuleStat()
: items(items_allocator), stat_values(default_allocator) {
} ///<共享内存的内存分配
/** 数据操作 **/
mix_cc::json invert2json() const {
mix_cc::json js1;
/** 动态数据 规则触发更新 **/
js1["alarm_value"] = alarm_value; ///<最新报警值
js1["current_value"] = limit_precision(current_value); ///<当前值
js1["limit_up"] = limit_precision(limit_up); ///<上限阈值
js1["limit_down"] = limit_precision(limit_down); ///<下限阈值
js1["items"] = items; ///< tag点
/** 静态数据 定时更新 **/
js1["running_time"] = running_time; ///<统计的运行时间
js1["shear_times"] = shear_times; ///<剪切次数
js1["alarm_times"] = alarm_times; ///<报警次数
js1["last_alarm_time"] = last_alarm_time; ///<上次报警时间
js1["dev_coder"] = dev_coder; ///<设备编码
js1["unit"] = unit; ///<数据单位
return js1;
}
/**
* @brief 2
* @param data My Param doc
* @param precision My Param doc
* @return double
*/
RuleStatCold()
: stat_values(default_allocator) {}
double limit_precision(double data, int precision = 2) const {
double factor = std::pow(10, precision);
return std::round(data * factor) / factor;
}
/**
* @brief
* @param value My Param doc
* @return true
* @return false
*/
bool update_dynamic(const RuleStat &value) {
try {
items = value.items;
alarm_value = value.alarm_value;
current_value = value.current_value;
limit_up = value.limit_up;
limit_down = value.limit_down;
unit = value.unit;
} catch (...) {
cout << "update_dynamic ERROR" << endl;
return false;
}
return true;
}
/**
* @brief
* @param value My Param doc
* @return true
* @return false
*/
bool update_static(const RuleStat &value) {
bool update_static(const RuleStatCold &value) {
try {
running_time = value.running_time;
shear_times = value.shear_times;
@ -145,117 +105,40 @@ struct RuleStat {
}
return true;
}
bool update_cold(const RuleStatCold &value) {
try {
stat_values = value.stat_values;
fetch_mark = value.fetch_mark;
running_time = value.running_time;
shear_times = value.shear_times;
} catch (...) {
return false;
}
return true;
}
};
///< key 是string的情况
typedef std::pair<const char_string, RuleStat> pair_s; ///< key-value
typedef std::pair<const char_string, RuleStatCold> pair_s;
typedef bipc::node_allocator<pair_s, mapped_segment_manager_t>
allocator_s; ///<映射文件
allocator_s;
typedef std::less<char_string> less_s;
typedef bipc::map<char_string, RuleStat, less_s, allocator_s> MapRuleStat_s;
typedef bipc::map<char_string, RuleStatCold, less_s, allocator_s> MapRuleStat_s;
typedef MapRuleStat_s::iterator map_iter_s;
///<定义数据操作
/// 共享内存 map 操作(仅冷数据)
struct MapRuleStat {
private:
MapRuleStat_s *p_msg_map = obj_mapped_file.find_or_construct<MapRuleStat_s>(
shm_file.c_str())(less_s(), obj_mapped_file.get_segment_manager());
public:
/**
* @brief key对应的整个value
* @param key My Param doc
* @param value My Param doc
* @return true
* @return false
*/
bool update(const std::string &key, const RuleStat &value) {
std::lock_guard<std::mutex> guard(llmtx);
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
try {
// key_object = key.c_str();
p_msg_map->operator[](get_thread_local_key(key)) = value;
return true;
} catch (const std::exception &e) {
return false;
}
}
/**
* @brief key对应value的动态
* @param key
* @param value
* @param is_dynamic
* @return true
* @return false
*/
bool update_dynamic(const std::string &key, const RuleStat &value,
bool is_dynamic = true) {
std::lock_guard<std::mutex> guard(llmtx);
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
try {
// key_object = key.c_str();
if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) {
if (is_dynamic) {
p_msg_map->operator[](get_thread_local_key(key))
.update_dynamic(value);
return true;
} else {
p_msg_map->operator[](get_thread_local_key(key)).update_static(value);
return true;
}
}
p_msg_map->operator[](get_thread_local_key(key)) = value;
return true;
} catch (const std::exception &e) {
cout << e.what() << endl;
return false;
}
}
// ── mon 高频调用 ──
/**
* @brief key-value
* @param key
* @return true
* @return false
*/
bool delete_data(const std::string &key) {
std::lock_guard<std::mutex> guard(llmtx);
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
try {
// key_delete = key.c_str();
if (p_msg_map->find(get_thread_local_delete_key(key)) !=
p_msg_map->end()) {
p_msg_map->erase(get_thread_local_delete_key(key));
}
return true;
} catch (const std::exception &e) {
return false;
}
}
size_t size() {
std::lock_guard<std::mutex> guard(llmtx);
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
return p_msg_map->size();
}
bool empty() {
std::lock_guard<std::mutex> guard(llmtx);
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
return p_msg_map->empty();
}
/**
* @brief
* @param key My Param doc
* @param value My Param doc
* @return true
* @return false
*/
bool add_stat_value(const std::string &key, const double &value) {
try {
std::lock_guard<std::mutex> guard(llmtx);
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
// key_object = key.c_str();
bipc::scoped_lock<ColdMutex> guard(llmtx);
if (p_msg_map->operator[](get_thread_local_key(key)).fetch_mark) {
p_msg_map->operator[](get_thread_local_key(key)).stat_values.clear();
p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = false;
@ -270,17 +153,42 @@ public:
return false;
}
}
/**
* @brief
* @param key ruleid
* @param value
* @return bool
*/
bool get_stat_value(const std::string &key, RuleStat &value) {
/// mon 写入累积的冷字段stat_values, running_time, shear_times
bool update_cold_fields(const std::string &key, const RuleStatCold &value) {
bipc::scoped_lock<ColdMutex> guard(llmtx);
try {
std::lock_guard<std::mutex> guard(llmtx);
// key_object = key.c_str();
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) {
p_msg_map->operator[](get_thread_local_key(key)).update_cold(value);
} else {
p_msg_map->operator[](get_thread_local_key(key)) = value;
}
return true;
} catch (const std::exception &e) {
return false;
}
}
/// cron 写入静态字段running_time, alarm_times, last_alarm_time, dev_coder 等)
bool update_static_fields(const std::string &key, const RuleStatCold &value) {
bipc::scoped_lock<ColdMutex> guard(llmtx);
try {
if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) {
p_msg_map->operator[](get_thread_local_key(key)).update_static(value);
return true;
}
p_msg_map->operator[](get_thread_local_key(key)) = value;
return true;
} catch (const std::exception &e) {
return false;
}
}
// ── cron 调用 ──
bool get_stat_value(const std::string &key, RuleStatCold &value) {
try {
bipc::scoped_lock<ColdMutex> guard(llmtx);
if (!p_msg_map->operator[](get_thread_local_key(key)).fetch_mark &&
p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() >
stat_size_min) {
@ -295,15 +203,34 @@ public:
}
}
/** find 共享内存数据中, vector<key> 存在的数据
* @brief
* @param ruleid My Param doc
* @return std::vector<std::string>
*/
// ── 管理操作 ──
bool delete_data(const std::string &key) {
bipc::scoped_lock<ColdMutex> guard(llmtx);
try {
if (p_msg_map->find(get_thread_local_delete_key(key)) !=
p_msg_map->end()) {
p_msg_map->erase(get_thread_local_delete_key(key));
}
return true;
} catch (const std::exception &e) {
return false;
}
}
size_t size() {
bipc::scoped_lock<ColdMutex> guard(llmtx);
return p_msg_map->size();
}
bool empty() {
bipc::scoped_lock<ColdMutex> guard(llmtx);
return p_msg_map->empty();
}
std::vector<std::string>
find_rule_id(const std::vector<std::string> &ruleid) {
std::lock_guard<std::mutex> guard(llmtx);
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
bipc::scoped_lock<ColdMutex> guard(llmtx);
std::vector<std::string> res;
if (p_msg_map->empty()) {
return {};
@ -317,15 +244,9 @@ public:
return res;
}
/** find 共享内存数据中 vector<key> 不存在的数据
* @brief
* @param ruleid My Param doc
* @return std::vector<std::string>
*/
std::vector<std::string>
find_no_rule_id(const std::vector<std::string> &ruleid) {
std::lock_guard<std::mutex> guard(llmtx);
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
bipc::scoped_lock<ColdMutex> guard(llmtx);
std::vector<std::string> res;
if (p_msg_map->empty()) {
return {};
@ -338,47 +259,6 @@ public:
}
return res;
}
/**
* @brief string
* @return std::string; "ERROR"
*/
std::string GetDataJson() {
// std::lock_guard<std::mutex> guard(local_mutext);
// try {
// mix_cc::json js1;
// for (auto iter = p_msg_map->begin(); iter != p_msg_map->end(); iter++)
// {
// js1[iter->first] = iter->second.invert2json();
// }
// return js1.dump();
// } catch (const std::exception &e) {
// return "ERROR";
// }
// 第一步:复制数据(加锁)
auto runs_t1 = std::chrono::steady_clock::now();
std::vector<std::pair<std::string, RuleStat>> snapshot;
{
std::lock_guard<std::mutex> guard(llmtx);
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
snapshot.reserve(p_msg_map->size());
for (const auto & [ key, value ] : *p_msg_map) {
snapshot.emplace_back(key.c_str(), value); // 复制RuleStat
}
} // 锁只在这几毫秒内持有!
auto runs_t2 = std::chrono::steady_clock::now();
int64_t cost_time = (runs_t2 - runs_t1).count() / 1000000;
// 第二步:序列化(不加锁)
mix_cc::json js1;
js1["cost_time"] = cost_time;
for (const auto & [ key, value ] : snapshot) {
js1[key] = value.invert2json(); // 耗时操作,但无锁
}
return js1.dump();
}
};
} // namespace RuleStatShm
} // namespace RuleStatShm