564 lines
22 KiB
Python
564 lines
22 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
MLAnalyzer.py - 机器学习模型分析与训练工具
|
|||
|
|
|
|||
|
|
该模块提供了一个灵活的机器学习分析器类(MLAnalyzer),支持多种常见算法的训练和评估。
|
|||
|
|
主要功能包括:
|
|||
|
|
- 支持多种监督学习算法(回归和分类)
|
|||
|
|
- 自动数据预处理(数值特征标准化和类别特征编码)
|
|||
|
|
- 模型训练和评估
|
|||
|
|
- 可扩展的算法集合
|
|||
|
|
|
|||
|
|
使用方法:
|
|||
|
|
1. 实例化MLAnalyzer,指定所需算法
|
|||
|
|
2. 调用preprocess_data进行数据预处理
|
|||
|
|
3. 使用fit方法训练模型
|
|||
|
|
4. 使用predict方法进行预测
|
|||
|
|
5. 使用evaluate方法评估模型性能
|
|||
|
|
|
|||
|
|
依赖库:
|
|||
|
|
- pandas: 数据处理
|
|||
|
|
- numpy: 数值计算
|
|||
|
|
- scikit-learn: 机器学习算法和工具
|
|||
|
|
- xgboost: 高性能梯度提升框架
|
|||
|
|
- pickle: 模型序列化
|
|||
|
|
|
|||
|
|
Author:
|
|||
|
|
- Author : zoufuzhou
|
|||
|
|
- Date : 2025-05-21 16:34:37
|
|||
|
|
- Description : Machine learning analysis
|
|||
|
|
- LastEditTime : 2025-05-21 16:34:37
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import pandas as pd
|
|||
|
|
import numpy as np
|
|||
|
|
from typing import Dict, Union, List, Optional
|
|||
|
|
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
|
|||
|
|
from sklearn.linear_model import LinearRegression, LogisticRegression
|
|||
|
|
from xgboost import XGBRegressor, XGBClassifier
|
|||
|
|
from sklearn.neural_network import MLPRegressor, MLPClassifier
|
|||
|
|
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder
|
|||
|
|
from sklearn.model_selection import train_test_split, GridSearchCV
|
|||
|
|
from sklearn.metrics import (
|
|||
|
|
accuracy_score, precision_score, recall_score, f1_score,
|
|||
|
|
mean_squared_error, mean_absolute_error, r2_score
|
|||
|
|
)
|
|||
|
|
import pickle
|
|||
|
|
import os
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MLAnalyzer:
    """Machine-learning analyzer supporting a selectable set of algorithms."""

    # Registry of supported algorithm keys -> estimator classes
    # (scikit-learn and XGBoost). Keys are validated in __init__ and the
    # class is instantiated with default hyper-parameters in fit().
    ALGORITHMS = {
        'linear_regression': LinearRegression,
        'logistic_regression': LogisticRegression,
        'random_forest_reg': RandomForestRegressor,
        'random_forest_clf': RandomForestClassifier,
        'xgboost_reg': XGBRegressor,
        'xgboost_clf': XGBClassifier,
        'mlp_regressor': MLPRegressor,
        'mlp_classifier': MLPClassifier
    }
