在大模型时代,我们都在追逐更好的模型效果,但往往忽略了一个关键问题:好的数据比好的模型更重要。今天要拆解的 DataFlow,就是一个专门解决这个问题的项目——它把数据准备这件事从"手工活"变成了"工业化流水线"。
GitHub 项目地址:https://github.com/OpenDCAI/DataFlow
一、为什么 DataFlow 值得你关注?
做过模型训练的人都知道,数据准备是最耗时、最头疼的环节。你可能要花几周时间写各种脚本,从 PDF 提取文本、清洗数据、生成问答对、评估质量……但这些脚本往往不可复用,换个场景又要重写。
DataFlow 就是来解决这个痛点的。它用"算子+流水线"的设计,把数据准备的每个环节都模块化,让你能像搭积木一样组合出完整的数据处理流程。更重要的是,这个项目来自北大、港科大等顶尖机构,已经在 ICDE、KDD 等顶级会议发表论文,还拿了 ICML 2025 和 LIC 2025 的竞赛冠军——学术价值和工程实用性兼备。
二、项目定位:数据中心 AI 的基础设施
技术分类
DataFlow 属于 数据中心 AI(Data-Centric AI) 领域的基础设施。它不是 LangChain 那种应用开发框架,也不是 LlamaIndex 那种 RAG 工具,而是专门服务于数据准备阶段的工作流编排系统。

