乐于分享
好东西不私藏

Paimon 表 Compact 流程与源码详解

Paimon 表 Compact 流程与源码详解

Paimon 表 Compact 流程与源码详解

目录

  • 一、概述
  • 二、Compact 核心架构
  • 三、Compact 触发机制
  • 四、Append 表 Compact 流程详解
  • 五、主键表 Compact 流程详解
  • 六、UniversalCompaction 策略深度解析
  • 七、定时调度机制实现
  • 八、关键配置项详解
  • 九、源码核心类总结
  • 十、最佳实践与优化建议

一、概述

Apache Paimon 是一个流式数据湖存储系统,采用 LSM-Tree(Log-Structured Merge-Tree)架构来管理数据文件。Compact(压缩)是 Paimon 中至关重要的后台操作,用于:

  1. 减少文件数量:合并小文件,降低读取时的文件扫描开销
  2. 减少空间放大:删除过期数据,回收存储空间
  3. 优化查询性能:减少 Level 0 文件数,加快数据检索
  4. 生成 Changelog:通过 Compact 生成变更日志
  5. 维护删除向量:对于 MOW(Merge-on-Write)模式,维护 Deletion Vectors

本文将深入分析 Paimon 表的定时 Compact 机制,包括触发时机、调度机制、策略选择和执行流程,并结合源码进行详细解析。


二、Compact 核心架构

2.1 整体架构图

graph TB
    subgraph Writer[写入端]
        W1[数据写入] --> CM[CompactManager]
        CM --> |触发| CT[CompactTask]
    end

    subgraph Scheduler[定时调度]
        Timer[定时器] --> Coord[Coordinator]
        Coord --> |扫描快照| Files[文件列表]
        Files --> |生成| Tasks[CompactTask队列]
    end

    subgraph Executor[执行端]
        Tasks --> Compact[Compact执行器]
        CT --> Compact
        Compact --> Merge[文件合并]
        Merge --> NewFiles[新文件]
    end

    subgraph Storage[存储层]
        NewFiles --> Meta[元数据提交]
        Meta --> Snapshot[新快照]
    end

2.2 核心组件

2.2.1 CompactManager

CompactManager 是 Compact 的管理中心,负责:

  • 维护文件的层级结构(Levels)
  • 判断是否需要触发 Compact
  • 调用策略选择需要合并的文件
  • 提交异步 Compact 任务到线程池

主键表实现MergeTreeCompactManager
Append 表实现BucketedAppendCompactManager

// paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
publicinterfaceCompactManager{
// 判断是否应该等待 Compact 完成
booleanshouldWaitForLatestCompaction();

// 添加新文件到 Level 0
voidaddNewFile(DataFileMeta file);

// 触发 Compact
voidtriggerCompaction(boolean fullCompaction);

// 获取 Compact 结果
Optional<CompactResult> getCompactionResult(boolean blocking);
}

2.2.2 CompactStrategy

CompactStrategy 负责文件选择策略,决定哪些文件需要合并:

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
publicinterfaceCompactStrategy{
/**
     * 从 runs 中选择需要 compact 的单元
     * - compaction 是基于 runs 的,不是基于文件的
     * - level 0 是特殊的,一个文件对应一个 run
     * - 其他 level 是一个 level 对应一个 run
     * - compaction 从小 level 到大 level 依次进行
     */

Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
}

2.2.3 CompactTask

CompactTask 是实际执行合并的任务单元:

// paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
publicabstractclassCompactTaskimplementsCallable<CompactResult{
@Override
public CompactResult call()throws Exception {
long startMillis = System.currentTimeMillis();
        CompactResult result = doCompact();

// 记录指标
if (metricsReporter != null) {
            metricsReporter.reportCompactionTime(
                System.currentTimeMillis() - startMillis);
            metricsReporter.increaseCompactionsCompletedCount();
        }

return result;
    }

protectedabstract CompactResult doCompact()throws Exception;
}

三、Compact 触发机制

Paimon 支持三种 Compact 触发方式:

3.1 内联 Compact(Inline Compaction)

触发时机:数据写入过程中自动触发

核心代码MergeTreeWriter.prepareCommit()

// paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java:209-248
privatevoidflushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception 
{
if (writeBuffer.size() > 0) {
// 判断是否需要等待 Compact 完成
if (compactManager.shouldWaitForLatestCompaction()) {
            waitForLatestCompaction = true;
        }

// 刷写 buffer 到文件
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
                writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);

// ... 写入逻辑 ...

// 将新文件添加到 CompactManager
for (DataFileMeta fileMeta : dataWriter.result()) {
            newFiles.add(fileMeta);
            compactManager.addNewFile(fileMeta);
        }
    }

// 等待上一次 Compact 完成
    trySyncLatestCompaction(waitForLatestCompaction);

// 触发新的 Compact
    compactManager.triggerCompaction(forcedFullCompaction);
}

判断是否需要等待 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:105-113
@Override
publicbooleanshouldWaitForLatestCompaction(){
// 当 SortedRun 数量超过阈值时,阻塞写入等待 Compact
return levels.numberOfSortedRuns() > numSortedRunStopTrigger;
}

@Override
publicbooleanshouldWaitForPreparingCheckpoint(){
// 在 checkpoint 准备阶段,如果 runs 数量超过阈值+1,也要等待
return levels.numberOfSortedRuns() > (long) numSortedRunStopTrigger + 1;
}

关键配置

  • num-sorted-run.stop-trigger:默认 Integer.MAX_VALUE,当 SortedRun 数量超过此值时阻塞写入

3.2 定时 Compact Job

触发时机:独立的 Compact 作业,定时扫描表快照生成 Compact 任务

适用场景

  • 多个写作业同时写入同一张表
  • 需要将写入和 Compact 资源隔离
  • 大规模数据写入场景

配置方式

