eis/eqpalg/eqpalg_readme.md
Huamonarch bd7e93ae68 Add READMEs for RICS, eqpm, dsm and update eqpalg README with corrections
- RICS_readme.md: Rule information centralized display service
- eqpm_readme.md: Equipment predictive maintenance & status monitoring
- dsm_readme.md: Data save manager for historical data archiving
- eqpalg_readme.md: Corrected architecture, data flow, variable system,
  thread model, and inter-process relationships
2026-05-09 11:46:17 +08:00

20 KiB
Raw Permalink Blame History

eqpalg — 设备算法监控引擎

概述

eqpalgEquipment Process Algorithm是 EIS 系统中的核心设备监控与报警引擎。它基于宝信 PACE 平台构建,通过 ICE 中间件与系统内其他进程通信,从共享内存实时读取 PLC 数据,根据 DB2 中配置的规则,以数学表达式引擎对设备运行状态进行实时监控、统计分析和报警。

进程架构

eqpalg 以 3 个进程实例 的形式运行,部署为不同的 PACE 组件名,各行其责:

进程名 ProcessType 功能
eqpalg-mon kMon 实时监控进程 — 从共享内存双缓冲区读取实时 PLC 数据(~19ms 刷新一次),周期性执行所有启用的规则(执行间隔可配置,最小 20ms产出实时报警
eqpalg-cron kCron 定时学习进程 — 每 1 秒轮询一次,从 RuleStatShm(由 mon 累积的统计值)中取出统计数据,通过 DAA::STA 分布分析类计算置信区间,将学习结果写回 DB2T_SAMPLE_STATT_SAMPLE_MAG)。默认每 24 小时更新一次
eqpalg-task kTask 单次执行进程 — 响应用户发起的按需分析任务,从 iHyperDB 拉取指定时间范围的历史数据,执行算法并返回结果(含报警回测、多项式拟合、相关性计算等)

三者协作eqpalg-mon 收到 99999 电文(规则 CRUD通过 ICE 代理转发给 eqpalg-cron确保 cron 的配置与 mon 保持同步。三个进程各自实例化独立的 AlgorithmManager,从 DB2 独立加载规则,但 mon 的执行频率最高毫秒级cron 次之秒级task 按需触发。

关键依赖

  • ICE — 进程间通信,注册端点 "baosight/eqpalg-{mon|cron|task}"
  • DB2 — 存储规则配置(T_RULE_CFG)、报警记录(T_RULE_RESULT)、样本统计(T_RULE_SAMPLE_1DT_SAMPLE_STATT_SAMPLE_FITT_SAMPLE_MAG)、时间戳(T_RULE_RECORD_TIME
  • 共享内存SHM — 实时 PLC 数据的低延迟访问通道,基于 MemFix<PLC_DATA> 双缓冲区设计
  • iHyperDB — 工业历史数据库,供 cron/task 查询历史趋势
  • Boost — 通用工具库filesystem, serialization, stacktrace, container 等)
  • dlib / mlpack / Armadillo / Eigen3 — 数值计算库(DAA::STA 统计分布分析、DAA::LSM 最小二乘拟合、矩阵运算)
  • nlohmann/json — JSON 解析

核心工作流

  现场PLC/传感器
       │
       ▼
  DSF 数据采集 ──→ 共享内存 MemFix<PLC_DATA>
       │
       ▼
  GlobaltemSharedMemory (双缓冲区, ~19ms刷新)
       │
       ├──→ eqpalg-mon: mm_vars → matheval 表达式引擎 → mon_proc()
       │         │
       │         ├──→ 报警 → AlarmPoster → CMemQueue / ICE MQ / DB2
       │         ├──→ 正常数据 → ExpModule → DB2 T_RULE_SAMPLE_*
       │         ├──→ 统计累积 → RuleStatShm (供 cron 学习)
       │         └──→ 页面数据 → EqpStat → Memcached
       │
       ├──→ eqpalg-cron: RuleStatShm → DAA::STA → 置信区间 → DB2
       │
       └──→ eqpalg-task: iHyperDB(指定时间范围) → 拟合/统计/回测

数据源模式

每个规则可以配置两种数据源(DataSource枚举):

模式 说明
MEMORY 1 mon 进程默认:从 GlobaltemSharedMemory 双缓冲区实时读取,延迟低至 20ms
IHDB 0 cron/task 进程使用:通过 mix_cc::ihd API 从 iHyperDB 查询历史数据

共享内存双缓冲区

GlobaltemSharedMemory 是 eqpalg 的数据中枢:

  • 写侧mem_cached_thread_(仅在 mon 中)以 ~19ms 为周期,使用 CLOCK_MONOTONIC 单调时钟计时,从 MemFix<PLC_DATA> 顺序读取所有 tag 值,写入 write 缓冲区后执行 swap通过 std::shared_mutex 保护)
  • 读侧:监控线程无锁读取 read 缓冲区,不会与写操作竞争
  • 超时处理:若一次刷新超过 19ms记录 Error 日志并立即开始下一次循环,不再 sleep
  • 索引构建:构造函数中遍历 BinaryTele 电文定义,将 tag 名→缓冲区索引的映射一次性构建

