乐于分享
好东西不私藏

Milvus VTS 源码级拆解: 2 种写入模式与分片策略,带你实战向量数据迁移的最佳实践

Milvus VTS 源码级拆解: 2 种写入模式与分片策略,带你实战向量数据迁移的最佳实践

🚩 2026 年「术哥无界」系列实战文档 X 篇原创计划 第 98 篇,Milvus 最佳实战「2026」系列第 7 篇

大家好,欢迎来到 术哥无界 | ShugeX | 运维有术

我是术哥,一名专注于 AI 编程、AI 智能体、Agent Skills、MCP、云原生、AIOps、Milvus 向量数据库的技术实践者与开源布道者

Talk is cheap, let’s explore。无界探索,有术而行。

Milvus VTS 信息图封面

做过向量数据库迁移的人都知道这事有多头疼。

不同向量数据库之间没有标准化的数据格式,Schema 互不兼容,传统 ETL 工具(比如 Airbyte、SeaTunnel 主线版本)压根不支持向量类型。写过迁移脚本的同学应该深有体会:光是处理 FloatVector 和 SparseFloatVector 的类型映射,就能耗掉一整天。

Zilliz 官方推出的 VTS(Vector Transport Service)试图从根本上解决这个问题。它基于 Apache SeaTunnel 框架,已经支持 Milvus、Pinecone、Qdrant、Elasticsearch、pgvector 等 8 种以上数据源之间的互相迁移。根据官方公布的基准测试数据,Pinecone → Milvus 迁移 1 亿向量,4 核 8GB 的机器上跑出了 2,961 vectors/s 的吞吐量。

翻了一圈 VTS 的源码(GitHub: zilliztech/vts),发现它的架构设计有不少值得细看的地方。今天这篇文章,就从源码层面拆解 VTS 的核心机制:分片策略怎么设计的、两种写入模式分别适合什么场景、错误处理有多完善、性能调优该怎么配参数。

说明:本文内容基于 VTS 源码(zilliztech/vts)和 Apache SeaTunnel 官方文档分析整理而成,源码分析基于笔者本地仓库版本,尚未在生产环境中完成全场景验证。文中的配置模板和参数建议仅供参考,实际效果请以你的业务数据和环境测试结果为准。如果有实际使用经验,欢迎在评论区分享交流。

1. 整体架构:Source → Transform → Sink

VTS 架构流程图

VTS 继承了 Apache SeaTunnel 的核心架构:Source(数据读取) → Transform(数据转换) → Sink(数据写入)。三段式设计的好处是每一段都可以独立扩展,新增一个数据源只需要实现对应的 Connector。

在 VTS 的 Java 代码里,Milvus Connector 的核心类结构如下:

Source 端

  • MilvusSource → MilvusSourceSplitEnumerator → MilvusSourceReader → MilvusBufferReader

Sink 端

  • MilvusSink → MilvusSinkWriter → MilvusBufferBatchWriter(直接写入)或 MilvusBulkWriter(Bulk Import)

这个类层级不是随便设计的。SplitEnumerator 负责把数据切成多个分片,Reader 负责从每个分片里读数据,Writer 负责把数据写到目标端。SplitEnumerator 和 Reader 之间通过分片分配机制解耦,Reader 可以在多个节点上并行执行。

VTS 还继承了 SeaTunnel Zeta 引擎的几个关键能力:

能力
说明
分布式快照
基于检查点(Checkpoint)的数据一致性保障
流批统一
同一套框架同时支持实时同步和离线批量导入
动态线程共享
多表同步时共享线程,减少资源浪费
WAL 预写日志
集群重启后可恢复作业状态

这些能力不是 VTS 自己实现的,而是 SeaTunnel 框架层提供的。VTS 在此基础上,增加了向量数据库专属的 Schema 匹配、类型映射和 Embedding Transform。

2. Source 端:分区感知的分片策略

分片策略决策流程图

VTS 的 Source 端最值得分析的是它的分片策略。分片决定了数据读取的并行度,直接影响迁移速度。

分片枚举器的工作流程

MilvusSourceSplitEnumerator 是分片策略的核心。它的 run() 方法逻辑很清晰:

  1. 遍历所有待迁移的 collection
  2. 对每个 collection 调用 generateSplits() 生成分片
  3. 把分片分配给注册的 Reader

关键在 generateSplits() 方法里。源码里,它先做了一件事:检查 collection 是否有 Partition Key

