eis/py/comlib/mlearn/MLAnalyzer.py

# -*- coding: utf-8 -*-
"""
MLAnalyzer.py - machine learning model analysis and training utilities.

This module provides a flexible machine learning analyzer class (MLAnalyzer)
that supports training and evaluating several common algorithms.

Main features:
- Multiple supervised learning algorithms (regression and classification)
- Automatic data preprocessing (scaling of numeric features, encoding of categorical features)
- Model training and evaluation
- An extensible algorithm registry

Usage (see the sketch at the end of this docstring):
1. Instantiate MLAnalyzer with the desired algorithm
2. Call preprocess_data to preprocess the data
3. Train the model with fit
4. Make predictions with predict
5. Evaluate model performance with evaluate

Dependencies:
- pandas: data handling
- numpy: numerical computing
- scikit-learn: machine learning algorithms and utilities
- xgboost: high-performance gradient boosting framework
- pickle: model serialization

Author
- Author : zoufuzhou
- Date : 2025-05-21 16:34:37
- Description : Machine learning analysis
- LastEditTime : 2025-05-21 16:34:37
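
Example (a minimal sketch; the column names 'x1', 'x2' and 'y' are illustrative
placeholders, not columns of a real dataset):

    import pandas as pd
    df = pd.DataFrame({'x1': [1.0, 2.0, 3.0, 4.0, 5.0],
                       'x2': ['a', 'b', 'a', 'b', 'a'],
                       'y':  [1.5, 2.5, 3.5, 4.5, 5.5]})
    analyzer = MLAnalyzer('random_forest_reg')
    metrics = analyzer.fit(df[['x1', 'x2']], df['y'], test_size=0.2)
    preds = analyzer.predict(df[['x1', 'x2']])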
"""
import pandas as pd
import numpy as np
from typing import Dict, Union, List, Optional
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression
from xgboost import XGBRegressor, XGBClassifier
from sklearn.neural_network import MLPRegressor, MLPClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
mean_squared_error, mean_absolute_error, r2_score
)
import pickle
import os
class MLAnalyzer:
"""机器学习分析器,支持多种算法选择"""
ALGORITHMS = {
'linear_regression': LinearRegression,
'logistic_regression': LogisticRegression,
'random_forest_reg': RandomForestRegressor,
'random_forest_clf': RandomForestClassifier,
'xgboost_reg': XGBRegressor,
'xgboost_clf': XGBClassifier,
'mlp_regressor': MLPRegressor,
'mlp_classifier': MLPClassifier
}
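    # The registry above is extensible: any estimator with a scikit-learn-style
    # fit/predict interface can be registered. A hedged sketch (GradientBoostingRegressor
    # is only an illustration, not part of the original selection):
    #   from sklearn.ensemble import GradientBoostingRegressor
    #   MLAnalyzer.ALGORITHMS['gbdt_reg'] = GradientBoostingRegressor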
def __init__(self, algorithm: str = 'random_forest_reg', scaler_type: str = 'standard'):
"""
        Initialize the analyzer.
        :param algorithm: algorithm name
        :param scaler_type: scaler type ('minmax', 'standard', None)
        """
        if algorithm not in self.ALGORITHMS:
            raise ValueError(f"Unsupported algorithm, choose one of: {list(self.ALGORITHMS.keys())}")
        self.algorithm = algorithm
        self.model = None
        self.scaler = self._get_scaler(scaler_type)  # initialize the scaler for the requested type
        self.label_encoders = {}
        self.feature_names_ = None  # set by fit(); used when reporting feature importance
def _get_scaler(self, scaler_type: str):
"""根据类型返回对应的归一化器"""
if scaler_type == 'minmax':
return MinMaxScaler()
elif scaler_type == 'standard':
return StandardScaler()
return None # 不进行归一化
    def preprocess_data(self, X: pd.DataFrame, y: Optional[pd.Series] = None,
                        fit: bool = True) -> Union[pd.DataFrame, tuple]:
        """Data preprocessing: with fit=True the scaler and label encoders are
        (re)fitted; predict() passes fit=False so the fitted transformers are reused."""
        X = X.copy()  # do not mutate the caller's DataFrame
        # Scale numeric features (skipped when scaler_type was None)
        numeric_cols = X.select_dtypes(include=np.number).columns
        if self.scaler is not None and len(numeric_cols) > 0:
            X[numeric_cols] = (self.scaler.fit_transform(X[numeric_cols]) if fit
                               else self.scaler.transform(X[numeric_cols]))
        # Encode categorical features; categories unseen during training map to -1
        for col in X.select_dtypes(exclude=np.number).columns:
            if fit or col not in self.label_encoders:
                le = LabelEncoder()
                X[col] = le.fit_transform(X[col])
                self.label_encoders[col] = le
            else:
                le = self.label_encoders[col]
                known = set(le.classes_)
                X[col] = [le.transform([v])[0] if v in known else -1 for v in X[col]]
        return X if y is None else (X, y)
def fit(self, X: pd.DataFrame, y: pd.Series,
test_size: float = 0.2,
param_grid: Optional[Dict] = None,
cv: int = 5) -> Dict[str, float]:
"""
        Train the model.
        :param X: feature DataFrame
        :param y: target Series
        :param test_size: fraction of data held out as the test set
        :param param_grid: parameter grid (used with GridSearchCV)
        :param cv: number of cross-validation folds
        :return: evaluation metrics
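
        Example param_grid for 'random_forest_reg' (an illustrative sketch, not tuned
        defaults; any grid accepted by GridSearchCV works):
            {'n_estimators': [100, 300], 'max_depth': [None, 10]}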
"""
        # Preprocess the data
        X, y = self.preprocess_data(X, y)
        # Ensure y is a 1D array (single output) or keep it 2D (multi-output)
        if isinstance(y, (pd.DataFrame, pd.Series)):
            y = y.values  # convert to a numpy array
        if y.ndim == 2 and y.shape[1] == 1:
            y = y.ravel()
        # Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
        # Initialize the model
        model = self.ALGORITHMS[self.algorithm]()
        # Remember the feature names for importance reporting
        self.feature_names_ = X_train.columns.tolist()
        # Grid search or plain training
if param_grid:
self.model = GridSearchCV(model, param_grid, cv=cv)
else:
self.model = model
self.model.fit(X_train, y_train)
        # Evaluate on the held-out test set
        preds = self.model.predict(X_test)
        # Classification metrics for classifier algorithms (not all of their names end with '_clf')
        if self.algorithm in ('logistic_regression', 'random_forest_clf', 'xgboost_clf', 'mlp_classifier'):
results = {
'accuracy': accuracy_score(y_test, preds),
'precision': precision_score(y_test, preds, average='weighted'),
'recall': recall_score(y_test, preds, average='weighted'),
'f1': f1_score(y_test, preds, average='weighted')
}
else:
results = {
'mse': mean_squared_error(y_test, preds),
'mae': mean_absolute_error(y_test, preds),
'r2': r2_score(y_test, preds)
}
        # Sort the numeric metrics, then attach the best parameters (if GridSearchCV was used)
        sorted_results = self.sort_metrics(results)
        if param_grid:
            sorted_results['best_params'] = self.model.best_params_
        return sorted_results
def save_model(self, filepath: str) -> None:
"""
保存模型到文件
:param filepath: 文件路径
"""
if self.model is None:
raise RuntimeError("没有训练好的模型可保存")
with open(filepath, 'wb') as f:
pickle.dump({
'model': self.model,
'algorithm': self.algorithm,
'scaler': self.scaler,
'label_encoders': self.label_encoders
}, f)
@classmethod
def load_model(cls, filepath: str) -> 'MLAnalyzer':
"""
        Load a model from a file.
        :param filepath: file path
        :return: the loaded MLAnalyzer instance
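
        Example (a sketch; 'model.pkl' is an illustrative path):
            analyzer.save_model('model.pkl')
            restored = MLAnalyzer.load_model('model.pkl')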
"""
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Model file does not exist: {filepath}")
with open(filepath, 'rb') as f:
data = pickle.load(f)
analyzer = cls(data['algorithm'])
analyzer.model = data['model']
analyzer.scaler = data['scaler']
analyzer.label_encoders = data['label_encoders']
return analyzer
def predict(self, X: pd.DataFrame, return_df: bool = False) -> Union[np.ndarray, pd.DataFrame]:
"""
预测新数据
:param X: 输入特征
:param return_df: 是否返回DataFrame
:return: 预测结果(numpy数组或DataFrame)
"""
if self.model is None:
raise RuntimeError("请先训练模型")
X = self.preprocess_data(X)
preds = self.model.predict(X)
if return_df:
if isinstance(preds, np.ndarray) and preds.ndim == 1:
return pd.DataFrame(preds, columns=['prediction'])
elif isinstance(preds, np.ndarray) and preds.ndim == 2:
return pd.DataFrame(preds,
columns=[f'output_{i}' for i in range(preds.shape[1])])
return preds
    def get_feature_importance(self) -> pd.DataFrame:
        """Return feature importances as a DataFrame sorted in descending order."""
        if self.model is None:
            raise RuntimeError("Train the model first")
        # Unwrap GridSearchCV so importances/coefficients come from the best fitted estimator
        model = self.model.best_estimator_ if hasattr(self.model, 'best_estimator_') else self.model
        # Tree and boosting models (random forest, XGBoost, ...) expose feature_importances_
        if hasattr(model, 'feature_importances_'):
            importances = model.feature_importances_
            # Prefer the feature names recorded during fit(); fall back to the names stored by
            # scikit-learn, then to generic placeholders
            if getattr(self, 'feature_names_', None) and len(self.feature_names_) == len(importances):
                feature_names = self.feature_names_
            elif hasattr(model, 'feature_names_in_'):
                feature_names = model.feature_names_in_
            else:
                feature_names = [f'feature_{i}' for i in range(len(importances))]
            return pd.DataFrame({
                'feature': feature_names,
                'importance': importances
            }).sort_values('importance', ascending=False)
        # Linear models expose coefficients instead of importances
        elif hasattr(model, 'coef_'):
            if isinstance(model.coef_, np.ndarray) and model.coef_.ndim == 1:
                # Single-output regression
                coef = np.abs(model.coef_)
            else:
                # Multi-output regression: average the absolute coefficients across outputs
                coef = np.mean(np.abs(model.coef_), axis=0)
            # Use the saved feature names or generic placeholders
            if getattr(self, 'feature_names_', None) and len(self.feature_names_) == len(coef):
                feature_names = self.feature_names_
            else:
                feature_names = [f'feature_{i}' for i in range(len(coef))]
            return pd.DataFrame({
                'feature': feature_names,
                'importance': coef
            }).sort_values('importance', ascending=False)
        # Models without importances or coefficients (e.g. MLP) return an empty frame
        return pd.DataFrame()
def sort_metrics(self, metrics: Dict[str, float],
ascending: bool = False) -> Dict[str, float]:
"""
对评估指标进行排序
:param metrics: 评估指标字典
:param ascending: 是否升序排列
:return: 排序后的指标字典
"""
return dict(sorted(metrics.items(),
key=lambda x: x[1],
reverse=not ascending))
def evaluate(self, X: pd.DataFrame, y: pd.Series,
average: str = 'weighted',
multioutput: Union[str, List[float]] = 'uniform_average',
sample_weight: Optional[np.ndarray] = None) -> Dict[str, float]:
"""
        Evaluate model performance (multi-output targets supported).
        :param X: feature data
        :param y: target data
        :param average: averaging strategy for multi-class metrics ('micro', 'macro', 'weighted')
        :param multioutput: multi-output aggregation ('raw_values', 'uniform_average', or an array of weights)
        :param sample_weight: sample weights
        :return: dict of evaluation metrics
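
        Example: for multi-output regression, evaluate(X_test, y_test, multioutput='raw_values')
        reports mse/mae/r2 as one value per output column and adds per-output entries
        (a usage note; the aggregation semantics follow scikit-learn's metrics API).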
"""
        if self.model is None:
            raise RuntimeError("Train the model first")
        preds = self.predict(X)
        # Convert Series/DataFrame targets to numpy so positional slicing works below
        y_arr = y.values if isinstance(y, (pd.Series, pd.DataFrame)) else np.asarray(y)
        results = {}
        # Classification algorithms (note: not every classifier name ends with '_clf')
        if self.algorithm in ('logistic_regression', 'random_forest_clf', 'xgboost_clf', 'mlp_classifier'):
            if preds.ndim == 1 or y_arr.ndim == 1:
                # Single-output classification
results.update({
'accuracy': accuracy_score(y_arr, preds, sample_weight=sample_weight),
'precision': precision_score(y_arr, preds, average=average, sample_weight=sample_weight),
'recall': recall_score(y_arr, preds, average=average, sample_weight=sample_weight),
'f1': f1_score(y_arr, preds, average=average, sample_weight=sample_weight)
})
            else:
                # Multi-output classification: compute metrics per output column
                for i in range(preds.shape[1]):
                    results[f'output_{i}_accuracy'] = accuracy_score(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
                    results[f'output_{i}_precision'] = precision_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
                    results[f'output_{i}_recall'] = recall_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
                    results[f'output_{i}_f1'] = f1_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
                # Averages across all outputs
                results['mean_accuracy'] = np.mean([v for k,v in results.items() if 'accuracy' in k])
                results['mean_precision'] = np.mean([v for k,v in results.items() if 'precision' in k])
                results['mean_recall'] = np.mean([v for k,v in results.items() if 'recall' in k])
                results['mean_f1'] = np.mean([v for k,v in results.items() if 'f1' in k])
        else:
            # Regression metrics
results.update({
'mse': mean_squared_error(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight),
'mae': mean_absolute_error(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight),
'r2': r2_score(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight)
})
            if preds.ndim > 1 and multioutput == 'raw_values':
                # Per-output detail for multi-output regression
for i in range(preds.shape[1]):
results[f'output_{i}_mse'] = mean_squared_error(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
results[f'output_{i}_mae'] = mean_absolute_error(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
results[f'output_{i}_r2'] = r2_score(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
return results
def _get_column_names(self, y: Union[pd.Series, pd.DataFrame, np.ndarray]) -> List[str]:
"""获取列名列表"""
if isinstance(y, pd.Series):
return [y.name] if y.name else ['target']
elif isinstance(y, pd.DataFrame):
return y.columns.tolist()
else: # numpy array
if y.ndim == 1:
return ['target']
return [f'target_{i+1}' for i in range(y.shape[1])]
def plot_predictions(self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame, np.ndarray],
n_samples: int = 50, figsize: Union[tuple, list] = (12, 6),
save_path: Optional[str] = None) -> None:
"""
        Plot predicted values against actual values.
        :param X: feature data
        :param y: actual values
        :param n_samples: number of samples to display (default 50)
        :param figsize: figure size (default (12, 6))
        :param save_path: path to save the figure (e.g. 'plot.png'); None means do not save
        """
        # Validate the figsize parameter
        if not isinstance(figsize, (tuple, list)) or len(figsize) != 2:
            figsize = (12, 6)
        try:
            figsize = tuple(float(x) for x in figsize)
        except (TypeError, ValueError):
            figsize = (12, 6)
        import matplotlib.pyplot as plt
        if self.model is None:
            raise RuntimeError("Train the model first")
        # Compute predictions
        predictions = self.predict(X)
        # Resolve column names for labels
        col_names = self._get_column_names(y)
        # Convert actual values to a numpy array
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y_values = y.values
        else:
            y_values = y
        # Create the figure
        plt.figure(figsize=figsize)
        # Single-output case
if predictions.ndim == 1 or y_values.ndim == 1:
plt.plot(y_values[:n_samples], label=col_names[0], marker='o', alpha=0.7)
plt.plot(predictions[:n_samples], label=f'{col_names[0]}_predicted', marker='x', alpha=0.7)
        else:
            # Multi-output case
for i in range(predictions.shape[1]):
col_name = col_names[i] if i < len(col_names) else f'target_{i+1}'
plt.plot(y_values[:n_samples, i], label=col_name, alpha=0.7)
plt.plot(predictions[:n_samples, i], '--', label=f'{col_name}_predicted', alpha=0.7)
plt.title('predicted value and actual value')
plt.xlabel('index of sample')
plt.ylabel('value')
plt.legend()
plt.grid(True)
plt.tight_layout()
        # Save the figure if a path was given
        if save_path:
            supported_formats = ['.png', '.jpg', '.jpeg', '.pdf', '.svg']
            if not any(save_path.lower().endswith(fmt) for fmt in supported_formats):
                save_path += '.png'  # default to PNG
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Figure saved to {save_path}")
#plt.show()
def save_prediction_results(self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame, np.ndarray],
filepath: str = 'prediction_results.csv') -> None:
"""
保存预测结果和实际值到CSV文件
:param X: 特征数据
:param y: 实际值(Series/DataFrame或numpy数组)
:param filepath: 输出文件路径
"""
if self.model is None:
raise RuntimeError("请先训练模型")
# 获取预测结果
predictions = self.predict(X)
# 创建结果DataFrame
results_df = pd.DataFrame()
# 处理实际值(支持单输出和多输出)
if isinstance(y, (pd.Series, pd.DataFrame)):
# 获取列名
if isinstance(y, pd.Series):
actual_cols = [y.name] if y.name else ['target']
else:
actual_cols = y.columns.tolist()
            # Single-column vs multi-column handling
if len(actual_cols) == 1:
results_df[actual_cols[0]] = y.values.ravel()
else:
for i, col in enumerate(actual_cols):
results_df[col] = y.iloc[:, i].values if isinstance(y, pd.DataFrame) else y[:, i]
elif isinstance(y, np.ndarray):
if y.ndim == 1:
results_df['target'] = y
else:
for i in range(y.shape[1]):
results_df[f'target_{i+1}'] = y[:, i]
        # Predicted values (single- and multi-output supported)
if isinstance(predictions, np.ndarray):
if predictions.ndim == 1:
col_name = actual_cols[0] if 'actual_cols' in locals() else 'target'
results_df[f'{col_name}_predicted'] = predictions
else:
cols = actual_cols if 'actual_cols' in locals() else [f'target_{i+1}' for i in range(predictions.shape[1])]
for i, col in enumerate(cols):
results_df[f'{col}_predicted'] = predictions[:, i]
        # Save to CSV
results_df.to_csv(filepath, index=False)
print(f"\nThe prediction results have been saved to {filepath}")
from mlearn.DataFrameCleaner import DataFrameCleaner
from mlearn.PandasDataIO import PandasDataIO
from file.PathUtil import PathUtil
# Usage example
if __name__ == "__main__":
path = PathUtil().getEnv('HOME') + '/data/'
data = PandasDataIO().read_csv(path + 't_mode_pdo.csv')
print("\n原始数据:")
print(data)
cleaned_df = (
DataFrameCleaner(data)
# .normalize_strings('name', case='title', strip=True)
# .normalize_headers(case='lower')
        .convert_types({'STEELGRADE': 'category'})  # convert data types
#.encode_categorical(['STEELGRADE'], method='label', drop=True)
).get_cleaned_data()
print("\n清洗后数据:")
print(cleaned_df)
print(cleaned_df.dtypes)
    # Split the data into training and test sets (train_test_split is imported at the top of the module)
train_df, test_df = train_test_split(
cleaned_df, test_size=0.2, random_state=42)
# Use the training set for fitting the model and the test set for predictions
# X_train = train_df[['TL_N1_INTERMESH_PV_AVE',
# 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_ELONG_PV_AVE', 'STEELGRADE']]
# y_train = train_df['TL_TEN_ENT_PV_AVE']
# X_test = test_df[['TL_N1_INTERMESH_PV_AVE',
# 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE','TL_ELONG_PV_AVE', 'STEELGRADE']]
# y_test = test_df['TL_TEN_ENT_PV_AVE']
#X_train = train_df.loc[:, train_df.columns != "TL_TEN_ENT_PV_AVE"]
#y_train = train_df['TL_TEN_ENT_PV_AVE']
#X_test = test_df.loc[:, test_df.columns != "TL_TEN_ENT_PV_AVE"]
#y_test = test_df['TL_TEN_ENT_PV_AVE']
X_train = train_df.drop(['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE'], axis=1)
y_train = train_df[['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE']]
X_test = test_df.drop(['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE'], axis=1)
y_test = test_df[['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE']]
    # Regression analysis
    print("=== Regression analysis ===")
analyzer_reg = MLAnalyzer('random_forest_reg')
# analyzer_reg = MLAnalyzer('linear_regression', scaler_type='standard')
# analyzer_reg = MLAnalyzer('xgboost_reg')
# analyzer_reg = MLAnalyzer('mlp_regressor')
# analyzer_reg = MLAnalyzer('logistic_regression')
metrics = analyzer_reg.fit(X_train, y_train)
print("模型性能:", metrics)
# analyzer_reg.plot_metrics(metrics)
print("特征重要性:")
print(analyzer_reg.get_feature_importance())
# analyzer_reg.save_model('model.pkl')
# analyzer_reg = MLAnalyzer.load_model('model.pkl')
print(X_test[10:12])
    # Model prediction
    predictions = analyzer_reg.predict(X_test)
    print("\nPredictions (first 5):")
print(predictions[:5])
analyzer_reg.save_prediction_results( X_test, y_test, path + 'prediction_results.csv')
analyzer_reg.plot_predictions(X_test, y_test, 60, save_path = path + 'prediction_results.png')
    # Model evaluation
    print("\n=== Model evaluation ===")
    test_metrics = analyzer_reg.evaluate(X_test, y_test)
    print("Test-set evaluation metrics:")
print(test_metrics)