# eis/py/comlib/mlearn/PandasDataIO.py
# -*- coding: utf-8 -*-
"""
Pandas数据文件读写工具模块
该模块提供基于pandas的数据文件读写功能支持多种文件格式CSV、Excel、JSON
主要类PandasDataIO封装了常用的数据读取和写入方法并提供了灵活的参数配置选项。
Author:
-Author : zoufuzhou
-Date : 2025-05-21 16:34:37
-LastEditTime : 2025-05-21 16:34:37
"""
import pandas as pd
import os
from typing import Union, Optional, Literal, Generator, Dict, List
class PandasDataIO:
    """Pandas-based data file I/O utility class (supports CSV/Excel/JSON)."""

    def __init__(self, index_col: Optional[Union[str, int]] = None,
                 encoding: str = 'utf-8', na_values: Optional[list] = None):
        """
        Initialize the data IO handler.

        :param index_col: column to use as the row index
        :param encoding: file encoding, defaults to utf-8
        :param na_values: list of strings to recognize as NA/NaN
        """
        # Instance-wide defaults merged into every read/write call;
        # per-call kwargs always take precedence over these.
        self.default_read_params = {
            'index_col': index_col,
            'encoding': encoding,
            'na_values': na_values
        }
        self.default_write_params = {
            'index': False,
            'encoding': encoding
        }

    def read_csv(self, file_path: str, **kwargs) -> Union[pd.DataFrame, Generator[pd.DataFrame, None, None]]:
        """
        Read a CSV file (chunked mode supported via ``chunksize`` in kwargs).

        :param file_path: CSV file path
        :param kwargs: extra parameters passed to pandas.read_csv
        :return: a single DataFrame, or an iterator of DataFrames when
                 ``chunksize`` is supplied in kwargs
        :raises FileNotFoundError: if the file does not exist
        :raises Exception: for any other read failure
        """
        try:
            params = {**self.default_read_params, **kwargs}
            # pandas itself returns a chunk iterator (TextFileReader) when
            # 'chunksize' is present, so one call covers both modes; the
            # original duplicated identical branches here.
            return pd.read_csv(file_path, **params)
        except FileNotFoundError:
            raise FileNotFoundError(f"文件 {file_path} 不存在")
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise Exception(f"读取CSV文件时出错: {str(e)}") from e

    def read_csv_chunks(self, file_path: str, chunksize: int = 10000, **kwargs) -> Generator[pd.DataFrame, None, None]:
        """
        Read a CSV file in chunks.

        :param file_path: CSV file path
        :param chunksize: number of rows per chunk
        :param kwargs: extra parameters passed to pandas.read_csv
        :return: iterator of DataFrames
        :raises FileNotFoundError: if the file does not exist
        :raises Exception: for any other read failure
        """
        try:
            params = {**self.default_read_params, 'chunksize': chunksize, **kwargs}
            return pd.read_csv(file_path, **params)
        except FileNotFoundError:
            raise FileNotFoundError(f"文件 {file_path} 不存在")
        except Exception as e:
            raise Exception(f"分块读取CSV文件时出错: {str(e)}") from e

    def read_excel_chunks(self, file_path: str, chunksize: int = 10000, **kwargs) -> Generator[pd.DataFrame, None, None]:
        """
        Read an Excel file in chunks (simulated).

        pandas has no native chunked Excel reader, so the whole sheet is
        loaded once and then sliced. NOTE: as a generator, nothing runs
        (including error handling) until the result is first iterated.

        :param file_path: Excel file path
        :param chunksize: number of rows per chunk
        :param kwargs: extra parameters passed through to read_excel
        :return: generator of DataFrame slices
        :raises Exception: if reading the underlying file fails
        """
        try:
            # Load the entire file up front...
            df = self.read_excel(file_path, **kwargs)
            # ...then yield fixed-size row windows to simulate chunking.
            for i in range(0, len(df), chunksize):
                yield df.iloc[i:i + chunksize]
        except Exception as e:
            raise Exception(f"分块读取Excel文件时出错: {str(e)}") from e

    def validate_data(self, df: pd.DataFrame,
                      rules: Optional[Dict[str, List[str]]] = None,
                      check_na: bool = True) -> Dict[str, List[str]]:
        """
        Validate a DataFrame against simple NA and dtype rules.

        :param df: DataFrame to validate
        :param rules: validation rules, mapping {column name: [dtype substrings]};
                      a column passes if any substring occurs in its dtype name
        :param check_na: whether to check for missing values
        :return: error dict {column name: [error messages]}; empty when valid
        """
        errors: Dict[str, List[str]] = {}
        # Missing-value check: collect all columns containing any NA.
        if check_na:
            na_cols = df.columns[df.isna().any()].tolist()
            if na_cols:
                errors['_na'] = [f"缺失值存在于列: {', '.join(na_cols)}"]
        # Dtype check: substring match against the column's dtype string
        # (e.g. "int" matches "int64"). Columns absent from df are skipped.
        if rules:
            for col, types in rules.items():
                if col in df.columns:
                    col_type = str(df[col].dtype)
                    if not any(t in col_type for t in types):
                        errors.setdefault(col, []).append(
                            f"数据类型不符: 期望{types}, 实际{col_type}")
        return errors

    def read_excel(self, file_path: str, sheet_name: Union[str, int, list, None] = 0, **kwargs) -> pd.DataFrame:
        """
        Read an Excel file.

        :param file_path: Excel file path
        :param sheet_name: sheet name or index, defaults to 0 (first sheet)
        :param kwargs: extra parameters passed to pandas.read_excel
        :return: DataFrame with the Excel data
        :raises FileNotFoundError: if the file does not exist
        :raises Exception: for any other read failure
        """
        try:
            # Copy the defaults but drop 'encoding': pandas.read_excel does
            # not accept it (Excel files carry their own encoding).
            params = {k: v for k, v in self.default_read_params.items() if k != 'encoding'}
            params.update(kwargs)
            return pd.read_excel(file_path, sheet_name=sheet_name, **params)
        except FileNotFoundError:
            raise FileNotFoundError(f"文件 {file_path} 不存在")
        except Exception as e:
            raise Exception(f"读取Excel文件时出错: {str(e)}") from e

    def read_json(self, file_path: str, orient: str = 'records', **kwargs) -> pd.DataFrame:
        """
        Read a JSON file.

        :param file_path: JSON file path
        :param orient: JSON layout ('records', 'columns', 'index', etc.)
        :param kwargs: extra parameters passed to pandas.read_json
        :return: DataFrame with the JSON data
        :raises FileNotFoundError: if the file does not exist
        :raises Exception: for any other read failure
        """
        try:
            params = {'orient': orient, **kwargs}
            return pd.read_json(file_path, **params)
        except FileNotFoundError:
            raise FileNotFoundError(f"文件 {file_path} 不存在")
        except Exception as e:
            raise Exception(f"读取JSON文件时出错: {str(e)}") from e

    def read_file(self, file_path: str, file_type: Literal['auto', 'csv', 'excel', 'json'] = 'auto', **kwargs) -> pd.DataFrame:
        """
        Generic file reader that dispatches on file type.

        :param file_path: file path
        :param file_type: file type ('auto' detects from extension, or
                          'csv', 'excel', 'json')
        :param kwargs: extra parameters passed to the matching read method
        :return: DataFrame with the file data
        :raises ValueError: for an unsupported extension or invalid file_type
        """
        if file_type == 'auto':
            # Detect format from the (lower-cased) file extension.
            ext = os.path.splitext(file_path)[1].lower()
            if ext == '.csv':
                return self.read_csv(file_path, **kwargs)
            elif ext in ('.xls', '.xlsx', '.xlsm', '.xlsb'):
                return self.read_excel(file_path, **kwargs)
            elif ext == '.json':
                return self.read_json(file_path, **kwargs)
            else:
                raise ValueError(f"不支持的文件类型: {ext}")
        elif file_type == 'csv':
            return self.read_csv(file_path, **kwargs)
        elif file_type == 'excel':
            return self.read_excel(file_path, **kwargs)
        elif file_type == 'json':
            return self.read_json(file_path, **kwargs)
        else:
            raise ValueError(f"无效的文件类型参数: {file_type}")

    def write_json(self, file_path: str, data: Union[pd.DataFrame, dict, list],
                   orient: str = 'records', **kwargs) -> None:
        """
        Write data to a JSON file.

        :param file_path: JSON file path
        :param data: data to write; may be a DataFrame, dict or list
        :param orient: JSON layout ('records', 'columns', 'index', etc.)
        :param kwargs: extra parameters passed to DataFrame.to_json
        :raises Exception: if the write fails
        """
        try:
            # Coerce plain dicts/lists through a DataFrame for a uniform path.
            if not isinstance(data, pd.DataFrame):
                data = pd.DataFrame(data)
            params = {'orient': orient, **kwargs}
            data.to_json(file_path, **params)
        except Exception as e:
            raise Exception(f"写入JSON文件时出错: {str(e)}") from e

    def write_excel(self, file_path: str, data: Union[pd.DataFrame, dict, list],
                    sheet_name: str = 'Sheet1', **kwargs) -> None:
        """
        Write data to an Excel file.

        :param file_path: Excel file path
        :param data: data to write; may be a DataFrame, dict or list
        :param sheet_name: worksheet name, defaults to 'Sheet1'
        :param kwargs: extra parameters passed to DataFrame.to_excel
        :raises Exception: if the write fails
        """
        try:
            if not isinstance(data, pd.DataFrame):
                data = pd.DataFrame(data)
            params = {'sheet_name': sheet_name, **kwargs}
            data.to_excel(file_path, **params)
        except Exception as e:
            raise Exception(f"写入Excel文件时出错: {str(e)}") from e

    def write_file(self, file_path: str, data: Union[pd.DataFrame, dict, list],
                   file_type: Literal['auto', 'csv', 'excel', 'json'] = 'auto', **kwargs) -> None:
        """
        Generic file writer that dispatches on file type.

        :param file_path: file path
        :param data: data to write
        :param file_type: file type ('auto' detects from extension, or
                          'csv', 'excel', 'json')
        :param kwargs: extra parameters passed to the matching write method
        :raises ValueError: for an unsupported extension or invalid file_type
        """
        if file_type == 'auto':
            ext = os.path.splitext(file_path)[1].lower()
            if ext == '.csv':
                self.write_csv(file_path, data, **kwargs)
            elif ext in ('.xls', '.xlsx', '.xlsm', '.xlsb'):
                self.write_excel(file_path, data, **kwargs)
            elif ext == '.json':
                self.write_json(file_path, data, **kwargs)
            else:
                raise ValueError(f"不支持的文件类型: {ext}")
        elif file_type == 'csv':
            self.write_csv(file_path, data, **kwargs)
        elif file_type == 'excel':
            self.write_excel(file_path, data, **kwargs)
        elif file_type == 'json':
            self.write_json(file_path, data, **kwargs)
        else:
            raise ValueError(f"无效的文件类型参数: {file_type}")

    def write_csv(self, file_path: str, data: Union[pd.DataFrame, dict, list],
                  **kwargs) -> None:
        """
        Write data to a CSV file.

        :param file_path: CSV file path
        :param data: data to write; may be a DataFrame, dict or list
        :param kwargs: extra parameters passed to DataFrame.to_csv
        :raises Exception: if the write fails
        """
        try:
            if not isinstance(data, pd.DataFrame):
                data = pd.DataFrame(data)
            params = {**self.default_write_params, **kwargs}
            data.to_csv(file_path, **params)
        except Exception as e:
            raise Exception(f"写入CSV文件时出错: {str(e)}") from e
