- RICS_readme.md: Rule information centralized display service - eqpm_readme.md: Equipment predictive maintenance & status monitoring - dsm_readme.md: Data save manager for historical data archiving - eqpalg_readme.md: Corrected architecture, data flow, variable system, thread model, and inter-process relationships
20 KiB
eqpalg — 设备算法监控引擎
概述
eqpalg(Equipment Process Algorithm)是 EIS 系统中的核心设备监控与报警引擎。它基于宝信 PACE 平台构建,通过 ICE 中间件与系统内其他进程通信,从共享内存实时读取 PLC 数据,根据 DB2 中配置的规则,以数学表达式引擎对设备运行状态进行实时监控、统计分析和报警。
进程架构
eqpalg 以 3 个进程实例 的形式运行,部署为不同的 PACE 组件名,各行其责:
| 进程名 | ProcessType | 功能 |
|---|---|---|
eqpalg-mon |
kMon | 实时监控进程 — 从共享内存双缓冲区读取实时 PLC 数据(~19ms 刷新一次),周期性执行所有启用的规则(执行间隔可配置,最小 20ms),产出实时报警 |
eqpalg-cron |
kCron | 定时学习进程 — 每 1 秒轮询一次,从 RuleStatShm(由 mon 累积的统计值)中取出统计数据,通过 DAA::STA 分布分析类计算置信区间,将学习结果写回 DB2(T_SAMPLE_STAT、T_SAMPLE_MAG)。默认每 24 小时更新一次 |
eqpalg-task |
kTask | 单次执行进程 — 响应用户发起的按需分析任务,从 iHyperDB 拉取指定时间范围的历史数据,执行算法并返回结果(含报警回测、多项式拟合、相关性计算等) |
三者协作:eqpalg-mon 收到 99999 电文(规则 CRUD)时,通过 ICE 代理转发给 eqpalg-cron,确保 cron 的配置与 mon 保持同步。三个进程各自实例化独立的 AlgorithmManager,从 DB2 独立加载规则,但 mon 的执行频率最高(毫秒级),cron 次之(秒级),task 按需触发。
关键依赖
- ICE — 进程间通信,注册端点
"baosight/eqpalg-{mon|cron|task}" - DB2 — 存储规则配置(
T_RULE_CFG)、报警记录(T_RULE_RESULT)、样本统计(T_RULE_SAMPLE_1D、T_SAMPLE_STAT、T_SAMPLE_FIT、T_SAMPLE_MAG)、时间戳(T_RULE_RECORD_TIME) - 共享内存(SHM) — 实时 PLC 数据的低延迟访问通道,基于
MemFix<PLC_DATA>双缓冲区设计 - iHyperDB — 工业历史数据库,供 cron/task 查询历史趋势
- Boost — 通用工具库(filesystem, serialization, stacktrace, container 等)
- dlib / mlpack / Armadillo / Eigen3 — 数值计算库(
DAA::STA统计分布分析、DAA::LSM最小二乘拟合、矩阵运算) - nlohmann/json — JSON 解析
核心工作流
现场PLC/传感器
│
▼
DSF 数据采集 ──→ 共享内存 MemFix<PLC_DATA>
│
▼
GlobaltemSharedMemory (双缓冲区, ~19ms刷新)
│
├──→ eqpalg-mon: mm_vars → matheval 表达式引擎 → mon_proc()
│ │
│ ├──→ 报警 → AlarmPoster → CMemQueue / ICE MQ / DB2
│ ├──→ 正常数据 → ExpModule → DB2 T_RULE_SAMPLE_*
│ ├──→ 统计累积 → RuleStatShm (供 cron 学习)
│ └──→ 页面数据 → EqpStat → Memcached
│
├──→ eqpalg-cron: RuleStatShm → DAA::STA → 置信区间 → DB2
│
└──→ eqpalg-task: iHyperDB(指定时间范围) → 拟合/统计/回测
数据源模式
每个规则可以配置两种数据源(DataSource枚举):
| 模式 | 值 | 说明 |
|---|---|---|
| MEMORY | 1 | mon 进程默认:从 GlobaltemSharedMemory 双缓冲区实时读取,延迟低至 20ms |
| IHDB | 0 | cron/task 进程使用:通过 mix_cc::ihd API 从 iHyperDB 查询历史数据 |
共享内存双缓冲区
GlobaltemSharedMemory 是 eqpalg 的数据中枢:
- 写侧:
mem_cached_thread_(仅在 mon 中)以 ~19ms 为周期,使用CLOCK_MONOTONIC单调时钟计时,从MemFix<PLC_DATA>顺序读取所有 tag 值,写入 write 缓冲区后执行 swap(通过std::shared_mutex保护) - 读侧:监控线程无锁读取 read 缓冲区,不会与写操作竞争
- 超时处理:若一次刷新超过 19ms,记录 Error 日志并立即开始下一次循环,不再 sleep
- 索引构建:构造函数中遍历
BinaryTele电文定义,将 tag 名→缓冲区索引的映射一次性构建
ICE 电文接口
EqpAlgICEI 继承自 MessageICE,处理三类调用:
SendDataShort(eventNo, seq) — 同步消息分发:
| eventNo | 用途 | 处理逻辑 |
|---|---|---|
| 99999 | 规则 CRUD + 单次执行 | JSON {"eventNo":N, "ruleId":"...", "algId":N, ...} → AlgorithmManager::dispose()。mon 收到后通过 ICE 代理转发给 cron |
| 11111 | 外部原始报警 | 直接调用 AlarmPoster::alarm(str) |
| 22222 | 规则上下限更新 | JSON {"ruleid":"...", "lb":N, "ub":N, "va":N, ...} → update_limit_alarm() |
SendDataLong — 复杂数据接口(未实现)
TimeNotify(eventNo) — 定时器回调:
| eventNo | 用途 | 触发周期 |
|---|---|---|
| 1 | 更新设备运行状态到 Memcached | ~1秒 |
| 2 | 更新规则统计共享内存数据 | ~1秒 |
| 5 | 向 dsm 发送 ruleId 列表和共享内存统计 | 定期 |
规则生命周期(eventNo=99999)
页面/外部 → ICE 电文(JSON) → AlgorithmManager::dispose()
│
┌───────┬────────┬──────────┼──────────┬────────┐
▼ ▼ ▼ ▼ ▼ ▼
kCreate kDelete kUpdate kEnable kReset kExec
(1) (0) (2) (3) (4) (10)
- kCreate(1):
thread_manager_.storage()→attach()→ 启动线程执行 → 更新 DB2T_RULE_RECORD_TIME - kDelete(0):
delete_instance()→ 停止线程 → 删除 DB2 记录 - kUpdate(2):
detach()→storage()→attach()→ 更新修改时间 - kEnable(3):
thread_manager_.enable(ruleId, usable)→ 更新 DB2 时间 - kReset(4): 重置算法 6(累计时间)和 7(出现次数)的统计值
- kExec(10): 单次执行,从 DB2 重新读取最新配置,创建独立线程运行
exec_task(time_range)
线程模型
三层调度
threads::Manager ← 顶层:按 TaskSeq 分配线程,维护 handles_ map
│
▼
HandlerExec ← 中层:单个线程的执行句柄,管理队列和规则集
│
▼
AlgBase (子类实例) ← 底层:具体算法执行
HandlerExec 执行循环
每个 HandlerExec 运行在独立线程中,线程名格式为 alg_{algId}_{dataSource}_{taskSeq}(cron 加后缀 _cron,task 加后缀 _task):
- mon 模式:遍历规则,对每条调用
exec_mon_call()。每条规则执行间隔由配置的delay_time_控制。若单次执行超过 20ms,跳过 sleep 立即下一轮 - cron 模式:每秒轮询一次,对每条规则调用
exec_cron_call() - task 模式:每秒轮询一次,每条规则执行一次后从活动集中移除
线程安全
通过多级队列实现无锁化动态增删:
| 队列 | 用途 |
|---|---|
attach_queue_ |
新增算法排队加入 |
detach_queue_ |
待删除的规则 ID 列表 |
reset_queue_ |
待重置统计的规则 ID |
usable_queue_ |
待设置启/停用的 (ruleId, bool) |
once_exec_queue_ |
task 单次执行请求 (algo_ptr, time_range) |
每轮 event_handler() 清空所有队列,修改活动规则集。
随机偏移(防惊群)
AlgBase::init() 使用 XorShift128Plus 随机数生成器为 save_interval_ms_ 和 rule_state_update_interval_ms_ 添加随机毫秒偏移,避免大量规则同时写 DB2。
算法体系
所有算法继承自 AlgBase,基类提供:
- 表达式解析引擎(
mix_cc::matheval::Expression) - 双数据源(共享内存 / iHyperDB)
- 执行间隔控制、心跳日志、前提条件判断
- 报警保存、统计累积
表达式变量系统
表达式引擎中可用以下特殊变量(通过宏替换注入实际值):
| 变量 | 含义 |
|---|---|
m{tagN} |
tagN 的当前值(如 m{tag1}) |
p{tagN} |
tagN 上一次执行时的值 |
s{tagN} |
动作开始时的快照值 |
mx_tagN |
tagN 的历史最大值 |
mi_tagN |
tagN 的历史最小值 |
mv2_tagN |
tagN 的累积变化量 |
up_tagN |
tagN 的上限 |
dw_tagN |
tagN 的下限 |
time |
当前运行时间 |
stime |
动作开始时间 |
etime |
动作结束时间 |
now |
当前时间点 |
自定义带状态函数
| 函数 | 含义 |
|---|---|
KeepT(N, T) |
tagN 为真后保持 T 分钟 |
KeepC(N) |
tagN 为真后累计次数 |
RiseEdge(N) |
tagN 的上升沿检测 |
Detect(N, T) |
T 分钟内检测 tagN 是否为真 |
hold(N, T) |
将 tagN 保持 T 分钟不变(hold 变量有独立的状态管理) |
算法详细说明
1-5: ExpBase — 表达式类算法
所有基于表达式的算法共享 ExpBase,通过 exp_type_ 区分行为:
| exp_type_ | algId | 名称 | 核心逻辑 |
|---|---|---|---|
| Logic (1) | 1 | 实时逻辑判断 | 评估 exp_act_ 表达式,结果为真即报警 |
| Bound (2) | 2 | 变量上下限 | 计算 exp_result_ 值,与下限/上限比较。支持在线自学习:is_learning_=true 时累积统计值到 EqpStat,cron 定期用 DAA::STA 计算新置信区间写回 DB2 |
| ActionFeedBack (3) | 3 | 动作反馈-逻辑 | 跟踪状态机:act_start_done() → 超时/保持检测 → act_done() → 反馈表达式评估 |
| CondBound (4) | 4 | 动作反馈-上下限 | 与 3 相同流程,反馈后用上下限判断 |
| BoundHoldTime (5) | 5 | 上下限-保持时间 | 仅在超限持续超过 hold_time_ 毫秒后才触发报警 |
动作反馈状态机(algId 3/4):
act_start_done() ──→ act_not_hold() ──→ 停止
│ (未保持)
│ act_timeout() ──→ 超时报警
│
▼
act_done() → 反馈评价 → 正常/报警
6-7: ExpTimes — 累计统计
| algId | 名称 | 逻辑 |
|---|---|---|
| 6 | 运行时间累计 | 跟踪条件满足的持续时间,累积到 running_time(小时),超过 max_time_ 阈值报警。结果持久化到 DB2 T_RULE_SAMPLE_1D |
| 7 | 出现次数累计 | 跟踪条件满足的次数,累积到 shear_times,超过 max_times_ 阈值报警 |
两种累计均支持通过 kReset 事件重置。
8, 14: TrendSlope2 / TrendSlope3 — 趋势检测
| 类 | algId | 检测方式 |
|---|---|---|
TrendSlope2 |
8 | 原始值斜率:在 deltaX_ 间隔内查询 iHyperDB,计算连续间隔均值之间的斜率 deltaY / deltaX,若在连续 CS_AVG_SIZE_ 个间隔中斜率超过 limit_slope_ 则报警 |
TrendSlope3 |
14 | 百分比变化: 斜率为 100 * deltaY / avg[i-1],适用于量级不同的变量,实现归一化 |
9, 16, 18: Roller2 / Roller3 — 离群检测
| 类 | algId | 检测方式 |
|---|---|---|
Roller2 |
9 | 基于均值:计算所有变量的算术平均,任一个体偏离均值超过 limit_over_% 即报警 |
Roller3 |
16 | 基于中位数(百分比):计算所有变量的中位数,偏离超过 limit_warn_%(警告)或 limit_error_%(错误)即报警,支持 hold_time_ 持续判断 |
Roller3 |
18 | 基于中位数(实际值):使用实际值阈值代替百分比 |
Roller2/3 的典型应用场景是"轧辊组监控"——同组多个传感器(如多个轧辊温度计),检测其中某个是否明显偏离群体。
10-11: FaultCode — 故障代码解析
维护一个 DB2 T_LOV_FCODE 查询缓存(map2fcode_),将整数值映射为故障名称和内容:
| algId | code_type_ | 解析方式 |
|---|---|---|
| 10 | 0 | 整体解析:将故障码整数值在字典中查找对应描述 |
| 11 | 1 | 按位解析:检查 0-15 每一位,对每个置位位查找对应故障描述 |
12-13: ExpSample2D — 二维拟合与相关性
| algId | 名称 | mon 执行 | task 执行 |
|---|---|---|---|
| 12 | 多项式拟合 | 评估 X/Y 表达式,用 DB2 中存储的拟合系数计算 Y_fit,偏差超 `scale_ × | Y_fit |
| 13 | 皮尔逊相关系数 | 累积到 min_len_ 个数据点后计算皮尔逊 r,与存储值偏差超 scale_ 报警 |
收集数据→计算皮尔逊系数→存入 T_SAMPLE_FIT |
15: GlitchDetection — 毛刺检测
将监控变量累积到固定大小数组(最大 2000 点),数据满后通过 ProxPy 将数据传递给外部 Python 进程("glitch" 模块)执行毛刺分析。是唯一与 Python 联动的算法。
17: ExpBound — 双阈值限幅
单独配置一个值表达式,设置两层阈值:
limit_warn_— 警告阈值,触发 WARN 级别limit_error_— 错误阈值,触发 ERROR 级别
已弃用算法(.do_not_use/)
样板曲线、CPC 检测、FFT 频谱统计、DTW 动态时间规整、一类 SVM 异常检测、KRR 核岭回归、区间分布统计、波形监控等实验性算法。保留代码供参考但不参与编译。
报警机制
报警通道
AlarmPoster 多渠道推送报警:
mon_proc() 返回 AlarmInfo
│
▼
AlarmPoster::alarm()
│
├──→ CMemQueue (内存队列) → AlarmHandler 线程 (500ms轮询) → 统一处理
├──→ ICE MQ (baosight/zmqp) — 可选,用于消息队列服务
└──→ DB2 T_RULE_RESULT — 可选,persist 到数据库
去抖
AlarmPoster 强制每规则最小报警间隔(默认 5 分钟 × AlarmIntervalHours),防止信号振荡导致报警风暴。
报警信息结构
AlarmInfo {
bool alarmed;
json content; // 报警内容(含规则ID、名称、组、值、上下限等)
time_point alarm_start_time;
time_point alarm_end_time;
ConfigInfo cfg_info; // rule ID / name / group / remark
}
表达式配置示例
DB2 T_RULE_CFG.ruleParam JSON 格式:
{
"datasource": 1,
"delay_time": 1000,
"before_exec": "m{tag1} > 0",
"function": {
"exp_str": "m{tag1} > limit_up_ or m{tag1} < limit_down_",
"exp_feedback": "m{tag10} > 0",
"exp_result": "m{tag1} + m{tag2}",
"limit_up_": 800,
"limit_down_": 200,
"hold_time": 5000,
"is_learning_": true,
"dist_mode_": 1,
"time_out": 30000,
"keep_mode": true,
"max_times_": 100,
"max_time_": 720,
"unit": "℃"
},
"tags": {
"tag1": ["C308_TEMP_01", "1号温度"],
"tag2": ["C308_TEMP_02", "2号温度"]
}
}
目录结构
eqpalg/
├── eqpalg.cpp/h # 进程入口,根据进程名确定类型
├── eqpalg_icei.cpp/h # ICE 接口,含共享内存缓存线程
├── algorithm_manager.cpp/h # 算法管理器:DB2 加载 + 规则 CRUD
├── build_algorithm.cpp/h # 算法工厂:algId → 具体类
├── alg_base.cpp/h # 所有算法的基类
├── gb_logger.cpp/h # 全局日志
├── gb_item_memory.cpp/h # GlobaltemSharedMemory 双缓冲区
├── algs/ # 18 种算法实现
│ ├── exp_base.* # algId 1-5: 表达式/上下限/动作反馈
│ ├── exp_times.* # algId 6-7: 累计时间/次数
│ ├── exp_bound.* # algId 17: 双阈值限幅
│ ├── exp_sample2D.* # algId 12-13: 拟合/皮尔逊相关性
│ ├── trend_slope2.* # algId 8: 原始斜率
│ ├── trend_slope3.* # algId 14: 百分比变化率
│ ├── roller2.* # algId 9: 均值离群
│ ├── roller3.* # algId 16,18: 中位数离群
│ ├── fault_code.* # algId 10-11: 故障码解析
│ ├── glitch_detection.* # algId 15: 毛刺检测 → Python
│ └── null.h # 未匹配的空算法
├── threads/ # 多线程调度
│ ├── manager.* # 按 TaskSeq 分组,维护 HandlerExec
│ └── handler_exec.* # 单线程执行句柄 + 5 种队列
├── exp_macro/ # 表达式宏替换(delta/persist/hold/max/min)
├── feature_extraction/ # DAA::STA(统计) / DAA::LSM(拟合) / 分布分析
├── utility/ # 工具类
│ ├── alarm_poster.* # 报警多渠道推送 + 去抖
│ ├── alarm_handler.hpp # CMemQueue 消费线程
│ ├── build_alarm_info.* # 报警 JSON 构造
│ ├── condition_monitor.hpp# 正常数据保存条件判断
│ ├── eqp_stat.* # RuleStatShm 统计管理
│ ├── eqp_status.* # 设备运行状态读取
│ ├── ExpModule.* # 表达式加载与变量管理
│ ├── HoldTime.* # hold(N,T) 状态保持
│ ├── update_data.* # 规则数据写入 Memcached
│ ├── proxy_py.* # C++→Python 通信(共享内存 + ICE)
│ ├── StatExp.hpp # 带状态自定义函数
│ ├── VarsCache.hpp # mm_vars 预分配缓存
│ └── XorShift128Plus.hpp # 随机数(防惊群)
├── table_struct/ # DB2 ORM: T_RULE_CFG / T_RULE_RESULT 等
├── define/ # 枚举常量:ProcessType / ExpType / EventCase 等
├── doc/ # Doxygen 文档
├── .do_not_use/ # 弃用算法(不参与编译)
└── CMakeLists.txt
数据流全景
PLC/传感器
│
▼
DSF/OPC ──→ 共享内存 MemFix<PLC_DATA>
│
├──→ GlobaltemSharedMemory (双缓冲, ~19ms)
│ │
│ ├──→ eqpalg-mon: mm_vars → 表达式引擎 → 报警/统计
│ │ │
│ │ ├──→ AlarmPoster → DB2 T_RULE_RESULT / ICE MQ
│ │ ├──→ 正常数据 → DB2 T_RULE_SAMPLE_*
│ │ ├──→ 统计累积 → RuleStatShm → cron 学习
│ │ └──→ 页面数据 → EqpStat → Memcached
│ │
│ ├──→ eqpalg-cron: RuleStatShm → DAA::STA → CI → DB2
│ │
│ └──→ eqpalg-task: iHyperDB → 拟合/统计 → DB2
│
└──→ dsm: ZONE10 → DataQuery → /dscdata/Rule*Data/*.json
启动流程
eqpalg::start()— 根据模块名确定glob_process_type- 创建
EqpAlgICEI— 注册 ICE + 初始化共享内存缓存线程(仅 mon) AlgorithmManager— 从 DB2T_RULE_CFG加载所有规则配置threads::Manager::start()— 按TaskSeq分组创建HandlerExec线程- mon: 额外启动 alarm handler 线程(500ms 轮询 CMemQueue)
- mon:
mem_cached_thread_开始以 ~19ms 间隔刷新GlobaltemSharedMemory
相关进程
| 进程 | 关系 |
|---|---|
| dsm | eqpalg-mon 的 TimeNotify(5) 向 dsm 推送 ruleId 列表;eqpalg 将规则配置写入共享内存 ZONE10 供 dsm 读取归档 |
| eqpalg-cron | eqpalg-mon 收到 99999 电文时通过 ICE 代理转发给 cron,保持配置同步 |
| RICS | RICS 读取同一条 T_RULE_CFG 配置,格式化为 JSON 推送到 Memcached 供前端展示,与 eqpalg 是"展示面"与"执行面"的关系 |
| eqpm | eqpalg 产生报警写入 T_RULE_RESULT → eqpm 查询 FV_RESULT_JOIN_EQPID 获取报警统计用于设备状态面板 |
| 前端/页面服务 | 通过 ICE 发送规则 CRUD 指令;通过 Memcached 读取规则状态和设备数据 |
作者
- Cat (null.null.null@qq.com)
- 宝信软件(Baosight Co. Ltd.)
- 版本: 0.1 (2021-09)