eis/eqpalg/eqpalg_changes_2026-05-12.md

6.5 KiB
Raw Permalink Blame History

eqpalg 优化改动记录 — 2026-05-12

改动总览

# Commit 内容 涉及文件
1 973921f RuleStat 热冷分离:展示数据走本地缓存,冷数据共享内存用进程间锁 shm/RuleStatShm.h, eqpalg/utility/eqp_stat.h/.cc, eqpalg/alg_base.h/.cpp
2 1e70af7 移除 rm -rf MapRuleStat(原 bipc::string 段错误已自然消除) eqpalg/eqpalg_icei.cpp
3 1ca922a Task 线程生命周期修复:空闲自毁 + handles_mutex 串行化清理 eqpalg/threads/handler_exec.cc, eqpalg/threads/manager.cc, eqpalg/alg_base.cpp
4 e21b2af TaskData 固定 518MB 数组改动态向量,移除 rm -rf shm/TaskData.h, eqpalg/algs/exp_base.cpp/.h, eqpalg/eqpalg_icei.cpp

1. RuleStat 热冷分离

问题

RuleStatShm::MapRuleStat 用全局 std::mutex 保护所有共享内存 map 操作。展示数据alarm_value、current_value 等和冷数据stat_values、running_time 等混在同一把锁里。1500 规则 × 15 线程 × ~500ms 触发,写-写竞争严重。且 std::mutex 是线程锁mon/cron 跨进程互不可见——存在隐藏的数据竞争 bug。

改动

数据拆分

修改前: RuleStat全量字段共享内存分配器
           ↓ 全局 std::mutex → 共享内存 map → GetDataJson → Memcached

修改后: RuleStatLocal全量字段标准类型AlgBase 本地)
           ├─ update_display() → DisplayCache本地 map + std::mutex→ 画面 JSON
           └─ update_cold()    → 共享内存RuleStatCold仅冷字段
                                  boost::interprocess::interprocess_mutex

新类型

// 本地完整数据AlgBase 持有)
struct RuleStatLocal {
    double alarm_value, current_value, limit_up, limit_down;
    std::vector<std::string> items;
    std::vector<double> stat_values;
    double running_time;
    int64_t shear_times, alarm_times;
    std::string last_alarm_time, dev_coder, unit;
    bool fetch_mark;
};

// 共享内存冷数据mon↔cron 交换用)
struct RuleStatCold {
    vector_d stat_values;
    bool fetch_mark;
    double running_time;
    int64_t shear_times, alarm_times;
    bipc::string last_alarm_time, dev_coder;
};

DisplayCache:本地 std::map<string, DisplayEntry> + std::mutexupdate_display() 纳秒级临界区,get_json() 锁外拼 JSON。

算法子类零改动RuleStatLocal 字段名与原 RuleStat 完全一致。

效果

  • 展示热路径无共享内存锁15 线程写入本地缓存互不阻塞
  • 冷路径:真正的进程间锁,修了跨进程数据竞争 bug
  • GetDataJson从本地缓存读取不再持全局锁

2. 移除 rm -rf MapRuleStat

问题

EqpAlgICEI::~EqpAlgICEI() 在 mon 进程退出时 rm -rf MapRuleStat_boost.mmap。原因是 RuleStat::itemsvector_s<bipc::string>)中 tag 名超过 23 字节时,bipc::string 的堆指针在重启后失效导致段错误。删文件是绕过 bug 的 workaround但破坏了 mon↔cron 的共享内存通道。

改动

items 已迁到 RuleStatLocal::itemsstd::vector<std::string>,本地堆),不再涉及共享内存分配器。共享内存中剩下 last_alarm_timedev_coder 两个 bipc::string,长度远在 SSO 阈值内(~19 字符和 9 字符)。原 bug 不复存在,移除 rm -rf


3. Task 线程生命周期修复

问题

  1. LOG 文件串线LOG::doConfigure 疑似进程级全局状态,首个 HandlerExec 线程调用成功后,后续线程的调用返回 3"不能执行多遍"),所有日志写入第一个线程的 log 文件。
  2. 线程永不清理task 模式下 HandlerExec 执行完任务后 rule_pointers_ 已空,但 is_running_ 保持 true线程无限空转。Manager::start() 的 test_label 检查逻辑永远为 falsehandles_.clear() 是死代码。
  3. logReset 自赋值AlgBase::logReset(int task_seq)task_seq = task_seq 是 no-op参数被丢弃。

改动

HandlerExec 自毁handler_exec.cc

if (this->rule_pointers_.empty()) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (this->once_exec_queue_.empty()) {
        this->is_running_ = false;  // 通知 Manager 可析构
    }
}

Manager 清理manager.cc

// 两阶段shared_lock 收集 → unique_lock 销毁
std::vector<string> to_remove;
{ shared_lock read_lock(handles_mutex); /* 收集 !is_running_ */ }
for (auto &name : to_remove) {
    unique_lock write_lock(handles_mutex);
    /* recheck + destroy + erase */
}

exec_task 判活manager.cc

unique_lock write_lock(handles_mutex);
if (handler存在 && !handler->get_is_running()) {
    destroy() + erase();  // 清理死线程
}
if (handler不存在) { create new; }
handler->submit(task);

竞态保证exec_task() 和 Manager cleanup 都持有 unique_lock(handles_mutex),互斥。

  • 如果 cleanup 先拿到锁 → destroy + erase → exec_task 发现不存在 → create new
  • 如果 exec_task 先拿到锁 → submit → handler 的 once_exec_queue_ 非空 → 不自毁

4. TaskData 动态向量

问题

TaskData.hDataRecord 固定分配 float[30×86400×50]518 MB/条),共享内存文件 5 GB。task 用共享内存而非堆是为了文件映射不消耗 swap。但每条 record 不管实际收集多少数据点都占满 518 MB极度浪费。进程退出时 rm -rf 清理。

改动

TaskData.h

// 修改前
struct DataRecord {
    float data_record[129600000];  // 固定 518 MB
};

// 修改后
struct TaskRecord {
    shm_vector_f data_record;  // boost::container::vector<float, 共享内存分配器>
    TaskRecord(const void_allocator &alloc) : data_record(alloc) {}
};

文件上限从 5 GB → 10 MB 起始,按需增长。

exp_base.cpp

  • 写入:data_record[idx++]data_record.push_back(val)
  • 读取:for (j < task_data_size)for (j < data_record.size())
  • 移除 task_data_size 成员变量

eqpalg_icei.cpp:移除 rm -rf TaskData_boost.mmap。vector 析构时自动归还内存到共享内存段。


部署注意事项

  1. 清理旧共享内存文件(数据结构已不兼容):
    rm -rf /users/dsc/shm/MapRuleStat_boost.mmap
    rm -rf /users/dsc/shm/TaskData_boost.mmap
    
  2. boost::interprocess::interprocess_mutex 需确认在目标环境的 Boost 版本中可用。
  3. DisplayCache 依赖 nlohmann::jsonmix_cc::json),项目已有此依赖。