
说到并行计算,很多人第一反应是开多个线程,每个线程处理一部分数据。
但有个问题:如果某个线程任务特别重,其他线程处理完了只能干等着,整体效率被拖慢。
Java7引入的Fork/Join框架,用一种叫工作窃取(Work Stealing)的算法完美解决这个问题:忙碌的线程从空闲线程"偷"任务,大家都能高效利用CPU。
这篇文章从源码层面拆解Fork/Join框架的核心原理:分治思想、工作窃取算法、ForkJoinPool线程池设计、任务队列实现。
一、分治思想与工作窃取算法
1.1 分治思想
Fork/Join的核心是分治:把大任务拆成小任务,小任务并行执行,最后合并结果。
大任务
↓ Fork拆分
┌───┬───┬───┬───┐
│T1 │T2 │T3 │T4 │ 小任务并行执行
└───┴───┴───┴───┘
↓ Join合并
最终结果
典型场景:
大数组求和 并行排序 矩阵运算 文件批量处理
1.2 工作窃取算法
传统线程池问题:共享任务队列,所有线程竞争同一个队列,锁竞争激烈。
Fork/Join的改进:每个线程维护自己的任务队列。
线程1队列: [T1, T2, T3] → 处理T1
线程2队列: [T4, T5] → 处理T4
线程3队列: [T6] → 处理T6(快完了)
线程4队列: [] → 空闲!
线程4发现线程1队列还有T2、T3,"窃取"一个任务:
线程4队列: [T2] → 处理T2
关键设计:
每个线程双端队列,自己从队尾取任务(LIFO) 其他线程从队头"窃取"任务(FIFO) 窃取操作无锁竞争,用CAS实现
二、核心组件源码解析
2.1 ForkJoinPool架构
publicclassForkJoinPoolextendsAbstractExecutorService{
volatile WorkQueue[] workQueues; // 工作队列数组
final ForkJoinWorkerThreadFactory factory; // 线程工厂
volatilelong ctl; // 线池控制状态(活跃线程数、偷取计数等)
}
WorkQueue:每个线程的任务队列
staticfinalclassWorkQueue{
ForkJoinTask<?>[] array; // 任务数组(双端队列)
int base; // 队头索引(窃取位置)
int top; // 队尾索引(添加/取出位置)
final ForkJoinPool pool; // 所属池
final ForkJoinWorkerThread owner; // 所属线程(null表示共享队列)
int stealCount; // 窃取计数
}
2.2 ForkJoinTask核心方法
publicabstractclassForkJoinTask<V> implementsFuture<V> {
// 拆分任务,加入当前线程队列
publicfinal ForkJoinTask<V> fork(){
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
((ForkJoinWorkerThread)t).workQueue.push(this);
} else {
ForkJoinPool.common.externalPush(this);
}
returnthis;
}
// 等待任务完成并获取结果
publicfinal V join(){
int s = doJoin();
if ((s & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
// 核心执行逻辑
privateintdoJoin(){
int s; ForkJoinTask<?> t;
while ((s = status) >= 0) {
if ((t = getCompletion(this, s)) != null)
t.awaitDone(s); // 任务未完成,等待
elseif ((s = compareAndSwapStatus(s, COMPLETING)) == s) {
try {
V r = doExec(); // 执行任务
setRawResult(r);
compareAndSwapStatus(COMPLETING, NORMAL);
} finally {
if (status != NORMAL)
tryComplete();
}
break;
}
}
return s;
}
// 执行任务compute方法
final V doExec(){
int s; V r;
if ((s = status) >= 0) {
r = compute(); // 用户实现的计算逻辑
if (r != null || (s = compareAndSwapStatus(s, COMPLETING)) == s)
setRawResult(r);
s = status;
}
return getRawResult();
}
}
2.3 工作窃取源码实现
// 线程从自己队列取任务(队尾LIFO)
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; int s;
if ((a = array) != null && (s = top) != base) {
long j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, j);
if (U.compareAndSwapObject(a, j, t, null)) { // CAS取出
top = s;
return t;
}
}
returnnull;
}
// 其他线程窃取任务(队头FIFO)
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b;
while ((b = base) != top && (a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (t != null && base == b &&
U.compareAndSwapObject(a, j, t, null)) { // CAS窃取
base = b + 1;
stealCount++; // 窃取计数增加
return t;
}
}
returnnull;
}
关键点:
pop()从队尾取,自己操作,CAS无锁poll()从队头取,其他线程窃取,volatile读取+CAS窃取成功后 stealCount++,用于负载均衡统计
三、实战案例:大数组求和
3.1 RecursiveTask实现
publicclassArraySumTaskextendsRecursiveTask<Long> {
privatestaticfinalint THRESHOLD = 10000; // 任务阈值
privatefinallong[] array;
privatefinalint start;
privatefinalint end;
publicArraySumTask(long[] array, int start, int end){
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute(){
int length = end - start;
// 小任务直接计算
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 大任务拆分
int mid = start + length / 2;
ArraySumTask leftTask = new ArraySumTask(array, start, mid);
ArraySumTask rightTask = new ArraySumTask(array, mid, end);
// fork拆分
leftTask.fork();
rightTask.fork();
// join合并
return leftTask.join() + rightTask.join();
}
}
3.2 执行任务
publicclassForkJoinDemo{
publicstaticvoidmain(String[] args){
long[] array = newlong[100_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
ForkJoinPool pool = new ForkJoinPool();
ArraySumTask task = new ArraySumTask(array, 0, array.length);
long start = System.currentTimeMillis();
Long result = pool.invoke(task);
long end = System.currentTimeMillis();
System.out.println("结果: " + result);
System.out.println("耗时: " + (end - start) + "ms");
pool.shutdown();
}
}
3.3 RecursiveAction(无返回值)
publicclassFileProcessTaskextendsRecursiveAction{
privatestaticfinalint THRESHOLD = 10;
privatefinal List<File> files;
publicFileProcessTask(List<File> files){
this.files = files;
}
@Override
protectedvoidcompute(){
if (files.size() <= THRESHOLD) {
// 小任务直接处理
for (File file : files) {
processFile(file);
}
} else {
// 大任务拆分
int mid = files.size() / 2;
FileProcessTask left = new FileProcessTask(files.subList(0, mid));
FileProcessTask right = new FileProcessTask(files.subList(mid, files.size()));
invokeAll(left, right); // 同时提交两个任务
}
}
privatevoidprocessFile(File file){
System.out.println("处理文件: " + file.getName());
}
}
四、踩坑经验
坑1:任务粒度太细,fork开销大于计算
// 错误:每个元素都fork
if (length > 1) {
ArraySumTask left = new ArraySumTask(array, start, start + 1);
left.fork();
}
问题:fork/join有开销(队列操作、线程调度),任务太小得不偿失。
解决:设置合理的阈值,一般建议每个任务执行时间>1ms。
坑2:join阻塞导致线程空闲
// 错误:fork后立即join阻塞
leftTask.fork();
rightTask.fork();
return leftTask.join() + rightTask.join(); // 当前线程阻塞等待
问题:当前线程fork两个任务后阻塞等待,无法窃取其他任务。
解决:使用invokeAll或合理拆分
// 方式1:invokeAll同时提交
invokeAll(leftTask, rightTask);
return leftTask.join() + rightTask.join();
// 方式2:fork一个,自己执行另一个
leftTask.fork();
long rightResult = rightTask.compute(); // 自己执行
long leftResult = leftTask.join();
return leftResult + rightResult;
坑3:在compute中调用阻塞方法
// 错误:compute中sleep或IO阻塞
protected Long compute(){
Thread.sleep(1000); // 阻塞线程
File file = new File("data.txt");
file.read(); // IO阻塞
return sum;
}
问题:阻塞线程导致工作窃取失效,线程池效率下降。
解决:Fork/Join适合CPU密集型任务,IO密集型用CompletableFuture。
五、最佳实践
5.1 合理设置任务阈值
// 阈值 = 总任务量 / (CPU核心数 * 期望每个任务执行次数)
int threshold = totalSize / (Runtime.getRuntime().availableProcessors() * 100);
经验:
每个任务执行时间1ms-10ms为宜 阈值太小:fork开销大 阈值太大:并行度不够
5.2 使用CommonPool
// 推荐:使用公共池,避免创建新池
ForkJoinTask<Long> task = new ArraySumTask(array, 0, array.length);
Long result = task.invoke(); // 自动使用commonPool
公共池特点:
进程级别共享,减少资源占用 parallelStream默认使用commonPool 可配置并行度: -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
5.3 监控线程池状态
ForkJoinPool pool = new ForkJoinPool(4);
// 获取监控指标
System.out.println("活跃线程: " + pool.getActiveThreadCount());
System.out.println("并行度: " + pool.getParallelism());
System.out.println("窃取次数: " + pool.getStealCount());
System.out.println("待处理任务: " + pool.getQueuedTaskCount());
六、面试加分Q&A
Q1:Fork/Join和普通线程池的区别?
A:
任务队列:普通线程池共享队列,Fork/Join每个线程独立队列 任务窃取:普通线程池竞争同一个队列,Fork/Join空闲线程窃取其他队列 任务类型:普通线程池执行独立任务,Fork/Join执行可拆分的递归任务 适用场景:普通线程池适合IO密集型,Fork/Join适合CPU密集型
Q2:工作窃取算法为什么高效?
A:
每个线程双端队列,自己操作队尾(LIFO),窃取操作队头(FIFO) 队尾操作无竞争(只有自己操作) 队头窃取用CAS,避免锁竞争 负载自动均衡,忙线程多任务,闲线程帮忙
Q3:ForkJoinPool的commonPool是什么?
A:
JDK8引入的公共ForkJoinPool,整个JVM共享 parallelStream默认使用commonPool执行 配置: -Djava.util.concurrent.ForkJoinPool.common.parallelism=N优点:减少池创建开销,缺点:所有任务共享资源
Q4:什么场景不适合用Fork/Join?
A:
IO密集型任务:阻塞导致窃取失效 任务不可拆分:无法分治 任务间有依赖:Fork/Join假设子任务独立 数据量太小:fork开销大于收益
Q5:fork和invoke的区别?
A:
fork():异步提交任务到队列,立即返回invoke():提交任务并等待完成,同步执行invokeAll(tasks):批量提交并等待所有完成
七、总结
Fork/Join框架的核心设计:
记住三个原则:
任务粒度适中,fork开销不大于计算收益 避免在compute中阻塞,保持线程活跃 CPU密集型用Fork/Join,IO密集型用CompletableFuture
源码层面,核心是WorkQueue的双端队列设计和CAS无锁窃取操作。理解这个,就理解了Fork/Join的高效本质。
📖 往期推荐
如果觉得有帮助,欢迎转发给需要的朋友 💙
有问题评论区见 ✨
夜雨聆风