-- 在写表时禁用 Compact
CREATETABLE my_table (
    ...
WITH (
'write-only' = 'true'-- 跳过 Compact 和 Snapshot 过期
);

-- 启动专门的 Compact 作业
CALL sys.compact('default.my_table');

或使用 Action Jar:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    compact \
    --warehouse hdfs:///path/to/warehouse \
    --database default \
    --table my_table

定时间隔配置

-- 设置扫描间隔为 30 秒
CREATETABLE my_table (...) WITH (
'continuous.discovery-interval' = '30s'
);

3.3 手动触发 Compact

Flink SQL 方式

-- 对特定分区进行 Compact
CALL sys.compact(
`table` => 'default.my_table'
`partitions` => 'dt=2024-01-01',
`options` => 'sink.parallelism=10'
);

-- Full Compaction
CALL sys.compact(
`table` => 'default.my_table',
`compact_strategy` => 'full'
);

Spark 方式

val table = spark.table("default.my_table")
table.createOrReplaceTempView("tmp")
spark.sql("CALL paimon.sys.compact(table => 'tmp')")

四、Append 表 Compact 流程详解

4.1 整体流程图

sequenceDiagram
    participant Timer as 定时器
    participant Coord as AppendCompactCoordinator
    participant SubCoord as SubCoordinator
    participant Task as AppendCompactTask
    participant Writer as FileWriter
    participant Commit as Committer

    Timer->>Coord: 触发扫描(每10秒)
    Coord->>Coord: scan() 扫描快照
    Note over Coord: 批量读取100k个文件
    Coord->>SubCoord: notifyNewFiles(partition, files)
    SubCoord->>SubCoord: 过滤需要Compact的文件
    SubCoord->>SubCoord: agePack() 打包文件
    Note over SubCoord: 使用FileBin打包<br/>目标:2倍targetFileSize
    SubCoord->>Task: 生成CompactTask
    Task->>Writer: doCompact() 执行合并
    Writer->>Writer: 读取文件并合并
    Writer->>Commit: 生成新文件
    Commit->>Commit: 提交元数据

4.2 核心类:AppendCompactCoordinator

AppendCompactCoordinator 是 Append 表的 Compact 协调器,负责扫描快照、过滤文件、生成任务。

类定义

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:70-109
publicclassAppendCompactCoordinator{
privatestaticfinalint FILES_BATCH = 100_000;  // 批量处理文件数
protectedstaticfinalint REMOVE_AGE = 10;      // 移除年龄阈值
protectedstaticfinalint COMPACT_AGE = 5;      // 强制Compact年龄

privatefinal SnapshotManager snapshotManager;
privatefinallong targetFileSize;               // 目标文件大小
privatefinallong compactionFileSize;           // Compact阈值
privatefinaldouble deleteThreshold;            // 删除率阈值
privatefinalint minFileNum;                    // 最小文件数

// 按分区组织的子协调器
final Map<BinaryRow, SubCoordinator> subCoordinators = new HashMap<>();

public List<AppendCompactTask> run(){
// 扫描快照中的文件
if (scan()) {
// 生成 Compact 任务
return compactPlan();
        }
return Collections.emptyList();
    }
}

4.3 文件扫描逻辑

scan() 方法

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:122-148
booleanscan(){
    Map<BinaryRow, List<DataFileMeta>> files = new HashMap<>();

// 批量读取文件(每次最多 100,000 个)
for (int i = 0; i < FILES_BATCH; i++) {
        ManifestEntry entry;
try {
            entry = filesIterator.next();
        } catch (EndOfScanException e) {
if (!files.isEmpty()) {
                files.forEach(this::notifyNewFiles);
returntrue;
            }
throw e;
        }
if (entry == null) {
break;
        }
        BinaryRow partition = entry.partition();
        files.computeIfAbsent(partition, k -> new ArrayList<>()).add(entry.file());
    }

if (files.isEmpty()) {
returnfalse;
    }

    files.forEach(this::notifyNewFiles);
returntrue;
}

文件过滤条件

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:467-482
privatebooleanshouldCompact(BinaryRow partition, DataFileMeta file){
// 条件1: 文件大小小于 compaction.file-size
if (file.fileSize() < compactionFileSize) {
returntrue;
    }

// 条件2: 删除率超过阈值(启用 Deletion Vector 时)
return tooHighDeleteRatio(partition, file);
}

privatebooleantooHighDeleteRatio(BinaryRow partition, DataFileMeta file){
if (dvMaintainerCache != null) {
        DeletionFile deletionFile =
                dvMaintainerCache.dvMaintainer(partition).getDeletionFile(file.fileName());
if (deletionFile != null) {
            Long cardinality = deletionFile.cardinality();
long rowCount = file.rowCount();
return cardinality == null || cardinality > rowCount * deleteThreshold;
        }
    }
returnfalse;
}

4.4 文件打包策略:SubCoordinator

每个分区有一个 SubCoordinator,负责该分区的文件打包和任务生成。

打包逻辑

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:233-274
private List<List<DataFileMeta>> agePack() {
    List<List<DataFileMeta>> packed;

if (dvMaintainerCache == null) {
// 普通打包模式
        packed = pack(toCompact);
    } else {
// Deletion Vector 模式:按删除文件分组
        packed = packInDeletionVectorVMode(toCompact);
    }

if (packed.isEmpty()) {
// 如果没有可打包的,增加年龄
if (++age > COMPACT_AGE && toCompact.size() > 1) {
// 年龄超过阈值,强制打包所有文件
            List<DataFileMeta> all = new ArrayList<>(toCompact);
            toCompact.clear();
            packed = Collections.singletonList(all);
        }
    }

return packed;
}

private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
// 按文件大小排序
    ArrayList<DataFileMeta> files = new ArrayList<>(toCompact);
    files.sort(Comparator.comparingLong(DataFileMeta::fileSize));

    List<List<DataFileMeta>> result = new ArrayList<>();
    FileBin fileBin = new FileBin();

for (DataFileMeta fileMeta : files) {
        fileBin.addFile(fileMeta);
if (fileBin.enoughContent()) {
// 满足打包条件:文件数>1 且 总大小 >= 2*targetFileSize
            result.add(fileBin.drain());
        }
    }

if (fileBin.enoughInputFiles()) {
// 文件数满足最小要求
        result.add(fileBin.drain());
    }

return result;
}

FileBin 打包单元

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:324-352
privateclassFileBin{
    List<DataFileMeta> bin = new ArrayList<>();
long totalFileSize = 0;

publicvoidaddFile(DataFileMeta file){
        totalFileSize += file.fileSize() + openFileCost;
        bin.add(file);
    }

privatebooleanenoughContent(){
// 文件数 > 1 且 总大小 >= 2 倍 targetFileSize
return bin.size() > 1 && totalFileSize >= targetFileSize * 2;
    }

privatebooleanenoughInputFiles(){
// 文件数 >= minFileNum
return bin.size() >= minFileNum;
    }
}

4.5 任务执行:AppendCompactTask

