eis/py/glitch/GlitchDetector.py

227 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
from scipy.stats import kurtosis
class GlitchDetector:
    """Flag glitches (spikes) in a series by comparing its kurtosis to a baseline.

    The data is centred on its median and scaled by its inter-quartile range,
    its excess (Fisher) kurtosis is computed, and the result is compared with
    a bound derived from a baseline kurtosis.
    """

    def __init__(self, historical_data=None, kurtosis_threshold=3.0):
        """Create a detector.

        Args:
            historical_data: DataFrame of known-good data used to establish
                the baseline lazily on the first detection, or None.
            kurtosis_threshold: Multiplier applied to the absolute baseline
                kurtosis to obtain the detection bound
                (``ub = |kurtosis_threshold * baseline_kurtosis|``).
        """
        self.historical_data = historical_data
        self.kurtosis_threshold = kurtosis_threshold
        self.iqr_params = {}  # Q1/Q3/IQR/median of the most recent normalization.
        self.baseline_kurtosis = None  # Set by establish_baseline/detect_glitches.
        self.lb = 0  # Lower bound from the last detection (-ub).
        self.ub = 0  # Upper bound from the last detection.
        self.va = 0  # Kurtosis measured in the last detection.

    def iqr_normalize(self, df, column='data'):
        """Return a copy of ``df`` with an added ``'<column>_normalized'`` column.

        The normalized value is ``(x - median) / IQR``. The statistics used are
        stored in ``self.iqr_params``.

        Args:
            df: Input DataFrame.
            column: Name of the column to normalize.

        Returns:
            ``df`` itself when it is empty, otherwise a copy carrying the
            extra normalized column.
        """
        if df.empty:
            return df
        data_series = df[column]
        q1 = data_series.quantile(0.25)
        q3 = data_series.quantile(0.75)
        iqr = q3 - q1
        median = data_series.median()
        # Keep the real statistics (including a zero IQR) for callers.
        self.iqr_params = {'Q1': q1, 'Q3': q3, 'IQR': iqr, 'median': median}
        # A zero IQR (at least half of the samples identical, e.g. a flat
        # signal with a few spikes) would turn the data into NaN/inf and hide
        # exactly the spikes we are looking for. Fall back to centring only.
        scale = iqr if iqr != 0 else 1.0
        result_df = df.copy()
        result_df[f'{column}_normalized'] = (data_series - median) / scale
        return result_df

    def calculate_kurtosis(self, df, column='data_normalized'):
        """Return the excess (Fisher) kurtosis of ``df[column]``.

        NaN values are dropped first. Returns 0 when ``df`` is empty or fewer
        than four values remain.

        Args:
            df: Input DataFrame.
            column: Name of the column to analyse.
        """
        if df.empty:
            return 0
        data_series = df[column].dropna()
        if len(data_series) < 4:  # Too few points for a meaningful kurtosis.
            return 0
        # Fisher definition: a normal distribution has kurtosis 0.
        return kurtosis(data_series, fisher=True)

    def establish_baseline(self, historical_df, column='data'):
        """Compute and store the baseline kurtosis from known-good data.

        Args:
            historical_df: DataFrame of normal (glitch-free) data.
            column: Name of the data column.

        Raises:
            ValueError: If ``historical_df`` is None or empty.
        """
        if historical_df is None or historical_df.empty:
            raise ValueError("历史数据不能为空")
        normalized_historical = self.iqr_normalize(historical_df, column)
        # Read the column iqr_normalize actually created; relying on the
        # 'data_normalized' default breaks for any other column name.
        self.baseline_kurtosis = self.calculate_kurtosis(
            normalized_historical, f'{column}_normalized'
        )

    def detect_glitches(self, current_df, column='data', baseline_kurtosis=None):
        """Decide whether ``current_df[column]`` contains glitches.

        A glitch is reported when ``|current kurtosis|`` exceeds
        ``|kurtosis_threshold * baseline kurtosis|``. Note that a baseline of
        exactly 0 therefore flags any non-zero kurtosis.

        Args:
            current_df: DataFrame with the data to check.
            column: Name of the data column.
            baseline_kurtosis: Optional explicit baseline. When given it
                replaces the stored baseline (also for later calls).

        Returns:
            dict with keys ``has_glitch``, ``current_kurtosis``,
            ``baseline_kurtosis``, ``deviation``, ``threshold``,
            ``normalized_data`` and ``iqr_params``.

        Raises:
            ValueError: If no baseline is available and no historical data
                was supplied to establish one.
        """
        if baseline_kurtosis is not None:
            self.baseline_kurtosis = baseline_kurtosis
        if self.baseline_kurtosis is None and self.historical_data is not None:
            self.establish_baseline(self.historical_data, column)
        elif self.baseline_kurtosis is None:
            raise ValueError("未提供历史数据或未建立基准")

        normalized_current = self.iqr_normalize(current_df, column)
        current_kurtosis = self.calculate_kurtosis(
            normalized_current, f'{column}_normalized'
        )
        # Reported for information only; the decision uses the bound below.
        kurtosis_deviation = abs(current_kurtosis - self.baseline_kurtosis)

        self.va = current_kurtosis
        self.ub = abs(self.kurtosis_threshold * self.baseline_kurtosis)
        self.lb = -self.ub
        # Plain bool so callers can use identity checks / JSON serialization.
        has_glitch = bool(abs(self.va) > self.ub)

        return {
            'has_glitch': has_glitch,
            'current_kurtosis': current_kurtosis,
            'baseline_kurtosis': self.baseline_kurtosis,
            'deviation': kurtosis_deviation,
            'threshold': self.kurtosis_threshold,
            'normalized_data': normalized_current,
            'iqr_params': self.iqr_params,
        }