在 AI 生态中的位置
如果把 AI 开发流程比作一条生产线:
上游:数据采集(爬虫、API) 中游:DataFlow 在这里——数据清洗、生成、评估 下游:模型训练(预训练、SFT、RL)
它填补了"原始数据"到"训练数据"之间的空白。
面向用户
研究人员:需要可复现的数据处理流程来做实验对比 算法工程师:需要高效准备训练数据来提升模型效果 中小团队:没有资源自建完整数据处理平台,需要开箱即用的方案
对标产品
目前市场上类似的系统有 Nemo-Curator、Data-Juicer。但 DataFlow 的差异化在于:
更强的数据合成能力:不只是过滤,还能生成新数据 更灵活的编程模型:类 PyTorch 的 Pipeline → Operator → Prompt 三层结构 更丰富的预置流水线:文本、数学、代码、Text2SQL 等领域都有现成方案
三、系统架构:像工厂流水线一样处理数据
DataFlow 的架构设计非常清晰,可以用"四层流水线"来理解:
第一层:输入层
接收各种格式的原始数据:JSON、JSONL、CSV、PDF、Word 统一转换为表格化的存储结构 每个数据项都有明确的字段(Key)
第二层:调度层
Pipeline(流水线):负责整体流程编排 编译时检查数据字段的完整性,确保上游输出能匹配下游输入 支持断点续传、批量处理、分布式执行
第三层:核心层(算子层)
Operator(算子):每个算子封装一个具体的数据处理任务 按功能分类:生成、评估、过滤、精炼 按领域分类:文本、数学、代码、Text2SQL、Agentic RAG 等
第四层:输出层
输出高质量的训练数据 可以直接对接 LLaMA-Factory 等训练框架 支持数据版本管理和追溯
如果你要画架构图,可以这样画:
┌─────────────────────────────────────────────────────────┐
│ 用户输入 │
│ (PDF / 文本 / 低质量 QA / 表格数据) │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Storage 存储层 │
│ (统一数据格式,按 Key 管理字段) │
└────────────────────┬────────────────────────────────────┘
│
┌───────────┼───────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ 算子1 │ │ 算子2 │ │ 算子3 │
│Generator│ │Evaluator│ │ Filter │
└────┬───┘ └────┬───┘ └────┬───┘
│ │ │
└─────────────┼─────────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ 算子4 │ │ 算子5 │ │ 算子6 │
│ Refiner│ │ SQL │ │ Code │
└────┬───┘ └────┬───┘ └────┬───┘
│ │ │
└─────────────┼─────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Pipeline 编排层 │
│ (编译检查、执行调度、资源管理、断点续传) │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 输出结果 │
│ (高质量训练数据 / RAG 知识库 / 评估报告) │
└─────────────────────────────────────────────────────────┘
四、核心模块拆解:从设计逻辑看工程实现
1. Operator(算子):最小处理单元
这个模块解决什么问题:把复杂的数据处理拆分成独立、可复用的单元。
核心思路:每个算子只做一件事,通过 input_key 和 output_key 与外界交互。
设计逻辑:
继承自 OperatorABC抽象基类必须实现 run()方法通过 Storage 对象读写数据 可以选择性地使用 LLM Serving 来调用大模型
数据流动:
Storage (input_key) → Operator.run() → Storage (output_key)
算子不直接持有数据,而是通过 Storage 的 Key 来读取和写入,这种设计让算子之间完全解耦。
2. Pipeline(流水线):流程编排者
这个模块解决什么问题:把多个算子组合成完整的工作流,确保数据能按正确顺序流动。
核心思路:先编译、后执行,编译时做静态检查,避免运行时才发现错误。
代码结构(Pipeline.py):
forward():用户定义算子的执行顺序compile():编译流水线,构建算子图,检查 Key 的完整性_compiled_forward():实际执行draw_graph():可视化流水线
关键设计:
Key 完整性检查:编译时会验证每个算子的 input_key 是否在前面的输出中存在,避免运行时 KeyError OperatorNode 图:把算子和数据字段都建模成节点,用指针连接,形成完整的数据流图 资源管理:自动管理 LLM Serving 的生命周期,用完即释放,节省资源
3. Storage(存储):数据的载体
这个模块解决什么问题:统一不同格式的数据读写,让算子不用关心数据存在哪里。
核心思路:提供统一的 Key-Value 访问接口,底层可以是文件、内存或数据库。
数据流动:
原始文件 → read() → 内存 DataFrame → step() → 下一步算子 → write() → 输出文件
每个步骤会生成新的文件,保留中间结果,方便调试和断点续传。
4. LLM Serving(模型服务):大模型的抽象
这个模块解决什么问题:把不同的大模型 API(OpenAI、vLLM、本地模型)封装成统一接口。
核心思路:面向接口编程,算子只依赖 LLMServingABC 抽象,不依赖具体实现。
设计优势:
可以随时切换底层模型,不用修改算子代码 支持本地部署(vLLM)和云端 API 自动管理模型加载和释放
五、关键技术点:为什么这样设计?
1. 算子基设计 vs 配置文件设计
为什么用算子基设计?
替代方案是用配置文件(YAML/JSON)定义流程,但 DataFlow 选择了代码即配置的方式。
当前方案的优势:
灵活性:可以在 forward()里写任意 Python 逻辑,支持条件分支、循环调试方便:可以用标准的 Python 调试工具 类型安全:IDE 可以提供代码补全和类型检查
劣势:
学习曲线稍陡 可视化需要额外的代码解析
Trade-off:选择了灵活性和工程性,牺牲了一点易用性。但通过 WebUI 的可视化拖拽,又弥补了这一点。
2. 编译时 Key 检查 vs 运行时检查
为什么在编译时做 Key 检查?
如果等到运行时才发现 Key 不匹配,可能已经浪费了几小时的计算时间。
当前方案的优势:
提前发现错误:在真正执行前就知道流程有没问题 节省成本:避免跑了一半才报错 更好的用户体验:错误信息很清晰,告诉你哪个算子缺哪个 Key
实现方式(在 _build_operator_nodes_graph() 中):
维护一个 accumulated_keys列表,记录每步之后有哪些字段对每个算子,检查它的 input_key 是否在 accumulated_keys中如果不在,抛出清晰的错误信息
3. Storage 按步骤保存 vs 单文件更新
为什么每个步骤都保存新文件?
替代方案是直接在原文件上修改,但 DataFlow 选择了每步生成新文件。
当前方案的优势:
可追溯:可以查看每个步骤的中间结果 可调试:出错了可以从中间步骤重新开始,不用从头跑 安全:不会破坏原始数据
劣势:
占用更多磁盘空间 小数据量时有点冗余
Trade-off:对于数据准备这种"一跑就是几小时"的场景,可追溯性和可调试性比节省磁盘空间更重要。
4. 模块化 LLM Serving vs 硬编码 API 调用
为什么抽象 LLM Serving?
如果在每个算子里直接调用 OpenAI API,以后想换模型就很麻烦。
当前方案的优势:
可切换:可以随时换用本地模型或其他 API 可测试:可以 Mock LLM Serving 来做单元测试 资源共享:多个算子可以复用同一个模型实例
六、核心代码讲解
1. OperatorABC:算子的根基
解决什么问题:定义所有算子的统一接口。
核心思路:用抽象基类强制规范,确保每个算子都有 run() 方法。
代码结构(简化版):
from abc import ABC, abstractmethod
classOperatorABC(ABC):
def__init__(self):
self.logger = get_logger()
self.ALLOWED_PROMPTS = tuple([type[DIYPromptABC | PromptABC]])
@abstractmethod
defrun(self) -> None:
"""核心逻辑,子类必须实现"""
pass
设计要点:
用 @abstractmethod强制子类实现run()提供统一的 logger ALLOWED_PROMPTS用于类型安全的提示词管理
2. PipelineABC:流水线的核心
解决什么问题:编排多个算子,管理执行流程。
核心思路:把用户定义的 forward() 编译成可执行的图。
代码结构(简化版):
classPipelineABC(ABC):
def__init__(self):
self.op_runtimes = [] # 存储算子运行时
self.compiled = False
self.accumulated_keys = [] # 记录每步的字段
@abstractmethod
defforward(self):
"""用户在这里定义算子顺序"""
pass
defcompile(self):
"""编译流水线:
1. 把算子包装成 AutoOP
2. 调用 forward() 收集算子
3. 构建算子图
4. 检查 Key 完整性
"""
self.compiled = True
# ... 省略具体实现 ...
self._build_operator_nodes_graph()
def_compiled_forward(self, resume_step: int = 0):
"""实际执行:按顺序跑每个算子"""
for idx, op_node in enumerate(self.op_nodes_list):
if idx - 1 < resume_step:
continue
op_node.op_obj.run(
storage=op_node.storage,
**op_node.kwargs
)
使用示例:
classMyPipeline(PipelineABC):
defforward(self):
# 定义算子
self.generator = PromptedGenerator(llm_serving=llm, system_prompt="...")
self.filter = QualityFilter(llm_serving=llm)
# 定义执行顺序
self.generator.run(storage=storage.step(), input_key="question", output_key="answer")
self.filter.run(storage=storage.step(), input_key="answer", output_key="filtered_answer")
# 使用
pipeline = MyPipeline()
pipeline.compile() # 编译并检查
pipeline.forward() # 执行
3. Storage:数据管理
解决什么问题:统一数据读写接口。
核心思路:用 DataFrame 存储数据,按 Key 访问字段。
核心概念:
step():创建下一步的 Storage,会生成新文件input_key:从哪个字段读取output_key:写入哪个字段
七、完整执行流程:像讲故事一样理解
让我们用一个实际场景——"把 PDF 转成高质量问答对"——来走一遍完整流程:
第一步:用户输入
用户有一份 100 页的医疗领域 PDF,想把它转成能用来微调模型的问答对数据。
第二步:初始化 Storage
storage = FileStorage(first_entry_file_name="./pdf_extracted.json")
DataFlow 先用 MinerU 把 PDF 提取成结构化的文本,存储在 JSON 文件里。
第三步:定义 Pipeline
用户写一个 Pipeline 类:
classPDF2QAPipeline(PipelineABC):
defforward(self):
# 算子1:从文本生成候选问答对
self.qa_generator = QAGenerator(llm_serving=gpt4)
# 算子2:评估问答对的质量
self.evaluator = QualityEvaluator(llm_serving=gpt4)
# 算子3:过滤掉低质量的
self.filter = ThresholdFilter(threshold=0.8)
# 算子4:精炼高质量的问答对
self.refiner = AnswerRefiner(llm_serving=gpt4)
# 执行顺序
self.qa_generator.run(storage=storage.step(), input_key="text", output_key="qa_pairs")
self.evaluator.run(storage=storage.step(), input_key="qa_pairs", output_key="quality_score")
self.filter.run(storage=storage.step(), input_key="quality_score", output_key="filtered_qa")
self.refiner.run(storage=storage.step(), input_key="filtered_qa", output_key="final_qa")
第四步:编译 Pipeline
pipeline = PDF2QAPipeline()
pipeline.compile()
这一步发生了什么?
把所有算子包装成 AutoOP 调用 forward(),收集每个算子的调用信息构建算子图,连接数据字段 检查 Key: qa_generator需要text→ 有,没问题evaluator需要qa_pairs→ 上一个算子输出,没问题以此类推... 所有检查通过,可以执行了!
第五步:执行 Pipeline
pipeline.forward()
执行过程详解:
第 1 步:QA Generator 运行
从 Storage 读取 text字段调用 GPT-4,对每段文本生成 3-5 个问答对 把结果写入 qa_pairs字段保存到 dataflow_step1.json
第 2 步:Quality Evaluator 运行
读取 qa_pairs字段用 GPT-4 从准确性、有用性、清晰度三个维度评分 把分数写入 quality_score字段保存到 dataflow_step2.json
第 3 步:Threshold Filter 运行
读取 quality_score字段过滤掉分数低于 0.8 的问答对 把剩下的写入 filtered_qa字段保存到 dataflow_step3.json
第 4 步:Answer Refiner 运行
读取 filtered_qa字段用 GPT-4 优化答案的表述,让它更符合指令微调的格式 把最终结果写入 final_qa字段保存到 dataflow_step4.json
第六步:输出结果
最后得到的 final_qa 就是高质量的训练数据,可以直接喂给 LLaMA-Factory 去微调模型了!
如果中间出错了怎么办?
比如第 3 步跑了一半报错了 没关系,用 resume_step=2从第 3 步重新开始不用从头跑一遍,节省时间
八、业务应用分析:DataFlow 能用来做什么?
1. 应用场景
场景 1:垂直领域模型微调
需求:一家医疗公司想在通用模型基础上,用自己的医疗文献微调出专业模型。DataFlow 解决方案:
用 PDF 提取算子把文献转成文本 用文本处理流水线生成问答对 用质量评估和过滤确保数据质量 输出训练数据直接对接 LLaMA-Factory
场景 2:RAG 知识库构建
需求:一家企业想把内部文档(Word、PDF、表格)做成 RAG 系统的知识库。DataFlow 解决方案:
用知识库清洗流水线处理各种格式的文档 提取结构化知识 生成候选问答对用于评估检索效果 输出高质量的知识库切片
场景 3:Text2SQL 数据增强
需求:一个 BI 工具想提升 Text2SQL 的准确率,但标注 SQL 数据成本很高。DataFlow 解决方案:
用 Text2SQL 流水线从数据库 schema 生成大量自然语言问题 → SQL 对 用 SQL 感知的评估器检查生成的 SQL 是否正确 用精炼算子优化问题的表述多样性 低成本获得大量高质量训练数据
场景 4:数学推理数据生成
需求:想提升模型的数学推理能力,但高质量数学题数据很少。DataFlow 解决方案:
用推理流水线从基础题目生成变体 自动添加 Chain-of-Thought 推理过程 评估难度并分类 输出的训练数据能显著提升数学成绩(实验显示平均提升 5-10 个百分点)
场景 5:代码训练数据准备
需求:想训练一个代码模型,但公开代码数据集质量参差不齐。DataFlow 解决方案:
用代码流水线过滤低质量代码 生成代码解释和测试用例 评估代码的可执行性和正确性 精炼代码表述,让它更符合教学风格
2. 业务价值
提升什么:
模型效果:用 DataFlow 处理过的数据训练,模型在数学、代码、知识等任务上都有明显提升(见 README 中的实验结果) 数据质量:标准化的流程减少了人工错误 开发效率:不用重复写数据处理脚本
替代什么:
替代手写的各种 Python 脚本 替代 ad-hoc 的数据处理流程 替代昂贵的人工标注(部分替代)
是否真的有价值:
对研究人员:价值很大——可复现的流程,方便对比不同方法 对大公司:有价值——标准化数据准备流程,不同团队可以共享流水线 对小团队:非常有价值——不用自己搭平台,直接用现成的
3. 落地难度
是否容易接入:
安装简单: pip install open-dataflow就行有预置流水线:文本、数学、代码等都有现成的 有 WebUI:不会写代码也能用拖拽方式构建流水线 总体评价:比较容易,有 Python 基础就能上手
是否适合中小团队:
非常适合——中小团队通常没有资源自建完整的数据处理平台 DataFlow 提供了一站式解决方案,从数据处理到训练对接都有 可以用 API 调用大模型,不用自己买 GPU(当然也支持本地部署)
九、优势 & 不足:客观评价
优势
设计清晰:Pipeline → Operator → Prompt 三层结构,概念清晰,容易理解 灵活性强:代码即配置,想怎么组合就怎么组合 预置丰富:100+ 算子,多个领域的预置流水线,拿来就能用 工程质量高:编译检查、断点续传、资源管理,该有的都有 学术背书强:顶会论文 + 竞赛冠军,证明了方案的有效性 可视化友好:WebUI 拖拽 + 流程图可视化,降低使用门槛 生态完整:有 DataFlow-Agent、RayOrch、DataFlow-MM 等配套组件
不足(很重要)
文档还不够完善:虽然有基本文档,但一些高级功能的说明不够详细 调试工具可以更丰富:虽然有中间文件,但缺少更直观的调试界面 算子开发门槛:写一个新算子需要理解不少概念,对新手不太友好 版本兼容性:依赖项比较多,版本冲突有时会出现 WebUI 功能还在完善:WebUI 刚发布不久,一些高级功能还不支持 业务限制:主要面向结构化数据处理,对于复杂的非结构化处理(如视频)支持还比较有限
架构问题:
Pipeline 的编译模式虽然安全,但对于动态流程(如根据数据决定下一步)支持不够灵活 Storage 按步骤保存对小数据量有点冗余,虽然可以接受
十、对比主流方案:差异在哪里?
DataFlow vs LangChain
| 定位 | ||
| 核心场景 | ||
| 输出 | ||
| 设计理念 | ||
| 差异总结 |
什么时候用 DataFlow:你需要准备训练数据时。什么时候用 LangChain:你要做一个 AI 应用时。
DataFlow vs LlamaIndex
| 定位 | ||
| 核心能力 | ||
| 数据流向 | ||
| 差异总结 |
什么时候用 DataFlow:你不仅需要 RAG,还需要生成训练数据时。什么时候用 LlamaIndex:你主要做 RAG 应用时。
DataFlow vs OpenAI(API)
| 定位 | ||
| 关系 | ||
| 差异总结 |
DataFlow 的价值:即使你只用 OpenAI,DataFlow 也能帮你把复杂的数据处理流程标准化、可复现。
十一、总结:这个项目值不值得关注?
我的答案是:非常值得关注——如果你属于以下人群。
适合谁?
正在做模型微调的团队
你会发现 DataFlow 简直是为你量身定做的 从数据准备到训练对接,一站式解决 AI 研究人员
需要可复现的数据处理流程 需要公平对比不同方法 DataFlow 的学术背景和你很配 想学习数据中心 AI 的开发者
这个项目的代码质量很高,设计很清晰 值得读源码学习 需要处理大量数据的中小团队
不用自己搭平台,直接用 DataFlow 节省几个月的开发时间
不适合谁?
只是想做个简单的 Chatbot
用 LangChain 更合适 DataFlow 对你来说太重了 完全没有 Python 基础
虽然有 WebUI,但要发挥全部威力还是需要写代码 可以先用 WebUI 试试看
最后说两句
在大模型时代,我们花了太多精力在模型架构上,而对数据的关注还远远不够。但实际上,好的数据往往比好的模型更重要。
DataFlow 代表了一个方向:把数据准备这件事工程化、标准化、可复用化。它来自顶尖研究团队,有扎实的学术成果,又有很好的工程实现——这样的项目在开源界并不多见。
如果你还在手写各种数据处理脚本,如果你还在为数据质量头疼,如果你想让你的数据准备流程可复现、可分享——去试试 DataFlow 吧。
GitHub 项目地址:https://github.com/OpenDCAI/DataFlow
夜雨聆风