乐于分享
好东西不私藏

源码解析之ReentrantLock(Condition)(三)

源码解析之ReentrantLock(Condition)(三)

前情回顾

在上一篇文章源码解析之ReentrantLock(二)中,我们详细分析了公平锁的入队、挂起和唤醒流程。我们了解到,AQS通过内部的FIFO等待队列来管理竞争锁的线程,并通过LockSupport.park/unpark实现线程的阻塞与唤醒。

但仅仅有锁显然是不够的,很多时候我们需要线程间协作,比如等待某个条件满足后再继续执行。这正是Condition诞生的背景,它提供了类似Object.wait/notify的功能,但与特定的Lock绑定,可以支持多个等待队列(条件变量)。今天我们就来看看ReentrantLock中Condition的底层是如何实现的。

本文说明

  • Java版本:1.8。

  • 本文主要分析ReentrantLock中Condition的实现,即AQS的内部类ConditionObject

  • 涉及等待队列(用于锁竞争)和条件队列(用于条件等待)两个队列的协作。

  • 不涉及Condition在共享模式下的应用


一、Condition的基本用法

在深入源码前,我们先来看一下Condition的用法。

ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();// 线程1:等待条件lock.lock();try {    while (条件不满足) {        condition.await(); // 释放锁,进入等待状态    }    // 执行条件满足后的逻辑finally {    lock.unlock();}// 线程2:触发条件lock.lock();try {    // 改变条件状态    condition.signal(); // 唤醒一个等待线程finally {    lock.unlock();}

这里有两个关键点需要注意:

  1. 调用await()前必须持有锁,否则会抛出IllegalMonitorStateException

  2. await()会释放锁,并将当前线程封装成节点加入条件队列;当被signal唤醒后,会重新竞争锁。

下面我们看一下Condition的内部结构。


二、创建Condition对象

// ReentrantLock.javapublic Condition newCondition() {    return sync.newCondition();}// ReentrantLock.Syncfinal ConditionObject newCondition() {    return new ConditionObject();}

ConditionObject是AbstractQueuedSynchronizer(AQS)的内部类,它实现了Condition接口。每个Condition对象内部都维护着一个独立的FIFO条件队列。注意,这个队列和前面文章里讲的同步队列(等待锁的队列)是两个完全不同的队列,它们节点类型相同(都是AQS.Node),但作用和使用方式截然不同。

我们先来看ConditionObject的核心字段:

