# eis/py/glitch/GlitchDetector.py
#
# Repository-viewer residue kept for reference:
# 227 lines | 7.3 KiB | Python | Raw / Normal View / History

import pandas as pd
import numpy as np
from scipy.stats import kurtosis
class GlitchDetector:
    """Detect glitches (spikes) in one numeric column by comparing kurtosis.

    The detector normalizes a column with its median and interquartile range,
    computes the Fisher (excess) kurtosis of the normalized values, and flags
    a glitch when the absolute kurtosis exceeds
    ``abs(kurtosis_threshold * baseline_kurtosis)``.
    """

    def __init__(self, historical_data=None, kurtosis_threshold=3.0):
        """Initialize the detector.

        Args:
            historical_data: Optional DataFrame of known-good data used to
                derive the baseline kurtosis lazily in ``detect_glitches``.
            kurtosis_threshold: Multiplier applied to the baseline kurtosis
                to obtain the detection bound.
        """
        self.historical_data = historical_data
        self.kurtosis_threshold = kurtosis_threshold
        self.iqr_params = {}  # Q1 / Q3 / IQR / median of the last normalized column
        self.baseline_kurtosis = None  # reference kurtosis; set lazily or by the caller
        self.lb = 0  # lower bound of the last detection
        self.ub = 0  # upper bound of the last detection
        self.va = 0  # kurtosis of the last inspected data

    def iqr_normalize(self, df, column='data'):
        """Return a copy of ``df`` with an added ``'<column>_normalized'`` column.

        The new column is ``(x - median) / IQR``. The computed statistics are
        stored in ``self.iqr_params``.

        Args:
            df: Input DataFrame.
            column: Name of the column to normalize.

        Returns:
            ``df`` itself when it is empty, otherwise a copy with the
            normalized column appended.
        """
        if df.empty:
            return df
        data_series = df[column]
        Q1 = data_series.quantile(0.25)
        Q3 = data_series.quantile(0.75)
        IQR = Q3 - Q1
        median = data_series.median()
        # Keep the true statistics (including a zero IQR) for later inspection.
        self.iqr_params = {'Q1': Q1, 'Q3': Q3, 'IQR': IQR, 'median': median}
        # A zero IQR (at least half of the samples share one value) would give
        # 0/0 = NaN and x/0 = +/-inf; fall back to centering only. Kurtosis is
        # scale-invariant, so the downstream statistic is unaffected.
        scale = IQR if IQR != 0 else 1.0
        result_df = df.copy()
        result_df[f'{column}_normalized'] = (data_series - median) / scale
        return result_df

    def calculate_kurtosis(self, df, column='data_normalized'):
        """Return the Fisher (excess) kurtosis of ``df[column]``.

        NaN values are dropped first. Returns 0 when ``df`` is empty or fewer
        than four values remain.

        Args:
            df: Input DataFrame.
            column: Name of the column to evaluate.

        Returns:
            The kurtosis value (0 for a normal distribution under Fisher's
            definition), or 0 when there is not enough data.
        """
        if df.empty:
            return 0
        data_series = df[column].dropna()
        if len(data_series) < 4:  # too few points for a meaningful kurtosis
            return 0
        return kurtosis(data_series, fisher=True)

    def establish_baseline(self, historical_df, column='data'):
        """Compute and store the baseline kurtosis from known-good data.

        Args:
            historical_df: DataFrame of historical normal data.
            column: Name of the data column.

        Raises:
            ValueError: If ``historical_df`` is None or empty.
        """
        if historical_df is None or historical_df.empty:
            raise ValueError("历史数据不能为空")
        normalized_historical = self.iqr_normalize(historical_df, column)
        # Read the column that iqr_normalize actually produced, so that
        # columns other than 'data' work as well.
        self.baseline_kurtosis = self.calculate_kurtosis(
            normalized_historical, f'{column}_normalized'
        )

    def detect_glitches(self, current_df, column='data', baseline_kurtosis=None):
        """Check whether ``current_df[column]`` contains glitches.

        Args:
            current_df: DataFrame with the data to inspect.
            column: Name of the data column.
            baseline_kurtosis: Optional explicit baseline. When given it
                replaces (and persists as) the detector's stored baseline.

        Returns:
            dict with keys ``has_glitch`` (bool), ``current_kurtosis``,
            ``baseline_kurtosis``, ``deviation``, ``threshold``,
            ``normalized_data`` and ``iqr_params``.

        Raises:
            ValueError: If no baseline is available and no historical data
                was supplied to derive one.
        """
        if baseline_kurtosis is not None:
            self.baseline_kurtosis = baseline_kurtosis
        if self.baseline_kurtosis is None and self.historical_data is not None:
            self.establish_baseline(self.historical_data, column)
        elif self.baseline_kurtosis is None:
            raise ValueError("未提供历史数据或未建立基准")

        normalized_current = self.iqr_normalize(current_df, column)
        current_kurtosis = self.calculate_kurtosis(
            normalized_current, f'{column}_normalized'
        )
        kurtosis_deviation = abs(current_kurtosis - self.baseline_kurtosis)

        # Expose the last measurement and its symmetric bounds on the instance.
        self.va = current_kurtosis
        bound = abs(self.kurtosis_threshold * self.baseline_kurtosis)
        self.lb = -bound
        self.ub = bound

        # Cast so callers always get a plain bool rather than numpy.bool_.
        has_glitch = bool(abs(self.va) > self.ub)

        return {
            'has_glitch': has_glitch,
            'current_kurtosis': current_kurtosis,
            'baseline_kurtosis': self.baseline_kurtosis,
            'deviation': kurtosis_deviation,
            'threshold': self.kurtosis_threshold,
            'normalized_data': normalized_current,
            'iqr_params': self.iqr_params,
        }
