Covers 9 commits: TaskRecord compile fixes, dual-layer ShmSpinLock, rm -rf cleanup, update_cold handshake fix, ExpTimes DB robustness and async persistence with AsyncDbWorker.
13 KiB
eqpalg 改动记录 — 2026-05-13
改动总览
| # | Commit | 内容 | 涉及文件 |
|---|---|---|---|
| 1 | 0a53973 |
修复 e21b2af 编译错误:TaskRecord 成员访问 |
eqpalg/algs/exp_base.cpp |
| 2 | df79a9a |
修复 e21b2af 编译错误:TaskRecord 缺少默认构造函数 |
shm/TaskData.h |
| 3 | b8596d3 |
移除启动时 rm -rf MapRuleStat_boost.mmap |
eqpalg/eqpalg.cpp |
| 4 | 9bb810d |
共享内存锁重设计:双层锁替代无效 ColdMutex | shm/RuleStatShm.h |
| 5 | 02447c7 |
替换 µ→us 消除 Gitea Unicode 告警 | shm/RuleStatShm.h |
| 6 | b3932b0 |
修复 update_cold 覆盖 SHM 握手数据的 bug | shm/RuleStatShm.h, eqpalg/utility/eqp_stat.cc |
| 7 | 6ed178b |
ExpTimes:保护 update_history_times 快照恢复不因 DB 故障丢失 | eqpalg/algs/exp_times.cc |
| 8 | f80a917 |
ExpTimes DB 持久化异步化 + EqpStat 冗余查询消除 | 6 文件(含 2 新增) |
| 9 | 11e5a6a |
include 风格统一为 <> |
eqpalg/utility/async_db_worker.cc |
1–2. e21b2af 编译修复
问题
e21b2af 将 TaskData.h 中的 Map<int, DataRecord> 改为 Map<int, TaskRecord>,其中 TaskRecord 内含 shm_vector_f data_record。两处遗漏:
- 成员访问:
operator[]返回TaskRecord&,原代码直接调用.size()/operator[]——需穿透到.data_record - 默认构造:
boost::container::map::operator[]在 key 不存在时需默认构造值,TaskRecord只有带 allocator 参数的构造函数
改动
exp_base.cpp 两处访问点:
// 修改前:data_record.size(), data_record[j] ← TaskRecord 没有这些成员
// 修改后:
auto &data_record = TaskShm::TaskRecordPtr.get()
->operator[](key).data_record; // 穿透到 shm_vector_f
for (size_t j = 0; j < data_record.size(); j++) { ... }
// 修改前:.push_back(res.value) ← TaskRecord 没有 push_back
// 修改后:.data_record.push_back(res.value)
TaskData.h:参照 RuleStatShm.h 中 RuleStatCold 的模式,添加静态 allocator + 默认构造函数:
static vec_allocator_f default_allocator(obj_mapped_file.get_segment_manager());
struct TaskRecord {
shm_vector_f data_record;
TaskRecord() : data_record(default_allocator) {} // ← 新增
TaskRecord(const void_allocator &alloc) : data_record(alloc) {}
};
3. 移除 rm -rf MapRuleStat
问题
eqpalg::start() 行 23-24 在每次启动时 rm -rf /users/dsc/shm/MapRuleStat_boost.mmap:
启动时序:
1. 静态初始化 → managed_mapped_file(open_or_create) 创建文件
2. start() → rm -rf 删除文件
3. 程序运行 → 文件不在磁盘上(映射仍有效)
4. 重启 → 文件不存在,open_or_create 重建 → 又被 rm -rf
该代码自 initial commit 即存在,与 e21b2af 移除的 TaskData_boost.mmap rm -rf 属同类遗留清理。
改动
删除 eqpalg.cpp:23-24 的 rm -rf。
4. 共享内存锁重设计
问题
RuleStatShm.h 原有两重缺陷:
缺陷一:锁不在共享内存中
namespace {
using ColdMutex = bipc::interprocess_mutex;
ColdMutex llmtx{}; // 匿名命名空间 → 进程本地 → mon/cron 各有一份
}
mon 和 cron 各自持有独立的 llmtx,锁 A 不影响锁 B——"进程间互斥"名不副实。mon 和 cron 对共享内存 map 的读写完全无同步。
缺陷二:即使修复缺陷一,崩溃后也死锁
boost::interprocess::interprocess_mutex 基于 PTHREAD_PROCESS_SHARED 但不含 PTHREAD_MUTEX_ROBUST。进程持锁崩溃后,互斥量保持 locked 状态。open_or_create 复用旧文件,重启后 scoped_lock 死锁。
改动
ShmSpinLock——基于 atomic<pid_t> 的自旋锁,在共享内存中分配:
lock():
循环:
CAS(owner, 0→getpid()) 成功 → 拿到锁
读 owner → kill(owner_pid, 0)
ESRCH → 持有者已死 → CAS 接管
0 → 持有者存活 → 100µs nanosleep 后退避
unlock():
CAS(owner, getpid()→0) // 只释放自己持有的
DualLock——进程内 std::mutex + 跨进程 ShmSpinLock 双层锁:
lock 顺序: std::mutex::lock() → ShmSpinLock::lock()
unlock 顺序: ShmSpinLock::unlock() → std::mutex::unlock()
| 场景 | std::mutex | ShmSpinLock |
|---|---|---|
| 同进程多线程 | pthread futex,不占 CPU | 同 PID,kill(0)认为存活 |
| 跨进程 | 管不到其他进程 | PID 不同,kill(0) 区分 |
| 崩溃恢复 | 随进程消失自动释放 | kill→ESRCH 后接管 |
共享内存构造:
static ShmSpinLock *shm_lock =
obj_mapped_file.find_or_construct<ShmSpinLock>("RuleStatLock")();
MapRuleStat 内集成:
struct MapRuleStat {
DualLock lock_{*shm_lock};
// 9 个方法中 scoped_lock<ColdMutex> 全部替换为 lock_guard<DualLock>
};
崩溃恢复验证:
mon 持锁崩溃:
mon: std::mutex ✓, ShmSpinLock owner=mon_pid
mon 崩溃 → std::mutex 随进程消失, ShmSpinLock.owner 残留
cron 加锁:
1. std::mutex → 无竞争者,拿到
2. ShmSpinLock → kill(mon_pid,0)=ESRCH → CAS 接管成功 ✓
5. Unicode 字符替换
Gitea 标记 µ(U+00B5 MICRO SIGN)为模棱两可字符(与 U+03BC GREEK SMALL LETTER MU 视觉相同)。改为工程惯用缩写 us。
6. update_cold 覆盖 SHM 握手数据
问题
AlgBase::exec_mon_call() 每个周期依次执行:
1. exec_mon() → mon_proc() → add_stat_value() // 向 SHM 累积数据
2. update_map_rule() → update_cold(rule_stat_) // 用本地值覆盖 SHM!
EqpStat::update_cold 将 mon 本地 rule_stat_ 的值搬到 SHM:
cold.stat_values.assign(rule_stat.stat_values.begin(), // mon 端永远为空!
rule_stat.stat_values.end()); // → 清空 SHM 累积数据
cold.fetch_mark = rule_stat.fetch_mark; // mon 端永远为 false
// → 重置 cron 的消费标记
mon 的 rule_stat_ 只维护热字段(alarm_value、current_value 等),stat_values 永远为空、fetch_mark 永远为 false。update_cold 把这些默认值写入 SHM,破坏 mon↔cron 握手协议。
改动
update_cold 不再复制 stat_values 和 fetch_mark——这两个字段由 add_stat_value/get_stat_value 握手独占管理。update_cold 只同步 running_time 和 shear_times。
RuleStatCold::update_cold:
bool update_cold(const RuleStatCold &value) {
// 仅同步 running_time / shear_times
running_time = value.running_time;
shear_times = value.shear_times;
}
// 删除了: stat_values = value.stat_values; fetch_mark = value.fetch_mark;
EqpStat::update_cold:
// 删除了: cold.stat_values.assign(...); cold.fetch_mark = rule_stat.fetch_mark;
cold.running_time = rule_stat.running_time;
cold.shear_times = rule_stat.shear_times;
修复后 SHM 字段的写入方:
| 字段 | 写入方 | 说明 |
|---|---|---|
| stat_values | add_stat_value | mon 累积,握手清空 |
| fetch_mark | get_stat_value | cron 取走后置 true |
| running_time | update_cold | mon/cron 均可写 |
| shear_times | update_cold | mon/cron 均可写 |
7. ExpTimes 持久化 DB 故障保护
问题
ExpTimes::update_history_times() 存在两个 DB 故障场景的缺陷:
场景一:get_history_times() 返回 -1(DB 查询失败)
原代码:
if (get_history_times() == -2) {
insert_history_times(...); // 首次 → INSERT
} else {
exec(update(...)); // -1 也走这里 → 尝试 UPDATE 不存在的行
}
get_history_times() 返回 -1 时已将 rule_stat_.shear_times = 0(副作用),然后走 UPDATE 分支 → 受影响行 0 → 数据静默丢失。
场景二:DB 操作抛异常
原代码的快照恢复在 try-catch 之外,异常被外层 handler_exec.cc 吞掉后:
① now_times = rule_stat_.shear_times; // 快照 = 47
② get_history_times() // SELECT → rule_stat_.shear_times = 42
③ exec(update(...)) // 异常!
④ rule_stat_.shear_times = now_times; // ← 被跳过!值停在 42
5 次计数永久丢失,且不产生任何告警。
改动
用 ret 变量替代提前 return,快照恢复保证执行:
int ret = -1;
try {
int hist_ret = get_history_times();
if (hist_ret == -1) {
// DB 故障 → 跳过,ret 保持 -1(不写 DB)
} else if (hist_ret == -2) {
insert_history_times(now_times, now_used_time);
ret = 0;
} else {
exec(update(...));
ret = 0;
}
if (ret == 0) {
update_static(rule_id, true); // 仅在成功时更新 SHM
}
} catch (...) {
// ret 保持 -1
}
this->rule_stat_.shear_times = now_times; // ← 所有路径都执行
this->rule_stat_.running_time = now_used_time;
return ret;
mon_proc() 仅在成功时推进时间戳:
if (update_history_times() == 0) {
last_load_time_ = now_time_; // 失败不推进 → 下周期重试
}
8. ExpTimes DB 持久化异步化
问题
ExpTimes::mon_proc() 每 10 分钟同步执行 7 次 DB2 操作(1 SELECT + 1 INSERT/UPDATE + 5 SELECT),阻塞 mon 20ms 周期几十到几百毫秒。DB2 慢或断连时,该线程所有规则的实时评估停摆,应在此期间触发的报警全部丢失。
对 algs/ 下 12 个算法文件的审计显示,仅 exp_times.cc、exp_base.cpp、exp_bound.cpp 在 mon 热路径上有 DB I/O,其余 9 个安全。
改动
新增 AsyncDbWorker(eqpalg/utility/async_db_worker.h/.cc):
全局单例后台线程,mon 线程投递 DB 任务后立即返回:
mon 线程 (20ms): worker 线程:
snap = {shear_times, running_time}
submit(rule_id, lambda) ───→ SELECT → INSERT/UPDATE
return 0 立即 update_static(直接传值,跳过 2 读)
继续累积 rule_stat_.shear_times++
去重机制:同一 rule_id 重复 submit 时覆盖旧任务(只保留最新快照)。worker 处理慢不影响 mon。
新增 EqpStat::update_static(ruleid, shear_times, running_time) 重载:
update_history_times 刚将值写入 DB 后,select_running_by_ruleid + select_times_by_ruleid 又读回——单写者场景下完全冗余。新重载接收已知值,DB 操作从 5 SELECT 降到 3 SELECT。
exp_times.cc 重构:
提取 persist_exp_times() 为独立函数(可在任意线程调用,不依赖 ExpTimes 实例):
int persist_exp_times(rule_id, exp_type, now_times, now_used_time) {
SELECT Flag, Count, X1 WHERE RuleId = rule_id
→ 有结果: UPDATE; 无结果: INSERT
}
update_history_times() 简化为快照 + 投递:
auto now_times = this->rule_stat_.shear_times; // 快照
AsyncDbWorker::instance().submit(rule_id, [=]() { // 投递,立即返回
if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) {
EqpStat::instance().update_static(rule_id, now_times, now_used_time);
}
});
reset_dev_data() 分离 SHM 更新(同步)和 DB 持久化(异步)。
退出安全(eqpalg_icei.cpp):
~EqpAlgICEI() {
is_running_ = false;
alg_mgr_.reset(); // 先停所有算法线程
AsyncDbWorker::instance().drain_and_stop(); // 再排空 worker
}
并发安全性
- mon 线程和 worker 线程之间通过
std::function按值捕获传递快照副本——与rule_stat_完全独立,无共享可变数据 - worker 单线程串行处理,无 DB 写竞争
- 退出序列保证:先停止任务投递源(算法线程),再排空消费者(worker)
- 同 rule_id 去重防止积压
涉及文件汇总
本次会话全部改动(9 commits):
shm/TaskData.h 添加 TaskRecord 默认构造函数
shm/RuleStatShm.h 双层锁 + update_cold 修复 + µ→us
eqpalg/algs/exp_base.cpp 修复 data_record 成员访问
eqpalg/algs/exp_times.cc 快照保护 + 异步化
eqpalg/eqpalg.cpp 移除 rm -rf MapRuleStat_boost.mmap
eqpalg/eqpalg_icei.cpp 退出时 drain_and_stop
eqpalg/utility/eqp_stat.h 新增 update_static 重载
eqpalg/utility/eqp_stat.cc update_cold 修复 + update_static 重载实现
eqpalg/utility/async_db_worker.h 新增——异步 DB worker 单例(头文件)
eqpalg/utility/async_db_worker.cc 新增——异步 DB worker 单例(实现)
共 10 个文件(8 修改 + 2 新增)。
部署注意事项
- 重新 CMake 配置:新增
async_db_worker.cc需要cmake .. && make -j$(nproc)重新构建 - 数据结构更新兼容:
ShmSpinLock在共享内存中是新增对象(find_or_construct<ShmSpinLock>("RuleStatLock")),首次启动时自动创建,旧进程不访问该段 std::atomic<pid_t>在目标环境(GCC 10+ / C++20)确认可用::kill(pid, 0)和ESRCH依赖 POSIX,CentOS 7 下可用