🏭 那些年,我们被Excel坑过的岁月
说真的,我在工厂车间里做数据系统的时候,见过太多这种场景了——
一台老旧的工控机,桌面上摆着十几个Excel文件,文件名叫"设备数据_最终版_v3_真的最终版.xlsx"。每次要查历史数据,就得打开七八个表格,手动复制粘贴,搞个把小时才能出一份报表。更要命的是,有时候数据还对不上,因为两个班次的操作员各自维护了一份,格式还不一样。
这不是个例。制造业里,Excel作为"数据库"使用的现象极其普遍。它轻便、直观,入门门槛低,但一旦数据量上去了、多人协作了、需要实时查询了——它的局限性就暴露得一干二净。
今天咱们就聊一件很多工程师都绕不开的事:怎么用Python把Excel里的历史数据,优雅地迁移到SQLite或MySQL里,同时还得保证数据不丢、格式不乱、迁移过程可追溯。
🔍 先把问题摸透,再动手写代码
在我接触过的工业数据迁移项目里,有三类问题反复出现,踩坑率极高。
第一类:数据格式的混乱程度超出想象。 同一列"温度"字段,有的行写的是85.3,有的写85.3℃,有的写约85度,甚至还有--表示传感器离线。这种"人工智能"录入方式,直接导致数值列无法直接入库。
第二类:时间戳格式五花八门。2023/8/5、2023-08-05、8月5日 14:30……同一个Excel文件里可能混用三种格式,pandas读进来直接变成object类型,后续时序查询全部废掉。
第三类:多Sheet、多文件的数据孤岛。 按月份拆分的Excel,每个文件有12个Sheet,字段名还不完全一致(有的叫"压力值",有的叫"压力",有的叫"P_value")。合并之前必须做字段映射,否则入库之后数据根本没法用。
搞清楚这三类问题,咱们的迁移方案就有了清晰的骨架。
🛠️ 环境准备,先把工具备齐
bash1pip install pandas openpyxl sqlalchemy pymysql tqdm这几个包各有分工:pandas 负责读取和清洗Excel,openpyxl 是pandas读取.xlsx的底层引擎,sqlalchemy 提供统一的数据库抽象层(SQLite和MySQL都能用),pymysql 是MySQL的Python驱动,tqdm 用来显示迁移进度条——数据量大的时候,没有进度条真的会让人抓狂。
📦 第一步:构建通用的Excel读取器
先写一个能处理"脏数据"的读取模块,这是整个迁移流程的地基。
python1import pandas as pd2import re3import logging4from pathlib import Path5from typing import Optional, Dict, List, Union, Tuple6from datetime import datetime78# 配置日志9logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')10logger = logging.getLogger(__name__)111213class ExcelDataError(Exception):14"""自定义异常类,用于Excel数据处理错误"""15pass161718class ExcelReader:19"""20 工业数据Excel读取器21 专门处理格式混乱的历史数据文件,支持多种数据清洗和验证功能22 """ # 字段名映射表——统一不同时期的命名习惯23FIELD_MAPPING = {24# 压力相关字段25"压力值": "pressure",26"压力": "pressure",27"P_value": "pressure",28"Pressure": "pressure",29"press": "pressure",30"压力(MPa)": "pressure",31"压力/MPa": "pressure",3233# 温度相关字段34"温度值": "temperature",35"温度": "temperature",36"T_value": "temperature",37"Temperature": "temperature",38"temp": "temperature",39"温度(℃)": "temperature",40"温度/℃": "temperature",4142# 时间相关字段43"时间": "record_time",44"记录时间": "record_time",45"timestamp": "record_time",46"Timestamp": "record_time",47"采集时间": "record_time",48"日期": "record_time",49"Date": "record_time",50"DateTime": "record_time",5152# 其他可能的工业数据字段53"流量": "flow_rate",54"Flow": "flow_rate",55"湿度": "humidity",56"Humidity": "humidity",57"设备状态": "device_status",58"Status": "device_status"59 }6061# 数据质量阈值配置62QUALITY_THRESHOLDS = {63"pressure": {"min": 0, "max": 1000}, # MPa64"temperature": {"min": -50, "max": 200}, # ℃65"flow_rate": {"min": 0, "max": 10000}, # 根据实际情况调整66"humidity": {"min": 0, "max": 100} # %67 }6869def __init__(self, file_path: Union[str, Path], sheet_name: Union[str, int] = 0,70 encoding: str = 'utf-8', skip_rows: int = 0):71"""72初始化Excel读取器73 Args: file_path: Excel文件路径74 sheet_name: 工作表名称或索引75 encoding: 文件编码76 skip_rows: 跳过的行数(用于处理表头前的说明文字)77 """ self.file_path = Path(file_path)78 self.sheet_name = sheet_name79 self.encoding = encoding80 self.skip_rows = skip_rows81 self.quality_report = {}8283# 验证文件存在性84if not self.file_path.exists():85raise FileNotFoundError(f"Excel文件不存在: {self.file_path}")8687# 验证文件格式88if self.file_path.suffix.lower() not in ['.xlsx', '.xls']:89raise ExcelDataError(f"不支持的文件格式: {self.file_path.suffix}")9091def read(self, validate_data: bool = True,92 remove_duplicates: bool = True) -> pd.DataFrame:93"""94 读取并处理Excel数据9596 Args: validate_data: 是否进行数据质量验证97 remove_duplicates: 是否移除重复行9899 Returns: 清洗后的DataFrame100 """ try:101 logger.info(f"开始读取Excel文件: {self.file_path}")102103# 读取原始数据104 df = self._read_raw_data()105 original_rows = len(df)106 logger.info(f"原始数据行数: {original_rows}")107108# 数据处理流程109 df = self._normalize_columns(df)110 df = self._clean_numeric_fields(df)111 df = self._parse_datetime(df)112 df = self._drop_invalid_rows(df)113114if remove_duplicates:115 df = self._remove_duplicates(df)116117if validate_data:118 df = self._validate_data_quality(df)119120# 按时间排序121if "record_time" in df.columns:122 df = df.sort_values("record_time").reset_index(drop=True)123124 processed_rows = len(df)125 logger.info(f"处理完成,有效数据行数: {processed_rows} "126f"(数据保留率: {processed_rows / original_rows * 100:.1f}%)")127128# 生成质量报告129 self._generate_quality_report(df, original_rows)130131return df132133except Exception as e:134 logger.error(f"Excel读取失败: {str(e)}")135raise ExcelDataError(f"Excel文件处理失败: {str(e)}")136137def _read_raw_data(self) -> pd.DataFrame:138"""读取原始Excel数据"""139try:140# 尝试读取指定工作表141 df = pd.read_excel(142 self.file_path,143 sheet_name=self.sheet_name,144 engine="openpyxl",145 skiprows=self.skip_rows,146 na_values=['', ' ', 'N/A', 'null', 'NULL', '--', '——', '无数据']147 ) # 检查是否为空文件148if df.empty:149raise ExcelDataError("Excel文件为空或无有效数据")150151return df152153except Exception as e:154# 如果指定工作表读取失败,尝试读取第一个工作表155if isinstance(self.sheet_name, str):156 logger.warning(f"工作表 '{self.sheet_name}' 不存在,尝试读取第一个工作表")157return pd.read_excel(self.file_path, sheet_name=0, engine="openpyxl")158else:159raise ExcelDataError(f"无法读取Excel文件: {str(e)}")160161def _normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:162"""163 统一列名处理164 1. 去除前后空格和特殊字符165 2. 使用映射表统一字段名166 3. 处理重复列名167 """ # 清理列名168 df.columns = [str(c).strip().replace('\n', '').replace('\r', '')169for c in df.columns]170171# 处理重复列名172 cols = pd.Series(df.columns)173for dup in cols[cols.duplicated()].unique():174 cols[cols[cols == dup].index.values.tolist()] = [175f"{dup}_{i}" if i != 0 else dup176for i in range(sum(cols == dup))177 ] df.columns = cols178179# 应用字段映射180 df.rename(columns=self.FIELD_MAPPING, inplace=True)181182 logger.info(f"列名标准化完成,字段: {list(df.columns)}")183return df184185def _clean_numeric_fields(self, df: pd.DataFrame) -> pd.DataFrame:186"""187 清洗数值字段188 处理各种异常情况:单位符号、异常标记、科学计数法等189 """ # 识别可能的数值字段190 numeric_candidates = [col for col in df.columns191if col in ['pressure', 'temperature', 'flow_rate', 'humidity']]192193for col in numeric_candidates:194if col not in df.columns:195continue196197 logger.info(f"清洗数值字段: {col}")198 original_valid = df[col].notna().sum()199200 df[col] = df[col].apply(self._extract_number)201202 cleaned_valid = df[col].notna().sum()203 logger.info(f" {col}: {original_valid} → {cleaned_valid} 有效值")204205return df206207@staticmethod208def _extract_number(val) -> Optional[float]:209"""210 从混合格式字符串中提取数值211 支持格式:'85.3℃', '1.2MPa', '1.23e-4', '--', 'N/A' 等212 """ if pd.isna(val):213return None214215 s = str(val).strip()216217# 处理明确的无效值218if s.lower() in ['--', '——', 'n/a', 'null', '', '无数据', '异常']:219return None220221try:222# 直接转换数值223return float(s)224except ValueError:225pass226227# 使用正则表达式提取数值(支持科学计数法)228 patterns = [229r'-?\d+\.?\d*[eE][-+]?\d+', # 科学计数法230r'-?\d+\.\d+', # 小数231r'-?\d+' # 整数232 ]233234for pattern in patterns:235 match = re.search(pattern, s)236if match:237try:238return float(match.group())239except ValueError:240continue241242return None243def _parse_datetime(self, df: pd.DataFrame) -> pd.DataFrame:244"""245 解析时间字段,支持多种时间格式246 """ if "record_time" not in df.columns:247 logger.warning("未找到时间字段")248return df249250 logger.info("解析时间字段")251 original_valid = df["record_time"].notna().sum()252253try:254 df["record_time"] = pd.to_datetime(255 df["record_time"],256 errors="coerce",257 format='mixed'258 )259except TypeError:260 df["record_time"] = pd.to_datetime(261 df["record_time"],262 errors="coerce"263 )264265 parsed_valid = df["record_time"].notna().sum()266 logger.info(f"时间解析完成: {original_valid} → {parsed_valid} 有效值")267268return df269270def _drop_invalid_rows(self, df: pd.DataFrame) -> pd.DataFrame:271"""删除无效行"""272 original_rows = len(df)273274# 删除全空行275 df = df.dropna(how='all')276277# 删除时间戳无效的行278if "record_time" in df.columns:279 invalid_time = df["record_time"].isna().sum()280if invalid_time > 0:281 logger.warning(f"删除 {invalid_time} 行无效时间数据")282 df = df.dropna(subset=["record_time"])283284 final_rows = len(df)285 logger.info(f"无效行清理完成: {original_rows} → {final_rows}")286287return df.reset_index(drop=True)288289def _remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:290"""移除重复数据"""291 original_rows = len(df)292293# 基于时间戳去重(保留最后一条记录)294if "record_time" in df.columns:295 df = df.drop_duplicates(subset=["record_time"], keep="last")296else:297 df = df.drop_duplicates()298299 final_rows = len(df)300if original_rows > final_rows:301 logger.info(f"移除重复数据: {original_rows - final_rows} 行")302303return df.reset_index(drop=True)304305def _validate_data_quality(self, df: pd.DataFrame) -> pd.DataFrame:306"""数据质量验证和异常值处理"""307for col, thresholds in self.QUALITY_THRESHOLDS.items():308if col not in df.columns:309continue310311# 统计异常值312 valid_mask = (df[col] >= thresholds["min"]) & (df[col] <= thresholds["max"])313 outliers = (~valid_mask & df[col].notna()).sum()314315if outliers > 0:316 logger.warning(f"{col} 发现 {outliers} 个异常值 "317f"(范围: {thresholds['min']}-{thresholds['max']})")318319# 可选:将异常值设为NaN而不是删除整行320 df.loc[~valid_mask, col] = None321322return df323324def _generate_quality_report(self, df: pd.DataFrame, original_rows: int):325"""生成数据质量报告"""326 self.quality_report = {327"file_info": {328"file_path": str(self.file_path),329"processed_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")330 },331"data_summary": {332"original_rows": original_rows,333"final_rows": len(df),334"retention_rate": len(df) / original_rows if original_rows > 0 else 0,335"columns": list(df.columns)336 },337"field_statistics": {}338 }339340# 各字段统计341for col in df.columns:342if df[col].dtype in ['float64', 'int64']:343 self.quality_report["field_statistics"][col] = {344"valid_count": df[col].notna().sum(),345"missing_count": df[col].isna().sum(),346"min": float(df[col].min()) if df[col].notna().any() else None,347"max": float(df[col].max()) if df[col].notna().any() else None,348"mean": float(df[col].mean()) if df[col].notna().any() else None349 }350351def get_quality_report(self) -> Dict:352"""获取数据质量报告"""353return self.quality_report354355def print_quality_report(self):356"""打印数据质量报告"""357if not self.quality_report:358print("尚未生成质量报告,请先调用 read() 方法")359return360361 report = self.quality_report362print("\n" + "=" * 50)363print("📊 数据质量报告")364print("=" * 50)365366# 文件信息367print(f"文件路径: {report['file_info']['file_path']}")368print(f"处理时间: {report['file_info']['processed_time']}")369370# 数据概览371 summary = report['data_summary']372print(f"\n原始行数: {summary['original_rows']}")373print(f"有效行数: {summary['final_rows']}")374print(f"数据保留率: {summary['retention_rate']:.1%}")375print(f"字段列表: {', '.join(summary['columns'])}")376377# 字段统计378if report['field_statistics']:379print("\n字段统计:")380for field, stats in report['field_statistics'].items():381print(f" {field}:")382print(f" 有效值: {stats['valid_count']} | " f"缺失值: {stats['missing_count']}")383if stats['min'] is not None:384print(f" 范围: {stats['min']:.3f} ~ {stats['max']:.3f} | " f"均值: {stats['mean']:.3f}")385386print("=" * 50)387388389def create_sample_data(filename: str = "industrial_data.xlsx"):390"""391 创建示例数据文件用于测试392 """ import numpy as np393from datetime import datetime, timedelta394395# 生成示例数据396 np.random.seed(42)397 n_records = 1000398399# 时间序列400 start_time = datetime(2024, 1, 1, 0, 0, 0)401 time_series = [start_time + timedelta(minutes=i * 5) for i in range(n_records)]402403# 模拟工业数据404 data = {405'时间': time_series,406'压力(MPa)': np.random.normal(5.0, 0.5, n_records),407'温度(℃)': np.random.normal(85.0, 10.0, n_records),408'Flow': np.random.normal(100.0, 15.0, n_records),409'Status': ['正常'] * (n_records - 50) + ['异常'] * 50410 }411412# 添加一些异常值和缺失值413 data['压力(MPa)'][50:60] = ['--'] * 10 # 缺失值414 data['温度(℃)'][100:105] = [300, -100, 500, 'N/A', '异常'] # 异常值415416 df = pd.DataFrame(data)417 df.to_excel(filename, index=False)418print(f"示例数据已创建: {filename}")419420return df421422423def main():424"""使用示例"""425try:426# 如果测试文件不存在,创建示例数据427 test_file = "industrial_data.xlsx"428if not Path(test_file).exists():429print("创建示例数据文件...")430create_sample_data(test_file)431432# 创建读取器实例433 reader = ExcelReader(434 file_path=test_file,435 sheet_name=0,436 skip_rows=0437 )438439# 读取和处理数据440 df = reader.read(441 validate_data=True,442 remove_duplicates=True443 )444445print(f"\n处理结果:")446print(f"数据形状: {df.shape}")447print(f"字段类型:\n{df.dtypes}")448print(f"\n前5行数据:")449print(df.head())450451# 打印质量报告452 reader.print_quality_report()453454# 保存清洗后的数据455 output_path = "cleaned_industrial_data.xlsx"456 df.to_excel(output_path, index=False)457print(f"\n清洗后数据已保存至: {output_path}")458459except Exception as e:460 logger.error(f"处理失败: {str(e)}")461462463if __name__ == "__main__":464main()
这个读取器干了几件关键的事:字段名归一化、数值清洗(用正则提取数字部分)、时间解析容错处理。实际项目里,你可以根据自己的字段情况扩展FIELD_MAPPING,这个设计可以复用到大多数工业Excel文件上。
🗄️ 第二步:数据库层抽象——SQLite和MySQL统一接口
这里用SQLAlchemy做抽象层,好处是切换数据库只需要改一行连接字符串,迁移逻辑代码完全不用动。
python1from sqlalchemy import create_engine, text2from sqlalchemy.engine import Engine34class DatabaseManager:5"""6 支持SQLite和MySQL的数据库管理器7 通过连接字符串区分目标数据库类型8 """910def __init__(self, db_type: str = "sqlite", **kwargs):11 self.engine = self._create_engine(db_type, **kwargs)1213def _create_engine(self, db_type: str, **kwargs) -> Engine:14if db_type == "sqlite":15 db_path = kwargs.get("db_path", "industrial_data.db")16 url = f"sqlite:///{db_path}"1718elif db_type == "mysql":19 host = kwargs.get("host", "localhost")20 port = kwargs.get("port", 3306)21 user = kwargs.get("user", "root")22 password = kwargs.get("password", "")23 database = kwargs.get("database", "industrial")24 url = (25f"mysql+pymysql://{user}:{password}"26f"@{host}:{port}/{database}"27f"?charset=utf8mb4"28 )29else:30raise ValueError(f"不支持的数据库类型: {db_type}")3132return create_engine(url, echo=False)3334def init_table(self, table_name: str):35"""建表——如果已存在则跳过"""36 ddl = f"""37 CREATE TABLE IF NOT EXISTS `{table_name}` (38 id INTEGER PRIMARY KEY AUTOINCREMENT,39 record_time DATETIME NOT NULL,40 pressure FLOAT,41 temperature FLOAT,42 source_file VARCHAR(255),43 created_at DATETIME DEFAULT CURRENT_TIMESTAMP44 )45 """46# MySQL不支持AUTOINCREMENT关键字,需要换成AUTO_INCREMENT47if "mysql" in str(self.engine.url):48 ddl = ddl.replace("AUTOINCREMENT", "AUTO_INCREMENT")4950with self.engine.connect() as conn:51 conn.execute(text(ddl))52 conn.commit()53print(f" ✅ 表 [{table_name}] 初始化完成")5455def write_dataframe(56 self,57 df: pd.DataFrame,58 table_name: str,59 chunk_size: int = 50060 ) -> int:61"""62 分批写入DataFrame,返回实际写入行数63 chunk_size控制每批次大小,防止单次提交过大导致内存溢出64 """65if df.empty:66return 06768 df.to_sql(69 name=table_name,70 con=self.engine,71 if_exists="append",72 index=False,73 chunksize=chunk_size,74 method="multi" # 批量INSERT,比逐行快5-10倍75 )76return len(df)关于SQLite和MySQL的选择,有个实际经验分享一下:数据量在500万行以内、单机查询为主,SQLite完全够用,零配置、零维护,文件直接拷走就能用;超过这个量级、需要多人并发读写或者对接其他系统,就上MySQL。别一上来就觉得SQLite"不够专业"——很多工控场景里,SQLite的读写性能其实相当惊人。
🚀 第三步:迁移主流程——把一切串起来
python1from tqdm import tqdm2import glob3import os45class MigrationPipeline:6"""7 Excel → 数据库的完整迁移流水线8 支持单文件、多文件、多Sheet批量处理9 """1011def __init__(self, db_manager: DatabaseManager, table_name: str):12 self.db = db_manager13 self.table = table_name14 self.stats = {"total": 0, "success": 0, "failed": 0}1516def migrate_file(self, file_path: str):17"""迁移单个Excel文件(处理所有Sheet)"""18 path = Path(file_path)19print(f"\n📂 处理文件: {path.name}")2021# 获取所有Sheet名称22 xl = pd.ExcelFile(file_path, engine="openpyxl")23 sheets = xl.sheet_names2425for sheet in tqdm(sheets, desc=f" Sheet进度", leave=False):26try:27 reader = ExcelReader(file_path, sheet_name=sheet)28 df = reader.read()2930if df.empty:31continue3233# 记录数据来源,方便后续追溯34 df["source_file"] = f"{path.name}::{sheet}"3536 written = self.db.write_dataframe(df, self.table)37 self.stats["success"] += written38 self.stats["total"] += written3940except Exception as e:41print(f" ❌ Sheet [{sheet}] 处理失败: {e}")42 self.stats["failed"] += 14344def migrate_directory(self, dir_path: str, pattern: str = "*.xlsx"):45"""批量迁移目录下所有匹配的Excel文件"""46 files = glob.glob(os.path.join(dir_path, pattern))4748if not files:49print(f"⚠️ 目录 [{dir_path}] 下没有找到匹配文件")50return5152print(f"🔍 共发现 {len(files)} 个文件,开始迁移...\n")5354for f in tqdm(files, desc="总体进度"):55 self.migrate_file(f)5657 self._print_summary()5859def _print_summary(self):60print("\n" + "="*45)61print("📊 迁移完成报告")62print(f" 成功写入: {self.stats['success']:,} 行")63print(f" 失败批次: {self.stats['failed']} 个")64print("="*45)▶️ 实际运行示例
把上面的模块拼起来,跑起来就是这样:
python1if __name__ == "__main__":23# ---- 目标:SQLite(本地轻量场景)----4 sqlite_db = DatabaseManager(5 db_type="sqlite",6 db_path="./factory_data.db"7 )8 sqlite_db.init_table("device_records")910 pipeline = MigrationPipeline(sqlite_db, "device_records")11 pipeline.migrate_directory("./excel_data/", pattern="*.xlsx")121314# ---- 目标:MySQL(生产环境场景)----15# mysql_db = DatabaseManager(16# db_type="mysql",17# host="192.168.1.100",18# port=3306,19# user="data_admin",20# password="your_password",21# database="industrial_db"22# )23# mysql_db.init_table("device_records")24# pipeline = MigrationPipeline(mysql_db, "device_records")25# pipeline.migrate_directory("./excel_data/")
⚡ 性能这块,有几个坑必须提前说
坑一:df.to_sql默认逐行插入,慢到怀疑人生。 一定要加method="multi",实测在MySQL上,10万行数据从原来的180秒降到了约22秒——差了将近8倍。
坑二:MySQL连接字符串里别忘了charset=utf8mb4。 工厂数据里经常有中文备注,utf8(三字节版本)遇到某些特殊字符会直接报错,utf8mb4才是正确选择。
坑三:大文件迁移时内存会撑爆。 如果单个Excel文件超过10万行,建议在ExcelReader里加分块读取逻辑(pandas的chunksize参数),每次只处理一批,处理完立刻释放内存,别想着一口气全读进来。
坑四:迁移前务必检查重复数据。 如果同一份数据被迁移了两次,数据库里就会出现重复记录。可以在建表时对(record_time, source_file)加唯一约束,或者在写入前先做去重查询。
💬 聊几句
数据迁移这件事,表面上看是个体力活,实际上藏着很多细节——数据清洗策略、异常容错机制、写入性能调优,每一块都能单独展开讲很久。
我见过有人写了个"一键迁移脚本",跑完发现数据量对不上,查了两天才发现是时区问题导致时间解析错乱,损失惨重。所以迁移完之后,一定要做数据核验:抽样比对原始Excel和数据库里的记录,验证行数、验证关键字段的统计值(最大值、最小值、均值),这步不能省。
你在项目里做过类似的数据迁移工作吗?遇到过什么奇葩的数据格式问题?欢迎在评论区分享,说不定你的经历能帮到下一个踩坑的人。
夜雨聆风