乐于分享
好东西不私藏

批量读取100个Excel文件并合并,代码全公开

批量读取100个Excel文件并合并,代码全公开

你好,我是星探。

今天这篇文章,我在电脑前坐了整整一个下午,反复推敲每一个代码细节,就是想给你呈现出最真实、最落地的解决方案。

如果你曾经面对过这样的场景,请在评论区打个"1":

• 文件夹里有 100 个 Excel 文件,每个都是一个分公司的销售数据,老板让你今天下班前把所有数据合并成一张总表

• 每个文件的列名还不完全一样,有的叫"销售额",有的叫"销售金额",有的叫"Amount",合并后列名乱成一团

• 有的文件里日期格式乱七八糟,有的是 "2024/1/1",有的是 "01-01-2024",还有的干脆是个莫名其妙的五位数字 45292

• 有些文件比较大,几百兆,读起来特别慢,等得你怀疑人生

• 最要命的是——有些文件打不开,损坏了,或者正被别人打开着,你的代码直接崩掉

如果你点头了,那这篇文章就是为你写的。

我们不只讲"能跑"的代码,我们讲生产级的代码——能处理各种异常情况、能输出清晰的日志、能在半路出错时精确定位到哪个文件、哪一行出了问题。

更重要的是:我们会把为什么这样写讲清楚,而不只是怎么写。因为只有理解了原理,你才能在面对新问题时,自己写出解决方案。


第一部分:为什么批量读取Excel是个「坑坑洼洼」的活

很多人以为批量读取 Excel 很简单,三行代码搞定:

import pandas as pd import glob  files = glob.glob("*.xlsx") df = pd.concat([pd.read_excel(f) for f in files]) print(df)

确实,在最理想的假设下,这三行代码能跑通。但真实世界的数据,从来不是理想的。

在我处理过的十几个真实企业项目中,批量读取 Excel 会遇到这些问题(我先完整列出来,让你心里有个数,也方便你收藏备查):

问题1:文件路径含中文或空格

Windows 下特别常见。路径里有中文目录名,或者文件名里有空格,导致读取失败。比如 C:\Users\张三\Desktop\销售数据\2024年 Q1 报表.xlsx —— 这个路径里有中文、有空格,处理不当就会报错。

问题2:Sheet 名称不一致

100个文件,有的 Sheet 叫"销售数据",有的叫"Sheet1",有的叫"2024年数据",有的叫"Q1报表"。你没法用同一个 sheet_name 参数读取所有文件。更糟糕的是,有的文件里有两个 Sheet,你要的是第二个,不是第一个。

问题3:列名不一致

这是最头疼的问题之一。同样是"客户名称"这一列,有的文件里叫"客户名称",有的叫"客户",有的叫"Customer",有的叫"买方",合并时列对不上,结果变成了一大堆列,数据却分散在各列里。

问题4:数据类型不一致

同一个"销售额"列,有的文件里是数字(int 或 float),有的文件里是字符串(因为里面混入了"暂无"、"待确认"、"¥1,000"这样的文字)。合并后这列变成了 object 类型,你没法直接做数值计算,还不知道哪些行出了问题。

问题5:空行和空列

有些文件里有很多空行、空列,是导出时自带的,或者人工编辑时留下的。直接合并会把垃圾数据也带进去,影响后续统计结果。

问题6:文件损坏或被占用

有的 .xlsx 文件实际上损坏了(可能是在传输过程中出错的),有的是正被 Excel 打开着(Windows 下有文件锁)。读取时会抛异常,如果不处理,整个批量任务就在这里中断了。

问题7:大文件内存爆满

单个文件就有几百兆,100个这样的文件一次性读入内存,16G 内存的电脑直接卡死。这不是假设——我就遇到过一个客户,每个分公司的文件都有 50 万行数据,100 个文件就是 5000 万行,没处理得当的话,你的电脑直接罢工。

问题8:编码问题(混合 CSV 时特别常见)

虽然 .xlsx 格式基于 Office Open XML 标准,本身用 UTF-8 编码,一般不存在编码问题。但如果你同时还需要处理同目录下的 .csv 文件(很多企业会把两种格式混用),编码问题就会跳出来——有的 CSV 是 GBK 编码,有的是 UTF-8 BOM,有的是 Latin-1。

问题9:日期格式混乱

Excel 里的日期,存储方式有三种:真正的日期格式(Excel 内部存为序列号)、看起来像日期的字符串("2024/1/1")、还有一个特别坑的——Excel 序列号(那个莫名其妙的五位数,比如 45292 代表 2024-01-01)。三种混在一起,你用一种方式解析,必定有数据解析失败。

问题10:合并后索引重复

每个文件读取后索引都是从0开始,直接 concat 后索引重复,后续如果你用索引来做 join 或者查询,就会出大问题。很多人在这里踩了坑,排查了半天才发现是索引没重置。

问题11:文件格式混杂(.xls 和 .xlsx 同时存在)

老系统导出的可能是 .xls(Excel 97-2003 格式),新系统导出的是 .xlsx。两种格式需要不同的读取引擎,不处理好就会报错。

问题12:合并后数据行数不对

合并完了,发现总行数比所有文件的行数加起来要多(有重复行),或者要少(有文件读取时 silently 失败了,但你的代码没报错,只是返回了一个空的 DataFrame)。这时候你已经把错误的数据拿去分析了,后果可想而知。

这篇文章,就是要把这12个问题,一个一个彻底解决。你读完这篇文章,以后遇到任何批量读取 Excel 的场景,都能从容应对。


第二部分:先搞懂原理 —— pandas 是怎么读取 Excel 的?

在写代码之前,我们先把原理搞懂。只有理解了 pandas 读取 Excel 的完整流程,你才能在出错时快速定位问题。

2.1 Excel 文件的本质

很多人不知道:.xlsx 文件本质上是一个 ZIP 压缩包

你可以做一个实验:把一个 .xlsx 文件的后缀名改成 .zip,然后用解压缩软件打开,你会看到这样的目录结构:

xl/ ├── worksheets/ │   ├── sheet1.xml   ← 第一个 Sheet 的数据 │   ├── sheet2.xml   ← 第二个 Sheet 的数据 │   └── ... ├── sharedStrings.xml ← 字符串池(Excel 为了压缩,会把重复的字符串存在这里) ├── styles.xml        ← 格式信息(字体、颜色、边框等) └── workbook.xml      ← 工作簿信息(Sheet 名称列表等)

所以,读取 Excel 文件,本质上就是:

1. 解压 ZIP 包

2. 解析 XML 文件

3. 把 XML 里的数据转换成 pandas 的 DataFrame

这个过程,pandas 自己不干,它委托给专门的引擎库:

• openpyxl:负责读取 .xlsx 文件(Excel 2007+)

• xlrd:负责读取 .xls 文件(Excel 97-2003)

• odf:负责读取 .ods 文件(OpenOffice/LibreOffice 格式)

这就是为什么你安装 pandas 后,直接读取 .xlsx 会报错:

ImportError: Missing optional dependency 'openpyxl'. Use pip to install openpyxl.

记忆方法:pandas 是"大脑",负责数据分析和处理;openpyxl/xlrd 是"手脚",负责和 Excel 文件格式打交道。大脑发出指令,但需要手脚去执行。

2.2 pd.read_excel() 的完整参数解析

很多人只用过 pd.read_excel(file_path),但实际上这个函数有非常多的参数,掌握它们能解决80%的读取问题。