ICE 电文接口

EqpAlgICEI 继承自 MessageICE,处理三类调用:

SendDataShort(eventNo, seq) — 同步消息分发:

eventNo 用途 处理逻辑
99999 规则 CRUD + 单次执行 JSON {"eventNo":N, "ruleId":"...", "algId":N, ...}AlgorithmManager::dispose()。mon 收到后通过 ICE 代理转发给 cron
11111 外部原始报警 直接调用 AlarmPoster::alarm(str)
22222 规则上下限更新 JSON {"ruleid":"...", "lb":N, "ub":N, "va":N, ...}update_limit_alarm()

SendDataLong — 复杂数据接口(未实现)

TimeNotify(eventNo) — 定时器回调:

eventNo 用途 触发周期
1 更新设备运行状态到 Memcached ~1秒
2 更新规则统计共享内存数据 ~1秒
5 向 dsm 发送 ruleId 列表和共享内存统计 定期

规则生命周期eventNo=99999

页面/外部 → ICE 电文(JSON) → AlgorithmManager::dispose()
                                   │
       ┌───────┬────────┬──────────┼──────────┬────────┐
       ▼       ▼        ▼          ▼          ▼        ▼
   kCreate  kDelete  kUpdate   kEnable    kReset   kExec
   (1)      (0)      (2)       (3)        (4)      (10)
  • kCreate(1): thread_manager_.storage()attach() → 启动线程执行 → 更新 DB2 T_RULE_RECORD_TIME
  • kDelete(0): delete_instance() → 停止线程 → 删除 DB2 记录
  • kUpdate(2): detach()storage()attach() → 更新修改时间
  • kEnable(3): thread_manager_.enable(ruleId, usable) → 更新 DB2 时间
  • kReset(4): 重置算法 6累计时间和 7出现次数的统计值
  • kExec(10): 单次执行,从 DB2 重新读取最新配置,创建独立线程运行 exec_task(time_range)

线程模型

三层调度

threads::Manager        ← 顶层:按 TaskSeq 分配线程,维护 handles_ map
       │
       ▼
HandlerExec             ← 中层:单个线程的执行句柄,管理队列和规则集
       │
       ▼
AlgBase (子类实例)       ← 底层:具体算法执行

HandlerExec 执行循环

每个 HandlerExec 运行在独立线程中,线程名格式为 alg_{algId}_{dataSource}_{taskSeq}cron 加后缀 _crontask 加后缀 _task

  • mon 模式:遍历规则,对每条调用 exec_mon_call()。每条规则执行间隔由配置的 delay_time_ 控制。若单次执行超过 20ms跳过 sleep 立即下一轮
  • cron 模式:每秒轮询一次,对每条规则调用 exec_cron_call()
  • task 模式:每秒轮询一次,每条规则执行一次后从活动集中移除

线程安全

通过多级队列实现无锁化动态增删:

队列 用途
attach_queue_ 新增算法排队加入
detach_queue_ 待删除的规则 ID 列表
reset_queue_ 待重置统计的规则 ID
usable_queue_ 待设置启/停用的 (ruleId, bool)
once_exec_queue_ task 单次执行请求 (algo_ptr, time_range)

每轮 event_handler() 清空所有队列,修改活动规则集。

随机偏移(防惊群)

AlgBase::init() 使用 XorShift128Plus 随机数生成器为 save_interval_ms_rule_state_update_interval_ms_ 添加随机毫秒偏移,避免大量规则同时写 DB2。

