具身智能数据集怎么做?OpenClaw 数据采集全流程
"数据是新时代的石油"这句话在具身智能领域要改一改——数据是新时代的铀矿:稀缺、难采、提炼成本高,但一旦搞定,能量巨大。今天我们聊具身智能领域最头疼的问题之一:数据从哪来,怎么采,怎么管。OpenClaw 提供了一套完整的数据工具链,我们从头拆解。
一、为什么数据是具身智能的瓶颈?
大语言模型靠互联网上几万亿 token 的文本数据起飞。视觉模型靠 LAION-5B 等数十亿张图片的数据集崛起。但具身智能呢?看几个数字就知道差距有多大:
差了 5-6 个数量级。这不是个小数字。根本原因有三个:
- 物理交互不可并行
:你可以开 1000 个爬虫同时抓网页,但不能同时操作 1000 台机器人(至少目前不能便宜地做到)。 - 硬件异构性
:不同机器人的形态、自由度、传感器配置都不同,数据难以通用。一个 7-DoF 机械臂的数据,灵巧手用不了。 - 标注成本极高
:不只是打个标签的事——你需要精确的关节轨迹、力反馈数据、接触点信息,这些都需要专业设备记录。
所以仿真数据生成成了当前最主要的扩充数据的方式,这也是 OpenClaw 重点支持的方向。
二、数据采集方式对比
在具身智能领域,获取数据主要有以下几种方式:
2.1 遥操作(Teleoperation)
操作者通过遥操作设备(手套、手柄、主从臂)实时控制机器人,记录轨迹。
- 优点
:数据质量最高,直接来自人类操作经验 - 缺点
:速度慢、成本高、操作者疲劳 - 典型设备
:GELLO、Dexterous Teleoperation Glove、SpaceMouse - 采集速度
:约 30-60 条轨迹/小时
2.2 VR 采集
用 VR 头显和手柄在虚拟环境中操作,记录手部动作映射到机器人。
- 优点
:无需真实机器人,操作直觉性好 - 缺点
:VR 到机器人的运动映射存在误差 - 典型设备
:Meta Quest 3、Apple Vision Pro(实验性) - 采集速度
:约 40-80 条轨迹/小时
2.3 视觉演示(Video Demonstration)
从人类操作视频中提取动作信息,映射到机器人动作空间。
- 优点
:数据来源广泛,YouTube 上有海量操作视频 - 缺点
:视觉到动作的映射精度有限,retargeting 是难题 - 代表工作
:DexMV、MimicGen、HOI4D
2.4 仿真自动生成
在仿真环境中用脚本化策略或 RL 策略自动生成大量数据。
- 优点
:速度极快、可无限扩充、标注精确、完全可控 - 缺点
:sim-to-real gap,仿真数据的多样性依赖场景设计 - 采集速度
:数千到数万条轨迹/小时(取决于并行度)
结论:实践中通常是 仿真自动生成(量)+ 少量遥操作(质) 的组合方案。
三、OpenClaw 数据格式规范
在动手采集之前,先理解数据怎么存。OpenClaw 采用 HDF5 格式,结构如下:
dataset.hdf5├── metadata/ # 元数据│ ├── robot_type: "leap_hand"│ ├── task: "grasp_cube"│ ├── sim_engine: "mujoco"│ ├── control_freq: 20│ ├── total_episodes: 500│ └── collection_date: "2026-03-26"├── episode_0/ # 第 0 条轨迹│ ├── observations/ # 观测数据│ │ ├── joint_pos: (T, 16) # 关节位置│ │ ├── joint_vel: (T, 16) # 关节速度│ │ ├── fingertip_pos: (T, 4, 3) # 指尖位置│ │ ├── object_pos: (T, 3) # 物体位置│ │ ├── object_quat: (T, 4) # 物体四元数│ │ └── images/ # 可选:图像观测│ │ ├── wrist_cam: (T, 224, 224, 3)│ │ └── third_person: (T, 224, 224, 3)│ ├── actions: (T, 16) # 动作序列│ ├── rewards: (T,) # 奖励序列│ ├── dones: (T,) # 终止标志│ └── info/ # 附加信息│ ├── is_success: bool│ ├── grasp_time: int│ └── lift_height: float├── episode_1/│ └── ...└── episode_499/ └── ...用 Python 读取数据的方式:
"""inspect_dataset.py - 检查数据集结构和统计信息"""import h5pyimport numpy as npdefinspect_dataset(data_path):with h5py.File(data_path, "r") as f:# 读取元数据 meta = dict(f["metadata"].attrs)print("=== 数据集元数据 ===")for k, v in meta.items():print(f" {k}: {v}")# 统计信息 episodes = [k for k in f.keys() if k.startswith("episode_")]print(f"\n总轨迹数: {len(episodes)}")# 分析第一条轨迹 ep0 = f["episode_0"]print(f"\n=== Episode 0 结构 ===")print(f" 轨迹长度: {ep0['actions'].shape[0]} 步")print(f" 动作维度: {ep0['actions'].shape[1]}") obs = ep0["observations"]for key in obs.keys():ifisinstance(obs[key], h5py.Dataset):print(f" obs/{key}: shape={obs[key].shape}, "f"dtype={obs[key].dtype}")# 统计成功率 successes = sum(1for ep in episodesif f[ep]["info"].attrs.get("is_success", False) )print(f"\n成功轨迹: {successes}/{len(episodes)} "f"({successes/len(episodes):.1%})")# 动作分布统计 all_actions = np.concatenate([ f[ep]["actions"][:] for ep in episodes[:50] # 采样 50 条 ])print(f"\n=== 动作统计(采样 50 条轨迹)===")print(f" 均值: {all_actions.mean(axis=0)[:4]}...")print(f" 标准差: {all_actions.std(axis=0)[:4]}...")print(f" 最大值: {all_actions.max():.3f}")print(f" 最小值: {all_actions.min():.3f}")if __name__ == "__main__": inspect_dataset("data/demos/grasp_cube/dataset.hdf5")四、遥操作采集实战
如果你有 LEAP Hand 硬件和遥操作设备,可以这样采集真实数据。
4.1 硬件连接
[操作者手套/手柄] --USB/蓝牙--> [PC] --串口--> [LEAP Hand] | [RealSense D435] --USB--> (物体追踪)4.2 软件配置
# configs/data/teleop_config.yamlteleop:input_device:type:"rokoko_glove"# 手套类型# type: "spacemouse" # 或 SpaceMousemapping:mode:"joint_retargeting"# 关节重定向映射smoothing:0.8# 动作平滑系数dead_zone:0.02# 死区阈值robot:type:"leap_hand"port:"/dev/ttyUSB0"control_freq:20perception:camera:type:"realsense_d435"resolution: [640, 480]fps:30object_tracker:type:"aruco"# ArUco 标记追踪marker_size:0.03recording:save_dir:"data/real_demos/"save_format:"hdf5"save_images:trueimage_resize: [224, 224]auto_segment:true# 自动分割轨迹4.3 采集脚本
"""teleop_collect.py - 遥操作数据采集"""from openclaw.data.collector import TeleopCollectorfrom openclaw.deploy.real_robot import RealRobotInterfacefrom openclaw.deploy.perception import ObjectTrackerfrom openclaw.data.teleop import TeleopInputdefcollect_teleop_data():# 初始化硬件 robot = RealRobotInterface( robot_type="leap_hand", serial_port="/dev/ttyUSB0", control_freq=20, ) input_device = TeleopInput( device_type="rokoko_glove", mapping="joint_retargeting", ) tracker = ObjectTracker(camera_type="realsense_d435")# 创建采集器 collector = TeleopCollector( robot=robot, input_device=input_device, object_tracker=tracker, save_dir="data/real_demos/grasp_cube/", save_format="hdf5", )print("=" * 50)print("遥操作数据采集")print("=" * 50)print("操作说明:")print(" - 戴上手套后自然抓取桌面上的物体")print(" - 按 [S] 开始录制当前轨迹")print(" - 按 [E] 结束当前轨迹")print(" - 按 [D] 丢弃当前轨迹(操作失误时)")print(" - 按 [Q] 退出采集")print("=" * 50)# 开始采集循环 collector.run_interactive( max_episode_length=200, auto_label_success=True, # 自动标注成功/失败 success_criteria={"lift_height": 0.10, }, )# 打印采集统计 stats = collector.get_stats()print(f"\n采集完成!")print(f" 总轨迹: {stats['total']}")print(f" 成功: {stats['success']}")print(f" 丢弃: {stats['discarded']}")print(f" 平均时长: {stats['avg_length']:.1f} 步")if __name__ == "__main__": collect_teleop_data()五、仿真自动采集:批量生产数据
仿真采集是扩充数据量的核心手段。OpenClaw 支持多进程并行采集,单机就能高效产出大规模数据集。
"""sim_batch_collect.py - 仿真环境批量数据采集"""from openclaw.core.env import make_vec_envfrom openclaw.data.collector import SimBatchCollectorfrom openclaw.tasks.grasp.expert import ScriptedGraspExpertdefbatch_collect():# 创建并行环境 envs = make_vec_env( task="grasp_cube", robot="leap_hand", sim_engine="mujoco", num_envs=32, # 32 个并行环境 seed=42, )# 创建专家策略(带随机化,增加多样性) expert = ScriptedGraspExpert( noise_std=0.05, # 给专家动作加噪声 grasp_style_randomize=True, # 随机抓取风格 )# 创建批量采集器 collector = SimBatchCollector( envs=envs, policy=expert, save_dir="data/sim_demos/grasp_cube_large/", save_format="hdf5", )# 采集配置 stats = collector.collect( num_episodes=10000, # 采集 1 万条轨迹 only_successful=True, # 只保留成功的 domain_randomization={ # 环境随机化"object_size": [0.03, 0.06],"object_mass": [0.05, 0.3],"friction": [0.5, 2.0],"init_pos_noise": 0.05, }, save_images=False, # 不保存图像(节省空间) chunk_size=1000, # 每 1000 条存一个文件 num_workers=8, # 8 个数据写入线程 verbose=True, )print(f"\n=== 批量采集完成 ===")print(f" 目标轨迹数: 10000")print(f" 实际采集: {stats['total_attempts']} 次")print(f" 成功保存: {stats['saved_episodes']} 条")print(f" 成功率: {stats['success_rate']:.1%}")print(f" 总耗时: {stats['elapsed_time']:.1f} 秒")print(f" 采集速度: {stats['episodes_per_second']:.1f} 条/秒")print(f" 数据大小: {stats['total_size_mb']:.1f} MB") envs.close()if __name__ == "__main__": batch_collect()# 运行批量采集(32 并行环境,约 20-30 分钟完成 1 万条)python sim_batch_collect.py性能参考(RTX 3090,32 并行环境):
六、数据清洗与质量控制
采集完不是终点,原始数据需要清洗。垃圾数据进去,垃圾策略出来——garbage in, garbage out。
"""clean_dataset.py - 数据清洗与质量控制"""from openclaw.data.dataset import DemoDatasetfrom openclaw.data.cleaning import DataCleanerdefclean_data():# 加载原始数据 dataset = DemoDataset("data/sim_demos/grasp_cube_large/")print(f"原始数据: {len(dataset)} 条轨迹")# 创建清洗器 cleaner = DataCleaner(dataset)# Step 1: 移除失败轨迹 cleaner.filter_by_success(keep_only_success=True)print(f"过滤失败轨迹后: {len(cleaner)} 条")# Step 2: 移除异常轨迹 cleaner.filter_outliers(# 轨迹长度异常(过短或过长) length_range=(20, 200),# 动作幅度异常 action_magnitude_max=2.0,# 奖励异常(可能是 bug 产生的) reward_range=(-100, 500), )print(f"过滤异常后: {len(cleaner)} 条")# Step 3: 动作平滑度检查 cleaner.filter_by_smoothness( max_jerk=5.0, # 最大加加速度阈值 max_discontinuity=0.5, # 最大不连续性 )print(f"平滑度过滤后: {len(cleaner)} 条")# Step 4: 去重(相似轨迹只保留一条) cleaner.deduplicate( similarity_threshold=0.95, # DTW 相似度阈值 sample_ratio=0.1, # 采样比例(全量计算太慢) )print(f"去重后: {len(cleaner)} 条")# Step 5: 计算质量分数 cleaner.compute_quality_scores( metrics=["smoothness", "efficiency", "success_margin"], weights=[0.3, 0.3, 0.4], )# 保存清洗后的数据集 cleaner.save( output_dir="data/sim_demos/grasp_cube_cleaned/",format="hdf5", include_quality_scores=True, )# 打印质量报告 report = cleaner.quality_report()print(f"\n=== 数据质量报告 ===")print(f" 清洗前: {report['original_count']} 条")print(f" 清洗后: {report['cleaned_count']} 条")print(f" 保留率: {report['retention_rate']:.1%}")print(f" 平均质量分: {report['avg_quality_score']:.3f}")print(f" 质量分分布: "f"Q1={report['quality_q1']:.3f}, "f"Q2={report['quality_q2']:.3f}, "f"Q3={report['quality_q3']:.3f}")if __name__ == "__main__": clean_data()七、数据集管理:存储、版本控制、共享
数据量大了,管理就成了问题。几个实用建议:
7.1 存储方案
# 目录结构建议data/├── raw/ # 原始数据(不可修改)│ ├── sim_grasp_v1/│ └── real_teleop_v1/├── cleaned/ # 清洗后的数据│ ├── sim_grasp_v1_clean/│ └── real_teleop_v1_clean/├── splits/ # 训练/验证/测试划分│ ├── train.json # 文件列表│ ├── val.json│ └── test.json└── registry.json # 数据集注册表7.2 版本控制
代码用 Git,数据用 DVC(Data Version Control)或者 HuggingFace Datasets:
# 方案 A:使用 DVCpip install dvc dvc-s3dvc initdvc add data/cleaned/sim_grasp_v1_clean/git add data/cleaned/sim_grasp_v1_clean.dvc .gitignoregit commit -m "add cleaned grasp dataset v1"dvc push # 推送到远程存储(S3/OSS)# 方案 B:使用 HuggingFace Hubpip install huggingface_hubpython -c "from huggingface_hub import HfApiapi = HfApi()api.upload_folder( folder_path='data/cleaned/sim_grasp_v1_clean/', repo_id='your-org/openclaw-grasp-dataset', repo_type='dataset',)"7.3 数据集注册表
"""register_dataset.py - 注册数据集到本地注册表"""import jsonfrom datetime import datetimeregistry_path = "data/registry.json"new_entry = {"name": "sim_grasp_cube_v1","version": "1.0.0","path": "data/cleaned/sim_grasp_v1_clean/","task": "grasp_cube","robot": "leap_hand","source": "simulation","num_episodes": 8500,"avg_episode_length": 142,"success_rate": 1.0, # 清洗后全部成功"avg_quality_score": 0.82,"created_at": datetime.now().isoformat(),"description": "仿真采集的灵巧手抓取方块数据集,经过清洗和去重","tags": ["grasp", "cube", "leap_hand", "sim"],}# 读取现有注册表try:withopen(registry_path, "r") as f: registry = json.load(f)except FileNotFoundError: registry = {"datasets": []}# 添加新条目registry["datasets"].append(new_entry)withopen(registry_path, "w") as f: json.dump(registry, f, indent=2, ensure_ascii=False)print(f"数据集 '{new_entry['name']}' 已注册")八、开源数据集资源推荐
站在巨人的肩膀上。以下是具身智能领域值得关注的开源数据集:
| DROID | ||||
| Bridge V2 | ||||
| RH20T | ||||
| Open X-Embodiment | ||||
| DexArt | ||||
| DexGraspNet | ||||
| OAKINK2 |
使用建议:
- 起步阶段
:用 OpenClaw 的仿真数据生成工具快速产出数据,验证算法可行性。 - 提升阶段
:引入 Open X-Embodiment 等大规模多任务数据,做预训练。 - 部署阶段
:在目标任务上采集少量真实遥操作数据做微调,弥补 sim-to-real gap。
总结
这一篇我们系统梳理了具身智能数据集的全生命周期:
- 现状
:数据是瓶颈,比 NLP/CV 少 5-6 个数量级 - 采集方式
:遥操作求质,仿真求量,两者结合是主流方案 - 数据格式
:HDF5 存储,观测-动作-奖励三元组,元数据完备 - 仿真采集
:32 并行环境 + 域随机化,30 分钟可产出 1 万条轨迹 - 质量控制
:过滤、去噪、去重、质量评分,一个都不能少 - 管理
:DVC 做版本控制,注册表做索引,HuggingFace Hub 做共享
核心观点:具身智能的数据问题不会被某个天才算法一次性解决,它需要的是系统工程——采集工具链、质量控制流程、数据管理基础设施。 OpenClaw 正在朝这个方向构建完整的工具链,而整个社区的协作(如 Open X-Embodiment)正在逐步改变数据稀缺的现状。数据够了,算法才有底气。下一个阶段,我们会看到具身智能像 LLM 一样进入 scaling law 的快车道。
AI开源纪 — 解码前沿技术,连接开源世界。 关注我们,一起见证AI开源时代。
夜雨聆风