你好,我是星探。
今天这篇文章,我在电脑前坐了整整一个下午,反复推敲每一个代码细节,就是想给你呈现出最真实、最落地的解决方案。
如果你曾经面对过这样的场景,请在评论区打个"1":
• 文件夹里有 100 个 Excel 文件,每个都是一个分公司的销售数据,老板让你今天下班前把所有数据合并成一张总表
• 每个文件的列名还不完全一样,有的叫"销售额",有的叫"销售金额",有的叫"Amount",合并后列名乱成一团
• 有的文件里日期格式乱七八糟,有的是 "2024/1/1",有的是 "01-01-2024",还有的干脆是个莫名其妙的五位数字 45292
• 有些文件比较大,几百兆,读起来特别慢,等得你怀疑人生
• 最要命的是——有些文件打不开,损坏了,或者正被别人打开着,你的代码直接崩掉
如果你点头了,那这篇文章就是为你写的。
我们不只讲"能跑"的代码,我们讲生产级的代码——能处理各种异常情况、能输出清晰的日志、能在半路出错时精确定位到哪个文件、哪一行出了问题。
更重要的是:我们会把为什么这样写讲清楚,而不只是怎么写。因为只有理解了原理,你才能在面对新问题时,自己写出解决方案。
第一部分:为什么批量读取Excel是个「坑坑洼洼」的活
很多人以为批量读取 Excel 很简单,三行代码搞定:
import pandas as pd import glob files = glob.glob("*.xlsx") df = pd.concat([pd.read_excel(f) for f in files]) print(df)确实,在最理想的假设下,这三行代码能跑通。但真实世界的数据,从来不是理想的。
在我处理过的十几个真实企业项目中,批量读取 Excel 会遇到这些问题(我先完整列出来,让你心里有个数,也方便你收藏备查):
问题1:文件路径含中文或空格
Windows 下特别常见。路径里有中文目录名,或者文件名里有空格,导致读取失败。比如 C:\Users\张三\Desktop\销售数据\2024年 Q1 报表.xlsx —— 这个路径里有中文、有空格,处理不当就会报错。
问题2:Sheet 名称不一致
100个文件,有的 Sheet 叫"销售数据",有的叫"Sheet1",有的叫"2024年数据",有的叫"Q1报表"。你没法用同一个 sheet_name 参数读取所有文件。更糟糕的是,有的文件里有两个 Sheet,你要的是第二个,不是第一个。
问题3:列名不一致
这是最头疼的问题之一。同样是"客户名称"这一列,有的文件里叫"客户名称",有的叫"客户",有的叫"Customer",有的叫"买方",合并时列对不上,结果变成了一大堆列,数据却分散在各列里。
问题4:数据类型不一致
同一个"销售额"列,有的文件里是数字(int 或 float),有的文件里是字符串(因为里面混入了"暂无"、"待确认"、"¥1,000"这样的文字)。合并后这列变成了 object 类型,你没法直接做数值计算,还不知道哪些行出了问题。
问题5:空行和空列
有些文件里有很多空行、空列,是导出时自带的,或者人工编辑时留下的。直接合并会把垃圾数据也带进去,影响后续统计结果。
问题6:文件损坏或被占用
有的 .xlsx 文件实际上损坏了(可能是在传输过程中出错的),有的是正被 Excel 打开着(Windows 下有文件锁)。读取时会抛异常,如果不处理,整个批量任务就在这里中断了。
问题7:大文件内存爆满
单个文件就有几百兆,100个这样的文件一次性读入内存,16G 内存的电脑直接卡死。这不是假设——我就遇到过一个客户,每个分公司的文件都有 50 万行数据,100 个文件就是 5000 万行,没处理得当的话,你的电脑直接罢工。
问题8:编码问题(混合 CSV 时特别常见)
虽然 .xlsx 格式基于 Office Open XML 标准,本身用 UTF-8 编码,一般不存在编码问题。但如果你同时还需要处理同目录下的 .csv 文件(很多企业会把两种格式混用),编码问题就会跳出来——有的 CSV 是 GBK 编码,有的是 UTF-8 BOM,有的是 Latin-1。
问题9:日期格式混乱
Excel 里的日期,存储方式有三种:真正的日期格式(Excel 内部存为序列号)、看起来像日期的字符串("2024/1/1")、还有一个特别坑的——Excel 序列号(那个莫名其妙的五位数,比如 45292 代表 2024-01-01)。三种混在一起,你用一种方式解析,必定有数据解析失败。
问题10:合并后索引重复
每个文件读取后索引都是从0开始,直接 concat 后索引重复,后续如果你用索引来做 join 或者查询,就会出大问题。很多人在这里踩了坑,排查了半天才发现是索引没重置。
问题11:文件格式混杂(.xls 和 .xlsx 同时存在)
老系统导出的可能是 .xls(Excel 97-2003 格式),新系统导出的是 .xlsx。两种格式需要不同的读取引擎,不处理好就会报错。
问题12:合并后数据行数不对
合并完了,发现总行数比所有文件的行数加起来要多(有重复行),或者要少(有文件读取时 silently 失败了,但你的代码没报错,只是返回了一个空的 DataFrame)。这时候你已经把错误的数据拿去分析了,后果可想而知。
这篇文章,就是要把这12个问题,一个一个彻底解决。你读完这篇文章,以后遇到任何批量读取 Excel 的场景,都能从容应对。
第二部分:先搞懂原理 —— pandas 是怎么读取 Excel 的?
在写代码之前,我们先把原理搞懂。只有理解了 pandas 读取 Excel 的完整流程,你才能在出错时快速定位问题。
2.1 Excel 文件的本质
很多人不知道:.xlsx 文件本质上是一个 ZIP 压缩包。
你可以做一个实验:把一个 .xlsx 文件的后缀名改成 .zip,然后用解压缩软件打开,你会看到这样的目录结构:
xl/ ├── worksheets/ │ ├── sheet1.xml ← 第一个 Sheet 的数据 │ ├── sheet2.xml ← 第二个 Sheet 的数据 │ └── ... ├── sharedStrings.xml ← 字符串池(Excel 为了压缩,会把重复的字符串存在这里) ├── styles.xml ← 格式信息(字体、颜色、边框等) └── workbook.xml ← 工作簿信息(Sheet 名称列表等)所以,读取 Excel 文件,本质上就是:
1. 解压 ZIP 包
2. 解析 XML 文件
3. 把 XML 里的数据转换成 pandas 的 DataFrame
这个过程,pandas 自己不干,它委托给专门的引擎库:
• openpyxl:负责读取 .xlsx 文件(Excel 2007+)
• xlrd:负责读取 .xls 文件(Excel 97-2003)
• odf:负责读取 .ods 文件(OpenOffice/LibreOffice 格式)
这就是为什么你安装 pandas 后,直接读取 .xlsx 会报错:
ImportError: Missing optional dependency 'openpyxl'. Use pip to install openpyxl.记忆方法:pandas 是"大脑",负责数据分析和处理;openpyxl/xlrd 是"手脚",负责和 Excel 文件格式打交道。大脑发出指令,但需要手脚去执行。
2.2 pd.read_excel() 的完整参数解析
很多人只用过 pd.read_excel(file_path),但实际上这个函数有非常多的参数,掌握它们能解决80%的读取问题。
import pandas as pd df = pd.read_excel( io="数据.xlsx", # 文件路径(必需) sheet_name=0, # Sheet 名称或索引,0=第一个Sheet,"Sheet1"=按名称 header=0, # 表头所在行(0=第一行),None=没有表头 names=None, # 自定义列名(当 header=None 时使用) index_col=None, # 哪一列作为索引列 usecols=None, # 只读取指定的列(可以是列名列表或列索引列表) dtype=None, # 指定列的数据类型(非常重要!) engine=None, # 指定引擎(openpyxl/xlrd/odf) converters=None, # 指定某些列的转换函数 true_values=None, # 视为 True 的值 false_values=None, # 视为 False 的值 skiprows=None, # 跳过前面的几行(可以是整数或列表) nrows=None, # 只读取前 N 行(用于快速预览大文件) na_values=None, # 视为 NaN 的额外值 keep_default_na=True, # 是否将默认的 NaN 标记(如"NA"、"N/A"等)视为 NaN na_filter=True, # 是否检测 NaN 值(关闭可提升性能) verbose=True, # 解析时是否输出额外信息 parse_dates=False, # 是否解析日期列(可以是 True/False/列索引列表) date_parser=None, # 自定义的日期解析函数 thousands=None, # 千分位分隔符(如 ",") decimal=".", # 小数点分隔符 comment=None, # 注释字符(该行从此字符开始的内容会被忽略) skipfooter=0, # 跳过末尾的几行 )这些参数里,最常用的5个:
1. usecols —— 只读取需要的列(大幅提升性能!)
# 只读取"客户名称"和"销售金额"两列 df = pd.read_excel(file_path, usecols=["客户名称", "销售金额"]) # 也可以用列索引 df = pd.read_excel(file_path, usecols=[0, 2, 5]) # 只读第1、3、6列为什么这能大幅提升性能? 因为 pandas 不需要解析你不需要的列,对于宽表(几十列、上百列)效果特别明显。
2. dtype —— 强制指定列的数据类型(避免自动推断错误!)
# 强制"客户名称"列为字符串,"销售金额"列为float32 df = pd.read_excel(file_path, dtype={ "客户名称": "str", "销售金额": "float32", "数量": "int32" })什么时候必须用这个? 当你的"客户ID"列里,有像 "00123" 这样带前导零的值。如果不指定 dtype,pandas 会自动把它转成整数 123,前导零就丢了!
3. nrows —— 只读取前N行(快速预览大文件!)
# 只读取前100行,用于快速了解文件结构 df = pd.read_excel(file_path, nrows=100) print(df.head())这是一个非常重要的调试技巧:面对一个几百兆的大文件,不要直接全量读取,先读100行看看结构对不对、列名是什么、数据格式是什么,确认无误后再全量读取。
4. skiprows —— 跳过前面的行(处理有多行表头的文件!)
# 跳过前两行(真正的表头在第3行) df = pd.read_excel(file_path, skiprows=2, header=0) # 注意:skiprows=2 跳过了前两行,然后 header=0 表示"跳过后的第一行作为表头" # 所以最终表头是原始文件的第3行(索引为2)有些企业导出的 Excel,前面几行是"报表名称"、"生成时间"、"制表人"等信息,真正的表头在第3行或第4行。这时候 skiprows 就是救命参数。
5. parse_dates —— 自动解析日期列
# 解析"日期"列为 datetime 类型 df = pd.read_excel(file_path, parse_dates=["日期"]) # 解析多列 df = pd.read_excel(file_path, parse_dates=["日期", "下单时间"]) # 解析"日期"列,并将解析后的列放在第一列 df = pd.read_excel(file_path, parse_dates={"日期时间": ["日期", "时间"]})注意:parse_dates 不是万能的。如果日期格式特别混乱(比如有的单元格是 Excel 序列号,有的是字符串),还是建议在读取后用 pd.to_datetime() 统一处理(后面会详细讲)。
2.3 glob 模块:批量获取文件列表的利器
Python 标准库里的 glob 模块,是批量处理文件的第一步。它的作用是:根据通配符模式,找到所有匹配的文件路径。
常用通配符(你必须熟记,因为批量处理离不开它们):
* | *.xlsx | |
** | recursive=True) | **/*.xlsx |
? | data?.xlsx | |
[abc] | file[123].xlsx | |
[0-9] | data[0-9].xlsx |
import glob import os # 获取当前目录下所有 .xlsx 文件(不包含子目录) files = glob.glob("*.xlsx") print("当前目录 .xlsx 文件:", files) # 获取当前目录及所有子目录下的 .xlsx 文件(递归) all_files = glob.glob("**/*.xlsx", recursive=True) print("递归找到的文件:", all_files) # 获取指定目录下的所有 Excel 文件(包括 .xls 和 .xlsx) excel_files = glob.glob(os.path.join("数据目录", "*.xls*")) print("所有 Excel 文件:", excel_files) # 只获取文件名(不含路径) file_names = [os.path.basename(f) for f in glob.glob("*.xlsx")] print("文件名列表:", file_names)注意 Windows 路径问题:Windows 下路径分隔符是反斜杠 \,在 Python 字符串里需要转义(写成 \\),或者使用原始字符串(前缀 r):
import glob import os # 方法1:原始字符串(推荐,最直观) path = r"C:\Users\你的用户名\Desktop\销售数据\*.xlsx" files = glob.glob(path) # 方法2:正斜杠(Python 会自动处理,跨平台兼容) path = "C:/Users/你的用户名/Desktop/销售数据/*.xlsx" files = glob.glob(path) # 方法3:os.path.join(最推荐!完全跨平台) base_dir = r"C:\Users\你的用户名\Desktop\销售数据" pattern = os.path.join(base_dir, "*.xlsx") files = glob.glob(pattern) # 在 Linux/Mac 上,这会自动使用 / 作为分隔符 # 在 Windows 上,这会自动使用 \ 作为分隔符为什么方法3最推荐? 因为你的代码可能今天在 Windows 上跑,明天要放到 Linux 服务器上跑。os.path.join() 会根据操作系统自动选择正确的路径分隔符,你的代码不用改一行就能跨平台运行。
第三部分:基础版 —— 把所有 Excel 文件合并成一个 DataFrame
原理搞清楚了,现在我们来写代码。从最基础的版本开始,确保你能跑通,然后再一步步优化。
3.1 版本一:三行代码的简陋版(不要用!)
import pandas as pd import glob # 获取所有 Excel 文件路径 files = glob.glob("*.xlsx") # 批量读取并合并 df = pd.concat([pd.read_excel(f) for f in files], ignore_index=True) # 查看结果 print(df.head()) print(f"共合并 {len(files)} 个文件,{len(df)} 行数据")这段代码的致命缺陷:
1. 没有任何错误处理:任何一个文件出问题,整个程序就崩了,而且你不知道是哪个文件出的问题
2. 没有日志输出:你不知道处理了哪些文件、跳过了哪些文件、每个文件有多少行数据
3. 没有列名统一:如果文件名不一致,合并后列名会乱套
4. 没有数据类型检查:合并后你可能才发现"销售金额"列变成了字符串类型
5. ignore_index=True重置了索引:这有时是好事(避免索引重复),有时不是(你可能需要保留原始文件的行索引信息,以便追溯)
结论:这段代码只能用来做 Demo,绝对不能用在生产环境。
3.2 版本二:带日志和错误处理的基础版
import pandas as pd import glob import os def batch_read_excel_basic(folder_path, file_pattern="*.xlsx"): """ 批量读取指定文件夹下的所有 Excel 文件并合并 参数: folder_path: 文件夹路径(字符串) file_pattern: 文件匹配模式(字符串,默认 *.xlsx) 返回: pandas.DataFrame 或 None(如果没有成功读取任何文件) """ # 拼接完整路径模式 pattern = os.path.join(folder_path, file_pattern) # 获取所有匹配的文件路径 file_list = glob.glob(pattern) print(f"找到 {len(file_list)} 个文件") print("-" * 50) # 用于存储成功读取的 DataFrame df_list = [] for i, file_path in enumerate(file_list, 1): file_name = os.path.basename(file_path) print(f"[{i}/{len(file_list)}] 正在读取:{file_name}") try: # 读取 Excel 文件 df = pd.read_excel(file_path) print(f" 成功:{len(df)} 行 × {len(df.columns)} 列") print(f" 列名:{list(df.columns)}") df_list.append(df) except Exception as e: print(f" ❌ 失败:{type(e).__name__}: {e}") print("-" * 50) if not df_list: print("没有成功读取任何文件!请检查:") print(" 1. 文件夹路径是否正确") print(" 2. 文件格式是否为有效的 Excel 文件") print(" 3. 文件是否正被其他程序打开(如 Excel)") return None # 合并所有 DataFrame print(f"开始合并 {len(df_list)} 个 DataFrame...") result = pd.concat(df_list, ignore_index=True) print(f"合并完成:共 {len(result)} 行 × {len(result.columns)} 列") print(f"合并后的列名:{list(result.columns)}") return result # ==================== 使用示例 ==================== if __name__ == "__main__": # 修改为你的实际文件夹路径 df = batch_read_excel_basic(r"./销售数据") if df is not None: # 保存到 Excel output_path = "合并结果.xlsx" df.to_excel(output_path, index=False) print(f"\n结果已保存到:{output_path}") # 显示基本信息 print("\n数据概览:") print(df.info()) print("\n前5行:") print(df.head()) else: print("\n合并失败,请检查错误信息。")这个版本已经有了明显的进步:
✅ 有进度提示,你知道程序跑到哪里了
✅ 有错误处理(try-except),单个文件失败不影响整体
✅ 有统计信息,你能看到每个文件有多少行多少列
✅ 失败了会告诉你异常类型和信息,方便你排查
但还不够。我们来深入思考几个更深层次的问题。
第四部分:进阶版 —— 处理真实世界的数据混乱
真实世界的数据,远比你想象的混乱。这一部分的每一小节,都是我在一个个真实项目中踩过的坑,总结出来的解决方案。
4.1 问题一:Sheet 名称不一致怎么处理?
真实场景:你拿到了100个分公司的销售数据文件,打开一看:
• 分公司A:销售数据.xlsx,Sheet 名:"Sheet1"
• 分公司B:销售数据-B.xlsx,Sheet 名:"2024年销售数据"
• 分公司C:分公司C-报表.xlsx,Sheet 名:"数据"
• 分公司D:Q1销售.xlsx,有 两个 Sheet:"Sheet1" 和 "说明",你要的是 "Sheet1"
如果你写死 sheet_name="销售数据",分公司A和C就会读取失败。
解决方案:智能 Sheet 读取策略
import pandas as pd import glob import os def read_excel_smart(file_path): """ 智能读取 Excel 文件: 1. 优先尝试读取第一个 Sheet 2. 如果第一个 Sheet 为空或行数太少,尝试读取所有 Sheet 并合并 3. 如果失败,返回错误信息 返回:(DataFrame 或 None, 错误信息 或 None) """ try: # 方法1:读取第一个 Sheet(默认行为) df = pd.read_excel(file_path, sheet_name=0) # sheet_name=0 表示第一个 Sheet # 检查是否读取到了有效数据 if df.empty or len(df) < 1: raise ValueError("第一个 Sheet 为空或行数太少,尝试读取所有 Sheet") return df, None except Exception as e: try: # 方法2:读取所有 Sheet,返回一个字典 {sheet_name: DataFrame} xl = pd.ExcelFile(file_path) sheet_names = xl.sheet_names print(f" 文件包含 {len(sheet_names)} 个 Sheet:{sheet_names}") # 读取所有 Sheet 并合并 df_all = pd.concat( [xl.parse(sheet) for sheet in sheet_names], ignore_index=True ) return df_all, None except Exception as e2: return None, f"方法1失败({e});方法2也失败({e2})" def batch_read_excel_v2(folder_path): """进阶版:智能 Sheet 读取 + 来源追溯""" pattern = os.path.join(folder_path, "*.xlsx") file_list = glob.glob(pattern) print(f"找到 {len(file_list)} 个文件") print("-" * 50) df_list = [] failed_files = [] for i, file_path in enumerate(file_list, 1): file_name = os.path.basename(file_path) print(f"[{i}/{len(file_list)}] {file_name}") df, error = read_excel_smart(file_path) if df is not None: print(f" ✅ 成功:{len(df)} 行 × {len(df.columns)} 列") # 【关键】添加来源文件名列(非常重要!) # 合并后你能追溯每一行数据来自哪个原始文件 # 这在数据核查和业务追溯时极其重要 df["_来源文件"] = file_name df["_来源路径"] = file_path df_list.append(df) else: print(f" ❌ 读取失败:{error}") failed_files.append((file_name, error)) print("-" * 50) print(f"✅ 成功:{len(df_list)} 个文件") print(f"❌ 失败:{len(failed_files)} 个文件") if failed_files: print("\n失败文件列表(建议人工核查):") for name, err in failed_files: print(f" - {name}") print(f" 原因:{err[:100]}") # 只打印前100个字符,避免输出太长 if not df_list: print("\n❌ 没有成功读取任何文件!") return None # 合并所有 DataFrame print(f"\n开始合并 {len(df_list)} 个文件...") result = pd.concat(df_list, ignore_index=True, sort=False) # sort=False 表示不自动按列名排序,保留原始顺序 print(f"✅ 合并完成:{len(result)} 行 × {len(result.columns)} 列") print(f"最终列名:{list(result.columns)}") return result这段代码的关键改进点:
1. _来源文件 和 _来源路径 列:合并后你能追溯每一行数据来自哪个原始文件,这在数据核查时非常重要。比如老板问"这个数字是怎么来的?",你可以直接筛选出 _来源文件 == "分公司A.xlsx" 的数据,一目了然。
2. read_excel_smart() 函数:专门负责智能 Sheet 读取,逻辑清晰,易于单独测试和维护。
3. 失败文件单独记录:哪些文件没读成功、原因是什么,一目了然,不会静默失败。
4. sort=False:合并时保留原始列顺序,而不是按字母排序(默认行为有时候会让你很困扰)。
4.2 问题二:列名不一致怎么处理?(这是最头疼的问题!)
真实场景:你合并100个文件,结果发现合并后的 DataFrame 有20多列:
• 客户名称,客户,Customer,买方,购买方...
• 销售金额,金额,销售额,Amount,Sales...
• 日期,销售日期,交易日期,Date...
直接合并,pandas 会把它们当成不同的列,结果就是:数据分散在多个列里,你做统计分析时只统计了其中一列,结果当然是错的。
解决方案:列名映射表 + 标准化函数
# ==================== 列名映射表(核心配置,根据你的业务修改)==================== # 键:文件中可能出现的各种列名(别名) # 值:标准化后的统一列名 COLUMN_MAPPING = { # 客户名称相关(把所有表示"客户名称"的列名,统一映射为"客户名称") "客户名称": "客户名称", "客户": "客户名称", "Customer": "客户名称", "Customer Name": "客户名称", "客户名": "客户名称", "买方": "客户名称", "购买方": "客户名称", "客户姓名": "客户名称", # 销售金额相关 "销售金额": "销售金额", "金额": "销售金额", "销售额": "销售金额", "Amount": "销售金额", "Sales": "销售金额", "销售": "销售金额", "成交金额": "销售金额", "总金额": "销售金额", # 日期相关 "日期": "日期", "Date": "日期", "销售日期": "日期", "交易日期": "日期", "下单日期": "日期", "日期时间": "日期", # 商品相关 "商品名称": "商品名称", "商品": "商品名称", "产品": "商品名称", "Product": "商品名称", "Product Name": "商品名称", # 数量相关 "数量": "数量", "Quantity": "数量", "Qty": "数量", "购买数量": "数量", } # ==================== 标准化函数 ==================== def normalize_columns(df): """ 标准化 DataFrame 的列名 把所有已知的变体列名,重命名为标准列名 """ # 创建重命名字典(只重命名那些在映射表中存在的列) rename_dict = {} for col in df.columns: if col in COLUMN_MAPPING: rename_dict[col] = COLUMN_MAPPING[col] if rename_dict: df = df.rename(columns=rename_dict) print(f" 列名映射:{rename_dict}") return df # ==================== 在批量读取中应用 ==================== def batch_read_excel_v3(folder_path): """进阶版:列名标准化""" pattern = os.path.join(folder_path, "*.xlsx") file_list = sorted(glob.glob(pattern)) # sorted() 保证文件按名称排序,方便追溯 print(f"找到 {len(file_list)} 个文件") print("-" * 50) df_list = [] failed_files = [] for i, file_path in enumerate(file_list, 1): file_name = os.path.basename(file_path) print(f"[{i}/{len(file_list)}] {file_name}") try: df, error = read_excel_smart(file_path) if df is None: print(f" ❌ 读取失败:{error}") failed_files.append((file_name, error)) continue # 【核心步骤】标准化列名 original_cols = list(df.columns) df = normalize_columns(df) new_cols = list(df.columns) if original_cols != new_cols: print(f" 列名已标准化:{len(original_cols)} → {len(new_cols)} 列") print(f" ✅ 成功:{len(df)} 行 × {len(df.columns)} 列") # 添加来源信息 df["_来源文件"] = file_name df_list.append(df) except Exception as e: print(f" ❌ 异常:{type(e).__name__}: {e}") failed_files.append((file_name, str(e))) print("-" * 50) if not df_list: print("❌ 没有成功读取任何文件!") return None # 合并前,再次检查列名是否完全统一 print(f"合并前最终检查:") for df in df_list: print(f" - {df['_来源文件'].iloc[0]}: {list(df.columns)}") # 执行合并 result = pd.concat(df_list, ignore_index=True, sort=False) print(f"\n✅ 合并完成:{len(result)} 行 × {len(result.columns)} 列") print(f"最终列名:{list(result.columns)}") return result这段代码的精华在于:
1. COLUMN_MAPPING 字典:维护一份"列名别名 → 标准列名"的映射表。只需要维护这一处,整个项目受益。新增文件有新的列名变体?往这个字典里加一行就行。
2. normalize_columns() 函数:专门负责列名标准化,逻辑清晰,易于单独测试。
3. 合并前打印最终列名:让你在保存前就能发现列名是否完全统一,避免把有问题的数据保存出去。
4.3 问题三:数据类型不一致怎么处理?
即使列名统一了,数据类型也可能不一致:
• 文件1的"销售金额"列是 int64(整数)
• 文件2的"销售金额"列是 str(因为里面混入了"暂无"、"待确认"等文字)
• 文件3的"销售金额"列是 float64(浮点数,因为有小数)
合并后,pandas 会把这些列统一成 object 类型(字符串),你就没法直接做数值计算了(比如求 sum、mean 等)。
解决方案:合并后,统一转换数据类型
def clean_and_typecast(df): """ 清洗数据并统一数据类型 这个函数处理了批量合并后最常见的问题 """ original_len = len(df) # ==================== 步骤1:删除全空行 ==================== df = df.dropna(how="all") if len(df) < original_len: print(f" 🧹 删除了 {original_len - len(df)} 行全空行") # ==================== 步骤2:字符串列清洗 ==================== # 所有 object 类型的列,去除首尾空格,把明显的"空值表示"转为真正的 NaN for col in df.select_dtypes(include=["object"]).columns: # 先转成字符串(防止有数字混在里面) df[col] = df[col].astype(str) # 去除首尾空格 df[col] = df[col].str.strip() # 把各种表示"空"的字符串,统一替换为 pd.NA(真正的空值) df[col] = df[col].replace( ["nan", "None", "NaN", "", " ", "NULL", "null", "无", "暂无", "待确认"], pd.NA ) # ==================== 步骤3:销售金额列 → 转为数值类型 ==================== if "销售金额" in df.columns: print(f" 🔢 处理「销售金额」列...") # 先转成字符串,然后去掉常见的非数字字符 df["销售金额_清洗"] = df["销售金额"].astype(str) df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("¥", "", regex=False) df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("¥", "", regex=False) df["销售金额_清洗"] = df["销售金额_清洗"].str.replace(",", "", regex=False) # 去掉千分位逗号 df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("元", "", regex=False) df["销售金额_清洗"] = df["销售金额_清洗"].str.replace(" ", "", regex=False) df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("万", "", regex=False) # 注意:这里有坑! # 【重要】如果数据里有"1.5万"这样的表示,需要先特殊处理 # 这里先标记一下,后面会讲完整的解决方案 # 转换为数值(无法转换的会变成 NaN,而不是报错崩溃) df["销售金额"] = pd.to_numeric(df["销售金额_清洗"], errors="coerce") # 删除临时列 df = df.drop(columns=["销售金额_清洗"]) # 统计转换结果 valid_count = df["销售金额"].notna().sum() na_count = df["销售金额"].isna().sum() print(f" 转换结果:有效 {valid_count} 行,失败(NaN){na_count} 行") if na_count > 0: print(f" ⚠️ 有 {na_count} 行的销售金额无法转换为数值,请检查原始数据") # 打印出无法转换的值(帮助调试) problem_values = df.loc[df["销售金额"].isna(), "销售金额"].unique()[:10] # 只打印前10个不重复的值 print(f" 无法转换的值示例:{list(problem_values)}") # ==================== 步骤4:日期列 → 转为 datetime 类型 ==================== if "日期" in df.columns: print(f" 📅 处理「日期」列...") # errors="coerce":无法解析的日期会变成 NaT(Not a Time,相当于日期版的 NaN) df["日期"] = pd.to_datetime(df["日期"], errors="coerce") valid_count = df["日期"].notna().sum() nat_count = df["日期"].isna().sum() print(f" 转换结果:有效 {valid_count} 行,失败(NaT){nat_count} 行") if nat_count > 0: print(f" ⚠️ 有 {nat_count} 行的日期无法解析,请检查原始数据") # ==================== 步骤5:数量列 → 转为整数类型 ==================== if "数量" in df.columns: print(f" 🔢 处理「数量」列...") df["数量_清洗"] = df["数量"].astype(str).str.replace(",", "", regex=False) df["数量"] = pd.to_numeric(df["数量_清洗"], errors="coerce").fillna(0).astype(int) df = df.drop(columns=["数量_清洗"]) return df # ==================== 在合并后调用 ==================== result = pd.concat(df_list, ignore_index=True, sort=False) print("\n开始清洗数据...") result = clean_and_typecast(result) print("清洗完成!")关键点解释:
• errors="coerce":无法转换的值变成 NaN(数值)或 NaT(日期),而不是报错崩溃。这是生产级代码的重要技巧——你宁愿有些值是空值,也不愿意整个程序崩溃。
• 先转字符串再处理:因为原始数据可能是任何类型,统一转成字符串再清洗,是最安全的做法。
• pd.NA vs np.nan:pandas 1.0+ 推荐使用 pd.NA 表示缺失值,它对所有数据类型都适用;np.nan 主要用于浮点数。
• 为什么要打印转换失败的示例值?因为在生产环境中,知道"哪些值转换失败了"比知道"有多少值转换失败了"更重要。前者帮你定位问题根源,后者只能告诉你有问题。
4.4 处理"万"、"亿"这样的中文数值单位
前面留了一个坑:如果数据里有"1.5万"、"3000万"、"2.3亿"这样的表示,怎么处理?
def clean_chinese_number(value): """ 清洗中文数值表示(万、亿等) 示例: "1.5万" → 15000.0 "3000万" → 30000000.0 "2.3亿" → 230000000.0 "5000" → 5000.0 "暂无" → NaN """ if pd.isna(value): return pd.NA s = str(value).strip() # 先处理"万" if "万" in s: num_part = s.replace("万", "").strip() try: return float(num_part) * 10000 except: return pd.NA # 再处理"亿" if "亿" in s: num_part = s.replace("亿", "").strip() try: return float(num_part) * 100000000 except: return pd.NA # 没有中文单位,直接转数值 try: return float(s) except: return pd.NA # 在 clean_and_typecast 函数中应用: if "销售金额" in df.columns: df["销售金额"] = df["销售金额"].apply(clean_chinese_number)第五部分:不同技术方案对比 —— 什么时候用 Python,什么时候不用?
写到这里,我想和你聊一个更重要的问题:并不是所有批量合并 Excel 的场景都适合用 Python。
作为数据分析师,你需要有一个"技术选型"的思维框架。以下是我总结的决策树:
| Excel 自带功能 | ||
| Excel Power Query | ||
| Python(本文方案) | ||
| Python(本文方案) | ||
| Excel VBA 宏 | ||
| Spark / Dask |
我的建议:
• 如果你只需要做一次,用 Excel Power Query(Excel 2016+ 自带,不需要安装任何插件)
• 如果你需要定期做,且数据量中等,用本文的 Python 方案
• 如果你需要在企业级环境下部署,建议把 Python 脚本封装成一个带界面的小工具(可以用 tkinter 或 PyQt),让不懂 Python 的同事也能用
5.1 Excel Power Query 方案简介(作为对比)
有些读者可能会问:"既然 Python 这么麻烦,为什么不直接用 Power Query?"
好问题。Power Query 确实很强大,但它有几个局限:
1. 处理不规范数据的能力有限:如果列名不一致、数据类型混乱,Power Query 的处理不如 Python 灵活
2. 错误处理不够细粒度:某个文件出错了,Power Query 可能会整个刷新失败,而你不知道是哪个文件的问题
3. 无法做复杂的逻辑判断:比如"如果文件大小超过100MB就用另一种方式读取",这种逻辑用 Python 写很简单,用 Power Query 就做不到
4. 部署和分享困难:你写好的 Power Query 查询,分享给同事需要对方也有同样版本的 Excel,且操作步骤不少
所以,我的建议是:小数据量、一次性分析用 Power Query;需要自动化、需要可维护性的场景,用 Python。
第六部分:性能优化 —— 处理大文件的策略
前面的脚本对付几十个、几百兆的文件没问题。但如果你面对的是上千个文件、总数据量超过内存的情况,就需要更高级的策略了。
6.1 策略一:分块读取(Chunking)
首先要说明一个重要的限制:pd.read_excel()不支持chunksize 参数(这是 pd.read_csv() 的功能)。所以对于极大的 Excel 文件,我们有几个选择:
选择1:先转换成 CSV,再用 chunksize 读取
# 第一步:把大 Excel 文件转换成 CSV(只需要做一次) df = pd.read_excel("大文件.xlsx") df.to_csv("大文件.csv", index=False, encoding="utf-8-sig") # utf-8-sig 兼容 Excel 打开中文 # 第二步:分块读取 CSV chunk_iter = pd.read_csv("大文件.csv", chunksize=10000) for chunk in chunk_iter: # 每个 chunk 是一个 10000 行的 DataFrame print(f"处理 {len(chunk)} 行数据...") # 在这里做你的处理...选择2:用 openpyxl 的 read_only 模式
import openpyxl import pandas as pd def read_large_excel_by_rows(file_path, chunk_size=10000): """ 用 openpyxl 的 read_only 模式,分块读取超大 Excel 文件 适用于单个文件就超过内存的情况 """ wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) ws = wb.active # 获取表头 headers = [cell.value for cell in next(ws.iter_rows(min_row=1, max_row=1))] rows = [] row_count = 0 # 分批读取数据行 for row in ws.iter_rows(min_row=2, values_only=True): rows.append(row) row_count += 1 if row_count % chunk_size == 0: df_chunk = pd.DataFrame(rows, columns=headers) yield df_chunk rows = [] # 释放内存 print(f"已处理 {row_count} 行...") # 最后一批 if rows: df_chunk = pd.DataFrame(rows, columns=headers) yield df_chunk wb.close() print(f"读取完成,共 {row_count} 行") # 使用示例 for chunk_df in read_large_excel_by_rows("超大文件.xlsx", chunk_size=5000): # 处理每个 chunk print(f"Chunk 大小:{len(chunk_df)} 行") # 这里可以做过滤、聚合等操作,然后写入输出文件6.2 策略二:使用更高效的数据格式(强烈推荐!)
如果你需要反复处理同一批数据,不要反复读取 Excel!
最佳实践:
1. 第一次:把所有 Excel 合并,保存为 .parquet 格式(Apache Parquet,列式存储格式)
2. 后续:直接读取 .parquet,速度快 10 倍以上,文件也更小
# 第一次:合并后保存为 parquet(推荐!) df = batch_read_excel_v3(DATA_FOLDER) df.to_parquet("合并结果.parquet", compression="snappy", index=False) print("已保存为 parquet 格式") # 后续:直接读取 parquet,速度极快 df = pd.read_parquet("合并结果.parquet") print(f"从 parquet 读取完成:{len(df)} 行")为什么推荐 parquet?
• 文件体积小:有压缩(默认用 snappy 压缩算法),通常比 CSV 小 50% 以上,比 Excel 小更多
• 读取速度极快:列式存储,只读取你需要的列,不需要像 Excel 那样解析整个文件
• 保留数据类型信息:Excel 读出来后数据类型信息会丢失(比如"客户ID"的前导零),parquet 不会
• 跨语言兼容:Python、R、Spark、Java 都能读
• 支持复杂数据类型:比如 DataFrame 里有列表、字典等,parquet 能正确保存,CSV 就不行
对比:不同格式的读取速度(实测数据,100万行)
结论:parquet 是最好的选择。如果你只在 Python 里用,pickle 也不错( but pickle 有安全风险,不要加载来源不明的 .pkl 文件)。
6.3 策略三:并行读取(多进程加速)
如果你有 N 个文件需要合并,而你的电脑有 N 个 CPU 核心,可以用多进程并行读取,大幅缩短总耗时。
import pandas as pd import glob import os from multiprocessing import Pool, cpu_count import time def read_single_file(file_path): """读取单个文件的包装函数(用于多进程)""" try: df = pd.read_excel(file_path, sheet_name=0) df["_来源文件"] = os.path.basename(file_path) return df, None except Exception as e: return None, (os.path.basename(file_path), str(e)) def batch_read_excel_parallel(folder_path, n_processes=None): """ 并行批量读取 Excel 文件 参数: folder_path: 文件夹路径 n_processes: 进程数(默认等于 CPU 核心数) """ pattern = os.path.join(folder_path, "*.xlsx") file_list = sorted(glob.glob(pattern)) if not file_list: print(f"❌ 在 {folder_path} 中没有找到 .xlsx 文件") return None if n_processes is None: n_processes = min(cpu_count(), len(file_list)) # 不超过文件数量 print(f"找到 {len(file_list)} 个文件") print(f"使用 {n_processes} 个并行进程读取...") print("-" * 50) start_time = time.time() # 创建进程池,并行读取 with Pool(processes=n_processes) as pool: results = pool.map(read_single_file, file_list) # 整理结果 df_list = [] failed_files = [] for result in results: df, error = result if df is not None: df_list.append(df) else: failed_files.append(error) elapsed = time.time() - start_time print(f"并行读取完成,耗时 {elapsed:.2f} 秒") print(f"✅ 成功:{len(df_list)} 个 | ❌ 失败:{len(failed_files)} 个") if not df_list: return None result = pd.concat(df_list, ignore_index=True, sort=False) print(f"合并完成:{len(result)} 行 × {len(result.columns)} 列") return result # 使用示例 if __name__ == "__main__": df = batch_read_excel_parallel(r"./销售数据") if df is not None: df.to_parquet("合并结果_并行.parquet", compression="snappy", index=False) print("结果已保存为 parquet 格式")注意:多进程读取有个前提——你的硬盘读取速度要跟得上。如果是机械硬盘(HDD),并行读取可能反而更慢(因为磁头需要不停跳转)。如果是固态硬盘(SSD),并行读取效果非常明显。
第七部分:生产级完整代码(最终版)
好了,前面把核心问题都拆解了。现在,我把所有这些技巧,整合进一个生产级的完整脚本里。这个脚本你可以直接用于工作,只需要修改配置区,然后运行。
""" 批量读取 Excel 文件并合并 — 生产级完整脚本(最终版) 作者:星探 适用场景:合并多个分公司的销售数据、合并多期报表、批量处理实验数据等 技术支持:关注微信公众号「数据星探」获取更多教程和源码 """ import pandas as pd import glob import os import time from datetime import datetime # ==================== 配置区(修改这里!)==================== # 列名映射表(根据你的实际业务修改) COLUMN_MAPPING = { "客户名称": "客户名称", "客户": "客户名称", "Customer": "客户名称", "Customer Name": "客户名称", "销售金额": "销售金额", "金额": "销售金额", "销售额": "销售金额", "Amount": "销售金额", "日期": "日期", "Date": "日期", "销售日期": "日期", } # 必须的标准列(合并后只保留这些列,其他的列会被丢弃;如果为空则保留所有列) REQUIRED_COLUMNS = [] # 例如:["客户名称", "销售金额", "日期"],留空则保留所有列 # 数据文件夹路径 DATA_FOLDER = r"./销售数据" # 修改为你的实际路径 # 输出文件路径 OUTPUT_EXCEL = r"./合并结果.xlsx" OUTPUT_PARQUET = r"./合并结果.parquet" # ==================== 工具函数 ==================== def log(msg, level="INFO"): """统一的日志输出(带时间戳和级别)""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") prefix = { "INFO": "ℹ️", "SUCCESS": "✅", "WARNING": "⚠️", "ERROR": "❌", "DEBUG": "🔍", }.get(level, "ℹ️") print(f"[{timestamp}] {prefix} {msg}") def normalize_columns(df): """标准化 DataFrame 的列名""" rename_dict = {col: COLUMN_MAPPING[col] for col in df.columns if col in COLUMN_MAPPING} if rename_dict: old_cols = list(df.columns) df = df.rename(columns=rename_dict) new_cols = list(df.columns) # 只打印发生变化的列 changed = {k: v for k, v in rename_dict.items() if k != v} if changed: log(f"列名已标准化:{changed}", "DEBUG") return df def clean_dataframe(df): """清洗 DataFrame:去空行、统一数据类型""" original_len = len(df) # 1. 删除全空行 df = df.dropna(how="all") if len(df) < original_len: log(f"删除了 {original_len - len(df)} 行全空行", "DEBUG") # 2. 字符串列:去空格、统一 NaN 表示 for col in df.select_dtypes(include=["object"]).columns: df[col] = df[col].astype(str).str.strip() df[col] = df[col].replace( ["nan", "None", "NaN", "", " ", "NULL", "null", "无", "暂无"], pd.NA ) # 3. 销售金额列:清洗并转为数值 if "销售金额" in df.columns: log("处理「销售金额」列...", "DEBUG") df["销售金额"] = df["销售金额"].apply(clean_chinese_number) valid = df["销售金额"].notna().sum() na_count = df["销售金额"].isna().sum() if na_count > 0: log(f"销售金额:有效 {valid} 行,失败(NaN){na_count} 行", "WARNING") # 4. 日期列:转为 datetime if "日期" in df.columns: log("处理「日期」列...", "DEBUG") df["日期"] = pd.to_datetime(df["日期"], errors="coerce") valid = df["日期"].notna().sum() nat_count = df["日期"].isna().sum() if nat_count > 0: log(f"日期:有效 {valid} 行,失败(NaT){nat_count} 行", "WARNING") # 5. 如果指定了必须列,只保留这些列(加上来源列) if REQUIRED_COLUMNS: keep_cols = REQUIRED_COLUMNS + ["_来源文件", "_来源路径"] keep_cols = [c for c in keep_cols if c in df.columns] # 只保留实际存在的列 df = df[keep_cols] return df def clean_chinese_number(value): """清洗中文数值表示(万、亿等)""" if pd.isna(value): return pd.NA s = str(value).strip() try: if "亿" in s: return float(s.replace("亿", "").strip()) * 100000000 if "万" in s: return float(s.replace("万", "").strip()) * 10000 # 去掉常见非数字字符 s = s.replace(",", "").replace("¥", "").replace("¥", "").replace("元", "") return float(s) except: return pd.NA def read_single_file(file_path): """ 读取单个 Excel 文件,支持多 Sheet 返回:(DataFrame 或 None, 错误信息 或 None) """ try: # 尝试读取第一个 Sheet df = pd.read_excel(file_path, sheet_name=0, nrows=0) # 先只读列名 df = pd.read_excel(file_path, sheet_name=0) # 再真正读取数据 return df, None except Exception as e: try: # 失败则尝试读取所有 Sheet all_sheets = pd.read_excel(file_path, sheet_name=None) log(f"文件包含 {len(all_sheets)} 个 Sheet,已合并读取", "DEBUG") df = pd.concat(all_sheets.values(), ignore_index=True) return df, None except Exception as e2: return None, f"{type(e).__name__}: {e2}" # ==================== 主流程 ==================== def batch_read_excel(folder_path, file_pattern="*.xlsx", add_source=True): """ 批量读取 Excel 文件并合并(生产级) 参数: folder_path: 文件夹路径 file_pattern: 文件匹配模式 add_source: 是否添加"_来源文件"列 返回: 合并后的 DataFrame,或 None """ pattern = os.path.join(folder_path, file_pattern) file_list = sorted(glob.glob(pattern)) if not file_list: log(f"在 {folder_path} 中没有找到匹配 {file_pattern} 的文件", "ERROR") return None log(f"找到 {len(file_list)} 个文件") log("=" * 50) df_list = [] failed_files = [] for i, file_path in enumerate(file_list, 1): file_name = os.path.basename(file_path) log(f"[{i}/{len(file_list)}] 处理:{file_name}") df, error = read_single_file(file_path) if df is not None: # 标准化列名 df = normalize_columns(df) # 清洗数据 df = clean_dataframe(df) # 添加来源文件列 if add_source: df["_来源文件"] = file_name df["_来源路径"] = file_path df_list.append(df) log(f"成功:{len(df)} 行 × {len(df.columns)} 列", "SUCCESS") else: log(f"读取失败:{error}", "ERROR") failed_files.append((file_name, error)) log("=" * 50) log(f"成功:{len(df_list)} 个 | 失败:{len(failed_files)} 个") if failed_files: log("失败文件列表:", "WARNING") for name, err in failed_files: log(f" - {name}:{err[:80]}", "WARNING") if not df_list: log("没有成功读取任何文件", "ERROR") return None # 合并 log("开始合并...") result = pd.concat(df_list, ignore_index=True, sort=False) log(f"合并完成:{len(result)} 行 × {len(result.columns)} 列", "SUCCESS") log(f"最终列名:{list(result.columns)}") return result # ==================== 程序入口 ==================== if __name__ == "__main__": start_time = time.time() log("=" * 50) log("批量 Excel 合并工具启动(生产级)") log("=" * 50) # 执行批量读取和合并 df = batch_read_excel(DATA_FOLDER) if df is not None: # 保存结果(同时保存 Excel 和 parquet 两种格式) log(f"正在保存到 Excel:{OUTPUT_EXCEL}") df.to_excel(OUTPUT_EXCEL, index=False) log(f"Excel 保存成功!", "SUCCESS") log(f"正在保存到 Parquet:{OUTPUT_PARQUET}") df.to_parquet(OUTPUT_PARQUET, compression="snappy", index=False) log(f"Parquet 保存成功!(推荐后续使用这个格式,读取速度更快)", "SUCCESS") # 统计信息 log("=" * 50) log("📊 数据概览:") log(f" 总行数:{len(df)}") log(f" 总列数:{len(df.columns)}") if "销售金额" in df.columns: valid_amount = df["销售金额"].dropna() log(f" 有效销售金额行数:{len(valid_amount)}") if len(valid_amount) > 0: log(f" 销售金额合计:{valid_amount.sum():,.2f}") log(f" 销售金额平均值:{valid_amount.mean():,.2f}") log(f" 销售金额最大值:{valid_amount.max():,.2f}") log(f" 销售金额最小值:{valid_amount.min():,.2f}") if "日期" in df.columns: valid_dates = df["日期"].dropna() if len(valid_dates) > 0: log(f" 最早日期:{valid_dates.min()}") log(f" 最晚日期:{valid_dates.max()}") if "_来源文件" in df.columns: file_count = df["_来源文件"].nunique() log(f" 来源文件数:{file_count}") elapsed = time.time() - start_time log(f"⏱️ 总耗时:{elapsed:.2f} 秒") log("=" * 50) else: log("合并失败,请检查文件路径和文件格式", "ERROR")这个脚本的优点总结:
✅ 完整的日志系统,每步都有时间戳和级别(INFO/SUCCESS/WARNING/ERROR)
✅ 智能 Sheet 读取,单 Sheet 失败自动尝试多 Sheet
✅ 列名标准化,支持别名映射(维护一处,全局生效)
✅ 数据类型统一清洗,销售金额自动去除货币符号、处理"万/亿"单位
✅ 来源追溯,每个原始文件都有记录(_来源文件、_来源路径)
✅ 失败文件单独记录,不会静默失败
✅ 合并后输出详细统计概览(行数、列数、金额统计、日期范围等)
✅ 同时保存 Excel 和 Parquet 两种格式(Parquet 推荐用于后续分析)
✅ 计时功能,方便你评估性能,发现瓶颈
✅ 配置区集中管理,修改路径和映射表即可,不需要改逻辑代码
第八部分:实战案例 —— 从需求到上线全流程
光看代码不够,我来带你走一遍真实企业项目的完整流程。这个案例来自我真实做过的项目(数据已脱敏)。
案例背景
某零售企业有 50 家分公司,每家分公司每天导出一份销售数据 Excel 文件,上传到总部服务器。你的任务是:每天自动合并前一天的所有文件,生成一张总表,并计算出各分公司的销售排名。
文件特点:
• 每家分公司的文件命名规则:分公司名_日期.xlsx(如 北京分公司_2024-01-01.xlsx)
• 有的分公司用 .xlsx,有的用 .xls(老系统)
• 列名不统一(见前面的问题列表)
• 文件可能损坏或为空
完整解决方案(分步骤)
步骤1:了解你的数据(永远不要跳过这步!)
在写代码之前,先随机打开 5-10 个文件,手工看一下:
1. 列名是什么?(截图记录,填入 COLUMN_MAPPING)
2. 日期格式是什么?
3. 金额有没有货币符号?
4. 有没有合并单元格?(合并单元格用 openpyxl 读取时,只有第一个单元格有值,其余为 NaN)
5. 有没有多行表头?
步骤2:处理特殊情况
如果发现有多行表头(比如前两行是标题,真正的表头在第3行),需要指定 header 和 skiprows 参数:
def read_excel_with_complex_header(file_path): """处理复杂表头的文件""" # 先读取前5行,看看结构 preview = pd.read_excel(file_path, nrows=5) print(f"前5行预览:\n{preview}") # 如果真正的表头在第3行(索引为2,因为从0开始) # 注意:skiprows=2 跳过前两行,然后 header=0 表示"跳过后的第一行作为表头" df = pd.read_excel(file_path, skiprows=2, header=0) return df步骤3:完整自动化脚本(定时任务版)
""" 定时任务版:每天自动合并前一天的数据 配合 Windows 任务计划程序 或 Linux crontab 使用 """ import pandas as pd import glob import os import time from datetime import datetime, timedelta # 配置 DATA_BASE_DIR = r"C:\销售数据" OUTPUT_DIR = r"C:\合并结果" LOG_DIR = r"C:\合并结果\logs" def get_yesterday_folder(): """获取昨天的日期字符串,用于定位文件夹""" yesterday = datetime.now() - timedelta(days=1) date_str = yesterday.strftime("%Y-%m-%d") return os.path.join(DATA_BASE_DIR, date_str) def main(): # 1. 定位昨天的文件夹 folder = get_yesterday_folder() if not os.path.exists(folder): log(f"文件夹不存在:{folder},可能有分公司未上传数据", "WARNING") # 可以选择继续处理(合并已有的文件),或者退出 # 这里选择继续处理 # 2. 批量读取(使用前面定义的生产级函数) df = batch_read_excel(folder, file_pattern="*.xls*") # 同时匹配 .xls 和 .xlsx if df is None: log("没有成功读取任何文件,程序终止", "ERROR") return # 3. 数据质量检查 log("=" * 50) log("数据质量检查:") log(f" 总行数:{len(df)}") log(f" 重复行数:{df.duplicated().sum()}") if "客户名称" in df.columns: log(f" 客户数量(去重):{df['客户名称'].nunique()}") if "销售金额" in df.columns: na_count = df["销售金额"].isna().sum() log(f" 销售金额为空的行数:{na_count}") valid = df["销售金额"].dropna() if len(valid) > 0: log(f" 销售额统计:") log(f" 最小值:{valid.min():,.2f}") log(f" 最大值:{valid.max():,.2f}") log(f" 平均值:{valid.mean():,.2f}") log(f" 合计:{valid.sum():,.2f}") if "日期" in df.columns: valid_dates = df["日期"].dropna() if len(valid_dates) > 0: log(f" 日期范围:{valid_dates.min()} ~ {valid_dates.max()}") # 4. 删除重复行(可选,根据业务需求决定) before = len(df) df = df.drop_duplicates() after = len(df) if before != after: log(f"删除了 {before - after} 行重复数据", "WARNING") # 5. 按日期排序(如果日期列存在且有效) if "日期" in df.columns and df["日期"].notna().sum() > 0: df = df.sort_values("日期") log("已按日期排序", "DEBUG") # 6. 保存结果(同时保存 Excel 和 parquet) today_str = datetime.now().strftime("%Y%m%d") output_excel = os.path.join(OUTPUT_DIR, f"销售数据汇总_{today_str}.xlsx") output_parquet = os.path.join(OUTPUT_DIR, f"销售数据汇总_{today_str}.parquet") df.to_excel(output_excel, index=False) df.to_parquet(output_parquet, compression="snappy", index=False) log(f"✅ Excel 已保存:{output_excel}", "SUCCESS") log(f"✅ Parquet 已保存:{output_parquet}(推荐后续使用这个格式)", "SUCCESS") # 7. 生成简要报告 report_path = os.path.join(LOG_DIR, f"数据报告_{today_str}.txt") os.makedirs(LOG_DIR, exist_ok=True) with open(report_path, "w", encoding="utf-8") as f: f.write("销售数据合并报告\n") f.write("=" * 50 + "\n") f.write(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"总行数:{len(df)}\n") f.write(f"总列数:{len(df.columns)}\n") f.write(f"列名:{list(df.columns)}\n") if "销售金额" in df.columns: valid = df["销售金额"].dropna() f.write(f"销售金额合计:{valid.sum():,.2f}\n") f.write(f"销售金额平均:{valid.mean():,.2f}\n") f.write("=" * 50 + "\n") f.write("各文件数据量:\n") if "_来源文件" in df.columns: for name, group in df.groupby("_来源文件"): f.write(f" {name}:{len(group)} 行\n") log(f"📄 报告已生成:{report_path}", "SUCCESS") log("=" * 50) log("每日合并任务完成!", "SUCCESS") if __name__ == "__main__": main()步骤4:配置定时任务
Windows 下使用"任务计划程序":
1. 打开"任务计划程序"(开始菜单搜索即可)
2. 创建基本任务,触发器选择"每天",时间设置为凌晨 2:00(确保分公司已上传完数据)
3. 操作选择"启动程序",程序路径填 python.exe,参数填你的脚本路径
4. 勾选"使用最高权限运行"
Linux/Mac 下使用 crontab:
# 每天早上 2 点运行 0 2 * * * /usr/bin/python3 /path/to/your/script.py >> /path/to/logfile.log 2>&1第九部分:单元测试 —— 确保你的数据处理管道不出错
生产级代码,必须有测试。不要等数据出问题了再排查,要在代码上线之前就发现问题。
Python 里测试数据处理函数,可以用 pytest 库。下面给你一个完整的测试示例:
""" 测试脚本:test_batch_read.py 用于测试批量读取函数的正确性 运行:pytest test_batch_read.py -v """ import pandas as pd import pytest import os import tempfile def create_test_excel(file_path, data, sheet_name="Sheet1"): """创建测试用的 Excel 文件""" df = pd.DataFrame(data) df.to_excel(file_path, sheet_name=sheet_name, index=False) def test_normalize_columns(): """测试列名标准化函数""" df = pd.DataFrame({ "客户": ["张三", "李四"], "金额": [100, 200], "日期": ["2024-01-01", "2024-01-02"] }) df = normalize_columns(df) # 断言:列名应该被标准化 assert "客户名称" in df.columns assert "销售金额" in df.columns assert "日期" in df.columns # 旧列名应该不存在了 assert "客户" not in df.columns assert "金额" not in df.columns def test_clean_chinese_number(): """测试中文数值清洗函数""" assert clean_chinese_number("1000") == 1000.0 assert clean_chinese_number("1.5万") == 15000.0 assert clean_chinese_number("2.3亿") == 230000000.0 assert clean_chinese_number("¥1,000") == 1000.0 assert pd.isna(clean_chinese_number("暂无")) assert pd.isna(clean_chinese_number(None)) def test_read_single_file(tmp_path): """测试读取单个文件(使用临时目录)""" # 创建测试文件 test_file = tmp_path / "测试.xlsx" create_test_excel(test_file, { "客户名称": ["张三"], "销售金额": [100] }) df, error = read_single_file(str(test_file)) assert df is not None assert error is None assert len(df) == 1 assert "客户名称" in df.columns def test_batch_read_excel(): """测试批量读取(使用临时目录和测试文件)""" with tempfile.TemporaryDirectory() as tmp_dir: # 创建3个测试文件 for i in range(3): file_path = os.path.join(tmp_dir, f"测试_{i}.xlsx") create_test_excel(file_path, { "客户名称": [f"客户{i}"], "销售金额": [100 * (i + 1)] }) # 批量读取 df = batch_read_excel(tmp_dir, add_source=True) assert df is not None assert len(df) == 3 assert "_来源文件" in df.columns if __name__ == "__main__": pytest.main(["-v", __file__])为什么需要测试?
因为你写的代码,六个月后你自己都忘了当初为什么这样写。有了测试,当你修改了某处代码后,运行一下测试,如果通过了,说明你的修改没有破坏原有功能;如果失败了,测试会精确告诉你哪里的逻辑出错了。
第十部分:常见问题 FAQ(收藏版)
Q1:读取时报错 "File is not a ZIP file" 怎么办?
A:这个文件可能根本不是真正的 .xlsx 文件(可能只是被改了后缀名),或者文件已损坏。用 openpyxl.load_workbook() 单独测试这个文件,确认是否能正常打开。另外,检查文件是不是 .xls 格式(Excel 97-2003),如果是,需要安装 xlrd 库。
# 检查文件是否是有效的 ZIP(.xlsx 本质上是 ZIP) import zipfile try: with zipfile.ZipFile("可疑文件.xlsx", 'r') as z: print("有效的 .xlsx 文件,包含以下文件:") print(z.namelist()) except zipfile.BadZipFile: print("❌ 不是有效的 .xlsx 文件(可能已损坏,或者根本不是 Excel 文件)")Q2:读取速度太慢,怎么加速?
A:1. 先用 usecols 参数只读取需要的列;2. 保存为 parquet 格式后后续直接读 parquet;3. 如果数据量真的非常大,考虑用 dask 库(pandas 的分布式版本);4. 用多进程并行读取(见第六部分策略三)。
# 加速技巧1:只读取需要的列 df = pd.read_excel(file_path, usecols=["客户名称", "销售金额", "日期"]) # 加速技巧2:跳过不需要的行 df = pd.read_excel(file_path, skipfooter=10) # 跳过末尾10行 # 加速技巧3:指定数据类型(避免 pandas 自动推断,节省时间) df = pd.read_excel(file_path, dtype={ "客户名称": "category", # category 类型比 object 更省内存 "销售金额": "float32", "数量": "int32" })Q3:合并后内存不够用怎么办?
A:使用分块处理策略(见第六部分),或者升级内存。另外,确保在合并后及时删除不需要的临时变量:
import gc # 合并完成后 del df_list # 删除临时列表,释放内存 gc.collect() # 强制垃圾回收 # 更进一步:只保留需要的列 if "临时计算列" in df.columns: df = df.drop(columns=["临时计算列"])Q4:如何处理有密码保护的 Excel 文件?
A:pandas 不能直接读取有密码保护的 Excel 文件。需要先手动解除密码,或者用 msoffcrypto-tool 库在代码中解密:
# 需要先安装:pip install msoffcrypto-tool import msoffcrypto with open("加密文件.xlsx", "rb") as f_in: with open("解密文件.xlsx", "wb") as f_out: office_file = msoffcrypto.OfficeFile(f_in) office_file.load_key(password="你的密码") office_file.decrypt(f_out) # 然后再用 pandas 读取解密后的文件 df = pd.read_excel("解密文件.xlsx")Q5:如何让代码在后台自动定时运行?
A:可以用 Windows 任务计划程序(Task Scheduler)或 Linux 的 crontab,配置每天自动运行合并脚本。(详见第八部分步骤4)
Q6:合并后有些行数据是重复的,怎么处理?
A:用 drop_duplicates(),可以指定按哪些列去重:
# 按所有列去重 df = df.drop_duplicates() # 按指定列去重(只保留第一次出现的数据) df = df.drop_duplicates(subset=["客户名称", "日期"], keep="first") # 按指定列去重(只保留最后一次出现的数据) df = df.drop_duplicates(subset=["客户名称", "日期"], keep="last")Q7:如何追溯每一行数据来自哪个原始文件?
A:在读取每个文件后,添加来源列(我们的生产级代码已经做了这个):
df["_来源文件"] = file_name df["_来源路径"] = file_path # 合并后,你可以这样追溯: # 找出所有来自"北京分公司"的数据 beijing_data = df[df["_来源文件"].str.contains("北京")] # 统计每个文件贡献了多少行 file_counts = df["_来源文件"].value_counts()Q8:Excel 文件里的公式怎么办?读取后是公式本身还是计算值?
A:默认情况下,pd.read_excel() 读取的是计算后的值,而不是公式本身。如果你需要读取公式,需要用 openpyxl 的 data_only=False 模式(但这样读取到的是公式字符串,不是计算结果)。
Q9:如何处理合并单元格?
A:合并单元格用 openpyxl 读取时,只有合并区域的第一个单元格有值,其余单元格为 NaN。可以用 ffill()(前向填充)来处理:
df = pd.read_excel(file_path) # 对"客户名称"列做前向填充(合并单元格的情况) df["客户名称"] = df["客户名称"].fillna(method="ffill") # 或者更安全的做法:只对连续的 NaN 填充(避免把不同客户的空值也填充了) df["客户名称"] = df.groupby(df["客户名称"].notna().cumsum())["客户名称"].ffill()Q10:如何处理超大文件(百万行以上)?
A:考虑以下方案:
1. 转成 CSV + 用 chunksize 分块读取
2. 直接用 dask.dataframe(pandas 的分布式版本,API 几乎完全兼容)
3. 存到数据库(如 MySQL/PostgreSQL),然后用 SQL 查询
4. 用 openpyxl 的 read_only 模式(见第六部分策略一)
# 使用 dask 处理超大文件(安装:pip install dask) import dask.dataframe as dd # dask 的 API 和 pandas 几乎一样,但它是惰性计算的(不会立即执行,节省内存) df = dd.read_csv("超大文件.csv") # 注意:dask 不支持直接读 Excel,需要先转 CSV result = df.groupby("客户名称")["销售金额"].sum().compute() # .compute() 才会真正执行 print(result)第十一部分:总结与学习路线
这篇文章,我们从最基础的三行代码,一步步走到生产级完整脚本,覆盖了:
✅ glob 批量获取文件列表(支持递归子目录、通配符详解)
✅ pandas 读取 Excel 的完整参数解析(12个核心参数详解)
✅ 智能 Sheet 读取(单 Sheet 失败自动尝试多 Sheet)
✅ 列名标准化(别名映射表,维护一处全局生效)
✅ 数据类型清洗(金额去货币符号、处理"万/亿"单位、日期转 datetime)
✅ 空行处理、重复行处理、合并单元格处理
✅ 来源追溯(哪个文件、哪行数据,一查便知)
✅ 失败文件记录(不会静默失败,失败原因清晰可见)
✅ 性能优化(parquet 格式、多进程并行、分块读取)
✅ 完整日志系统(每步都有时间戳,出了问题好追溯)
✅ 真实企业案例完整落地流程(从需求到上线)
✅ 单元测试基础(用 pytest 确保你的代码不出错)
✅ 技术选型决策树(什么时候用 Python,什么时候不用)
下一步学习方向
如果你把这篇文章的内容都掌握了,你已经具备了处理90%的 Excel 批量合并场景的能力。接下来,我建议你按这个顺序继续学习:
第1步:数据清洗与转换(本系列第3章)
合并完数据,下一步就是清洗:处理空值、重复值、异常值,字符串清洗,日期格式统一。这是数据分析师的核心竞争力。
第2步:数据分析与统计(本系列第4章)
数据干净了,就可以做分析了:分组统计(groupby)、数据透视表(pivot_table)、相关性分析等。
第3步:报表自动化与可视化(本系列第5章)
分析完了,需要输出报表:用 openpyxl 生成带格式的 Excel 报表,用 matplotlib 生成图表。
第4步:把你的代码包装成一个小工具
用 tkinter 或 PyQt 做一个带界面的小工具,让不懂 Python 的同事也能用。这才是"数据分析师的终极技能"——让工具为你工作,而不是你为工具工作。
🔔 关注公众号,回复「合并工具」领取:本文完整代码(.py文件)+ 测试数据生成脚本 + 常见错误处理速查表 + parquet格式性能对比报告
(点击公众号底部菜单 → 领取资料)
如果这篇文章对你有帮助,请点个「在看」,让更多人看到。有任何问题,欢迎在评论区留言,我会一一回复。
夜雨聆风