diff --git a/eqpalg/eqpalg_changes_2026-05-13.md b/eqpalg/eqpalg_changes_2026-05-13.md new file mode 100644 index 0000000..8022d71 --- /dev/null +++ b/eqpalg/eqpalg_changes_2026-05-13.md @@ -0,0 +1,377 @@ +# eqpalg 改动记录 — 2026-05-13 + +## 改动总览 + +| # | Commit | 内容 | 涉及文件 | +|---|--------|------|----------| +| 1 | `0a53973` | 修复 e21b2af 编译错误:TaskRecord 成员访问 | `eqpalg/algs/exp_base.cpp` | +| 2 | `df79a9a` | 修复 e21b2af 编译错误:TaskRecord 缺少默认构造函数 | `shm/TaskData.h` | +| 3 | `b8596d3` | 移除启动时 `rm -rf MapRuleStat_boost.mmap` | `eqpalg/eqpalg.cpp` | +| 4 | `9bb810d` | 共享内存锁重设计:双层锁替代无效 ColdMutex | `shm/RuleStatShm.h` | +| 5 | `02447c7` | 替换 µ→us 消除 Gitea Unicode 告警 | `shm/RuleStatShm.h` | +| 6 | `b3932b0` | 修复 update_cold 覆盖 SHM 握手数据的 bug | `shm/RuleStatShm.h`, `eqpalg/utility/eqp_stat.cc` | +| 7 | `6ed178b` | ExpTimes:保护 update_history_times 快照恢复不因 DB 故障丢失 | `eqpalg/algs/exp_times.cc` | +| 8 | `f80a917` | ExpTimes DB 持久化异步化 + EqpStat 冗余查询消除 | 6 文件(含 2 新增) | +| 9 | `11e5a6a` | include 风格统一为 `<>` | `eqpalg/utility/async_db_worker.cc` | + +--- + +## 1–2. e21b2af 编译修复 + +### 问题 + +e21b2af 将 `TaskData.h` 中的 `Map` 改为 `Map`,其中 `TaskRecord` 内含 `shm_vector_f data_record`。两处遗漏: + +- **成员访问**:`operator[]` 返回 `TaskRecord&`,原代码直接调用 `.size()`/`operator[]`——需穿透到 `.data_record` +- **默认构造**:`boost::container::map::operator[]` 在 key 不存在时需默认构造值,`TaskRecord` 只有带 allocator 参数的构造函数 + +### 改动 + +**exp_base.cpp** 两处访问点: +```cpp +// 修改前:data_record.size(), data_record[j] ← TaskRecord 没有这些成员 +// 修改后: +auto &data_record = TaskShm::TaskRecordPtr.get() + ->operator[](key).data_record; // 穿透到 shm_vector_f +for (size_t j = 0; j < data_record.size(); j++) { ... } +``` + +```cpp +// 修改前:.push_back(res.value) ← TaskRecord 没有 push_back +// 修改后:.data_record.push_back(res.value) +``` + +**TaskData.h**:参照 `RuleStatShm.h` 中 `RuleStatCold` 的模式,添加静态 allocator + 默认构造函数: +```cpp +static vec_allocator_f default_allocator(obj_mapped_file.get_segment_manager()); + +struct TaskRecord { + shm_vector_f data_record; + TaskRecord() : data_record(default_allocator) {} // ← 新增 + TaskRecord(const void_allocator &alloc) : data_record(alloc) {} +}; +``` + +--- + +## 3. 移除 rm -rf MapRuleStat + +### 问题 + +`eqpalg::start()` 行 23-24 在每次启动时 `rm -rf /users/dsc/shm/MapRuleStat_boost.mmap`: + +``` +启动时序: + 1. 静态初始化 → managed_mapped_file(open_or_create) 创建文件 + 2. start() → rm -rf 删除文件 + 3. 程序运行 → 文件不在磁盘上(映射仍有效) + 4. 重启 → 文件不存在,open_or_create 重建 → 又被 rm -rf +``` + +该代码自 initial commit 即存在,与 e21b2af 移除的 `TaskData_boost.mmap` rm -rf 属同类遗留清理。 + +### 改动 + +删除 `eqpalg.cpp:23-24` 的 `rm -rf`。 + +--- + +## 4. 共享内存锁重设计 + +### 问题 + +`RuleStatShm.h` 原有两重缺陷: + +**缺陷一:锁不在共享内存中** + +```cpp +namespace { + using ColdMutex = bipc::interprocess_mutex; + ColdMutex llmtx{}; // 匿名命名空间 → 进程本地 → mon/cron 各有一份 +} +``` + +mon 和 cron 各自持有独立的 `llmtx`,锁 A 不影响锁 B——"进程间互斥"名不副实。mon 和 cron 对共享内存 map 的读写完全无同步。 + +**缺陷二:即使修复缺陷一,崩溃后也死锁** + +`boost::interprocess::interprocess_mutex` 基于 `PTHREAD_PROCESS_SHARED` 但不含 `PTHREAD_MUTEX_ROBUST`。进程持锁崩溃后,互斥量保持 locked 状态。`open_or_create` 复用旧文件,重启后 `scoped_lock` 死锁。 + +### 改动 + +**`ShmSpinLock`**——基于 `atomic` 的自旋锁,在共享内存中分配: + +``` +lock(): + 循环: + CAS(owner, 0→getpid()) 成功 → 拿到锁 + 读 owner → kill(owner_pid, 0) + ESRCH → 持有者已死 → CAS 接管 + 0 → 持有者存活 → 100µs nanosleep 后退避 + +unlock(): + CAS(owner, getpid()→0) // 只释放自己持有的 +``` + +**`DualLock`**——进程内 `std::mutex` + 跨进程 `ShmSpinLock` 双层锁: + +``` +lock 顺序: std::mutex::lock() → ShmSpinLock::lock() +unlock 顺序: ShmSpinLock::unlock() → std::mutex::unlock() +``` + +| 场景 | std::mutex | ShmSpinLock | +|------|-----------|-------------| +| 同进程多线程 | pthread futex,不占 CPU | 同 PID,kill(0)认为存活 | +| 跨进程 | 管不到其他进程 | PID 不同,kill(0) 区分 | +| 崩溃恢复 | 随进程消失自动释放 | kill→ESRCH 后接管 | + +**共享内存构造**: +```cpp +static ShmSpinLock *shm_lock = + obj_mapped_file.find_or_construct("RuleStatLock")(); +``` + +**MapRuleStat** 内集成: +```cpp +struct MapRuleStat { + DualLock lock_{*shm_lock}; + // 9 个方法中 scoped_lock 全部替换为 lock_guard +}; +``` + +崩溃恢复验证: +``` +mon 持锁崩溃: + mon: std::mutex ✓, ShmSpinLock owner=mon_pid + mon 崩溃 → std::mutex 随进程消失, ShmSpinLock.owner 残留 + +cron 加锁: + 1. std::mutex → 无竞争者,拿到 + 2. ShmSpinLock → kill(mon_pid,0)=ESRCH → CAS 接管成功 ✓ +``` + +--- + +## 5. Unicode 字符替换 + +Gitea 标记 `µ`(U+00B5 MICRO SIGN)为模棱两可字符(与 U+03BC GREEK SMALL LETTER MU 视觉相同)。改为工程惯用缩写 `us`。 + +--- + +## 6. update_cold 覆盖 SHM 握手数据 + +### 问题 + +`AlgBase::exec_mon_call()` 每个周期依次执行: + +``` +1. exec_mon() → mon_proc() → add_stat_value() // 向 SHM 累积数据 +2. update_map_rule() → update_cold(rule_stat_) // 用本地值覆盖 SHM! +``` + +`EqpStat::update_cold` 将 mon 本地 `rule_stat_` 的值搬到 SHM: + +```cpp +cold.stat_values.assign(rule_stat.stat_values.begin(), // mon 端永远为空! + rule_stat.stat_values.end()); // → 清空 SHM 累积数据 +cold.fetch_mark = rule_stat.fetch_mark; // mon 端永远为 false + // → 重置 cron 的消费标记 +``` + +mon 的 `rule_stat_` 只维护热字段(alarm_value、current_value 等),`stat_values` 永远为空、`fetch_mark` 永远为 false。`update_cold` 把这些默认值写入 SHM,破坏 mon↔cron 握手协议。 + +### 改动 + +`update_cold` 不再复制 `stat_values` 和 `fetch_mark`——这两个字段由 `add_stat_value`/`get_stat_value` 握手独占管理。`update_cold` 只同步 `running_time` 和 `shear_times`。 + +**RuleStatCold::update_cold**: +```cpp +bool update_cold(const RuleStatCold &value) { + // 仅同步 running_time / shear_times + running_time = value.running_time; + shear_times = value.shear_times; +} +// 删除了: stat_values = value.stat_values; fetch_mark = value.fetch_mark; +``` + +**EqpStat::update_cold**: +```cpp +// 删除了: cold.stat_values.assign(...); cold.fetch_mark = rule_stat.fetch_mark; +cold.running_time = rule_stat.running_time; +cold.shear_times = rule_stat.shear_times; +``` + +修复后 SHM 字段的写入方: + +| 字段 | 写入方 | 说明 | +|------|--------|------| +| stat_values | add_stat_value | mon 累积,握手清空 | +| fetch_mark | get_stat_value | cron 取走后置 true | +| running_time | update_cold | mon/cron 均可写 | +| shear_times | update_cold | mon/cron 均可写 | + +--- + +## 7. ExpTimes 持久化 DB 故障保护 + +### 问题 + +`ExpTimes::update_history_times()` 存在两个 DB 故障场景的缺陷: + +**场景一:`get_history_times()` 返回 -1(DB 查询失败)** + +原代码: +```cpp +if (get_history_times() == -2) { + insert_history_times(...); // 首次 → INSERT +} else { + exec(update(...)); // -1 也走这里 → 尝试 UPDATE 不存在的行 +} +``` +`get_history_times()` 返回 -1 时已将 `rule_stat_.shear_times = 0`(副作用),然后走 UPDATE 分支 → 受影响行 0 → 数据静默丢失。 + +**场景二:DB 操作抛异常** + +原代码的快照恢复在 try-catch 之外,异常被外层 `handler_exec.cc` 吞掉后: +``` +① now_times = rule_stat_.shear_times; // 快照 = 47 +② get_history_times() // SELECT → rule_stat_.shear_times = 42 +③ exec(update(...)) // 异常! +④ rule_stat_.shear_times = now_times; // ← 被跳过!值停在 42 +``` +5 次计数永久丢失,且不产生任何告警。 + +### 改动 + +用 `ret` 变量替代提前 return,快照恢复保证执行: + +```cpp +int ret = -1; +try { + int hist_ret = get_history_times(); + if (hist_ret == -1) { + // DB 故障 → 跳过,ret 保持 -1(不写 DB) + } else if (hist_ret == -2) { + insert_history_times(now_times, now_used_time); + ret = 0; + } else { + exec(update(...)); + ret = 0; + } + if (ret == 0) { + update_static(rule_id, true); // 仅在成功时更新 SHM + } +} catch (...) { + // ret 保持 -1 +} +this->rule_stat_.shear_times = now_times; // ← 所有路径都执行 +this->rule_stat_.running_time = now_used_time; +return ret; +``` + +`mon_proc()` 仅在成功时推进时间戳: +```cpp +if (update_history_times() == 0) { + last_load_time_ = now_time_; // 失败不推进 → 下周期重试 +} +``` + +--- + +## 8. ExpTimes DB 持久化异步化 + +### 问题 + +`ExpTimes::mon_proc()` 每 10 分钟同步执行 7 次 DB2 操作(1 SELECT + 1 INSERT/UPDATE + 5 SELECT),阻塞 mon 20ms 周期几十到几百毫秒。DB2 慢或断连时,该线程所有规则的实时评估停摆,应在此期间触发的报警全部丢失。 + +对 algs/ 下 12 个算法文件的审计显示,仅 `exp_times.cc`、`exp_base.cpp`、`exp_bound.cpp` 在 mon 热路径上有 DB I/O,其余 9 个安全。 + +### 改动 + +**新增 `AsyncDbWorker`**(`eqpalg/utility/async_db_worker.h/.cc`): + +全局单例后台线程,mon 线程投递 DB 任务后立即返回: + +``` +mon 线程 (20ms): worker 线程: + snap = {shear_times, running_time} + submit(rule_id, lambda) ───→ SELECT → INSERT/UPDATE + return 0 立即 update_static(直接传值,跳过 2 读) + 继续累积 rule_stat_.shear_times++ +``` + +去重机制:同一 `rule_id` 重复 submit 时覆盖旧任务(只保留最新快照)。worker 处理慢不影响 mon。 + +**新增 `EqpStat::update_static(ruleid, shear_times, running_time)` 重载**: + +`update_history_times` 刚将值写入 DB 后,`select_running_by_ruleid` + `select_times_by_ruleid` 又读回——单写者场景下完全冗余。新重载接收已知值,DB 操作从 5 SELECT 降到 3 SELECT。 + +**`exp_times.cc` 重构**: + +提取 `persist_exp_times()` 为独立函数(可在任意线程调用,不依赖 `ExpTimes` 实例): +```cpp +int persist_exp_times(rule_id, exp_type, now_times, now_used_time) { + SELECT Flag, Count, X1 WHERE RuleId = rule_id + → 有结果: UPDATE; 无结果: INSERT +} +``` + +`update_history_times()` 简化为快照 + 投递: +```cpp +auto now_times = this->rule_stat_.shear_times; // 快照 +AsyncDbWorker::instance().submit(rule_id, [=]() { // 投递,立即返回 + if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) { + EqpStat::instance().update_static(rule_id, now_times, now_used_time); + } +}); +``` + +`reset_dev_data()` 分离 SHM 更新(同步)和 DB 持久化(异步)。 + +**退出安全**(`eqpalg_icei.cpp`): + +```cpp +~EqpAlgICEI() { + is_running_ = false; + alg_mgr_.reset(); // 先停所有算法线程 + AsyncDbWorker::instance().drain_and_stop(); // 再排空 worker +} +``` + +### 并发安全性 + +- mon 线程和 worker 线程之间通过 `std::function` 按值捕获传递快照副本——与 `rule_stat_` 完全独立,无共享可变数据 +- worker 单线程串行处理,无 DB 写竞争 +- 退出序列保证:先停止任务投递源(算法线程),再排空消费者(worker) +- 同 rule_id 去重防止积压 + +--- + +## 涉及文件汇总 + +``` +本次会话全部改动(9 commits): + +shm/TaskData.h 添加 TaskRecord 默认构造函数 +shm/RuleStatShm.h 双层锁 + update_cold 修复 + µ→us +eqpalg/algs/exp_base.cpp 修复 data_record 成员访问 +eqpalg/algs/exp_times.cc 快照保护 + 异步化 +eqpalg/eqpalg.cpp 移除 rm -rf MapRuleStat_boost.mmap +eqpalg/eqpalg_icei.cpp 退出时 drain_and_stop +eqpalg/utility/eqp_stat.h 新增 update_static 重载 +eqpalg/utility/eqp_stat.cc update_cold 修复 + update_static 重载实现 +eqpalg/utility/async_db_worker.h 新增——异步 DB worker 单例(头文件) +eqpalg/utility/async_db_worker.cc 新增——异步 DB worker 单例(实现) +``` + +共 **10 个文件**(8 修改 + 2 新增)。 + +--- + +## 部署注意事项 + +1. **重新 CMake 配置**:新增 `async_db_worker.cc` 需要 `cmake .. && make -j$(nproc)` 重新构建 +2. **数据结构更新兼容**:`ShmSpinLock` 在共享内存中是新增对象(`find_or_construct("RuleStatLock")`),首次启动时自动创建,旧进程不访问该段 +3. `std::atomic` 在目标环境(GCC 10+ / C++20)确认可用 +4. `::kill(pid, 0)` 和 `ESRCH` 依赖 POSIX,CentOS 7 下可用