任务定义

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java:44-67
publicclassAppendCompactTask{
privatefinal BinaryRow partition;
privatefinal List<DataFileMeta> compactBefore;  // 待合并文件
privatefinal List<DataFileMeta> compactAfter;   // 合并后文件

publicAppendCompactTask(BinaryRow partition, List<DataFileMeta> files){
this.partition = partition;
        compactBefore = new ArrayList<>(files);
        compactAfter = new ArrayList<>();
    }

public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite write)
throws Exception 
{
// 执行文件合并
// ... 合并逻辑 ...
    }
}

4.6 Bucketed Append 表的 Compact

对于有 Bucket 的 Append 表,使用 BucketedAppendCompactManager

文件选择逻辑

// paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java:201-227
Optional<List<DataFileMeta>> pickCompactBefore() {
if (toCompact.isEmpty()) {
return Optional.empty();
    }

long totalFileSize = 0L;
int fileNum = 0;
    LinkedList<DataFileMeta> candidates = new LinkedList<>();

// 使用优先队列按序号排序文件
while (!toCompact.isEmpty()) {
        DataFileMeta file = toCompact.poll();
        candidates.add(file);
        totalFileSize += file.fileSize();
        fileNum++;

if (fileNum >= minFileNum) {
// 满足最小文件数要求
return Optional.of(candidates);
        } elseif (totalFileSize >= targetFileSize) {
// 总大小超过目标,移除第一个文件继续
            DataFileMeta removed = candidates.pollFirst();
            totalFileSize -= removed.fileSize();
            fileNum--;
        }
    }

    toCompact.addAll(candidates);
return Optional.empty();
}

五、主键表 Compact 流程详解

5.1 LSM-Tree 架构

主键表采用 LSM-Tree(Log-Structured Merge-Tree)架构,数据文件分为多个 Level:

Level 0:  [File1] [File2] [File3] ...  (可能有重叠)
          ↓ Compact
Level 1:  [File_L1_1 | File_L1_2 | File_L1_3]  (有序,无重叠)
          ↓ Compact  
Level 2:  [File_L2_1 --------- | File_L2_2 ---------]
          ↓ Compact
Level N:  [File_LN_1 -------------------------------]

Level 特点

  • Level 0:新写入的文件,文件之间可能有 Key 重叠,一个文件对应一个 SortedRun
  • **Level 1+**:合并后的文件,同一 Level 内文件按 Key 有序且不重叠,一个 Level 对应一个 SortedRun

5.2 核心类:MergeTreeCompactManager

类定义

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:53-102
publicclassMergeTreeCompactManagerextendsCompactFutureManager{
privatefinal ExecutorService executor;
privatefinal Levels levels;                      // LSM-Tree 层级结构
privatefinal CompactStrategy strategy;           // Compact 策略
privatefinal Comparator<InternalRow> keyComparator;
privatefinallong compactionFileSize;
privatefinalint numSortedRunStopTrigger;
privatefinal CompactRewriter rewriter;

@Nullableprivatefinal CompactionMetrics.Reporter metricsReporter;
@Nullableprivatefinal BucketedDvMaintainer dvMaintainer;
@Nullableprivatefinal RecordLevelExpire recordLevelExpire;

// ... 构造方法 ...
}

5.3 文件添加与触发 Compact

添加新文件到 Level 0

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:116-124
@Override
publicvoidaddNewFile(DataFileMeta file){
    levels.addLevel0File(file);
    MetricUtils.safeCall(this::reportMetrics, LOG);
}

触发 Compact 逻辑

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:127-194
@Override
publicvoidtriggerCompaction(boolean fullCompaction){
    Optional<CompactUnit> optionalUnit;
    List<LevelSortedRun> runs = levels.levelSortedRuns();

if (fullCompaction) {
// 强制全量 Compact
        Preconditions.checkState(taskFuture == null,
"A compaction task is still running while forcing a new compaction.");

        optionalUnit = CompactStrategy.pickFullCompaction(
                levels.numberOfLevels(),
                runs,
                recordLevelExpire,
                dvMaintainer,
                forceRewriteAllFiles);
    } else {
// 正常 Compact
if (taskFuture != null) {
return;  // 已有任务在运行
        }

// 使用策略选择文件
        optionalUnit = strategy.pick(levels.numberOfLevels(), runs)
                .filter(unit -> !unit.files().isEmpty())
                .filter(unit ->
                        unit.files().size() > 1
                        || unit.files().get(0).level() != unit.outputLevel());
    }

    optionalUnit.ifPresent(unit -> {
// 判断是否可以删除旧记录
boolean dropDelete = unit.outputLevel() != 0
                && (unit.outputLevel() >= levels.nonEmptyHighestLevel()
                        || dvMaintainer != null);

        submitCompaction(unit, dropDelete);
    });
}

5.4 Compact 任务提交

submitCompaction() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:201-245
privatevoidsubmitCompaction(CompactUnit unit, boolean dropDelete){
    CompactTask task;

if (unit.fileRewrite()) {
// 文件级别重写(仅修改元数据)
        task = new FileRewriteCompactTask(rewriter, unit, dropDelete, metricsReporter);
    } else {
// 正常 Compact(读取合并数据)
        task = new MergeTreeCompactTask(
                keyComparator,
                compactionFileSize,
                rewriter,
                unit,
                dropDelete,
                levels.maxLevel(),
                metricsReporter,
                compactDfSupplier,
                recordLevelExpire,
                forceRewriteAllFiles);
    }

if (LOG.isDebugEnabled()) {
        LOG.debug("Pick these files (name, level, size) for {} compaction: {}",
                task.getClass().getSimpleName(),
                unit.files().stream()
                        .map(file -> String.format("(%s, %d, %d)",
                                file.fileName(), file.level(), file.fileSize()))
                        .collect(Collectors.joining(", ")));
    }

// 提交到线程池
    taskFuture = executor.submit(task);

if (metricsReporter != null) {
        metricsReporter.increaseCompactionsQueuedCount();
        metricsReporter.increaseCompactionsTotalCount();
    }
}

5.5 MergeTreeCompactTask 执行逻辑

doCompact() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java:82-113
@Override
protected CompactResult doCompact()throws Exception {
    List<List<SortedRun>> candidate = new ArrayList<>();
    CompactResult result = new CompactResult();

// 遍历分区(IntervalPartition 将重叠文件分组)
for (List<SortedRun> section : partitioned) {
if (section.size() > 1) {
// 有多个 Run,需要合并
            candidate.add(section);
        } else {
            SortedRun run = section.get(0);
// 单个 Run:根据文件大小决定是否需要重写
for (DataFileMeta file : run.files()) {
if (file.fileSize() < minFileSize) {
// 小文件:需要重写
                    candidate.add(singletonList(SortedRun.fromSingle(file)));
                } else {
// 大文件:先重写之前的候选,然后升级当前文件
                    rewrite(candidate, result);
                    upgrade(file, result);
                }
            }
        }
    }

// 重写剩余的候选文件
    rewrite(candidate, result);
    result.setDeletionFile(compactDfSupplier.get());
return result;
}

