384 lines
12 KiB
C++
384 lines
12 KiB
C++
#pragma once
|
||
/**
|
||
* @file RuleStatShm.h
|
||
* @brief 共享内存 map,用于将后台规则数据提供给页面使用
|
||
* @author your name (you@domain.com)
|
||
* @version 0.1
|
||
* @date 2023-10-18
|
||
*
|
||
* Copyright: Baosight Co. Ltd.
|
||
* DO NOT COPY/USE WITHOUT PERMISSION
|
||
*
|
||
*/
|
||
// #include <boost/interprocess/sync/interprocess_mutex.hpp>
|
||
#include "shm_header.h"
|
||
#include <mutex>
|
||
#include <shared_mutex> // C++17
|
||
#include <thread>
|
||
namespace RuleStatShm {
|
||
|
||
using namespace ShmHeader;
|
||
|
||
namespace {
|
||
|
||
std::mutex llmtx{}; ///<共享锁
|
||
// 1. 改为读写锁
|
||
// std::shared_mutex local_rwlock;
|
||
|
||
const static std::string dir_path = "/users/dsc/shm";
|
||
const static std::string shm_file = "MapRuleStat"; ///<映射文件名
|
||
const double data_size = 1024; ///< 数据大小 MB
|
||
const size_t stat_size_min = 1000; ///<统计样本最小批处理量
|
||
const size_t stat_size_max = 10000; ///<统计样本最大储存量
|
||
|
||
static managed_mapped_file_t
|
||
obj_mapped_file(bipc::open_or_create,
|
||
(dir_path + "/" + shm_file + "_boost.mmap").c_str(),
|
||
mix_cc::data_size::MB(data_size));
|
||
|
||
static void_allocator
|
||
default_allocator(obj_mapped_file.get_segment_manager()); ///<默认分配器
|
||
static vec_allocator_s
|
||
items_allocator(obj_mapped_file.get_segment_manager()); ///< vector_s分配器
|
||
// static char_string key_object("", default_allocator);
|
||
// static char_string key_delete("", default_allocator);
|
||
// 2. 改为线程局部存储
|
||
thread_local static char_string tl_key_object("", default_allocator);
|
||
thread_local static char_string tl_key_delete("", default_allocator);
|
||
// 3. 辅助函数:安全设置线程局部key
|
||
char_string &get_thread_local_key(const std::string &key) {
|
||
tl_key_object = key.c_str();
|
||
return tl_key_object;
|
||
}
|
||
|
||
char_string &get_thread_local_delete_key(const std::string &key) {
|
||
tl_key_delete = key.c_str();
|
||
return tl_key_delete;
|
||
}
|
||
|
||
} // namespace
|
||
///<定义数据
|
||
struct RuleStat {
|
||
/** 动态数据 规则触发更新 **/
|
||
double alarm_value = 0; ///<最新报警值
|
||
double current_value = 0; ///<当前值
|
||
double limit_up = 0; ///<上限
|
||
double limit_down = 0; ///<下限
|
||
vector_s items; ///<数据项
|
||
vector_d stat_values; ///<统计值,用于cron
|
||
bool fetch_mark = false; ///<取数据标记
|
||
|
||
/** 静态数据 定时更新 **/
|
||
double running_time = 0; ///<累积的运行时间
|
||
int64_t shear_times = 0; ///<剪切次数
|
||
int64_t alarm_times = 0; ///<报警次数
|
||
bipc::string last_alarm_time = "无报警"; ///<上次报警时间
|
||
bipc::string dev_coder = "无"; ///<设备编码
|
||
bipc::string unit = ""; ///<数据单位(比如℃)
|
||
RuleStat()
|
||
: items(items_allocator), stat_values(default_allocator) {
|
||
} ///<共享内存的内存分配
|
||
/** 数据操作 **/
|
||
mix_cc::json invert2json() const {
|
||
mix_cc::json js1;
|
||
/** 动态数据 规则触发更新 **/
|
||
js1["alarm_value"] = alarm_value; ///<最新报警值
|
||
js1["current_value"] = limit_precision(current_value); ///<当前值
|
||
js1["limit_up"] = limit_precision(limit_up); ///<上限阈值
|
||
js1["limit_down"] = limit_precision(limit_down); ///<下限阈值
|
||
js1["items"] = items; ///< tag点
|
||
|
||
/** 静态数据 定时更新 **/
|
||
js1["running_time"] = running_time; ///<统计的运行时间
|
||
js1["shear_times"] = shear_times; ///<剪切次数
|
||
js1["alarm_times"] = alarm_times; ///<报警次数
|
||
js1["last_alarm_time"] = last_alarm_time; ///<上次报警时间
|
||
js1["dev_coder"] = dev_coder; ///<设备编码
|
||
js1["unit"] = unit; ///<数据单位
|
||
return js1;
|
||
}
|
||
/**
|
||
* @brief 浮点精度控制,默认2小数点后位
|
||
* @param data My Param doc
|
||
* @param precision My Param doc
|
||
* @return double
|
||
*/
|
||
double limit_precision(double data, int precision = 2) const {
|
||
double factor = std::pow(10, precision);
|
||
return std::round(data * factor) / factor;
|
||
}
|
||
/**
|
||
* @brief 动态数据更新
|
||
* @param value My Param doc
|
||
* @return true
|
||
* @return false
|
||
*/
|
||
bool update_dynamic(const RuleStat &value) {
|
||
try {
|
||
items = value.items;
|
||
alarm_value = value.alarm_value;
|
||
current_value = value.current_value;
|
||
limit_up = value.limit_up;
|
||
limit_down = value.limit_down;
|
||
unit = value.unit;
|
||
} catch (...) {
|
||
cout << "update_dynamic ERROR" << endl;
|
||
return false;
|
||
}
|
||
return true;
|
||
}
|
||
/**
|
||
* @brief 静态数据更新
|
||
* @param value My Param doc
|
||
* @return true
|
||
* @return false
|
||
*/
|
||
bool update_static(const RuleStat &value) {
|
||
try {
|
||
running_time = value.running_time;
|
||
shear_times = value.shear_times;
|
||
alarm_times = value.alarm_times;
|
||
last_alarm_time = value.last_alarm_time;
|
||
dev_coder = value.dev_coder;
|
||
} catch (const std::exception &e) {
|
||
return false;
|
||
}
|
||
return true;
|
||
}
|
||
};
|
||
|
||
///< key 是string的情况
|
||
typedef std::pair<const char_string, RuleStat> pair_s; ///< key-value
|
||
typedef bipc::node_allocator<pair_s, mapped_segment_manager_t>
|
||
allocator_s; ///<映射文件
|
||
typedef std::less<char_string> less_s;
|
||
typedef bipc::map<char_string, RuleStat, less_s, allocator_s> MapRuleStat_s;
|
||
typedef MapRuleStat_s::iterator map_iter_s;
|
||
|
||
///<定义数据操作
|
||
struct MapRuleStat {
|
||
private:
|
||
MapRuleStat_s *p_msg_map = obj_mapped_file.find_or_construct<MapRuleStat_s>(
|
||
shm_file.c_str())(less_s(), obj_mapped_file.get_segment_manager());
|
||
|
||
public:
|
||
/**
|
||
* @brief 更新 key对应的整个value
|
||
* @param key My Param doc
|
||
* @param value My Param doc
|
||
* @return true
|
||
* @return false
|
||
*/
|
||
bool update(const std::string &key, const RuleStat &value) {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
|
||
try {
|
||
// key_object = key.c_str();
|
||
p_msg_map->operator[](get_thread_local_key(key)) = value;
|
||
return true;
|
||
} catch (const std::exception &e) {
|
||
return false;
|
||
}
|
||
}
|
||
/**
|
||
* @brief 更新 key对应value的动态 或 静态数据
|
||
* @param key
|
||
* @param value
|
||
* @param is_dynamic 是否更新动态数据
|
||
* @return true
|
||
* @return false
|
||
*/
|
||
bool update_dynamic(const std::string &key, const RuleStat &value,
|
||
bool is_dynamic = true) {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
|
||
try {
|
||
// key_object = key.c_str();
|
||
if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) {
|
||
if (is_dynamic) {
|
||
p_msg_map->operator[](get_thread_local_key(key))
|
||
.update_dynamic(value);
|
||
return true;
|
||
} else {
|
||
p_msg_map->operator[](get_thread_local_key(key)).update_static(value);
|
||
return true;
|
||
}
|
||
}
|
||
p_msg_map->operator[](get_thread_local_key(key)) = value;
|
||
return true;
|
||
} catch (const std::exception &e) {
|
||
cout << e.what() << endl;
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* @brief 删除 key-value
|
||
* @param key
|
||
* @return true
|
||
* @return false
|
||
*/
|
||
bool delete_data(const std::string &key) {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
|
||
try {
|
||
// key_delete = key.c_str();
|
||
if (p_msg_map->find(get_thread_local_delete_key(key)) !=
|
||
p_msg_map->end()) {
|
||
p_msg_map->erase(get_thread_local_delete_key(key));
|
||
}
|
||
return true;
|
||
} catch (const std::exception &e) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
size_t size() {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
|
||
return p_msg_map->size();
|
||
}
|
||
|
||
bool empty() {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
|
||
return p_msg_map->empty();
|
||
}
|
||
/**
|
||
* @brief 统计数据存储
|
||
* @param key My Param doc
|
||
* @param value My Param doc
|
||
* @return true
|
||
* @return false
|
||
*/
|
||
bool add_stat_value(const std::string &key, const double &value) {
|
||
try {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
|
||
// key_object = key.c_str();
|
||
if (p_msg_map->operator[](get_thread_local_key(key)).fetch_mark) {
|
||
p_msg_map->operator[](get_thread_local_key(key)).stat_values.clear();
|
||
p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = false;
|
||
}
|
||
if (p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() <
|
||
stat_size_max) {
|
||
p_msg_map->operator[](get_thread_local_key(key))
|
||
.stat_values.push_back(value);
|
||
}
|
||
return true;
|
||
} catch (const std::exception &e) {
|
||
return false;
|
||
}
|
||
}
|
||
/**
|
||
* @brief 获取统计数据
|
||
* @param key ruleid
|
||
* @param value 数据结果
|
||
* @return bool
|
||
*/
|
||
bool get_stat_value(const std::string &key, RuleStat &value) {
|
||
try {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// key_object = key.c_str();
|
||
// std::unique_lock<std::shared_mutex> lock(local_rwlock); // 写锁
|
||
if (!p_msg_map->operator[](get_thread_local_key(key)).fetch_mark &&
|
||
p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() >
|
||
stat_size_min) {
|
||
value.stat_values =
|
||
p_msg_map->operator[](get_thread_local_key(key)).stat_values;
|
||
p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = true;
|
||
return true;
|
||
}
|
||
return false;
|
||
} catch (const std::exception &e) {
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/** find 共享内存数据中, vector<key> 存在的数据
|
||
* @brief
|
||
* @param ruleid My Param doc
|
||
* @return std::vector<std::string>
|
||
*/
|
||
std::vector<std::string>
|
||
find_rule_id(const std::vector<std::string> &ruleid) {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
|
||
std::vector<std::string> res;
|
||
if (p_msg_map->empty()) {
|
||
return {};
|
||
}
|
||
for (auto iter1 = p_msg_map->begin(); iter1 != p_msg_map->end(); iter1++) {
|
||
auto resV = std::find(ruleid.begin(), ruleid.end(), iter1->first);
|
||
if (resV != ruleid.end()) {
|
||
res.push_back(iter1->first);
|
||
}
|
||
}
|
||
return res;
|
||
}
|
||
|
||
/** find 共享内存数据中 vector<key> 不存在的数据
|
||
* @brief
|
||
* @param ruleid My Param doc
|
||
* @return std::vector<std::string>
|
||
*/
|
||
std::vector<std::string>
|
||
find_no_rule_id(const std::vector<std::string> &ruleid) {
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
|
||
std::vector<std::string> res;
|
||
if (p_msg_map->empty()) {
|
||
return {};
|
||
}
|
||
for (auto iter1 = p_msg_map->begin(); iter1 != p_msg_map->end(); iter1++) {
|
||
auto resV = std::find(ruleid.begin(), ruleid.end(), iter1->first);
|
||
if (resV == ruleid.end()) {
|
||
res.push_back(iter1->first);
|
||
}
|
||
}
|
||
return res;
|
||
}
|
||
|
||
/**
|
||
* @brief 将数据转换成 string
|
||
* @return std::string;失败为 "ERROR"
|
||
*/
|
||
std::string GetDataJson() {
|
||
// std::lock_guard<std::mutex> guard(local_mutext);
|
||
// try {
|
||
// mix_cc::json js1;
|
||
// for (auto iter = p_msg_map->begin(); iter != p_msg_map->end(); iter++)
|
||
// {
|
||
// js1[iter->first] = iter->second.invert2json();
|
||
// }
|
||
// return js1.dump();
|
||
// } catch (const std::exception &e) {
|
||
// return "ERROR";
|
||
// }
|
||
|
||
// 第一步:复制数据(加锁)
|
||
auto runs_t1 = std::chrono::steady_clock::now();
|
||
std::vector<std::pair<std::string, RuleStat>> snapshot;
|
||
{
|
||
|
||
std::lock_guard<std::mutex> guard(llmtx);
|
||
// std::shared_lock<std::shared_mutex> lock(local_rwlock); // 读锁
|
||
snapshot.reserve(p_msg_map->size());
|
||
for (const auto & [ key, value ] : *p_msg_map) {
|
||
snapshot.emplace_back(key.c_str(), value); // 复制RuleStat
|
||
}
|
||
|
||
} // 锁只在这几毫秒内持有!
|
||
auto runs_t2 = std::chrono::steady_clock::now();
|
||
int64_t cost_time = (runs_t2 - runs_t1).count() / 1000000;
|
||
// 第二步:序列化(不加锁)
|
||
mix_cc::json js1;
|
||
js1["cost_time"] = cost_time;
|
||
for (const auto & [ key, value ] : snapshot) {
|
||
js1[key] = value.invert2json(); // 耗时操作,但无锁
|
||
}
|
||
return js1.dump();
|
||
}
|
||
};
|
||
|
||
} // namespace RuleStatShm
|