大家好,我是懒羊不想刷算法
今天来讲一下 JUC 包下一个非常常用的并发工具类:CountDownLatch
假设现在有一个需求:
主线程需要等 3 个工作线程全部处理完任务,再汇总最终结果。
最简单粗暴的写法是什么?
主线程写一个 while 死循环,隔一会检查一下任务是不是全结束了。
这个写法不能说完全不能用,只能说 CPU 看了想报警。
JUC 已经帮我们提供了一个专门解决这类问题的工具:CountDownLatch。
它的名字拆开来看非常形象:
CountDown:倒计数Latch:门闩
创建对象时设置一个初始值,每完成一个任务就调用一次 countDown(),计数器减一。等待结果的线程调用 await(),只要计数器还没有归零,就老老实实等待。
当计数器减到 0,门闩打开,所有等待线程继续向下执行。

基础用法如下:
publicstaticvoidmain(String[] args)throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
int taskId = i;
new Thread(() -> {
System.out.println("任务 " + taskId + " 执行完成");
latch.countDown();
}).start();
}
latch.await();
System.out.println("全部任务结束,主线程开始汇总结果");
}
使用起来非常简单,但是有几个特点需要先记住:
countDown()不会阻塞调用它的线程await()才会阻塞线程计数器只能减,不能加 计数器减到 0 后不能重置,想重复使用可以看 CyclicBarrier
那么接下来我们一起看看,如此方便的并发工具类是怎么实现的。
一、CountDownLatch 的整体结构
CountDownLatch 的源码并不长,真正干活的也不是他自己,而是内部类 Sync。
publicclassCountDownLatch{
privatefinal Sync sync;
privatestaticfinalclassSync
extendsAbstractQueuedSynchronizer{
Sync(int count) {
setState(count);
}
intgetCount(){
return getState();
}
}
}
又看到了熟悉的 AbstractQueuedSynchronizer,也就是 AQS。
如果把 JUC 比作一栋大楼,那么 AQS 就是经常出现在地下室里的承重结构。ReentrantLock、Semaphore、CountDownLatch 这些工具类,表面上的功能各不相同,底层很多都依赖 AQS 完成线程排队和唤醒。
CountDownLatch 自己的逻辑其实非常少:
使用 AQS 的 state保存剩余计数使用 AQS 的等待队列保存被 await()阻塞的线程使用 AQS 的共享模式唤醒等待线程
构造方法也非常简单:
publicCountDownLatch(int count){
if (count < 0) {
thrownew IllegalArgumentException("count < 0");
}
this.sync = new Sync(count);
}
创建 new CountDownLatch(3) 时,本质上就是把 AQS 的 state 设置为 3。
后面每调用一次 countDown(),就尝试将 state 减一。
每调用一次 await(),就检查 state 是不是已经等于 0。
是不是突然就感觉没那么神秘了?
二、核心方法 countDown
先来看 countDown():
publicvoidcountDown(){
sync.releaseShared(1);
}
这段源码短得甚至有点敷衍。
他直接调用了 AQS 的 releaseShared(1) 方法:
publicfinalbooleanreleaseShared(int arg){
if (tryReleaseShared(arg)) {
doReleaseShared();
returntrue;
}
returnfalse;
}
这里可以拆成两步:
调用 tryReleaseShared()尝试释放共享资源如果返回 true,调用doReleaseShared()唤醒等待队列中的线程
真正负责计数器减一的是 CountDownLatch.Sync 重写的 tryReleaseShared():
protectedbooleantryReleaseShared(int releases){
for (;;) {
int c = getState();
if (c == 0) {
returnfalse;
}
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
这一段代码很短,但是很关键,我们逐行来看。
首先开启一个死循环:
for (;;) {
然后读取当前 state:
int c = getState();
如果 state 已经是 0,说明门闩早就打开了,直接返回 false:
if (c == 0) {
returnfalse;
}
这也是为什么多调用几次 countDown(),计数器不会减成负数,更不会抛异常。
接下来计算减一后的结果:
int nextc = c - 1;
最后使用 CAS 更新 state:
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
为什么这里要用 CAS?
因为多个工作线程可能同时调用 countDown()。
假设当前 state = 3,线程 A 和线程 B 同时读到了 3。如果直接赋值,两个线程都会写入 2,明明执行了两次 countDown(),结果计数器只减了一次。
使用 CAS 后,只有一个线程能把 3 更新成 2。另一个线程更新失败后,会回到循环开头,重新读取最新值 2,再尝试更新成 1。

还有一个细节:
tryReleaseShared() 只有在计数器刚好减到 0 时才返回 true。
也就是说,不是每次 countDown() 都会去唤醒等待线程。只有跑完最后一个任务的线程,才会进入 doReleaseShared(),真正打开门闩。
这最后一棒,有点像小组作业里负责提交文件的那个人。
三、核心方法 await
再来看等待线程调用的 await():
publicvoidawait()throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
他调用了 AQS 的 acquireSharedInterruptibly():
publicfinalvoidacquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) {
thrownew InterruptedException();
}
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg);
}
}
逻辑也可以总结为两步:
先检查当前线程是否被中断 尝试获取共享资源,获取失败就进入等待队列
CountDownLatch.Sync 重写的 tryAcquireShared() 更加简单:
protectedinttryAcquireShared(int acquires){
return (getState() == 0) ? 1 : -1;
}
没错,就这一行。
这里的含义是:
state == 0:所有任务已经完成,返回 1,线程直接通过state != 0:还有任务没有完成,返回 -1,线程进入 AQS 等待队列

