Add change documentation for 2026-05-13 eqpalg fixes and optimizations

Covers 9 commits: TaskRecord compile fixes, dual-layer ShmSpinLock,
rm -rf cleanup, update_cold handshake fix, ExpTimes DB robustness
and async persistence with AsyncDbWorker.
This commit is contained in:
Huamonarch 2026-05-13 17:10:09 +08:00
parent 672412e5a8
commit b99cd0a73c

View File

@ -0,0 +1,377 @@
# eqpalg 改动记录 — 2026-05-13
## 改动总览
| # | Commit | 内容 | 涉及文件 |
|---|--------|------|----------|
| 1 | `0a53973` | 修复 e21b2af 编译错误TaskRecord 成员访问 | `eqpalg/algs/exp_base.cpp` |
| 2 | `df79a9a` | 修复 e21b2af 编译错误TaskRecord 缺少默认构造函数 | `shm/TaskData.h` |
| 3 | `b8596d3` | 移除启动时 `rm -rf MapRuleStat_boost.mmap` | `eqpalg/eqpalg.cpp` |
| 4 | `9bb810d` | 共享内存锁重设计:双层锁替代无效 ColdMutex | `shm/RuleStatShm.h` |
| 5 | `02447c7` | 替换 µ→us 消除 Gitea Unicode 告警 | `shm/RuleStatShm.h` |
| 6 | `b3932b0` | 修复 update_cold 覆盖 SHM 握手数据的 bug | `shm/RuleStatShm.h`, `eqpalg/utility/eqp_stat.cc` |
| 7 | `6ed178b` | ExpTimes保护 update_history_times 快照恢复不因 DB 故障丢失 | `eqpalg/algs/exp_times.cc` |
| 8 | `f80a917` | ExpTimes DB 持久化异步化 + EqpStat 冗余查询消除 | 6 文件(含 2 新增) |
| 9 | `11e5a6a` | include 风格统一为 `<>` | `eqpalg/utility/async_db_worker.cc` |
---
## 12. e21b2af 编译修复
### 问题
e21b2af 将 `TaskData.h` 中的 `Map<int, DataRecord>` 改为 `Map<int, TaskRecord>`,其中 `TaskRecord` 内含 `shm_vector_f data_record`。两处遗漏:
- **成员访问**`operator[]` 返回 `TaskRecord&`,原代码直接调用 `.size()`/`operator[]`——需穿透到 `.data_record`
- **默认构造**`boost::container::map::operator[]` 在 key 不存在时需默认构造值,`TaskRecord` 只有带 allocator 参数的构造函数
### 改动
**exp_base.cpp** 两处访问点:
```cpp
// 修改前data_record.size(), data_record[j] ← TaskRecord 没有这些成员
// 修改后:
auto &data_record = TaskShm::TaskRecordPtr.get()
->operator[](key).data_record; // 穿透到 shm_vector_f
for (size_t j = 0; j < data_record.size(); j++) { ... }
```
```cpp
// 修改前:.push_back(res.value) ← TaskRecord 没有 push_back
// 修改后:.data_record.push_back(res.value)
```
**TaskData.h**:参照 `RuleStatShm.h``RuleStatCold` 的模式,添加静态 allocator + 默认构造函数:
```cpp
static vec_allocator_f default_allocator(obj_mapped_file.get_segment_manager());
struct TaskRecord {
shm_vector_f data_record;
TaskRecord() : data_record(default_allocator) {} // ← 新增
TaskRecord(const void_allocator &alloc) : data_record(alloc) {}
};
```
---
## 3. 移除 rm -rf MapRuleStat
### 问题
`eqpalg::start()` 行 23-24 在每次启动时 `rm -rf /users/dsc/shm/MapRuleStat_boost.mmap`
```
启动时序:
1. 静态初始化 → managed_mapped_file(open_or_create) 创建文件
2. start() → rm -rf 删除文件
3. 程序运行 → 文件不在磁盘上(映射仍有效)
4. 重启 → 文件不存在open_or_create 重建 → 又被 rm -rf
```
该代码自 initial commit 即存在,与 e21b2af 移除的 `TaskData_boost.mmap` rm -rf 属同类遗留清理。
### 改动
删除 `eqpalg.cpp:23-24``rm -rf`
---
## 4. 共享内存锁重设计
### 问题
`RuleStatShm.h` 原有两重缺陷:
**缺陷一:锁不在共享内存中**
```cpp
namespace {
using ColdMutex = bipc::interprocess_mutex;
ColdMutex llmtx{}; // 匿名命名空间 → 进程本地 → mon/cron 各有一份
}
```
mon 和 cron 各自持有独立的 `llmtx`,锁 A 不影响锁 B——"进程间互斥"名不副实。mon 和 cron 对共享内存 map 的读写完全无同步。
**缺陷二:即使修复缺陷一,崩溃后也死锁**
`boost::interprocess::interprocess_mutex` 基于 `PTHREAD_PROCESS_SHARED` 但不含 `PTHREAD_MUTEX_ROBUST`。进程持锁崩溃后,互斥量保持 locked 状态。`open_or_create` 复用旧文件,重启后 `scoped_lock` 死锁。
### 改动
**`ShmSpinLock`**——基于 `atomic<pid_t>` 的自旋锁,在共享内存中分配:
```
lock():
循环:
CAS(owner, 0→getpid()) 成功 → 拿到锁
读 owner → kill(owner_pid, 0)
ESRCH → 持有者已死 → CAS 接管
0 → 持有者存活 → 100µs nanosleep 后退避
unlock():
CAS(owner, getpid()→0) // 只释放自己持有的
```
**`DualLock`**——进程内 `std::mutex` + 跨进程 `ShmSpinLock` 双层锁:
```
lock 顺序: std::mutex::lock() → ShmSpinLock::lock()
unlock 顺序: ShmSpinLock::unlock() → std::mutex::unlock()
```
| 场景 | std::mutex | ShmSpinLock |
|------|-----------|-------------|
| 同进程多线程 | pthread futex不占 CPU | 同 PIDkill(0)认为存活 |
| 跨进程 | 管不到其他进程 | PID 不同kill(0) 区分 |
| 崩溃恢复 | 随进程消失自动释放 | kill→ESRCH 后接管 |
**共享内存构造**
```cpp
static ShmSpinLock *shm_lock =
obj_mapped_file.find_or_construct<ShmSpinLock>("RuleStatLock")();
```
**MapRuleStat** 内集成:
```cpp
struct MapRuleStat {
DualLock lock_{*shm_lock};
// 9 个方法中 scoped_lock<ColdMutex> 全部替换为 lock_guard<DualLock>
};
```
崩溃恢复验证:
```
mon 持锁崩溃:
mon: std::mutex ✓, ShmSpinLock owner=mon_pid
mon 崩溃 → std::mutex 随进程消失, ShmSpinLock.owner 残留
cron 加锁:
1. std::mutex → 无竞争者,拿到
2. ShmSpinLock → kill(mon_pid,0)=ESRCH → CAS 接管成功 ✓
```
---
## 5. Unicode 字符替换
Gitea 标记 `µ`U+00B5 MICRO SIGN为模棱两可字符与 U+03BC GREEK SMALL LETTER MU 视觉相同)。改为工程惯用缩写 `us`
---
## 6. update_cold 覆盖 SHM 握手数据
### 问题
`AlgBase::exec_mon_call()` 每个周期依次执行:
```
1. exec_mon() → mon_proc() → add_stat_value() // 向 SHM 累积数据
2. update_map_rule() → update_cold(rule_stat_) // 用本地值覆盖 SHM
```
`EqpStat::update_cold` 将 mon 本地 `rule_stat_` 的值搬到 SHM
```cpp
cold.stat_values.assign(rule_stat.stat_values.begin(), // mon 端永远为空!
rule_stat.stat_values.end()); // → 清空 SHM 累积数据
cold.fetch_mark = rule_stat.fetch_mark; // mon 端永远为 false
// → 重置 cron 的消费标记
```
mon 的 `rule_stat_` 只维护热字段alarm_value、current_value 等),`stat_values` 永远为空、`fetch_mark` 永远为 false。`update_cold` 把这些默认值写入 SHM破坏 mon↔cron 握手协议。
### 改动
`update_cold` 不再复制 `stat_values``fetch_mark`——这两个字段由 `add_stat_value`/`get_stat_value` 握手独占管理。`update_cold` 只同步 `running_time``shear_times`
**RuleStatCold::update_cold**
```cpp
bool update_cold(const RuleStatCold &value) {
// 仅同步 running_time / shear_times
running_time = value.running_time;
shear_times = value.shear_times;
}
// 删除了: stat_values = value.stat_values; fetch_mark = value.fetch_mark;
```
**EqpStat::update_cold**
```cpp
// 删除了: cold.stat_values.assign(...); cold.fetch_mark = rule_stat.fetch_mark;
cold.running_time = rule_stat.running_time;
cold.shear_times = rule_stat.shear_times;
```
修复后 SHM 字段的写入方:
| 字段 | 写入方 | 说明 |
|------|--------|------|
| stat_values | add_stat_value | mon 累积,握手清空 |
| fetch_mark | get_stat_value | cron 取走后置 true |
| running_time | update_cold | mon/cron 均可写 |
| shear_times | update_cold | mon/cron 均可写 |
---
## 7. ExpTimes 持久化 DB 故障保护
### 问题
`ExpTimes::update_history_times()` 存在两个 DB 故障场景的缺陷:
**场景一:`get_history_times()` 返回 -1DB 查询失败)**
原代码:
```cpp
if (get_history_times() == -2) {
insert_history_times(...); // 首次 → INSERT
} else {
exec(update(...)); // -1 也走这里 → 尝试 UPDATE 不存在的行
}
```
`get_history_times()` 返回 -1 时已将 `rule_stat_.shear_times = 0`(副作用),然后走 UPDATE 分支 → 受影响行 0 → 数据静默丢失。
**场景二DB 操作抛异常**
原代码的快照恢复在 try-catch 之外,异常被外层 `handler_exec.cc` 吞掉后:
```
① now_times = rule_stat_.shear_times; // 快照 = 47
② get_history_times() // SELECT → rule_stat_.shear_times = 42
③ exec(update(...)) // 异常!
④ rule_stat_.shear_times = now_times; // ← 被跳过!值停在 42
```
5 次计数永久丢失,且不产生任何告警。
### 改动
`ret` 变量替代提前 return快照恢复保证执行
```cpp
int ret = -1;
try {
int hist_ret = get_history_times();
if (hist_ret == -1) {
// DB 故障 → 跳过ret 保持 -1不写 DB
} else if (hist_ret == -2) {
insert_history_times(now_times, now_used_time);
ret = 0;
} else {
exec(update(...));
ret = 0;
}
if (ret == 0) {
update_static(rule_id, true); // 仅在成功时更新 SHM
}
} catch (...) {
// ret 保持 -1
}
this->rule_stat_.shear_times = now_times; // ← 所有路径都执行
this->rule_stat_.running_time = now_used_time;
return ret;
```
`mon_proc()` 仅在成功时推进时间戳:
```cpp
if (update_history_times() == 0) {
last_load_time_ = now_time_; // 失败不推进 → 下周期重试
}
```
---
## 8. ExpTimes DB 持久化异步化
### 问题
`ExpTimes::mon_proc()` 每 10 分钟同步执行 7 次 DB2 操作1 SELECT + 1 INSERT/UPDATE + 5 SELECT阻塞 mon 20ms 周期几十到几百毫秒。DB2 慢或断连时,该线程所有规则的实时评估停摆,应在此期间触发的报警全部丢失。
对 algs/ 下 12 个算法文件的审计显示,仅 `exp_times.cc`、`exp_base.cpp`、`exp_bound.cpp` 在 mon 热路径上有 DB I/O其余 9 个安全。
### 改动
**新增 `AsyncDbWorker`**`eqpalg/utility/async_db_worker.h/.cc`
全局单例后台线程mon 线程投递 DB 任务后立即返回:
```
mon 线程 (20ms): worker 线程:
snap = {shear_times, running_time}
submit(rule_id, lambda) ───→ SELECT → INSERT/UPDATE
return 0 立即 update_static直接传值跳过 2 读)
继续累积 rule_stat_.shear_times++
```
去重机制:同一 `rule_id` 重复 submit 时覆盖旧任务只保留最新快照。worker 处理慢不影响 mon。
**新增 `EqpStat::update_static(ruleid, shear_times, running_time)` 重载**
`update_history_times` 刚将值写入 DB 后,`select_running_by_ruleid` + `select_times_by_ruleid` 又读回——单写者场景下完全冗余。新重载接收已知值DB 操作从 5 SELECT 降到 3 SELECT。
**`exp_times.cc` 重构**
提取 `persist_exp_times()` 为独立函数(可在任意线程调用,不依赖 `ExpTimes` 实例):
```cpp
int persist_exp_times(rule_id, exp_type, now_times, now_used_time) {
SELECT Flag, Count, X1 WHERE RuleId = rule_id
→ 有结果: UPDATE; 无结果: INSERT
}
```
`update_history_times()` 简化为快照 + 投递:
```cpp
auto now_times = this->rule_stat_.shear_times; // 快照
AsyncDbWorker::instance().submit(rule_id, [=]() { // 投递,立即返回
if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) {
EqpStat::instance().update_static(rule_id, now_times, now_used_time);
}
});
```
`reset_dev_data()` 分离 SHM 更新(同步)和 DB 持久化(异步)。
**退出安全**`eqpalg_icei.cpp`
```cpp
~EqpAlgICEI() {
is_running_ = false;
alg_mgr_.reset(); // 先停所有算法线程
AsyncDbWorker::instance().drain_and_stop(); // 再排空 worker
}
```
### 并发安全性
- mon 线程和 worker 线程之间通过 `std::function` 按值捕获传递快照副本——与 `rule_stat_` 完全独立,无共享可变数据
- worker 单线程串行处理,无 DB 写竞争
- 退出序列保证先停止任务投递源算法线程再排空消费者worker
- 同 rule_id 去重防止积压
---
## 涉及文件汇总
```
本次会话全部改动9 commits:
shm/TaskData.h 添加 TaskRecord 默认构造函数
shm/RuleStatShm.h 双层锁 + update_cold 修复 + µ→us
eqpalg/algs/exp_base.cpp 修复 data_record 成员访问
eqpalg/algs/exp_times.cc 快照保护 + 异步化
eqpalg/eqpalg.cpp 移除 rm -rf MapRuleStat_boost.mmap
eqpalg/eqpalg_icei.cpp 退出时 drain_and_stop
eqpalg/utility/eqp_stat.h 新增 update_static 重载
eqpalg/utility/eqp_stat.cc update_cold 修复 + update_static 重载实现
eqpalg/utility/async_db_worker.h 新增——异步 DB worker 单例(头文件)
eqpalg/utility/async_db_worker.cc 新增——异步 DB worker 单例(实现)
```
**10 个文件**8 修改 + 2 新增)。
---
## 部署注意事项
1. **重新 CMake 配置**:新增 `async_db_worker.cc` 需要 `cmake .. && make -j$(nproc)` 重新构建
2. **数据结构更新兼容**`ShmSpinLock` 在共享内存中是新增对象(`find_or_construct<ShmSpinLock>("RuleStatLock")`),首次启动时自动创建,旧进程不访问该段
3. `std::atomic<pid_t>` 在目标环境GCC 10+ / C++20确认可用
4. `::kill(pid, 0)``ESRCH` 依赖 POSIXCentOS 7 下可用