eis/eqpalg/eqpalg_optimization_notes.md
Huamonarch ef6612d2ea Add eqpalg optimization recommendations document
Covers 10 issues with root cause analysis, severity assessment,
and concrete solutions:
- P0: update_map_rule() global mutex bottleneck on shared memory map
- P0: Thread allocation imbalance (default taskSeq=0)
- P1: GetDataJson() full deep copy, event_handler lock hold time,
  cache_data() busy-loop, AlgBase::lm lock splitting
- P2-P3: tag index pre-resolution, cron batching, task thread cleanup
- Long-term: Memcached migration, thread pool, code path separation
2026-05-09 13:02:45 +08:00

452 lines
17 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# eqpalg 代码优化建议
## 问题总览
| # | 问题 | 严重度 | 影响范围 |
|---|------|--------|----------|
| 1 | `update_map_rule()` 全局互斥锁瓶颈 | **高** | mon 所有规则的页面数据刷新 |
| 2 | 线程分配不均衡(默认 taskSeq=0 | **高** | mon 规则执行吞吐量 |
| 3 | `GetDataJson()` 全量深拷贝开销 | **中** | 页面数据序列化(每秒调用) |
| 4 | `event_handler()` 持锁处理全量队列 | **中** | 规则 CRUD 期间监控暂停 |
| 5 | `cache_data()` 超时立即重试导致忙等 | **中** | CPU 占用 |
| 6 | 每条规则独立调用 `update_dynamic` 无批处理 | **中** | 共享内存写放大 |
| 7 | `GlobaltemSharedMemory` 索引查找效率 | **低** | 每条规则每次执行 |
| 8 | cron 进程 DB2 写入无批处理 | **低** | cron 执行时间 |
| 9 | `AlgBase::lm` 锁职责不清 | **低** | 锁竞争局部放大 |
| 10 | task 进程 `HandlerExec` 泄漏 | **低** | 内存 |
---
## 问题 1`update_map_rule()` 全局互斥锁瓶颈
### 现状
```
AlgBase::exec_mon_call()
└→ get_update_rule_stat_cycled() // ~500ms + random offset
└→ update_map_rule()
└→ std::lock_guard<std::mutex> guard(lm) // 规则级锁
└→ EqpStat::update_dynamic(rule_id_, rule_stat_)
└→ mapRuleStat.update_dynamic(key, value, true)
└→ std::lock_guard<std::mutex> guard(llmtx) // 全局锁!
└→ p_msg_map->operator[](key).update_dynamic(value)
└→ boost::interprocess 共享内存 map 操作
```
**根因**: `MapRuleStat` 底层使用 `boost::interprocess::managed_mapped_file` 共享内存 map所有操作必须竞争同一个全局 `llmtx`。假设有 500 条规则15 个 HandlerExec 线程,每隔约 500ms带随机偏移都会触发一次 `update_map_rule()`。即使有随机偏移,在高并发下仍有大量线程同时争抢这把锁。每次操作包含:
1. `thread_local` key 赋值
2. `map.find()` 查找
3. `map.operator[]` 写入(可能触发共享内存分配)
4. `RuleStat::update_dynamic()` — 5 个字段赋值、2 个 `vector_s` 共享内存 vector 复制
其中第 3 步涉及 boost::interprocess 的内存分配器,在共享内存段中分配节点,是**重操作**。
### 解决方案
#### 方案 A每线程本地缓存 + 定时批量刷入(推荐)
```
修改前: N条规则 × 每500ms = N次独立 map 写操作,每次持全局锁
修改后:
┌─────────────────────────────────────────────┐
│ 每个 HandlerExec 线程维护 thread_local │
│ std::map<string, RuleStat> local_cache_ │
│ │
│ exec_mon_call() 中: │
│ update_map_rule() → 只写 local_cache_ │
│ (无锁,无共享内存操作) │
│ │
│ 独立批量刷入线程 (100ms 间隔): │
│ 对每个 HandlerExec.local_cache_: │
│ lock(local_cache) → swap 出副本 │
│ 持全局锁一次,批量写入共享内存 map │
└─────────────────────────────────────────────┘
```
**实现要点**
- 每个 `HandlerExec``run_thread()` 中增加一个 `thread_local` 或成员变量 `batch_update_cache_`
- `update_map_rule()` 改为写入此本地缓存
-`Manager` 中新增一个批量刷入线程,周期 100ms收集各 handler 缓存后一次性持锁写入
- `GetDataJson()` 优先从共享内存读(已有逻辑),批量刷新保证数据最终一致
**收益**: 500 条规则 × 每 500ms → 降为 15 个线程 × 每 100ms 批量写 = 150 次/秒 → 约 10 次/秒(批量合并后)。全局锁竞争减少 **50 倍以上**
#### 方案 B改用读写锁短期缓解
当前 `llmtx``std::mutex`(互斥锁),但代码中已注释了 `std::shared_mutex`
```cpp
// RuleStatShm.h 第26行
// std::shared_mutex local_rwlock;
```
- `update_dynamic()` / `update()` / `add_stat_value()` — 写操作,用 `unique_lock`
- `size()` / `empty()` / `GetDataJson()` — 读操作,用 `shared_lock`
**注意**: `GetDataJson()` 已经做了"快照复制后释放锁再序列化"的优化,这很好。但 `update_dynamic()` 之间依然互斥。如果只是简单改为读写锁,写操作之间的竞争不会改善。此方案**只能缓解读写竞争**,不能解决写-写竞争。
#### 方案 C按 ruleId 哈希分段锁
```cpp
// 用 N 个 segment 的 map + 锁数组替代单一全局 map + 单一全局锁
static constexpr int SEGMENTS = 16;
struct Segment {
std::mutex mtx;
MapRuleStat_s* map;
};
Segment segments_[SEGMENTS];
auto& seg = segments_[std::hash<std::string>{}(key) % SEGMENTS];
std::lock_guard<std::mutex> guard(seg.mtx);
seg.map->operator[](key).update_dynamic(value);
```
**收益**: 锁竞争降低为原来的 1/SEGMENTS。
**推荐**: 综合使用 **方案 A + 方案 C**。先做本地缓存批量刷入(方案 A如果仍有需要将全局 map 分段(方案 C
---
## 问题 2线程分配不均衡
### 现状
```cpp
// threads/manager.cc 第283-285行
auto alg_thread_name = std::to_string(algId) + "_" +
std::to_string(data_source) + "_" +
std::to_string(task_seq);
```
规则按 `algId_dataSource_taskSeq` 分配到线程。但 `taskSeq` 来自 DB2 `T_RULE_CFG.TaskSeq` 字段,**绝大多数规则的 taskSeq 为 0**。
后果:同类算法(如 algId=2最常见的所有规则全部塞进一个 `HandlerExec` 线程,该线程中串行执行 `exec_mon_call()`
假设:
- algId=2 有 200 条规则
- 每条规则 `delay_time_` = 100ms
- `exec_mon_call()` 平均执行时间 = 0.5ms(含表达式求值)
- 单线程执行一轮 = 200 × 0.5ms = 100ms
这已经接近甚至超过 delay_time。如果某些规则有复杂表达式或需要查 IHDB会严重阻塞同线程的其他规则。
### 解决方案
#### 方案 A支持 taskSeq 配置(立即可用)
在 DB2 `T_RULE_CFG` 中为高负载规则设置不同的 `TaskSeq`1, 2, 3...),使它们分散到不同线程。这是现有架构已支持的,只需配置。
**限制**: 需要人工划分,且 taskSeq 同时影响 task 进程的线程分配。
#### 方案 B自动按规则数量拆分线程
```cpp
// 当某 algId_dataSource 线程中规则数 > 阈值时自动拆分
const size_t MAX_RULES_PER_THREAD = 50;
auto base_name = std::to_string(algId) + "_" + std::to_string(data_source);
int thread_index = 0;
auto candidate = base_name + "_" + std::to_string(task_seq);
// 如果已有线程的规则数超限,分配到新线程号
while (handles_.find(candidate) != handles_.end() &&
handles_[candidate]->size() >= MAX_RULES_PER_THREAD) {
thread_index++;
candidate = base_name + "_" + std::to_string(taskSeq) + "s" + std::to_string(thread_index);
}
```
**收益**: 200 条同类规则自动分散到 4 个线程,每线程 50 条,总执行时间从 100ms 降至 25ms。
#### 方案 C使用线程池替代固定线程映射
将 HandlerExec 改为提交到线程池执行,而非每个线程名固定一个线程。但改动较大,涉及生命周期管理。
**推荐**: 先采用 **方案 A** 作为手动优化,再实现 **方案 B** 作为自动均衡。
---
## 问题 3`GetDataJson()` 全量深拷贝
### 现状
```cpp
// RuleStatShm.h GetDataJson()
std::vector<std::pair<std::string, RuleStat>> snapshot;
{
std::lock_guard<std::mutex> guard(llmtx);
snapshot.reserve(p_msg_map->size());
for (const auto& [key, value] : *p_msg_map) {
snapshot.emplace_back(key.c_str(), value); // 深拷贝 RuleStat
}
}
```
虽然已做了"锁内复制、锁外序列化"优化,但 `value` 的复制是深拷贝,包括:
- `vector_s items` (共享内存分配器的 vector<string>)
- `vector_d stat_values` (共享内存分配器的 vector<double>)
500 条规则 × 平均每个 RuleStat ~2KB = ~1MB 的深拷贝,每秒调用一次(`TimeNotify` 事件 2 触发 `update_rule_stat_data()``get_stat_json()`)。
### 解决方案
#### 方案 A增量更新 + 本地 JSON 缓存
```
维护一个线程安全的本地 JSON 缓存:
map<string, string> json_cache_; // ruleId → JSON 片段
string full_json_cache_; // 完整 JSON
bool dirty_ = true; // 脏标记
update_dynamic(ruleId, value):
json_cache_[ruleId] = value.invert2json().dump()
dirty_ = true
get_stat_json():
if dirty_:
拼接所有 JSON 片段为完整 JSON
full_json_cache_ = ...
dirty_ = false
return full_json_cache_
```
#### 方案 B使用共享内存中的预序列化字段
`RuleStat` 中增加一个 `char json_cache[512]` 固定大小字段,在 `update_dynamic()` 时同时更新此字段。`GetDataJson()` 直接从共享内存读预序列化的字符串,无需再调用 `invert2json()`
**推荐**: **方案 A**,改动小,效果直接。配合问题 1 的本地缓存方案可自然实现。
---
## 问题 4`event_handler()` 持锁处理全量队列
### 现状
```cpp
// handler_exec.cc 第231-232行
int HandlerExec::event_handler() {
std::lock_guard<std::mutex> guard(mutex_); // 持锁整个函数
while (!reset_queue_.empty()) { ... }
while (!detach_queue_.empty()) { ... }
while (!attach_queue_.empty()) { ... }
while (!usable_queue_.empty()) { ... }
if (!once_exec_queue_.empty()) { ... }
}
```
`event_handler()` 在每次主循环迭代中调用。持锁期间,所有 `attach/detach/reset/set_usable/submit` 调用都被阻塞。虽然单次队列处理通常很快,但如果 attach 大量规则(如系统启动时),会长时间阻塞执行循环。
### 解决方案
```cpp
int HandlerExec::event_handler() {
// 将队列 swap 出来,最小化持锁时间
decltype(attach_queue_) local_attach;
decltype(detach_queue_) local_detach;
// ...其他队列同理
{
std::lock_guard<std::mutex> guard(mutex_);
local_attach.swap(attach_queue_);
local_detach.swap(detach_queue_);
// ...
} // 立即释放锁
// 无锁处理本地队列
while (!local_detach.empty()) { ... }
while (!local_attach.empty()) { ... }
// ...
}
```
**收益**: 持锁时间从 O(队列长度) 降至 O(1) swap 操作。
---
## 问题 5`cache_data()` 超时忙等
### 现状
```cpp
// eqpalg_icei.cpp 第82-89行
if (time_costs >= CACHE_OUTTIME) { // >= 19ms
this->logger_->Error() << "共享内存消耗时间超时(ms):" << ...;
next_wake_time = std::chrono::steady_clock::now(); // 立即重试
} else {
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
```
当一次 `cache_data()` 耗时超过 19ms 时,不 sleep 直接重试。如果共享内存数据量很大或系统负载高,可能导致**连续失败 + 立即重试**,形成 CPU 忙等。
### 解决方案
```cpp
int consecutive_timeouts = 0;
const int MAX_CONSECUTIVE_TIMEOUTS = 3;
if (time_costs >= CACHE_OUTTIME) {
consecutive_timeouts++;
if (consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS) {
next_wake_time += CACHE_OUTTIME * 2; // 退避
consecutive_timeouts = 0;
} else {
next_wake_time = std::chrono::steady_clock::now();
}
} else {
consecutive_timeouts = 0;
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
```
---
## 问题 6每条规则独立调用 `update_dynamic` 无批处理
### 现状
`exec_mon_call()` 中:
```cpp
if (get_update_rule_stat_cycled()) {
this->update_map_rule(); // 每条规则独立调用
}
```
每条规则都独立触发全局锁 → 查共享内存 map → 写。配合问题 1 的方案 A可改为本地缓存写入。
### 补充优化
`AlgBase::update_map_rule()` 中还有一个规则级别的 `std::lock_guard<std::mutex> guard(lm)`。这个锁同时被 `update_limit_alarm()`(事件 22222使用。如果这两个操作频繁规则级锁也会有竞争。建议拆分为两个独立的锁
```cpp
std::mutex lm_map_rule_; // 保护 update_map_rule()
std::mutex lm_limit_alarm_; // 保护 update_limit_alarm()
```
---
## 问题 7`GlobaltemSharedMemory` 索引查找效率
### 现状
```cpp
// gb_item_memory.cpp
double GlobaltemSharedMemory::operator[](std::string tag_name) {
// 每次调用在 unordered_map 中查找 tag_name → 索引映射
// 然后用索引去读缓冲区
}
```
mon 的每次表达式求值,每个 `m{tagN}` 变量都要调用一次 `operator[]`。每次调用涉及:
1. `unordered_map<string, size_t>` 查找(字符串 hash
2. 读取双缓冲区的 read_array[index]
如果一条表达式引用了 5 个 tag200 条规则,每 100ms 执行一次 = 每秒 10,000 次 map 查找。
### 解决方案
`AlgBase``init()` 中已经构建了 `m_tags` 列表。可以在 `ExpModule::update()` 时把 tag 在缓冲区中的**索引**预先解析并缓存,避免每次求值时的字符串查找:
```cpp
// ExpModule 中预先解析索引
std::vector<size_t> tag_indices_;
void update() {
for (size_t i = 0; i < tag_indices_.size(); i++) {
mm_vars["tag" + std::to_string(i+1)] = read_buffer[tag_indices_[i]];
}
}
```
---
## 问题 8cron 进程 DB2 写入无批处理
### 现状
cron 进程中,`ExpBase::cron_proc()` 为每条规则独立执行:
1.`EqpStat` 取统计值(持全局锁读共享内存)
2. DAA::STA 分布计算
3. 写 DB2独立 SQL INSERT/UPDATE
如果有 200 条需学习的规则,就是 200 次独立的 DB2 写入。
### 解决方案
收集多条规则的统计结果后,使用 DB2 batch insert 或多行 INSERT 语句。但需确认 mix_cc::sql 是否支持批处理。如不支持,至少可以将 cron 的处理分散到多个 task_seq 线程中并行执行。
---
## 问题 9`AlgBase::lm` 锁职责不清
### 现状
`AlgBase` 中的 `std::mutex lm` 同时保护:
- `update_map_rule()` — 被 mon 执行循环高频调用
- `update_limit_alarm()` — 被事件 22222 低频调用
这两者频率差异巨大(~500ms vs 偶尔),但共享同一把锁。
### 解决方案
拆分为两把独立锁,见问题 6 补充优化。
---
## 问题 10task 进程 HandlerExec 清理策略
### 现状
```cpp
// manager.cc 第86-104行
// task 进程: 每 10ms 检查所有 handles_ 是否运行完毕
// 如果全部完成 → handles_.clear()
```
task 每执行一次就创建一个新的 `HandlerExec` 线程。线程执行完 `exec_task_call()``is_running_` 变为 false`HandlerExec` 对象和线程资源要等到下一次 Manager 清理循环10ms 间隔)才释放。
**潜在问题**: 如果短时间内发起大量 task 执行handles_ map 会快速膨胀,且清理不及时。
### 解决方案
`HandlerExec::run_thread()` 的 task 分支执行完后,设置一个 `finished_` 标志Manager 的清理线程检测到后立即清理,而非等全量检查。
---
## 优化实施建议优先级
| 优先级 | 问题 | 预期收益 | 改动风险 |
|--------|------|----------|----------|
| **P0** | 问题 1: update_map_rule 全局锁 | 页面刷新延迟降低 80%+ | 中 |
| **P0** | 问题 2: 线程分配不均衡 | 规则执行吞吐量提升 2-4 倍 | 低(先手动配置 taskSeq |
| **P1** | 问题 4: event_handler 锁持有时间 | CRUD 操作响应更快 | 低 |
| **P1** | 问题 5: cache_data 退避 | 避免 CPU 忙等 | 低 |
| **P1** | 问题 6: 拆分 AlgBase::lm | 减少局部锁竞争 | 低 |
| **P2** | 问题 3: GetDataJson 增量更新 | 序列化开销降低 | 中 |
| **P2** | 问题 7: tag 索引预解析 | 表达式求值加速 20-30% | 中 |
| **P3** | 问题 8: cron 批处理 | cron 执行时间缩短 | 中 |
| **P3** | 问题 10: task 线程清理 | 内存及时释放 | 低 |
## 架构层面的长远建议
### 1. 考虑将页面数据从共享内存 map 迁移到 Redis/Memcached
当前使用 `boost::interprocess::managed_mapped_file` 共享内存 map 来存储规则统计信息给页面读取。这个设计导致所有写操作必须竞争一个进程级全局锁。如果改为:
- eqpalg 写统计数据到 **Memcached**(已有 `UpDateData` 写入 Memcached可复用
- 页面从 Memcached 读取
则可以完全消除共享内存 map 的锁竞争问题。Memcached 本身是高性能的,且天然支持并发读写。
### 2. 考虑将 HandlerExec 线程模型改为任务队列 + 线程池
当前每个 `algId_dataSource_taskSeq` 组合固定分配一个线程,导致线程数 = 算法类型组合数,且规则分配不均。改为:
- 一个固定大小的线程池(如 CPU 核数 × 2
- 规则执行包装为 task 提交到队列
- 线程池按照规则 deadline`delay_time_`)调度执行
这样可以更均衡地利用 CPU避免某些线程过载而其他线程空闲。
### 3. 统一三个进程的代码路径
当前 mon/cron/task 共用一个 HandlerExec 类但行为差异巨大(通过 `glob_process_type` 判断分支)。建议将三个进程的执行逻辑拆分为独立的类,提高可读性和维护性,也便于针对性优化。