源码分析: AbstractQueuedSynchronizer
有时候在一段代码上花费大量时间,在一段代码上反复花时间,都是值得的。本文的jdk版本是1.8. 本文将演示AQS如何在多线程场景下进行入队和出队操作。
从一段代码说起
如下代码是事先准备的一段demo,演示的是在多线程环境下,ReentranLock是如何通过底层的AQS对象来实现同步的。
package com.jeff.study.concurrent.aqs;import java.io.IOException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.ReentrantLock;/** * @Description AQS的抢锁和入队过程 * @Date 2021/1/31 7:36 下午 * @Author jeff.sheng * @see AbstractQueuedSynchronizer * @see ReentrantLock */public class AQSDemo { public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable runnable = () -> { System.out.println("当前线程: " + Thread.currentThread().getName() + " 请输入:"); lock.lock(); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } finally { lock.unlock(); } }; executorService.execute(runnable); executorService.execute(runnable); executorService.awaitTermination(200, TimeUnit.SECONDS); executorService.shutdown(); }}
经过如上代码,相信对于ReentrantLock的基本使用方式大家有了一个大致了解,接下来就看下什么是AQS,以及ReentrantLock是怎么基于底层的AQS实现并发锁的控制.
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer,即抽象队列同步器类。其内部包含了state、head和tail两个Node类型的变量。而Node是AbstractQueuedSynchronizer的内部类,提供属性如下:
• prev: 类型为Node,表示当前节点的前一个节点。 • next:类型为Node,表示当前节点的后一个节点。 • waitStatus:表示当前节点线程的状态。比如CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3). • thread:表示当前AQS队列中当前节点的线程对象。 • nextWaiter:AQS队列中下一个等待节点。
那么ReentrantLock是什么呢?
ReentranLock
ReentranLock与AQS的关系
ReentranLock类内部定义了一个Sync类型的变量,而Sync则是ReentranLock的一个抽象的静态内部类,Sync这个内部类继承了AbstractQueuedSynchronizer抽象类。
abstract static class Sync extends AbstractQueuedSynchronizer{ ... }
而AbstractQueuedSynchronizer又继承了AbstractOwnableSynchronizer类,这个类需要我们重点关注下。因为里边有一个AQS的独占线程对象方法setExclusiveOwnerThread:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable {.....}package java.util.concurrent.locks;import java.io.Serializable;public abstract class AbstractOwnableSynchronizer implements Serializable { private static final long serialVersionUID = 3737899427754241961L; private transient Thread exclusiveOwnerThread; protected AbstractOwnableSynchronizer() { } // 设置AQS的独占线程对象 protected final void setExclusiveOwnerThread(Thread thread) { this.exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return this.exclusiveOwnerThread; }}
Sync则继续提供了两种实现,也就是FairSync和NonfailSync.也就是我们常说的公平锁和非公平锁。ReentranLock则提供了非公平Sync的默认构造器,且提供了可选的构造器。

/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
NonfairSync
ReentranLock的内部非公平实现NonfairSync代码如下:
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
lock方法
我们先看下lock方法:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);}
当线程T1进入时,通过CAS操作修改state变量的状态.
-
• 如果此时没有其他线程一起竞争,则返回true。然后设置当前独占锁的线程对象为当前线程(Thread.currentThread()). -
• 如果有线程T2一起竞争,则进入acquire(1)方法。
acquire方法
acquire方法由NonfairSync的父类AbstractQueuedSynchronizer提供。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们先看tryAcquire方法,这个方法由NonfairSync实现。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire方法由ReentranLock提供。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
当线程T1和T2都过来时,先获取AQS的state值,state值是一个volitale变量。
• 如果T1和T2都发现state值等于0则进行CAS操作设置state的值为1,但最终只有一个线程设置成功,并设置独占锁的线程为当前线程。假如T1抢占锁成功则返回true。整个流程结束。那么T2则返回false,抢锁失败!
• 如果T1先进来抢占锁成功了,那么state值就会通过CAS操作设置为1.由于volitale变量的可见性,T2进来时发现state不是0,而且发现当前抢锁成功的线程是T1,就直接返回false了,抢锁失败!
• 如果T1先抢锁成功且未释放锁,然后又再次进入此方法,那么发现自己就是拥有锁的线程,则给state值加1,锁重入。
以上我们说明了T1线程运气好抢锁成功的场景,而T2线程运气差总是抢锁失败。那么T2抢占锁失败之后返回了false,会如何呢?可以从tryAcquire方法看到,抢锁失败之后,会进一步执行下边的判断:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
我们来看下addWaiter方法。
addWaiter方法
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
addWaiter方法传进来的参数是Node.EXCLUSIVE,看一下Node节点中对它的解释:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** 标记Node节点是独占模式等待状态 */ static final Node EXCLUSIVE = null; ..........}
T2虽然抢锁失败了,但是T2将会以独占模式节点的状态继续等待。T2进来addWaiter方法时,会包装为一个Node节点。假设T2进入时,此时AQS的tail节点尚未初始化。也就是说AQS对象的tail和head两个头尾Node节点都是NULL。那么就会进入enq方法先初始化:
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
可以看到tail赋值给t,t此时为null’就进行头结点的初始化了,然后把初始化的head节点引用给了tail。