|
|||
|
|
|
|||
|
|
def __init__(self, algorithm: str = 'random_forest_reg', scaler_type: str = 'standard'):
|
|||
|
|
"""
|
|||
|
|
初始化分析器
|
|||
|
|
:param algorithm: 算法名称
|
|||
|
|
:param scaler_type: 归一化类型 ('minmax', 'standard', None)
|
|||
|
|
"""
|
|||
|
|
if algorithm not in self.ALGORITHMS:
|
|||
|
|
raise ValueError(f"不支持的算法,可选: {list(self.ALGORITHMS.keys())}")
|
|||
|
|
self.algorithm = algorithm
|
|||
|
|
self.model = None
|
|||
|
|
self.scaler = self._get_scaler(scaler_type) # 根据类型初始化归一化器
|
|||
|
|
self.label_encoders = {}
|
|||
|
|
|
|||
|
|
def _get_scaler(self, scaler_type: str):
|
|||
|
|
"""根据类型返回对应的归一化器"""
|
|||
|
|
if scaler_type == 'minmax':
|
|||
|
|
return MinMaxScaler()
|
|||
|
|
elif scaler_type == 'standard':
|
|||
|
|
return StandardScaler()
|
|||
|
|
return None # 不进行归一化
|
|||
|
|
|
|||
|
|
def preprocess_data(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> Union[pd.DataFrame, tuple]:
|
|||
|
|
"""数据预处理"""
|
|||
|
|
# 数值特征标准化
|
|||
|
|
numeric_cols = X.select_dtypes(include=np.number).columns
|
|||
|
|
if len(numeric_cols) > 0:
|
|||
|
|
X[numeric_cols] = self.scaler.fit_transform(X[numeric_cols])
|
|||
|
|
|
|||
|
|
# 类别特征编码
|
|||
|
|
for col in X.select_dtypes(exclude=np.number).columns:
|
|||
|
|
le = LabelEncoder()
|
|||
|
|
X[col] = le.fit_transform(X[col])
|
|||
|
|
self.label_encoders[col] = le
|
|||
|
|
|
|||
|
|
return X if y is None else (X, y)
|
|||
|
|
|
|||
|
|
def fit(self, X: pd.DataFrame, y: pd.Series,
|
|||
|
|
test_size: float = 0.2,
|
|||
|
|
param_grid: Optional[Dict] = None,
|
|||
|
|
cv: int = 5) -> Dict[str, float]:
|
|||
|
|
"""
|
|||
|
|
训练模型
|
|||
|
|
:param X: 特征DataFrame
|
|||
|
|
:param y: 目标Series
|
|||
|
|
:param test_size: 测试集比例
|
|||
|
|
:param param_grid: 参数网格(GridSearchCV使用)
|
|||
|
|
:param cv: 交叉验证折数
|
|||
|
|
:return: 评估指标
|
|||
|
|
"""
|
|||
|
|
# 数据预处理
|
|||
|
|
X, y = self.preprocess_data(X, y)
|
|||
|
|
|
|||
|
|
# 确保y是1D数组(单输出)或保持2D数组(多输出)
|
|||
|
|
if isinstance(y, (pd.DataFrame, pd.Series)):
|
|||
|
|
y = y.values # 转换为numpy数组
|
|||
|
|
|
|||
|
|
if y.ndim == 2 and y.shape[1] == 1:
|
|||
|
|
y = y.ravel()
|
|||
|
|
|
|||
|
|
# 划分训练测试集
|
|||
|
|
X_train, X_test, y_train, y_test = train_test_split(
|
|||
|
|
X, y, test_size=test_size, random_state=42
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 初始化模型
|
|||
|
|
model = self.ALGORITHMS[self.algorithm]()
|
|||
|
|
|
|||
|
|
# 保存特征名称
|
|||
|
|
self.feature_names_ = X_train.columns.tolist()
|
|||
|
|
|
|||
|
|
# 参数搜索或普通训练
|
|||
|
|
if param_grid:
|
|||
|
|
self.model = GridSearchCV(model, param_grid, cv=cv)
|
|||
|
|
else:
|
|||
|
|
self.model = model
|
|||
|
|
|
|||
|
|
self.model.fit(X_train, y_train)
|
|||
|
|
|
|||
|
|
# 评估
|
|||
|
|
preds = self.model.predict(X_test)
|
|||
|
|
if self.algorithm.endswith('_clf'):
|
|||
|
|
results = {
|
|||
|
|
'accuracy': accuracy_score(y_test, preds),
|
|||
|
|
'precision': precision_score(y_test, preds, average='weighted'),
|
|||
|
|
'recall': recall_score(y_test, preds, average='weighted'),
|
|||
|
|
'f1': f1_score(y_test, preds, average='weighted')
|
|||
|
|
}
|
|||
|
|
else:
|
|||
|
|
results = {
|
|||
|
|
'mse': mean_squared_error(y_test, preds),
|
|||
|
|
'mae': mean_absolute_error(y_test, preds),
|
|||
|
|
'r2': r2_score(y_test, preds)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 返回最佳参数(如果使用GridSearchCV)
|
|||
|
|
if param_grid:
|
|||
|
|
results['best_params'] = self.model.best_params_
|
|||
|
|
|
|||
|
|
sorted_results = self.sort_metrics(results)
|
|||
|
|
return sorted_results
|
|||
|
|
|
|||
|
|
def save_model(self, filepath: str) -> None:
|
|||
|
|
"""
|
|||
|
|
保存模型到文件
|
|||
|
|
:param filepath: 文件路径
|
|||
|
|
"""
|
|||
|
|
if self.model is None:
|
|||
|
|
raise RuntimeError("没有训练好的模型可保存")
|
|||
|
|
with open(filepath, 'wb') as f:
|
|||
|
|
pickle.dump({
|
|||
|
|
'model': self.model,
|
|||
|
|
'algorithm': self.algorithm,
|
|||
|
|
'scaler': self.scaler,
|
|||
|
|
'label_encoders': self.label_encoders
|
|||
|
|
}, f)
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def load_model(cls, filepath: str) -> 'MLAnalyzer':
|
|||
|
|
"""
|
|||
|
|
从文件加载模型
|
|||
|
|
:param filepath: 文件路径
|
|||
|
|
:return: 加载的MLAnalyzer实例
|
|||
|
|
"""
|
|||
|
|
if not os.path.exists(filepath):
|
|||
|
|
raise FileNotFoundError(f"模型文件不存在: {filepath}")
|
|||
|
|
|
|||
|
|
with open(filepath, 'rb') as f:
|
|||
|
|
data = pickle.load(f)
|
|||
|
|
|
|||
|
|
analyzer = cls(data['algorithm'])
|
|||
|
|
analyzer.model = data['model']
|
|||
|
|
analyzer.scaler = data['scaler']
|
|||
|
|
analyzer.label_encoders = data['label_encoders']
|
|||
|
|
|
|||
|
|
return analyzer
|
|||
|
|
|
|||
|
|
def predict(self, X: pd.DataFrame, return_df: bool = False) -> Union[np.ndarray, pd.DataFrame]:
|
|||
|
|
"""
|
|||
|
|
预测新数据
|
|||
|
|
:param X: 输入特征
|
|||
|
|
:param return_df: 是否返回DataFrame
|
|||
|
|
:return: 预测结果(numpy数组或DataFrame)
|
|||
|
|
"""
|
|||
|
|
if self.model is None:
|
|||
|
|
raise RuntimeError("请先训练模型")
|
|||
|
|
X = self.preprocess_data(X)
|
|||
|
|
preds = self.model.predict(X)
|
|||
|
|
|
|||
|
|
if return_df:
|
|||
|
|
if isinstance(preds, np.ndarray) and preds.ndim == 1:
|
|||
|
|
return pd.DataFrame(preds, columns=['prediction'])
|
|||
|
|
elif isinstance(preds, np.ndarray) and preds.ndim == 2:
|
|||
|
|
return pd.DataFrame(preds,
|
|||
|
|
columns=[f'output_{i}' for i in range(preds.shape[1])])
|
|||
|
|
return preds
|
|||
|
|
|
|||
|
|
def get_feature_importance(self) -> pd.DataFrame:
|
|||
|
|
"""获取特征重要性"""
|
|||
|
|
if self.model is None:
|
|||
|
|
raise RuntimeError("请先训练模型")
|
|||
|
|
|
|||
|
|
# 处理随机森林/XGBoost等模型
|
|||
|
|
if hasattr(self.model, 'feature_importances_'):
|
|||
|
|
# 获取特征名称(兼容新旧版本scikit-learn)
|
|||
|
|
if hasattr(self.model, 'feature_names_in_'):
|
|||
|
|
feature_names = self.model.feature_names_in_
|
|||
|
|
elif hasattr(self.model, 'feature_importances_'):
|
|||
|
|
# 对于旧版本,使用X_train的列名(需要确保X_train是DataFrame)
|
|||
|
|
if hasattr(self.model, 'estimator') and hasattr(self.model.estimator, 'feature_names_in_'):
|
|||
|
|
feature_names = self.model.estimator.feature_names_in_
|
|||
|
|
else:
|
|||
|
|
# 如果无法获取特征名,使用通用名称
|
|||
|
|
feature_names = [f'feature_{i}' for i in range(len(self.model.feature_importances_))]
|
|||
|
|
|
|||
|
|
# 使用保存的实际特征名称或通用名称
|
|||
|
|
if self.feature_names_ and len(self.feature_names_) == len(self.model.feature_importances_):
|
|||
|
|
feature_names = self.feature_names_
|
|||
|
|
else:
|
|||
|
|
feature_names = [f'feature_{i}' for i in range(len(self.model.feature_importances_))]
|
|||
|
|
return pd.DataFrame({
|
|||
|
|
'feature': feature_names,
|
|||
|
|
'importance': self.model.feature_importances_
|
|||
|
|
}).sort_values('importance', ascending=False)
|
|||
|
|
|
|||
|
|
# 处理线性回归模型
|
|||
|
|
elif hasattr(self.model, 'coef_'):
|
|||
|
|
if isinstance(self.model.coef_, np.ndarray) and self.model.coef_.ndim == 1:
|
|||
|
|
# 单输出回归
|
|||
|
|
coef = np.abs(self.model.coef_)
|
|||
|
|
else:
|
|||
|
|
# 多输出回归
|
|||
|
|
coef = np.mean(np.abs(self.model.coef_), axis=0)
|
|||
|
|
|
|||
|
|
# 使用保存的特征名称或通用名称
|
|||
|
|
if hasattr(self, 'feature_names_') and len(self.feature_names_) == len(coef):
|
|||
|
|
feature_names = self.feature_names_
|
|||
|
|
else:
|
|||
|
|
feature_names = [f'feature_{i}' for i in range(len(coef))]
|
|||
|
|
|
|||
|
|
return pd.DataFrame({
|
|||
|
|
'feature': feature_names,
|
|||
|
|
'importance': coef
|
|||
|
|
}).sort_values('importance', ascending=False)
|
|||
|
|
|
|||
|
|
return pd.DataFrame()
|
|||
|
|
|
|||
|
|
def sort_metrics(self, metrics: Dict[str, float],
|
|||
|
|
ascending: bool = False) -> Dict[str, float]:
|
|||
|
|
"""
|
|||
|
|
对评估指标进行排序
|
|||
|
|
:param metrics: 评估指标字典
|
|||
|
|
:param ascending: 是否升序排列
|
|||
|
|
:return: 排序后的指标字典
|
|||
|
|
"""
|
|||
|
|
return dict(sorted(metrics.items(),
|
|||
|
|
key=lambda x: x[1],
|
|||
|
|
reverse=not ascending))
|
|||
|
|
|
|||
|
|
def evaluate(self, X: pd.DataFrame, y: pd.Series,
|
|||
|
|
average: str = 'weighted',
|
|||
|
|
multioutput: Union[str, List[float]] = 'uniform_average',
|
|||
|
|
sample_weight: Optional[np.ndarray] = None) -> Dict[str, float]:
|
|||
|
|
"""
|
|||
|
|
评估模型性能(支持多输出)
|
|||
|
|
:param X: 特征数据
|
|||
|
|
:param y: 目标数据
|
|||
|
|
:param average: 多分类评估方式('micro','macro','weighted')
|
|||
|
|
:param multioutput: 多输出评估方式('raw_values','uniform_average',权重数组)
|
|||
|
|
:param sample_weight: 样本权重
|
|||
|
|
:return: 评估指标字典
|
|||
|
|
"""
|
|||
|
|
if self.model is None:
|
|||
|
|
raise RuntimeError("请先训练模型")
|
|||
|
|
|
|||
|
|
preds = self.predict(X)
|
|||
|
|
y_arr = y.values if isinstance(y, pd.Series) else y
|
|||
|
|
results = {}
|
|||
|
|
|
|||
|
|
if self.algorithm.endswith('_clf'):
|
|||
|
|
# 分类任务评估
|
|||
|
|
if preds.ndim == 1 or y_arr.ndim == 1:
|
|||
|
|
# 单输出分类
|
|||
|
|
results.update({
|
|||
|
|
'accuracy': accuracy_score(y_arr, preds, sample_weight=sample_weight),
|
|||
|
|
'precision': precision_score(y_arr, preds, average=average, sample_weight=sample_weight),
|
|||
|
|
'recall': recall_score(y_arr, preds, average=average, sample_weight=sample_weight),
|
|||
|
|
'f1': f1_score(y_arr, preds, average=average, sample_weight=sample_weight)
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
# 多输出分类
|
|||
|
|
for i in range(preds.shape[1]):
|
|||
|
|
results[f'output_{i}_accuracy'] = accuracy_score(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
|
|||
|
|
results[f'output_{i}_precision'] = precision_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
|
|||
|
|
results[f'output_{i}_recall'] = recall_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
|
|||
|
|
results[f'output_{i}_f1'] = f1_score(y_arr[:,i], preds[:,i], average=average, sample_weight=sample_weight)
|
|||
|
|
|
|||
|
|
# 计算平均指标
|
|||
|
|
results['mean_accuracy'] = np.mean([v for k,v in results.items() if 'accuracy' in k])
|
|||
|
|
results['mean_precision'] = np.mean([v for k,v in results.items() if 'precision' in k])
|
|||
|
|
results['mean_recall'] = np.mean([v for k,v in results.items() if 'recall' in k])
|
|||
|
|
results['mean_f1'] = np.mean([v for k,v in results.items() if 'f1' in k])
|
|||
|
|
else:
|
|||
|
|
# 回归任务评估
|
|||
|
|
results.update({
|
|||
|
|
'mse': mean_squared_error(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight),
|
|||
|
|
'mae': mean_absolute_error(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight),
|
|||
|
|
'r2': r2_score(y_arr, preds, multioutput=multioutput, sample_weight=sample_weight)
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
if preds.ndim > 1 and multioutput == 'raw_values':
|
|||
|
|
# 多输出回归的详细指标
|
|||
|
|
for i in range(preds.shape[1]):
|
|||
|
|
results[f'output_{i}_mse'] = mean_squared_error(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
|
|||
|
|
results[f'output_{i}_mae'] = mean_absolute_error(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
|
|||
|
|
results[f'output_{i}_r2'] = r2_score(y_arr[:,i], preds[:,i], sample_weight=sample_weight)
|
|||
|
|
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _get_column_names(self, y: Union[pd.Series, pd.DataFrame, np.ndarray]) -> List[str]:
|
|||
|
|
"""获取列名列表"""
|
|||
|
|
if isinstance(y, pd.Series):
|
|||
|
|
return [y.name] if y.name else ['target']
|
|||
|
|
elif isinstance(y, pd.DataFrame):
|
|||
|
|
return y.columns.tolist()
|
|||
|
|
else: # numpy array
|
|||
|
|
if y.ndim == 1:
|
|||
|
|
return ['target']
|
|||
|
|
return [f'target_{i+1}' for i in range(y.shape[1])]
|
|||
|
|
|
|||
|
|
def plot_predictions(self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame, np.ndarray],
|
|||
|
|
n_samples: int = 50, figsize: Union[tuple, list] = (12, 6),
|
|||
|
|
save_path: Optional[str] = None) -> None:
|
|||
|
|
"""
|
|||
|
|
绘制预测值与实际值的对比图
|
|||
|
|
:param X: 特征数据
|
|||
|
|
:param y: 实际值
|
|||
|
|
:param n_samples: 显示的样本数量(默认50)
|
|||
|
|
:param figsize: 图表大小(默认(12,6))
|
|||
|
|
:param save_path: 图片保存路径(如'plot.png'), None表示不保存
|
|||
|
|
"""
|
|||
|
|
# 验证figsize参数
|
|||
|
|
if not isinstance(figsize, (tuple, list)) or len(figsize) != 2:
|
|||
|
|
figsize = (12, 6)
|
|||
|
|
try:
|
|||
|
|
figsize = tuple(float(x) for x in figsize)
|
|||
|
|
except (TypeError, ValueError):
|
|||
|
|
figsize = (12, 6)
|
|||
|
|
"""
|
|||
|
|
绘制预测值与实际值的对比图
|
|||
|
|
:param X: 特征数据
|
|||
|
|
:param y: 实际值
|
|||
|
|
:param n_samples: 显示的样本数量
|
|||
|
|
:param figsize: 图表大小
|
|||
|
|
:param save_path: 图片保存路径(如'plot.png'), None表示不保存
|
|||
|
|
"""
|
|||
|
|
import matplotlib.pyplot as plt
|
|||
|
|
|
|||
|
|
if self.model is None:
|
|||
|
|
raise RuntimeError("请先训练模型")
|
|||
|
|
|
|||
|
|
# 获取预测结果
|
|||
|
|
predictions = self.predict(X)
|
|||
|
|
|
|||
|
|
# 获取列名
|
|||
|
|
col_names = self._get_column_names(y)
|
|||
|
|
|
|||
|
|
# 转换实际值为numpy数组
|
|||
|
|
if isinstance(y, (pd.Series, pd.DataFrame)):
|
|||
|
|
y_values = y.values
|
|||
|
|
else:
|
|||
|
|
y_values = y
|
|||
|
|
|
|||
|
|
# 创建图表
|
|||
|
|
plt.figure(figsize=figsize)
|
|||
|
|
|
|||
|
|
# 处理单输出情况
|
|||
|
|
if predictions.ndim == 1 or y_values.ndim == 1:
|
|||
|
|
plt.plot(y_values[:n_samples], label=col_names[0], marker='o', alpha=0.7)
|
|||
|
|
plt.plot(predictions[:n_samples], label=f'{col_names[0]}_predicted', marker='x', alpha=0.7)
|
|||
|
|
else:
|
|||
|
|
# 多输出情况
|
|||
|
|
for i in range(predictions.shape[1]):
|
|||
|
|
col_name = col_names[i] if i < len(col_names) else f'target_{i+1}'
|
|||
|
|
plt.plot(y_values[:n_samples, i], label=col_name, alpha=0.7)
|
|||
|
|
plt.plot(predictions[:n_samples, i], '--', label=f'{col_name}_predicted', alpha=0.7)
|
|||
|
|
|
|||
|
|
plt.title('predicted value and actual value')
|
|||
|
|
plt.xlabel('index of sample')
|
|||
|
|
plt.ylabel('value')
|
|||
|
|
plt.legend()
|
|||
|
|
plt.grid(True)
|
|||
|
|
plt.tight_layout()
|
|||
|
|
|
|||
|
|
# 保存图表
|
|||
|
|
if save_path:
|
|||
|
|
supported_formats = ['.png', '.jpg', '.jpeg', '.pdf', '.svg']
|
|||
|
|
if not any(save_path.lower().endswith(fmt) for fmt in supported_formats):
|
|||
|
|
save_path += '.png' # 默认png格式
|
|||
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
|||
|
|
print(f"图表已保存到 {save_path}")
|
|||
|
|
|
|||
|
|
#plt.show()
|
|||
|
|
|
|||
|
|
def save_prediction_results(self, X: pd.DataFrame, y: Union[pd.Series, pd.DataFrame, np.ndarray],
|
|||
|
|
filepath: str = 'prediction_results.csv') -> None:
|
|||
|
|
"""
|
|||
|
|
保存预测结果和实际值到CSV文件
|
|||
|
|
:param X: 特征数据
|
|||
|
|
:param y: 实际值(Series/DataFrame或numpy数组)
|
|||
|
|
:param filepath: 输出文件路径
|
|||
|
|
"""
|
|||
|
|
if self.model is None:
|
|||
|
|
raise RuntimeError("请先训练模型")
|
|||
|
|
|
|||
|
|
# 获取预测结果
|
|||
|
|
predictions = self.predict(X)
|
|||
|
|
|
|||
|
|
# 创建结果DataFrame
|
|||
|
|
results_df = pd.DataFrame()
|
|||
|
|
|
|||
|
|
# 处理实际值(支持单输出和多输出)
|
|||
|
|
if isinstance(y, (pd.Series, pd.DataFrame)):
|
|||
|
|
# 获取列名
|
|||
|
|
if isinstance(y, pd.Series):
|
|||
|
|
actual_cols = [y.name] if y.name else ['target']
|
|||
|
|
else:
|
|||
|
|
actual_cols = y.columns.tolist()
|
|||
|
|
|
|||
|
|
# 处理单列和多列情况
|
|||
|
|
if len(actual_cols) == 1:
|
|||
|
|
results_df[actual_cols[0]] = y.values.ravel()
|
|||
|
|
else:
|
|||
|
|
for i, col in enumerate(actual_cols):
|
|||
|
|
results_df[col] = y.iloc[:, i].values if isinstance(y, pd.DataFrame) else y[:, i]
|
|||
|
|
|
|||
|
|
elif isinstance(y, np.ndarray):
|
|||
|
|
if y.ndim == 1:
|
|||
|
|
results_df['target'] = y
|
|||
|
|
else:
|
|||
|
|
for i in range(y.shape[1]):
|
|||
|
|
results_df[f'target_{i+1}'] = y[:, i]
|
|||
|
|
|
|||
|
|
# 处理预测值(支持单输出和多输出)
|
|||
|
|
if isinstance(predictions, np.ndarray):
|
|||
|
|
if predictions.ndim == 1:
|
|||
|
|
col_name = actual_cols[0] if 'actual_cols' in locals() else 'target'
|
|||
|
|
results_df[f'{col_name}_predicted'] = predictions
|
|||
|
|
else:
|
|||
|
|
cols = actual_cols if 'actual_cols' in locals() else [f'target_{i+1}' for i in range(predictions.shape[1])]
|
|||
|
|
for i, col in enumerate(cols):
|
|||
|
|
results_df[f'{col}_predicted'] = predictions[:, i]
|
|||
|
|
|
|||
|
|
# 保存到CSV
|
|||
|
|
results_df.to_csv(filepath, index=False)
|
|||
|
|
print(f"\nThe prediction results have been saved to {filepath}")
|
|||
|
|
|
|||
|
|
from mlearn.DataFrameCleaner import DataFrameCleaner
from mlearn.PandasDataIO import PandasDataIO
from file.PathUtil import PathUtil

# Usage example: multi-output regression on the t_mode_pdo dataset.
if __name__ == "__main__":

    # Load the raw data from $HOME/data/
    path = PathUtil().getEnv('HOME') + '/data/'
    data = PandasDataIO().read_csv(path + 't_mode_pdo.csv')
    print("\n原始数据:")
    print(data)

    # Clean the data: cast the steel-grade column to a categorical dtype
    cleaned_df = (
        DataFrameCleaner(data)
        .convert_types({'STEELGRADE': 'category'})
    ).get_cleaned_data()
    print("\n清洗后数据:")
    print(cleaned_df)
    print(cleaned_df.dtypes)

    # Split into training and testing sets.
    # (train_test_split is already imported at module top; the redundant
    # local re-import was removed.)
    train_df, test_df = train_test_split(
        cleaned_df, test_size=0.2, random_state=42)

    # Multi-output setup: predict the intermesh and tension columns from
    # all remaining features. One list avoids the four hand-maintained
    # copies of these column names in the original.
    target_cols = ['TL_N1_INTERMESH_PV_AVE', 'TL_N2_INTERMESH_PV_AVE',
                   'TL_N3_INTERMESH_PV_AVE', 'TL_TEN_ENT_PV_AVE',
                   'TL_TEN_EXT_PV_AVE']
    X_train = train_df.drop(target_cols, axis=1)
    y_train = train_df[target_cols]
    X_test = test_df.drop(target_cols, axis=1)
    y_test = test_df[target_cols]

    # Regression analysis
    print("=== 回归分析 ===")
    analyzer_reg = MLAnalyzer('random_forest_reg')

    metrics = analyzer_reg.fit(X_train, y_train)
    print("模型性能:", metrics)
    print("特征重要性:")
    print(analyzer_reg.get_feature_importance())

    print(X_test[10:12])
    # Predict on the held-out set
    predictions = analyzer_reg.predict(X_test)
    print("\n预测结果(前5个):")
    print(predictions[:5])

    # Persist results and a comparison plot next to the input data
    analyzer_reg.save_prediction_results(X_test, y_test, path + 'prediction_results.csv')
    analyzer_reg.plot_predictions(X_test, y_test, 60, save_path=path + 'prediction_results.png')

    # Model evaluation on the test split
    print("\n=== 模型评估 ===")
    test_metrics = analyzer_reg.evaluate(X_test, y_test)
    print("测试集评估指标:")
    print(test_metrics)
|
|