upgrade() 方法(文件升级)

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java:123-138
privatevoidupgrade(DataFileMeta file, CompactResult toUpdate)throws Exception {
// 需要重写的情况:
// 1. 输出到最高层且包含删除记录
// 2. 强制重写所有文件
// 3. 包含过期记录
if ((outputLevel == maxLevel && containsDeleteRecords(file))
            || forceRewriteAllFiles
            || containsExpiredRecords(file)) {
        List<List<SortedRun>> candidate = new ArrayList<>();
        candidate.add(singletonList(SortedRun.fromSingle(file)));
        rewriteImpl(candidate, toUpdate);
return;
    }

// 仅修改文件的 level 元数据(无需重写数据)
if (file.level() != outputLevel) {
        CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
        toUpdate.merge(upgradeResult);
        upgradeFilesNum++;
    }
}

5.6 IntervalPartition 算法

IntervalPartition 用于将多个数据文件划分为最少数量的 SortedRun,处理文件间的 Key 重叠:

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java:32-91
publicclassIntervalPartition{
privatefinal List<DataFileMeta> files;
privatefinal Comparator<InternalRow> keyComparator;

/**
     * 返回二维列表:
     * - 外层列表:sections(不同 section 的 Key 区间不重叠)
     * - 内层列表:SortedRuns(同一 section 内的多个 Run)
     */

public List<List<SortedRun>> partition() {
        List<List<SortedRun>> result = new ArrayList<>();
        List<DataFileMeta> section = new ArrayList<>();
        InternalRow bound = null;

// 按 minKey 排序
        files.sort((o1, o2) -> {
int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
return leftResult == 0
                    ? keyComparator.compare(o1.maxKey(), o2.maxKey())
                    : leftResult;
        });

for (DataFileMeta meta : files) {
// 如果当前文件的 minKey > 上一个 section 的 bound
// 说明没有重叠,开始新的 section
if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
                result.add(partition(section));
                section.clear();
                bound = null;
            }
            section.add(meta);

// 更新 bound 为当前 section 的最大 maxKey
if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
                bound = meta.maxKey();
            }
        }

if (!section.isEmpty()) {
            result.add(partition(section));
        }

return result;
    }
}

5.7 Levels 结构维护

Levels 类管理 LSM-Tree 的层级结构

// paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java:38-98
publicclassLevels{
privatefinal Comparator<InternalRow> keyComparator;
privatefinal TreeSet<DataFileMeta> level0;  // Level 0 文件(按序号排序)
privatefinal List<SortedRun> levels;        // Level 1+ 文件

publicvoidupdate(List<DataFileMeta> before, List<DataFileMeta> after){
// 按 Level 分组
        Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
        Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);

// 更新每个 Level
for (int i = 0; i < numberOfLevels(); i++) {
            updateLevel(i,
                    groupedBefore.getOrDefault(i, emptyList()),
                    groupedAfter.getOrDefault(i, emptyList()));
        }
    }

privatevoidupdateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after){
if (level == 0) {
// Level 0: 从 TreeSet 中移除旧文件,添加新文件
            before.forEach(level0::remove);
            level0.addAll(after);
        } else {
// Level 1+: 从 SortedRun 中移除旧文件,添加新文件,重新排序
            List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
            files.removeAll(before);
            files.addAll(after);
            levels.set(level - 1, SortedRun.fromUnsorted(files, keyComparator));
        }
    }
}

六、UniversalCompaction 策略深度解析

6.1 策略概述

UniversalCompaction 是 Paimon 的核心 Compact 策略,源自 RocksDB 的 Universal Compaction。

设计目标

  • 降低写放大(Write Amplification)
  • 控制空间放大(Space Amplification)
  • 平衡读放大(Read Amplification)

6.2 策略参数

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:42-64
publicclassUniversalCompactionimplementsCompactStrategy{
privatefinalint maxSizeAmp;              // 最大空间放大百分比
privatefinalint sizeRatio;               // 大小比例阈值
privatefinalint numRunCompactionTrigger; // 触发 Compact 的 Run 数量

@Nullableprivatefinal FullCompactTrigger fullCompactTrigger;
@Nullableprivatefinal OffPeakHours offPeakHours;
}

配置项对应

参数
配置键
默认值
说明
maxSizeAmp compaction.max-size-amplification-percent
200
最大空间放大百分比
sizeRatio compaction.size-ratio
1
大小比例阈值
numRunCompactionTrigger num-sorted-run.compaction-trigger
5
触发 Compact 的 Run 数

6.3 策略决策流程

pick() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:67-107
@Override
public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs){
int maxLevel = numLevels - 1;

// 【优先级0】尝试定时全量 Compact
if (fullCompactTrigger != null) {
        Optional<CompactUnit> unit = fullCompactTrigger.tryFullCompact(numLevels, runs);
if (unit.isPresent()) {
return unit;
        }
    }

// 【优先级1】检查空间放大
    CompactUnit unit = pickForSizeAmp(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
            LOG.debug("Universal compaction due to size amplification");
        }
return Optional.of(unit);
    }

// 【优先级2】检查大小比例
    unit = pickForSizeRatio(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
            LOG.debug("Universal compaction due to size ratio");
        }
return Optional.of(unit);
    }

// 【优先级3】检查文件数量
if (runs.size() > numRunCompactionTrigger) {
// 超过阈值,触发 Compact
int candidateCount = runs.size() - numRunCompactionTrigger + 1;
if (LOG.isDebugEnabled()) {
            LOG.debug("Universal compaction due to file num");
        }
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
    }

return Optional.empty();
}

6.4 详细策略说明

6.4.1 定时全量 Compact(FullCompactTrigger)

触发条件

  1. 距离上次全量 Compact 的时间间隔超过 compaction.optimization-interval
  2. 文件总大小小于 compaction.total-size-threshold
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java:64-88
public Optional<CompactUnit> tryFullCompact(int numLevels, List<LevelSortedRun> runs){
if (runs.size() == 1) {
return Optional.empty();
    }

int maxLevel = numLevels - 1;

// 条件1: 时间间隔触发
if (fullCompactionInterval != null) {
if (lastFullCompaction == null
                || currentTimeMillis() - lastFullCompaction > fullCompactionInterval) {
            LOG.debug("Universal compaction due to full compaction interval");
            updateLastFullCompaction();
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
        }
    }

// 条件2: 总大小触发
if (totalSizeThreshold != null) {
long totalSize = 0;
for (LevelSortedRun run : runs) {
            totalSize += run.run().totalSize();
        }
if (totalSize < totalSizeThreshold) {
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
        }
    }

return Optional.empty();
}