继续for循环,tail就不为空了,tail赋值给t也不为空,就是初始化的那个节点。将其赋值给T2独占等待状态节点的prev节点。

然后将t,也就是尾结点TAIL更新为T2独占等待节点。

这样以来T2独占等待节点就成了AQS队列中的一个尾节点。接下来最重要的获取AQS队列节点的操作来了。
acquireQueued方法
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
方法传递进来的参数就是T2独占等待节点,根据前边的描述,它的前一个节点就是哨兵节点也就是head节点,此时T2虽然抢锁失败而进入队列,但是它并不死心,它觉得此时T1可能已经释放锁了,所以想再次tryAcquire一次,如果真的如T2所愿,T1执行结束真的unlock的话,就会执行以下代码并返回interrupted值为false。
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } /** */将T2独占等待节点指针设置给head节点,也就是T2节点将从队列中出队。设置thread和prev都为null。 */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
但如果是T2想的美呢?也就是T1根本就没结束还在持有锁,那么tryAcquire获取的state值就不是0而且持有锁的线程还是T1.那么不好意思,T2线程Acquire失败了,此时将尝试park住T2线程。
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
通过debug来走读下这段代码吧!T2进来之后,先判断pred节点,也就是T2独占等待节点的prev节点的waitStatus状态,可以看到值为0,回顾下开始时提到的waitStatus:
waitStatus:表示当前节点线程的状态。比如:CANCELLED(1)SIGNAL(-1)CONDITION(-2)PROPAGATE(-3).
所以都不满足,于是就使用CAS更新prev节点waitStatus的状态(更新为SIGNAL=-1):
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

然后继续执行acquireQueued方法中的下一次for循环,仍然进入了shouldParkAfterFailedAcquire方法中,但这一次由于waitStatus=-1满足第一个if条件,于是返回true。再继续判断parkAndCheckInterrupt方法是否满足条件:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
当T2线程进入调用LockSupport.park方法时,T2线程阻塞在这里了。此时此刻,T1线程阻塞在:
System.in.read()
T2线程阻塞在LockSupport.park方法。通过调用jstack方法可以观察到此时的线程stack信息:

而T1线程则是RUNABLE状态:

当在控制台输入一行字符串时,T2线程返回了。因为T1线程此时unlock了。


然后T2线程调用Thread.interruped方法检测线程T2是否被中断,本次返回了false。
这里如果检测到T2被中断则清除中断标识返回true,如果返回了true则会调Thread.currentThread().interrupt();重新还原T2的中断标识。
但是由于此时T1线程已经unlock,AQS的state变量值变为0了。T2线程成功的获取到了锁,返回true。

unlock方法
执行完业务逻辑之后,在finally中通常会调用比如ReentranLock的unlock方法,这里我们就看下ReentranLock的unlock方法。
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
同样是调用了AQS的release方法,其中可以看到tryRelease这个方法,这个方法是ReentranLock提供的。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
确切的说,实现了AQS的对象很多都提供了这个方法。而AQS抽象类中提供了一个抛出异常的实现:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }

//ReentranLock提供 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
这里的逻辑很明显了,就是要释放state和独占线程这两个资源给其他线程用了。回到AQS的release方法,如果tryRelease执行成功,则会对AQS队列中下一个Node进行unparkSuccessor操作。AQS提供了实现:
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
可以看到,在找到了后继者之后唤醒了这个节点的线程:
LockSupport.unpark(s.thread);
公平锁和非公平锁的区别
代码一贴,疑惑全解。
/** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
其实公平锁的实现FairSync的tryAcquire与非公平实现NonfairSync的区别仅仅在于一行代码:
//抢占锁的时候先判断队列中有没有元素,如果有的话那直接返回false了。!hasQueuedPredecessors()
但是这行代码对应方法的注释挺长!
/** * Queries whether any threads have been waiting to acquire longer * than the current thread. * * <p>An invocation of this method is equivalent to (but may be * more efficient than): * <pre> {@code * getFirstQueuedThread() != Thread.currentThread() && * hasQueuedThreads()}</pre> * * <p>Note that because cancellations due to interrupts and * timeouts may occur at any time, a {@code true} return does not * guarantee that some other thread will acquire before the current * thread. Likewise, it is possible for another thread to win a * race to enqueue after this method has returned {@code false}, * due to the queue being empty. * * <p>This method is designed to be used by a fair synchronizer to * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>. * Such a synchronizer's {@link #tryAcquire} method should return * {@code false}, and its {@link #tryAcquireShared} method should * return a negative value, if this method returns {@code true} * (unless this is a reentrant acquire). For example, the {@code * tryAcquire} method for a fair, reentrant, exclusive mode * synchronizer might look like this: * * <pre> {@code * protected boolean tryAcquire(int arg) { * if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false; * } else { * // try to acquire normally * } * }}</pre> * * @return {@code true} if there is a queued thread preceding the * current thread, and {@code false} if the current thread * is at the head of the queue or the queue is empty * @since 1.7 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
如果当前线程在队头或者队列是空的时候则返回false,才可以对state状态进行CAS更新操作。否则直接抢锁失败,返回false!
夜雨聆风
