# eqpalg 代码优化建议 ## 问题总览 | # | 问题 | 严重度 | 影响范围 | |---|------|--------|----------| | 1 | `update_map_rule()` 全局互斥锁瓶颈 | **高** | mon 所有规则的页面数据刷新 | | 2 | 线程分配不均衡(默认 taskSeq=0) | **高** | mon 规则执行吞吐量 | | 3 | `GetDataJson()` 全量深拷贝开销 | **中** | 页面数据序列化(每秒调用) | | 4 | `event_handler()` 持锁处理全量队列 | **中** | 规则 CRUD 期间监控暂停 | | 5 | `cache_data()` 超时立即重试导致忙等 | **中** | CPU 占用 | | 6 | 每条规则独立调用 `update_dynamic` 无批处理 | **中** | 共享内存写放大 | | 7 | `GlobaltemSharedMemory` 索引查找效率 | **低** | 每条规则每次执行 | | 8 | cron 进程 DB2 写入无批处理 | **低** | cron 执行时间 | | 9 | `AlgBase::lm` 锁职责不清 | **低** | 锁竞争局部放大 | | 10 | task 进程 `HandlerExec` 泄漏 | **低** | 内存 | --- ## 问题 1:`update_map_rule()` 全局互斥锁瓶颈 ### 现状 ``` AlgBase::exec_mon_call() └→ get_update_rule_stat_cycled() // ~500ms + random offset └→ update_map_rule() └→ std::lock_guard guard(lm) // 规则级锁 └→ EqpStat::update_dynamic(rule_id_, rule_stat_) └→ mapRuleStat.update_dynamic(key, value, true) └→ std::lock_guard guard(llmtx) // 全局锁! └→ p_msg_map->operator[](key).update_dynamic(value) └→ boost::interprocess 共享内存 map 操作 ``` **根因**: `MapRuleStat` 底层使用 `boost::interprocess::managed_mapped_file` 共享内存 map,所有操作必须竞争同一个全局 `llmtx`。假设有 500 条规则,15 个 HandlerExec 线程,每隔约 500ms(带随机偏移)都会触发一次 `update_map_rule()`。即使有随机偏移,在高并发下仍有大量线程同时争抢这把锁。每次操作包含: 1. `thread_local` key 赋值 2. `map.find()` 查找 3. `map.operator[]` 写入(可能触发共享内存分配) 4. `RuleStat::update_dynamic()` — 5 个字段赋值、2 个 `vector_s` 共享内存 vector 复制 其中第 3 步涉及 boost::interprocess 的内存分配器,在共享内存段中分配节点,是**重操作**。 ### 解决方案 #### 方案 A:每线程本地缓存 + 定时批量刷入(推荐) ``` 修改前: N条规则 × 每500ms = N次独立 map 写操作,每次持全局锁 修改后: ┌─────────────────────────────────────────────┐ │ 每个 HandlerExec 线程维护 thread_local │ │ std::map local_cache_ │ │ │ │ exec_mon_call() 中: │ │ update_map_rule() → 只写 local_cache_ │ │ (无锁,无共享内存操作) │ │ │ │ 独立批量刷入线程 (100ms 间隔): │ │ 对每个 HandlerExec.local_cache_: │ │ lock(local_cache) → swap 出副本 │ │ 持全局锁一次,批量写入共享内存 map │ └─────────────────────────────────────────────┘ ``` **实现要点**: - 每个 `HandlerExec` 的 `run_thread()` 中增加一个 `thread_local` 或成员变量 `batch_update_cache_` - `update_map_rule()` 改为写入此本地缓存 - 在 `Manager` 中新增一个批量刷入线程,周期 100ms,收集各 handler 缓存后一次性持锁写入 - `GetDataJson()` 优先从共享内存读(已有逻辑),批量刷新保证数据最终一致 **收益**: 500 条规则 × 每 500ms → 降为 15 个线程 × 每 100ms 批量写 = 150 次/秒 → 约 10 次/秒(批量合并后)。全局锁竞争减少 **50 倍以上**。 #### 方案 B:改用读写锁(短期缓解) 当前 `llmtx` 是 `std::mutex`(互斥锁),但代码中已注释了 `std::shared_mutex`: ```cpp // RuleStatShm.h 第26行 // std::shared_mutex local_rwlock; ``` - `update_dynamic()` / `update()` / `add_stat_value()` — 写操作,用 `unique_lock` - `size()` / `empty()` / `GetDataJson()` — 读操作,用 `shared_lock` **注意**: `GetDataJson()` 已经做了"快照复制后释放锁再序列化"的优化,这很好。但 `update_dynamic()` 之间依然互斥。如果只是简单改为读写锁,写操作之间的竞争不会改善。此方案**只能缓解读写竞争**,不能解决写-写竞争。 #### 方案 C:按 ruleId 哈希分段锁 ```cpp // 用 N 个 segment 的 map + 锁数组替代单一全局 map + 单一全局锁 static constexpr int SEGMENTS = 16; struct Segment { std::mutex mtx; MapRuleStat_s* map; }; Segment segments_[SEGMENTS]; auto& seg = segments_[std::hash{}(key) % SEGMENTS]; std::lock_guard guard(seg.mtx); seg.map->operator[](key).update_dynamic(value); ``` **收益**: 锁竞争降低为原来的 1/SEGMENTS。 **推荐**: 综合使用 **方案 A + 方案 C**。先做本地缓存批量刷入(方案 A),如果仍有需要,将全局 map 分段(方案 C)。 --- ## 问题 2:线程分配不均衡 ### 现状 ```cpp // threads/manager.cc 第283-285行 auto alg_thread_name = std::to_string(algId) + "_" + std::to_string(data_source) + "_" + std::to_string(task_seq); ``` 规则按 `algId_dataSource_taskSeq` 分配到线程。但 `taskSeq` 来自 DB2 `T_RULE_CFG.TaskSeq` 字段,**绝大多数规则的 taskSeq 为 0**。 后果:同类算法(如 algId=2,最常见)的所有规则全部塞进一个 `HandlerExec` 线程,该线程中串行执行 `exec_mon_call()`。 假设: - algId=2 有 200 条规则 - 每条规则 `delay_time_` = 100ms - `exec_mon_call()` 平均执行时间 = 0.5ms(含表达式求值) - 单线程执行一轮 = 200 × 0.5ms = 100ms 这已经接近甚至超过 delay_time。如果某些规则有复杂表达式或需要查 IHDB,会严重阻塞同线程的其他规则。 ### 解决方案 #### 方案 A:支持 taskSeq 配置(立即可用) 在 DB2 `T_RULE_CFG` 中为高负载规则设置不同的 `TaskSeq`(1, 2, 3...),使它们分散到不同线程。这是现有架构已支持的,只需配置。 **限制**: 需要人工划分,且 taskSeq 同时影响 task 进程的线程分配。 #### 方案 B:自动按规则数量拆分线程 ```cpp // 当某 algId_dataSource 线程中规则数 > 阈值时自动拆分 const size_t MAX_RULES_PER_THREAD = 50; auto base_name = std::to_string(algId) + "_" + std::to_string(data_source); int thread_index = 0; auto candidate = base_name + "_" + std::to_string(task_seq); // 如果已有线程的规则数超限,分配到新线程号 while (handles_.find(candidate) != handles_.end() && handles_[candidate]->size() >= MAX_RULES_PER_THREAD) { thread_index++; candidate = base_name + "_" + std::to_string(taskSeq) + "s" + std::to_string(thread_index); } ``` **收益**: 200 条同类规则自动分散到 4 个线程,每线程 50 条,总执行时间从 100ms 降至 25ms。 #### 方案 C:使用线程池替代固定线程映射 将 HandlerExec 改为提交到线程池执行,而非每个线程名固定一个线程。但改动较大,涉及生命周期管理。 **推荐**: 先采用 **方案 A** 作为手动优化,再实现 **方案 B** 作为自动均衡。 --- ## 问题 3:`GetDataJson()` 全量深拷贝 ### 现状 ```cpp // RuleStatShm.h GetDataJson() std::vector> snapshot; { std::lock_guard guard(llmtx); snapshot.reserve(p_msg_map->size()); for (const auto& [key, value] : *p_msg_map) { snapshot.emplace_back(key.c_str(), value); // 深拷贝 RuleStat } } ``` 虽然已做了"锁内复制、锁外序列化"优化,但 `value` 的复制是深拷贝,包括: - `vector_s items` (共享内存分配器的 vector) - `vector_d stat_values` (共享内存分配器的 vector) 500 条规则 × 平均每个 RuleStat ~2KB = ~1MB 的深拷贝,每秒调用一次(`TimeNotify` 事件 2 触发 `update_rule_stat_data()` → `get_stat_json()`)。 ### 解决方案 #### 方案 A:增量更新 + 本地 JSON 缓存 ``` 维护一个线程安全的本地 JSON 缓存: map json_cache_; // ruleId → JSON 片段 string full_json_cache_; // 完整 JSON bool dirty_ = true; // 脏标记 update_dynamic(ruleId, value): json_cache_[ruleId] = value.invert2json().dump() dirty_ = true get_stat_json(): if dirty_: 拼接所有 JSON 片段为完整 JSON full_json_cache_ = ... dirty_ = false return full_json_cache_ ``` #### 方案 B:使用共享内存中的预序列化字段 在 `RuleStat` 中增加一个 `char json_cache[512]` 固定大小字段,在 `update_dynamic()` 时同时更新此字段。`GetDataJson()` 直接从共享内存读预序列化的字符串,无需再调用 `invert2json()`。 **推荐**: **方案 A**,改动小,效果直接。配合问题 1 的本地缓存方案可自然实现。 --- ## 问题 4:`event_handler()` 持锁处理全量队列 ### 现状 ```cpp // handler_exec.cc 第231-232行 int HandlerExec::event_handler() { std::lock_guard guard(mutex_); // 持锁整个函数 while (!reset_queue_.empty()) { ... } while (!detach_queue_.empty()) { ... } while (!attach_queue_.empty()) { ... } while (!usable_queue_.empty()) { ... } if (!once_exec_queue_.empty()) { ... } } ``` `event_handler()` 在每次主循环迭代中调用。持锁期间,所有 `attach/detach/reset/set_usable/submit` 调用都被阻塞。虽然单次队列处理通常很快,但如果 attach 大量规则(如系统启动时),会长时间阻塞执行循环。 ### 解决方案 ```cpp int HandlerExec::event_handler() { // 将队列 swap 出来,最小化持锁时间 decltype(attach_queue_) local_attach; decltype(detach_queue_) local_detach; // ...其他队列同理 { std::lock_guard guard(mutex_); local_attach.swap(attach_queue_); local_detach.swap(detach_queue_); // ... } // 立即释放锁 // 无锁处理本地队列 while (!local_detach.empty()) { ... } while (!local_attach.empty()) { ... } // ... } ``` **收益**: 持锁时间从 O(队列长度) 降至 O(1) swap 操作。 --- ## 问题 5:`cache_data()` 超时忙等 ### 现状 ```cpp // eqpalg_icei.cpp 第82-89行 if (time_costs >= CACHE_OUTTIME) { // >= 19ms this->logger_->Error() << "共享内存消耗时间超时(ms):" << ...; next_wake_time = std::chrono::steady_clock::now(); // 立即重试 } else { next_wake_time += CACHE_OUTTIME; std::this_thread::sleep_until(next_wake_time); } ``` 当一次 `cache_data()` 耗时超过 19ms 时,不 sleep 直接重试。如果共享内存数据量很大或系统负载高,可能导致**连续失败 + 立即重试**,形成 CPU 忙等。 ### 解决方案 ```cpp int consecutive_timeouts = 0; const int MAX_CONSECUTIVE_TIMEOUTS = 3; if (time_costs >= CACHE_OUTTIME) { consecutive_timeouts++; if (consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS) { next_wake_time += CACHE_OUTTIME * 2; // 退避 consecutive_timeouts = 0; } else { next_wake_time = std::chrono::steady_clock::now(); } } else { consecutive_timeouts = 0; next_wake_time += CACHE_OUTTIME; std::this_thread::sleep_until(next_wake_time); } ``` --- ## 问题 6:每条规则独立调用 `update_dynamic` 无批处理 ### 现状 在 `exec_mon_call()` 中: ```cpp if (get_update_rule_stat_cycled()) { this->update_map_rule(); // 每条规则独立调用 } ``` 每条规则都独立触发全局锁 → 查共享内存 map → 写。配合问题 1 的方案 A,可改为本地缓存写入。 ### 补充优化 `AlgBase::update_map_rule()` 中还有一个规则级别的 `std::lock_guard guard(lm)`。这个锁同时被 `update_limit_alarm()`(事件 22222)使用。如果这两个操作频繁,规则级锁也会有竞争。建议拆分为两个独立的锁: ```cpp std::mutex lm_map_rule_; // 保护 update_map_rule() std::mutex lm_limit_alarm_; // 保护 update_limit_alarm() ``` --- ## 问题 7:`GlobaltemSharedMemory` 索引查找效率 ### 现状 ```cpp // gb_item_memory.cpp double GlobaltemSharedMemory::operator[](std::string tag_name) { // 每次调用在 unordered_map 中查找 tag_name → 索引映射 // 然后用索引去读缓冲区 } ``` mon 的每次表达式求值,每个 `m{tagN}` 变量都要调用一次 `operator[]`。每次调用涉及: 1. `unordered_map` 查找(字符串 hash) 2. 读取双缓冲区的 read_array[index] 如果一条表达式引用了 5 个 tag,200 条规则,每 100ms 执行一次 = 每秒 10,000 次 map 查找。 ### 解决方案 `AlgBase` 的 `init()` 中已经构建了 `m_tags` 列表。可以在 `ExpModule::update()` 时把 tag 在缓冲区中的**索引**预先解析并缓存,避免每次求值时的字符串查找: ```cpp // ExpModule 中预先解析索引 std::vector tag_indices_; void update() { for (size_t i = 0; i < tag_indices_.size(); i++) { mm_vars["tag" + std::to_string(i+1)] = read_buffer[tag_indices_[i]]; } } ``` --- ## 问题 8:cron 进程 DB2 写入无批处理 ### 现状 cron 进程中,`ExpBase::cron_proc()` 为每条规则独立执行: 1. 从 `EqpStat` 取统计值(持全局锁读共享内存) 2. DAA::STA 分布计算 3. 写 DB2(独立 SQL INSERT/UPDATE) 如果有 200 条需学习的规则,就是 200 次独立的 DB2 写入。 ### 解决方案 收集多条规则的统计结果后,使用 DB2 batch insert 或多行 INSERT 语句。但需确认 mix_cc::sql 是否支持批处理。如不支持,至少可以将 cron 的处理分散到多个 task_seq 线程中并行执行。 --- ## 问题 9:`AlgBase::lm` 锁职责不清 ### 现状 `AlgBase` 中的 `std::mutex lm` 同时保护: - `update_map_rule()` — 被 mon 执行循环高频调用 - `update_limit_alarm()` — 被事件 22222 低频调用 这两者频率差异巨大(~500ms vs 偶尔),但共享同一把锁。 ### 解决方案 拆分为两把独立锁,见问题 6 补充优化。 --- ## 问题 10:task 进程 HandlerExec 清理策略 ### 现状 ```cpp // manager.cc 第86-104行 // task 进程: 每 10ms 检查所有 handles_ 是否运行完毕 // 如果全部完成 → handles_.clear() ``` task 每执行一次就创建一个新的 `HandlerExec` 线程。线程执行完 `exec_task_call()` 后 `is_running_` 变为 false,但 `HandlerExec` 对象和线程资源要等到下一次 Manager 清理循环(10ms 间隔)才释放。 **潜在问题**: 如果短时间内发起大量 task 执行,handles_ map 会快速膨胀,且清理不及时。 ### 解决方案 在 `HandlerExec::run_thread()` 的 task 分支执行完后,设置一个 `finished_` 标志,Manager 的清理线程检测到后立即清理,而非等全量检查。 --- ## 优化实施建议优先级 | 优先级 | 问题 | 预期收益 | 改动风险 | |--------|------|----------|----------| | **P0** | 问题 1: update_map_rule 全局锁 | 页面刷新延迟降低 80%+ | 中 | | **P0** | 问题 2: 线程分配不均衡 | 规则执行吞吐量提升 2-4 倍 | 低(先手动配置 taskSeq) | | **P1** | 问题 4: event_handler 锁持有时间 | CRUD 操作响应更快 | 低 | | **P1** | 问题 5: cache_data 退避 | 避免 CPU 忙等 | 低 | | **P1** | 问题 6: 拆分 AlgBase::lm | 减少局部锁竞争 | 低 | | **P2** | 问题 3: GetDataJson 增量更新 | 序列化开销降低 | 中 | | **P2** | 问题 7: tag 索引预解析 | 表达式求值加速 20-30% | 中 | | **P3** | 问题 8: cron 批处理 | cron 执行时间缩短 | 中 | | **P3** | 问题 10: task 线程清理 | 内存及时释放 | 低 | ## 架构层面的长远建议 ### 1. 考虑将页面数据从共享内存 map 迁移到 Redis/Memcached 当前使用 `boost::interprocess::managed_mapped_file` 共享内存 map 来存储规则统计信息给页面读取。这个设计导致所有写操作必须竞争一个进程级全局锁。如果改为: - eqpalg 写统计数据到 **Memcached**(已有 `UpDateData` 写入 Memcached,可复用) - 页面从 Memcached 读取 则可以完全消除共享内存 map 的锁竞争问题。Memcached 本身是高性能的,且天然支持并发读写。 ### 2. 考虑将 HandlerExec 线程模型改为任务队列 + 线程池 当前每个 `algId_dataSource_taskSeq` 组合固定分配一个线程,导致线程数 = 算法类型组合数,且规则分配不均。改为: - 一个固定大小的线程池(如 CPU 核数 × 2) - 规则执行包装为 task 提交到队列 - 线程池按照规则 deadline(`delay_time_`)调度执行 这样可以更均衡地利用 CPU,避免某些线程过载而其他线程空闲。 ### 3. 统一三个进程的代码路径 当前 mon/cron/task 共用一个 HandlerExec 类但行为差异巨大(通过 `glob_process_type` 判断分支)。建议将三个进程的执行逻辑拆分为独立的类,提高可读性和维护性,也便于针对性优化。