配置示例

CREATETABLE my_table (...) WITH (
'compaction.optimization-interval' = '1 h',  -- 每小时全量 Compact
'compaction.total-size-threshold' = '10 GB'-- 总大小小于10GB时全量 Compact
);

6.4.2 空间放大检查(pickForSizeAmp)

空间放大定义:额外空间占比 = (所有文件大小 – 最早文件大小) / 最早文件大小 × 100%

触发条件:当空间放大超过 maxSizeAmp 时,触发全量 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:125-147
CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs){
if (runs.size() < numRunCompactionTrigger) {
returnnull;
    }

// 计算候选文件总大小(除了最早的文件)
long candidateSize =
            runs.subList(0, runs.size() - 1).stream()
                    .map(LevelSortedRun::run)
                    .mapToLong(SortedRun::totalSize)
                    .sum();

// 最早文件的大小
long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();

// 空间放大 = candidateSize * 100 / earliestRunSize
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
if (fullCompactTrigger != null) {
            fullCompactTrigger.updateLastFullCompaction();
        }
// 触发全量 Compact
return CompactUnit.fromLevelRuns(maxLevel, runs);
    }

returnnull;
}

示例

假设 maxSizeAmp = 200,有以下文件:

  • Run 0: 10 MB
  • Run 1: 20 MB
  • Run 2: 30 MB
  • Run 3: 100 MB (最早)

空间放大 = (10 + 20 + 30) × 100 / 100 = 60% < 200%,不触发

如果 Run 3 只有 20 MB: 空间放大 = 60 × 100 / 20 = 300% > 200%,触发全量 Compact

6.4.3 大小比例检查(pickForSizeRatio)

触发条件:当较小的 Run 的总大小与较大 Run 的比例超过阈值时,触发 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:163-182
public CompactUnit pickForSizeRatio(
int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick)
{
long candidateSize = candidateSize(runs, candidateCount);

// 从 candidateCount 开始,向后扩展
for (int i = candidateCount; i < runs.size(); i++) {
        LevelSortedRun next = runs.get(i);

// 计算比例:(candidateSize * (100 + sizeRatio + offPeakRatio)) / 100
if (candidateSize * (100.0 + sizeRatio + ratioForOffPeak()) / 100.0
                < next.run().totalSize()) {
// 下一个文件太大,停止扩展
break;
        }

// 继续包含下一个文件
        candidateSize += next.run().totalSize();
        candidateCount++;
    }

if (forcePick || candidateCount > 1) {
return createUnit(runs, maxLevel, candidateCount);
    }

returnnull;
}

示例

假设 sizeRatio = 1(即 100%),有以下 Runs:

  • Run 0: 10 MB
  • Run 1: 15 MB
  • Run 2: 40 MB
  • Run 3: 100 MB

从 Run 0 开始:

  1. candidateSize = 10 MB
  2. 判断:10 × (100 + 100) / 100 = 20 MB < 15 MB?,包含 Run 1
  3. candidateSize = 25 MB
  4. 判断:25 × 200 / 100 = 50 MB < 40 MB?,包含 Run 2
  5. candidateSize = 65 MB
  6. 判断:65 × 200 / 100 = 130 MB < 100 MB?,包含 Run 3
  7. candidateSize = 165 MB
  8. 没有更多 Run,返回所有 4 个 Runs 的 CompactUnit

6.4.4 文件数量检查

触发条件:当 Run 数量超过 numRunCompactionTrigger 时,触发 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:96-104
if (runs.size() > numRunCompactionTrigger) {
// 超过阈值,触发 Compact
int candidateCount = runs.size() - numRunCompactionTrigger + 1;
if (LOG.isDebugEnabled()) {
        LOG.debug("Universal compaction due to file num");
    }
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
}

示例

假设 numRunCompactionTrigger = 5,当前有 8 个 Runs:

  • candidateCount = 8 – 5 + 1 = 4
  • 调用 pickForSizeRatio(maxLevel, runs, 4) 选择至少 4 个 Runs 进行 Compact

6.5 输出 Level 的确定

createUnit() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:197-226
CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount){
int outputLevel;

if (runCount == runs.size()) {
// 合并所有 Runs,输出到最高层
        outputLevel = maxLevel;
    } else {
// 输出到下一个 Run 的 level - 1
        outputLevel = Math.max(0, runs.get(runCount).level() - 1);
    }

if (outputLevel == 0) {
// 不输出到 Level 0,向后扩展
for (int i = runCount; i < runs.size(); i++) {
            LevelSortedRun next = runs.get(i);
            runCount++;
if (next.level() != 0) {
                outputLevel = next.level();
break;
            }
        }
    }

if (runCount == runs.size()) {
if (fullCompactTrigger != null) {
            fullCompactTrigger.updateLastFullCompaction();
        }
        outputLevel = maxLevel;
    }

return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}

6.6 Off-Peak Hours 优化

定义:在非高峰时段使用更激进的 Compact 策略

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:184-186
privateintratioForOffPeak(){
return offPeakHours == null ? 0 : offPeakHours.currentRatio(LocalDateTime.now().getHour());
}

配置示例

CREATETABLE my_table (...) WITH (
'compaction.offpeak.start.hour' = '2',   -- 凌晨2点开始
'compaction.offpeak.end.hour' = '6',     -- 早上6点结束
'compaction.offpeak-ratio' = '20'-- 非高峰期使用20%的比例
);

在非高峰时段(2:00-6:00),sizeRatio 会加上 offpeak-ratio,使 Compact 更容易触发。


七、定时调度机制实现

7.1 Flink 流式 Compact 作业架构

graph LR
    subgraph AppendTable[Append表Compact]
        AT1[定时器] --> AT2[AppendBypassCoordinateOperator]
        AT2 --> AT3[生成CompactTask]
        AT3 --> AT4[下游执行器]
    end

    subgraph PrimaryKeyTable[主键表Compact]
        PT1[CompactorSource] --> PT2[定时扫描]
        PT2 --> PT3[发现需要Compact的Bucket]
        PT3 --> PT4[StoreCompactOperator]
        PT4 --> PT5[执行Compact]
    end