// MilvusSourceSplitEnumerator.java 第 116-118 行boolean hasPartitionKey =    describeCollectionResp.getCollectionSchema().getFieldSchemaList().stream()        .anyMatch(CreateCollectionReq.FieldSchema::getIsPartitionKey);

两种分片路径

根据 hasPartitionKey 的结果,分片走两条不同的路径:

路径 A:有 Partition Key

如果 collection 设置了 partition key,Milvus 内部会自动管理分区,VTS 直接按 collection 整体作为一个分片,然后通过 splitByOffset() 按 offset 拆分。

路径 B:没有 Partition Key,但有多个分区

如果 collection 没有设置 partition key,但手动创建了多个分区(partition),VTS 会先按分区分片,再对每个分区按 offset 拆分。这是两层并行。

// MilvusSourceSplitEnumerator.java 第 126-137 行if (!hasPartitionKey && partitionList.size() > 1) {for (String partitionName : partitionList) {        MilvusSourceSplit split = MilvusSourceSplit.builder()            .tablePath(table.getTablePath())            .splitId(String.format("%s-%s", table.getTablePath(), partitionName))            .partitionName(partitionName)            .build();        milvusSourceSplits.addAll(splitByOffset(split, parallelism));    }}

splitByOffset:按行数均分

splitByOffset() 是分片的精细控制逻辑。它的实现方式很直接:

  1. 对 collection(或分区)执行 count(*) 查询获取总行数
  2. 按 parallelism 参数均分
  3. 最后一个分片不设 limit,兜底剩余数据
// MilvusSourceSplitEnumerator.java 第 151-205 行(简化)long numOfEntities = // count(*) 查询结果long splitSize = Math.max(1, numOfEntities / parallelism);for (int i = 0; i < parallelism; i++) {long offset = i * splitSize;if (i == parallelism - 1) {// 最后一个分片不设 limit        newSplit = ...offset(offset).build();    } else {        newSplit = ...offset(offset).limit(splitSize).build();    }}

这里有个细节值得注意:如果 parallelism < 2splitByOffset() 直接返回单个分片,不做拆分。也就是说,并行度设为 1 时退化为单线程读取。

实际调优建议parallelism 不是越大越好。每个分片对应一个 Reader 线程,并行度太大会给 Milvus 服务端造成并发压力,反而容易触发 Rate Limit。从源码来看,一个合理的范围是 2 到 CPU 核心数之间。

3. Sink 端:双模式写入机制

双模式写入对比图

VTS 的 Sink 端有两种写入模式,这是理解它性能特征的关键。两种模式分别对应 MilvusBufferBatchWriter 和 MilvusBulkWriter 两个类。

BufferBatchWriter:Insert API 直写

这是默认模式。工作方式是:数据缓存到 milvusDataCache(一个 List<JsonObject>),当缓存大小达到 batch_size 时,调用 Milvus Insert API 批量写入。

// MilvusBufferBatchWriter.java 第 102-119 行publicvoidwrite(SeaTunnelRow element){    JsonObject data = milvusSinkConverter.buildMilvusData(        catalogTable, descriptionCollectionResp, milvusFieldMapper, element);    milvusDataCache.add(data);    writeCache.incrementAndGet();    writeCount.incrementAndGet();if (needCommit()) {        commit(true);    }}

needCommit() 的判断逻辑很简单:writeCache >= batchSize

优点:实现简单,延迟低,适合中小规模数据迁移。

缺点:数据量大时会给 Milvus 服务端造成写入压力;单条消息过大会报 received message larger than max 错误。

不过 VTS 在这里做了一个有意思的处理——遇到 Rate Limit 或消息过大的错误时,自动把 batch_size 减半重试

// MilvusBufferBatchWriter.java 第 172-199 行(简化)if (e.getMessage().contains("rate limit exceeded")        || e.getMessage().contains("received message larger than max")) {if (data.size() > 2) {this.batchSize = this.batchSize / 2;        Thread.sleep(60000);  // 等待 1 分钟// 对半拆分后递归重试        insertWrite(partitionName, firstHalf);        insertWrite(partitionName, secondHalf);    }}

递归减半直到 data.size() <= 2,如果还失败就抛异常。这是一个实用但稍显粗暴的自适应策略。

BulkWriter:Parquet + Import API