# 使用示例
def example_usage():
    """Run a small demo of GlitchDetector on synthetic data.

    Builds a seeded series (linear ramp plus Gaussian noise), copies it and
    perturbs nine fixed positions with random offsets, then runs detection
    with an explicitly supplied baseline kurtosis and prints the outcome.

    Returns:
        The detection result dict, or None if detection raised.
    """
    np.random.seed(42)

    # Known-good series: ramp from 0 to 4*pi with additive noise.
    sample_count = 200
    timestamps = pd.date_range('2024-01-01', periods=sample_count, freq='h')
    ramp = np.linspace(0, 4*np.pi, sample_count)
    clean_values = ramp + np.random.normal(0, 0.5, sample_count)
    clean_df = pd.DataFrame({'data': clean_values}, index=timestamps)

    # Same series with a random spike added at each listed position.
    spiky_values = clean_values.copy()
    for position in (10, 30, 50, 70, 90, 110, 130, 150, 170):
        spiky_values[position] += 15 * np.random.randn()
    spiky_df = pd.DataFrame({'data': spiky_values}, index=timestamps)

    detector = GlitchDetector(clean_df, kurtosis_threshold=20.0)
    try:
        # The explicit baseline is used as-is; no baseline is derived from clean_df.
        outcome = detector.detect_glitches(spiky_df, baseline_kurtosis=0.08)
        print("=== 毛刺检测结果 ===")
        print(f"是否存在毛刺: {outcome['has_glitch']}")
        print(f"当前数据峰度: {outcome['current_kurtosis']:.4f}")
        print(f"基准峰度: {outcome['baseline_kurtosis']:.4f}")
        print(f"峰度偏差: {outcome['deviation']:.4f}")
        print(f"检测阈值: {outcome['threshold']}")
        return outcome
    except Exception as e:
        print(f"检测过程中出错: {e}")
        return None
# 批量处理函数
def batch_glitch_detection(historical_df, test_dfs, column='data', threshold=3.0):
    """Run glitch detection over several DataFrames and summarize the results.

    One detector is created from ``historical_df`` and reused for every entry
    of ``test_dfs``. A dataset whose detection raises is reported on stdout
    and left out of the summary (best effort).

    Args:
        historical_df: DataFrame of historical normal data.
        test_dfs: Iterable of DataFrames to inspect.
        column: Name of the data column.
        threshold: Kurtosis threshold passed to the detector.

    Returns:
        DataFrame with columns ``batch_id``, ``has_glitch``,
        ``current_kurtosis``, ``deviation`` and ``data_points``. The columns
        are present even when no dataset was processed successfully.
    """
    summary_columns = [
        'batch_id',
        'has_glitch',
        'current_kurtosis',
        'deviation',
        'data_points',
    ]
    detector = GlitchDetector(historical_df, kurtosis_threshold=threshold)
    rows = []
    for i, test_df in enumerate(test_dfs):
        try:
            result = detector.detect_glitches(test_df, column)
            # Keep only the scalar summary fields; the full result (including
            # the normalized frame) is not needed after this point.
            rows.append({
                'batch_id': i,
                'has_glitch': result['has_glitch'],
                'current_kurtosis': result['current_kurtosis'],
                'deviation': result['deviation'],
                'data_points': len(test_df),
            })
        except Exception as e:
            print(f"{i} 个数据集检测失败: {e}")
    # Passing the columns explicitly keeps the schema stable for empty output.
    return pd.DataFrame(rows, columns=summary_columns)
if __name__ == "__main__":
    # Run the demo when the module is executed as a script.
    result = example_usage()