# 使用示例
def example_usage():
    """Run a small end-to-end demo of GlitchDetector and print the outcome.

    Builds 200 hourly samples of a noisy ramp as "historical" data, copies
    them, perturbs nine samples with large random offsets, and runs the
    detector on the perturbed copy with an explicit baseline kurtosis.

    Returns:
        The detection result dict, or None if detection raised.
    """
    np.random.seed(42)

    # Glitch-free reference series.
    sample_count = 200
    timestamps = pd.date_range('2024-01-01', periods=sample_count, freq='h')
    ramp = np.linspace(0, 4*np.pi, sample_count)
    clean_values = ramp + np.random.normal(0, 0.5, sample_count)
    historical_df = pd.DataFrame({'data': clean_values}, index=timestamps)

    # Same series with a spike injected every 20 samples (10, 30, ..., 170).
    spiky_values = clean_values.copy()
    for position in range(10, 171, 20):
        spiky_values[position] += 15 * np.random.randn()
    current_df = pd.DataFrame({'data': spiky_values}, index=timestamps)

    detector = GlitchDetector(historical_df, kurtosis_threshold=20.0)
    try:
        result = detector.detect_glitches(current_df, baseline_kurtosis=0.08)
        report_lines = (
            "=== 毛刺检测结果 ===",
            f"是否存在毛刺: {result['has_glitch']}",
            f"当前数据峰度: {result['current_kurtosis']:.4f}",
            f"基准峰度: {result['baseline_kurtosis']:.4f}",
            f"峰度偏差: {result['deviation']:.4f}",
            f"检测阈值: {result['threshold']}",
        )
        for line in report_lines:
            print(line)
        return result
    except Exception as e:
        print(f"检测过程中出错: {e}")
        return None
# 批量处理函数
def batch_glitch_detection(historical_df, test_dfs, column='data', threshold=3.0):
    """Run glitch detection over several DataFrames and summarise the results.

    A single detector is shared by all data sets. A data set whose detection
    raises is reported on stdout and left out of the summary (best effort).

    Args:
        historical_df: DataFrame of normal data used for the baseline.
        test_dfs: Iterable of DataFrames to check.
        column: Name of the data column.
        threshold: Detection threshold passed to the detector.

    Returns:
        DataFrame with columns ``batch_id``, ``has_glitch``,
        ``current_kurtosis``, ``deviation`` and ``data_points``; one row per
        successfully processed data set. The columns are present even when
        no data set was processed.
    """
    summary_columns = [
        'batch_id', 'has_glitch', 'current_kurtosis', 'deviation', 'data_points',
    ]
    detector = GlitchDetector(historical_df, kurtosis_threshold=threshold)
    rows = []
    for i, test_df in enumerate(test_dfs):
        try:
            result = detector.detect_glitches(test_df, column)
        except Exception as e:
            # Deliberately broad: one bad data set must not abort the batch.
            print(f"{i} 个数据集检测失败: {e}")
            continue
        # Keep only the summary fields; the full result also carries the
        # normalized DataFrame, which is not needed here.
        rows.append({
            'batch_id': i,
            'has_glitch': result['has_glitch'],
            'current_kurtosis': result['current_kurtosis'],
            'deviation': result['deviation'],
            'data_points': len(test_df),
        })
    # Explicit columns keep the schema stable when `rows` is empty.
    return pd.DataFrame(rows, columns=summary_columns)
if __name__ == "__main__":
    # Executed as a script: run the demo and keep its result at module level.
    result = example_usage()