eis/eqpalg/eqpalg_changes_2026-05-13.md
Huamonarch b99cd0a73c Add change documentation for 2026-05-13 eqpalg fixes and optimizations
Covers 9 commits: TaskRecord compile fixes, dual-layer ShmSpinLock,
rm -rf cleanup, update_cold handshake fix, ExpTimes DB robustness
and async persistence with AsyncDbWorker.
2026-05-13 17:10:09 +08:00

13 KiB
Raw Blame History

eqpalg 改动记录 — 2026-05-13

改动总览

# Commit 内容 涉及文件
1 0a53973 修复 e21b2af 编译错误TaskRecord 成员访问 eqpalg/algs/exp_base.cpp
2 df79a9a 修复 e21b2af 编译错误TaskRecord 缺少默认构造函数 shm/TaskData.h
3 b8596d3 移除启动时 rm -rf MapRuleStat_boost.mmap eqpalg/eqpalg.cpp
4 9bb810d 共享内存锁重设计:双层锁替代无效 ColdMutex shm/RuleStatShm.h
5 02447c7 替换 µ→us 消除 Gitea Unicode 告警 shm/RuleStatShm.h
6 b3932b0 修复 update_cold 覆盖 SHM 握手数据的 bug shm/RuleStatShm.h, eqpalg/utility/eqp_stat.cc
7 6ed178b ExpTimes保护 update_history_times 快照恢复不因 DB 故障丢失 eqpalg/algs/exp_times.cc
8 f80a917 ExpTimes DB 持久化异步化 + EqpStat 冗余查询消除 6 文件(含 2 新增)
9 11e5a6a include 风格统一为 <> eqpalg/utility/async_db_worker.cc

12. e21b2af 编译修复

问题

e21b2afTaskData.h 中的 Map<int, DataRecord> 改为 Map<int, TaskRecord>,其中 TaskRecord 内含 shm_vector_f data_record。两处遗漏:

  • 成员访问operator[] 返回 TaskRecord&,原代码直接调用 .size()/operator[]——需穿透到 .data_record
  • 默认构造boost::container::map::operator[] 在 key 不存在时需默认构造值,TaskRecord 只有带 allocator 参数的构造函数

改动

exp_base.cpp 两处访问点:

// 修改前data_record.size(), data_record[j]   ← TaskRecord 没有这些成员
// 修改后:
auto &data_record = TaskShm::TaskRecordPtr.get()
                        ->operator[](key).data_record;  // 穿透到 shm_vector_f
for (size_t j = 0; j < data_record.size(); j++) { ... }
// 修改前:.push_back(res.value)   ← TaskRecord 没有 push_back
// 修改后:.data_record.push_back(res.value)

TaskData.h:参照 RuleStatShm.hRuleStatCold 的模式,添加静态 allocator + 默认构造函数:

static vec_allocator_f default_allocator(obj_mapped_file.get_segment_manager());

struct TaskRecord {
    shm_vector_f data_record;
    TaskRecord() : data_record(default_allocator) {}        // ← 新增
    TaskRecord(const void_allocator &alloc) : data_record(alloc) {}
};

3. 移除 rm -rf MapRuleStat

问题

eqpalg::start() 行 23-24 在每次启动时 rm -rf /users/dsc/shm/MapRuleStat_boost.mmap

启动时序:
  1. 静态初始化 → managed_mapped_file(open_or_create) 创建文件
  2. start() → rm -rf 删除文件
  3. 程序运行 → 文件不在磁盘上(映射仍有效)
  4. 重启 → 文件不存在open_or_create 重建 → 又被 rm -rf

该代码自 initial commit 即存在,与 e21b2af 移除的 TaskData_boost.mmap rm -rf 属同类遗留清理。

改动

删除 eqpalg.cpp:23-24rm -rf


4. 共享内存锁重设计

问题

RuleStatShm.h 原有两重缺陷:

缺陷一:锁不在共享内存中

namespace {
    using ColdMutex = bipc::interprocess_mutex;
    ColdMutex llmtx{};  // 匿名命名空间 → 进程本地 → mon/cron 各有一份
}

mon 和 cron 各自持有独立的 llmtx,锁 A 不影响锁 B——"进程间互斥"名不副实。mon 和 cron 对共享内存 map 的读写完全无同步。

缺陷二:即使修复缺陷一,崩溃后也死锁

boost::interprocess::interprocess_mutex 基于 PTHREAD_PROCESS_SHARED 但不含 PTHREAD_MUTEX_ROBUST。进程持锁崩溃后,互斥量保持 locked 状态。open_or_create 复用旧文件,重启后 scoped_lock 死锁。

改动

ShmSpinLock——基于 atomic<pid_t> 的自旋锁,在共享内存中分配:

lock():
  循环:
    CAS(owner, 0→getpid()) 成功 → 拿到锁
    读 owner → kill(owner_pid, 0)
      ESRCH → 持有者已死 → CAS 接管
      0     → 持有者存活 → 100µs nanosleep 后退避