这是面向大规模数据迁移的模式。工作方式完全不同:

  1. 数据写入 Parquet 文件
  2. Parquet 文件上传到对象存储(S3 或 Azure Blob)
  3. 调用 Milvus Import API 从对象存储导入
// MilvusBulkWriter.java 核心逻辑RemoteBulkWriterParam param = RemoteBulkWriterParam.newBuilder()    .withCollectionSchema(describeCollectionResp.getCollectionSchema())    .withConnectParam(storageConnectParam)  // S3 或 Azure    .withChunkSize(stageBucket.getChunkSize() * 1024 * 1024)    .withRemotePath(stageBucket.getPrefix() + "/" + collectionName + "/" + partitionName)    .withFileType(BulkFileType.PARQUET)    .build();remoteBulkWriter = new RemoteBulkWriter(param);

BulkWriter 支持两种云存储:S3 兼容存储(通过 S3ConnectParam)和 Azure Blob(通过 AzureConnectParam)。选择逻辑基于 stageBucket.getCloudId() 判断是 az/azure 还是其他。

如果 stageBucket.getAutoImport() 为 true,写入完成后会自动调用 Milvus Import API 导入数据,并且会等待导入任务完成:

// MilvusBulkWriter.java 第 133-151 行publicvoidclose()throws Exception {    remoteBulkWriter.close();if (stageBucket.getAutoImport()) {        String objectFolder = ...;        milvusImport.importFolder(objectFolder);    }}publicvoidwaitJobFinish(){if (stageBucket.getAutoImport()) {        milvusImport.waitImportFinish();    }}

BulkWriter 适用场景:亿级向量的大规模迁移,尤其是 Milvus 集群间迁移。Import API 的吞吐远高于 Insert API。

两种模式怎么选?

维度
BufferBatchWriter
BulkWriter
写入方式
Milvus Insert API
Parquet → 对象存储 → Import API
适用规模
万级到百万级
百万级到亿级
额外依赖
S3 或 Azure 对象存储
配置复杂度
延迟
低(直接写入)
较高(需经过对象存储中转)
服务端压力
小(Import API 更友好)

一句话:数据量小(百万以下)用 BufferBatchWriter,数据量大(百万以上)用 BulkWriter。

4. Schema 转换与类型映射

跨向量数据库迁移,Schema 映射是最容易出问题的一环。VTS 通过 MilvusSchemaConverter 处理从 SeaTunnel 内部类型到 Milvus 类型的转换。

完整的类型映射表

从 MilvusSchemaConverter.convertSqlTypeToDataType() 方法可以提取出完整的映射关系:

SeaTunnel 类型
Milvus 类型
说明
BOOLEAN
Bool
布尔值
TINYINT
Int8
8 位整数
SMALLINT
Int16
16 位整数
INT
Int32
32 位整数
BIGINT
Int64
64 位整数
FLOAT
Float
单精度浮点
DOUBLE
Double
双精度浮点
STRING
VarChar
变长字符串,默认 maxLength=65535
MAP
JSON
JSON 对象
ARRAY
Array
数组,元素类型由 elementType 决定
FLOAT_VECTOR
FloatVector
浮点向量
BINARY_VECTOR
BinaryVector
二值向量
FLOAT16_VECTOR
Float16Vector
半精度浮点向量
BFLOAT16_VECTOR
BFloat16Vector
BF16 向量
SPARSE_FLOAT_VECTOR
SparseFloatVector
稀疏浮点向量
TIMESTAMP / TIMESTAMP_TZ
Timestamptz
时间戳
GEOMETRY
Geometry
地理空间数据
ROW
JSON
嵌套结构体

有几个点值得注意:

  • STRING 不一定是 VarChar。源码里有个判断:如果列的 options 里标记了 JSON = true,STRING 会被映射为 JSON 类型
  • ARRAY[Struct] 有特殊处理。如果数组元素类型是 ROW,元素类型会被设为 Struct 而不是按常规映射
  • 主键类型会被统一处理。不管源端是 INT 还是 BIGINT,只要是主键,都会被映射为 Int64;如果是字符串类型主键,映射为 VarChar

Geometry 字段的处理

