eis/eqpalg/eqpalg_changes_2026-05-13.md
Huamonarch b99cd0a73c Add change documentation for 2026-05-13 eqpalg fixes and optimizations
Covers 9 commits: TaskRecord compile fixes, dual-layer ShmSpinLock,
rm -rf cleanup, update_cold handshake fix, ExpTimes DB robustness
and async persistence with AsyncDbWorker.
2026-05-13 17:10:09 +08:00

378 lines
13 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# eqpalg 改动记录 — 2026-05-13
## 改动总览
| # | Commit | 内容 | 涉及文件 |
|---|--------|------|----------|
| 1 | `0a53973` | 修复 e21b2af 编译错误TaskRecord 成员访问 | `eqpalg/algs/exp_base.cpp` |
| 2 | `df79a9a` | 修复 e21b2af 编译错误TaskRecord 缺少默认构造函数 | `shm/TaskData.h` |
| 3 | `b8596d3` | 移除启动时 `rm -rf MapRuleStat_boost.mmap` | `eqpalg/eqpalg.cpp` |
| 4 | `9bb810d` | 共享内存锁重设计:双层锁替代无效 ColdMutex | `shm/RuleStatShm.h` |
| 5 | `02447c7` | 替换 µ→us 消除 Gitea Unicode 告警 | `shm/RuleStatShm.h` |
| 6 | `b3932b0` | 修复 update_cold 覆盖 SHM 握手数据的 bug | `shm/RuleStatShm.h`, `eqpalg/utility/eqp_stat.cc` |
| 7 | `6ed178b` | ExpTimes保护 update_history_times 快照恢复不因 DB 故障丢失 | `eqpalg/algs/exp_times.cc` |
| 8 | `f80a917` | ExpTimes DB 持久化异步化 + EqpStat 冗余查询消除 | 6 文件(含 2 新增) |
| 9 | `11e5a6a` | include 风格统一为 `<>` | `eqpalg/utility/async_db_worker.cc` |
---
## 12. e21b2af 编译修复
### 问题
e21b2af 将 `TaskData.h` 中的 `Map<int, DataRecord>` 改为 `Map<int, TaskRecord>`,其中 `TaskRecord` 内含 `shm_vector_f data_record`。两处遗漏:
- **成员访问**`operator[]` 返回 `TaskRecord&`,原代码直接调用 `.size()`/`operator[]`——需穿透到 `.data_record`
- **默认构造**`boost::container::map::operator[]` 在 key 不存在时需默认构造值,`TaskRecord` 只有带 allocator 参数的构造函数
### 改动
**exp_base.cpp** 两处访问点:
```cpp
// 修改前data_record.size(), data_record[j] ← TaskRecord 没有这些成员
// 修改后:
auto &data_record = TaskShm::TaskRecordPtr.get()
->operator[](key).data_record; // 穿透到 shm_vector_f
for (size_t j = 0; j < data_record.size(); j++) { ... }
```
```cpp
// 修改前:.push_back(res.value) ← TaskRecord 没有 push_back
// 修改后:.data_record.push_back(res.value)
```
**TaskData.h**:参照 `RuleStatShm.h``RuleStatCold` 的模式,添加静态 allocator + 默认构造函数:
```cpp
static vec_allocator_f default_allocator(obj_mapped_file.get_segment_manager());
struct TaskRecord {
shm_vector_f data_record;
TaskRecord() : data_record(default_allocator) {} // ← 新增
TaskRecord(const void_allocator &alloc) : data_record(alloc) {}
};
```
---
## 3. 移除 rm -rf MapRuleStat
### 问题
`eqpalg::start()` 行 23-24 在每次启动时 `rm -rf /users/dsc/shm/MapRuleStat_boost.mmap`
```
启动时序:
1. 静态初始化 → managed_mapped_file(open_or_create) 创建文件
2. start() → rm -rf 删除文件
3. 程序运行 → 文件不在磁盘上(映射仍有效)
4. 重启 → 文件不存在open_or_create 重建 → 又被 rm -rf
```
该代码自 initial commit 即存在,与 e21b2af 移除的 `TaskData_boost.mmap` rm -rf 属同类遗留清理。
### 改动
删除 `eqpalg.cpp:23-24``rm -rf`
---
## 4. 共享内存锁重设计
### 问题
`RuleStatShm.h` 原有两重缺陷:
**缺陷一:锁不在共享内存中**
```cpp
namespace {
using ColdMutex = bipc::interprocess_mutex;
ColdMutex llmtx{}; // 匿名命名空间 → 进程本地 → mon/cron 各有一份
}
```
mon 和 cron 各自持有独立的 `llmtx`,锁 A 不影响锁 B——"进程间互斥"名不副实。mon 和 cron 对共享内存 map 的读写完全无同步。
**缺陷二:即使修复缺陷一,崩溃后也死锁**
`boost::interprocess::interprocess_mutex` 基于 `PTHREAD_PROCESS_SHARED` 但不含 `PTHREAD_MUTEX_ROBUST`。进程持锁崩溃后,互斥量保持 locked 状态。`open_or_create` 复用旧文件,重启后 `scoped_lock` 死锁。
### 改动
**`ShmSpinLock`**——基于 `atomic<pid_t>` 的自旋锁,在共享内存中分配:
```
lock():
循环:
CAS(owner, 0→getpid()) 成功 → 拿到锁
读 owner → kill(owner_pid, 0)
ESRCH → 持有者已死 → CAS 接管
0 → 持有者存活 → 100µs nanosleep 后退避
unlock():
CAS(owner, getpid()→0) // 只释放自己持有的
```
**`DualLock`**——进程内 `std::mutex` + 跨进程 `ShmSpinLock` 双层锁:
```
lock 顺序: std::mutex::lock() → ShmSpinLock::lock()
unlock 顺序: ShmSpinLock::unlock() → std::mutex::unlock()
```
| 场景 | std::mutex | ShmSpinLock |
|------|-----------|-------------|
| 同进程多线程 | pthread futex不占 CPU | 同 PIDkill(0)认为存活 |
| 跨进程 | 管不到其他进程 | PID 不同kill(0) 区分 |
| 崩溃恢复 | 随进程消失自动释放 | kill→ESRCH 后接管 |
**共享内存构造**
```cpp
static ShmSpinLock *shm_lock =
obj_mapped_file.find_or_construct<ShmSpinLock>("RuleStatLock")();
```
**MapRuleStat** 内集成:
```cpp
struct MapRuleStat {
DualLock lock_{*shm_lock};
// 9 个方法中 scoped_lock<ColdMutex> 全部替换为 lock_guard<DualLock>
};
```
崩溃恢复验证:
```
mon 持锁崩溃:
mon: std::mutex ✓, ShmSpinLock owner=mon_pid
mon 崩溃 → std::mutex 随进程消失, ShmSpinLock.owner 残留
cron 加锁:
1. std::mutex → 无竞争者,拿到
2. ShmSpinLock → kill(mon_pid,0)=ESRCH → CAS 接管成功 ✓
```
---
## 5. Unicode 字符替换
Gitea 标记 `µ`U+00B5 MICRO SIGN为模棱两可字符与 U+03BC GREEK SMALL LETTER MU 视觉相同)。改为工程惯用缩写 `us`
---
## 6. update_cold 覆盖 SHM 握手数据
### 问题
`AlgBase::exec_mon_call()` 每个周期依次执行:
```
1. exec_mon() → mon_proc() → add_stat_value() // 向 SHM 累积数据
2. update_map_rule() → update_cold(rule_stat_) // 用本地值覆盖 SHM
```
`EqpStat::update_cold` 将 mon 本地 `rule_stat_` 的值搬到 SHM
```cpp
cold.stat_values.assign(rule_stat.stat_values.begin(), // mon 端永远为空!
rule_stat.stat_values.end()); // → 清空 SHM 累积数据
cold.fetch_mark = rule_stat.fetch_mark; // mon 端永远为 false
// → 重置 cron 的消费标记
```
mon 的 `rule_stat_` 只维护热字段alarm_value、current_value 等),`stat_values` 永远为空、`fetch_mark` 永远为 false。`update_cold` 把这些默认值写入 SHM破坏 mon↔cron 握手协议。
### 改动
`update_cold` 不再复制 `stat_values``fetch_mark`——这两个字段由 `add_stat_value`/`get_stat_value` 握手独占管理。`update_cold` 只同步 `running_time``shear_times`
**RuleStatCold::update_cold**
```cpp
bool update_cold(const RuleStatCold &value) {
// 仅同步 running_time / shear_times
running_time = value.running_time;
shear_times = value.shear_times;
}
// 删除了: stat_values = value.stat_values; fetch_mark = value.fetch_mark;
```
**EqpStat::update_cold**
```cpp
// 删除了: cold.stat_values.assign(...); cold.fetch_mark = rule_stat.fetch_mark;
cold.running_time = rule_stat.running_time;
cold.shear_times = rule_stat.shear_times;
```
修复后 SHM 字段的写入方:
| 字段 | 写入方 | 说明 |
|------|--------|------|
| stat_values | add_stat_value | mon 累积,握手清空 |
| fetch_mark | get_stat_value | cron 取走后置 true |
| running_time | update_cold | mon/cron 均可写 |
| shear_times | update_cold | mon/cron 均可写 |
---
## 7. ExpTimes 持久化 DB 故障保护
### 问题
`ExpTimes::update_history_times()` 存在两个 DB 故障场景的缺陷:
**场景一:`get_history_times()` 返回 -1DB 查询失败)**
原代码:
```cpp
if (get_history_times() == -2) {
insert_history_times(...); // 首次 → INSERT
} else {
exec(update(...)); // -1 也走这里 → 尝试 UPDATE 不存在的行
}
```
`get_history_times()` 返回 -1 时已将 `rule_stat_.shear_times = 0`(副作用),然后走 UPDATE 分支 → 受影响行 0 → 数据静默丢失。
**场景二DB 操作抛异常**
原代码的快照恢复在 try-catch 之外,异常被外层 `handler_exec.cc` 吞掉后:
```
① now_times = rule_stat_.shear_times; // 快照 = 47
② get_history_times() // SELECT → rule_stat_.shear_times = 42
③ exec(update(...)) // 异常!
④ rule_stat_.shear_times = now_times; // ← 被跳过!值停在 42
```
5 次计数永久丢失,且不产生任何告警。
### 改动
`ret` 变量替代提前 return快照恢复保证执行
```cpp
int ret = -1;
try {
int hist_ret = get_history_times();
if (hist_ret == -1) {
// DB 故障 → 跳过ret 保持 -1不写 DB
} else if (hist_ret == -2) {
insert_history_times(now_times, now_used_time);
ret = 0;
} else {
exec(update(...));
ret = 0;
}
if (ret == 0) {
update_static(rule_id, true); // 仅在成功时更新 SHM
}
} catch (...) {
// ret 保持 -1
}
this->rule_stat_.shear_times = now_times; // ← 所有路径都执行
this->rule_stat_.running_time = now_used_time;
return ret;
```
`mon_proc()` 仅在成功时推进时间戳:
```cpp
if (update_history_times() == 0) {
last_load_time_ = now_time_; // 失败不推进 → 下周期重试
}
```
---
## 8. ExpTimes DB 持久化异步化
### 问题
`ExpTimes::mon_proc()` 每 10 分钟同步执行 7 次 DB2 操作1 SELECT + 1 INSERT/UPDATE + 5 SELECT阻塞 mon 20ms 周期几十到几百毫秒。DB2 慢或断连时,该线程所有规则的实时评估停摆,应在此期间触发的报警全部丢失。
对 algs/ 下 12 个算法文件的审计显示,仅 `exp_times.cc`、`exp_base.cpp`、`exp_bound.cpp` 在 mon 热路径上有 DB I/O其余 9 个安全。
### 改动
**新增 `AsyncDbWorker`**`eqpalg/utility/async_db_worker.h/.cc`
全局单例后台线程mon 线程投递 DB 任务后立即返回:
```
mon 线程 (20ms): worker 线程:
snap = {shear_times, running_time}
submit(rule_id, lambda) ───→ SELECT → INSERT/UPDATE
return 0 立即 update_static直接传值跳过 2 读)
继续累积 rule_stat_.shear_times++
```
去重机制:同一 `rule_id` 重复 submit 时覆盖旧任务只保留最新快照。worker 处理慢不影响 mon。
**新增 `EqpStat::update_static(ruleid, shear_times, running_time)` 重载**
`update_history_times` 刚将值写入 DB 后,`select_running_by_ruleid` + `select_times_by_ruleid` 又读回——单写者场景下完全冗余。新重载接收已知值DB 操作从 5 SELECT 降到 3 SELECT。
**`exp_times.cc` 重构**
提取 `persist_exp_times()` 为独立函数(可在任意线程调用,不依赖 `ExpTimes` 实例):
```cpp
int persist_exp_times(rule_id, exp_type, now_times, now_used_time) {
SELECT Flag, Count, X1 WHERE RuleId = rule_id
有结果: UPDATE; 无结果: INSERT
}
```
`update_history_times()` 简化为快照 + 投递:
```cpp
auto now_times = this->rule_stat_.shear_times; // 快照
AsyncDbWorker::instance().submit(rule_id, [=]() { // 投递,立即返回
if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) {
EqpStat::instance().update_static(rule_id, now_times, now_used_time);
}
});
```
`reset_dev_data()` 分离 SHM 更新(同步)和 DB 持久化(异步)。
**退出安全**`eqpalg_icei.cpp`
```cpp
~EqpAlgICEI() {
is_running_ = false;
alg_mgr_.reset(); // 先停所有算法线程
AsyncDbWorker::instance().drain_and_stop(); // 再排空 worker
}
```
### 并发安全性
- mon 线程和 worker 线程之间通过 `std::function` 按值捕获传递快照副本——与 `rule_stat_` 完全独立,无共享可变数据
- worker 单线程串行处理,无 DB 写竞争
- 退出序列保证先停止任务投递源算法线程再排空消费者worker
- 同 rule_id 去重防止积压
---
## 涉及文件汇总
```
本次会话全部改动9 commits:
shm/TaskData.h 添加 TaskRecord 默认构造函数
shm/RuleStatShm.h 双层锁 + update_cold 修复 + µ→us
eqpalg/algs/exp_base.cpp 修复 data_record 成员访问
eqpalg/algs/exp_times.cc 快照保护 + 异步化
eqpalg/eqpalg.cpp 移除 rm -rf MapRuleStat_boost.mmap
eqpalg/eqpalg_icei.cpp 退出时 drain_and_stop
eqpalg/utility/eqp_stat.h 新增 update_static 重载
eqpalg/utility/eqp_stat.cc update_cold 修复 + update_static 重载实现
eqpalg/utility/async_db_worker.h 新增——异步 DB worker 单例(头文件)
eqpalg/utility/async_db_worker.cc 新增——异步 DB worker 单例(实现)
```
**10 个文件**8 修改 + 2 新增)。
---
## 部署注意事项
1. **重新 CMake 配置**:新增 `async_db_worker.cc` 需要 `cmake .. && make -j$(nproc)` 重新构建
2. **数据结构更新兼容**`ShmSpinLock` 在共享内存中是新增对象(`find_or_construct<ShmSpinLock>("RuleStatLock")`),首次启动时自动创建,旧进程不访问该段
3. `std::atomic<pid_t>` 在目标环境GCC 10+ / C++20确认可用
4. `::kill(pid, 0)``ESRCH` 依赖 POSIXCentOS 7 下可用