7.2 Append 表定时调度

7.2.1 AppendBypassCoordinateOperator

核心类AppendBypassCoordinateOperator 是 Flink 的一个 OneInputStreamOperator,负责定时生成 Compact 任务。

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:49-86
publicclassAppendBypassCoordinateOperator<CommitT>
extendsAbstractStreamOperator<Either<CommitTAppendCompactTask>>
implementsOneInputStreamOperator<CommitTEither<CommitTAppendCompactTask>>,
ProcessingTimeCallback
{

privatestaticfinallong MAX_PENDING_TASKS = 5000;

privatefinal FileStoreTable table;
privatetransient ScheduledExecutorService executorService;
privatetransient LinkedBlockingQueue<AppendCompactTask> compactTasks;

@Override
publicvoidopen()throws Exception {
super.open();

// 获取扫描间隔
long intervalMs = table.coreOptions().continuousDiscoveryInterval().toMillis();

// 初始化任务队列
this.compactTasks = new LinkedBlockingQueue<>();

// 创建协调器
        AppendCompactCoordinator coordinator = new AppendCompactCoordinator(table, truenull);

// 创建定时执行器
this.executorService =
                Executors.newSingleThreadScheduledExecutor(
                        newDaemonThreadFactory("Compaction Coordinator"));

// 定时异步生成任务
this.executorService.scheduleWithFixedDelay(
                () -> asyncPlan(coordinator), 0, intervalMs, TimeUnit.MILLISECONDS);

// 定时发送任务到下游
this.getProcessingTimeService().scheduleWithFixedDelay(this0, intervalMs);
    }
}

异步生成任务

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:88-102
privatevoidasyncPlan(AppendCompactCoordinator coordinator){
// 持续生成任务,直到队列满或没有新任务
while (compactTasks.size() < MAX_PENDING_TASKS) {
try {
            List<AppendCompactTask> tasks = coordinator.run();
            compactTasks.addAll(tasks);
if (tasks.isEmpty()) {
break;
            }
        } catch (Throwable t) {
            LOG.error("Fatal exception happened when generating compaction tasks.", t);
this.throwable = t;
break;
        }
    }
}

发送任务到下游

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:104-113
@Override
publicvoidonProcessingTime(long time){
// 从队列中取出任务,发送到下游
while (true) {
        AppendCompactTask task = compactTasks.poll();
if (task == null) {
return;
        }
        output.collect(new StreamRecord<>(Either.Right(task)));
    }
}

7.2.2 Flink Job 构建

AppendTableCompact 类

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java:87-128
publicvoidbuild(){
// 构建 Source
    DataStreamSource<AppendCompactTask> source = buildSource();

// 如果配置了分区空闲时间,过滤任务
if (!isContinuous && partitionIdleTime != null) {
        Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
long historyMilli =
                LocalDateTime.now()
                        .minus(partitionIdleTime)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli();

        SingleOutputStreamOperator<AppendCompactTask> filterStream =
                source.filter(task -> {
                    BinaryRow partition = task.partition();
return partitionInfo.get(partition) <= historyMilli;
                });
        source = new DataStreamSource<>(filterStream);
    }

// 构建完整的 Flink 作业
    sinkFromSource(source);
}

private DataStreamSource<AppendCompactTask> buildSource(){
// 获取扫描间隔
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();

// 创建 Source
    AppendTableCompactSource source =
new AppendTableCompactSource(table, isContinuous, scanInterval, partitionPredicate);

return AppendTableCompactSource.buildSource(env, source, tableIdentifier);
}

7.3 主键表定时调度

7.3.1 CompactorSource

核心类CompactorSource 定时扫描表快照,发现需要 Compact 的 Bucket。

工作流程

  1. 定时扫描表快照(间隔由 continuous.discovery-interval 控制)
  2. 比较当前快照与上次快照,发现新增/修改的文件
  3. 找出需要 Compact 的 partition-bucket 组合
  4. 生成 Compact 记录发送到下游

关键配置

// paimon-api/src/main/java/org/apache/paimon/CoreOptions.java:464-468
publicstaticfinal ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
        key("continuous.discovery-interval")
                .durationType()
                .defaultValue(Duration.ofSeconds(10))
                .withDescription("The discovery interval of continuous reading.");

7.3.2 StoreCompactOperator

核心类StoreCompactOperator 接收 Compact 任务并执行。

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java:133-180
@Override
publicvoidprocessElement(StreamRecord<RowData> element)throws Exception {
    RowData record = element.getValue();

long snapshotId = record.getLong(0);
    BinaryRow partition = deserializeBinaryRow(record.getBinary(1));
int bucket = record.getInt(2);
byte[] serializedFiles = record.getBinary(3);
    List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);

if (write.streamingMode()) {
// 流式模式:通知新文件
        write.notifyNewFiles(snapshotId, partition, bucket, files);
    }

// 记录需要 Compact 的 partition-bucket
    waitToCompact.add(Pair.of(partition, bucket));
}

@Override
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException 
{
try {
// 执行所有待 Compact 的 partition-bucket
for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
            write.compact(partitionBucket.getKey(), partitionBucket.getRight(), fullCompaction);
        }
    } catch (Exception e) {
thrownew RuntimeException("Exception happens while executing compaction.", e);
    }
    waitToCompact.clear();

// 准备提交
    List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
return committables;
}

7.4 调度流程图

sequenceDiagram
    participant Timer as 定时器
    participant Source as CompactorSource
    participant Enum as Enumerator
    participant Op as StoreCompactOperator
    participant CM as CompactManager
    participant Exec as CompactExecutor
    participant Commit as Committer

    Timer->>Source: 触发扫描(每10秒)
    Source->>Enum: discoverSplits()
    Enum->>Enum: 扫描快照,发现新文件
    Enum->>Source: 返回 CompactTask
    Source->>Op: 发送记录
    Op->>CM: write.compact(partition, bucket)
    CM->>CM: triggerCompaction()
    CM->>Exec: 提交异步任务
    Exec->>Exec: 执行文件合并
    Exec-->>CM: 返回 CompactResult
    CM->>Op: 更新 Levels
    Op->>Commit: prepareCommit()
    Commit->>Commit: 提交新快照

八、关键配置项详解

8.1 定时调度配置

配置项
默认值
类型
说明
continuous.discovery-interval
10s
Duration
Compact 任务的扫描间隔,控制多久检查一次是否有新文件需要 Compact