unlock():
  CAS(owner, getpid()→0)  // 只释放自己持有的

DualLock——进程内 std::mutex + 跨进程 ShmSpinLock 双层锁:

lock 顺序: std::mutex::lock() → ShmSpinLock::lock()
unlock 顺序: ShmSpinLock::unlock() → std::mutex::unlock()
场景 std::mutex ShmSpinLock
同进程多线程 pthread futex不占 CPU 同 PIDkill(0)认为存活
跨进程 管不到其他进程 PID 不同kill(0) 区分
崩溃恢复 随进程消失自动释放 kill→ESRCH 后接管

共享内存构造

static ShmSpinLock *shm_lock =
    obj_mapped_file.find_or_construct<ShmSpinLock>("RuleStatLock")();

MapRuleStat 内集成:

struct MapRuleStat {
    DualLock lock_{*shm_lock};
    // 9 个方法中 scoped_lock<ColdMutex> 全部替换为 lock_guard<DualLock>
};

崩溃恢复验证:

mon 持锁崩溃:
  mon: std::mutex ✓, ShmSpinLock owner=mon_pid
  mon 崩溃 → std::mutex 随进程消失, ShmSpinLock.owner 残留

cron 加锁:
  1. std::mutex → 无竞争者,拿到
  2. ShmSpinLock → kill(mon_pid,0)=ESRCH → CAS 接管成功 ✓

5. Unicode 字符替换

Gitea 标记 µU+00B5 MICRO SIGN为模棱两可字符与 U+03BC GREEK SMALL LETTER MU 视觉相同)。改为工程惯用缩写 us


6. update_cold 覆盖 SHM 握手数据

问题

AlgBase::exec_mon_call() 每个周期依次执行:

1. exec_mon() → mon_proc() → add_stat_value()   // 向 SHM 累积数据
2. update_map_rule() → update_cold(rule_stat_)    // 用本地值覆盖 SHM

EqpStat::update_cold 将 mon 本地 rule_stat_ 的值搬到 SHM

cold.stat_values.assign(rule_stat.stat_values.begin(),   // mon 端永远为空!
                         rule_stat.stat_values.end());   // → 清空 SHM 累积数据
cold.fetch_mark = rule_stat.fetch_mark;                  // mon 端永远为 false
                                                         // → 重置 cron 的消费标记

mon 的 rule_stat_ 只维护热字段alarm_value、current_value 等),stat_values 永远为空、fetch_mark 永远为 false。update_cold 把这些默认值写入 SHM破坏 mon↔cron 握手协议。

改动

update_cold 不再复制 stat_valuesfetch_mark——这两个字段由 add_stat_value/get_stat_value 握手独占管理。update_cold 只同步 running_timeshear_times

RuleStatCold::update_cold

bool update_cold(const RuleStatCold &value) {
    // 仅同步 running_time / shear_times
    running_time = value.running_time;
    shear_times = value.shear_times;
}
// 删除了: stat_values = value.stat_values; fetch_mark = value.fetch_mark;

EqpStat::update_cold

// 删除了: cold.stat_values.assign(...); cold.fetch_mark = rule_stat.fetch_mark;
cold.running_time = rule_stat.running_time;
cold.shear_times = rule_stat.shear_times;

修复后 SHM 字段的写入方:

字段 写入方 说明
stat_values add_stat_value mon 累积,握手清空
fetch_mark get_stat_value cron 取走后置 true
running_time update_cold mon/cron 均可写
shear_times update_cold mon/cron 均可写

7. ExpTimes 持久化 DB 故障保护

问题

ExpTimes::update_history_times() 存在两个 DB 故障场景的缺陷:

场景一:get_history_times() 返回 -1DB 查询失败)

原代码:

if (get_history_times() == -2) {
    insert_history_times(...);      // 首次 → INSERT
} else {
    exec(update(...));              // -1 也走这里 → 尝试 UPDATE 不存在的行
}

get_history_times() 返回 -1 时已将 rule_stat_.shear_times = 0(副作用),然后走 UPDATE 分支 → 受影响行 0 → 数据静默丢失。

场景二DB 操作抛异常

原代码的快照恢复在 try-catch 之外,异常被外层 handler_exec.cc 吞掉后:

① now_times = rule_stat_.shear_times;     // 快照 = 47
② get_history_times()                      // SELECT → rule_stat_.shear_times = 42
③ exec(update(...))                        // 异常!
④ rule_stat_.shear_times = now_times;      // ← 被跳过!值停在 42

5 次计数永久丢失,且不产生任何告警。

改动

ret 变量替代提前 return快照恢复保证执行