public class ConditionObject implements Condition, java.io.Serializable {    /** 条件队列的头节点 */    private transient Node firstWaiter;    /** 条件队列的尾节点 */    private transient Node lastWaiter;    // ... 其他}

这里的Node正是我们在第二篇文章中见到的那个内部类,不过在条件队列中,我们只使用Node中的nextWaiter属性来串联节点,而不是prevnext。这是为了与同步队列区分开。


三、await() 的实现

await()方法会让当前线程释放锁,并进入条件队列等待,直到被signal唤醒或线程中断。我们先看整体结构,再逐个拆解。

public final void await() throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    // 1. 将当前线程封装成节点,并加入到条件队列中    Node node = addConditionWaiter();    // 2. 完全释放当前线程持有的锁(考虑重入)    int savedState = fullyRelease(node);    int interruptMode = 0;    // 3. 自旋,检查节点是否已进入到同步队列    while (!isOnSyncQueue(node)) {        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    // 4. 此时节点已在同步队列中,尝试获取锁    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    // 5. 清理条件队列中已取消的节点(被转移到同步队列或已取消)    if (node.nextWaiter != null// clean up if cancelled        unlinkCancelledWaiters();    // 6. 处理中断    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);}

这个方法比较复杂,我们来逐步拆分。

3.1 加入条件队列:addConditionWaiter()

private Node addConditionWaiter() {    Node t = lastWaiter;    // 清除条件队列中取消等待的节点    if (t != null && t.waitStatus != Node.CONDITION) {        unlinkCancelledWaiters(); // 所有节点状态不等于Node.CONDITION都会被删除        t = lastWaiter;    }    // 创建当前线程的节点,状态为 CONDITION(-2),处于条件等待中    Node node = new Node(Thread.currentThread(), Node.CONDITION);    if (t == null)        firstWaiter = node;    else        t.nextWaiter = node;    lastWaiter = node;    return node;}

这里必须注意:条件队列中的节点状态被设置为CONDITION(值为-2)时,这表示该节点正处于条件等待中,尚未转移到同步队列。与同步队列不同,条件队列的入队操作不需要CAS,因为调用await()时线程已经持有了锁,不存在并发竞争。

unlinkCancelledWaiters()方法会遍历条件队列,移除所有状态不是CONDITION的节点。

3.2 释放锁:fullyRelease()

final int fullyRelease(Node node) {    boolean failed = true;    try {        int savedState = getState();        // 调用 release(savedState) 释放所有的锁(在之前的文章有详细分析,本次不做过多描述)        if (release(savedState)) {            failed = false;            return savedState;        } else {            throw new IllegalMonitorStateException();        }    } finally {        if (failed)            node.waitStatus = Node.CANCELLED;    }}

fullyRelease会释放当前线程持有的所有锁(包括重入锁),并返回释放前的state值。这个值会被保存下来,当线程被唤醒重新获取锁时,需要恢复到这个重入计数。

如果释放失败(比如当前线程没有持有锁),就会抛出IllegalMonitorStateException,这就是我们调用await()前必须先获取锁的原因。

3.3 等待进入同步队列:isOnSyncQueue()

while (!isOnSyncQueue(node)) {    LockSupport.park(this);    // ...}

线程释放锁后,就进入等待状态,直到被signal唤醒。isOnSyncQueue(node)用来判断节点是否已经从条件队列转移到了同步队列(等待锁的队列)。只有转移到同步队列后,线程才有机会竞争锁。

final boolean isOnSyncQueue(Node node) {    // 如果状态是 CONDITION,或者 prev 指针为 null(说明不在同步队列中),返回false    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    // 如果 next 指针不为 null,说明一定在同步队列中    if (node.next != null)        return true;    // 否则从尾部向前查找    return findNodeFromTail(node);}

关于为什么需要findNodeFromTail,我们在第二篇文章中解释过:同步队列的next指针是后设置的,可能存在可见性问题,所以需要反向查找。

3.4 被唤醒后竞争锁:acquireQueued

signal将节点从条件队列转移到同步队列后(下面会解释signal如何转移节点),isOnSyncQueue返回true,循环退出。接着调用acquireQueued(node, savedState),这个方法我们在第二篇文章中详细分析过——它负责在同步队列中自旋、挂起、尝试获取锁。注意,这里传入的savedState正是之前保存的重入计数,获取锁后会恢复state

3.5 清理和中断处理

如果节点不是通过signal正常唤醒,而是因为中断,checkInterruptWhileWaiting会返回对应的中断模式(THROW_IE或REINTERRUPT),后续在reportInterruptAfterWait中决定是抛出中断异常还是重新设置中断标志。


四、signal() 的实现

signal方法用于唤醒条件队列中等待时间最长的线程(即第一个节点)。与await类似,调用signal必须持有锁,否则会抛出异常。

public final void signal() {    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    Node first = firstWaiter;    if (first != null)        doSignal(first);}

isHeldExclusively是AQS中的一个钩子方法,在ReentrantLock中实现为isHeldExclusively(),返回当前线程是否持有锁。这个检查确保了只有锁的持有者才能发送信号。

4.1 doSignal

private void doSignal(Node first) {    do {        if ( (firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        first.nextWaiter = null// 断开原头节点    } while (!transferForSignal(first) &&             (first = firstWaiter) != null);}

这个循环的作用:从条件队列头开始,逐个尝试将节点转移到同步队列,直到成功转移一个节点或队列为空。

4.2 从条件队列转移到同步队列:transferForSignal()

final boolean transferForSignal(Node node) {    // CAS将节点状态从CONDITION改为0,失败则说明节点已被取消    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;    // 将该节点插入同步队列的尾部,返回原尾节点(即新节点的前驱)    Node p = enq(node);    int ws = p.waitStatus;    // 如果前驱状态为CANCELLED,或者设置前驱状态为SIGNAL失败,    // 则直接唤醒该线程,让它自己去同步队列中竞争    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;}

这段代码有两个关键点:

  1. 将节点状态从CONDITION改为0,表示它不再处于条件等待状态。如果CAS失败,说明节点已经被取消或已被转移。

  2. 调用enq(node)将节点加入同步队列。这里入队成功后返回的是节点的前驱节点。根据我们之前分析的同步队列规则:一个节点需要被唤醒,其前驱节点的waitStatus必须为SIGNAL。如果前驱节点已取消或无法设置SIGNAL,就直接唤醒该线程,让它自己处理。


五、条件队列与同步队列的协作关系

到这里,Condition的核心流程已经清晰了,我画了一张简图来帮助大家理解两个队列的协作:

       同步队列(等待锁)    head ──> node1 ──> node2 ──> tail       条件队列(等待条件)    firstWaiter ──> nodeA ──> nodeB ──> lastWaiterawait:   线程进入 nodeA → 释放锁 → 进入条件队列 → 挂起signal:  将 nodeA 从条件队列移除 → 状态改为0 → enq入同步队列 → 设置前驱为SIGNAL

重要区别:

  • 同步队列是所有线程竞争锁的战场,节点通过prevnext连接,入队出队都需要CAS保证线程安全。

  • 条件队列是每个Condition私有的休息室,节点通过nextWaiter连接,由于访问它时线程已经持有锁,所以不需要CAS。


六、注意点:中断与虚假唤醒

await()的循环判断中,使用了while (!isOnSyncQueue(node)),这正是为了防止虚假唤醒。即使没有被signal,线程也可能因为操作系统原因被唤醒,此时它仍不在同步队列中,需要继续挂起。

另外,Condition响应中断有两种方式:

  • THROW_IE:在等待时检测到中断,直接抛出InterruptedException。

  • REINTERRUPT:在获取锁后重新设置中断标记。

具体的判断逻辑在checkInterruptWhileWaiting中实现,这里不再展开,大家知道AQS会妥善处理即可。


七、总结

经过以上分析,我们可以看到Condition的实现完全依赖于AQS的底层框架,其巧妙之处在于:

  1. 同一套Node节点,两种队列,通过不同的字段(prev/next & nextWaiter)链接,职责分离。

  2. 状态字段waitStatus的多重语义SIGNALCANCELLEDCONDITION等,控制线程状态流转。

  3. 等待条件时必须持有锁,这是由fullyReleaseisHeldExclusively强制保证的,避免了丢失唤醒信号的风险。

  4. signal不会立即唤醒线程,只是将节点从条件队列移到同步队列,线程仍需排队竞争锁,这符合“公平”的原则。

对比Object.wait/notify,Condition提供了更丰富的功能:支持多条件变量、可中断等待、超时等待、公平/非公平选择等,但其底层复杂度也相应更高。


写在最后

至此,ReentrantLock中Condition的核心源码我们已经过了一遍。相比加锁解锁,Condition的逻辑确实更加绕,尤其是两个队列之间的切换,以及状态位的流转。建议大家结合文中的流程图,自己走一遍完整的await/signal流程,相信会有更深的理解。欢迎大家继续关注,也感谢大家的耐心阅读。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » 源码解析之ReentrantLock(Condition)(三)

评论 抢沙发

7 + 7 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