1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
| import pandas as pd import numpy as np from typing import List, Dict, Any import re
class DataProcessor:
    """Validate and clean ledger-style tabular data, logging every step.

    The processor works on sheets that are expected to carry the columns
    '日期' (date), '金额' (amount), '科目' (account) and '备注' (remark).
    Validation problems are collected into a result dictionary instead of
    being raised; only ``clean_and_transform_data`` re-raises failures.
    """

    # Columns that must exist for a sheet to be considered valid.
    _REQUIRED_COLUMNS = ('日期', '金额', '科目', '备注')
    # Free-text columns that are whitespace-trimmed during cleaning.
    _TEXT_COLUMNS = ('科目', '备注')
    # Per-column cap on individually reported errors; the rest are summarised.
    _MAX_REPORTED_ERRORS = 5
    # Absolute amounts above this value trigger the ">100万" warning.
    _LARGE_AMOUNT_THRESHOLD = 1000000

    # Accepted date layouts. The optional time suffix lets stringified
    # Timestamp/datetime cells (e.g. "2024-01-15 00:00:00") keep passing
    # even though the pattern is matched against the whole string.
    _DATE_PATTERN = re.compile(
        r'(?:\d{4}-\d{2}-\d{2}|\d{4}/\d{2}/\d{2}|\d{2}/\d{2}/\d{4}'
        r'|\d{4}年\d{2}月\d{2}日)'
        r'(?:[ T]\d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?)?'
    )
    # Optional leading minus, digits, at most one decimal point.
    _AMOUNT_PATTERN = re.compile(r'-?(?:\d+\.?\d*|\.\d+)')

    def __init__(self, logger: "RPALogger"):
        """Store the logger used for step and exception reporting.

        Args:
            logger: Object providing ``log_step(step, status, details)`` and
                ``log_exception(step, exc)``. The annotation is a forward
                reference so the class can be defined before ``RPALogger``
                is in scope.
        """
        self.logger = logger
        self.validation_rules = {}

    def validate_excel_data(self, file_path: str) -> Dict[str, Any]:
        """Check an Excel file for completeness and well-formed values.

        Args:
            file_path: Path handed to ``pandas.read_excel``.

        Returns:
            A dict with the keys ``is_valid`` (bool), ``errors`` (list of
            str), ``warnings`` (list of str) and ``data_summary`` (row and
            column counts, column names, per-column null counts).
            ``is_valid`` is False whenever ``errors`` is non-empty.
            Exceptions are caught, logged and reported through ``errors``.
        """
        validation_result: Dict[str, Any] = {
            'is_valid': True,
            'errors': [],
            'warnings': [],
            'data_summary': {},
        }

        try:
            df = pd.read_excel(file_path)

            if df.empty:
                validation_result['is_valid'] = False
                validation_result['errors'].append("Excel文件为空")
                return validation_result

            validation_result['data_summary'] = {
                'total_rows': len(df),
                'total_columns': len(df.columns),
                'columns': list(df.columns),
                # Plain ints instead of numpy scalars so the summary can be
                # serialised by the logger without special handling.
                'null_counts': {
                    col: int(count)
                    for col, count in df.isnull().sum().items()
                },
            }

            missing_columns = [
                col for col in self._REQUIRED_COLUMNS if col not in df.columns
            ]
            if missing_columns:
                validation_result['is_valid'] = False
                validation_result['errors'].append(
                    f"缺少必需列: {missing_columns}"
                )

            self._validate_data_types(df, validation_result)
            self._validate_data_ranges(df, validation_result)

            duplicates = int(df.duplicated().sum())
            if duplicates > 0:
                validation_result['warnings'].append(
                    f"发现 {duplicates} 行重复数据"
                )

            # The helper validators only append to 'errors'; without this
            # check a sheet with malformed cells would be reported as valid.
            if validation_result['errors']:
                validation_result['is_valid'] = False

            self.logger.log_step(
                "数据验证",
                "SUCCESS" if validation_result['is_valid'] else "FAILED",
                validation_result['data_summary'],
            )
        except Exception as e:
            validation_result['is_valid'] = False
            validation_result['errors'].append(f"读取文件异常: {str(e)}")
            self.logger.log_exception("数据验证", e)

        return validation_result

    @classmethod
    def _append_capped_errors(
        cls, result: Dict[str, Any], errors: List[str], kind: str
    ) -> None:
        """Add the first few *errors* to *result* plus a summary of the rest."""
        if not errors:
            return
        limit = cls._MAX_REPORTED_ERRORS
        result['errors'].extend(errors[:limit])
        if len(errors) > limit:
            result['errors'].append(
                f"还有 {len(errors) - limit} 个{kind}格式错误"
            )

    def _is_valid_amount(self, amount: Any) -> bool:
        """Return True if *amount* is a number or a plain decimal string."""
        if isinstance(amount, (int, float, np.number)):
            return True
        return self._AMOUNT_PATTERN.fullmatch(str(amount)) is not None

    def _validate_data_types(self, df: pd.DataFrame, result: Dict[str, Any]):
        """Check every cell of the date and amount columns.

        Args:
            df: Frame to inspect; columns that are absent are skipped.
            result: Validation result dict; messages are appended to
                ``result['errors']``. Row numbers in messages are 1-based
                positions within the data rows.
        """
        try:
            if '日期' in df.columns:
                date_errors = []
                for idx, date_val in enumerate(df['日期']):
                    if pd.isna(date_val):
                        date_errors.append(f"第{idx+1}行日期为空")
                    elif not self._is_valid_date(str(date_val)):
                        date_errors.append(
                            f"第{idx+1}行日期格式错误: {date_val}"
                        )
                self._append_capped_errors(result, date_errors, '日期')

            if '金额' in df.columns:
                amount_errors = []
                for idx, amount in enumerate(df['金额']):
                    if pd.isna(amount):
                        amount_errors.append(f"第{idx+1}行金额为空")
                    elif not self._is_valid_amount(amount):
                        amount_errors.append(
                            f"第{idx+1}行金额格式错误: {amount}"
                        )
                self._append_capped_errors(result, amount_errors, '金额')

        except Exception as e:
            result['errors'].append(f"数据类型验证异常: {str(e)}")

    def _validate_data_ranges(self, df: pd.DataFrame, result: Dict[str, Any]):
        """Warn about unusually large and zero amounts.

        Args:
            df: Frame to inspect; nothing happens without a '金额' column.
            result: Validation result dict; findings are appended to
                ``result['warnings']``. Unparseable amounts are coerced to
                NaN and therefore ignored here.
        """
        try:
            if '金额' in df.columns:
                numeric_amounts = pd.to_numeric(df['金额'], errors='coerce')

                large_count = int(
                    (numeric_amounts.abs() > self._LARGE_AMOUNT_THRESHOLD).sum()
                )
                if large_count:
                    result['warnings'].append(
                        f"发现 {large_count} 笔大额交易(>100万)"
                    )

                zero_count = int((numeric_amounts == 0).sum())
                if zero_count:
                    result['warnings'].append(
                        f"发现 {zero_count} 笔零金额交易"
                    )

        except Exception as e:
            result['errors'].append(f"数据范围验证异常: {str(e)}")

    def _is_valid_date(self, date_str: str) -> bool:
        """Return True if *date_str* is entirely one of the accepted layouts.

        Accepted layouts are ``YYYY-MM-DD``, ``YYYY/MM/DD``, ``DD/MM/YYYY``
        (or ``MM/DD/YYYY``; only the digit layout is checked) and
        ``YYYY年MM月DD日``, each optionally followed by a time of day.
        Only the format is checked, not whether the date exists.
        """
        return self._DATE_PATTERN.fullmatch(date_str) is not None

    def clean_and_transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of *df* with normalised column types.

        Fully empty rows are dropped, '日期' is converted to datetimes and
        '金额' to numbers (unparseable values become NaT/NaN), and the text
        columns are converted to stripped strings. Missing text cells stay
        missing instead of turning into the literal string 'nan'.

        Args:
            df: Input frame; it is not modified.

        Returns:
            The cleaned frame.

        Raises:
            Exception: Any failure is logged through the logger and then
                re-raised unchanged.
        """
        try:
            self.logger.log_step(
                "数据清洗", "STARTING", {"original_rows": len(df)}
            )

            # Explicit copy so the assignments below never write through to
            # (or warn about) the caller's frame.
            df_cleaned = df.dropna(how='all').copy()

            if '日期' in df_cleaned.columns:
                df_cleaned['日期'] = pd.to_datetime(
                    df_cleaned['日期'], errors='coerce'
                )

            if '金额' in df_cleaned.columns:
                df_cleaned['金额'] = pd.to_numeric(
                    df_cleaned['金额'], errors='coerce'
                )

            for col in self._TEXT_COLUMNS:
                if col in df_cleaned.columns:
                    column = df_cleaned[col]
                    # mask() restores the missing markers that astype(str)
                    # would otherwise render as the text 'nan'.
                    df_cleaned[col] = (
                        column.astype(str).str.strip().mask(column.isna())
                    )

            self.logger.log_step(
                "数据清洗",
                "SUCCESS",
                {
                    "cleaned_rows": len(df_cleaned),
                    "removed_rows": len(df) - len(df_cleaned),
                },
            )

            return df_cleaned

        except Exception as e:
            self.logger.log_exception("数据清洗", e)
            raise
|