int ret = -1;
try {
    int hist_ret = get_history_times();
    if (hist_ret == -1) {
        // DB 故障 → 跳过ret 保持 -1不写 DB
    } else if (hist_ret == -2) {
        insert_history_times(now_times, now_used_time);
        ret = 0;
    } else {
        exec(update(...));
        ret = 0;
    }
    if (ret == 0) {
        update_static(rule_id, true);  // 仅在成功时更新 SHM
    }
} catch (...) {
    // ret 保持 -1
}
this->rule_stat_.shear_times = now_times;      // ← 所有路径都执行
this->rule_stat_.running_time = now_used_time;
return ret;

mon_proc() 仅在成功时推进时间戳:

if (update_history_times() == 0) {
    last_load_time_ = now_time_;    // 失败不推进 → 下周期重试
}

8. ExpTimes DB 持久化异步化

问题

ExpTimes::mon_proc() 每 10 分钟同步执行 7 次 DB2 操作1 SELECT + 1 INSERT/UPDATE + 5 SELECT阻塞 mon 20ms 周期几十到几百毫秒。DB2 慢或断连时,该线程所有规则的实时评估停摆,应在此期间触发的报警全部丢失。

对 algs/ 下 12 个算法文件的审计显示,仅 exp_times.ccexp_base.cppexp_bound.cpp 在 mon 热路径上有 DB I/O其余 9 个安全。

改动

新增 AsyncDbWorkereqpalg/utility/async_db_worker.h/.cc

全局单例后台线程mon 线程投递 DB 任务后立即返回:

mon 线程 (20ms):                        worker 线程:
  snap = {shear_times, running_time}
  submit(rule_id, lambda)  ───→         SELECT → INSERT/UPDATE
  return 0 立即                        update_static直接传值跳过 2 读)
  继续累积 rule_stat_.shear_times++

去重机制:同一 rule_id 重复 submit 时覆盖旧任务只保留最新快照。worker 处理慢不影响 mon。

新增 EqpStat::update_static(ruleid, shear_times, running_time) 重载

update_history_times 刚将值写入 DB 后,select_running_by_ruleid + select_times_by_ruleid 又读回——单写者场景下完全冗余。新重载接收已知值DB 操作从 5 SELECT 降到 3 SELECT。

exp_times.cc 重构

提取 persist_exp_times() 为独立函数(可在任意线程调用,不依赖 ExpTimes 实例):

int persist_exp_times(rule_id, exp_type, now_times, now_used_time) {
    SELECT Flag, Count, X1 WHERE RuleId = rule_id
     有结果: UPDATE; 无结果: INSERT
}

update_history_times() 简化为快照 + 投递:

auto now_times = this->rule_stat_.shear_times;      // 快照
AsyncDbWorker::instance().submit(rule_id, [=]() {    // 投递,立即返回
    if (persist_exp_times(rule_id, exp_type, now_times, now_used_time) == 0) {
        EqpStat::instance().update_static(rule_id, now_times, now_used_time);
    }
});

reset_dev_data() 分离 SHM 更新(同步)和 DB 持久化(异步)。

退出安全eqpalg_icei.cpp

~EqpAlgICEI() {
    is_running_ = false;
    alg_mgr_.reset();                             // 先停所有算法线程
    AsyncDbWorker::instance().drain_and_stop();    // 再排空 worker
}

并发安全性

  • mon 线程和 worker 线程之间通过 std::function 按值捕获传递快照副本——与 rule_stat_ 完全独立,无共享可变数据
  • worker 单线程串行处理,无 DB 写竞争
  • 退出序列保证先停止任务投递源算法线程再排空消费者worker
  • 同 rule_id 去重防止积压

涉及文件汇总

本次会话全部改动9 commits:

shm/TaskData.h                    添加 TaskRecord 默认构造函数
shm/RuleStatShm.h                 双层锁 + update_cold 修复 + µ→us
eqpalg/algs/exp_base.cpp          修复 data_record 成员访问
eqpalg/algs/exp_times.cc          快照保护 + 异步化
eqpalg/eqpalg.cpp                 移除 rm -rf MapRuleStat_boost.mmap
eqpalg/eqpalg_icei.cpp            退出时 drain_and_stop
eqpalg/utility/eqp_stat.h         新增 update_static 重载
eqpalg/utility/eqp_stat.cc        update_cold 修复 + update_static 重载实现
eqpalg/utility/async_db_worker.h  新增——异步 DB worker 单例(头文件)
eqpalg/utility/async_db_worker.cc 新增——异步 DB worker 单例(实现)

10 个文件8 修改 + 2 新增)。


部署注意事项

  1. 重新 CMake 配置:新增 async_db_worker.cc 需要 cmake .. && make -j$(nproc) 重新构建
  2. 数据结构更新兼容ShmSpinLock 在共享内存中是新增对象(find_or_construct<ShmSpinLock>("RuleStatLock")),首次启动时自动创建,旧进程不访问该段
  3. std::atomic<pid_t> 在目标环境GCC 10+ / C++20确认可用
  4. ::kill(pid, 0)ESRCH 依赖 POSIXCentOS 7 下可用