源码解析之ReentrantLock(Condition)(三)
在上一篇文章源码解析之ReentrantLock(二)中,我们详细分析了公平锁的入队、挂起和唤醒流程。我们了解到,AQS通过内部的FIFO等待队列来管理竞争锁的线程,并通过LockSupport.park/unpark实现线程的阻塞与唤醒。
但仅仅有锁显然是不够的,很多时候我们需要线程间协作,比如等待某个条件满足后再继续执行。这正是Condition诞生的背景,它提供了类似Object.wait/notify的功能,但与特定的Lock绑定,可以支持多个等待队列(条件变量)。今天我们就来看看ReentrantLock中Condition的底层是如何实现的。
本文说明
-
Java版本:1.8。
-
本文主要分析ReentrantLock中Condition的实现,即AQS的内部类
ConditionObject。 -
涉及等待队列(用于锁竞争)和条件队列(用于条件等待)两个队列的协作。
-
不涉及
Condition在共享模式下的应用
一、Condition的基本用法
在深入源码前,我们先来看一下Condition的用法。
ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();// 线程1:等待条件lock.lock();try {while (条件不满足) {condition.await(); // 释放锁,进入等待状态}// 执行条件满足后的逻辑} finally {lock.unlock();}// 线程2:触发条件lock.lock();try {// 改变条件状态condition.signal(); // 唤醒一个等待线程} finally {lock.unlock();}
这里有两个关键点需要注意:
-
调用
await()前必须持有锁,否则会抛出IllegalMonitorStateException。 -
await()会释放锁,并将当前线程封装成节点加入条件队列;当被signal唤醒后,会重新竞争锁。
下面我们看一下Condition的内部结构。
二、创建Condition对象
// ReentrantLock.javapublic Condition newCondition() {return sync.newCondition();}// ReentrantLock.Syncfinal ConditionObject newCondition() {return new ConditionObject();}
ConditionObject是AbstractQueuedSynchronizer(AQS)的内部类,它实现了Condition接口。每个Condition对象内部都维护着一个独立的FIFO条件队列。注意,这个队列和前面文章里讲的同步队列(等待锁的队列)是两个完全不同的队列,它们节点类型相同(都是AQS.Node),但作用和使用方式截然不同。
我们先来看ConditionObject的核心字段:
public class ConditionObject implements Condition, java.io.Serializable {/** 条件队列的头节点 */private transient Node firstWaiter;/** 条件队列的尾节点 */private transient Node lastWaiter;// ... 其他}
这里的Node正是我们在第二篇文章中见到的那个内部类,不过在条件队列中,我们只使用Node中的nextWaiter属性来串联节点,而不是prev和next。这是为了与同步队列区分开。
三、await() 的实现
await()方法会让当前线程释放锁,并进入条件队列等待,直到被signal唤醒或线程中断。我们先看整体结构,再逐个拆解。
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 1. 将当前线程封装成节点,并加入到条件队列中Node node = addConditionWaiter();// 2. 完全释放当前线程持有的锁(考虑重入)int savedState = fullyRelease(node);int interruptMode = 0;// 3. 自旋,检查节点是否已进入到同步队列while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 4. 此时节点已在同步队列中,尝试获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 5. 清理条件队列中已取消的节点(被转移到同步队列或已取消)if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 6. 处理中断if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
这个方法比较复杂,我们来逐步拆分。
3.1 加入条件队列:addConditionWaiter()
private Node addConditionWaiter() {Node t = lastWaiter;// 清除条件队列中取消等待的节点if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters(); // 所有节点状态不等于Node.CONDITION都会被删除t = lastWaiter;}// 创建当前线程的节点,状态为 CONDITION(-2),处于条件等待中Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
这里必须注意:条件队列中的节点状态被设置为CONDITION(值为-2)时,这表示该节点正处于条件等待中,尚未转移到同步队列。与同步队列不同,条件队列的入队操作不需要CAS,因为调用await()时线程已经持有了锁,不存在并发竞争。
unlinkCancelledWaiters()方法会遍历条件队列,移除所有状态不是CONDITION的节点。
3.2 释放锁:fullyRelease()
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 调用 release(savedState) 释放所有的锁(在之前的文章有详细分析,本次不做过多描述)if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}
fullyRelease会释放当前线程持有的所有锁(包括重入锁),并返回释放前的state值。这个值会被保存下来,当线程被唤醒重新获取锁时,需要恢复到这个重入计数。
如果释放失败(比如当前线程没有持有锁),就会抛出IllegalMonitorStateException,这就是我们调用await()前必须先获取锁的原因。
3.3 等待进入同步队列:isOnSyncQueue()
while (!isOnSyncQueue(node)) {LockSupport.park(this);// ...}
线程释放锁后,就进入等待状态,直到被signal唤醒。isOnSyncQueue(node)用来判断节点是否已经从条件队列转移到了同步队列(等待锁的队列)。只有转移到同步队列后,线程才有机会竞争锁。
final boolean isOnSyncQueue(Node node) {// 如果状态是 CONDITION,或者 prev 指针为 null(说明不在同步队列中),返回falseif (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果 next 指针不为 null,说明一定在同步队列中if (node.next != null)return true;// 否则从尾部向前查找return findNodeFromTail(node);}
关于为什么需要findNodeFromTail,我们在第二篇文章中解释过:同步队列的next指针是后设置的,可能存在可见性问题,所以需要反向查找。
3.4 被唤醒后竞争锁:acquireQueued
当signal将节点从条件队列转移到同步队列后(下面会解释signal如何转移节点),isOnSyncQueue返回true,循环退出。接着调用acquireQueued(node, savedState),这个方法我们在第二篇文章中详细分析过——它负责在同步队列中自旋、挂起、尝试获取锁。注意,这里传入的savedState正是之前保存的重入计数,获取锁后会恢复state。
3.5 清理和中断处理
如果节点不是通过signal正常唤醒,而是因为中断,checkInterruptWhileWaiting会返回对应的中断模式(THROW_IE或REINTERRUPT),后续在reportInterruptAfterWait中决定是抛出中断异常还是重新设置中断标志。
四、signal() 的实现
signal方法用于唤醒条件队列中等待时间最长的线程(即第一个节点)。与await类似,调用signal也必须持有锁,否则会抛出异常。
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}
isHeldExclusively是AQS中的一个钩子方法,在ReentrantLock中实现为isHeldExclusively(),返回当前线程是否持有锁。这个检查确保了只有锁的持有者才能发送信号。
4.1 doSignal
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null; // 断开原头节点} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
这个循环的作用:从条件队列头开始,逐个尝试将节点转移到同步队列,直到成功转移一个节点或队列为空。
4.2 从条件队列转移到同步队列:transferForSignal()
final boolean transferForSignal(Node node) {// CAS将节点状态从CONDITION改为0,失败则说明节点已被取消if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 将该节点插入同步队列的尾部,返回原尾节点(即新节点的前驱)Node p = enq(node);int ws = p.waitStatus;// 如果前驱状态为CANCELLED,或者设置前驱状态为SIGNAL失败,// 则直接唤醒该线程,让它自己去同步队列中竞争if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
这段代码有两个关键点:
-
将节点状态从CONDITION改为0,表示它不再处于条件等待状态。如果CAS失败,说明节点已经被取消或已被转移。
-
调用
enq(node)将节点加入同步队列。这里入队成功后返回的是节点的前驱节点。根据我们之前分析的同步队列规则:一个节点需要被唤醒,其前驱节点的waitStatus必须为SIGNAL。如果前驱节点已取消或无法设置SIGNAL,就直接唤醒该线程,让它自己处理。
五、条件队列与同步队列的协作关系
到这里,Condition的核心流程已经清晰了,我画了一张简图来帮助大家理解两个队列的协作:
同步队列(等待锁)head ──> node1 ──> node2 ──> tail条件队列(等待条件)firstWaiter ──> nodeA ──> nodeB ──> lastWaiterawait: 线程进入 nodeA → 释放锁 → 进入条件队列 → 挂起signal: 将 nodeA 从条件队列移除 → 状态改为0 → enq入同步队列 → 设置前驱为SIGNAL
重要区别:
-
同步队列是所有线程竞争锁的战场,节点通过
prev、next连接,入队出队都需要CAS保证线程安全。 -
条件队列是每个Condition私有的休息室,节点通过
nextWaiter连接,由于访问它时线程已经持有锁,所以不需要CAS。
六、注意点:中断与虚假唤醒
在await()的循环判断中,使用了while (!isOnSyncQueue(node)),这正是为了防止虚假唤醒。即使没有被signal,线程也可能因为操作系统原因被唤醒,此时它仍不在同步队列中,需要继续挂起。
另外,Condition响应中断有两种方式:
-
THROW_IE:在等待时检测到中断,直接抛出InterruptedException。
-
REINTERRUPT:在获取锁后重新设置中断标记。
具体的判断逻辑在checkInterruptWhileWaiting中实现,这里不再展开,大家知道AQS会妥善处理即可。
七、总结
经过以上分析,我们可以看到Condition的实现完全依赖于AQS的底层框架,其巧妙之处在于:
-
同一套Node节点,两种队列,通过不同的字段(
prev/next&nextWaiter)链接,职责分离。 -
状态字段waitStatus的多重语义:
SIGNAL、CANCELLED、CONDITION等,控制线程状态流转。 -
等待条件时必须持有锁,这是由
fullyRelease和isHeldExclusively强制保证的,避免了丢失唤醒信号的风险。 -
signal不会立即唤醒线程,只是将节点从条件队列移到同步队列,线程仍需排队竞争锁,这符合“公平”的原则。
对比Object.wait/notify,Condition提供了更丰富的功能:支持多条件变量、可中断等待、超时等待、公平/非公平选择等,但其底层复杂度也相应更高。
写在最后
至此,ReentrantLock中Condition的核心源码我们已经过了一遍。相比加锁解锁,Condition的逻辑确实更加绕,尤其是两个队列之间的切换,以及状态位的流转。建议大家结合文中的流程图,自己走一遍完整的await/signal流程,相信会有更深的理解。欢迎大家继续关注,也感谢大家的耐心阅读。
夜雨聆风
