eis/shm/RuleStatShm.h
Huamonarch 9bb810d9ed Replace broken ColdMutex with dual-layer ShmSpinLock + std::mutex
The old ColdMutex (interprocess_mutex) was in process-local anonymous
namespace storage, so each process had its own copy — no actual
cross-process exclusion. Even if moved to SHM, interprocess_mutex is
not robust: a crash while holding the lock would deadlock on restart.

New design:
- ShmSpinLock: atomic<pid_t> in shared memory, kill(pid,0) detects
  dead owners (ESRCH → takeover), crash-safe by construction
- std::mutex: process-local, handles intra-process thread contention
  without burning CPU on the SHM spinlock
- DualLock: locks local first, then shm; unlocks in reverse

9 lock sites in MapRuleStat upgraded to std::lock_guard<DualLock>.
2026-05-13 10:21:55 +08:00

326 lines
9.7 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
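/**
 * @file RuleStatShm.h
 * @brief Shared-memory map used to exchange cold data between the mon and
 *        cron processes.
 *
 * Hot data (display) now goes through a local cache -> Memcached and no
 * longer passes through shared memory.
 * Cold data (stat_values / running_time / shear_times, etc.) stays in shared
 * memory and is protected by a two-layer lock: ShmSpinLock (cross-process,
 * with kill(pid, 0) takeover after a crash) plus a process-local std::mutex.
 *
 * @author your name (you@domain.com)
 * @version 0.2
 * @date 2023-10-18
 *
 * Copyright: Baosight Co. Ltd.
 * DO NOT COPY/USE WITHOUT PERMISSION
 */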
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <atomic>
#include <cerrno>
#include <csignal>
#include <ctime>
#include <mutex>
#include <unistd.h>
#include "shm_header.h"
namespace RuleStatShm {
using namespace ShmHeader;
/// Cross-process spin lock.
///
/// `owner` stores the PID of the process that currently holds the lock, or 0
/// when the lock is free. A waiter probes the recorded holder with
/// kill(pid, 0); if that fails with ESRCH (no such process) the waiter tries
/// to take the lock over instead of waiting.
struct ShmSpinLock {
  std::atomic<pid_t> owner{0};

  ShmSpinLock() = default;
  ShmSpinLock(const ShmSpinLock &) = delete;
  ShmSpinLock &operator=(const ShmSpinLock &) = delete;

  /// Blocks until this process's PID has been installed in `owner`.
  void lock() {
    const pid_t self = ::getpid();
    for (;;) {
      // Fast path: claim the lock if it is currently free (owner == 0).
      pid_t holder = 0;
      if (owner.compare_exchange_strong(holder, self,
                                        std::memory_order_acquire)) {
        return;
      }

      // The failed exchange left the current owner's PID in `holder`.
      const bool holder_gone = ::kill(holder, 0) != 0 && errno == ESRCH;
      if (!holder_gone) {
        // The holder still exists: back off briefly, then retry.
        struct timespec pause = {0, 100000}; // 100 µs
        ::nanosleep(&pause, nullptr);
        continue;
      }

      // The holder no longer exists: replace exactly that PID with ours.
      // If another waiter won the race, loop again without sleeping.
      if (owner.compare_exchange_strong(holder, self,
                                        std::memory_order_acquire)) {
        return;
      }
    }
  }

  /// Resets `owner` to 0, but only if it still equals this process's PID;
  /// otherwise `owner` is left unchanged.
  void unlock() {
    pid_t holder = ::getpid();
    owner.compare_exchange_strong(holder, 0, std::memory_order_release);
  }
};
/// Two-layer lock: a process-local std::mutex combined with the cross-process
/// ShmSpinLock.
///
/// lock() acquires the local mutex first and the shared lock second;
/// unlock() releases them in the opposite order. The lock()/unlock() pair
/// lets the type be used with std::lock_guard<DualLock>.
struct DualLock {
  std::mutex local;  ///< serialises threads of this process
  ShmSpinLock &shm;  ///< lock object shared with other processes (not owned)

  DualLock(ShmSpinLock &shared) : shm(shared) {}

  /// Takes the in-process mutex, then the shared spin lock.
  void lock() {
    local.lock();
    shm.lock();
  }

  /// Releases the shared spin lock, then the in-process mutex.
  void unlock() {
    shm.unlock();
    local.unlock();
  }
};
// File-level state backing the shared-memory map: path constants, the mapped
// file, allocators bound to its segment manager, the shared lock object and
// per-thread scratch keys.
// NOTE(review): this unnamed namespace lives in a header, so every translation
// unit that includes the header gets its own copy of these objects (including
// its own mapping of the file) — confirm that this is intended.
namespace {
// Directory that holds the mapped file.
const static std::string dir_path = "/users/dsc/shm";
const static std::string shm_file = "MapRuleStat"; ///< Base name of the mapped file; also the name of the map object looked up in the segment
const double data_size = 1024; ///< Size handed to mix_cc::data_size::MB() when opening/creating the file (MB per the original comment)
const size_t stat_size_min = 1000; ///< get_stat_value() returns samples only when more than this many are stored
const size_t stat_size_max = 10000; ///< add_stat_value() stops appending once this many samples are stored
// Opens the file dir_path + "/" + shm_file + "_boost.mmap", creating it if it
// does not exist yet (bipc::open_or_create).
static managed_mapped_file_t
obj_mapped_file(bipc::open_or_create,
(dir_path + "/" + shm_file + "_boost.mmap").c_str(),
mix_cc::data_size::MB(data_size));
static void_allocator
default_allocator(obj_mapped_file.get_segment_manager()); ///< Default allocator bound to the mapped file's segment manager
static vec_allocator_s
items_allocator(obj_mapped_file.get_segment_manager()); ///< Allocator for vector_s, bound to the same segment manager
// Lock object named "RuleStatLock", found or constructed inside the mapped
// file (presumably so that every process mapping the file uses the same
// instance — verify against the other users of this file).
static ShmSpinLock *shm_lock =
obj_mapped_file.find_or_construct<ShmSpinLock>("RuleStatLock")();
// Per-thread scratch keys built with default_allocator; they convert a
// std::string key into the char_string type used for map lookups.
thread_local static char_string tl_key_object("", default_allocator);
thread_local static char_string tl_key_delete("", default_allocator);
// Copies `key` into this thread's lookup key and returns a reference to it.
// The reference is overwritten by the next call on the same thread.
char_string &get_thread_local_key(const std::string &key) {
tl_key_object = key.c_str();
return tl_key_object;
}
// Same as get_thread_local_key(), but uses the separate scratch key reserved
// for delete operations.
char_string &get_thread_local_delete_key(const std::string &key) {
tl_key_delete = key.c_str();
return tl_key_delete;
}
} // namespace
/// Complete rule-statistics record held in process-local memory, using
/// standard-allocator containers (per the original note, used by AlgBase).
struct RuleStatLocal {
  double alarm_value{0.0};
  double current_value{0.0};
  double limit_up{0.0};
  double limit_down{0.0};
  std::vector<std::string> items;
  std::vector<double> stat_values;
  bool fetch_mark{false};
  double running_time{0.0};
  int64_t shear_times{0};
  int64_t alarm_times{0};
  std::string last_alarm_time{"无报警"};  // default text means "no alarm"
  std::string dev_coder{};
  std::string unit;
};
/// Cold-data record stored in the shared-memory map (per the original note,
/// used for the mon <-> cron exchange).
///
/// NOTE(review): last_alarm_time and dev_coder are bipc::string, constructed
/// without the segment allocator. If that type allocates from the process heap
/// for longer contents, such contents would not be visible to other
/// processes — confirm against the definition in shm_header.h.
struct RuleStatCold {
  vector_d stat_values;                    ///< Collected samples (original note: mon accumulates, cron consumes)
  bool fetch_mark = false;                 ///< Set once the samples have been fetched
  double running_time = 0;                 ///< Accumulated running time
  int64_t shear_times = 0;                 ///< Shear count
  int64_t alarm_times = 0;                 ///< Alarm count
  bipc::string last_alarm_time = "无报警";  ///< Time of the last alarm; default text means "no alarm"
  bipc::string dev_coder = "";             ///< Device code

  /// Builds an empty record whose sample vector uses default_allocator.
  RuleStatCold() : stat_values(default_allocator) {}

  /// Rounds `data` to `precision` decimal places (default 2).
  double limit_precision(double data, int precision = 2) const {
    const double scale = std::pow(10, precision);
    return std::round(data * scale) / scale;
  }

  /// Copies running_time, shear_times, alarm_times, last_alarm_time and
  /// dev_coder from `value`; stat_values and fetch_mark are left untouched.
  /// @return false if a std::exception was thrown while copying, else true.
  bool update_static(const RuleStatCold &value) {
    try {
      running_time = value.running_time;
      shear_times = value.shear_times;
      alarm_times = value.alarm_times;
      last_alarm_time = value.last_alarm_time;
      dev_coder = value.dev_coder;
      return true;
    } catch (const std::exception &) {
      return false;
    }
  }

  /// Copies stat_values, fetch_mark, running_time and shear_times from
  /// `value`; the alarm fields and dev_coder are left untouched.
  /// @return false if anything was thrown while copying, else true.
  bool update_cold(const RuleStatCold &value) {
    try {
      stat_values = value.stat_values;
      fetch_mark = value.fetch_mark;
      running_time = value.running_time;
      shear_times = value.shear_times;
      return true;
    } catch (...) {
      return false;
    }
  }
};
/// Map types for string keys (char_string -> RuleStatCold).
using pair_s = std::pair<const char_string, RuleStatCold>;
/// Node allocator for the map's value type, drawing from the mapped segment.
using allocator_s = bipc::node_allocator<pair_s, mapped_segment_manager_t>;
using less_s = std::less<char_string>;
using MapRuleStat_s =
    bipc::map<char_string, RuleStatCold, less_s, allocator_s>;
using map_iter_s = MapRuleStat_s::iterator;
/// 共享内存 map 操作(仅冷数据)
struct MapRuleStat {
private:
MapRuleStat_s *p_msg_map = obj_mapped_file.find_or_construct<MapRuleStat_s>(
shm_file.c_str())(less_s(), obj_mapped_file.get_segment_manager());
DualLock lock_{*shm_lock};
public:
// ── mon 高频调用 ──
bool add_stat_value(const std::string &key, const double &value) {
try {
std::lock_guard<DualLock> guard(lock_);
if (p_msg_map->operator[](get_thread_local_key(key)).fetch_mark) {
p_msg_map->operator[](get_thread_local_key(key)).stat_values.clear();
p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = false;
}
if (p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() <
stat_size_max) {
p_msg_map->operator[](get_thread_local_key(key))
.stat_values.push_back(value);
}
return true;
} catch (const std::exception &e) {
return false;
}
}
/// mon 写入累积的冷字段stat_values, running_time, shear_times
bool update_cold_fields(const std::string &key, const RuleStatCold &value) {
std::lock_guard<DualLock> guard(lock_);
try {
if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) {
p_msg_map->operator[](get_thread_local_key(key)).update_cold(value);
} else {
p_msg_map->operator[](get_thread_local_key(key)) = value;
}
return true;
} catch (const std::exception &e) {
return false;
}
}
/// cron 写入静态字段running_time, alarm_times, last_alarm_time, dev_coder 等)
bool update_static_fields(const std::string &key, const RuleStatCold &value) {
std::lock_guard<DualLock> guard(lock_);
try {
if (p_msg_map->find(get_thread_local_key(key)) != p_msg_map->end()) {
p_msg_map->operator[](get_thread_local_key(key)).update_static(value);
return true;
}
p_msg_map->operator[](get_thread_local_key(key)) = value;
return true;
} catch (const std::exception &e) {
return false;
}
}
// ── cron 调用 ──
bool get_stat_value(const std::string &key, RuleStatCold &value) {
try {
std::lock_guard<DualLock> guard(lock_);
if (!p_msg_map->operator[](get_thread_local_key(key)).fetch_mark &&
p_msg_map->operator[](get_thread_local_key(key)).stat_values.size() >
stat_size_min) {
value.stat_values =
p_msg_map->operator[](get_thread_local_key(key)).stat_values;
p_msg_map->operator[](get_thread_local_key(key)).fetch_mark = true;
return true;
}
return false;
} catch (const std::exception &e) {
return false;
}
}
// ── 管理操作 ──
bool delete_data(const std::string &key) {
std::lock_guard<DualLock> guard(lock_);
try {
if (p_msg_map->find(get_thread_local_delete_key(key)) !=
p_msg_map->end()) {
p_msg_map->erase(get_thread_local_delete_key(key));
}
return true;
} catch (const std::exception &e) {
return false;
}
}
size_t size() {
std::lock_guard<DualLock> guard(lock_);
return p_msg_map->size();
}
bool empty() {
std::lock_guard<DualLock> guard(lock_);
return p_msg_map->empty();
}
std::vector<std::string>
find_rule_id(const std::vector<std::string> &ruleid) {
std::lock_guard<DualLock> guard(lock_);
std::vector<std::string> res;
if (p_msg_map->empty()) {
return {};
}
for (auto iter1 = p_msg_map->begin(); iter1 != p_msg_map->end(); iter1++) {
auto resV = std::find(ruleid.begin(), ruleid.end(), iter1->first);
if (resV != ruleid.end()) {
res.push_back(iter1->first);
}
}
return res;
}
std::vector<std::string>
find_no_rule_id(const std::vector<std::string> &ruleid) {
std::lock_guard<DualLock> guard(lock_);
std::vector<std::string> res;
if (p_msg_map->empty()) {
return {};
}
for (auto iter1 = p_msg_map->begin(); iter1 != p_msg_map->end(); iter1++) {
auto resV = std::find(ruleid.begin(), ruleid.end(), iter1->first);
if (resV == ruleid.end()) {
res.push_back(iter1->first);
}
}
return res;
}
};
} // namespace RuleStatShm