# Usage example (kept commented out: depends on project-local file.PathUtil)
#from file.PathUtil import PathUtil
#if __name__ == "__main__":
# io = PandasDataIO()
#
# # 示例数据
# sample_data = [
# {"name": "Alice", "age": 25, "city": "New York"},
# {"name": "Bob", "age": 30, "city": "London"}
# ]
#
# path = PathUtil().getEnv("HOME") + "/data/";
# # CSV操作
# print("\n=== CSV操作 ===")
# io.write_file(path + "example.csv", sample_data) # 自动识别类型
# print("CSV读取结果:\n", io.read_file(path + "example.csv").head())
#
# # Excel操作
# print("\n=== Excel操作 ===")
# try:
# # 测试写入和读取
# io.write_file(path + "example.xlsx", sample_data) # 自动识别类型
# excel_data = io.read_file("example.xlsx")
# print("Excel读取结果:\n", excel_data.head())
#
# # 测试带sheet名的写入和读取
# io.write_excel(path + "example_sheet.xlsx", sample_data, sheet_name="Employees")
# sheet_data = io.read_excel("example_sheet.xlsx", sheet_name="Employees")
# print("\n自定义sheet读取结果:\n", sheet_data.head())
#
# # 测试参数传递
# print("\n带参数读取测试:")
# param_data = io.read_excel(path + "example.xlsx", header=0, skiprows=0)
# print(param_data.head())
#
# except ImportError:
# print("Excel功能需要安装openpyxl: pip install openpyxl")
# except Exception as e:
# print(f"Excel操作出错: {str(e)}")
#
# # JSON操作
# print("\n=== JSON操作 ===")
# io.write_file(path + "example.json", sample_data) # 自动识别类型
# print("JSON读取结果(records格式):\n", io.read_file(path + "example.json").head())
#
# # 自定义JSON格式
# io.write_json(path + "example_columns.json", sample_data, orient="columns")
# print("JSON读取结果(columns格式):\n", pd.read_json(path + "example_columns.json"))
#
# # 分块读取演示
# print("\n=== 分块读取演示 ===")
# try:
# # 创建大CSV文件用于测试
# large_data = pd.concat([pd.DataFrame(sample_data)] * 1000)
# io.write_csv(path + "large_data.csv", large_data)
#
# print("分块读取CSV文件:")
# chunk_count = 0
# for chunk in io.read_csv_chunks(path + "large_data.csv", chunksize=500):
# chunk_count += 1
# print(f"处理第{chunk_count}块, 行数: {len(chunk)}")
# print(f"共处理{chunk_count}块数据")
# except Exception as e:
# print(f"分块读取失败: {str(e)}")
#
# # 数据验证演示
# print("\n=== 数据验证演示 ===")
# test_data = [
# {"name": "Alice", "age": 25, "city": "New York"},
# {"name": "Bob", "age": "thirty", "city": None}, # 故意制造错误
# {"name": "Charlie", "age": 35, "city": "London"}
# ]
# test_df = pd.DataFrame(test_data)
#
# # 定义验证规则
# rules = {
# "age": ["int"],
# "city": ["object"]
# }
#
# errors = io.validate_data(test_df, rules=rules, check_na=True)
# if errors:
# print("数据验证发现错误:")
# for col, msgs in errors.items():
# print(f"{col}:")
# for msg in msgs:
# print(f" - {msg}")
# else:
# print("数据验证通过")