# eqpalg 算法模板功能阐述文档 > 基于源码阅读,2026-05-15 --- ## 一、架构总览 ### 继承层次 ``` AlgBase ← 所有算法基类(数据获取、报警发送、线程调度) ├── ExpBase ← 表达式驱动算法基类(表达式求值、反馈状态机、统计学习) │ ├── ExpTimes ← 运行时间累计 / 出现次数累计(Alg 6/7) │ ├── ExpSample2D ← 多项式拟合 / 皮尔逊相关系数(Alg 12/13) │ ├── ExpBound ← 数据超限幅值(Alg 17) │ ├── Roller2 ← 同组离群监测(Alg 9) │ └── Roller3 ← 多变量离群检测(Alg 16/18) ├── TrendSlope ← 斜率监控 v1(已废弃,build_algorithm 未使用) ├── TrendSlope2 ← 斜率监控 v2(Alg 8,绝对变化量) ├── TrendSlope3 ← 斜率监控 v3(Alg 14,百分比变化率) ├── Roller ← 负载平衡/离群检测 v1(已废弃,build_algorithm 未使用) ├── FaultCode ← 故障代码解析(Alg 10/11) ├── GlitchDetection ← 毛刺检测(Alg 15) └── Null ← 空算法(未实现算法的占位符) ``` ### 算法 ID 与类映射(build_algorithm.cpp) | Alg ID | 类 | exp_type | 功能 | |--------|-----|----------|------| | 1 | ExpBase | 1 | 实时逻辑判断 | | 2 | ExpBase | 2 | 监控变量-上下限 | | 3 | ExpBase | 3 | 动作反馈-逻辑判断 | | 4 | ExpBase | 4 | 动作反馈-上下限 | | 5 | ExpBase | 5 | 监控变量-上下限-持续 | | 6 | ExpTimes | 6 | 运行时间累计 | | 7 | ExpTimes | 7 | 出现次数累计 | | 8 | TrendSlope2 | - | 斜率监控(绝对变化量) | | 9 | Roller2 | -1 | 同组离群监测 | | 10 | FaultCode | 0 | 故障代码1(整体解析) | | 11 | FaultCode | 1 | 故障代码2(按位解析) | | 12 | ExpSample2D | 12 | 多项式拟合 | | 13 | ExpSample2D | 13 | 线性相关性(皮尔逊) | | 14 | TrendSlope3 | - | 斜率监控(百分比变化率) | | 15 | GlitchDetection | - | 毛刺检测 | | 16 | Roller3 | 16 | 多变量离群检测(百分比偏差) | | 17 | ExpBound | 17 | 数据超限幅值 | | 18 | Roller3 | 18 | 多变量离群检测(绝对值偏差) | ### 三种执行进程 每个算法可在三种上下文中运行: - **mon** — 实时监控进程(~20ms 周期),从共享内存或 iHyperDB 获取实时数据,执行 `exec_mon()` 产生报警 - **task** — 按需历史回测进程,从 iHyperDB 获取指定时间范围数据,执行 `exec_task()` 逐时间步长分析 - **cron** — 定时统计学习进程,周期性收集 mon 进程累积的样本数据,执行 DAA::STA 分布统计并写入 DB2 --- ## 二、各算法详细阐述 ### 2.1 ExpBase(Alg 1-5)— 表达式驱动算法 **继承关系**:`AlgBase` → `ExpBase` **核心能力**: - 表达式引擎:通过 `mix_cc::matheval::Expression` 解析和执行用户配置的数学表达式 - 变量系统:`mm_vars` 映射管理所有变量(tag值、pv历史值、s快照值、mv2累积值等) - 动作反馈状态机:支持带反馈的监控流程(开始→保持→结束→超时) - 统计学习:DAA::STA 分布统计(cron 进程定期学习数据分布区间) - 双数据源:共享内存(DataSource::MEMORY=1)或 iHyperDB(DataSource::IHDB=0) **五种 exp_type(算法模板)**: #### Alg 1 — 实时逻辑判断 - 评估前提表达式 `exp_act_`,若结果为 true 则立即报警 - 无上下限,无统计学习,无反馈流程 - 报警内容来自配置的 `output.error` #### Alg 2 — 监控变量-上下限 - 评估 `exp_act_` 获得当前值,与 [limit_down_, limit_up_] 比较 - 支持自学习模式:`is_learning_=true` 时将数据喂入 EqpStat → DAA::STA 分布统计 - 支持数据筛选表达式 `exp_feedback_`(filter_exp),仅满足时才参与统计 - 支持三种检测模式: - Default:双侧检测(value < limit_down 或 value > limit_up) - OnlyLeft:仅左边界(-32768 哨兵值表示无左边界 → 仅检测 > limit_up) - OnlyRight:仅右边界(32768/32767 哨兵值 → 仅检测 < limit_down) - cron 进程定期将累积样本写入 DB2 的 T_SAMPLE_STAT / T_SAMPLE_MAG #### Alg 3 — 动作反馈-逻辑判断 - 包含完整反馈状态机: 1. `act_start_done()`:前提条件满足时,记录开始时间 stime,快照各 tag 值(s[n]、mx_tag、mi_tag),初始化 mv2/up/dw 累积变量 2. `act_not_hold()`:若配置了保持模式(keep_mode),前提条件不满足时退出 3. `act_done()`:反馈条件满足时,计算结果表达式 `exp_result_`,判断是否报警 4. `act_timeout()`:超时时重置累积变量 - 无上下限检测,报警基于结果表达式的布尔值或数值 #### Alg 4 — 动作反馈-上下限 - 结合 Alg 2 和 Alg 3:反馈模式下,动作结束时评估结果值并检查上下限 - 支持 `m_timemode`:若结果表达式含 "time" 且 exp_type 为 CondBound,报警信息以 ms 为单位 - 支持自学习统计 #### Alg 5 — 监控变量-上下限-持续 - 类似 Alg 2,但增加 `hold_time_` 参数 - 前提条件满足且值超限后,需持续超限超过 hold_time_ 才报警 - 若值在 hold_time_ 内恢复正常,重置计时 - 报警后将计时器重置 **关键逻辑细节**: 1. **变量刷新**: - 共享内存模式:每次执行时将当前 tag 值存入 mm_vars["tagN"],旧值存入 mm_vars["pN"](前一周期值),pv 历史(pvN_0~pvN_5)循环移位 - iHyperDB 模式:遍历 queried_data_ 每行数据,逐行设置变量后调用 mon_proc 2. **首次填充(first_fill_mm_vars)**: - 将所有 pv 历史值(pvN_0~pvN_5)初始化为当前值,避免程序冷启动时差值计算异常 - 共享内存模式直接读当前值,iHyperDB 模式查询最近 10 秒数据 3. **hold(n,T) 函数**:表达式支持 `hold_N_HE` 子串,解析后在指定时间 T 分钟内保持 tag 值,用于防抖 --- ### 2.2 ExpBound(Alg 17)— 数据超限幅值 **继承关系**:`AlgBase` → `ExpBase` → `ExpBound` **功能描述**: - 监控单个表达式值是否超过预定义阈值 - 支持两级报警:Warning 和 Error - 配置中指定 `limit_warn`(警告阈值)和 `limit_error`(报警阈值) **执行流程(mon_proc)**: 1. 检查前提条件表达式 `exp_feedback_`(filter_exp),不满足直接返回(无报警) 2. 评估主表达式 `exp_act_` 获取当前值 3. 若当前值 > limit_warn_ → 报警 - 若 > limit_error_ → ERROR 级别 - 否则 → WARN 级别 **初始化特殊性**: - `limit_down_` 被设为哨兵值 -32768,实际仅检测上限 - 初始化时检查 `limit_warn_ > limit_error_`,若不符合则标记配置错误 --- ### 2.3 ExpTimes(Alg 6/7)— 时间/次数累计 **继承关系**:`AlgBase` → `ExpBase` → `ExpTimes` **功能描述**: - **Alg 6(HoldTimeAcc)— 运行时间累计**: - 前提条件满足时开始计时,不满足时停止计时 - 累计运行时间(单位:小时),`running_time += time/3600000`(ms→h 换算) - 若单次持续超过 `rw_time_`(默认10分钟),自动分段写入防止数据丢失 - 阈值单位:limit_time(小时) - **Alg 7(OccTimesAcc)— 出现次数累计**: - 每次前提条件满足时累加 1 - 阈值单位:limit_times(次) **持久化机制**: - mon 进程:通过 `AsyncDbWorker` 异步投递到后台线程写入 DB2(T_RULE_SAMPLE_1D 表),不阻塞 20ms 主循环 - rw_time_ 控制写入周期(默认10分钟),防止频繁 I/O - 同时更新共享内存 RuleStatShm 供 UI 读取 - 支持防溢出:累计值接近 `unsigned long::max()` / `double::max()` 时停止累计 **初始化恢复**: - 从 DB2 T_RULE_SAMPLE_1D 表读取上次持久化的累计值,实现进程重启后数据恢复 --- ### 2.4 TrendSlope2(Alg 8)— 斜率监控(绝对变化量) **继承关系**:`AlgBase` → `TrendSlope2` **功能描述**: - 监控单个 tag 在多个等间隔时间窗口内的均值变化趋势 - 检测是否存在连续 N 次的斜率超限 **配置参数**: - `interval_time`:查询均值的窗口时长(分钟,转为秒) - `deltaX`:时间步长(分钟,转为秒) - `CS_AVG_SIZE`:连续检测次数 - `diff`:斜率阈值(绝对变化量) - `need_tag`:指定监控的 tag(格式 "tagN",解析为列索引) **执行流程(exec_mon)**: 1. 固定回退 60 秒:`now_time_ = system_clock::now() - 60s` 2. 从 `now_time_ - deltaX * CS_AVG_SIZE` 开始,共查询 CS_AVG_SIZE+1 个窗口的均值 3. 逐对计算相邻窗口间的斜率:`f_slope ≈ (avg[i] - avg[i-1])`(四舍五入到3位小数) 4. 若任一对斜率未超限 → 立即退出,不报警(所有对必须连续超限) 5. 若连续 CS_AVG_SIZE 对全部超限 → 报警 **与原始 TrendSlope 的差异**: - 原始 TrendSlope:固定6个窗口,需3次连续超限 - TrendSlope2:可配置窗口数和连续次数,但要求 ALL 连续超限(更严格) **存在的潜在问题**: - ⚠️ `exec_mon()` 首行 `now_time_ = system_clock::now() - 60s` 会覆盖 `exec_task()` 循环中设置的 `this->now_time_`,导致 task 模式的时间参数被忽略 --- ### 2.5 TrendSlope3(Alg 14)— 斜率监控(百分比变化率) **继承关系**:`AlgBase` → `TrendSlope3` **功能描述**: - 与 TrendSlope2 结构几乎相同,核心差异在于斜率计算方式 - 使用**百分比变化率**而非绝对变化量 **与 TrendSlope2 的关键差异**: 1. 斜率比较方式: - TrendSlope2:直接比较 `f_slope > limit_slope_`(绝对量) - TrendSlope3:比较 `f_slope > abs(limit_slope_ * avg[i-1] / 100)`(limit_slope_ % × 前值) 2. f_slope_max 计算: - TrendSlope2:取绝对斜率最大值 - TrendSlope3:取百分比变化最大值 `100 * f_slope / avg[i-1]`,若前值为0则取绝对斜率 **存在的潜在问题**: - ⚠️ 同样存在 `now_time_ = system_clock::now() - 60s` 覆盖 task 时间的问题 --- ### 2.6 Roller(Alg 9 v1,已废弃)— 负载平衡/离群检测 **继承关系**:`AlgBase` → `Roller` **状态**:`build_algorithm.cpp` 中 Alg 9 已映射到 Roller2,此算法仅保留源码未使用 **原功能**: - 查询多个 tag 在 interval_time 内的均值 - 计算去头尾均值(去掉最大值和最小值后的均值) - 迭代寻找偏离均值超过 error_diff_% 的 tag - 每次迭代只标记并移除一个最异常的 tag,继续检测剩余 tag --- ### 2.7 Roller2(Alg 9)— 同组离群监测 **继承关系**:`AlgBase` → `ExpBase` → `Roller2` **功能描述**: - 对多组表达式(X1~X9,最多9个)分别求值,检测是否有值偏离组均值超过阈值 - 先评估前提表达式 pre_exp,不满足则跳过本轮检测 **执行流程(mon_proc)**: 1. 调用 `refresh_var_result()` 评估所有表达式 2. 若 pre_exp 条件不满足,直接返回 3. 计算所有 X 值的均值:`avg = sum(X_values) / Xsize` 4. 动态计算上下限:`[avg - |avg| * limit_over_, avg + |avg| * limit_over_]` 5. 遍历每个 X 值,若有超限则报警(指出具体哪个变量异常) **配置**: - limit_over_:百分比阈值(从配置的百分数除以100转为小数) - 每个表达式可配置 name(中文名称),报警消息中包含 --- ### 2.8 Roller3(Alg 16/18)— 多变量离群检测 **继承关系**:`AlgBase` → `ExpBase` → `Roller3` **功能描述**: - 对多个 tag 的实时值进行中位数离群检测 - 支持两种模式:百分比偏差(Alg 16)和绝对值偏差(Alg 18/OuterAct) **执行流程(mon_proc)**: 1. 检查前提条件 `exp_act_`(pre_exp),不满足则返回 2. 从 mm_vars 读取各 tag 的绝对值 3. 计算中位数 median_ 4. 根据 exp_type 计算允许偏差: - Alg 16(百分比):`deviation = |median| * limit_% * 0.01` - Alg 18(绝对值):`deviation = limit_value`(直接使用 limit 值) 5. 计算每个 tag 值与中位数的偏差 6. 找到最大偏差的 tag **报警逻辑(无持续时间要求,hold_time ≤ delay_time)**: - 检测最大偏差的 tag 值是否超出 [median - deviationWarn, median + deviationWarn] - 超出 warn 阈值 → WARN 级别 - 超出 error 阈值 → ERROR 级别 **报警逻辑(有持续时间要求)**: - 为每个 tag 维护状态机(last_alarm_state, last_start_time) - 连续超限且持续时间 > hold_time_ → 报警 - 若超限方向(上升/下降)改变,重新计时 **工具函数**: - `extractTagNumbers(expr)`:从 "tag1+tag2+…" 字符串中提取 tag 序号 - `calculateMedian(data)`:排序后取中位数 - `findMaxWithIndex(vec)`:返回最大值及其下标 - `get_up_down(value, is_up)`:基于 detect_mode 修正上下限(处理哨兵值) - `detect_up_down(value)`:判断值的超限状态 --- ### 2.9 FaultCode(Alg 10/11)— 故障代码解析 **继承关系**:`AlgBase` → `FaultCode` **功能描述**: - 从共享内存读取故障代码整数值,查表转换为可读的故障名称和描述 - 两种解析模式: **Alg 10(code_type=0)— 整体解析**: - 将故障代码作为整体在 map2fcode_ 中查找 - 若找到且启用(is_usable),返回故障名称+描述 - 若故障代码为 0 或不在表中 → 不报警 **Alg 11(code_type=1)— 按位解析**: - 对故障代码的 bit 0~15 逐位检查 - 每个置位且启用的 bit 对应的故障信息追加到报警消息 - 只要有任一启用的 bit 置位即报警 **初始化**: - 从 DB2 T_LOV_FCODE 表加载故障代码映射表(按 code_type 过滤) - 若查询失败或为空 → is_valid_=false,后续 mon 不执行 --- ### 2.10 GlitchDetection(Alg 15)— 毛刺检测 **继承关系**:`AlgBase` → `GlitchDetection` **功能描述**: - 累积指定长度的数据序列(dataX 表达式值),满后发送至 Python 分析模块进行毛刺检测 - 本身不做任何报警判断,报警由 Python 端生成 **执行流程(exec_mon)**: 1. 每次调用将当前 dataX 值写入 `data_[data_index_]`,索引递增 2. 当 `data_index_ >= data_size_`(数据收集完成): - 将数据通过 ProxPy(Python代理)插入共享缓存 - 通过 ProxPy 发送 JSON 参数(ruleid、时间范围、报警内容、glitch_per 等)到 Python 的 "glitch" 处理函数 - 重置 data_index_=0,开始新一轮收集 **前提条件处理**: - get_prr() 重写:若前提条件不满足,重置 data_index_ 和起始时间(丢弃已收集的部分数据) **配置参数**: - dataX:监控变量的表达式 - lenth:数据序列长度(钳制在 [100, MAXLEN]) --- ### 2.11 ExpSample2D(Alg 12/13)— 二维样本分析 **继承关系**:`AlgBase` → `ExpBase` → `ExpSample2D` **功能描述**: #### Alg 12(PolyFit)— 多项式拟合 - **task 进程**:从 iHyperDB 查询历史数据,收集样本 (X, Y),使用 DAA::LSM 最小二乘法进行多项式拟合(1~9阶),选出最优拟合阶数,将拟合系数和样本存入 DB2 - **mon 进程**:代入当前 X 值使用拟合系数计算预测 Y_Fit,若实际 Y 偏离 Y_Fit 超过 scale_% → 报警 #### Alg 13(PEAR)— 皮尔逊相关系数 - **task 进程**:同 PolyFit,样本收集后计算 Pearson 相关系数,存储到 DB2 - **mon 进程**: - 累积样本直到达到 min_len_(1000~200000) - 样本量不足时:前提条件满足则追加样本 - 样本量足够后:计算当前 Pearson 相关系数,与从 DB2 加载的基准 pear_coefs_ 比较,偏差超过 scale_% → 报警 - 计算后清空样本重新累积 **PearValue 函数细节**: - 使用 Eigen 向量化计算 - 计算 σ_x, σ_y, σ_xy - ⚠️ 离散度检查:若 `σ_x/m1 > 1` 或 `σ_y/m2 > 1` → 返回正常相关系数;否则返回 2(无效值)。即要求变异系数 CV > 1(标准差超过均值)才认为数据有效。此阈值可能过于严格。 --- ### 2.12 TrendSlope(v1,已废弃) **继承关系**:`AlgBase` → `TrendSlope` **状态**:`build_algorithm.cpp` 中未使用,Alg 8 已映射到 TrendSlope2 **原功能**: - 固定6个窗口(CS_AVG_SIZE=6),需连续3次斜率超限才报警 - 使用 iHyperDB 的 HD3_STATS_TYPE_ARITH_MEAN 直接获取均值(无需查询原始数据) - 斜率按绝对变化量比较 --- ### 2.13 Null — 空算法 **继承关系**:`AlgBase` → `Null` **功能**:所有虚函数返回空值/0,用于未实现算法的占位符 --- ## 三、关键公共机制 ### 3.1 动作反馈状态机(ExpBase 内) 适用于 Alg 3/4 及所有继承 ExpBase 且 feedback_mode_=true 的算法: ``` 前提条件满足(act_triggered_) → act_start_done(): 记录 stime, 快照 tagN→sN, 初始化 mv2/up/dw 变量 → 持续监控中... → act_not_hold(): keep_mode=1且前提条件不满足 → 退出 → act_timeout(): 超时 → 重置, 可选报警 → act_done(): 反馈条件满足 → 计算结果表达式, 判断超限/报警 ``` 关键变量及其含义(用于表达式中): - `tagN`:第N个tag当前值 - `pN`:第N个tag上一周期值 - `sN`:动作开始时刻第N个tag值(快照) - `stime`:动作开始时间(epoch ms) - `time`:动作已持续时间(ms),即 `now - stime` - `now`:当前时间(epoch ms) - `etime`:动作结束时间(ms) - `mx_tagN`:动作期间 tagN 最大值 - `mi_tagN`:动作期间 tagN 最小值 - `mv2_tagN`:动作期间 tagN==1 的累计次数(上升沿/布尔累计) - `mv2_pN`:动作期间 pN==1 的累计次数 - `up_tagN`:动作期间 pN==0 且 tagN==1 的次数 - `dw_tagN`:动作期间 pN==1 且 tagN==0 的次数 - `pvN_0~pvN_5`:tagN 最近6个周期的历史值 ### 3.2 统计学习(DAA::STA) 适用于 Alg 2/4/5(有上下限且启用自学习的算法): - **mon 进程**:每次执行将当前值通过 `SingletonTemp` 写入共享内存累积 - **cron 进程**:周期性读取累积值,喂入 `DAA::STA` 分布对象: - 首次(未初始化):计算数据范围,用 `range/STA_SIZE_MIN` 初始化分布 - 后续:逐条 `dist_add()` - 完成后 `store_db2()` 持久化 - **自学习区间更新**:`reload_ci_dist()` 根据 dist_mode: - 0(手动):不自动更新,使用配置的固定上下限 - 1(在线):从 T_RULE_FEATURE 表读取在线学习结果 - 2(离线):从 T_SAMPLE_MAG 表读取离线分析结果 ### 3.3 数据来源 - **DataSource::MEMORY(1)**:从共享内存 `GlobaltemSharedMemory` 实时读取 tag 值 - **DataSource::IHDB(0)**:从 iHyperDB 查询历史/实时数据 - 查询使用 `interval_time` 周期窗口 - 支持 delay_time 补偿(对齐数据到达延迟) - TrendSlope 系列固定使用 IHDB 数据源 ### 3.4 前提条件(PRR) - 所有算法继承自 AlgBase 的 PRR 机制 - `prr_` 字段:1=有条件,0=无条件 - 条件不满足时算法跳过本次执行(exec_mon 在 AlgBase 层即返回) --- ## 四、发现的潜在问题 ### 4.1 ⚠️ TrendSlope2/TrendSlope3 exec_mon 中的硬编码时间偏移 **位置**:`trend_slope2.cpp:90`, `trend_slope3.cpp:84` ```cpp now_time_ = system_clock::now() - 60s; ``` 此行在 `exec_mon()` 首行执行,会覆盖 `exec_task()` 循环中设置的 `this->now_time_`。导致 task 模式的逐时间步长遍历实际上每次都使用实时系统时间(减60秒),历史数据的 task 回测可能不准确。 ### 4.2 ⚠️ ExpSample2D::PearValue 的离散度检查可能过于严格 **位置**:`exp_sample2D.cc:274` ```cpp if (dis1 > 1 || dis2 > 1) { return sigma12 / sqrt(sigma1 * sigma2); // 正常计算 } else { return 2; // 标记为无效 } ``` 只有当变异系数 CV > 1(标准差 > 均值)时才返回有效相关系数。对于很多实际场景,CV < 1 的数据可能仍有有意义的相关系数。此逻辑可能导致大量有效样本被丢弃。 ### 4.3 Roller3 中 exp_result_ 可能未被使用 **位置**:`roller3.cpp:323-335` `init_X_exp()` 中将 `tags_exp_`(如 "tag1+tag2+tag3")编译为 `exp_result_`,但 `mon_proc()` 中并未调用 `exp_result_->evaluate()`,而是直接从 `mm_vars` 读取 tag 值。此表达式创建后被闲置。 ### 4.4 Roller2 init_X_exp 中 feedback_mode_ 设置 **位置**:`roller2.cpp:123` 循环中每次迭代设置 `feedback_mode_ = true`,但 Roller2 不使用 ExpBase 的反馈状态机,此字段对其行为无影响,但属于语义不准确的标记。 ### 4.5 TrendSlope v1 / Roller v1 保留但未使用 `trend_slope.cpp/h` 和 `roller.cpp/h` 仍存在于源码中,但 `build_algorithm.cpp` 已将 Alg 8/9 映射到 v2 版本。这些文件可能可以标记为废弃或移入 .do_not_use。 ### 4.6 TrendSlope2 报警条件比 TrendSlope 更严格 - TrendSlope:6次检查中至少3次连续超限即可报警 - TrendSlope2:要求配置的 CS_AVG_SIZE 次检查**全部**连续超限(任一不满足即退出且不报警) 这可能导致 TrendSlope2 对短暂波动过于敏感(?)—— 实际上,由于要求所有检查都超限,它对偶发波动是**欠敏感**的。这种差异可能是预期的设计意图,但值得确认。