eis/eqpalg/eqpalg_optimization_notes.md
Huamonarch ef6612d2ea Add eqpalg optimization recommendations document
Covers 10 issues with root cause analysis, severity assessment,
and concrete solutions:
- P0: update_map_rule() global mutex bottleneck on shared memory map
- P0: Thread allocation imbalance (default taskSeq=0)
- P1: GetDataJson() full deep copy, event_handler lock hold time,
  cache_data() busy-loop, AlgBase::lm lock splitting
- P2-P3: tag index pre-resolution, cron batching, task thread cleanup
- Long-term: Memcached migration, thread pool, code path separation
2026-05-09 13:02:45 +08:00

17 KiB
Raw Permalink Blame History

eqpalg 代码优化建议

问题总览

# 问题 严重度 影响范围
1 update_map_rule() 全局互斥锁瓶颈 mon 所有规则的页面数据刷新
2 线程分配不均衡(默认 taskSeq=0 mon 规则执行吞吐量
3 GetDataJson() 全量深拷贝开销 页面数据序列化(每秒调用)
4 event_handler() 持锁处理全量队列 规则 CRUD 期间监控暂停
5 cache_data() 超时立即重试导致忙等 CPU 占用
6 每条规则独立调用 update_dynamic 无批处理 共享内存写放大
7 GlobaltemSharedMemory 索引查找效率 每条规则每次执行
8 cron 进程 DB2 写入无批处理 cron 执行时间
9 AlgBase::lm 锁职责不清 锁竞争局部放大
10 task 进程 HandlerExec 泄漏 内存

问题 1update_map_rule() 全局互斥锁瓶颈

现状

AlgBase::exec_mon_call()
  └→ get_update_rule_stat_cycled()  // ~500ms + random offset
       └→ update_map_rule()
            └→ std::lock_guard<std::mutex> guard(lm)  // 规则级锁
                 └→ EqpStat::update_dynamic(rule_id_, rule_stat_)
                      └→ mapRuleStat.update_dynamic(key, value, true)
                           └→ std::lock_guard<std::mutex> guard(llmtx)  // 全局锁!
                                └→ p_msg_map->operator[](key).update_dynamic(value)
                                     └→ boost::interprocess 共享内存 map 操作

根因: MapRuleStat 底层使用 boost::interprocess::managed_mapped_file 共享内存 map所有操作必须竞争同一个全局 llmtx。假设有 500 条规则15 个 HandlerExec 线程,每隔约 500ms带随机偏移都会触发一次 update_map_rule()。即使有随机偏移,在高并发下仍有大量线程同时争抢这把锁。每次操作包含:

  1. thread_local key 赋值
  2. map.find() 查找
  3. map.operator[] 写入(可能触发共享内存分配)
  4. RuleStat::update_dynamic() — 5 个字段赋值、2 个 vector_s 共享内存 vector 复制

其中第 3 步涉及 boost::interprocess 的内存分配器,在共享内存段中分配节点,是重操作

解决方案

方案 A每线程本地缓存 + 定时批量刷入(推荐)

  修改前: N条规则 × 每500ms = N次独立 map 写操作,每次持全局锁
  
  修改后:
  ┌─────────────────────────────────────────────┐
  │  每个 HandlerExec 线程维护 thread_local       │
  │  std::map<string, RuleStat> local_cache_    │
  │                                             │
  │  exec_mon_call() 中:                        │
  │    update_map_rule() → 只写 local_cache_    │
  │    (无锁,无共享内存操作)                      │
  │                                             │
  │  独立批量刷入线程 (100ms 间隔):               │
  │    对每个 HandlerExec.local_cache_:          │
  │      lock(local_cache) → swap 出副本         │
  │    持全局锁一次,批量写入共享内存 map          │
  └─────────────────────────────────────────────┘

实现要点

  • 每个 HandlerExecrun_thread() 中增加一个 thread_local 或成员变量 batch_update_cache_
  • update_map_rule() 改为写入此本地缓存
  • Manager 中新增一个批量刷入线程,周期 100ms收集各 handler 缓存后一次性持锁写入
  • GetDataJson() 优先从共享内存读(已有逻辑),批量刷新保证数据最终一致

收益: 500 条规则 × 每 500ms → 降为 15 个线程 × 每 100ms 批量写 = 150 次/秒 → 约 10 次/秒(批量合并后)。全局锁竞争减少 50 倍以上

方案 B改用读写锁短期缓解

当前 llmtxstd::mutex(互斥锁),但代码中已注释了 std::shared_mutex

// RuleStatShm.h 第26行
// std::shared_mutex local_rwlock;
  • update_dynamic() / update() / add_stat_value() — 写操作,用 unique_lock
  • size() / empty() / GetDataJson() — 读操作,用 shared_lock

注意: GetDataJson() 已经做了"快照复制后释放锁再序列化"的优化,这很好。但 update_dynamic() 之间依然互斥。如果只是简单改为读写锁,写操作之间的竞争不会改善。此方案只能缓解读写竞争,不能解决写-写竞争。

方案 C按 ruleId 哈希分段锁

// 用 N 个 segment 的 map + 锁数组替代单一全局 map + 单一全局锁
static constexpr int SEGMENTS = 16;
struct Segment {
    std::mutex mtx;
    MapRuleStat_s* map;
};
Segment segments_[SEGMENTS];

auto& seg = segments_[std::hash<std::string>{}(key) % SEGMENTS];
std::lock_guard<std::mutex> guard(seg.mtx);
seg.map->operator[](key).update_dynamic(value);

收益: 锁竞争降低为原来的 1/SEGMENTS。

推荐: 综合使用 方案 A + 方案 C。先做本地缓存批量刷入(方案 A如果仍有需要将全局 map 分段(方案 C


问题 2线程分配不均衡

现状

// threads/manager.cc 第283-285行
auto alg_thread_name = std::to_string(algId) + "_" +
                       std::to_string(data_source) + "_" +
                       std::to_string(task_seq);

规则按 algId_dataSource_taskSeq 分配到线程。但 taskSeq 来自 DB2 T_RULE_CFG.TaskSeq 字段,绝大多数规则的 taskSeq 为 0

后果:同类算法(如 algId=2最常见的所有规则全部塞进一个 HandlerExec 线程,该线程中串行执行 exec_mon_call()

假设:

  • algId=2 有 200 条规则
  • 每条规则 delay_time_ = 100ms
  • exec_mon_call() 平均执行时间 = 0.5ms(含表达式求值)
  • 单线程执行一轮 = 200 × 0.5ms = 100ms

这已经接近甚至超过 delay_time。如果某些规则有复杂表达式或需要查 IHDB会严重阻塞同线程的其他规则。

解决方案

方案 A支持 taskSeq 配置(立即可用)

在 DB2 T_RULE_CFG 中为高负载规则设置不同的 TaskSeq1, 2, 3...),使它们分散到不同线程。这是现有架构已支持的,只需配置。

限制: 需要人工划分,且 taskSeq 同时影响 task 进程的线程分配。

方案 B自动按规则数量拆分线程

// 当某 algId_dataSource 线程中规则数 > 阈值时自动拆分
const size_t MAX_RULES_PER_THREAD = 50;

auto base_name = std::to_string(algId) + "_" + std::to_string(data_source);
int thread_index = 0;
auto candidate = base_name + "_" + std::to_string(task_seq);
// 如果已有线程的规则数超限,分配到新线程号
while (handles_.find(candidate) != handles_.end() &&
       handles_[candidate]->size() >= MAX_RULES_PER_THREAD) {
    thread_index++;
    candidate = base_name + "_" + std::to_string(taskSeq) + "s" + std::to_string(thread_index);
}

收益: 200 条同类规则自动分散到 4 个线程,每线程 50 条,总执行时间从 100ms 降至 25ms。

方案 C使用线程池替代固定线程映射

将 HandlerExec 改为提交到线程池执行,而非每个线程名固定一个线程。但改动较大,涉及生命周期管理。

推荐: 先采用 方案 A 作为手动优化,再实现 方案 B 作为自动均衡。


问题 3GetDataJson() 全量深拷贝

现状

// RuleStatShm.h GetDataJson()
std::vector<std::pair<std::string, RuleStat>> snapshot;
{
    std::lock_guard<std::mutex> guard(llmtx);
    snapshot.reserve(p_msg_map->size());
    for (const auto& [key, value] : *p_msg_map) {
        snapshot.emplace_back(key.c_str(), value); // 深拷贝 RuleStat
    }
}

虽然已做了"锁内复制、锁外序列化"优化,但 value 的复制是深拷贝,包括:

  • vector_s items (共享内存分配器的 vector)
  • vector_d stat_values (共享内存分配器的 vector)

500 条规则 × 平均每个 RuleStat ~2KB = ~1MB 的深拷贝,每秒调用一次(TimeNotify 事件 2 触发 update_rule_stat_data()get_stat_json())。

解决方案

方案 A增量更新 + 本地 JSON 缓存

  维护一个线程安全的本地 JSON 缓存:
  
  map<string, string> json_cache_;     // ruleId → JSON 片段
  string full_json_cache_;             // 完整 JSON
  bool dirty_ = true;                  // 脏标记
  
  update_dynamic(ruleId, value):
      json_cache_[ruleId] = value.invert2json().dump()
      dirty_ = true
  
  get_stat_json():
      if dirty_:
          拼接所有 JSON 片段为完整 JSON
          full_json_cache_ = ...
          dirty_ = false
      return full_json_cache_

方案 B使用共享内存中的预序列化字段

RuleStat 中增加一个 char json_cache[512] 固定大小字段,在 update_dynamic() 时同时更新此字段。GetDataJson() 直接从共享内存读预序列化的字符串,无需再调用 invert2json()

推荐: 方案 A,改动小,效果直接。配合问题 1 的本地缓存方案可自然实现。


问题 4event_handler() 持锁处理全量队列

现状

// handler_exec.cc 第231-232行
int HandlerExec::event_handler() {
  std::lock_guard<std::mutex> guard(mutex_);  // 持锁整个函数
  while (!reset_queue_.empty()) { ... }
  while (!detach_queue_.empty()) { ... }
  while (!attach_queue_.empty()) { ... }
  while (!usable_queue_.empty()) { ... }
  if (!once_exec_queue_.empty()) { ... }
}

event_handler() 在每次主循环迭代中调用。持锁期间,所有 attach/detach/reset/set_usable/submit 调用都被阻塞。虽然单次队列处理通常很快,但如果 attach 大量规则(如系统启动时),会长时间阻塞执行循环。

解决方案

int HandlerExec::event_handler() {
  // 将队列 swap 出来,最小化持锁时间
  decltype(attach_queue_) local_attach;
  decltype(detach_queue_) local_detach;
  // ...其他队列同理
  {
    std::lock_guard<std::mutex> guard(mutex_);
    local_attach.swap(attach_queue_);
    local_detach.swap(detach_queue_);
    // ...
  } // 立即释放锁
  // 无锁处理本地队列
  while (!local_detach.empty()) { ... }
  while (!local_attach.empty()) { ... }
  // ...
}

收益: 持锁时间从 O(队列长度) 降至 O(1) swap 操作。


问题 5cache_data() 超时忙等

现状

// eqpalg_icei.cpp 第82-89行
if (time_costs >= CACHE_OUTTIME) {  // >= 19ms
    this->logger_->Error() << "共享内存消耗时间超时(ms):" << ...;
    next_wake_time = std::chrono::steady_clock::now();  // 立即重试
} else {
    next_wake_time += CACHE_OUTTIME;
    std::this_thread::sleep_until(next_wake_time);
}

当一次 cache_data() 耗时超过 19ms 时,不 sleep 直接重试。如果共享内存数据量很大或系统负载高,可能导致连续失败 + 立即重试,形成 CPU 忙等。

解决方案

int consecutive_timeouts = 0;
const int MAX_CONSECUTIVE_TIMEOUTS = 3;

if (time_costs >= CACHE_OUTTIME) {
    consecutive_timeouts++;
    if (consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS) {
        next_wake_time += CACHE_OUTTIME * 2;  // 退避
        consecutive_timeouts = 0;
    } else {
        next_wake_time = std::chrono::steady_clock::now();
    }
} else {
    consecutive_timeouts = 0;
    next_wake_time += CACHE_OUTTIME;
    std::this_thread::sleep_until(next_wake_time);
}

问题 6每条规则独立调用 update_dynamic 无批处理

现状

exec_mon_call() 中:

if (get_update_rule_stat_cycled()) {
    this->update_map_rule();  // 每条规则独立调用
}

每条规则都独立触发全局锁 → 查共享内存 map → 写。配合问题 1 的方案 A可改为本地缓存写入。

补充优化

AlgBase::update_map_rule() 中还有一个规则级别的 std::lock_guard<std::mutex> guard(lm)。这个锁同时被 update_limit_alarm()(事件 22222使用。如果这两个操作频繁规则级锁也会有竞争。建议拆分为两个独立的锁

std::mutex lm_map_rule_;       // 保护 update_map_rule()
std::mutex lm_limit_alarm_;    // 保护 update_limit_alarm()

问题 7GlobaltemSharedMemory 索引查找效率

现状

// gb_item_memory.cpp
double GlobaltemSharedMemory::operator[](std::string tag_name) {
    // 每次调用在 unordered_map 中查找 tag_name → 索引映射
    // 然后用索引去读缓冲区
}

mon 的每次表达式求值,每个 m{tagN} 变量都要调用一次 operator[]。每次调用涉及:

  1. unordered_map<string, size_t> 查找(字符串 hash
  2. 读取双缓冲区的 read_array[index]

如果一条表达式引用了 5 个 tag200 条规则,每 100ms 执行一次 = 每秒 10,000 次 map 查找。

解决方案

AlgBaseinit() 中已经构建了 m_tags 列表。可以在 ExpModule::update() 时把 tag 在缓冲区中的索引预先解析并缓存,避免每次求值时的字符串查找:

// ExpModule 中预先解析索引
std::vector<size_t> tag_indices_;
void update() {
    for (size_t i = 0; i < tag_indices_.size(); i++) {
        mm_vars["tag" + std::to_string(i+1)] = read_buffer[tag_indices_[i]];
    }
}

问题 8cron 进程 DB2 写入无批处理

现状

cron 进程中,ExpBase::cron_proc() 为每条规则独立执行:

  1. EqpStat 取统计值(持全局锁读共享内存)
  2. DAA::STA 分布计算
  3. 写 DB2独立 SQL INSERT/UPDATE

如果有 200 条需学习的规则,就是 200 次独立的 DB2 写入。

解决方案

收集多条规则的统计结果后,使用 DB2 batch insert 或多行 INSERT 语句。但需确认 mix_cc::sql 是否支持批处理。如不支持,至少可以将 cron 的处理分散到多个 task_seq 线程中并行执行。


问题 9AlgBase::lm 锁职责不清

现状

AlgBase 中的 std::mutex lm 同时保护:

  • update_map_rule() — 被 mon 执行循环高频调用
  • update_limit_alarm() — 被事件 22222 低频调用

这两者频率差异巨大(~500ms vs 偶尔),但共享同一把锁。

解决方案

拆分为两把独立锁,见问题 6 补充优化。


问题 10task 进程 HandlerExec 清理策略

现状

// manager.cc 第86-104行
// task 进程: 每 10ms 检查所有 handles_ 是否运行完毕
// 如果全部完成 → handles_.clear()

task 每执行一次就创建一个新的 HandlerExec 线程。线程执行完 exec_task_call()is_running_ 变为 falseHandlerExec 对象和线程资源要等到下一次 Manager 清理循环10ms 间隔)才释放。

潜在问题: 如果短时间内发起大量 task 执行handles_ map 会快速膨胀,且清理不及时。

解决方案

HandlerExec::run_thread() 的 task 分支执行完后,设置一个 finished_ 标志Manager 的清理线程检测到后立即清理,而非等全量检查。


优化实施建议优先级

优先级 问题 预期收益 改动风险
P0 问题 1: update_map_rule 全局锁 页面刷新延迟降低 80%+
P0 问题 2: 线程分配不均衡 规则执行吞吐量提升 2-4 倍 低(先手动配置 taskSeq
P1 问题 4: event_handler 锁持有时间 CRUD 操作响应更快
P1 问题 5: cache_data 退避 避免 CPU 忙等
P1 问题 6: 拆分 AlgBase::lm 减少局部锁竞争
P2 问题 3: GetDataJson 增量更新 序列化开销降低
P2 问题 7: tag 索引预解析 表达式求值加速 20-30%
P3 问题 8: cron 批处理 cron 执行时间缩短
P3 问题 10: task 线程清理 内存及时释放

架构层面的长远建议

1. 考虑将页面数据从共享内存 map 迁移到 Redis/Memcached

当前使用 boost::interprocess::managed_mapped_file 共享内存 map 来存储规则统计信息给页面读取。这个设计导致所有写操作必须竞争一个进程级全局锁。如果改为:

  • eqpalg 写统计数据到 Memcached(已有 UpDateData 写入 Memcached可复用
  • 页面从 Memcached 读取

则可以完全消除共享内存 map 的锁竞争问题。Memcached 本身是高性能的,且天然支持并发读写。

2. 考虑将 HandlerExec 线程模型改为任务队列 + 线程池

当前每个 algId_dataSource_taskSeq 组合固定分配一个线程,导致线程数 = 算法类型组合数,且规则分配不均。改为:

  • 一个固定大小的线程池(如 CPU 核数 × 2
  • 规则执行包装为 task 提交到队列
  • 线程池按照规则 deadlinedelay_time_)调度执行

这样可以更均衡地利用 CPU避免某些线程过载而其他线程空闲。

3. 统一三个进程的代码路径

当前 mon/cron/task 共用一个 HandlerExec 类但行为差异巨大(通过 glob_process_type 判断分支)。建议将三个进程的执行逻辑拆分为独立的类,提高可读性和维护性,也便于针对性优化。