VTS 对 Geometry(地理空间)字段有专门的配置。源码中定义了两个相关参数:

  • geometry_convert_mode(默认 passthrough):控制是否对 Geometry 字符串做格式转换。passthrough 模式直接透传,适合 Milvus → Milvus 迁移;parse 模式会自动检测 WKT/EWKT/GeoJSON/WKB 等格式并转换,适合从 Elasticsearch 或 PostgreSQL 迁移
  • geometry_string_coord_order(默认 lat_lon):控制裸坐标字符串(如 "31.23,121.47")的解析顺序。Elasticsearch 的 geo_point 习惯 lat 在前,PostgreSQL 某些输出格式 lon 在前。配错会导致坐标全部反转

5. 实战配置与性能调优

基础配置示例

一个完整的 Milvus → Milvus 迁移配置如下:

env {  parallelism = 4  job.mode = "BATCH"}source {  Milvus {    url = "https://source-milvus:19530"    token = "root:Milvus"    database = "default"    collections = ["articles", "products"]    batch_size = 1000  }}sink {  Milvus {    url = "https://target-milvus:19530"    token = "root:Milvus"    database = "default"    batch_size = 1000    enable_dynamic_field = true    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"    data_save_mode = "APPEND_DATA"  }}

如果不指定 collections,VTS 会读取整个 database 的所有 collection。

parallelism 与 batch_size 的权衡

这两个参数是性能调优的核心:

parallelism(源码中 MilvusSourceConfig.PARALLELISM,默认值 1):

  • 控制每个 collection 的分片数量
  • 每个分片由一个独立 Reader 读取
  • 设为 1 时不做 offset 拆分,退化为单线程
  • 建议设为 2 ~ CPU 核心数
  • 设太大会导致大量并发 count(*) 查询和 QueryIterator 请求

batch_size(源码中默认值 1000):

  • Source 端:QueryIteratorReq 的批量读取大小
  • Sink 端:Insert API 的批量写入大小
  • 源码中 BufferBatchWriter 在遇到 Rate Limit 时会自动减半,初始值可以设大一些

一个务实的调优思路:先按默认配置跑,观察日志里有没有 rate limit exceeded 的警告。有就把 batch_size 调小,没有就逐步调大,直到找到吞吐量和稳定性的平衡点。

Rate Limit 处理策略

从源码看,VTS 在 Source 和 Sink 两端都有 Rate Limit 处理:

Source 端MilvusBufferReader):

  • 检测到 rate limit exceeded 后最多重试 3 次
  • 每次重试间隔 30 秒rateLimitRetryIntervalMs = 30000
  • 3 次重试用尽后抛出 READ_DATA_FAIL 异常

Sink 端MilvusBufferBatchWriter):

  • 检测到 Rate Limit 或消息过大后,batch_size 自动减半
  • 等待 60 秒后递归重试
  • 减半直到 data.size() <= 2 还失败则抛异常

两端的策略不一样:Source 端是固定次数重试,Sink 端是自适应降级。Sink 端的策略更实用,因为 batch_size 过大本身可能是问题的根源。

自定义 Schema 的场景

VTS 的 Sink 端支持通过 field_schema 参数完全覆盖源端 Schema。这在源端和目标端 Schema 不一致的场景下很有用:

sink {  Milvus {    url = "https://target-milvus:19530"    token = "root:Milvus"    field_schema = [      {field_name = "id", data_type = 5, is_primary_key = true, auto_id = true}      {field_name = "embedding", data_type = 101, dimension = 768}      {field_name = "category", data_type = 21, max_length = 100, is_partition_key = true}    ]  }}

源码注释里写得很明确:当提供了 field_schema 时,只有其中定义的字段会被使用,源端 Schema 不再参考data_type 使用 Milvus 的数字编码,比如 5 = Int64,21 = VarChar,101 = FloatVector。

还支持从动态字段中提取数据,用 source_field_name 指定源端字段路径:

field_schema = [  {source_field_name = "metadata.title", field_name = "title", data_type = 21, max_length = 200}  {source_field_name = "metadata.tags", field_name = "tags", data_type = 22, element_type = 21}]

6. 错误处理体系

VTS 定义了从 MILVUS-01 到 MILVUS-28 共 28 个错误码,覆盖了连接、读写、Schema 操作等各个环节。按类型梳理如下:

错误码范围
类别
代表性错误
MILVUS-01
服务端响应
SERVER_RESPONSE_FAILED
MILVUS-02 ~ 05
Collection 操作
COLLECTION_NOT_FOUND, COLLECTION_NOT_LOADED
MILVUS-06
数据加载
COLLECTION_NOT_LOADED(读取前会检查 LoadState)
MILVUS-07
类型不支持
NOT_SUPPORT_TYPE
MILVUS-13 ~ 15
DDL 操作
CREATE_DATABASE/COLLECTION/INDEX_ERROR
MILVUS-16
客户端初始化
INIT_CLIENT_ERROR
MILVUS-17 ~ 18
数据读写
WRITE_DATA_FAIL, READ_DATA_FAIL
MILVUS-24 ~ 28
Bulk Import
INIT_WRITER_ERROR, IMPORT_JOB_FAILED, COMPLETED_WITH_ERRORS

有几个错误码在实际使用中比较常见:

MILVUS-06 COLLECTION_NOT_LOADEDSource 端在读取前会检查 Collection 的 LoadState,如果没加载就直接报错。迁移前确保 Collection 已 Load。

MILVUS-17 WRITE_DATA_FAIL写入失败。最常见的原因是 Rate Limit 或单条消息过大。从源码看,BufferBatchWriter 会自动处理这两种情况。

MILVUS-27 COMPLETED_WITH_ERRORSBulkWriter 的 Import 任务部分失败。这个错误码说明导入任务跑完了,但有些行没成功,需要关注错误行数是否超过阈值(MILVUS-28)。

你在迁移过程中遇到过类似的问题吗?欢迎在评论区聊聊你的踩坑经历。

7. 跨数据源迁移:从 Pinecone 到 Milvus

VTS 数据源生态图

VTS 不只是 Milvus → Milvus 的迁移工具。从源码的 connector 目录结构看,它已经支持以下数据源作为 Source:

  • 向量数据库:Milvus、Pinecone、Qdrant、Weaviate、ChromaDB
  • 搜索引擎:Elasticsearch
  • 关系型数据库:PostgreSQL(pgvector 扩展)
  • 云服务:腾讯 VectorDB、S3 Vector
  • 其他:MongoDB、Iceberg、Kafka、File

不同 Source 的 Connector 都遵循 SeaTunnel 的标准接口,差异主要在数据读取和 Schema 转换的实现上。以 Pinecone → Milvus 为例,VTS 会把 Pinecone 的向量数据转换为 SeaTunnel 内部的 SeaTunnelRow,再经过 MilvusSchemaConverter 转为 Milvus 的 Schema 定义,最终通过 Sink 写入。

官方基准测试数据:Pinecone → Milvus 迁移 1 亿向量,同步速率 2,961 vectors/s,总耗时约 9.5 小时,硬件配置仅 4 核 / 8GB 内存。这个数据说明 VTS 在有限资源下能保持不错的吞吐。

不过说实话,跨数据源迁移的坑主要集中在 Schema 不兼容 上。不同向量数据库对向量维度的约束、稀疏向量的表示方式、元数据的存储格式都不一样。VTS 的 Schema 转换能处理大部分情况,但遇到特殊的数据类型(比如某个数据库私有的向量编码格式),还是可能需要通过 field_schema 手动调整。

总结

从源码层面看,VTS 是一个设计比较务实的工具。它没有试图从零造轮子,而是站在 Apache SeaTunnel 的肩膀上,把精力集中在向量数据库的专属问题上。

几个关键设计决策值得肯定:

  1. 分区感知的分片策略:自动识别 Partition Key,根据分区信息做并行分片,对大规模数据迁移很关键
  2. 双模式写入:BufferBatchWriter 和 BulkWriter 分别覆盖小规模快速迁移和大规模批量导入两种场景
  3. 自适应错误恢复:Rate Limit 自动减半重试、固定次数重试等策略,在无人值守迁移时很实用
  4. 灵活的 Schema 控制field_schema 参数允许完全自定义目标端 Schema,包括动态字段提取

但也有需要注意的地方:项目在 GitHub 上只有 124 Star,社区规模还比较小,遇到问题可能得自己翻源码。好在代码结构清晰,核心类不超过 20 个,定位问题并不困难。

如果你的场景是 Milvus 内部迁移(比如集群升级),milvus-backup 可能更简单。但如果涉及跨数据源(从 Pinecone、ES、Qdrant 等迁到 Milvus),或者需要 Transform(字段映射、表名更改、Embedding 转换),VTS 是目前为数不多的选择。

好啦,谢谢你观看我的文章,如果喜欢可以点赞转发给需要的朋友,我们下一期再见!敬请期待!

扫码关注,获取更多 AI 工具的实战经验和最佳实践。不错过每一篇干货!

加入微信群,共同交流,共同进步!