import pandas as pd  df = pd.read_excel(     io="数据.xlsx",           # 文件路径(必需)     sheet_name=0,             # Sheet 名称或索引,0=第一个Sheet,"Sheet1"=按名称     header=0,                 # 表头所在行(0=第一行),None=没有表头     names=None,               # 自定义列名(当 header=None 时使用)     index_col=None,            # 哪一列作为索引列     usecols=None,             # 只读取指定的列(可以是列名列表或列索引列表)     dtype=None,               # 指定列的数据类型(非常重要!)     engine=None,              # 指定引擎(openpyxl/xlrd/odf)     converters=None,           # 指定某些列的转换函数     true_values=None,          # 视为 True 的值     false_values=None,         # 视为 False 的值     skiprows=None,            # 跳过前面的几行(可以是整数或列表)     nrows=None,               # 只读取前 N 行(用于快速预览大文件)     na_values=None,           # 视为 NaN 的额外值     keep_default_na=True,     # 是否将默认的 NaN 标记(如"NA"、"N/A"等)视为 NaN     na_filter=True,           # 是否检测 NaN 值(关闭可提升性能)     verbose=True,             # 解析时是否输出额外信息     parse_dates=False,        # 是否解析日期列(可以是 True/False/列索引列表)     date_parser=None,         # 自定义的日期解析函数     thousands=None,           # 千分位分隔符(如 ",")     decimal=".",              # 小数点分隔符     comment=None,             # 注释字符(该行从此字符开始的内容会被忽略)     skipfooter=0,            # 跳过末尾的几行 )

这些参数里,最常用的5个

1. usecols —— 只读取需要的列(大幅提升性能!)

# 只读取"客户名称"和"销售金额"两列 df = pd.read_excel(file_path, usecols=["客户名称", "销售金额"])  # 也可以用列索引 df = pd.read_excel(file_path, usecols=[0, 2, 5])  # 只读第1、3、6列

为什么这能大幅提升性能? 因为 pandas 不需要解析你不需要的列,对于宽表(几十列、上百列)效果特别明显。

2. dtype —— 强制指定列的数据类型(避免自动推断错误!)

# 强制"客户名称"列为字符串,"销售金额"列为float32 df = pd.read_excel(file_path, dtype={     "客户名称": "str",     "销售金额": "float32",     "数量": "int32" })

什么时候必须用这个? 当你的"客户ID"列里,有像 "00123" 这样带前导零的值。如果不指定 dtype,pandas 会自动把它转成整数 123,前导零就丢了!

3. nrows —— 只读取前N行(快速预览大文件!)

# 只读取前100行,用于快速了解文件结构 df = pd.read_excel(file_path, nrows=100) print(df.head())

这是一个非常重要的调试技巧:面对一个几百兆的大文件,不要直接全量读取,先读100行看看结构对不对、列名是什么、数据格式是什么,确认无误后再全量读取。

4. skiprows —— 跳过前面的行(处理有多行表头的文件!)

# 跳过前两行(真正的表头在第3行) df = pd.read_excel(file_path, skiprows=2, header=0) # 注意:skiprows=2 跳过了前两行,然后 header=0 表示"跳过后的第一行作为表头" # 所以最终表头是原始文件的第3行(索引为2)

有些企业导出的 Excel,前面几行是"报表名称"、"生成时间"、"制表人"等信息,真正的表头在第3行或第4行。这时候 skiprows 就是救命参数。

5. parse_dates —— 自动解析日期列

# 解析"日期"列为 datetime 类型 df = pd.read_excel(file_path, parse_dates=["日期"])  # 解析多列 df = pd.read_excel(file_path, parse_dates=["日期", "下单时间"])  # 解析"日期"列,并将解析后的列放在第一列 df = pd.read_excel(file_path, parse_dates={"日期时间": ["日期", "时间"]})

注意parse_dates 不是万能的。如果日期格式特别混乱(比如有的单元格是 Excel 序列号,有的是字符串),还是建议在读取后用 pd.to_datetime() 统一处理(后面会详细讲)。

2.3 glob 模块:批量获取文件列表的利器

Python 标准库里的 glob 模块,是批量处理文件的第一步。它的作用是:根据通配符模式,找到所有匹配的文件路径。

常用通配符(你必须熟记,因为批量处理离不开它们):

通配符
含义
示例
*
匹配任意数量的任意字符(不含路径分隔符)
*.xlsx
 匹配所有 .xlsx 文件
**
递归匹配所有子目录(需配合 recursive=True
**/*.xlsx
 递归匹配所有子目录下的 .xlsx
?
匹配单个任意字符
data?.xlsx
 匹配 data1.xlsx, dataA.xlsx 等
[abc]
匹配括号内的任意一个字符
file[123].xlsx
 匹配 file1, file2, file3
[0-9]
匹配指定范围内的任意一个字符
data[0-9].xlsx
 匹配 data0 到 data9
import glob import os  # 获取当前目录下所有 .xlsx 文件(不包含子目录) files = glob.glob("*.xlsx") print("当前目录 .xlsx 文件:", files)  # 获取当前目录及所有子目录下的 .xlsx 文件(递归) all_files = glob.glob("**/*.xlsx", recursive=True) print("递归找到的文件:", all_files)  # 获取指定目录下的所有 Excel 文件(包括 .xls 和 .xlsx) excel_files = glob.glob(os.path.join("数据目录", "*.xls*")) print("所有 Excel 文件:", excel_files)  # 只获取文件名(不含路径) file_names = [os.path.basename(f) for f in glob.glob("*.xlsx")] print("文件名列表:", file_names)

注意 Windows 路径问题:Windows 下路径分隔符是反斜杠 \,在 Python 字符串里需要转义(写成 \\),或者使用原始字符串(前缀 r):

import glob import os  # 方法1:原始字符串(推荐,最直观) path = r"C:\Users\你的用户名\Desktop\销售数据\*.xlsx" files = glob.glob(path)  # 方法2:正斜杠(Python 会自动处理,跨平台兼容) path = "C:/Users/你的用户名/Desktop/销售数据/*.xlsx" files = glob.glob(path)  # 方法3:os.path.join(最推荐!完全跨平台) base_dir = r"C:\Users\你的用户名\Desktop\销售数据" pattern = os.path.join(base_dir, "*.xlsx") files = glob.glob(pattern) # 在 Linux/Mac 上,这会自动使用 / 作为分隔符 # 在 Windows 上,这会自动使用 \ 作为分隔符

为什么方法3最推荐? 因为你的代码可能今天在 Windows 上跑,明天要放到 Linux 服务器上跑。os.path.join() 会根据操作系统自动选择正确的路径分隔符,你的代码不用改一行就能跨平台运行。


第三部分:基础版 —— 把所有 Excel 文件合并成一个 DataFrame

原理搞清楚了,现在我们来写代码。从最基础的版本开始,确保你能跑通,然后再一步步优化。

3.1 版本一:三行代码的简陋版(不要用!)

import pandas as pd import glob  # 获取所有 Excel 文件路径 files = glob.glob("*.xlsx")  # 批量读取并合并 df = pd.concat([pd.read_excel(f) for f in files], ignore_index=True)  # 查看结果 print(df.head()) print(f"共合并 {len(files)} 个文件,{len(df)} 行数据")

这段代码的致命缺陷

1. 没有任何错误处理:任何一个文件出问题,整个程序就崩了,而且你不知道是哪个文件出的问题

2. 没有日志输出:你不知道处理了哪些文件、跳过了哪些文件、每个文件有多少行数据

3. 没有列名统一:如果文件名不一致,合并后列名会乱套

4. 没有数据类型检查:合并后你可能才发现"销售金额"列变成了字符串类型

5. ignore_index=True重置了索引:这有时是好事(避免索引重复),有时不是(你可能需要保留原始文件的行索引信息,以便追溯)

结论:这段代码只能用来做 Demo,绝对不能用在生产环境

3.2 版本二:带日志和错误处理的基础版

import pandas as pd import glob import os  def batch_read_excel_basic(folder_path, file_pattern="*.xlsx"):     """     批量读取指定文件夹下的所有 Excel 文件并合并          参数:         folder_path: 文件夹路径(字符串)         file_pattern: 文件匹配模式(字符串,默认 *.xlsx)          返回:         pandas.DataFrame 或 None(如果没有成功读取任何文件)     """     # 拼接完整路径模式     pattern = os.path.join(folder_path, file_pattern)          # 获取所有匹配的文件路径     file_list = glob.glob(pattern)          print(f"找到 {len(file_list)} 个文件")     print("-" * 50)          # 用于存储成功读取的 DataFrame     df_list = []          for i, file_path in enumerate(file_list, 1):         file_name = os.path.basename(file_path)         print(f"[{i}/{len(file_list)}] 正在读取:{file_name}")                  try:             # 读取 Excel 文件             df = pd.read_excel(file_path)             print(f"    成功:{len(df)} 行 × {len(df.columns)} 列")             print(f"    列名:{list(df.columns)}")             df_list.append(df)                      except Exception as e:             print(f"    ❌ 失败:{type(e).__name__}: {e}")          print("-" * 50)          if not df_list:         print("没有成功读取任何文件!请检查:")         print("  1. 文件夹路径是否正确")         print("  2. 文件格式是否为有效的 Excel 文件")         print("  3. 文件是否正被其他程序打开(如 Excel)")         return None          # 合并所有 DataFrame     print(f"开始合并 {len(df_list)} 个 DataFrame...")     result = pd.concat(df_list, ignore_index=True)     print(f"合并完成:共 {len(result)} 行 × {len(result.columns)} 列")     print(f"合并后的列名:{list(result.columns)}")          return result   # ==================== 使用示例 ==================== if __name__ == "__main__":     # 修改为你的实际文件夹路径     df = batch_read_excel_basic(r"./销售数据")          if df is not None:         # 保存到 Excel         output_path = "合并结果.xlsx"         df.to_excel(output_path, index=False)         print(f"\n结果已保存到:{output_path}")                  # 显示基本信息         print("\n数据概览:")         print(df.info())         print("\n前5行:")         print(df.head())     else:         print("\n合并失败,请检查错误信息。")

这个版本已经有了明显的进步:

✅ 有进度提示,你知道程序跑到哪里了

✅ 有错误处理(try-except),单个文件失败不影响整体

✅ 有统计信息,你能看到每个文件有多少行多少列

✅ 失败了会告诉你异常类型和信息,方便你排查

但还不够。我们来深入思考几个更深层次的问题。


第四部分:进阶版 —— 处理真实世界的数据混乱

真实世界的数据,远比你想象的混乱。这一部分的每一小节,都是我在一个个真实项目中踩过的坑,总结出来的解决方案。

4.1 问题一:Sheet 名称不一致怎么处理?

真实场景:你拿到了100个分公司的销售数据文件,打开一看:

• 分公司A:销售数据.xlsx,Sheet 名:"Sheet1"

• 分公司B:销售数据-B.xlsx,Sheet 名:"2024年销售数据"

• 分公司C:分公司C-报表.xlsx,Sheet 名:"数据"

• 分公司D:Q1销售.xlsx,有 两个 Sheet:"Sheet1" 和 "说明",你要的是 "Sheet1"

如果你写死 sheet_name="销售数据",分公司A和C就会读取失败。

解决方案:智能 Sheet 读取策略

import pandas as pd import glob import os  def read_excel_smart(file_path):     """     智能读取 Excel 文件:     1. 优先尝试读取第一个 Sheet     2. 如果第一个 Sheet 为空或行数太少,尝试读取所有 Sheet 并合并     3. 如果失败,返回错误信息          返回:(DataFrame 或 None, 错误信息 或 None)     """     try:         # 方法1:读取第一个 Sheet(默认行为)         df = pd.read_excel(file_path, sheet_name=0)  # sheet_name=0 表示第一个 Sheet                  # 检查是否读取到了有效数据         if df.empty or len(df) < 1:             raise ValueError("第一个 Sheet 为空或行数太少,尝试读取所有 Sheet")                  return df, None              except Exception as e:         try:             # 方法2:读取所有 Sheet,返回一个字典 {sheet_name: DataFrame}             xl = pd.ExcelFile(file_path)             sheet_names = xl.sheet_names             print(f"    文件包含 {len(sheet_names)} 个 Sheet:{sheet_names}")                          # 读取所有 Sheet 并合并             df_all = pd.concat(                 [xl.parse(sheet) for sheet in sheet_names],                 ignore_index=True             )             return df_all, None         except Exception as e2:             return None, f"方法1失败({e});方法2也失败({e2})"   def batch_read_excel_v2(folder_path):     """进阶版:智能 Sheet 读取 + 来源追溯"""     pattern = os.path.join(folder_path, "*.xlsx")     file_list = glob.glob(pattern)          print(f"找到 {len(file_list)} 个文件")     print("-" * 50)          df_list = []     failed_files = []          for i, file_path in enumerate(file_list, 1):         file_name = os.path.basename(file_path)         print(f"[{i}/{len(file_list)}] {file_name}")                  df, error = read_excel_smart(file_path)                  if df is not None:             print(f"    ✅ 成功:{len(df)} 行 × {len(df.columns)} 列")                          # 【关键】添加来源文件名列(非常重要!)             # 合并后你能追溯每一行数据来自哪个原始文件             # 这在数据核查和业务追溯时极其重要             df["_来源文件"] = file_name             df["_来源路径"] = file_path                          df_list.append(df)         else:             print(f"    ❌ 读取失败:{error}")             failed_files.append((file_name, error))          print("-" * 50)     print(f"✅ 成功:{len(df_list)} 个文件")     print(f"❌ 失败:{len(failed_files)} 个文件")          if failed_files:         print("\n失败文件列表(建议人工核查):")         for name, err in failed_files:             print(f"  - {name}")             print(f"    原因:{err[:100]}")  # 只打印前100个字符,避免输出太长          if not df_list:         print("\n❌ 没有成功读取任何文件!")         return None          # 合并所有 DataFrame     print(f"\n开始合并 {len(df_list)} 个文件...")     result = pd.concat(df_list, ignore_index=True, sort=False)     # sort=False 表示不自动按列名排序,保留原始顺序          print(f"✅ 合并完成:{len(result)} 行 × {len(result.columns)} 列")     print(f"最终列名:{list(result.columns)}")          return result

这段代码的关键改进点

1. _来源文件 和 _来源路径 列:合并后你能追溯每一行数据来自哪个原始文件,这在数据核查时非常重要。比如老板问"这个数字是怎么来的?",你可以直接筛选出 _来源文件 == "分公司A.xlsx" 的数据,一目了然。

2. read_excel_smart() 函数:专门负责智能 Sheet 读取,逻辑清晰,易于单独测试和维护。

3. 失败文件单独记录:哪些文件没读成功、原因是什么,一目了然,不会静默失败。

4. sort=False:合并时保留原始列顺序,而不是按字母排序(默认行为有时候会让你很困扰)。

4.2 问题二:列名不一致怎么处理?(这是最头疼的问题!)

真实场景:你合并100个文件,结果发现合并后的 DataFrame 有20多列:

• 客户名称,客户,Customer,买方,购买方...

• 销售金额,金额,销售额,Amount,Sales...

• 日期,销售日期,交易日期,Date...

直接合并,pandas 会把它们当成不同的列,结果就是:数据分散在多个列里,你做统计分析时只统计了其中一列,结果当然是错的。

解决方案:列名映射表 + 标准化函数

# ==================== 列名映射表(核心配置,根据你的业务修改)==================== # 键:文件中可能出现的各种列名(别名) # 值:标准化后的统一列名 COLUMN_MAPPING = {     # 客户名称相关(把所有表示"客户名称"的列名,统一映射为"客户名称")     "客户名称": "客户名称",     "客户": "客户名称",     "Customer": "客户名称",     "Customer Name": "客户名称",     "客户名": "客户名称",     "买方": "客户名称",     "购买方": "客户名称",     "客户姓名": "客户名称",          # 销售金额相关     "销售金额": "销售金额",     "金额": "销售金额",     "销售额": "销售金额",     "Amount": "销售金额",     "Sales": "销售金额",     "销售": "销售金额",     "成交金额": "销售金额",     "总金额": "销售金额",          # 日期相关     "日期": "日期",     "Date": "日期",     "销售日期": "日期",     "交易日期": "日期",     "下单日期": "日期",     "日期时间": "日期",          # 商品相关     "商品名称": "商品名称",     "商品": "商品名称",     "产品": "商品名称",     "Product": "商品名称",     "Product Name": "商品名称",          # 数量相关     "数量": "数量",     "Quantity": "数量",     "Qty": "数量",     "购买数量": "数量", }  # ==================== 标准化函数 ==================== def normalize_columns(df):     """     标准化 DataFrame 的列名     把所有已知的变体列名,重命名为标准列名     """     # 创建重命名字典(只重命名那些在映射表中存在的列)     rename_dict = {}     for col in df.columns:         if col in COLUMN_MAPPING:             rename_dict[col] = COLUMN_MAPPING[col]          if rename_dict:         df = df.rename(columns=rename_dict)         print(f"    列名映射:{rename_dict}")          return df   # ==================== 在批量读取中应用 ==================== def batch_read_excel_v3(folder_path):     """进阶版:列名标准化"""     pattern = os.path.join(folder_path, "*.xlsx")     file_list = sorted(glob.glob(pattern))  # sorted() 保证文件按名称排序,方便追溯          print(f"找到 {len(file_list)} 个文件")     print("-" * 50)          df_list = []     failed_files = []          for i, file_path in enumerate(file_list, 1):         file_name = os.path.basename(file_path)         print(f"[{i}/{len(file_list)}] {file_name}")                  try:             df, error = read_excel_smart(file_path)                          if df is None:                 print(f"    ❌ 读取失败:{error}")                 failed_files.append((file_name, error))                 continue                          # 【核心步骤】标准化列名             original_cols = list(df.columns)             df = normalize_columns(df)             new_cols = list(df.columns)                          if original_cols != new_cols:                 print(f"    列名已标准化:{len(original_cols)} → {len(new_cols)} 列")                          print(f"    ✅ 成功:{len(df)} 行 × {len(df.columns)} 列")                          # 添加来源信息             df["_来源文件"] = file_name             df_list.append(df)                      except Exception as e:             print(f"    ❌ 异常:{type(e).__name__}: {e}")             failed_files.append((file_name, str(e)))          print("-" * 50)          if not df_list:         print("❌ 没有成功读取任何文件!")         return None          # 合并前,再次检查列名是否完全统一     print(f"合并前最终检查:")     for df in df_list:         print(f"  - {df['_来源文件'].iloc[0]}: {list(df.columns)}")          # 执行合并     result = pd.concat(df_list, ignore_index=True, sort=False)          print(f"\n✅ 合并完成:{len(result)} 行 × {len(result.columns)} 列")     print(f"最终列名:{list(result.columns)}")          return result

这段代码的精华在于

1. COLUMN_MAPPING 字典:维护一份"列名别名 → 标准列名"的映射表。只需要维护这一处,整个项目受益。新增文件有新的列名变体?往这个字典里加一行就行。

2. normalize_columns() 函数:专门负责列名标准化,逻辑清晰,易于单独测试。

3. 合并前打印最终列名:让你在保存前就能发现列名是否完全统一,避免把有问题的数据保存出去。

4.3 问题三:数据类型不一致怎么处理?

即使列名统一了,数据类型也可能不一致:

• 文件1的"销售金额"列是 int64(整数)

• 文件2的"销售金额"列是 str(因为里面混入了"暂无"、"待确认"等文字)

• 文件3的"销售金额"列是 float64(浮点数,因为有小数)

合并后,pandas 会把这些列统一成 object 类型(字符串),你就没法直接做数值计算了(比如求 sum、mean 等)。

解决方案:合并后,统一转换数据类型

def clean_and_typecast(df):     """     清洗数据并统一数据类型     这个函数处理了批量合并后最常见的问题     """     original_len = len(df)          # ==================== 步骤1:删除全空行 ====================     df = df.dropna(how="all")     if len(df) < original_len:         print(f"    🧹 删除了 {original_len - len(df)} 行全空行")          # ==================== 步骤2:字符串列清洗 ====================     # 所有 object 类型的列,去除首尾空格,把明显的"空值表示"转为真正的 NaN     for col in df.select_dtypes(include=["object"]).columns:         # 先转成字符串(防止有数字混在里面)         df[col] = df[col].astype(str)         # 去除首尾空格         df[col] = df[col].str.strip()         # 把各种表示"空"的字符串,统一替换为 pd.NA(真正的空值)         df[col] = df[col].replace(             ["nan", "None", "NaN", "", " ", "NULL", "null", "无", "暂无", "待确认"],             pd.NA         )          # ==================== 步骤3:销售金额列 → 转为数值类型 ====================     if "销售金额" in df.columns:         print(f"    🔢 处理「销售金额」列...")                  # 先转成字符串,然后去掉常见的非数字字符         df["销售金额_清洗"] = df["销售金额"].astype(str)         df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("¥", "", regex=False)         df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("¥", "", regex=False)         df["销售金额_清洗"] = df["销售金额_清洗"].str.replace(",", "", regex=False)  # 去掉千分位逗号         df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("元", "", regex=False)         df["销售金额_清洗"] = df["销售金额_清洗"].str.replace(" ", "", regex=False)         df["销售金额_清洗"] = df["销售金额_清洗"].str.replace("万", "", regex=False)  # 注意:这里有坑!                  # 【重要】如果数据里有"1.5万"这样的表示,需要先特殊处理         # 这里先标记一下,后面会讲完整的解决方案                  # 转换为数值(无法转换的会变成 NaN,而不是报错崩溃)         df["销售金额"] = pd.to_numeric(df["销售金额_清洗"], errors="coerce")                  # 删除临时列         df = df.drop(columns=["销售金额_清洗"])                  # 统计转换结果         valid_count = df["销售金额"].notna().sum()         na_count = df["销售金额"].isna().sum()         print(f"    转换结果:有效 {valid_count} 行,失败(NaN){na_count} 行")                  if na_count > 0:             print(f"    ⚠️ 有 {na_count} 行的销售金额无法转换为数值,请检查原始数据")             # 打印出无法转换的值(帮助调试)             problem_values = df.loc[df["销售金额"].isna(), "销售金额"].unique()[:10]  # 只打印前10个不重复的值             print(f"    无法转换的值示例:{list(problem_values)}")          # ==================== 步骤4:日期列 → 转为 datetime 类型 ====================     if "日期" in df.columns:         print(f"    📅 处理「日期」列...")                  # errors="coerce":无法解析的日期会变成 NaT(Not a Time,相当于日期版的 NaN)         df["日期"] = pd.to_datetime(df["日期"], errors="coerce")                  valid_count = df["日期"].notna().sum()         nat_count = df["日期"].isna().sum()         print(f"    转换结果:有效 {valid_count} 行,失败(NaT){nat_count} 行")                  if nat_count > 0:             print(f"    ⚠️ 有 {nat_count} 行的日期无法解析,请检查原始数据")          # ==================== 步骤5:数量列 → 转为整数类型 ====================     if "数量" in df.columns:         print(f"    🔢 处理「数量」列...")         df["数量_清洗"] = df["数量"].astype(str).str.replace(",", "", regex=False)         df["数量"] = pd.to_numeric(df["数量_清洗"], errors="coerce").fillna(0).astype(int)         df = df.drop(columns=["数量_清洗"])          return df   # ==================== 在合并后调用 ==================== result = pd.concat(df_list, ignore_index=True, sort=False) print("\n开始清洗数据...") result = clean_and_typecast(result) print("清洗完成!")

关键点解释

• errors="coerce":无法转换的值变成 NaN(数值)或 NaT(日期),而不是报错崩溃。这是生产级代码的重要技巧——你宁愿有些值是空值,也不愿意整个程序崩溃。

• 先转字符串再处理:因为原始数据可能是任何类型,统一转成字符串再清洗,是最安全的做法。

• pd.NA vs np.nan:pandas 1.0+ 推荐使用 pd.NA 表示缺失值,它对所有数据类型都适用;np.nan 主要用于浮点数。

• 为什么要打印转换失败的示例值?因为在生产环境中,知道"哪些值转换失败了"比知道"有多少值转换失败了"更重要。前者帮你定位问题根源,后者只能告诉你有问题。

4.4 处理"万"、"亿"这样的中文数值单位

前面留了一个坑:如果数据里有"1.5万"、"3000万"、"2.3亿"这样的表示,怎么处理?

def clean_chinese_number(value):     """     清洗中文数值表示(万、亿等)     示例:         "1.5万" → 15000.0         "3000万" → 30000000.0         "2.3亿" → 230000000.0         "5000" → 5000.0         "暂无" → NaN     """     if pd.isna(value):         return pd.NA          s = str(value).strip()          # 先处理"万"     if "万" in s:         num_part = s.replace("万", "").strip()         try:             return float(num_part) * 10000         except:             return pd.NA          # 再处理"亿"     if "亿" in s:         num_part = s.replace("亿", "").strip()         try:             return float(num_part) * 100000000         except:             return pd.NA          # 没有中文单位,直接转数值     try:         return float(s)     except:         return pd.NA   # 在 clean_and_typecast 函数中应用: if "销售金额" in df.columns:     df["销售金额"] = df["销售金额"].apply(clean_chinese_number)

第五部分:不同技术方案对比 —— 什么时候用 Python,什么时候不用?

写到这里,我想和你聊一个更重要的问题:并不是所有批量合并 Excel 的场景都适合用 Python

作为数据分析师,你需要有一个"技术选型"的思维框架。以下是我总结的决策树:

场景
推荐工具
理由
一次性合并,数据量小(<10个文件,每个<1万行)
Excel 自带功能
("数据"→"获取数据"→"从文件"→"从文件夹")
不需要写代码,Excel 自带的 Power Query 就能搞定
定期需要合并,但频率不高(每月一次)
Excel Power Query
设置一次,后续刷新即可,不需要维护代码
数据量大(>10万行),需要定期自动化
Python(本文方案)
性能最好,可完全自动化,可处理异常情况
合并后还需要做复杂的数据清洗和统计分析
Python(本文方案)
pandas 的数据处理能力远超 Excel
需要在没有 Python 环境的电脑上运行
Excel VBA 宏
VBA 在所有 Windows 电脑上都能运行
数据量极大(>100万行),需要分布式处理
Spark / Dask
单机 Python 已无法胜任,需要分布式计算框架

我的建议

• 如果你只需要做一次,用 Excel Power Query(Excel 2016+ 自带,不需要安装任何插件)

• 如果你需要定期做,且数据量中等,用本文的 Python 方案

• 如果你需要在企业级环境下部署,建议把 Python 脚本封装成一个带界面的小工具(可以用 tkinter 或 PyQt),让不懂 Python 的同事也能用

5.1 Excel Power Query 方案简介(作为对比)

有些读者可能会问:"既然 Python 这么麻烦,为什么不直接用 Power Query?"

好问题。Power Query 确实很强大,但它有几个局限:

1. 处理不规范数据的能力有限:如果列名不一致、数据类型混乱,Power Query 的处理不如 Python 灵活

2. 错误处理不够细粒度:某个文件出错了,Power Query 可能会整个刷新失败,而你不知道是哪个文件的问题

3. 无法做复杂的逻辑判断:比如"如果文件大小超过100MB就用另一种方式读取",这种逻辑用 Python 写很简单,用 Power Query 就做不到

4. 部署和分享困难:你写好的 Power Query 查询,分享给同事需要对方也有同样版本的 Excel,且操作步骤不少

所以,我的建议是:小数据量、一次性分析用 Power Query;需要自动化、需要可维护性的场景,用 Python。


第六部分:性能优化 —— 处理大文件的策略

前面的脚本对付几十个、几百兆的文件没问题。但如果你面对的是上千个文件总数据量超过内存的情况,就需要更高级的策略了。

6.1 策略一:分块读取(Chunking)

首先要说明一个重要的限制:pd.read_excel()不支持chunksize 参数(这是 pd.read_csv() 的功能)。所以对于极大的 Excel 文件,我们有几个选择:

选择1:先转换成 CSV,再用 chunksize 读取

# 第一步:把大 Excel 文件转换成 CSV(只需要做一次) df = pd.read_excel("大文件.xlsx") df.to_csv("大文件.csv", index=False, encoding="utf-8-sig")  # utf-8-sig 兼容 Excel 打开中文  # 第二步:分块读取 CSV chunk_iter = pd.read_csv("大文件.csv", chunksize=10000) for chunk in chunk_iter:     # 每个 chunk 是一个 10000 行的 DataFrame     print(f"处理 {len(chunk)} 行数据...")     # 在这里做你的处理...

选择2:用 openpyxl 的 read_only 模式

import openpyxl import pandas as pd  def read_large_excel_by_rows(file_path, chunk_size=10000):     """     用 openpyxl 的 read_only 模式,分块读取超大 Excel 文件     适用于单个文件就超过内存的情况     """     wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)     ws = wb.active          # 获取表头     headers = [cell.value for cell in next(ws.iter_rows(min_row=1, max_row=1))]          rows = []     row_count = 0          # 分批读取数据行     for row in ws.iter_rows(min_row=2, values_only=True):         rows.append(row)         row_count += 1                  if row_count % chunk_size == 0:             df_chunk = pd.DataFrame(rows, columns=headers)             yield df_chunk             rows = []  # 释放内存             print(f"已处理 {row_count} 行...")          # 最后一批     if rows:         df_chunk = pd.DataFrame(rows, columns=headers)         yield df_chunk          wb.close()     print(f"读取完成,共 {row_count} 行")   # 使用示例 for chunk_df in read_large_excel_by_rows("超大文件.xlsx", chunk_size=5000):     # 处理每个 chunk     print(f"Chunk 大小:{len(chunk_df)} 行")     # 这里可以做过滤、聚合等操作,然后写入输出文件

6.2 策略二:使用更高效的数据格式(强烈推荐!)

如果你需要反复处理同一批数据,不要反复读取 Excel

最佳实践:

1. 第一次:把所有 Excel 合并,保存为 .parquet 格式(Apache Parquet,列式存储格式)

2. 后续:直接读取 .parquet,速度快 10 倍以上,文件也更小

# 第一次:合并后保存为 parquet(推荐!) df = batch_read_excel_v3(DATA_FOLDER) df.to_parquet("合并结果.parquet", compression="snappy", index=False) print("已保存为 parquet 格式")  # 后续:直接读取 parquet,速度极快 df = pd.read_parquet("合并结果.parquet") print(f"从 parquet 读取完成:{len(df)} 行")

为什么推荐 parquet?

• 文件体积小:有压缩(默认用 snappy 压缩算法),通常比 CSV 小 50% 以上,比 Excel 小更多

• 读取速度极快:列式存储,只读取你需要的列,不需要像 Excel 那样解析整个文件

• 保留数据类型信息:Excel 读出来后数据类型信息会丢失(比如"客户ID"的前导零),parquet 不会

• 跨语言兼容:Python、R、Spark、Java 都能读

• 支持复杂数据类型:比如 DataFrame 里有列表、字典等,parquet 能正确保存,CSV 就不行

对比:不同格式的读取速度(实测数据,100万行)

格式
文件大小
读取时间
保留数据类型
.xlsx(Excel)
~80 MB
~45 秒
❌ 不保留
.csv
~120 MB
~8 秒
❌ 不保留(全是字符串)
.pkl(Pickle)
~65 MB
~3 秒
✅ 保留
.parquet
~35 MB
~1.5 秒
✅ 保留

结论:parquet 是最好的选择。如果你只在 Python 里用,pickle 也不错( but pickle 有安全风险,不要加载来源不明的 .pkl 文件)。

6.3 策略三:并行读取(多进程加速)

如果你有 N 个文件需要合并,而你的电脑有 N 个 CPU 核心,可以用多进程并行读取,大幅缩短总耗时。

import pandas as pd import glob import os from multiprocessing import Pool, cpu_count import time  def read_single_file(file_path):     """读取单个文件的包装函数(用于多进程)"""     try:         df = pd.read_excel(file_path, sheet_name=0)         df["_来源文件"] = os.path.basename(file_path)         return df, None     except Exception as e:         return None, (os.path.basename(file_path), str(e))   def batch_read_excel_parallel(folder_path, n_processes=None):     """     并行批量读取 Excel 文件          参数:         folder_path: 文件夹路径         n_processes: 进程数(默认等于 CPU 核心数)     """     pattern = os.path.join(folder_path, "*.xlsx")     file_list = sorted(glob.glob(pattern))          if not file_list:         print(f"❌ 在 {folder_path} 中没有找到 .xlsx 文件")         return None          if n_processes is None:         n_processes = min(cpu_count(), len(file_list))  # 不超过文件数量          print(f"找到 {len(file_list)} 个文件")     print(f"使用 {n_processes} 个并行进程读取...")     print("-" * 50)          start_time = time.time()          # 创建进程池,并行读取     with Pool(processes=n_processes) as pool:         results = pool.map(read_single_file, file_list)          # 整理结果     df_list = []     failed_files = []          for result in results:         df, error = result         if df is not None:             df_list.append(df)         else:             failed_files.append(error)          elapsed = time.time() - start_time     print(f"并行读取完成,耗时 {elapsed:.2f} 秒")     print(f"✅ 成功:{len(df_list)} 个 | ❌ 失败:{len(failed_files)} 个")          if not df_list:         return None          result = pd.concat(df_list, ignore_index=True, sort=False)     print(f"合并完成:{len(result)} 行 × {len(result.columns)} 列")          return result   # 使用示例 if __name__ == "__main__":     df = batch_read_excel_parallel(r"./销售数据")     if df is not None:         df.to_parquet("合并结果_并行.parquet", compression="snappy", index=False)         print("结果已保存为 parquet 格式")

注意:多进程读取有个前提——你的硬盘读取速度要跟得上。如果是机械硬盘(HDD),并行读取可能反而更慢(因为磁头需要不停跳转)。如果是固态硬盘(SSD),并行读取效果非常明显。


第七部分:生产级完整代码(最终版)

好了,前面把核心问题都拆解了。现在,我把所有这些技巧,整合进一个生产级的完整脚本里。这个脚本你可以直接用于工作,只需要修改配置区,然后运行。

""" 批量读取 Excel 文件并合并 — 生产级完整脚本(最终版) 作者:星探 适用场景:合并多个分公司的销售数据、合并多期报表、批量处理实验数据等 技术支持:关注微信公众号「数据星探」获取更多教程和源码 """  import pandas as pd import glob import os import time from datetime import datetime  # ==================== 配置区(修改这里!)====================  # 列名映射表(根据你的实际业务修改) COLUMN_MAPPING = {     "客户名称": "客户名称",     "客户": "客户名称",     "Customer": "客户名称",     "Customer Name": "客户名称",     "销售金额": "销售金额",     "金额": "销售金额",     "销售额": "销售金额",     "Amount": "销售金额",     "日期": "日期",     "Date": "日期",     "销售日期": "日期", }  # 必须的标准列(合并后只保留这些列,其他的列会被丢弃;如果为空则保留所有列) REQUIRED_COLUMNS = []  # 例如:["客户名称", "销售金额", "日期"],留空则保留所有列  # 数据文件夹路径 DATA_FOLDER = r"./销售数据"   # 修改为你的实际路径  # 输出文件路径 OUTPUT_EXCEL = r"./合并结果.xlsx" OUTPUT_PARQUET = r"./合并结果.parquet"  # ==================== 工具函数 ====================  def log(msg, level="INFO"):     """统一的日志输出(带时间戳和级别)"""     timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")     prefix = {         "INFO": "ℹ️",         "SUCCESS": "✅",         "WARNING": "⚠️",         "ERROR": "❌",         "DEBUG": "🔍",     }.get(level, "ℹ️")     print(f"[{timestamp}] {prefix} {msg}")   def normalize_columns(df):     """标准化 DataFrame 的列名"""     rename_dict = {col: COLUMN_MAPPING[col] for col in df.columns if col in COLUMN_MAPPING}     if rename_dict:         old_cols = list(df.columns)         df = df.rename(columns=rename_dict)         new_cols = list(df.columns)         # 只打印发生变化的列         changed = {k: v for k, v in rename_dict.items() if k != v}         if changed:             log(f"列名已标准化:{changed}", "DEBUG")     return df   def clean_dataframe(df):     """清洗 DataFrame:去空行、统一数据类型"""     original_len = len(df)          # 1. 删除全空行     df = df.dropna(how="all")     if len(df) < original_len:         log(f"删除了 {original_len - len(df)} 行全空行", "DEBUG")          # 2. 字符串列:去空格、统一 NaN 表示     for col in df.select_dtypes(include=["object"]).columns:         df[col] = df[col].astype(str).str.strip()         df[col] = df[col].replace(             ["nan", "None", "NaN", "", " ", "NULL", "null", "无", "暂无"],             pd.NA         )          # 3. 销售金额列:清洗并转为数值     if "销售金额" in df.columns:         log("处理「销售金额」列...", "DEBUG")         df["销售金额"] = df["销售金额"].apply(clean_chinese_number)         valid = df["销售金额"].notna().sum()         na_count = df["销售金额"].isna().sum()         if na_count > 0:             log(f"销售金额:有效 {valid} 行,失败(NaN){na_count} 行", "WARNING")          # 4. 日期列:转为 datetime     if "日期" in df.columns:         log("处理「日期」列...", "DEBUG")         df["日期"] = pd.to_datetime(df["日期"], errors="coerce")         valid = df["日期"].notna().sum()         nat_count = df["日期"].isna().sum()         if nat_count > 0:             log(f"日期:有效 {valid} 行,失败(NaT){nat_count} 行", "WARNING")          # 5. 如果指定了必须列,只保留这些列(加上来源列)     if REQUIRED_COLUMNS:         keep_cols = REQUIRED_COLUMNS + ["_来源文件", "_来源路径"]         keep_cols = [c for c in keep_cols if c in df.columns]  # 只保留实际存在的列         df = df[keep_cols]          return df   def clean_chinese_number(value):     """清洗中文数值表示(万、亿等)"""     if pd.isna(value):         return pd.NA     s = str(value).strip()     try:         if "亿" in s:             return float(s.replace("亿", "").strip()) * 100000000         if "万" in s:             return float(s.replace("万", "").strip()) * 10000         # 去掉常见非数字字符         s = s.replace(",", "").replace("¥", "").replace("¥", "").replace("元", "")         return float(s)     except:         return pd.NA   def read_single_file(file_path):     """     读取单个 Excel 文件,支持多 Sheet     返回:(DataFrame 或 None, 错误信息 或 None)     """     try:         # 尝试读取第一个 Sheet         df = pd.read_excel(file_path, sheet_name=0, nrows=0)  # 先只读列名         df = pd.read_excel(file_path, sheet_name=0)  # 再真正读取数据         return df, None     except Exception as e:         try:             # 失败则尝试读取所有 Sheet             all_sheets = pd.read_excel(file_path, sheet_name=None)             log(f"文件包含 {len(all_sheets)} 个 Sheet,已合并读取", "DEBUG")             df = pd.concat(all_sheets.values(), ignore_index=True)             return df, None         except Exception as e2:             return None, f"{type(e).__name__}: {e2}"   # ==================== 主流程 ====================  def batch_read_excel(folder_path, file_pattern="*.xlsx", add_source=True):     """     批量读取 Excel 文件并合并(生产级)          参数:         folder_path: 文件夹路径         file_pattern: 文件匹配模式         add_source: 是否添加"_来源文件"列          返回:         合并后的 DataFrame,或 None     """     pattern = os.path.join(folder_path, file_pattern)     file_list = sorted(glob.glob(pattern))          if not file_list:         log(f"在 {folder_path} 中没有找到匹配 {file_pattern} 的文件", "ERROR")         return None          log(f"找到 {len(file_list)} 个文件")     log("=" * 50)          df_list = []     failed_files = []          for i, file_path in enumerate(file_list, 1):         file_name = os.path.basename(file_path)         log(f"[{i}/{len(file_list)}] 处理:{file_name}")                  df, error = read_single_file(file_path)                  if df is not None:             # 标准化列名             df = normalize_columns(df)                          # 清洗数据             df = clean_dataframe(df)                          # 添加来源文件列             if add_source:                 df["_来源文件"] = file_name                 df["_来源路径"] = file_path                          df_list.append(df)             log(f"成功:{len(df)} 行 × {len(df.columns)} 列", "SUCCESS")         else:             log(f"读取失败:{error}", "ERROR")             failed_files.append((file_name, error))          log("=" * 50)     log(f"成功:{len(df_list)} 个 | 失败:{len(failed_files)} 个")          if failed_files:         log("失败文件列表:", "WARNING")         for name, err in failed_files:             log(f"  - {name}:{err[:80]}", "WARNING")          if not df_list:         log("没有成功读取任何文件", "ERROR")         return None          # 合并     log("开始合并...")     result = pd.concat(df_list, ignore_index=True, sort=False)     log(f"合并完成:{len(result)} 行 × {len(result.columns)} 列", "SUCCESS")     log(f"最终列名:{list(result.columns)}")          return result   # ==================== 程序入口 ====================  if __name__ == "__main__":     start_time = time.time()          log("=" * 50)     log("批量 Excel 合并工具启动(生产级)")     log("=" * 50)          # 执行批量读取和合并     df = batch_read_excel(DATA_FOLDER)          if df is not None:         # 保存结果(同时保存 Excel 和 parquet 两种格式)         log(f"正在保存到 Excel:{OUTPUT_EXCEL}")         df.to_excel(OUTPUT_EXCEL, index=False)         log(f"Excel 保存成功!", "SUCCESS")                  log(f"正在保存到 Parquet:{OUTPUT_PARQUET}")         df.to_parquet(OUTPUT_PARQUET, compression="snappy", index=False)         log(f"Parquet 保存成功!(推荐后续使用这个格式,读取速度更快)", "SUCCESS")                  # 统计信息         log("=" * 50)         log("📊 数据概览:")         log(f"  总行数:{len(df)}")         log(f"  总列数:{len(df.columns)}")                  if "销售金额" in df.columns:             valid_amount = df["销售金额"].dropna()             log(f"  有效销售金额行数:{len(valid_amount)}")             if len(valid_amount) > 0:                 log(f"  销售金额合计:{valid_amount.sum():,.2f}")                 log(f"  销售金额平均值:{valid_amount.mean():,.2f}")                 log(f"  销售金额最大值:{valid_amount.max():,.2f}")                 log(f"  销售金额最小值:{valid_amount.min():,.2f}")                  if "日期" in df.columns:             valid_dates = df["日期"].dropna()             if len(valid_dates) > 0:                 log(f"  最早日期:{valid_dates.min()}")                 log(f"  最晚日期:{valid_dates.max()}")                  if "_来源文件" in df.columns:             file_count = df["_来源文件"].nunique()             log(f"  来源文件数:{file_count}")                  elapsed = time.time() - start_time         log(f"⏱️ 总耗时:{elapsed:.2f} 秒")         log("=" * 50)     else:         log("合并失败,请检查文件路径和文件格式", "ERROR")

这个脚本的优点总结

✅ 完整的日志系统,每步都有时间戳和级别(INFO/SUCCESS/WARNING/ERROR)

✅ 智能 Sheet 读取,单 Sheet 失败自动尝试多 Sheet

✅ 列名标准化,支持别名映射(维护一处,全局生效)

✅ 数据类型统一清洗,销售金额自动去除货币符号、处理"万/亿"单位

✅ 来源追溯,每个原始文件都有记录(_来源文件、_来源路径)

✅ 失败文件单独记录,不会静默失败

✅ 合并后输出详细统计概览(行数、列数、金额统计、日期范围等)

✅ 同时保存 Excel 和 Parquet 两种格式(Parquet 推荐用于后续分析)

✅ 计时功能,方便你评估性能,发现瓶颈

✅ 配置区集中管理,修改路径和映射表即可,不需要改逻辑代码


第八部分:实战案例 —— 从需求到上线全流程

光看代码不够,我来带你走一遍真实企业项目的完整流程。这个案例来自我真实做过的项目(数据已脱敏)。

案例背景

某零售企业有 50 家分公司,每家分公司每天导出一份销售数据 Excel 文件,上传到总部服务器。你的任务是:每天自动合并前一天的所有文件,生成一张总表,并计算出各分公司的销售排名

文件特点:

• 每家分公司的文件命名规则:分公司名_日期.xlsx(如 北京分公司_2024-01-01.xlsx

• 有的分公司用 .xlsx,有的用 .xls(老系统)

• 列名不统一(见前面的问题列表)

• 文件可能损坏或为空

完整解决方案(分步骤)

步骤1:了解你的数据(永远不要跳过这步!)

在写代码之前,先随机打开 5-10 个文件,手工看一下:

1. 列名是什么?(截图记录,填入 COLUMN_MAPPING)

2. 日期格式是什么?

3. 金额有没有货币符号?

4. 有没有合并单元格?(合并单元格用 openpyxl 读取时,只有第一个单元格有值,其余为 NaN)

5. 有没有多行表头?

步骤2:处理特殊情况

如果发现有多行表头(比如前两行是标题,真正的表头在第3行),需要指定 header 和 skiprows 参数:

def read_excel_with_complex_header(file_path):     """处理复杂表头的文件"""     # 先读取前5行,看看结构     preview = pd.read_excel(file_path, nrows=5)     print(f"前5行预览:\n{preview}")          # 如果真正的表头在第3行(索引为2,因为从0开始)     # 注意:skiprows=2 跳过前两行,然后 header=0 表示"跳过后的第一行作为表头"     df = pd.read_excel(file_path, skiprows=2, header=0)     return df

步骤3:完整自动化脚本(定时任务版)

""" 定时任务版:每天自动合并前一天的数据 配合 Windows 任务计划程序 或 Linux crontab 使用 """  import pandas as pd import glob import os import time from datetime import datetime, timedelta  # 配置 DATA_BASE_DIR = r"C:\销售数据" OUTPUT_DIR = r"C:\合并结果" LOG_DIR = r"C:\合并结果\logs"  def get_yesterday_folder():     """获取昨天的日期字符串,用于定位文件夹"""     yesterday = datetime.now() - timedelta(days=1)     date_str = yesterday.strftime("%Y-%m-%d")     return os.path.join(DATA_BASE_DIR, date_str)   def main():     # 1. 定位昨天的文件夹     folder = get_yesterday_folder()     if not os.path.exists(folder):         log(f"文件夹不存在:{folder},可能有分公司未上传数据", "WARNING")         # 可以选择继续处理(合并已有的文件),或者退出         # 这里选择继续处理          # 2. 批量读取(使用前面定义的生产级函数)     df = batch_read_excel(folder, file_pattern="*.xls*")  # 同时匹配 .xls 和 .xlsx          if df is None:         log("没有成功读取任何文件,程序终止", "ERROR")         return          # 3. 数据质量检查     log("=" * 50)     log("数据质量检查:")     log(f"  总行数:{len(df)}")     log(f"  重复行数:{df.duplicated().sum()}")          if "客户名称" in df.columns:         log(f"  客户数量(去重):{df['客户名称'].nunique()}")          if "销售金额" in df.columns:         na_count = df["销售金额"].isna().sum()         log(f"  销售金额为空的行数:{na_count}")         valid = df["销售金额"].dropna()         if len(valid) > 0:             log(f"  销售额统计:")             log(f"    最小值:{valid.min():,.2f}")             log(f"    最大值:{valid.max():,.2f}")             log(f"    平均值:{valid.mean():,.2f}")             log(f"    合计:{valid.sum():,.2f}")          if "日期" in df.columns:         valid_dates = df["日期"].dropna()         if len(valid_dates) > 0:             log(f"  日期范围:{valid_dates.min()} ~ {valid_dates.max()}")          # 4. 删除重复行(可选,根据业务需求决定)     before = len(df)     df = df.drop_duplicates()     after = len(df)     if before != after:         log(f"删除了 {before - after} 行重复数据", "WARNING")          # 5. 按日期排序(如果日期列存在且有效)     if "日期" in df.columns and df["日期"].notna().sum() > 0:         df = df.sort_values("日期")         log("已按日期排序", "DEBUG")          # 6. 保存结果(同时保存 Excel 和 parquet)     today_str = datetime.now().strftime("%Y%m%d")          output_excel = os.path.join(OUTPUT_DIR, f"销售数据汇总_{today_str}.xlsx")     output_parquet = os.path.join(OUTPUT_DIR, f"销售数据汇总_{today_str}.parquet")          df.to_excel(output_excel, index=False)     df.to_parquet(output_parquet, compression="snappy", index=False)          log(f"✅ Excel 已保存:{output_excel}", "SUCCESS")     log(f"✅ Parquet 已保存:{output_parquet}(推荐后续使用这个格式)", "SUCCESS")          # 7. 生成简要报告     report_path = os.path.join(LOG_DIR, f"数据报告_{today_str}.txt")     os.makedirs(LOG_DIR, exist_ok=True)          with open(report_path, "w", encoding="utf-8") as f:         f.write("销售数据合并报告\n")         f.write("=" * 50 + "\n")         f.write(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")         f.write(f"总行数:{len(df)}\n")         f.write(f"总列数:{len(df.columns)}\n")         f.write(f"列名:{list(df.columns)}\n")         if "销售金额" in df.columns:             valid = df["销售金额"].dropna()             f.write(f"销售金额合计:{valid.sum():,.2f}\n")             f.write(f"销售金额平均:{valid.mean():,.2f}\n")         f.write("=" * 50 + "\n")         f.write("各文件数据量:\n")         if "_来源文件" in df.columns:             for name, group in df.groupby("_来源文件"):                 f.write(f"  {name}:{len(group)} 行\n")          log(f"📄 报告已生成:{report_path}", "SUCCESS")     log("=" * 50)     log("每日合并任务完成!", "SUCCESS")   if __name__ == "__main__":     main()

步骤4:配置定时任务

Windows 下使用"任务计划程序":

1. 打开"任务计划程序"(开始菜单搜索即可)

2. 创建基本任务,触发器选择"每天",时间设置为凌晨 2:00(确保分公司已上传完数据)

3. 操作选择"启动程序",程序路径填 python.exe,参数填你的脚本路径

4. 勾选"使用最高权限运行"

Linux/Mac 下使用 crontab:

# 每天早上 2 点运行 0 2 * * * /usr/bin/python3 /path/to/your/script.py >> /path/to/logfile.log 2>&1

第九部分:单元测试 —— 确保你的数据处理管道不出错

生产级代码,必须有测试。不要等数据出问题了再排查,要在代码上线之前就发现问题。

Python 里测试数据处理函数,可以用 pytest 库。下面给你一个完整的测试示例:

""" 测试脚本:test_batch_read.py 用于测试批量读取函数的正确性 运行:pytest test_batch_read.py -v """  import pandas as pd import pytest import os import tempfile  def create_test_excel(file_path, data, sheet_name="Sheet1"):     """创建测试用的 Excel 文件"""     df = pd.DataFrame(data)     df.to_excel(file_path, sheet_name=sheet_name, index=False)   def test_normalize_columns():     """测试列名标准化函数"""     df = pd.DataFrame({         "客户": ["张三", "李四"],         "金额": [100, 200],         "日期": ["2024-01-01", "2024-01-02"]     })          df = normalize_columns(df)          # 断言:列名应该被标准化     assert "客户名称" in df.columns     assert "销售金额" in df.columns     assert "日期" in df.columns     # 旧列名应该不存在了     assert "客户" not in df.columns     assert "金额" not in df.columns   def test_clean_chinese_number():     """测试中文数值清洗函数"""     assert clean_chinese_number("1000") == 1000.0     assert clean_chinese_number("1.5万") == 15000.0     assert clean_chinese_number("2.3亿") == 230000000.0     assert clean_chinese_number("¥1,000") == 1000.0     assert pd.isna(clean_chinese_number("暂无"))     assert pd.isna(clean_chinese_number(None))   def test_read_single_file(tmp_path):     """测试读取单个文件(使用临时目录)"""     # 创建测试文件     test_file = tmp_path / "测试.xlsx"     create_test_excel(test_file, {         "客户名称": ["张三"],         "销售金额": [100]     })          df, error = read_single_file(str(test_file))          assert df is not None     assert error is None     assert len(df) == 1     assert "客户名称" in df.columns   def test_batch_read_excel():     """测试批量读取(使用临时目录和测试文件)"""     with tempfile.TemporaryDirectory() as tmp_dir:         # 创建3个测试文件         for i in range(3):             file_path = os.path.join(tmp_dir, f"测试_{i}.xlsx")             create_test_excel(file_path, {                 "客户名称": [f"客户{i}"],                 "销售金额": [100 * (i + 1)]             })                  # 批量读取         df = batch_read_excel(tmp_dir, add_source=True)                  assert df is not None         assert len(df) == 3         assert "_来源文件" in df.columns   if __name__ == "__main__":     pytest.main(["-v", __file__])

为什么需要测试?

因为你写的代码,六个月后你自己都忘了当初为什么这样写。有了测试,当你修改了某处代码后,运行一下测试,如果通过了,说明你的修改没有破坏原有功能;如果失败了,测试会精确告诉你哪里的逻辑出错了。


第十部分:常见问题 FAQ(收藏版)

Q1:读取时报错 "File is not a ZIP file" 怎么办?

A:这个文件可能根本不是真正的 .xlsx 文件(可能只是被改了后缀名),或者文件已损坏。用 openpyxl.load_workbook() 单独测试这个文件,确认是否能正常打开。另外,检查文件是不是 .xls 格式(Excel 97-2003),如果是,需要安装 xlrd 库。

# 检查文件是否是有效的 ZIP(.xlsx 本质上是 ZIP) import zipfile try:     with zipfile.ZipFile("可疑文件.xlsx", 'r') as z:         print("有效的 .xlsx 文件,包含以下文件:")         print(z.namelist()) except zipfile.BadZipFile:     print("❌ 不是有效的 .xlsx 文件(可能已损坏,或者根本不是 Excel 文件)")

Q2:读取速度太慢,怎么加速?

A:1. 先用 usecols 参数只读取需要的列;2. 保存为 parquet 格式后后续直接读 parquet;3. 如果数据量真的非常大,考虑用 dask 库(pandas 的分布式版本);4. 用多进程并行读取(见第六部分策略三)。

# 加速技巧1:只读取需要的列 df = pd.read_excel(file_path, usecols=["客户名称", "销售金额", "日期"])  # 加速技巧2:跳过不需要的行 df = pd.read_excel(file_path, skipfooter=10)  # 跳过末尾10行  # 加速技巧3:指定数据类型(避免 pandas 自动推断,节省时间) df = pd.read_excel(file_path, dtype={     "客户名称": "category",  # category 类型比 object 更省内存     "销售金额": "float32",     "数量": "int32" })

Q3:合并后内存不够用怎么办?

A:使用分块处理策略(见第六部分),或者升级内存。另外,确保在合并后及时删除不需要的临时变量:

import gc  # 合并完成后 del df_list  # 删除临时列表,释放内存 gc.collect()  # 强制垃圾回收  # 更进一步:只保留需要的列 if "临时计算列" in df.columns:     df = df.drop(columns=["临时计算列"])

Q4:如何处理有密码保护的 Excel 文件?

A:pandas 不能直接读取有密码保护的 Excel 文件。需要先手动解除密码,或者用 msoffcrypto-tool 库在代码中解密:

# 需要先安装:pip install msoffcrypto-tool import msoffcrypto  with open("加密文件.xlsx", "rb") as f_in:     with open("解密文件.xlsx", "wb") as f_out:         office_file = msoffcrypto.OfficeFile(f_in)         office_file.load_key(password="你的密码")         office_file.decrypt(f_out)  # 然后再用 pandas 读取解密后的文件 df = pd.read_excel("解密文件.xlsx")

Q5:如何让代码在后台自动定时运行?

A:可以用 Windows 任务计划程序(Task Scheduler)或 Linux 的 crontab,配置每天自动运行合并脚本。(详见第八部分步骤4)

Q6:合并后有些行数据是重复的,怎么处理?

A:用 drop_duplicates(),可以指定按哪些列去重:

# 按所有列去重 df = df.drop_duplicates()  # 按指定列去重(只保留第一次出现的数据) df = df.drop_duplicates(subset=["客户名称", "日期"], keep="first")  # 按指定列去重(只保留最后一次出现的数据) df = df.drop_duplicates(subset=["客户名称", "日期"], keep="last")

Q7:如何追溯每一行数据来自哪个原始文件?

A:在读取每个文件后,添加来源列(我们的生产级代码已经做了这个):

df["_来源文件"] = file_name df["_来源路径"] = file_path  # 合并后,你可以这样追溯: # 找出所有来自"北京分公司"的数据 beijing_data = df[df["_来源文件"].str.contains("北京")]  # 统计每个文件贡献了多少行 file_counts = df["_来源文件"].value_counts()

Q8:Excel 文件里的公式怎么办?读取后是公式本身还是计算值?

A:默认情况下,pd.read_excel() 读取的是计算后的值,而不是公式本身。如果你需要读取公式,需要用 openpyxl 的 data_only=False 模式(但这样读取到的是公式字符串,不是计算结果)。

Q9:如何处理合并单元格?

A:合并单元格用 openpyxl 读取时,只有合并区域的第一个单元格有值,其余单元格为 NaN。可以用 ffill()(前向填充)来处理:

df = pd.read_excel(file_path)  # 对"客户名称"列做前向填充(合并单元格的情况) df["客户名称"] = df["客户名称"].fillna(method="ffill")  # 或者更安全的做法:只对连续的 NaN 填充(避免把不同客户的空值也填充了) df["客户名称"] = df.groupby(df["客户名称"].notna().cumsum())["客户名称"].ffill()

Q10:如何处理超大文件(百万行以上)?

A:考虑以下方案:

1. 转成 CSV + 用 chunksize 分块读取

2. 直接用 dask.dataframe(pandas 的分布式版本,API 几乎完全兼容)

3. 存到数据库(如 MySQL/PostgreSQL),然后用 SQL 查询

4. 用 openpyxl 的 read_only 模式(见第六部分策略一)

# 使用 dask 处理超大文件(安装:pip install dask) import dask.dataframe as dd  # dask 的 API 和 pandas 几乎一样,但它是惰性计算的(不会立即执行,节省内存) df = dd.read_csv("超大文件.csv")  # 注意:dask 不支持直接读 Excel,需要先转 CSV result = df.groupby("客户名称")["销售金额"].sum().compute()  # .compute() 才会真正执行 print(result)

第十一部分:总结与学习路线

这篇文章,我们从最基础的三行代码,一步步走到生产级完整脚本,覆盖了:

✅ glob 批量获取文件列表(支持递归子目录、通配符详解)

✅ pandas 读取 Excel 的完整参数解析(12个核心参数详解)

✅ 智能 Sheet 读取(单 Sheet 失败自动尝试多 Sheet)

✅ 列名标准化(别名映射表,维护一处全局生效)

✅ 数据类型清洗(金额去货币符号、处理"万/亿"单位、日期转 datetime)

✅ 空行处理、重复行处理、合并单元格处理

✅ 来源追溯(哪个文件、哪行数据,一查便知)

✅ 失败文件记录(不会静默失败,失败原因清晰可见)

✅ 性能优化(parquet 格式、多进程并行、分块读取)

✅ 完整日志系统(每步都有时间戳,出了问题好追溯)

✅ 真实企业案例完整落地流程(从需求到上线)

✅ 单元测试基础(用 pytest 确保你的代码不出错)

✅ 技术选型决策树(什么时候用 Python,什么时候不用)

下一步学习方向

如果你把这篇文章的内容都掌握了,你已经具备了处理90%的 Excel 批量合并场景的能力。接下来,我建议你按这个顺序继续学习:

第1步:数据清洗与转换(本系列第3章)

合并完数据,下一步就是清洗:处理空值、重复值、异常值,字符串清洗,日期格式统一。这是数据分析师的核心竞争力。

第2步:数据分析与统计(本系列第4章)

数据干净了,就可以做分析了:分组统计(groupby)、数据透视表(pivot_table)、相关性分析等。

第3步:报表自动化与可视化(本系列第5章)

分析完了,需要输出报表:用 openpyxl 生成带格式的 Excel 报表,用 matplotlib 生成图表。

第4步:把你的代码包装成一个小工具

用 tkinter 或 PyQt 做一个带界面的小工具,让不懂 Python 的同事也能用。这才是"数据分析师的终极技能"——让工具为你工作,而不是你为工具工作


🔔 关注公众号,回复「合并工具」领取:本文完整代码(.py文件)+ 测试数据生成脚本 + 常见错误处理速查表 + parquet格式性能对比报告

(点击公众号底部菜单 → 领取资料)

如果这篇文章对你有帮助,请点个「在看」,让更多人看到。有任何问题,欢迎在评论区留言,我会一一回复。

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-05-22 16:02:44 HTTP/1.1 GET : https://www.yeyulingfeng.com/a/648165.html
  2. 运行时间 : 0.187497s [ 吞吐率:5.33req/s ] 内存消耗:4,879.98kb 文件加载:145
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=33590e043a4801cdd17f5e963fd330b2
  1. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_static.php ( 6.05 KB )
  7. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/ralouphie/getallheaders/src/getallheaders.php ( 1.60 KB )
  10. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  11. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  12. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  13. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  14. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  15. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  16. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  17. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  18. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  19. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions_include.php ( 0.16 KB )
  21. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions.php ( 5.54 KB )
  22. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  23. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  24. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  25. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/provider.php ( 0.19 KB )
  26. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  27. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  28. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  29. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/common.php ( 0.03 KB )
  30. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  32. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/alipay.php ( 3.59 KB )
  33. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  34. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/app.php ( 0.95 KB )
  35. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cache.php ( 0.78 KB )
  36. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/console.php ( 0.23 KB )
  37. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cookie.php ( 0.56 KB )
  38. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/database.php ( 2.48 KB )
  39. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/filesystem.php ( 0.61 KB )
  40. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/lang.php ( 0.91 KB )
  41. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/log.php ( 1.35 KB )
  42. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/middleware.php ( 0.19 KB )
  43. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/route.php ( 1.89 KB )
  44. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/session.php ( 0.57 KB )
  45. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/trace.php ( 0.34 KB )
  46. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/view.php ( 0.82 KB )
  47. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/event.php ( 0.25 KB )
  48. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  49. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/service.php ( 0.13 KB )
  50. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/AppService.php ( 0.26 KB )
  51. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  52. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  53. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  54. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  55. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  56. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/services.php ( 0.14 KB )
  57. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  58. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  59. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  60. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  61. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  62. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  63. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  64. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  65. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  66. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  67. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  68. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  69. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  70. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  71. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  72. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  73. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  74. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  75. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  76. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  77. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  78. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  79. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  80. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  81. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  82. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  83. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  84. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  85. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  86. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  87. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/Request.php ( 0.09 KB )
  88. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  89. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/middleware.php ( 0.25 KB )
  90. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  91. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  92. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  93. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  94. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  95. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  96. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  97. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  98. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  99. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  100. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  101. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  102. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  103. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/route/app.php ( 3.94 KB )
  104. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  105. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  106. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Index.php ( 9.87 KB )
  108. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/BaseController.php ( 2.05 KB )
  109. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  110. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  111. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  112. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  113. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  114. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  115. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  116. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  117. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  118. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  119. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  120. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  121. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  122. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  123. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  124. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  125. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  126. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  127. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  128. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  129. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  130. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  131. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  132. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  133. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  134. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  135. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Es.php ( 3.30 KB )
  136. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  137. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  138. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  139. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  140. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  141. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  142. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  143. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  144. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/runtime/temp/c935550e3e8a3a4c27dd94e439343fdf.php ( 31.50 KB )
  145. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.001096s ] mysql:host=127.0.0.1;port=3306;dbname=wenku;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.001808s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000801s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000707s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.001389s ]
  6. SELECT * FROM `set` [ RunTime:0.000654s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.001581s ]
  8. SELECT * FROM `article` WHERE `id` = 648165 LIMIT 1 [ RunTime:0.001605s ]
  9. UPDATE `article` SET `lasttime` = 1779436965 WHERE `id` = 648165 [ RunTime:0.003064s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 64 LIMIT 1 [ RunTime:0.000359s ]
  11. SELECT * FROM `article` WHERE `id` < 648165 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.000488s ]
  12. SELECT * FROM `article` WHERE `id` > 648165 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000387s ]
  13. SELECT * FROM `article` WHERE `id` < 648165 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.000586s ]
  14. SELECT * FROM `article` WHERE `id` < 648165 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.002071s ]
  15. SELECT * FROM `article` WHERE `id` < 648165 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.001539s ]
0.189161s