# eqpalg 改动记录 — 2026-05-13 ## 改动总览 | # | Commit | 内容 | 涉及文件 | |---|--------|------|----------| | 1 | `0a53973` | 修复 e21b2af 编译错误:TaskRecord 成员访问 | `eqpalg/algs/exp_base.cpp` | | 2 | `df79a9a` | 修复 e21b2af 编译错误:TaskRecord 缺少默认构造函数 | `shm/TaskData.h` | | 3 | `b8596d3` | 移除启动时 `rm -rf MapRuleStat_boost.mmap` | `eqpalg/eqpalg.cpp` | | 4 | `9bb810d` | 共享内存锁重设计:双层锁替代无效 ColdMutex | `shm/RuleStatShm.h` | | 5 | `02447c7` | 替换 µ→us 消除 Gitea Unicode 告警 | `shm/RuleStatShm.h` | | 6 | `b3932b0` | 修复 update_cold 覆盖 SHM 握手数据的 bug | `shm/RuleStatShm.h`, `eqpalg/utility/eqp_stat.cc` | | 7 | `6ed178b` | ExpTimes:保护 update_history_times 快照恢复不因 DB 故障丢失 | `eqpalg/algs/exp_times.cc` | | 8 | `f80a917` | ExpTimes DB 持久化异步化 + EqpStat 冗余查询消除 | 6 文件(含 2 新增) | | 9 | `11e5a6a` | include 风格统一为 `<>` | `eqpalg/utility/async_db_worker.cc` | --- ## 1–2. e21b2af 编译修复 ### 问题 e21b2af 将 `TaskData.h` 中的 `Map` 改为 `Map`,其中 `TaskRecord` 内含 `shm_vector_f data_record`。两处遗漏: - **成员访问**:`operator[]` 返回 `TaskRecord&`,原代码直接调用 `.size()`/`operator[]`——需穿透到 `.data_record` - **默认构造**:`boost::container::map::operator[]` 在 key 不存在时需默认构造值,`TaskRecord` 只有带 allocator 参数的构造函数 ### 改动 **exp_base.cpp** 两处访问点: ```cpp // 修改前:data_record.size(), data_record[j] ← TaskRecord 没有这些成员 // 修改后: auto &data_record = TaskShm::TaskRecordPtr.get() ->operator[](key).data_record; // 穿透到 shm_vector_f for (size_t j = 0; j < data_record.size(); j++) { ... } ``` ```cpp // 修改前:.push_back(res.value) ← TaskRecord 没有 push_back // 修改后:.data_record.push_back(res.value) ``` **TaskData.h**:参照 `RuleStatShm.h` 中 `RuleStatCold` 的模式,添加静态 allocator + 默认构造函数: ```cpp static vec_allocator_f default_allocator(obj_mapped_file.get_segment_manager()); struct TaskRecord { shm_vector_f data_record; TaskRecord() : data_record(default_allocator) {} // ← 新增 TaskRecord(const void_allocator &alloc) : data_record(alloc) {} }; ``` --- ## 3. 移除 rm -rf MapRuleStat ### 问题 `eqpalg::start()` 行 23-24 在每次启动时 `rm -rf /users/dsc/shm/MapRuleStat_boost.mmap`: ``` 启动时序: 1. 静态初始化 → managed_mapped_file(open_or_create) 创建文件 2. start() → rm -rf 删除文件 3. 程序运行 → 文件不在磁盘上(映射仍有效) 4. 重启 → 文件不存在,open_or_create 重建 → 又被 rm -rf ``` 该代码自 initial commit 即存在,与 e21b2af 移除的 `TaskData_boost.mmap` rm -rf 属同类遗留清理。 ### 改动 删除 `eqpalg.cpp:23-24` 的 `rm -rf`。 --- ## 4. 共享内存锁重设计 ### 问题 `RuleStatShm.h` 原有两重缺陷: **缺陷一:锁不在共享内存中** ```cpp namespace { using ColdMutex = bipc::interprocess_mutex; ColdMutex llmtx{}; // 匿名命名空间 → 进程本地 → mon/cron 各有一份 } ``` mon 和 cron 各自持有独立的 `llmtx`,锁 A 不影响锁 B——"进程间互斥"名不副实。mon 和 cron 对共享内存 map 的读写完全无同步。 **缺陷二:即使修复缺陷一,崩溃后也死锁** `boost::interprocess::interprocess_mutex` 基于 `PTHREAD_PROCESS_SHARED` 但不含 `PTHREAD_MUTEX_ROBUST`。进程持锁崩溃后,互斥量保持 locked 状态。`open_or_create` 复用旧文件,重启后 `scoped_lock` 死锁。 ### 改动 **`ShmSpinLock`**——基于 `atomic` 的自旋锁,在共享内存中分配: ``` lock(): 循环: CAS(owner, 0→getpid()) 成功 → 拿到锁 读 owner → kill(owner_pid, 0) ESRCH → 持有者已死 → CAS 接管 0 → 持有者存活 → 100µs nanosleep 后退避 unlock(): CAS(owner, getpid()→0) // 只释放自己持有的 ``` **`DualLock`**——进程内 `std::mutex` + 跨进程 `ShmSpinLock` 双层锁: ``` lock 顺序: std::mutex::lock() → ShmSpinLock::lock() unlock 顺序: ShmSpinLock::unlock() → std::mutex::unlock() ``` | 场景 | std::mutex | ShmSpinLock | |------|-----------|-------------| | 同进程多线程 | pthread futex,不占 CPU | 同 PID,kill(0)认为存活 | | 跨进程 | 管不到其他进程 | PID 不同,kill(0) 区分 | | 崩溃恢复 | 随进程消失自动释放 | kill→ESRCH 后接管 | **共享内存构造**: ```cpp static ShmSpinLock *shm_lock = obj_mapped_file.find_or_construct("RuleStatLock")(); ``` **MapRuleStat** 内集成: ```cpp struct MapRuleStat { DualLock lock_{*shm_lock}; // 9 个方法中 scoped_lock 全部替换为 lock_guard }; ``` 崩溃恢复验证: ``` mon 持锁崩溃: mon: std::mutex ✓, ShmSpinLock owner=mon_pid mon 崩溃 → std::mutex 随进程消失, ShmSpinLock.owner 残留 cron 加锁: 1. std::mutex → 无竞争者,拿到 2. ShmSpinLock → kill(mon_pid,0)=ESRCH → CAS 接管成功 ✓ ``` --- ## 5. Unicode 字符替换 Gitea 标记 `µ`(U+00B5 MICRO SIGN)为模棱两可字符(与 U+03BC GREEK SMALL LETTER MU 视觉相同)。改为工程惯用缩写 `us`。 --- ## 6. update_cold 覆盖 SHM 握手数据 ### 问题 `AlgBase::exec_mon_call()` 每个周期依次执行: ``` 1. exec_mon() → mon_proc() → add_stat_value() // 向 SHM 累积数据 2. update_map_rule() → update_cold(rule_stat_) // 用本地值覆盖 SHM! ``` `EqpStat::update_cold` 将 mon 本地 `rule_stat_` 的值搬到 SHM: ```cpp cold.stat_values.assign(rule_stat.stat_values.begin(), // mon 端永远为空! rule_stat.stat_values.end()); // → 清空 SHM 累积数据 cold.fetch_mark = rule_stat.fetch_mark; // mon 端永远为 false // → 重置 cron 的消费标记 ``` mon 的 `rule_stat_` 只维护热字段(alarm_value、current_value 等),`stat_values` 永远为空、`fetch_mark` 永远为 false。`update_cold` 把这些默认值写入 SHM,破坏 mon↔cron 握手协议。 ### 改动 `update_cold` 不再复制 `stat_values` 和 `fetch_mark`——这两个字段由 `add_stat_value`/`get_stat_value` 握手独占管理。`update_cold` 只同步 `running_time` 和 `shear_times`。 **RuleStatCold::update_cold**: ```cpp bool update_cold(const RuleStatCold &value) { // 仅同步 running_time / shear_times running_time = value.running_time; shear_times = value.shear_times; } // 删除了: stat_values = value.stat_values; fetch_mark = value.fetch_mark; ``` **EqpStat::update_cold**: ```cpp // 删除了: cold.stat_values.assign(...); cold.fetch_mark = rule_stat.fetch_mark; cold.running_time = rule_stat.running_time; cold.shear_times = rule_stat.shear_times; ``` 修复后 SHM 字段的写入方: | 字段 | 写入方 | 说明 | |------|--------|------| | stat_values | add_stat_value | mon 累积,握手清空 | | fetch_mark | get_stat_value | cron 取走后置 true | | running_time | update_cold | mon/cron 均可写 | | shear_times | update_cold | mon/cron 均可写 | --- ## 7. ExpTimes 持久化 DB 故障保护 ### 问题 `ExpTimes::update_history_times()` 存在两个 DB 故障场景的缺陷: **场景一:`get_history_times()` 返回 -1(DB 查询失败)** 原代码: ```cpp if (get_history_times() == -2) { insert_history_times(...); // 首次 → INSERT } else { exec(update(...)); // -1 也走这里 → 尝试 UPDATE 不存在的行 } ``` `get_history_times()` 返回 -1 时已将 `rule_stat_.shear_times = 0`(副作用),然后走 UPDATE 分支 → 受影响行 0 → 数据静默丢失。 **场景二:DB 操作抛异常** 原代码的快照恢复在 try-catch 之外,异常被外层 `handler_exec.cc` 吞掉后: ``` ① now_times = rule_stat_.shear_times; // 快照 = 47 ② get_history_times() // SELECT → rule_stat_.shear_times = 42 ③ exec(update(...)) // 异常! ④ rule_stat_.shear_times = now_times; // ← 被跳过!值停在 42 ``` 5 次计数永久丢失,且不产生任何告警。 ### 改动 用 `ret` 变量替代提前 return,快照恢复保证执行: ```cpp int ret = -1; try { int hist_ret = get_history_times(); if (hist_ret == -1) { // DB 故障 → 跳过,ret 保持 -1(不写 DB) } else if (hist_ret == -2) { insert_history_times(now_times, now_used_time); ret = 0; } else { exec(update(...)); ret = 0; } if (ret == 0) { update_static(rule_id, true); // 仅在成功时更新 SHM } } catch (...) { // ret 保持 -1 } this->rule_stat_.shear_times = now_times; // ← 所有路径都执行 this->rule_stat_.running_time = now_used_time; return ret; ``` `mon_proc()` 仅在成功时推进时间戳: ```cpp if (update_history_times() == 0) { last_load_time_ = now_time_; // 失败不推进 → 下周期重试 } ``` --- ## 8. ExpTimes DB 持久化异步化 ### 问题 `ExpTimes::mon_proc()` 每 10 分钟同步执行 7 次 DB2 操作(1 SELECT + 1 INSERT/UPDATE + 5 SELECT),阻塞 mon 20ms 周期几十到几百毫秒。DB2 慢或断连时,该线程所有规则的实时评估停摆,应在此期间触发的报警全部丢失。 对 algs/ 下 12 个算法文件的审计显示,仅 `exp_times.cc`、`exp_base.cpp`、`exp_bound.cpp` 在 mon 热路径上有 DB I/O,其余 9 个安全。 ### 改动 **新增 `AsyncDbWorker`**(`eqpalg/utility/async_db_worker.h/.cc`): 全局单例后台线程,mon 线程投递 DB 任务后立即返回: ``` mon 线程 (20ms): worker 线程: snap = {shear_times, running_time} submit(rule_id, lambda) ───→ SELECT → INSERT/UPDATE return 0 立即 update_static(直接传值,跳过 2 读) 继续累积 rule_stat_.shear_times++ ``` 去重机制:同一 `rule_id` 重复 submit 时覆盖旧任务(只保留最新快照)。worker 处理慢不影响 mon。 **新增 `EqpStat::update_static(ruleid, shear_times, running_time)` 重载**: `update_history_times` 刚将值写入 DB 后,`select_running_by_ruleid` + `select_times_by_ruleid` 又读回——单写者场景下完全冗余。新重载接收已知值,DB 操作从 5 SELECT 降到 3 SELECT。 **`exp_times.cc` 重构**: 提取 `persist_exp_times()` 为独立函数(可在任意线程调用,不依赖 `ExpTimes` 实例): ```cpp int persist_exp_times(rule_id, exp_type, now_times, now_used_time) { SELECT Flag, Count, X1 WHERE RuleId = rule_id → 有结果: UPDATE; 无结果: INSERT } ``` `update_history_times()` 简化为快照 + 投递: ```cpp auto now_times = this->rule_stat_.shear_times; // 快照 AsyncDbWorker::instance().submit(rule_id, [=]() { // 投递,立即返回 if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) { EqpStat::instance().update_static(rule_id, now_times, now_used_time); } }); ``` `reset_dev_data()` 分离 SHM 更新(同步)和 DB 持久化(异步)。 **退出安全**(`eqpalg_icei.cpp`): ```cpp ~EqpAlgICEI() { is_running_ = false; alg_mgr_.reset(); // 先停所有算法线程 AsyncDbWorker::instance().drain_and_stop(); // 再排空 worker } ``` ### 并发安全性 - mon 线程和 worker 线程之间通过 `std::function` 按值捕获传递快照副本——与 `rule_stat_` 完全独立,无共享可变数据 - worker 单线程串行处理,无 DB 写竞争 - 退出序列保证:先停止任务投递源(算法线程),再排空消费者(worker) - 同 rule_id 去重防止积压 --- ## 涉及文件汇总 ``` 本次会话全部改动(9 commits): shm/TaskData.h 添加 TaskRecord 默认构造函数 shm/RuleStatShm.h 双层锁 + update_cold 修复 + µ→us eqpalg/algs/exp_base.cpp 修复 data_record 成员访问 eqpalg/algs/exp_times.cc 快照保护 + 异步化 eqpalg/eqpalg.cpp 移除 rm -rf MapRuleStat_boost.mmap eqpalg/eqpalg_icei.cpp 退出时 drain_and_stop eqpalg/utility/eqp_stat.h 新增 update_static 重载 eqpalg/utility/eqp_stat.cc update_cold 修复 + update_static 重载实现 eqpalg/utility/async_db_worker.h 新增——异步 DB worker 单例(头文件) eqpalg/utility/async_db_worker.cc 新增——异步 DB worker 单例(实现) ``` 共 **10 个文件**(8 修改 + 2 新增)。 --- ## 部署注意事项 1. **重新 CMake 配置**:新增 `async_db_worker.cc` 需要 `cmake .. && make -j$(nproc)` 重新构建 2. **数据结构更新兼容**:`ShmSpinLock` 在共享内存中是新增对象(`find_or_construct("RuleStatLock")`),首次启动时自动创建,旧进程不访问该段 3. `std::atomic` 在目标环境(GCC 10+ / C++20)确认可用 4. `::kill(pid, 0)` 和 `ESRCH` 依赖 POSIX,CentOS 7 下可用