eis/eqpalg/eqpalg_optimization_notes.md

452 lines
17 KiB
Markdown
Raw Permalink Normal View History

# eqpalg 代码优化建议
## 问题总览
| # | 问题 | 严重度 | 影响范围 |
|---|------|--------|----------|
| 1 | `update_map_rule()` 全局互斥锁瓶颈 | **高** | mon 所有规则的页面数据刷新 |
| 2 | 线程分配不均衡(默认 taskSeq=0 | **高** | mon 规则执行吞吐量 |
| 3 | `GetDataJson()` 全量深拷贝开销 | **中** | 页面数据序列化(每秒调用) |
| 4 | `event_handler()` 持锁处理全量队列 | **中** | 规则 CRUD 期间监控暂停 |
| 5 | `cache_data()` 超时立即重试导致忙等 | **中** | CPU 占用 |
| 6 | 每条规则独立调用 `update_dynamic` 无批处理 | **中** | 共享内存写放大 |
| 7 | `GlobaltemSharedMemory` 索引查找效率 | **低** | 每条规则每次执行 |
| 8 | cron 进程 DB2 写入无批处理 | **低** | cron 执行时间 |
| 9 | `AlgBase::lm` 锁职责不清 | **低** | 锁竞争局部放大 |
| 10 | task 进程 `HandlerExec` 泄漏 | **低** | 内存 |
---
## 问题 1`update_map_rule()` 全局互斥锁瓶颈
### 现状
```
AlgBase::exec_mon_call()
└→ get_update_rule_stat_cycled() // ~500ms + random offset
└→ update_map_rule()
└→ std::lock_guard<std::mutex> guard(lm) // 规则级锁
└→ EqpStat::update_dynamic(rule_id_, rule_stat_)
└→ mapRuleStat.update_dynamic(key, value, true)
└→ std::lock_guard<std::mutex> guard(llmtx) // 全局锁!
└→ p_msg_map->operator[](key).update_dynamic(value)
└→ boost::interprocess 共享内存 map 操作
```
**根因**: `MapRuleStat` 底层使用 `boost::interprocess::managed_mapped_file` 共享内存 map所有操作必须竞争同一个全局 `llmtx`。假设有 500 条规则15 个 HandlerExec 线程,每隔约 500ms带随机偏移都会触发一次 `update_map_rule()`。即使有随机偏移,在高并发下仍有大量线程同时争抢这把锁。每次操作包含:
1. `thread_local` key 赋值
2. `map.find()` 查找
3. `map.operator[]` 写入(可能触发共享内存分配)
4. `RuleStat::update_dynamic()` — 5 个字段赋值、2 个 `vector_s` 共享内存 vector 复制
其中第 3 步涉及 boost::interprocess 的内存分配器,在共享内存段中分配节点,是**重操作**。
### 解决方案
#### 方案 A每线程本地缓存 + 定时批量刷入(推荐)
```
修改前: N条规则 × 每500ms = N次独立 map 写操作,每次持全局锁
修改后:
┌─────────────────────────────────────────────┐
│ 每个 HandlerExec 线程维护 thread_local │
│ std::map<string, RuleStat> local_cache_ │
│ │
│ exec_mon_call() 中: │
│ update_map_rule() → 只写 local_cache_ │
│ (无锁,无共享内存操作) │
│ │
│ 独立批量刷入线程 (100ms 间隔): │
│ 对每个 HandlerExec.local_cache_: │
│ lock(local_cache) → swap 出副本 │
│ 持全局锁一次,批量写入共享内存 map │
└─────────────────────────────────────────────┘
```
**实现要点**
- 每个 `HandlerExec``run_thread()` 中增加一个 `thread_local` 或成员变量 `batch_update_cache_`
- `update_map_rule()` 改为写入此本地缓存
-`Manager` 中新增一个批量刷入线程,周期 100ms收集各 handler 缓存后一次性持锁写入
- `GetDataJson()` 优先从共享内存读(已有逻辑),批量刷新保证数据最终一致
**收益**: 500 条规则 × 每 500ms → 降为 15 个线程 × 每 100ms 批量写 = 150 次/秒 → 约 10 次/秒(批量合并后)。全局锁竞争减少 **50 倍以上**
#### 方案 B改用读写锁短期缓解
当前 `llmtx``std::mutex`(互斥锁),但代码中已注释了 `std::shared_mutex`
```cpp
// RuleStatShm.h 第26行
// std::shared_mutex local_rwlock;
```
- `update_dynamic()` / `update()` / `add_stat_value()` — 写操作,用 `unique_lock`
- `size()` / `empty()` / `GetDataJson()` — 读操作,用 `shared_lock`
**注意**: `GetDataJson()` 已经做了"快照复制后释放锁再序列化"的优化,这很好。但 `update_dynamic()` 之间依然互斥。如果只是简单改为读写锁,写操作之间的竞争不会改善。此方案**只能缓解读写竞争**,不能解决写-写竞争。
#### 方案 C按 ruleId 哈希分段锁
```cpp
// 用 N 个 segment 的 map + 锁数组替代单一全局 map + 单一全局锁
static constexpr int SEGMENTS = 16;
struct Segment {
std::mutex mtx;
MapRuleStat_s* map;
};
Segment segments_[SEGMENTS];
auto& seg = segments_[std::hash<std::string>{}(key) % SEGMENTS];
std::lock_guard<std::mutex> guard(seg.mtx);
seg.map->operator[](key).update_dynamic(value);
```
**收益**: 锁竞争降低为原来的 1/SEGMENTS。
**推荐**: 综合使用 **方案 A + 方案 C**。先做本地缓存批量刷入(方案 A如果仍有需要将全局 map 分段(方案 C
---
## 问题 2线程分配不均衡
### 现状
```cpp
// threads/manager.cc 第283-285行
auto alg_thread_name = std::to_string(algId) + "_" +
std::to_string(data_source) + "_" +
std::to_string(task_seq);
```
规则按 `algId_dataSource_taskSeq` 分配到线程。但 `taskSeq` 来自 DB2 `T_RULE_CFG.TaskSeq` 字段,**绝大多数规则的 taskSeq 为 0**。
后果:同类算法(如 algId=2最常见的所有规则全部塞进一个 `HandlerExec` 线程,该线程中串行执行 `exec_mon_call()`
假设:
- algId=2 有 200 条规则
- 每条规则 `delay_time_` = 100ms
- `exec_mon_call()` 平均执行时间 = 0.5ms(含表达式求值)
- 单线程执行一轮 = 200 × 0.5ms = 100ms
这已经接近甚至超过 delay_time。如果某些规则有复杂表达式或需要查 IHDB会严重阻塞同线程的其他规则。
### 解决方案
#### 方案 A支持 taskSeq 配置(立即可用)
在 DB2 `T_RULE_CFG` 中为高负载规则设置不同的 `TaskSeq`1, 2, 3...),使它们分散到不同线程。这是现有架构已支持的,只需配置。
**限制**: 需要人工划分,且 taskSeq 同时影响 task 进程的线程分配。
#### 方案 B自动按规则数量拆分线程
```cpp
// 当某 algId_dataSource 线程中规则数 > 阈值时自动拆分
const size_t MAX_RULES_PER_THREAD = 50;
auto base_name = std::to_string(algId) + "_" + std::to_string(data_source);
int thread_index = 0;
auto candidate = base_name + "_" + std::to_string(task_seq);
// 如果已有线程的规则数超限,分配到新线程号
while (handles_.find(candidate) != handles_.end() &&
handles_[candidate]->size() >= MAX_RULES_PER_THREAD) {
thread_index++;
candidate = base_name + "_" + std::to_string(taskSeq) + "s" + std::to_string(thread_index);
}
```
**收益**: 200 条同类规则自动分散到 4 个线程,每线程 50 条,总执行时间从 100ms 降至 25ms。
#### 方案 C使用线程池替代固定线程映射
将 HandlerExec 改为提交到线程池执行,而非每个线程名固定一个线程。但改动较大,涉及生命周期管理。
**推荐**: 先采用 **方案 A** 作为手动优化,再实现 **方案 B** 作为自动均衡。
---
## 问题 3`GetDataJson()` 全量深拷贝
### 现状
```cpp
// RuleStatShm.h GetDataJson()
std::vector<std::pair<std::string, RuleStat>> snapshot;
{
std::lock_guard<std::mutex> guard(llmtx);
snapshot.reserve(p_msg_map->size());
for (const auto& [key, value] : *p_msg_map) {
snapshot.emplace_back(key.c_str(), value); // 深拷贝 RuleStat
}
}
```
虽然已做了"锁内复制、锁外序列化"优化,但 `value` 的复制是深拷贝,包括:
- `vector_s items` (共享内存分配器的 vector<string>)
- `vector_d stat_values` (共享内存分配器的 vector<double>)
500 条规则 × 平均每个 RuleStat ~2KB = ~1MB 的深拷贝,每秒调用一次(`TimeNotify` 事件 2 触发 `update_rule_stat_data()``get_stat_json()`)。
### 解决方案
#### 方案 A增量更新 + 本地 JSON 缓存
```
维护一个线程安全的本地 JSON 缓存:
map<string, string> json_cache_; // ruleId → JSON 片段
string full_json_cache_; // 完整 JSON
bool dirty_ = true; // 脏标记
update_dynamic(ruleId, value):
json_cache_[ruleId] = value.invert2json().dump()
dirty_ = true
get_stat_json():
if dirty_:
拼接所有 JSON 片段为完整 JSON
full_json_cache_ = ...
dirty_ = false
return full_json_cache_
```
#### 方案 B使用共享内存中的预序列化字段
`RuleStat` 中增加一个 `char json_cache[512]` 固定大小字段,在 `update_dynamic()` 时同时更新此字段。`GetDataJson()` 直接从共享内存读预序列化的字符串,无需再调用 `invert2json()`
**推荐**: **方案 A**,改动小,效果直接。配合问题 1 的本地缓存方案可自然实现。
---
## 问题 4`event_handler()` 持锁处理全量队列
### 现状
```cpp
// handler_exec.cc 第231-232行
int HandlerExec::event_handler() {
std::lock_guard<std::mutex> guard(mutex_); // 持锁整个函数
while (!reset_queue_.empty()) { ... }
while (!detach_queue_.empty()) { ... }
while (!attach_queue_.empty()) { ... }
while (!usable_queue_.empty()) { ... }
if (!once_exec_queue_.empty()) { ... }
}
```
`event_handler()` 在每次主循环迭代中调用。持锁期间,所有 `attach/detach/reset/set_usable/submit` 调用都被阻塞。虽然单次队列处理通常很快,但如果 attach 大量规则(如系统启动时),会长时间阻塞执行循环。
### 解决方案
```cpp
int HandlerExec::event_handler() {
// 将队列 swap 出来,最小化持锁时间
decltype(attach_queue_) local_attach;
decltype(detach_queue_) local_detach;
// ...其他队列同理
{
std::lock_guard<std::mutex> guard(mutex_);
local_attach.swap(attach_queue_);
local_detach.swap(detach_queue_);
// ...
} // 立即释放锁
// 无锁处理本地队列
while (!local_detach.empty()) { ... }
while (!local_attach.empty()) { ... }
// ...
}
```
**收益**: 持锁时间从 O(队列长度) 降至 O(1) swap 操作。
---
## 问题 5`cache_data()` 超时忙等
### 现状
```cpp
// eqpalg_icei.cpp 第82-89行
if (time_costs >= CACHE_OUTTIME) { // >= 19ms
this->logger_->Error() << "共享内存消耗时间超时(ms):" << ...;
next_wake_time = std::chrono::steady_clock::now(); // 立即重试
} else {
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
```
当一次 `cache_data()` 耗时超过 19ms 时,不 sleep 直接重试。如果共享内存数据量很大或系统负载高,可能导致**连续失败 + 立即重试**,形成 CPU 忙等。
### 解决方案
```cpp
int consecutive_timeouts = 0;
const int MAX_CONSECUTIVE_TIMEOUTS = 3;
if (time_costs >= CACHE_OUTTIME) {
consecutive_timeouts++;
if (consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS) {
next_wake_time += CACHE_OUTTIME * 2; // 退避
consecutive_timeouts = 0;
} else {
next_wake_time = std::chrono::steady_clock::now();
}
} else {
consecutive_timeouts = 0;
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
```
---
## 问题 6每条规则独立调用 `update_dynamic` 无批处理
### 现状
`exec_mon_call()` 中:
```cpp
if (get_update_rule_stat_cycled()) {
this->update_map_rule(); // 每条规则独立调用
}
```
每条规则都独立触发全局锁 → 查共享内存 map → 写。配合问题 1 的方案 A可改为本地缓存写入。
### 补充优化
`AlgBase::update_map_rule()` 中还有一个规则级别的 `std::lock_guard<std::mutex> guard(lm)`。这个锁同时被 `update_limit_alarm()`(事件 22222使用。如果这两个操作频繁规则级锁也会有竞争。建议拆分为两个独立的锁
```cpp
std::mutex lm_map_rule_; // 保护 update_map_rule()
std::mutex lm_limit_alarm_; // 保护 update_limit_alarm()
```
---
## 问题 7`GlobaltemSharedMemory` 索引查找效率
### 现状
```cpp
// gb_item_memory.cpp
double GlobaltemSharedMemory::operator[](std::string tag_name) {
// 每次调用在 unordered_map 中查找 tag_name → 索引映射
// 然后用索引去读缓冲区
}
```
mon 的每次表达式求值,每个 `m{tagN}` 变量都要调用一次 `operator[]`。每次调用涉及:
1. `unordered_map<string, size_t>` 查找(字符串 hash
2. 读取双缓冲区的 read_array[index]
如果一条表达式引用了 5 个 tag200 条规则,每 100ms 执行一次 = 每秒 10,000 次 map 查找。
### 解决方案
`AlgBase``init()` 中已经构建了 `m_tags` 列表。可以在 `ExpModule::update()` 时把 tag 在缓冲区中的**索引**预先解析并缓存,避免每次求值时的字符串查找:
```cpp
// ExpModule 中预先解析索引
std::vector<size_t> tag_indices_;
void update() {
for (size_t i = 0; i < tag_indices_.size(); i++) {
mm_vars["tag" + std::to_string(i+1)] = read_buffer[tag_indices_[i]];
}
}
```
---
## 问题 8cron 进程 DB2 写入无批处理
### 现状
cron 进程中,`ExpBase::cron_proc()` 为每条规则独立执行:
1.`EqpStat` 取统计值(持全局锁读共享内存)
2. DAA::STA 分布计算
3. 写 DB2独立 SQL INSERT/UPDATE
如果有 200 条需学习的规则,就是 200 次独立的 DB2 写入。
### 解决方案
收集多条规则的统计结果后,使用 DB2 batch insert 或多行 INSERT 语句。但需确认 mix_cc::sql 是否支持批处理。如不支持,至少可以将 cron 的处理分散到多个 task_seq 线程中并行执行。
---
## 问题 9`AlgBase::lm` 锁职责不清
### 现状
`AlgBase` 中的 `std::mutex lm` 同时保护:
- `update_map_rule()` — 被 mon 执行循环高频调用
- `update_limit_alarm()` — 被事件 22222 低频调用
这两者频率差异巨大(~500ms vs 偶尔),但共享同一把锁。
### 解决方案
拆分为两把独立锁,见问题 6 补充优化。
---
## 问题 10task 进程 HandlerExec 清理策略
### 现状
```cpp
// manager.cc 第86-104行
// task 进程: 每 10ms 检查所有 handles_ 是否运行完毕
// 如果全部完成 → handles_.clear()
```
task 每执行一次就创建一个新的 `HandlerExec` 线程。线程执行完 `exec_task_call()``is_running_` 变为 false`HandlerExec` 对象和线程资源要等到下一次 Manager 清理循环10ms 间隔)才释放。
**潜在问题**: 如果短时间内发起大量 task 执行handles_ map 会快速膨胀,且清理不及时。
### 解决方案
`HandlerExec::run_thread()` 的 task 分支执行完后,设置一个 `finished_` 标志Manager 的清理线程检测到后立即清理,而非等全量检查。
---
## 优化实施建议优先级
| 优先级 | 问题 | 预期收益 | 改动风险 |
|--------|------|----------|----------|
| **P0** | 问题 1: update_map_rule 全局锁 | 页面刷新延迟降低 80%+ | 中 |
| **P0** | 问题 2: 线程分配不均衡 | 规则执行吞吐量提升 2-4 倍 | 低(先手动配置 taskSeq |
| **P1** | 问题 4: event_handler 锁持有时间 | CRUD 操作响应更快 | 低 |
| **P1** | 问题 5: cache_data 退避 | 避免 CPU 忙等 | 低 |
| **P1** | 问题 6: 拆分 AlgBase::lm | 减少局部锁竞争 | 低 |
| **P2** | 问题 3: GetDataJson 增量更新 | 序列化开销降低 | 中 |
| **P2** | 问题 7: tag 索引预解析 | 表达式求值加速 20-30% | 中 |
| **P3** | 问题 8: cron 批处理 | cron 执行时间缩短 | 中 |
| **P3** | 问题 10: task 线程清理 | 内存及时释放 | 低 |
## 架构层面的长远建议
### 1. 考虑将页面数据从共享内存 map 迁移到 Redis/Memcached
当前使用 `boost::interprocess::managed_mapped_file` 共享内存 map 来存储规则统计信息给页面读取。这个设计导致所有写操作必须竞争一个进程级全局锁。如果改为:
- eqpalg 写统计数据到 **Memcached**(已有 `UpDateData` 写入 Memcached可复用
- 页面从 Memcached 读取
则可以完全消除共享内存 map 的锁竞争问题。Memcached 本身是高性能的,且天然支持并发读写。
### 2. 考虑将 HandlerExec 线程模型改为任务队列 + 线程池
当前每个 `algId_dataSource_taskSeq` 组合固定分配一个线程,导致线程数 = 算法类型组合数,且规则分配不均。改为:
- 一个固定大小的线程池(如 CPU 核数 × 2
- 规则执行包装为 task 提交到队列
- 线程池按照规则 deadline`delay_time_`)调度执行
这样可以更均衡地利用 CPU避免某些线程过载而其他线程空闲。
### 3. 统一三个进程的代码路径
当前 mon/cron/task 共用一个 HandlerExec 类但行为差异巨大(通过 `glob_process_type` 判断分支)。建议将三个进程的执行逻辑拆分为独立的类,提高可读性和维护性,也便于针对性优化。