# -*- coding: utf-8 -*-
"""
MLAnalyzer.py - machine-learning model analysis and training utilities.

This module provides a flexible machine-learning analyzer class (MLAnalyzer)
that supports training and evaluating several common algorithms.

Main features:
- Multiple supervised-learning algorithms (regression and classification)
- Automatic data preprocessing (scaling of numeric features, encoding of categorical features)
- Model training and evaluation
- Extensible algorithm registry

Usage:
1. Instantiate MLAnalyzer with the desired algorithm
2. Call preprocess_data to preprocess the data
3. Train the model with fit
4. Make predictions with predict
5. Evaluate model performance with evaluate

Dependencies:
- pandas: data handling
- numpy: numerical computing
- scikit-learn: machine-learning algorithms and utilities
- xgboost: high-performance gradient-boosting framework
- pickle: model serialization

Author
- Author       : zoufuzhou
- Date         : 2025-05-21 16:34:37
- Description  : Machine learning analysis
- LastEditTime : 2025-05-21 16:34:37
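
Example (a minimal usage sketch; df, feature_cols and target_col are placeholder
names for your own data):

    analyzer = MLAnalyzer('random_forest_reg', scaler_type='standard')
    metrics = analyzer.fit(df[feature_cols], df[target_col])
    preds = analyzer.predict(df[feature_cols])
    scores = analyzer.evaluate(df[feature_cols], df[target_col])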
"""
import pandas as pd
import numpy as np
from typing import Dict, Union, List, Optional
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression
from xgboost import XGBRegressor, XGBClassifier
from sklearn.neural_network import MLPRegressor, MLPClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
mean_squared_error, mean_absolute_error, r2_score
)
import pickle
import os
class MLAnalyzer:
"""机器学习分析器,支持多种算法选择"""
ALGORITHMS = {
'linear_regression': LinearRegression,
'logistic_regression': LogisticRegression,
'random_forest_reg': RandomForestRegressor,
'random_forest_clf': RandomForestClassifier,
'xgboost_reg': XGBRegressor,
'xgboost_clf': XGBClassifier,
'mlp_regressor': MLPRegressor,
'mlp_classifier': MLPClassifier
}
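    # Naming convention: keys ending in '_clf' are treated as classifiers by fit()
    # and evaluate(); all other algorithms are treated as regressors.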
def __init__(self, algorithm: str = 'random_forest_reg', scaler_type: str = 'standard'):
"""
        Initialize the analyzer.
        :param algorithm: algorithm name
        :param scaler_type: scaling type ('minmax', 'standard', None)
"""
if algorithm not in self.ALGORITHMS:
raise ValueError(f"不支持的算法,可选: {list(self.ALGORITHMS.keys())}")
self.algorithm = algorithm
self.model = None
        self.scaler = self._get_scaler(scaler_type)  # initialize the scaler from the requested type
        self.label_encoders = {}
        self.feature_names_ = []  # populated by fit(); used by get_feature_importance()
def _get_scaler(self, scaler_type: str):
"""根据类型返回对应的归一化器"""
if scaler_type == 'minmax':
return MinMaxScaler()
elif scaler_type == 'standard':
return StandardScaler()
        return None  # no scaling
    def preprocess_data(self, X: pd.DataFrame, y: Optional[pd.Series] = None,
                        fit: bool = True) -> Union[pd.DataFrame, tuple]:
        """Preprocess the data: scale numeric features and label-encode categorical ones.
        When fit is True the scaler/encoders are fitted on X (training); when False the
        transformers fitted during training are reused (prediction)."""
        X = X.copy()  # work on a copy so the caller's DataFrame is not modified
        # Scale numeric features (skipped when scaler_type is None)
        numeric_cols = X.select_dtypes(include=np.number).columns
        if self.scaler is not None and len(numeric_cols) > 0:
            X[numeric_cols] = (self.scaler.fit_transform(X[numeric_cols]) if fit
                               else self.scaler.transform(X[numeric_cols]))
        # Label-encode categorical features
        for col in X.select_dtypes(exclude=np.number).columns:
            if fit:
                le = LabelEncoder()
                X[col] = le.fit_transform(X[col])
                self.label_encoders[col] = le
            elif col in self.label_encoders:
                X[col] = self.label_encoders[col].transform(X[col])
        return X if y is None else (X, y)
def fit(self, X: pd.DataFrame, y: pd.Series,
test_size: float = 0.2,
param_grid: Optional[Dict] = None,
cv: int = 5) -> Dict[str, float]:
"""
        Train the model.
        :param X: feature DataFrame
        :param y: target Series (or DataFrame for multi-output)
        :param test_size: fraction of the data used as the test split
        :param param_grid: parameter grid (used with GridSearchCV)
        :param cv: number of cross-validation folds
        :return: evaluation metrics
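
        Example param_grid for the random-forest algorithms (illustrative values only):
            {'n_estimators': [100, 300], 'max_depth': [None, 10]}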
"""
        # Preprocess the data
        X, y = self.preprocess_data(X, y)
        # Ensure y is a 1-D array for single-output targets (keep 2-D for multi-output)
        if isinstance(y, (pd.DataFrame, pd.Series)):
            y = y.values  # convert to a numpy array
        if y.ndim == 2 and y.shape[1] == 1:
            y = y.ravel()
        # Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
        # Initialize the model
        model = self.ALGORITHMS[self.algorithm]()
        # Save the feature names (used later for feature importance)
        self.feature_names_ = X_train.columns.tolist()
        # Grid search over param_grid, or plain training
if param_grid:
self.model = GridSearchCV(model, param_grid, cv=cv)
else:
self.model = model
self.model.fit(X_train, y_train)
        # Evaluate on the held-out test split
preds = self.model.predict(X_test)
if self.algorithm.endswith('_clf'):
results = {
'accuracy': accuracy_score(y_test, preds),
'precision': precision_score(y_test, preds, average='weighted'),
'recall': recall_score(y_test, preds, average='weighted'),
'f1': f1_score(y_test, preds, average='weighted')
}
else:
results = {
'mse': mean_squared_error(y_test, preds),
'mae': mean_absolute_error(y_test, preds),
'r2': r2_score(y_test, preds)
}
        # Sort the numeric metrics, then attach the best parameters when GridSearchCV was used
        sorted_results = self.sort_metrics(results)
        if param_grid:
            sorted_results['best_params'] = self.model.best_params_
        return sorted_results
def save_model(self, filepath: str) -> None:
"""
        Save the trained model to a file.
        :param filepath: output file path
        """
        if self.model is None:
            raise RuntimeError("No trained model to save")
with open(filepath, 'wb') as f:
pickle.dump({
'model': self.model,
'algorithm': self.algorithm,
'scaler': self.scaler,
'label_encoders': self.label_encoders
}, f)
@classmethod
def load_model(cls, filepath: str) -> 'MLAnalyzer':
"""
        Load a model from a file.
        :param filepath: path to the model file
        :return: the loaded MLAnalyzer instance
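
        Example (hypothetical path; new_X is placeholder data):
            analyzer = MLAnalyzer.load_model('model.pkl')
            preds = analyzer.predict(new_X)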
"""
if not os.path.exists(filepath):
raise FileNotFoundError(f"模型文件不存在: {filepath}")
with open(filepath, 'rb') as f:
data = pickle.load(f)
analyzer = cls(data['algorithm'])
analyzer.model = data['model']
analyzer.scaler = data['scaler']
analyzer.label_encoders = data['label_encoders']
return analyzer
def predict(self, X: pd.DataFrame, return_df: bool = False) -> Union[np.ndarray, pd.DataFrame]:
"""
        Predict on new data.
        :param X: input features
        :param return_df: whether to return a DataFrame
        :return: predictions (numpy array or DataFrame)
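
        Example (new_X is a placeholder for a DataFrame with the training feature columns):
            preds_df = analyzer.predict(new_X, return_df=True)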
"""
if self.model is None:
raise RuntimeError("请先训练模型")
X = self.preprocess_data(X)
preds = self.model.predict(X)
if return_df:
if isinstance(preds, np.ndarray) and preds.ndim == 1:
return pd.DataFrame(preds, columns=['prediction'])
elif isinstance(preds, np.ndarray) and preds.ndim == 2:
return pd.DataFrame(preds,
columns=[f'output_{i}' for i in range(preds.shape[1])])
return preds
    def get_feature_importance(self) -> pd.DataFrame:
        """Return the feature importances as a DataFrame sorted by importance."""
        if self.model is None:
            raise RuntimeError("Train the model first")
        # Unwrap GridSearchCV so the fitted estimator's attributes are visible
        model = self.model.best_estimator_ if hasattr(self.model, 'best_estimator_') else self.model

        def _feature_names(n: int) -> List[str]:
            # Use the names saved in fit() when they match, otherwise fall back to generic names
            if getattr(self, 'feature_names_', None) and len(self.feature_names_) == n:
                return self.feature_names_
            return [f'feature_{i}' for i in range(n)]

        # Tree/boosting models (random forest, XGBoost, ...)
        if hasattr(model, 'feature_importances_'):
            importance = model.feature_importances_
        # Linear models: absolute coefficients (averaged over outputs when 2-D)
        elif hasattr(model, 'coef_'):
            coef = np.asarray(model.coef_)
            importance = np.abs(coef) if coef.ndim == 1 else np.mean(np.abs(coef), axis=0)
        else:
            # No importance information available (e.g. MLP models)
            return pd.DataFrame()
        return pd.DataFrame({
            'feature': _feature_names(len(importance)),
            'importance': importance
        }).sort_values('importance', ascending=False)
def sort_metrics(self, metrics: Dict[str, float],
ascending: bool = False) -> Dict[str, float]:
"""
        Sort the evaluation metrics by value (assumes all values are numeric).
        :param metrics: dictionary of evaluation metrics
        :param ascending: sort in ascending order if True
        :return: sorted metrics dictionary
"""
return dict(sorted(metrics.items(),
key=lambda x: x[1],
reverse=not ascending))
def evaluate(self, X: pd.DataFrame, y: pd.Series,
average: str = 'weighted',
multioutput: Union[str, List[float]] = 'uniform_average',
sample_weight: Optional[np.ndarray] = None) -> Dict[str, float]:
"""
        Evaluate model performance (supports multi-output targets).
        :param X: feature data
        :param y: target data
        :param average: averaging mode for multi-class metrics ('micro', 'macro', 'weighted')
        :param multioutput: multi-output aggregation ('raw_values', 'uniform_average', or an array of weights)
        :param sample_weight: sample weights
        :return: dictionary of evaluation metrics
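
        Example (placeholder data; returns per-output regression metrics):
            analyzer.evaluate(X_test, y_test, multioutput='raw_values')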
"""
if self.model is None:
raise RuntimeError("请先训练模型")
preds = self.predict(X)
        y_arr = y.values if isinstance(y, (pd.Series, pd.DataFrame)) else np.asarray(y)
results = {}
if self.algorithm.endswith('_clf'):
            # Classification metrics
            if preds.ndim == 1 or y_arr.ndim == 1:
                # Single-output classification
results.update({
'accuracy': accuracy_score(y_arr, preds, sample_weight=sample_weight),
'precision': precision_score(y_arr, preds, average=average, sample_weight=sample_weight),
'recall': recall_score(y_arr, preds, average=average, sample_weight=sample_weight),
'f1': f1_score(y_arr, preds, average=average, sample_weight=sample_weight)
})
else:
                # Multi-output classification
for i in range(preds.shape[1]):
results[f'output_{i}_accuracy'] = accuracy_score(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
results[f'output_{i}_precision'] = precision_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
results[f'output_{i}_recall'] = recall_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
results[f'output_{i}_f1'] = f1_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
                # Averaged metrics across outputs
results['mean_accuracy'] = np.mean([v for k,v in results.items() if 'accuracy' in k])
results['mean_precision'] = np.mean([v for k,v in results.items() if 'precision' in k])
results['mean_recall'] = np.mean([v for k,v in results.items() if 'recall' in k])
results['mean_f1'] = np.mean([v for k,v in results.items() if 'f1' in k])
else:
            # Regression metrics
results.update({
'mse': mean_squared_error(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight),
'mae': mean_absolute_error(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight),
'r2': r2_score(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight)
})
if preds.ndim > 1 and multioutput == 'raw_values':
                # Per-output regression metrics
for i in range(preds.shape[1]):
results[f'output_{i}_mse'] = mean_squared_error(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
results[f'output_{i}_mae'] = mean_absolute_error(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
results[f'output_{i}_r2'] = r2_score(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
return results
def _get_column_names(self, y: Union[pd.Series, pd.DataFrame, np.ndarray]) -> List[str]:
"""获取列名列表"""
if isinstance(y, pd.Series):
return [y.name] if y.name else ['target']
elif isinstance(y, pd.DataFrame):
return y.columns.tolist()
else: # numpy array
if y.ndim == 1:
return ['target']
return [f'target_{i+1}' for i in range(y.shape[1])]
def plot_predictions(self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame, np.ndarray],
n_samples: int = 50, figsize: Union[tuple, list] = (12, 6),
save_path: Optional[str] = None) -> None:
"""
        Plot predicted values against actual values.
        :param X: feature data
        :param y: actual values
        :param n_samples: number of samples to show (default 50)
        :param figsize: figure size (default (12, 6))
        :param save_path: path to save the figure (e.g. 'plot.png'); None disables saving
"""
        # Validate the figsize argument
if not isinstance(figsize, (tuple, list)) or len(figsize) != 2:
figsize = (12, 6)
try:
figsize = tuple(float(x) for x in figsize)
except (TypeError, ValueError):
figsize = (12, 6)
"""
绘制预测值与实际值的对比图
:param X: 特征数据
:param y: 实际值
:param n_samples: 显示的样本数量
:param figsize: 图表大小
:param save_path: 图片保存路径('plot.png'), None表示不保存
"""
import matplotlib.pyplot as plt
if self.model is None:
raise RuntimeError("请先训练模型")
# 获取预测结果
predictions = self.predict(X)
# 获取列名
col_names = self._get_column_names(y)
# 转换实际值为numpy数组
if isinstance(y, (pd.Series, pd.DataFrame)):
y_values = y.values
else:
y_values = y
        # Create the figure
plt.figure(figsize=figsize)
        # Single-output case
if predictions.ndim == 1 or y_values.ndim == 1:
plt.plot(y_values[:n_samples], label=col_names[0], marker='o', alpha=0.7)
plt.plot(predictions[:n_samples], label=f'{col_names[0]}_predicted', marker='x', alpha=0.7)
else:
            # Multi-output case
for i in range(predictions.shape[1]):
col_name = col_names[i] if i < len(col_names) else f'target_{i+1}'
plt.plot(y_values[:n_samples, i], label=col_name, alpha=0.7)
plt.plot(predictions[:n_samples, i], '--', label=f'{col_name}_predicted', alpha=0.7)
        plt.title('Predicted vs. actual values')
        plt.xlabel('Sample index')
        plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.tight_layout()
        # Save the figure if a path was given
        if save_path:
            supported_formats = ['.png', '.jpg', '.jpeg', '.pdf', '.svg']
            if not any(save_path.lower().endswith(fmt) for fmt in supported_formats):
                save_path += '.png'  # default to PNG
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plot saved to {save_path}")
#plt.show()
def save_prediction_results(self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame, np.ndarray],
filepath: str = 'prediction_results.csv') -> None:
"""
        Save the predictions and the actual values to a CSV file.
        :param X: feature data
        :param y: actual values (Series/DataFrame or numpy array)
        :param filepath: output file path
"""
if self.model is None:
raise RuntimeError("请先训练模型")
# 获取预测结果
predictions = self.predict(X)
# 创建结果DataFrame
results_df = pd.DataFrame()
# 处理实际值(支持单输出和多输出)
if isinstance(y, (pd.Series, pd.DataFrame)):
            # Resolve the column names
if isinstance(y, pd.Series):
actual_cols = [y.name] if y.name else ['target']
else:
actual_cols = y.columns.tolist()
            # Handle single- and multi-column targets
if len(actual_cols) == 1:
results_df[actual_cols[0]] = y.values.ravel()
else:
for i, col in enumerate(actual_cols):
results_df[col] = y.iloc[:, i].values if isinstance(y, pd.DataFrame) else y[:, i]
elif isinstance(y, np.ndarray):
if y.ndim == 1:
results_df['target'] = y
else:
for i in range(y.shape[1]):
results_df[f'target_{i+1}'] = y[:, i]
        # Predicted values (single- and multi-output)
if isinstance(predictions, np.ndarray):
if predictions.ndim == 1:
col_name = actual_cols[0] if 'actual_cols' in locals() else 'target'
results_df[f'{col_name}_predicted'] = predictions
else:
cols = actual_cols if 'actual_cols' in locals() else [f'target_{i+1}' for i in range(predictions.shape[1])]
for i, col in enumerate(cols):
results_df[f'{col}_predicted'] = predictions[:, i]
        # Save to CSV
results_df.to_csv(filepath, index=False)
print(f"\nThe prediction results have been saved to {filepath}")
from mlearn.DataFrameCleaner import DataFrameCleaner
from mlearn.PandasDataIO import PandasDataIO
from file.PathUtil import PathUtil
# Usage example (requires the project-internal mlearn and file packages)
if __name__ == "__main__":
path = PathUtil().getEnv('HOME') + '/data/'
data = PandasDataIO().read_csv(path + 't_mode_pdo.csv')
print("\n原始数据:")
print(data)
cleaned_df = (
DataFrameCleaner(data)
# .normalize_strings('name', case='title', strip=True)
# .normalize_headers(case='lower')
        .convert_types({'STEELGRADE': 'category'})  # convert dtypes
#.encode_categorical(['STEELGRADE'], method='label', drop=True)
).get_cleaned_data()
print("\n清洗后数据:")
print(cleaned_df)
print(cleaned_df.dtypes)
    # Split the data into training and test sets (train_test_split is imported at the top)
train_df, test_df = train_test_split(
cleaned_df, test_size=0.2, random_state=42)
# Use the training set for fitting the model and the test set for predictions
# X_train = train_df[['TL_N1_INTERMESH_PV_AVE',
# 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_ELONG_PV_AVE', 'STEELGRADE']]
# y_train = train_df['TL_TEN_ENT_PV_AVE']
# X_test = test_df[['TL_N1_INTERMESH_PV_AVE',
# 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE','TL_ELONG_PV_AVE', 'STEELGRADE']]
# y_test = test_df['TL_TEN_ENT_PV_AVE']
#X_train = train_df.loc[:, train_df.columns != "TL_TEN_ENT_PV_AVE"]
#y_train = train_df['TL_TEN_ENT_PV_AVE']
#X_test = test_df.loc[:, test_df.columns != "TL_TEN_ENT_PV_AVE"]
#y_test = test_df['TL_TEN_ENT_PV_AVE']
X_train = train_df.drop(['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE'], axis=1)
y_train = train_df[['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE']]
X_test = test_df.drop(['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE'], axis=1)
y_test = test_df[['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE', 'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE', 'TL_TEN_EXT_PV_AVE']]
    # Regression analysis
    print("=== Regression analysis ===")
analyzer_reg = MLAnalyzer('random_forest_reg')
# analyzer_reg = MLAnalyzer('linear_regression', scaler_type='standard')
# analyzer_reg = MLAnalyzer('xgboost_reg')
# analyzer_reg = MLAnalyzer('mlp_regressor')
# analyzer_reg = MLAnalyzer('logistic_regression')
metrics = analyzer_reg.fit(X_train, y_train)
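    # Optional: hyper-parameter search (illustrative grid for the random-forest regressor)
    # metrics = analyzer_reg.fit(X_train, y_train, param_grid={'n_estimators': [100, 300]})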
print("模型性能:", metrics)
# analyzer_reg.plot_metrics(metrics)
print("特征重要性:")
print(analyzer_reg.get_feature_importance())
# analyzer_reg.save_model('model.pkl')
# analyzer_reg = MLAnalyzer.load_model('model.pkl')
print(X_test[10:12])
    # Model prediction
predictions = analyzer_reg.predict(X_test)
print("\n预测结果(前5个):")
print(predictions[:5])
analyzer_reg.save_prediction_results( X_test, y_test, path + 'prediction_results.csv')
analyzer_reg.plot_predictions(X_test, y_test, 60, save_path = path + 'prediction_results.png')
    # Model evaluation
    print("\n=== Model evaluation ===")
test_metrics = analyzer_reg.evaluate(X_test, y_test)
print("测试集评估指标:")
print(test_metrics)