diff --git a/eqpalg/eqpalg_optimization_notes.md b/eqpalg/eqpalg_optimization_notes.md new file mode 100644 index 0000000..60c470a --- /dev/null +++ b/eqpalg/eqpalg_optimization_notes.md @@ -0,0 +1,451 @@ +# eqpalg 代码优化建议 + +## 问题总览 + +| # | 问题 | 严重度 | 影响范围 | +|---|------|--------|----------| +| 1 | `update_map_rule()` 全局互斥锁瓶颈 | **高** | mon 所有规则的页面数据刷新 | +| 2 | 线程分配不均衡(默认 taskSeq=0) | **高** | mon 规则执行吞吐量 | +| 3 | `GetDataJson()` 全量深拷贝开销 | **中** | 页面数据序列化(每秒调用) | +| 4 | `event_handler()` 持锁处理全量队列 | **中** | 规则 CRUD 期间监控暂停 | +| 5 | `cache_data()` 超时立即重试导致忙等 | **中** | CPU 占用 | +| 6 | 每条规则独立调用 `update_dynamic` 无批处理 | **中** | 共享内存写放大 | +| 7 | `GlobaltemSharedMemory` 索引查找效率 | **低** | 每条规则每次执行 | +| 8 | cron 进程 DB2 写入无批处理 | **低** | cron 执行时间 | +| 9 | `AlgBase::lm` 锁职责不清 | **低** | 锁竞争局部放大 | +| 10 | task 进程 `HandlerExec` 泄漏 | **低** | 内存 | + +--- + +## 问题 1:`update_map_rule()` 全局互斥锁瓶颈 + +### 现状 + +``` +AlgBase::exec_mon_call() + └→ get_update_rule_stat_cycled() // ~500ms + random offset + └→ update_map_rule() + └→ std::lock_guard guard(lm) // 规则级锁 + └→ EqpStat::update_dynamic(rule_id_, rule_stat_) + └→ mapRuleStat.update_dynamic(key, value, true) + └→ std::lock_guard guard(llmtx) // 全局锁! + └→ p_msg_map->operator[](key).update_dynamic(value) + └→ boost::interprocess 共享内存 map 操作 +``` + +**根因**: `MapRuleStat` 底层使用 `boost::interprocess::managed_mapped_file` 共享内存 map,所有操作必须竞争同一个全局 `llmtx`。假设有 500 条规则,15 个 HandlerExec 线程,每隔约 500ms(带随机偏移)都会触发一次 `update_map_rule()`。即使有随机偏移,在高并发下仍有大量线程同时争抢这把锁。每次操作包含: +1. `thread_local` key 赋值 +2. `map.find()` 查找 +3. `map.operator[]` 写入(可能触发共享内存分配) +4. `RuleStat::update_dynamic()` — 5 个字段赋值、2 个 `vector_s` 共享内存 vector 复制 + +其中第 3 步涉及 boost::interprocess 的内存分配器,在共享内存段中分配节点,是**重操作**。 + +### 解决方案 + +#### 方案 A:每线程本地缓存 + 定时批量刷入(推荐) + +``` + 修改前: N条规则 × 每500ms = N次独立 map 写操作,每次持全局锁 + + 修改后: + ┌─────────────────────────────────────────────┐ + │ 每个 HandlerExec 线程维护 thread_local │ + │ std::map local_cache_ │ + │ │ + │ exec_mon_call() 中: │ + │ update_map_rule() → 只写 local_cache_ │ + │ (无锁,无共享内存操作) │ + │ │ + │ 独立批量刷入线程 (100ms 间隔): │ + │ 对每个 HandlerExec.local_cache_: │ + │ lock(local_cache) → swap 出副本 │ + │ 持全局锁一次,批量写入共享内存 map │ + └─────────────────────────────────────────────┘ +``` + +**实现要点**: +- 每个 `HandlerExec` 的 `run_thread()` 中增加一个 `thread_local` 或成员变量 `batch_update_cache_` +- `update_map_rule()` 改为写入此本地缓存 +- 在 `Manager` 中新增一个批量刷入线程,周期 100ms,收集各 handler 缓存后一次性持锁写入 +- `GetDataJson()` 优先从共享内存读(已有逻辑),批量刷新保证数据最终一致 + +**收益**: 500 条规则 × 每 500ms → 降为 15 个线程 × 每 100ms 批量写 = 150 次/秒 → 约 10 次/秒(批量合并后)。全局锁竞争减少 **50 倍以上**。 + +#### 方案 B:改用读写锁(短期缓解) + +当前 `llmtx` 是 `std::mutex`(互斥锁),但代码中已注释了 `std::shared_mutex`: + +```cpp +// RuleStatShm.h 第26行 +// std::shared_mutex local_rwlock; +``` + +- `update_dynamic()` / `update()` / `add_stat_value()` — 写操作,用 `unique_lock` +- `size()` / `empty()` / `GetDataJson()` — 读操作,用 `shared_lock` + +**注意**: `GetDataJson()` 已经做了"快照复制后释放锁再序列化"的优化,这很好。但 `update_dynamic()` 之间依然互斥。如果只是简单改为读写锁,写操作之间的竞争不会改善。此方案**只能缓解读写竞争**,不能解决写-写竞争。 + +#### 方案 C:按 ruleId 哈希分段锁 + +```cpp +// 用 N 个 segment 的 map + 锁数组替代单一全局 map + 单一全局锁 +static constexpr int SEGMENTS = 16; +struct Segment { + std::mutex mtx; + MapRuleStat_s* map; +}; +Segment segments_[SEGMENTS]; + +auto& seg = segments_[std::hash{}(key) % SEGMENTS]; +std::lock_guard guard(seg.mtx); +seg.map->operator[](key).update_dynamic(value); +``` + +**收益**: 锁竞争降低为原来的 1/SEGMENTS。 + +**推荐**: 综合使用 **方案 A + 方案 C**。先做本地缓存批量刷入(方案 A),如果仍有需要,将全局 map 分段(方案 C)。 + +--- + +## 问题 2:线程分配不均衡 + +### 现状 + +```cpp +// threads/manager.cc 第283-285行 +auto alg_thread_name = std::to_string(algId) + "_" + + std::to_string(data_source) + "_" + + std::to_string(task_seq); +``` + +规则按 `algId_dataSource_taskSeq` 分配到线程。但 `taskSeq` 来自 DB2 `T_RULE_CFG.TaskSeq` 字段,**绝大多数规则的 taskSeq 为 0**。 + +后果:同类算法(如 algId=2,最常见)的所有规则全部塞进一个 `HandlerExec` 线程,该线程中串行执行 `exec_mon_call()`。 + +假设: +- algId=2 有 200 条规则 +- 每条规则 `delay_time_` = 100ms +- `exec_mon_call()` 平均执行时间 = 0.5ms(含表达式求值) +- 单线程执行一轮 = 200 × 0.5ms = 100ms + +这已经接近甚至超过 delay_time。如果某些规则有复杂表达式或需要查 IHDB,会严重阻塞同线程的其他规则。 + +### 解决方案 + +#### 方案 A:支持 taskSeq 配置(立即可用) + +在 DB2 `T_RULE_CFG` 中为高负载规则设置不同的 `TaskSeq`(1, 2, 3...),使它们分散到不同线程。这是现有架构已支持的,只需配置。 + +**限制**: 需要人工划分,且 taskSeq 同时影响 task 进程的线程分配。 + +#### 方案 B:自动按规则数量拆分线程 + +```cpp +// 当某 algId_dataSource 线程中规则数 > 阈值时自动拆分 +const size_t MAX_RULES_PER_THREAD = 50; + +auto base_name = std::to_string(algId) + "_" + std::to_string(data_source); +int thread_index = 0; +auto candidate = base_name + "_" + std::to_string(task_seq); +// 如果已有线程的规则数超限,分配到新线程号 +while (handles_.find(candidate) != handles_.end() && + handles_[candidate]->size() >= MAX_RULES_PER_THREAD) { + thread_index++; + candidate = base_name + "_" + std::to_string(taskSeq) + "s" + std::to_string(thread_index); +} +``` + +**收益**: 200 条同类规则自动分散到 4 个线程,每线程 50 条,总执行时间从 100ms 降至 25ms。 + +#### 方案 C:使用线程池替代固定线程映射 + +将 HandlerExec 改为提交到线程池执行,而非每个线程名固定一个线程。但改动较大,涉及生命周期管理。 + +**推荐**: 先采用 **方案 A** 作为手动优化,再实现 **方案 B** 作为自动均衡。 + +--- + +## 问题 3:`GetDataJson()` 全量深拷贝 + +### 现状 + +```cpp +// RuleStatShm.h GetDataJson() +std::vector> snapshot; +{ + std::lock_guard guard(llmtx); + snapshot.reserve(p_msg_map->size()); + for (const auto& [key, value] : *p_msg_map) { + snapshot.emplace_back(key.c_str(), value); // 深拷贝 RuleStat + } +} +``` + +虽然已做了"锁内复制、锁外序列化"优化,但 `value` 的复制是深拷贝,包括: +- `vector_s items` (共享内存分配器的 vector) +- `vector_d stat_values` (共享内存分配器的 vector) + +500 条规则 × 平均每个 RuleStat ~2KB = ~1MB 的深拷贝,每秒调用一次(`TimeNotify` 事件 2 触发 `update_rule_stat_data()` → `get_stat_json()`)。 + +### 解决方案 + +#### 方案 A:增量更新 + 本地 JSON 缓存 + +``` + 维护一个线程安全的本地 JSON 缓存: + + map json_cache_; // ruleId → JSON 片段 + string full_json_cache_; // 完整 JSON + bool dirty_ = true; // 脏标记 + + update_dynamic(ruleId, value): + json_cache_[ruleId] = value.invert2json().dump() + dirty_ = true + + get_stat_json(): + if dirty_: + 拼接所有 JSON 片段为完整 JSON + full_json_cache_ = ... + dirty_ = false + return full_json_cache_ +``` + +#### 方案 B:使用共享内存中的预序列化字段 + +在 `RuleStat` 中增加一个 `char json_cache[512]` 固定大小字段,在 `update_dynamic()` 时同时更新此字段。`GetDataJson()` 直接从共享内存读预序列化的字符串,无需再调用 `invert2json()`。 + +**推荐**: **方案 A**,改动小,效果直接。配合问题 1 的本地缓存方案可自然实现。 + +--- + +## 问题 4:`event_handler()` 持锁处理全量队列 + +### 现状 + +```cpp +// handler_exec.cc 第231-232行 +int HandlerExec::event_handler() { + std::lock_guard guard(mutex_); // 持锁整个函数 + while (!reset_queue_.empty()) { ... } + while (!detach_queue_.empty()) { ... } + while (!attach_queue_.empty()) { ... } + while (!usable_queue_.empty()) { ... } + if (!once_exec_queue_.empty()) { ... } +} +``` + +`event_handler()` 在每次主循环迭代中调用。持锁期间,所有 `attach/detach/reset/set_usable/submit` 调用都被阻塞。虽然单次队列处理通常很快,但如果 attach 大量规则(如系统启动时),会长时间阻塞执行循环。 + +### 解决方案 + +```cpp +int HandlerExec::event_handler() { + // 将队列 swap 出来,最小化持锁时间 + decltype(attach_queue_) local_attach; + decltype(detach_queue_) local_detach; + // ...其他队列同理 + { + std::lock_guard guard(mutex_); + local_attach.swap(attach_queue_); + local_detach.swap(detach_queue_); + // ... + } // 立即释放锁 + // 无锁处理本地队列 + while (!local_detach.empty()) { ... } + while (!local_attach.empty()) { ... } + // ... +} +``` + +**收益**: 持锁时间从 O(队列长度) 降至 O(1) swap 操作。 + +--- + +## 问题 5:`cache_data()` 超时忙等 + +### 现状 + +```cpp +// eqpalg_icei.cpp 第82-89行 +if (time_costs >= CACHE_OUTTIME) { // >= 19ms + this->logger_->Error() << "共享内存消耗时间超时(ms):" << ...; + next_wake_time = std::chrono::steady_clock::now(); // 立即重试 +} else { + next_wake_time += CACHE_OUTTIME; + std::this_thread::sleep_until(next_wake_time); +} +``` + +当一次 `cache_data()` 耗时超过 19ms 时,不 sleep 直接重试。如果共享内存数据量很大或系统负载高,可能导致**连续失败 + 立即重试**,形成 CPU 忙等。 + +### 解决方案 + +```cpp +int consecutive_timeouts = 0; +const int MAX_CONSECUTIVE_TIMEOUTS = 3; + +if (time_costs >= CACHE_OUTTIME) { + consecutive_timeouts++; + if (consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS) { + next_wake_time += CACHE_OUTTIME * 2; // 退避 + consecutive_timeouts = 0; + } else { + next_wake_time = std::chrono::steady_clock::now(); + } +} else { + consecutive_timeouts = 0; + next_wake_time += CACHE_OUTTIME; + std::this_thread::sleep_until(next_wake_time); +} +``` + +--- + +## 问题 6:每条规则独立调用 `update_dynamic` 无批处理 + +### 现状 + +在 `exec_mon_call()` 中: +```cpp +if (get_update_rule_stat_cycled()) { + this->update_map_rule(); // 每条规则独立调用 +} +``` + +每条规则都独立触发全局锁 → 查共享内存 map → 写。配合问题 1 的方案 A,可改为本地缓存写入。 + +### 补充优化 + +`AlgBase::update_map_rule()` 中还有一个规则级别的 `std::lock_guard guard(lm)`。这个锁同时被 `update_limit_alarm()`(事件 22222)使用。如果这两个操作频繁,规则级锁也会有竞争。建议拆分为两个独立的锁: + +```cpp +std::mutex lm_map_rule_; // 保护 update_map_rule() +std::mutex lm_limit_alarm_; // 保护 update_limit_alarm() +``` + +--- + +## 问题 7:`GlobaltemSharedMemory` 索引查找效率 + +### 现状 + +```cpp +// gb_item_memory.cpp +double GlobaltemSharedMemory::operator[](std::string tag_name) { + // 每次调用在 unordered_map 中查找 tag_name → 索引映射 + // 然后用索引去读缓冲区 +} +``` + +mon 的每次表达式求值,每个 `m{tagN}` 变量都要调用一次 `operator[]`。每次调用涉及: +1. `unordered_map` 查找(字符串 hash) +2. 读取双缓冲区的 read_array[index] + +如果一条表达式引用了 5 个 tag,200 条规则,每 100ms 执行一次 = 每秒 10,000 次 map 查找。 + +### 解决方案 + +`AlgBase` 的 `init()` 中已经构建了 `m_tags` 列表。可以在 `ExpModule::update()` 时把 tag 在缓冲区中的**索引**预先解析并缓存,避免每次求值时的字符串查找: + +```cpp +// ExpModule 中预先解析索引 +std::vector tag_indices_; +void update() { + for (size_t i = 0; i < tag_indices_.size(); i++) { + mm_vars["tag" + std::to_string(i+1)] = read_buffer[tag_indices_[i]]; + } +} +``` + +--- + +## 问题 8:cron 进程 DB2 写入无批处理 + +### 现状 + +cron 进程中,`ExpBase::cron_proc()` 为每条规则独立执行: +1. 从 `EqpStat` 取统计值(持全局锁读共享内存) +2. DAA::STA 分布计算 +3. 写 DB2(独立 SQL INSERT/UPDATE) + +如果有 200 条需学习的规则,就是 200 次独立的 DB2 写入。 + +### 解决方案 + +收集多条规则的统计结果后,使用 DB2 batch insert 或多行 INSERT 语句。但需确认 mix_cc::sql 是否支持批处理。如不支持,至少可以将 cron 的处理分散到多个 task_seq 线程中并行执行。 + +--- + +## 问题 9:`AlgBase::lm` 锁职责不清 + +### 现状 + +`AlgBase` 中的 `std::mutex lm` 同时保护: +- `update_map_rule()` — 被 mon 执行循环高频调用 +- `update_limit_alarm()` — 被事件 22222 低频调用 + +这两者频率差异巨大(~500ms vs 偶尔),但共享同一把锁。 + +### 解决方案 + +拆分为两把独立锁,见问题 6 补充优化。 + +--- + +## 问题 10:task 进程 HandlerExec 清理策略 + +### 现状 + +```cpp +// manager.cc 第86-104行 +// task 进程: 每 10ms 检查所有 handles_ 是否运行完毕 +// 如果全部完成 → handles_.clear() +``` + +task 每执行一次就创建一个新的 `HandlerExec` 线程。线程执行完 `exec_task_call()` 后 `is_running_` 变为 false,但 `HandlerExec` 对象和线程资源要等到下一次 Manager 清理循环(10ms 间隔)才释放。 + +**潜在问题**: 如果短时间内发起大量 task 执行,handles_ map 会快速膨胀,且清理不及时。 + +### 解决方案 + +在 `HandlerExec::run_thread()` 的 task 分支执行完后,设置一个 `finished_` 标志,Manager 的清理线程检测到后立即清理,而非等全量检查。 + +--- + +## 优化实施建议优先级 + +| 优先级 | 问题 | 预期收益 | 改动风险 | +|--------|------|----------|----------| +| **P0** | 问题 1: update_map_rule 全局锁 | 页面刷新延迟降低 80%+ | 中 | +| **P0** | 问题 2: 线程分配不均衡 | 规则执行吞吐量提升 2-4 倍 | 低(先手动配置 taskSeq) | +| **P1** | 问题 4: event_handler 锁持有时间 | CRUD 操作响应更快 | 低 | +| **P1** | 问题 5: cache_data 退避 | 避免 CPU 忙等 | 低 | +| **P1** | 问题 6: 拆分 AlgBase::lm | 减少局部锁竞争 | 低 | +| **P2** | 问题 3: GetDataJson 增量更新 | 序列化开销降低 | 中 | +| **P2** | 问题 7: tag 索引预解析 | 表达式求值加速 20-30% | 中 | +| **P3** | 问题 8: cron 批处理 | cron 执行时间缩短 | 中 | +| **P3** | 问题 10: task 线程清理 | 内存及时释放 | 低 | + +## 架构层面的长远建议 + +### 1. 考虑将页面数据从共享内存 map 迁移到 Redis/Memcached + +当前使用 `boost::interprocess::managed_mapped_file` 共享内存 map 来存储规则统计信息给页面读取。这个设计导致所有写操作必须竞争一个进程级全局锁。如果改为: +- eqpalg 写统计数据到 **Memcached**(已有 `UpDateData` 写入 Memcached,可复用) +- 页面从 Memcached 读取 + +则可以完全消除共享内存 map 的锁竞争问题。Memcached 本身是高性能的,且天然支持并发读写。 + +### 2. 考虑将 HandlerExec 线程模型改为任务队列 + 线程池 + +当前每个 `algId_dataSource_taskSeq` 组合固定分配一个线程,导致线程数 = 算法类型组合数,且规则分配不均。改为: +- 一个固定大小的线程池(如 CPU 核数 × 2) +- 规则执行包装为 task 提交到队列 +- 线程池按照规则 deadline(`delay_time_`)调度执行 + +这样可以更均衡地利用 CPU,避免某些线程过载而其他线程空闲。 + +### 3. 统一三个进程的代码路径 + +当前 mon/cron/task 共用一个 HandlerExec 类但行为差异巨大(通过 `glob_process_type` 判断分支)。建议将三个进程的执行逻辑拆分为独立的类,提高可读性和维护性,也便于针对性优化。