Covers 10 issues with root cause analysis, severity assessment, and concrete solutions: - P0: update_map_rule() global mutex bottleneck on shared memory map - P0: Thread allocation imbalance (default taskSeq=0) - P1: GetDataJson() full deep copy, event_handler lock hold time, cache_data() busy-loop, AlgBase::lm lock splitting - P2-P3: tag index pre-resolution, cron batching, task thread cleanup - Long-term: Memcached migration, thread pool, code path separation
17 KiB
eqpalg 代码优化建议
问题总览
| # | 问题 | 严重度 | 影响范围 |
|---|---|---|---|
| 1 | update_map_rule() 全局互斥锁瓶颈 |
高 | mon 所有规则的页面数据刷新 |
| 2 | 线程分配不均衡(默认 taskSeq=0) | 高 | mon 规则执行吞吐量 |
| 3 | GetDataJson() 全量深拷贝开销 |
中 | 页面数据序列化(每秒调用) |
| 4 | event_handler() 持锁处理全量队列 |
中 | 规则 CRUD 期间监控暂停 |
| 5 | cache_data() 超时立即重试导致忙等 |
中 | CPU 占用 |
| 6 | 每条规则独立调用 update_dynamic 无批处理 |
中 | 共享内存写放大 |
| 7 | GlobaltemSharedMemory 索引查找效率 |
低 | 每条规则每次执行 |
| 8 | cron 进程 DB2 写入无批处理 | 低 | cron 执行时间 |
| 9 | AlgBase::lm 锁职责不清 |
低 | 锁竞争局部放大 |
| 10 | task 进程 HandlerExec 泄漏 |
低 | 内存 |
问题 1:update_map_rule() 全局互斥锁瓶颈
现状
AlgBase::exec_mon_call()
└→ get_update_rule_stat_cycled() // ~500ms + random offset
└→ update_map_rule()
└→ std::lock_guard<std::mutex> guard(lm) // 规则级锁
└→ EqpStat::update_dynamic(rule_id_, rule_stat_)
└→ mapRuleStat.update_dynamic(key, value, true)
└→ std::lock_guard<std::mutex> guard(llmtx) // 全局锁!
└→ p_msg_map->operator[](key).update_dynamic(value)
└→ boost::interprocess 共享内存 map 操作
根因: MapRuleStat 底层使用 boost::interprocess::managed_mapped_file 共享内存 map,所有操作必须竞争同一个全局 llmtx。假设有 500 条规则,15 个 HandlerExec 线程,每隔约 500ms(带随机偏移)都会触发一次 update_map_rule()。即使有随机偏移,在高并发下仍有大量线程同时争抢这把锁。每次操作包含:
thread_localkey 赋值map.find()查找map.operator[]写入(可能触发共享内存分配)RuleStat::update_dynamic()— 5 个字段赋值、2 个vector_s共享内存 vector 复制
其中第 3 步涉及 boost::interprocess 的内存分配器,在共享内存段中分配节点,是重操作。
解决方案
方案 A:每线程本地缓存 + 定时批量刷入(推荐)
修改前: N条规则 × 每500ms = N次独立 map 写操作,每次持全局锁
修改后:
┌─────────────────────────────────────────────┐
│ 每个 HandlerExec 线程维护 thread_local │
│ std::map<string, RuleStat> local_cache_ │
│ │
│ exec_mon_call() 中: │
│ update_map_rule() → 只写 local_cache_ │
│ (无锁,无共享内存操作) │
│ │
│ 独立批量刷入线程 (100ms 间隔): │
│ 对每个 HandlerExec.local_cache_: │
│ lock(local_cache) → swap 出副本 │
│ 持全局锁一次,批量写入共享内存 map │
└─────────────────────────────────────────────┘
实现要点:
- 每个
HandlerExec的run_thread()中增加一个thread_local或成员变量batch_update_cache_ update_map_rule()改为写入此本地缓存- 在
Manager中新增一个批量刷入线程,周期 100ms,收集各 handler 缓存后一次性持锁写入 GetDataJson()优先从共享内存读(已有逻辑),批量刷新保证数据最终一致
收益: 500 条规则 × 每 500ms → 降为 15 个线程 × 每 100ms 批量写 = 150 次/秒 → 约 10 次/秒(批量合并后)。全局锁竞争减少 50 倍以上。
方案 B:改用读写锁(短期缓解)
当前 llmtx 是 std::mutex(互斥锁),但代码中已注释了 std::shared_mutex:
// RuleStatShm.h 第26行
// std::shared_mutex local_rwlock;
update_dynamic()/update()/add_stat_value()— 写操作,用unique_locksize()/empty()/GetDataJson()— 读操作,用shared_lock
注意: GetDataJson() 已经做了"快照复制后释放锁再序列化"的优化,这很好。但 update_dynamic() 之间依然互斥。如果只是简单改为读写锁,写操作之间的竞争不会改善。此方案只能缓解读写竞争,不能解决写-写竞争。
方案 C:按 ruleId 哈希分段锁
// 用 N 个 segment 的 map + 锁数组替代单一全局 map + 单一全局锁
static constexpr int SEGMENTS = 16;
struct Segment {
std::mutex mtx;
MapRuleStat_s* map;
};
Segment segments_[SEGMENTS];
auto& seg = segments_[std::hash<std::string>{}(key) % SEGMENTS];
std::lock_guard<std::mutex> guard(seg.mtx);
seg.map->operator[](key).update_dynamic(value);
收益: 锁竞争降低为原来的 1/SEGMENTS。
推荐: 综合使用 方案 A + 方案 C。先做本地缓存批量刷入(方案 A),如果仍有需要,将全局 map 分段(方案 C)。
问题 2:线程分配不均衡
现状
// threads/manager.cc 第283-285行
auto alg_thread_name = std::to_string(algId) + "_" +
std::to_string(data_source) + "_" +
std::to_string(task_seq);
规则按 algId_dataSource_taskSeq 分配到线程。但 taskSeq 来自 DB2 T_RULE_CFG.TaskSeq 字段,绝大多数规则的 taskSeq 为 0。
后果:同类算法(如 algId=2,最常见)的所有规则全部塞进一个 HandlerExec 线程,该线程中串行执行 exec_mon_call()。
假设:
- algId=2 有 200 条规则
- 每条规则
delay_time_= 100ms exec_mon_call()平均执行时间 = 0.5ms(含表达式求值)- 单线程执行一轮 = 200 × 0.5ms = 100ms
这已经接近甚至超过 delay_time。如果某些规则有复杂表达式或需要查 IHDB,会严重阻塞同线程的其他规则。
解决方案
方案 A:支持 taskSeq 配置(立即可用)
在 DB2 T_RULE_CFG 中为高负载规则设置不同的 TaskSeq(1, 2, 3...),使它们分散到不同线程。这是现有架构已支持的,只需配置。
限制: 需要人工划分,且 taskSeq 同时影响 task 进程的线程分配。
方案 B:自动按规则数量拆分线程
// 当某 algId_dataSource 线程中规则数 > 阈值时自动拆分
const size_t MAX_RULES_PER_THREAD = 50;
auto base_name = std::to_string(algId) + "_" + std::to_string(data_source);
int thread_index = 0;
auto candidate = base_name + "_" + std::to_string(task_seq);
// 如果已有线程的规则数超限,分配到新线程号
while (handles_.find(candidate) != handles_.end() &&
handles_[candidate]->size() >= MAX_RULES_PER_THREAD) {
thread_index++;
candidate = base_name + "_" + std::to_string(taskSeq) + "s" + std::to_string(thread_index);
}
收益: 200 条同类规则自动分散到 4 个线程,每线程 50 条,总执行时间从 100ms 降至 25ms。
方案 C:使用线程池替代固定线程映射
将 HandlerExec 改为提交到线程池执行,而非每个线程名固定一个线程。但改动较大,涉及生命周期管理。
推荐: 先采用 方案 A 作为手动优化,再实现 方案 B 作为自动均衡。
问题 3:GetDataJson() 全量深拷贝
现状
// RuleStatShm.h GetDataJson()
std::vector<std::pair<std::string, RuleStat>> snapshot;
{
std::lock_guard<std::mutex> guard(llmtx);
snapshot.reserve(p_msg_map->size());
for (const auto& [key, value] : *p_msg_map) {
snapshot.emplace_back(key.c_str(), value); // 深拷贝 RuleStat
}
}
虽然已做了"锁内复制、锁外序列化"优化,但 value 的复制是深拷贝,包括:
vector_s items(共享内存分配器的 vector)vector_d stat_values(共享内存分配器的 vector)
500 条规则 × 平均每个 RuleStat ~2KB = ~1MB 的深拷贝,每秒调用一次(TimeNotify 事件 2 触发 update_rule_stat_data() → get_stat_json())。
解决方案
方案 A:增量更新 + 本地 JSON 缓存
维护一个线程安全的本地 JSON 缓存:
map<string, string> json_cache_; // ruleId → JSON 片段
string full_json_cache_; // 完整 JSON
bool dirty_ = true; // 脏标记
update_dynamic(ruleId, value):
json_cache_[ruleId] = value.invert2json().dump()
dirty_ = true
get_stat_json():
if dirty_:
拼接所有 JSON 片段为完整 JSON
full_json_cache_ = ...
dirty_ = false
return full_json_cache_
方案 B:使用共享内存中的预序列化字段
在 RuleStat 中增加一个 char json_cache[512] 固定大小字段,在 update_dynamic() 时同时更新此字段。GetDataJson() 直接从共享内存读预序列化的字符串,无需再调用 invert2json()。
推荐: 方案 A,改动小,效果直接。配合问题 1 的本地缓存方案可自然实现。
问题 4:event_handler() 持锁处理全量队列
现状
// handler_exec.cc 第231-232行
int HandlerExec::event_handler() {
std::lock_guard<std::mutex> guard(mutex_); // 持锁整个函数
while (!reset_queue_.empty()) { ... }
while (!detach_queue_.empty()) { ... }
while (!attach_queue_.empty()) { ... }
while (!usable_queue_.empty()) { ... }
if (!once_exec_queue_.empty()) { ... }
}
event_handler() 在每次主循环迭代中调用。持锁期间,所有 attach/detach/reset/set_usable/submit 调用都被阻塞。虽然单次队列处理通常很快,但如果 attach 大量规则(如系统启动时),会长时间阻塞执行循环。
解决方案
int HandlerExec::event_handler() {
// 将队列 swap 出来,最小化持锁时间
decltype(attach_queue_) local_attach;
decltype(detach_queue_) local_detach;
// ...其他队列同理
{
std::lock_guard<std::mutex> guard(mutex_);
local_attach.swap(attach_queue_);
local_detach.swap(detach_queue_);
// ...
} // 立即释放锁
// 无锁处理本地队列
while (!local_detach.empty()) { ... }
while (!local_attach.empty()) { ... }
// ...
}
收益: 持锁时间从 O(队列长度) 降至 O(1) swap 操作。
问题 5:cache_data() 超时忙等
现状
// eqpalg_icei.cpp 第82-89行
if (time_costs >= CACHE_OUTTIME) { // >= 19ms
this->logger_->Error() << "共享内存消耗时间超时(ms):" << ...;
next_wake_time = std::chrono::steady_clock::now(); // 立即重试
} else {
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
当一次 cache_data() 耗时超过 19ms 时,不 sleep 直接重试。如果共享内存数据量很大或系统负载高,可能导致连续失败 + 立即重试,形成 CPU 忙等。
解决方案
int consecutive_timeouts = 0;
const int MAX_CONSECUTIVE_TIMEOUTS = 3;
if (time_costs >= CACHE_OUTTIME) {
consecutive_timeouts++;
if (consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS) {
next_wake_time += CACHE_OUTTIME * 2; // 退避
consecutive_timeouts = 0;
} else {
next_wake_time = std::chrono::steady_clock::now();
}
} else {
consecutive_timeouts = 0;
next_wake_time += CACHE_OUTTIME;
std::this_thread::sleep_until(next_wake_time);
}
问题 6:每条规则独立调用 update_dynamic 无批处理
现状
在 exec_mon_call() 中:
if (get_update_rule_stat_cycled()) {
this->update_map_rule(); // 每条规则独立调用
}
每条规则都独立触发全局锁 → 查共享内存 map → 写。配合问题 1 的方案 A,可改为本地缓存写入。
补充优化
AlgBase::update_map_rule() 中还有一个规则级别的 std::lock_guard<std::mutex> guard(lm)。这个锁同时被 update_limit_alarm()(事件 22222)使用。如果这两个操作频繁,规则级锁也会有竞争。建议拆分为两个独立的锁:
std::mutex lm_map_rule_; // 保护 update_map_rule()
std::mutex lm_limit_alarm_; // 保护 update_limit_alarm()
问题 7:GlobaltemSharedMemory 索引查找效率
现状
// gb_item_memory.cpp
double GlobaltemSharedMemory::operator[](std::string tag_name) {
// 每次调用在 unordered_map 中查找 tag_name → 索引映射
// 然后用索引去读缓冲区
}
mon 的每次表达式求值,每个 m{tagN} 变量都要调用一次 operator[]。每次调用涉及:
unordered_map<string, size_t>查找(字符串 hash)- 读取双缓冲区的 read_array[index]
如果一条表达式引用了 5 个 tag,200 条规则,每 100ms 执行一次 = 每秒 10,000 次 map 查找。
解决方案
AlgBase 的 init() 中已经构建了 m_tags 列表。可以在 ExpModule::update() 时把 tag 在缓冲区中的索引预先解析并缓存,避免每次求值时的字符串查找:
// ExpModule 中预先解析索引
std::vector<size_t> tag_indices_;
void update() {
for (size_t i = 0; i < tag_indices_.size(); i++) {
mm_vars["tag" + std::to_string(i+1)] = read_buffer[tag_indices_[i]];
}
}
问题 8:cron 进程 DB2 写入无批处理
现状
cron 进程中,ExpBase::cron_proc() 为每条规则独立执行:
- 从
EqpStat取统计值(持全局锁读共享内存) - DAA::STA 分布计算
- 写 DB2(独立 SQL INSERT/UPDATE)
如果有 200 条需学习的规则,就是 200 次独立的 DB2 写入。
解决方案
收集多条规则的统计结果后,使用 DB2 batch insert 或多行 INSERT 语句。但需确认 mix_cc::sql 是否支持批处理。如不支持,至少可以将 cron 的处理分散到多个 task_seq 线程中并行执行。
问题 9:AlgBase::lm 锁职责不清
现状
AlgBase 中的 std::mutex lm 同时保护:
update_map_rule()— 被 mon 执行循环高频调用update_limit_alarm()— 被事件 22222 低频调用
这两者频率差异巨大(~500ms vs 偶尔),但共享同一把锁。
解决方案
拆分为两把独立锁,见问题 6 补充优化。
问题 10:task 进程 HandlerExec 清理策略
现状
// manager.cc 第86-104行
// task 进程: 每 10ms 检查所有 handles_ 是否运行完毕
// 如果全部完成 → handles_.clear()
task 每执行一次就创建一个新的 HandlerExec 线程。线程执行完 exec_task_call() 后 is_running_ 变为 false,但 HandlerExec 对象和线程资源要等到下一次 Manager 清理循环(10ms 间隔)才释放。
潜在问题: 如果短时间内发起大量 task 执行,handles_ map 会快速膨胀,且清理不及时。
解决方案
在 HandlerExec::run_thread() 的 task 分支执行完后,设置一个 finished_ 标志,Manager 的清理线程检测到后立即清理,而非等全量检查。
优化实施建议优先级
| 优先级 | 问题 | 预期收益 | 改动风险 |
|---|---|---|---|
| P0 | 问题 1: update_map_rule 全局锁 | 页面刷新延迟降低 80%+ | 中 |
| P0 | 问题 2: 线程分配不均衡 | 规则执行吞吐量提升 2-4 倍 | 低(先手动配置 taskSeq) |
| P1 | 问题 4: event_handler 锁持有时间 | CRUD 操作响应更快 | 低 |
| P1 | 问题 5: cache_data 退避 | 避免 CPU 忙等 | 低 |
| P1 | 问题 6: 拆分 AlgBase::lm | 减少局部锁竞争 | 低 |
| P2 | 问题 3: GetDataJson 增量更新 | 序列化开销降低 | 中 |
| P2 | 问题 7: tag 索引预解析 | 表达式求值加速 20-30% | 中 |
| P3 | 问题 8: cron 批处理 | cron 执行时间缩短 | 中 |
| P3 | 问题 10: task 线程清理 | 内存及时释放 | 低 |
架构层面的长远建议
1. 考虑将页面数据从共享内存 map 迁移到 Redis/Memcached
当前使用 boost::interprocess::managed_mapped_file 共享内存 map 来存储规则统计信息给页面读取。这个设计导致所有写操作必须竞争一个进程级全局锁。如果改为:
- eqpalg 写统计数据到 Memcached(已有
UpDateData写入 Memcached,可复用) - 页面从 Memcached 读取
则可以完全消除共享内存 map 的锁竞争问题。Memcached 本身是高性能的,且天然支持并发读写。
2. 考虑将 HandlerExec 线程模型改为任务队列 + 线程池
当前每个 algId_dataSource_taskSeq 组合固定分配一个线程,导致线程数 = 算法类型组合数,且规则分配不均。改为:
- 一个固定大小的线程池(如 CPU 核数 × 2)
- 规则执行包装为 task 提交到队列
- 线程池按照规则 deadline(
delay_time_)调度执行
这样可以更均衡地利用 CPU,避免某些线程过载而其他线程空闲。
3. 统一三个进程的代码路径
当前 mon/cron/task 共用一个 HandlerExec 类但行为差异巨大(通过 glob_process_type 判断分支)。建议将三个进程的执行逻辑拆分为独立的类,提高可读性和维护性,也便于针对性优化。