如果获取失败,AQS 会调用 doAcquireSharedInterruptibly(),把当前线程包装为一个节点,加入等待队列,并在合适的时候通过 LockSupport.park() 挂起。
简化后的逻辑可以理解为:
privatevoiddoAcquireSharedInterruptibly(int arg)
throws InterruptedException {
Node node = addWaiter(Node.SHARED);
for (;;) {
Node predecessor = node.predecessor();
if (predecessor == head) {
int result = tryAcquireShared(arg);
if (result >= 0) {
setHeadAndPropagate(node, result);
return;
}
}
if (shouldParkAfterFailedAcquire(predecessor, node)
&& parkAndCheckInterrupt()) {
thrownew InterruptedException();
}
}
}
这一段源码初看又是循环,又是前驱节点,又是 park,看起来比较唬人。
但是核心思路并不复杂:
等待线程先加入 AQS 队列 轮到自己时,再次检查 state如果 state仍然不为 0,挂起线程,避免空转浪费 CPU最后一个工作线程调用 countDown()后,唤醒队列中的等待线程
之前主线程一直在门口蹲着,现在终于有人来开门了。
四、为什么使用共享模式?
看到这里可能有人会问:
为什么源码里调用的是 releaseShared() 和 acquireSharedInterruptibly(),而不是普通的独占模式?
因为等待同一个 CountDownLatch 的线程可能不止一个。
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> await(latch), "线程-A").start();
new Thread(() -> await(latch), "线程-B").start();
new Thread(() -> await(latch), "线程-C").start();
latch.countDown();
当 state 减到 0 后,A、B、C 三个线程都应该继续执行。
如果使用独占模式,一次只允许一个线程拿到资源,显然不符合需求。
共享模式的核心思路是:
一个等待线程被唤醒并确认 state == 0 后,会继续把唤醒动作向队列后方传播。
源码中对应的是 AQS 的 setHeadAndPropagate() 和 doReleaseShared()。
最终效果就是:
门一旦打开,所有等待线程都可以通过。
这也是 CountDownLatch 和锁最大的区别。
锁更像是卫生间钥匙,同一时间通常只给一个人。
CountDownLatch 更像是商场开门,时间一到大家都能进。
五、带超时时间的 await
除了无限等待,CountDownLatch 还提供了一个带超时时间的方法:
publicbooleanawait(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(
1, unit.toNanos(timeout));
}
使用方式如下:
boolean completed = latch.await(3, TimeUnit.SECONDS);
if (!completed) {
System.out.println("等待超时,还有任务没有完成");
}
在真实项目里,这种写法往往更加稳妥。
因为如果某个工作线程执行异常,没有走到 countDown(),无限等待的线程就会一直挂在那里。
因此,countDown() 通常应该放进 finally:
new Thread(() -> {
try {
doSomething();
} finally {
latch.countDown();
}
}).start();
别小看这个 finally,少写一次,线上排查问题的时候可能就要多喝两杯咖啡。
六、CountDownLatch 能不能重复使用?
不能。
当 state 减到 0 后,CountDownLatch 不提供任何重置计数器的方法。
CountDownLatch latch = new CountDownLatch(2);
latch.countDown();
latch.countDown();
// 门闩已经永久打开
latch.await();
后续再调用 await() 会直接通过,再调用 countDown() 也不会发生任何变化。
如果业务需要多轮线程互相等待,可以考虑:
CyclicBarrier:一组线程互相等待,到齐后一起继续,可以重复使用Phaser:支持更灵活的多阶段任务协调
选工具时不要只看名字眼熟,先看业务到底是在等一次,还是要循环等很多轮。
七、完整流程总结
最后,把 CountDownLatch 的源码逻辑串起来:
创建对象
new CountDownLatch(3);
内部将 AQS 的 state 设置为 3。
工作线程调用 countDown
latch.countDown();
内部通过 CAS 将 state 减一。
如果减一后不等于 0,直接返回。
如果减一后等于 0,说明当前线程完成了最后一个任务,调用 AQS 的共享释放逻辑唤醒等待线程。
等待线程调用 await
latch.await();
如果 state == 0,直接继续执行。
如果 state != 0,进入 AQS 等待队列并挂起。
门闩打开
最后一次 countDown() 将 state 减到 0 后,等待队列中的线程会以共享模式被唤醒。
至此,CountDownLatch 的整体源码就讲完了。
表面上看,他只是一个倒计时工具。
但底层其实是把 AQS 的 state、CAS、等待队列、线程挂起和共享唤醒组合在了一起。
源码不算长,非常适合作为学习 AQS 的入门案例。
关于 CountDownLatch 的文章就介绍到这里了,想要分享和补充的朋友欢迎在评论区留言。
我是懒羊不想刷算法,一个非科班转码的后端新人,希望这篇文章对你有帮助,喜欢就点赞关注吧!
夜雨聆风