算法体系

所有算法继承自 AlgBase,基类提供:

  • 表达式解析引擎(mix_cc::matheval::Expression
  • 双数据源(共享内存 / iHyperDB
  • 执行间隔控制、心跳日志、前提条件判断
  • 报警保存、统计累积

表达式变量系统

表达式引擎中可用以下特殊变量(通过宏替换注入实际值):

变量 含义
m{tagN} tagN 的当前值(如 m{tag1}
p{tagN} tagN 上一次执行时的值
s{tagN} 动作开始时的快照值
mx_tagN tagN 的历史最大值
mi_tagN tagN 的历史最小值
mv2_tagN tagN 的累积变化量
up_tagN tagN 的上限
dw_tagN tagN 的下限
time 当前运行时间
stime 动作开始时间
etime 动作结束时间
now 当前时间点

自定义带状态函数

函数 含义
KeepT(N, T) tagN 为真后保持 T 分钟
KeepC(N) tagN 为真后累计次数
RiseEdge(N) tagN 的上升沿检测
Detect(N, T) T 分钟内检测 tagN 是否为真
hold(N, T) 将 tagN 保持 T 分钟不变hold 变量有独立的状态管理)

算法详细说明

1-5: ExpBase — 表达式类算法

所有基于表达式的算法共享 ExpBase,通过 exp_type_ 区分行为:

exp_type_ algId 名称 核心逻辑
Logic (1) 1 实时逻辑判断 评估 exp_act_ 表达式,结果为真即报警
Bound (2) 2 变量上下限 计算 exp_result_ 值,与下限/上限比较。支持在线自学习:is_learning_=true 时累积统计值到 EqpStatcron 定期用 DAA::STA 计算新置信区间写回 DB2
ActionFeedBack (3) 3 动作反馈-逻辑 跟踪状态机:act_start_done() → 超时/保持检测 → act_done() → 反馈表达式评估
CondBound (4) 4 动作反馈-上下限 与 3 相同流程,反馈后用上下限判断
BoundHoldTime (5) 5 上下限-保持时间 仅在超限持续超过 hold_time_ 毫秒后才触发报警

动作反馈状态机algId 3/4

  act_start_done()  ──→ act_not_hold()  ──→ 停止
       │                     (未保持)
       │ act_timeout() ──→ 超时报警
       │
       ▼
  act_done() → 反馈评价 → 正常/报警

6-7: ExpTimes — 累计统计

algId 名称 逻辑
6 运行时间累计 跟踪条件满足的持续时间,累积到 running_time(小时),超过 max_time_ 阈值报警。结果持久化到 DB2 T_RULE_SAMPLE_1D
7 出现次数累计 跟踪条件满足的次数,累积到 shear_times,超过 max_times_ 阈值报警

两种累计均支持通过 kReset 事件重置。

8, 14: TrendSlope2 / TrendSlope3 — 趋势检测

algId 检测方式
TrendSlope2 8 原始值斜率:在 deltaX_ 间隔内查询 iHyperDB计算连续间隔均值之间的斜率 deltaY / deltaX,若在连续 CS_AVG_SIZE_ 个间隔中斜率超过 limit_slope_ 则报警
TrendSlope3 14 百分比变化: 斜率为 100 * deltaY / avg[i-1],适用于量级不同的变量,实现归一化

9, 16, 18: Roller2 / Roller3 — 离群检测

algId 检测方式
Roller2 9 基于均值:计算所有变量的算术平均,任一个体偏离均值超过 limit_over_% 即报警
Roller3 16 基于中位数(百分比):计算所有变量的中位数,偏离超过 limit_warn_%(警告)或 limit_error_%(错误)即报警,支持 hold_time_ 持续判断
Roller3 18 基于中位数(实际值):使用实际值阈值代替百分比

Roller2/3 的典型应用场景是"轧辊组监控"——同组多个传感器(如多个轧辊温度计),检测其中某个是否明显偏离群体。

10-11: FaultCode — 故障代码解析

维护一个 DB2 T_LOV_FCODE 查询缓存(map2fcode_),将整数值映射为故障名称和内容:

algId code_type_ 解析方式
10 0 整体解析:将故障码整数值在字典中查找对应描述
11 1 按位解析:检查 0-15 每一位,对每个置位位查找对应故障描述

12-13: ExpSample2D — 二维拟合与相关性

algId 名称 mon 执行 task 执行
12 多项式拟合 评估 X/Y 表达式,用 DB2 中存储的拟合系数计算 Y_fit偏差超 `scale_ × Y_fit
13 皮尔逊相关系数 累积到 min_len_ 个数据点后计算皮尔逊 r与存储值偏差超 scale_ 报警 收集数据→计算皮尔逊系数→存入 T_SAMPLE_FIT

15: GlitchDetection — 毛刺检测

将监控变量累积到固定大小数组(最大 2000 点),数据满后通过 ProxPy 将数据传递给外部 Python 进程("glitch" 模块)执行毛刺分析。是唯一与 Python 联动的算法。

17: ExpBound — 双阈值限幅

单独配置一个值表达式,设置两层阈值:

  • limit_warn_ — 警告阈值,触发 WARN 级别
  • limit_error_ — 错误阈值,触发 ERROR 级别

已弃用算法(.do_not_use/

样板曲线、CPC 检测、FFT 频谱统计、DTW 动态时间规整、一类 SVM 异常检测、KRR 核岭回归、区间分布统计、波形监控等实验性算法。保留代码供参考但不参与编译。

报警机制

报警通道

AlarmPoster 多渠道推送报警:

  mon_proc() 返回 AlarmInfo
       │
       ▼
  AlarmPoster::alarm()
       │
       ├──→ CMemQueue (内存队列) → AlarmHandler 线程 (500ms轮询) → 统一处理
       ├──→ ICE MQ (baosight/zmqp) — 可选,用于消息队列服务
       └──→ DB2 T_RULE_RESULT — 可选persist 到数据库

去抖

AlarmPoster 强制每规则最小报警间隔(默认 5 分钟 × AlarmIntervalHours),防止信号振荡导致报警风暴。

报警信息结构

AlarmInfo {
  bool alarmed;
  json content;       // 报警内容含规则ID、名称、组、值、上下限等
  time_point alarm_start_time;
  time_point alarm_end_time;
  ConfigInfo cfg_info;  // rule ID / name / group / remark
}

表达式配置示例

DB2 T_RULE_CFG.ruleParam JSON 格式:

{
  "datasource": 1,
  "delay_time": 1000,
  "before_exec": "m{tag1} > 0",
  "function": {
    "exp_str": "m{tag1} > limit_up_ or m{tag1} < limit_down_",
    "exp_feedback": "m{tag10} > 0",
    "exp_result": "m{tag1} + m{tag2}",
    "limit_up_": 800,
    "limit_down_": 200,
    "hold_time": 5000,
    "is_learning_": true,
    "dist_mode_": 1,
    "time_out": 30000,
    "keep_mode": true,
    "max_times_": 100,
    "max_time_": 720,
    "unit": "℃"
  },
  "tags": {
    "tag1": ["C308_TEMP_01", "1号温度"],
    "tag2": ["C308_TEMP_02", "2号温度"]
  }
}

目录结构

eqpalg/
├── eqpalg.cpp/h              # 进程入口,根据进程名确定类型
├── eqpalg_icei.cpp/h         # ICE 接口,含共享内存缓存线程
├── algorithm_manager.cpp/h   # 算法管理器DB2 加载 + 规则 CRUD
├── build_algorithm.cpp/h     # 算法工厂algId → 具体类
├── alg_base.cpp/h            # 所有算法的基类
├── gb_logger.cpp/h           # 全局日志
├── gb_item_memory.cpp/h      # GlobaltemSharedMemory 双缓冲区
├── algs/                     # 18 种算法实现
│   ├── exp_base.*           # algId 1-5: 表达式/上下限/动作反馈
│   ├── exp_times.*          # algId 6-7: 累计时间/次数
│   ├── exp_bound.*          # algId 17: 双阈值限幅
│   ├── exp_sample2D.*       # algId 12-13: 拟合/皮尔逊相关性
│   ├── trend_slope2.*       # algId 8: 原始斜率
│   ├── trend_slope3.*       # algId 14: 百分比变化率
│   ├── roller2.*            # algId 9: 均值离群
│   ├── roller3.*            # algId 16,18: 中位数离群
│   ├── fault_code.*         # algId 10-11: 故障码解析
│   ├── glitch_detection.*   # algId 15: 毛刺检测 → Python
│   └── null.h               # 未匹配的空算法
├── threads/                  # 多线程调度
│   ├── manager.*            # 按 TaskSeq 分组,维护 HandlerExec
│   └── handler_exec.*       # 单线程执行句柄 + 5 种队列
├── exp_macro/                # 表达式宏替换delta/persist/hold/max/min
├── feature_extraction/       # DAA::STA(统计) / DAA::LSM(拟合) / 分布分析
├── utility/                  # 工具类
│   ├── alarm_poster.*       # 报警多渠道推送 + 去抖
│   ├── alarm_handler.hpp    # CMemQueue 消费线程
│   ├── build_alarm_info.*   # 报警 JSON 构造
│   ├── condition_monitor.hpp# 正常数据保存条件判断
│   ├── eqp_stat.*           # RuleStatShm 统计管理
│   ├── eqp_status.*         # 设备运行状态读取
│   ├── ExpModule.*          # 表达式加载与变量管理
│   ├── HoldTime.*           # hold(N,T) 状态保持
│   ├── update_data.*        # 规则数据写入 Memcached
│   ├── proxy_py.*           # C++→Python 通信(共享内存 + ICE
│   ├── StatExp.hpp          # 带状态自定义函数
│   ├── VarsCache.hpp        # mm_vars 预分配缓存
│   └── XorShift128Plus.hpp  # 随机数(防惊群)
├── table_struct/             # DB2 ORM: T_RULE_CFG / T_RULE_RESULT 等
├── define/                   # 枚举常量ProcessType / ExpType / EventCase 等
├── doc/                      # Doxygen 文档
├── .do_not_use/              # 弃用算法(不参与编译)
└── CMakeLists.txt

数据流全景

  PLC/传感器
      │
      ▼
  DSF/OPC ──→ 共享内存 MemFix<PLC_DATA>
      │
      ├──→ GlobaltemSharedMemory (双缓冲, ~19ms)
      │       │
      │       ├──→ eqpalg-mon: mm_vars → 表达式引擎 → 报警/统计
      │       │       │
      │       │       ├──→ AlarmPoster → DB2 T_RULE_RESULT / ICE MQ
      │       │       ├──→ 正常数据 → DB2 T_RULE_SAMPLE_*
      │       │       ├──→ 统计累积 → RuleStatShm → cron 学习
      │       │       └──→ 页面数据 → EqpStat → Memcached
      │       │
      │       ├──→ eqpalg-cron: RuleStatShm → DAA::STA → CI → DB2
      │       │
      │       └──→ eqpalg-task: iHyperDB → 拟合/统计 → DB2
      │
      └──→ dsm: ZONE10 → DataQuery → /dscdata/Rule*Data/*.json

启动流程

  1. eqpalg::start() — 根据模块名确定 glob_process_type
  2. 创建 EqpAlgICEI — 注册 ICE + 初始化共享内存缓存线程(仅 mon
  3. AlgorithmManager — 从 DB2 T_RULE_CFG 加载所有规则配置
  4. threads::Manager::start() — 按 TaskSeq 分组创建 HandlerExec 线程
  5. mon: 额外启动 alarm handler 线程500ms 轮询 CMemQueue
  6. mon: mem_cached_thread_ 开始以 ~19ms 间隔刷新 GlobaltemSharedMemory

相关进程

进程 关系
dsm eqpalg-mon 的 TimeNotify(5) 向 dsm 推送 ruleId 列表eqpalg 将规则配置写入共享内存 ZONE10 供 dsm 读取归档
eqpalg-cron eqpalg-mon 收到 99999 电文时通过 ICE 代理转发给 cron保持配置同步
RICS RICS 读取同一条 T_RULE_CFG 配置,格式化为 JSON 推送到 Memcached 供前端展示,与 eqpalg 是"展示面"与"执行面"的关系
eqpm eqpalg 产生报警写入 T_RULE_RESULT → eqpm 查询 FV_RESULT_JOIN_EQPID 获取报警统计用于设备状态面板
前端/页面服务 通过 ICE 发送规则 CRUD 指令;通过 Memcached 读取规则状态和设备数据

作者