调优建议

  • 高频写入场景:缩短间隔(如 5s),及时发现需要 Compact 的文件
  • 低频写入场景:延长间隔(如 30s-1min),减少不必要的扫描开销

8.2 Compact 策略配置

配置项
默认值
类型
说明
num-sorted-run.compaction-trigger
5
Integer
触发 Compact 的 Run 数量阈值
num-sorted-run.stop-trigger
Integer.MAX_VALUE
Integer
阻塞写入等待 Compact 的 Run 数量阈值
compaction.max-size-amplification-percent
200
Integer
最大空间放大百分比,超过此值触发全量 Compact
compaction.size-ratio
1
Integer
大小比例阈值,控制是否合并相邻的 Runs
compaction.optimization-interval
Duration
定时全量 Compact 间隔
compaction.total-size-threshold
MemorySize
触发全量 Compact 的总文件大小阈值

调优建议

1. 控制文件数量

-- 更激进的 Compact:Run 数量达到 3 就触发
CREATETABLE my_table (...) WITH (
'num-sorted-run.compaction-trigger' = '3'
);

2. 控制空间放大

-- 更严格的空间控制:100% 空间放大就触发
CREATETABLE my_table (...) WITH (
'compaction.max-size-amplification-percent' = '100'
);

3. 定时全量 Compact

-- 每天凌晨2点执行全量 Compact
CREATETABLE my_table (...) WITH (
'compaction.optimization-interval' = '24 h'
);

8.3 文件大小配置

配置项
默认值
类型
说明
target-file-size
128 MB
MemorySize
目标文件大小,Compact 后的文件尽量接近此大小
compaction.file-size
target-file-size
MemorySize
触发 Compact 的文件大小阈值,小于此值的文件会被合并
compaction.min.file-num
5
Integer
Append 表触发 Compact 的最小文件数

调优建议

-- 小表场景:减小文件大小,加快 Compact
CREATETABLE small_table (...) WITH (
'target-file-size' = '64 MB',
'compaction.min.file-num' = '3'
);

-- 大表场景:增大文件大小,减少文件数量
CREATETABLE large_table (...) WITH (
'target-file-size' = '256 MB',
'compaction.min.file-num' = '10'
);

8.4 Off-Peak Hours 配置

配置项
默认值
类型
说明
compaction.offpeak.start.hour
-1
Integer
非高峰时段开始时间(小时,0-23),-1 表示禁用
compaction.offpeak.end.hour
-1
Integer
非高峰时段结束时间(小时,0-23),-1 表示禁用
compaction.offpeak-ratio
0
Integer
非高峰时段的额外 Compact 比例

示例

-- 凌晨2点到6点使用更激进的 Compact 策略
CREATETABLE my_table (...) WITH (
'compaction.offpeak.start.hour' = '2',
'compaction.offpeak.end.hour' = '6',
'compaction.offpeak-ratio' = '20',
'compaction.size-ratio' = '1'
);
-- 在非高峰时段,effective size-ratio = 1 + 20 = 21

8.5 其他重要配置

配置项
默认值
类型
说明
write-only
false
Boolean
是否仅写入不 Compact,适用于专门的 Compact 作业场景
compaction.delete-ratio-threshold
0.3
Double
删除率阈值,启用 Deletion Vector 时使用
prepare-commit.wait-compaction
false
Boolean
Checkpoint 时是否等待 Compact 完成

专门 Compact 作业配置

-- 写表配置
CREATETABLE my_table (...) WITH (
'write-only' = 'true'-- 禁用 Compact
);

-- Compact 作业配置
CALL sys.compact(
`table` => 'default.my_table',
`options` => 'continuous.discovery-interval=5s,sink.parallelism=20'
);

九、源码核心类总结

9.1 Compact 管理类

类名
路径
说明
CompactManager paimon-core/src/main/java/org/apache/paimon/compact/
Compact 管理器接口
MergeTreeCompactManager paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
主键表 Compact 管理器
BucketedAppendCompactManager paimon-core/src/main/java/org/apache/paimon/append/
Append 表(有 Bucket)Compact 管理器
AppendCompactCoordinator paimon-core/src/main/java/org/apache/paimon/append/
Append 表 Compact 协调器

9.2 Compact 策略类

类名
路径
说明
CompactStrategy paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
Compact 策略接口
UniversalCompaction paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
Universal Compaction 策略实现
FullCompactTrigger paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
定时全量 Compact 触发器
ForceUpLevel0Compaction paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
强制 Level 0 Compact 策略
OffPeakHours paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
非高峰时段配置

9.3 Compact 任务类

类名
路径
说明
CompactTask paimon-core/src/main/java/org/apache/paimon/compact/
Compact 任务抽象类
MergeTreeCompactTask paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
主键表 Compact 任务
FileRewriteCompactTask paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
文件重写任务(仅修改元数据)
AppendCompactTask paimon-core/src/main/java/org/apache/paimon/append/
Append 表 Compact 任务
CompactResult paimon-core/src/main/java/org/apache/paimon/compact/
Compact 结果
CompactUnit paimon-core/src/main/java/org/apache/paimon/compact/
Compact 单元(待合并的文件集合)

9.4 数据结构类

类名
路径
说明
Levels paimon-core/src/main/java/org/apache/paimon/mergetree/
LSM-Tree 层级结构
SortedRun paimon-core/src/main/java/org/apache/paimon/mergetree/
有序文件集合
LevelSortedRun paimon-core/src/main/java/org/apache/paimon/mergetree/
带 Level 信息的 SortedRun
IntervalPartition paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
文件分区算法(处理重叠文件)

9.5 Flink 集成类

类名
路径
说明
AppendBypassCoordinateOperator paimon-flink/paimon-flink-common/.../flink/source/
Append 表 Compact 协调算子
StoreCompactOperator paimon-flink/paimon-flink-common/.../flink/sink/
主键表 Compact 执行算子
CompactorSourceBuilder paimon-flink/paimon-flink-common/.../flink/source/
Compact Source 构建器
AppendTableCompact paimon-flink/paimon-flink-common/.../flink/compact/
Append 表 Compact 作业构建器
CompactAction paimon-flink/paimon-flink-common/.../flink/action/
Compact Action 实现

9.6 写入相关类

