# eqpalg — 设备算法监控引擎 ## 概述 eqpalg(**Eq**uipment **P**rocess **Alg**orithm)是 EIS 系统中的核心设备监控与报警引擎。它基于宝信 PACE 平台构建,通过 ICE 中间件与系统内其他进程通信,从共享内存实时读取 PLC 数据,根据 DB2 中配置的规则,以数学表达式引擎对设备运行状态进行实时监控、统计分析和报警。 ## 进程架构 eqpalg 以 **3 个进程实例** 的形式运行,部署为不同的 PACE 组件名,各行其责: | 进程名 | ProcessType | 功能 | |--------|-------------|------| | `eqpalg-mon` | kMon | **实时监控进程** — 从共享内存双缓冲区读取实时 PLC 数据(~19ms 刷新一次),周期性执行所有启用的规则(执行间隔可配置,最小 20ms),产出实时报警 | | `eqpalg-cron` | kCron | **定时学习进程** — 每 1 秒轮询一次,从 `RuleStatShm`(由 mon 累积的统计值)中取出统计数据,通过 `DAA::STA` 分布分析类计算置信区间,将学习结果写回 DB2(`T_SAMPLE_STAT`、`T_SAMPLE_MAG`)。默认每 24 小时更新一次 | | `eqpalg-task` | kTask | **单次执行进程** — 响应用户发起的按需分析任务,从 iHyperDB 拉取指定时间范围的历史数据,执行算法并返回结果(含报警回测、多项式拟合、相关性计算等) | **三者协作**:eqpalg-mon 收到 99999 电文(规则 CRUD)时,通过 ICE 代理转发给 eqpalg-cron,确保 cron 的配置与 mon 保持同步。三个进程各自实例化独立的 `AlgorithmManager`,从 DB2 独立加载规则,但 mon 的执行频率最高(毫秒级),cron 次之(秒级),task 按需触发。 ### 关键依赖 - **ICE** — 进程间通信,注册端点 `"baosight/eqpalg-{mon|cron|task}"` - **DB2** — 存储规则配置(`T_RULE_CFG`)、报警记录(`T_RULE_RESULT`)、样本统计(`T_RULE_SAMPLE_1D`、`T_SAMPLE_STAT`、`T_SAMPLE_FIT`、`T_SAMPLE_MAG`)、时间戳(`T_RULE_RECORD_TIME`) - **共享内存(SHM)** — 实时 PLC 数据的低延迟访问通道,基于 `MemFix` 双缓冲区设计 - **iHyperDB** — 工业历史数据库,供 cron/task 查询历史趋势 - **Boost** — 通用工具库(filesystem, serialization, stacktrace, container 等) - **dlib / mlpack / Armadillo / Eigen3** — 数值计算库(`DAA::STA` 统计分布分析、`DAA::LSM` 最小二乘拟合、矩阵运算) - **nlohmann/json** — JSON 解析 ## 核心工作流 ``` 现场PLC/传感器 │ ▼ DSF 数据采集 ──→ 共享内存 MemFix │ ▼ GlobaltemSharedMemory (双缓冲区, ~19ms刷新) │ ├──→ eqpalg-mon: mm_vars → matheval 表达式引擎 → mon_proc() │ │ │ ├──→ 报警 → AlarmPoster → CMemQueue / ICE MQ / DB2 │ ├──→ 正常数据 → ExpModule → DB2 T_RULE_SAMPLE_* │ ├──→ 统计累积 → RuleStatShm (供 cron 学习) │ └──→ 页面数据 → EqpStat → Memcached │ ├──→ eqpalg-cron: RuleStatShm → DAA::STA → 置信区间 → DB2 │ └──→ eqpalg-task: iHyperDB(指定时间范围) → 拟合/统计/回测 ``` ### 数据源模式 每个规则可以配置两种数据源(`DataSource`枚举): | 模式 | 值 | 说明 | |------|---|------| | MEMORY | 1 | mon 进程默认:从 `GlobaltemSharedMemory` 双缓冲区实时读取,延迟低至 20ms | | IHDB | 0 | cron/task 进程使用:通过 `mix_cc::ihd` API 从 iHyperDB 查询历史数据 | ### 共享内存双缓冲区 `GlobaltemSharedMemory` 是 eqpalg 的数据中枢: - **写侧**:`mem_cached_thread_`(仅在 mon 中)以 ~19ms 为周期,使用 `CLOCK_MONOTONIC` 单调时钟计时,从 `MemFix` 顺序读取所有 tag 值,写入 write 缓冲区后执行 swap(通过 `std::shared_mutex` 保护) - **读侧**:监控线程无锁读取 read 缓冲区,不会与写操作竞争 - **超时处理**:若一次刷新超过 19ms,记录 Error 日志并立即开始下一次循环,不再 sleep - **索引构建**:构造函数中遍历 `BinaryTele` 电文定义,将 tag 名→缓冲区索引的映射一次性构建 ### ICE 电文接口 `EqpAlgICEI` 继承自 `MessageICE`,处理三类调用: **SendDataShort(eventNo, seq)** — 同步消息分发: | eventNo | 用途 | 处理逻辑 | |---------|------|----------| | 99999 | 规则 CRUD + 单次执行 | JSON `{"eventNo":N, "ruleId":"...", "algId":N, ...}` → `AlgorithmManager::dispose()`。mon 收到后通过 ICE 代理转发给 cron | | 11111 | 外部原始报警 | 直接调用 `AlarmPoster::alarm(str)` | | 22222 | 规则上下限更新 | JSON `{"ruleid":"...", "lb":N, "ub":N, "va":N, ...}` → `update_limit_alarm()` | **SendDataLong** — 复杂数据接口(未实现) **TimeNotify(eventNo)** — 定时器回调: | eventNo | 用途 | 触发周期 | |---------|------|----------| | 1 | 更新设备运行状态到 Memcached | ~1秒 | | 2 | 更新规则统计共享内存数据 | ~1秒 | | 5 | 向 dsm 发送 ruleId 列表和共享内存统计 | 定期 | ### 规则生命周期(eventNo=99999) ``` 页面/外部 → ICE 电文(JSON) → AlgorithmManager::dispose() │ ┌───────┬────────┬──────────┼──────────┬────────┐ ▼ ▼ ▼ ▼ ▼ ▼ kCreate kDelete kUpdate kEnable kReset kExec (1) (0) (2) (3) (4) (10) ``` - **kCreate(1)**: `thread_manager_.storage()` → `attach()` → 启动线程执行 → 更新 DB2 `T_RULE_RECORD_TIME` - **kDelete(0)**: `delete_instance()` → 停止线程 → 删除 DB2 记录 - **kUpdate(2)**: `detach()` → `storage()` → `attach()` → 更新修改时间 - **kEnable(3)**: `thread_manager_.enable(ruleId, usable)` → 更新 DB2 时间 - **kReset(4)**: 重置算法 6(累计时间)和 7(出现次数)的统计值 - **kExec(10)**: 单次执行,从 DB2 重新读取最新配置,创建独立线程运行 `exec_task(time_range)` ## 线程模型 ### 三层调度 ``` threads::Manager ← 顶层:按 TaskSeq 分配线程,维护 handles_ map │ ▼ HandlerExec ← 中层:单个线程的执行句柄,管理队列和规则集 │ ▼ AlgBase (子类实例) ← 底层:具体算法执行 ``` ### HandlerExec 执行循环 每个 `HandlerExec` 运行在独立线程中,线程名格式为 `alg_{algId}_{dataSource}_{taskSeq}`(cron 加后缀 `_cron`,task 加后缀 `_task`): - **mon 模式**:遍历规则,对每条调用 `exec_mon_call()`。每条规则执行间隔由配置的 `delay_time_` 控制。若单次执行超过 20ms,跳过 sleep 立即下一轮 - **cron 模式**:每秒轮询一次,对每条规则调用 `exec_cron_call()` - **task 模式**:每秒轮询一次,每条规则执行一次后从活动集中移除 ### 线程安全 通过多级队列实现无锁化动态增删: | 队列 | 用途 | |------|------| | `attach_queue_` | 新增算法排队加入 | | `detach_queue_` | 待删除的规则 ID 列表 | | `reset_queue_` | 待重置统计的规则 ID | | `usable_queue_` | 待设置启/停用的 `(ruleId, bool)` | | `once_exec_queue_` | task 单次执行请求 `(algo_ptr, time_range)` | 每轮 `event_handler()` 清空所有队列,修改活动规则集。 ### 随机偏移(防惊群) `AlgBase::init()` 使用 `XorShift128Plus` 随机数生成器为 `save_interval_ms_` 和 `rule_state_update_interval_ms_` 添加随机毫秒偏移,避免大量规则同时写 DB2。 ## 算法体系 所有算法继承自 `AlgBase`,基类提供: - 表达式解析引擎(`mix_cc::matheval::Expression`) - 双数据源(共享内存 / iHyperDB) - 执行间隔控制、心跳日志、前提条件判断 - 报警保存、统计累积 ### 表达式变量系统 表达式引擎中可用以下特殊变量(通过宏替换注入实际值): | 变量 | 含义 | |------|------| | `m{tagN}` | tagN 的当前值(如 `m{tag1}`) | | `p{tagN}` | tagN 上一次执行时的值 | | `s{tagN}` | 动作开始时的快照值 | | `mx_tagN` | tagN 的历史最大值 | | `mi_tagN` | tagN 的历史最小值 | | `mv2_tagN` | tagN 的累积变化量 | | `up_tagN` | tagN 的上限 | | `dw_tagN` | tagN 的下限 | | `time` | 当前运行时间 | | `stime` | 动作开始时间 | | `etime` | 动作结束时间 | | `now` | 当前时间点 | ### 自定义带状态函数 | 函数 | 含义 | |------|------| | `KeepT(N, T)` | tagN 为真后保持 T 分钟 | | `KeepC(N)` | tagN 为真后累计次数 | | `RiseEdge(N)` | tagN 的上升沿检测 | | `Detect(N, T)` | T 分钟内检测 tagN 是否为真 | | `hold(N, T)` | 将 tagN 保持 T 分钟不变(hold 变量有独立的状态管理) | ### 算法详细说明 #### 1-5: ExpBase — 表达式类算法 所有基于表达式的算法共享 `ExpBase`,通过 `exp_type_` 区分行为: | exp_type_ | algId | 名称 | 核心逻辑 | |-----------|-------|------|----------| | Logic (1) | 1 | 实时逻辑判断 | 评估 `exp_act_` 表达式,结果为真即报警 | | Bound (2) | 2 | 变量上下限 | 计算 `exp_result_` 值,与下限/上限比较。支持在线自学习:`is_learning_=true` 时累积统计值到 `EqpStat`,cron 定期用 `DAA::STA` 计算新置信区间写回 DB2 | | ActionFeedBack (3) | 3 | 动作反馈-逻辑 | 跟踪状态机:`act_start_done()` → 超时/保持检测 → `act_done()` → 反馈表达式评估 | | CondBound (4) | 4 | 动作反馈-上下限 | 与 3 相同流程,反馈后用上下限判断 | | BoundHoldTime (5) | 5 | 上下限-保持时间 | 仅在超限持续超过 `hold_time_` 毫秒后才触发报警 | **动作反馈状态机**(algId 3/4): ``` act_start_done() ──→ act_not_hold() ──→ 停止 │ (未保持) │ act_timeout() ──→ 超时报警 │ ▼ act_done() → 反馈评价 → 正常/报警 ``` #### 6-7: ExpTimes — 累计统计 | algId | 名称 | 逻辑 | |-------|------|------| | 6 | 运行时间累计 | 跟踪条件满足的持续时间,累积到 `running_time`(小时),超过 `max_time_` 阈值报警。结果持久化到 DB2 `T_RULE_SAMPLE_1D` | | 7 | 出现次数累计 | 跟踪条件满足的次数,累积到 `shear_times`,超过 `max_times_` 阈值报警 | 两种累计均支持通过 kReset 事件重置。 #### 8, 14: TrendSlope2 / TrendSlope3 — 趋势检测 | 类 | algId | 检测方式 | |----|-------|----------| | `TrendSlope2` | 8 | **原始值斜率**:在 `deltaX_` 间隔内查询 iHyperDB,计算连续间隔均值之间的斜率 `deltaY / deltaX`,若在连续 `CS_AVG_SIZE_` 个间隔中斜率超过 `limit_slope_` 则报警 | | `TrendSlope3` | 14 | **百分比变化**: 斜率为 `100 * deltaY / avg[i-1]`,适用于量级不同的变量,实现归一化 | #### 9, 16, 18: Roller2 / Roller3 — 离群检测 | 类 | algId | 检测方式 | |----|-------|----------| | `Roller2` | 9 | **基于均值**:计算所有变量的算术平均,任一个体偏离均值超过 `limit_over_%` 即报警 | | `Roller3` | 16 | **基于中位数(百分比)**:计算所有变量的中位数,偏离超过 `limit_warn_%`(警告)或 `limit_error_%`(错误)即报警,支持 `hold_time_` 持续判断 | | `Roller3` | 18 | **基于中位数(实际值)**:使用实际值阈值代替百分比 | Roller2/3 的典型应用场景是"轧辊组监控"——同组多个传感器(如多个轧辊温度计),检测其中某个是否明显偏离群体。 #### 10-11: FaultCode — 故障代码解析 维护一个 DB2 `T_LOV_FCODE` 查询缓存(`map2fcode_`),将整数值映射为故障名称和内容: | algId | code_type_ | 解析方式 | |-------|------------|----------| | 10 | 0 | **整体解析**:将故障码整数值在字典中查找对应描述 | | 11 | 1 | **按位解析**:检查 0-15 每一位,对每个置位位查找对应故障描述 | #### 12-13: ExpSample2D — 二维拟合与相关性 | algId | 名称 | mon 执行 | task 执行 | |-------|------|----------|-----------| | 12 | 多项式拟合 | 评估 X/Y 表达式,用 DB2 中存储的拟合系数计算 Y_fit,偏差超 `scale_ × |Y_fit|` 即报警 | 收集 (X,Y) 数据点,用 `DAA::LSM` 做最小二乘拟合,系数存入 `T_SAMPLE_FIT` | | 13 | 皮尔逊相关系数 | 累积到 `min_len_` 个数据点后计算皮尔逊 r,与存储值偏差超 `scale_` 报警 | 收集数据→计算皮尔逊系数→存入 `T_SAMPLE_FIT` | #### 15: GlitchDetection — 毛刺检测 将监控变量累积到固定大小数组(最大 2000 点),数据满后通过 `ProxPy` 将数据传递给外部 Python 进程("glitch" 模块)执行毛刺分析。是唯一与 Python 联动的算法。 #### 17: ExpBound — 双阈值限幅 单独配置一个值表达式,设置两层阈值: - `limit_warn_` — 警告阈值,触发 **WARN** 级别 - `limit_error_` — 错误阈值,触发 **ERROR** 级别 ### 已弃用算法(`.do_not_use/`) 样板曲线、CPC 检测、FFT 频谱统计、DTW 动态时间规整、一类 SVM 异常检测、KRR 核岭回归、区间分布统计、波形监控等实验性算法。保留代码供参考但不参与编译。 ## 报警机制 ### 报警通道 `AlarmPoster` 多渠道推送报警: ``` mon_proc() 返回 AlarmInfo │ ▼ AlarmPoster::alarm() │ ├──→ CMemQueue (内存队列) → AlarmHandler 线程 (500ms轮询) → 统一处理 ├──→ ICE MQ (baosight/zmqp) — 可选,用于消息队列服务 └──→ DB2 T_RULE_RESULT — 可选,persist 到数据库 ``` ### 去抖 `AlarmPoster` 强制每规则最小报警间隔(默认 5 分钟 × `AlarmIntervalHours`),防止信号振荡导致报警风暴。 ### 报警信息结构 ``` AlarmInfo { bool alarmed; json content; // 报警内容(含规则ID、名称、组、值、上下限等) time_point alarm_start_time; time_point alarm_end_time; ConfigInfo cfg_info; // rule ID / name / group / remark } ``` ## 表达式配置示例 DB2 `T_RULE_CFG.ruleParam` JSON 格式: ```json { "datasource": 1, "delay_time": 1000, "before_exec": "m{tag1} > 0", "function": { "exp_str": "m{tag1} > limit_up_ or m{tag1} < limit_down_", "exp_feedback": "m{tag10} > 0", "exp_result": "m{tag1} + m{tag2}", "limit_up_": 800, "limit_down_": 200, "hold_time": 5000, "is_learning_": true, "dist_mode_": 1, "time_out": 30000, "keep_mode": true, "max_times_": 100, "max_time_": 720, "unit": "℃" }, "tags": { "tag1": ["C308_TEMP_01", "1号温度"], "tag2": ["C308_TEMP_02", "2号温度"] } } ``` ## 目录结构 ``` eqpalg/ ├── eqpalg.cpp/h # 进程入口,根据进程名确定类型 ├── eqpalg_icei.cpp/h # ICE 接口,含共享内存缓存线程 ├── algorithm_manager.cpp/h # 算法管理器:DB2 加载 + 规则 CRUD ├── build_algorithm.cpp/h # 算法工厂:algId → 具体类 ├── alg_base.cpp/h # 所有算法的基类 ├── gb_logger.cpp/h # 全局日志 ├── gb_item_memory.cpp/h # GlobaltemSharedMemory 双缓冲区 ├── algs/ # 18 种算法实现 │ ├── exp_base.* # algId 1-5: 表达式/上下限/动作反馈 │ ├── exp_times.* # algId 6-7: 累计时间/次数 │ ├── exp_bound.* # algId 17: 双阈值限幅 │ ├── exp_sample2D.* # algId 12-13: 拟合/皮尔逊相关性 │ ├── trend_slope2.* # algId 8: 原始斜率 │ ├── trend_slope3.* # algId 14: 百分比变化率 │ ├── roller2.* # algId 9: 均值离群 │ ├── roller3.* # algId 16,18: 中位数离群 │ ├── fault_code.* # algId 10-11: 故障码解析 │ ├── glitch_detection.* # algId 15: 毛刺检测 → Python │ └── null.h # 未匹配的空算法 ├── threads/ # 多线程调度 │ ├── manager.* # 按 TaskSeq 分组,维护 HandlerExec │ └── handler_exec.* # 单线程执行句柄 + 5 种队列 ├── exp_macro/ # 表达式宏替换(delta/persist/hold/max/min) ├── feature_extraction/ # DAA::STA(统计) / DAA::LSM(拟合) / 分布分析 ├── utility/ # 工具类 │ ├── alarm_poster.* # 报警多渠道推送 + 去抖 │ ├── alarm_handler.hpp # CMemQueue 消费线程 │ ├── build_alarm_info.* # 报警 JSON 构造 │ ├── condition_monitor.hpp# 正常数据保存条件判断 │ ├── eqp_stat.* # RuleStatShm 统计管理 │ ├── eqp_status.* # 设备运行状态读取 │ ├── ExpModule.* # 表达式加载与变量管理 │ ├── HoldTime.* # hold(N,T) 状态保持 │ ├── update_data.* # 规则数据写入 Memcached │ ├── proxy_py.* # C++→Python 通信(共享内存 + ICE) │ ├── StatExp.hpp # 带状态自定义函数 │ ├── VarsCache.hpp # mm_vars 预分配缓存 │ └── XorShift128Plus.hpp # 随机数(防惊群) ├── table_struct/ # DB2 ORM: T_RULE_CFG / T_RULE_RESULT 等 ├── define/ # 枚举常量:ProcessType / ExpType / EventCase 等 ├── doc/ # Doxygen 文档 ├── .do_not_use/ # 弃用算法(不参与编译) └── CMakeLists.txt ``` ## 数据流全景 ``` PLC/传感器 │ ▼ DSF/OPC ──→ 共享内存 MemFix │ ├──→ GlobaltemSharedMemory (双缓冲, ~19ms) │ │ │ ├──→ eqpalg-mon: mm_vars → 表达式引擎 → 报警/统计 │ │ │ │ │ ├──→ AlarmPoster → DB2 T_RULE_RESULT / ICE MQ │ │ ├──→ 正常数据 → DB2 T_RULE_SAMPLE_* │ │ ├──→ 统计累积 → RuleStatShm → cron 学习 │ │ └──→ 页面数据 → EqpStat → Memcached │ │ │ ├──→ eqpalg-cron: RuleStatShm → DAA::STA → CI → DB2 │ │ │ └──→ eqpalg-task: iHyperDB → 拟合/统计 → DB2 │ └──→ dsm: ZONE10 → DataQuery → /dscdata/Rule*Data/*.json ``` ## 启动流程 1. `eqpalg::start()` — 根据模块名确定 `glob_process_type` 2. 创建 `EqpAlgICEI` — 注册 ICE + 初始化共享内存缓存线程(仅 mon) 3. `AlgorithmManager` — 从 DB2 `T_RULE_CFG` 加载所有规则配置 4. `threads::Manager::start()` — 按 `TaskSeq` 分组创建 `HandlerExec` 线程 5. mon: 额外启动 alarm handler 线程(500ms 轮询 CMemQueue) 6. mon: `mem_cached_thread_` 开始以 ~19ms 间隔刷新 `GlobaltemSharedMemory` ## 相关进程 | 进程 | 关系 | |------|------| | **dsm** | eqpalg-mon 的 `TimeNotify(5)` 向 dsm 推送 ruleId 列表;eqpalg 将规则配置写入共享内存 ZONE10 供 dsm 读取归档 | | **eqpalg-cron** | eqpalg-mon 收到 99999 电文时通过 ICE 代理转发给 cron,保持配置同步 | | **RICS** | RICS 读取同一条 `T_RULE_CFG` 配置,格式化为 JSON 推送到 Memcached 供前端展示,与 eqpalg 是"展示面"与"执行面"的关系 | | **eqpm** | eqpalg 产生报警写入 `T_RULE_RESULT` → eqpm 查询 `FV_RESULT_JOIN_EQPID` 获取报警统计用于设备状态面板 | | **前端/页面服务** | 通过 ICE 发送规则 CRUD 指令;通过 Memcached 读取规则状态和设备数据 | ## 作者 - Cat (null.null.null@qq.com) - 宝信软件(Baosight Co. Ltd.) - 版本: 0.1 (2021-09)