类名
路径
说明
MergeTreeWriter paimon-core/src/main/java/org/apache/paimon/mergetree/
主键表写入器
AppendOnlyWriter paimon-core/src/main/java/org/apache/paimon/append/
Append 表写入器
AbstractFileStoreWrite paimon-core/src/main/java/org/apache/paimon/operation/
文件存储写入抽象类
KeyValueFileStoreWrite paimon-core/src/main/java/org/apache/paimon/operation/
主键表写入实现
BaseAppendFileStoreWrite paimon-core/src/main/java/org/apache/paimon/operation/
Append 表写入实现

十、最佳实践与优化建议

10.1 场景一:多写作业共享表

问题:多个作业同时写入同一张表,内联 Compact 会导致文件冲突。

解决方案:使用专门的 Compact 作业

配置

-- 写表配置:禁用 Compact
CREATETABLE shared_table (
idBIGINT,
nameSTRING,
    dt STRING,
    PRIMARY KEY (id, dt) NOTENFORCED
) PARTITIONED BY (dt) WITH (
'write-only' = 'true',
'bucket' = '8'
);

-- 专门的 Compact 作业
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    compact \
--warehouse hdfs:///warehouse \
--database default \
--table shared_table \
--continuous \
--table_conf continuous.discovery-interval=10s \
--table_conf sink.parallelism=16

优点

  • 写作业之间无冲突
  • 资源隔离,Compact 不影响写入性能
  • 可以独立调整 Compact 并行度

10.2 场景二:大规模数据写入

问题:高吞吐写入导致 Level 0 文件数量激增,影响查询性能。

解决方案:调整 Compact 策略,加快 Compact 速度

配置

CREATETABLE high_throughput_table (...) WITH (
-- 减少触发阈值,更频繁地 Compact
'num-sorted-run.compaction-trigger' = '3',

-- 增大目标文件大小,减少文件数量
'target-file-size' = '256 MB',

-- 设置阻塞阈值,防止 Level 0 文件过多
'num-sorted-run.stop-trigger' = '10',

-- 使用更激进的空间放大控制
'compaction.max-size-amplification-percent' = '150'
);

Compact 作业并行度

--table_conf sink.parallelism=32  # 增大并行度加快 Compact

10.3 场景三:低延迟查询要求

问题:Level 0 文件过多导致查询延迟较高。

解决方案:优化 Compact 策略,保持 Level 0 文件数量较少

配置

CREATETABLE low_latency_table (...) WITH (
-- 更激进的 Compact 触发
'num-sorted-run.compaction-trigger' = '2',

-- 定时全量 Compact,确保查询性能
'compaction.optimization-interval' = '1 h',

-- Checkpoint 时等待 Compact 完成
'prepare-commit.wait-compaction' = 'true'
);

10.4 场景四:定时离峰 Compact

问题:白天业务高峰期 Compact 占用资源,影响写入和查询。

解决方案:配置 Off-Peak Hours,在夜间进行更激进的 Compact

配置

CREATETABLE peak_sensitive_table (...) WITH (
-- 白天使用温和的 Compact 策略
'compaction.size-ratio' = '5',
'num-sorted-run.compaction-trigger' = '10',

-- 夜间2点到6点使用激进策略
'compaction.offpeak.start.hour' = '2',
'compaction.offpeak.end.hour' = '6',
'compaction.offpeak-ratio' = '30'
);

效果

  • 白天:effective size-ratio = 5,Compact 触发较少
  • 夜间:effective size-ratio = 5 + 30 = 35,Compact 更频繁

10.5 场景五:Append 表小文件过多

问题:Append 表写入产生大量小文件。

解决方案:调整文件打包策略

配置

CREATETABLE append_table (
idBIGINT,
dataSTRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
-- 降低最小文件数要求
'compaction.min.file-num' = '3',

-- 减小 Compact 文件大小阈值
'compaction.file-size' = '64 MB',

-- 增大目标文件大小
'target-file-size' = '256 MB',

-- 缩短扫描间隔
'continuous.discovery-interval' = '5s'
);

10.6 监控指标

关键指标

  1. Level 0 文件数level0.file.count

    • 正常范围:< 10
    • 告警阈值:> 20
  2. Compact 延迟compaction.time.ms

    • 正常范围:< 5min
    • 告警阈值:> 15min
  3. Compact 队列长度compactions.queued.count

    • 正常范围:< 5
    • 告警阈值:> 10
  4. 总文件大小total.file.size.bytes

    • 监控空间放大情况

Flink Metrics

// paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
publicinterfaceReporter{
voidreportLevel0FileCount(long count);
voidreportCompactionTime(long milliseconds);
voidincreaseCompactionsQueuedCount();
voidincreaseCompactionsCompletedCount();
voidreportTotalFileSize(long bytes);
}

10.7 故障排查

问题1:Compact 任务失败

排查步骤

  1. 检查日志中的异常信息
  2. 确认是否有并发写冲突
  3. 检查资源是否充足(内存、磁盘)

解决方案

  • 启用 write-only 模式,使用专门的 Compact 作业
  • 增加 Compact 作业的资源配置

问题2:Compact 速度慢

排查步骤

  1. 检查 compactions.queued.count 指标
  2. 查看 Compact 任务的并行度
  3. 确认文件大小和数量

解决方案

  • 增大 Compact 并行度:sink.parallelism
  • 调整策略参数:降低 num-sorted-run.compaction-trigger
  • 增大文件大小:target-file-size

问题3:内存溢出(OOM)

排查步骤

  1. 检查 Compact 任务的堆内存使用
  2. 查看单个 Compact 任务处理的文件数量
  3. 确认是否有大文件

解决方案

  • 增大 TaskManager 内存
  • 减小 target-file-size
  • 限制单次 Compact 的文件数量

总结

Paimon 的定时 Compact 机制是保证数据湖性能的关键组件。本文详细分析了:

  1. 三种触发方式:内联 Compact、定时 Compact Job、手动触发
  2. Append 表流程:基于 AppendCompactCoordinator 的文件扫描、打包和任务生成
  3. 主键表流程:基于 LSM-Tree 的 Levels 管理和 MergeTreeCompactManager
  4. UniversalCompaction 策略:四级策略(定时全量、空间放大、大小比例、文件数量)
  5. 定时调度机制:基于 Flink 的流式处理框架,使用 ScheduledExecutorService 和 ProcessingTimeService
  6. 关键配置项:扫描间隔、策略参数、文件大小、Off-Peak Hours 等
  7. 最佳实践:多写作业、大规模写入、低延迟查询、离峰 Compact、小文件处理等场景的优化方案

通过合理配置和监控,可以在写入性能、查询性能和存储成本之间取得良好的平衡。


本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » Paimon 表 Compact 流程与源码详解

评论 抢沙发

3 + 4 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