乐于分享
好东西不私藏

源码分析: AbstractQueuedSynchronizer

源码分析: AbstractQueuedSynchronizer

有时候在一段代码上花费大量时间,在一段代码上反复花时间,都是值得的。本文的jdk版本是1.8. 本文将演示AQS如何在多线程场景下进行入队和出队操作。

从一段代码说起

如下代码是事先准备的一段demo,演示的是在多线程环境下,ReentranLock是如何通过底层的AQS对象来实现同步的。

package com.jeff.study.concurrent.aqs;import java.io.IOException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.ReentrantLock;/** * @Description AQS的抢锁和入队过程 * @Date 2021/1/31 7:36 下午 * @Author jeff.sheng * @see AbstractQueuedSynchronizer * @see ReentrantLock */public class AQSDemo {    public static void main(String[] args) throws InterruptedException {        ReentrantLock lock = new ReentrantLock();        ExecutorService executorService = Executors.newFixedThreadPool(2);        Runnable runnable = () -> {            System.out.println("当前线程: " + Thread.currentThread().getName() + " 请输入:");            lock.lock();            try {                System.in.read();            } catch (IOException e) {                e.printStackTrace();            } finally {                lock.unlock();            }        };        executorService.execute(runnable);        executorService.execute(runnable);        executorService.awaitTermination(200, TimeUnit.SECONDS);        executorService.shutdown();    }}

经过如上代码,相信对于ReentrantLock的基本使用方式大家有了一个大致了解,接下来就看下什么是AQS,以及ReentrantLock是怎么基于底层的AQS实现并发锁的控制.

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer,即抽象队列同步器类。其内部包含了state、head和tail两个Node类型的变量。而Node是AbstractQueuedSynchronizer的内部类,提供属性如下:

  • • prev: 类型为Node,表示当前节点的前一个节点。
  • • next:类型为Node,表示当前节点的后一个节点。
  • • waitStatus:表示当前节点线程的状态。比如CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3).
  • • thread:表示当前AQS队列中当前节点的线程对象。
  • • nextWaiter:AQS队列中下一个等待节点。

那么ReentrantLock是什么呢?

ReentranLock

ReentranLock与AQS的关系

ReentranLock类内部定义了一个Sync类型的变量,而Sync则是ReentranLock的一个抽象的静态内部类,Sync这个内部类继承了AbstractQueuedSynchronizer抽象类。

 abstract static class Sync extends AbstractQueuedSynchronizer{ ... }

而AbstractQueuedSynchronizer又继承了AbstractOwnableSynchronizer类,这个类需要我们重点关注下。因为里边有一个AQS的独占线程对象方法setExclusiveOwnerThread

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable {.....}package java.util.concurrent.locks;import java.io.Serializable;public abstract class AbstractOwnableSynchronizer implements Serializable {    private static final long serialVersionUID = 3737899427754241961L;    private transient Thread exclusiveOwnerThread;    protected AbstractOwnableSynchronizer() {    }    // 设置AQS的独占线程对象    protected final void setExclusiveOwnerThread(Thread thread) {        this.exclusiveOwnerThread = thread;    }    protected final Thread getExclusiveOwnerThread() {        return this.exclusiveOwnerThread;    }}

Sync则继续提供了两种实现,也就是FairSync和NonfailSync.也就是我们常说的公平锁和非公平锁。ReentranLock则提供了非公平Sync的默认构造器,且提供了可选的构造器。

/**     * Creates an instance of {@code ReentrantLock}.     * This is equivalent to using {@code ReentrantLock(false)}.     */    public ReentrantLock() {        sync = new NonfairSync();    }    /**     * Creates an instance of {@code ReentrantLock} with the     * given fairness policy.     *     * @param fair {@code true} if this lock should use a fair ordering policy     */    public ReentrantLock(boolean fair) {        sync = fair ? new FairSync() : new NonfairSync();    }

NonfairSync

ReentranLock的内部非公平实现NonfairSync代码如下:

 /**     * Sync object for non-fair locks     */    static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        /**         * Performs lock.  Try immediate barge, backing up to normal         * acquire on failure.         */        final void lock() {            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }

lock方法

我们先看下lock方法:

final void lock() {            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else                acquire(1);}

当线程T1进入时,通过CAS操作修改state变量的状态.

  • • 如果此时没有其他线程一起竞争,则返回true。然后设置当前独占锁的线程对象为当前线程(Thread.currentThread()).
  • • 如果有线程T2一起竞争,则进入acquire(1)方法。

acquire方法

acquire方法由NonfairSync的父类AbstractQueuedSynchronizer提供。

 public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

我们先看tryAcquire方法,这个方法由NonfairSync实现。

 protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires); }

nonfairTryAcquire方法由ReentranLock提供。

   final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }

当线程T1和T2都过来时,先获取AQS的state值,state值是一个volitale变量。

  • • 如果T1和T2都发现state值等于0则进行CAS操作设置state的值为1,但最终只有一个线程设置成功,并设置独占锁的线程为当前线程。假如T1抢占锁成功则返回true。整个流程结束。那么T2则返回false,抢锁失败!
  • • 如果T1先进来抢占锁成功了,那么state值就会通过CAS操作设置为1.由于volitale变量的可见性,T2进来时发现state不是0,而且发现当前抢锁成功的线程是T1,就直接返回false了,抢锁失败!
  • • 如果T1先抢锁成功且未释放锁,然后又再次进入此方法,那么发现自己就是拥有锁的线程,则给state值加1,锁重入。

以上我们说明了T1线程运气好抢锁成功的场景,而T2线程运气差总是抢锁失败。那么T2抢占锁失败之后返回了false,会如何呢?可以从tryAcquire方法看到,抢锁失败之后,会进一步执行下边的判断:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

我们来看下addWaiter方法。

addWaiter方法

  Node(Thread thread, Node mode) {     // Used by addWaiter            this.nextWaiter = mode;            this.thread = thread; }/**     * Creates and enqueues node for current thread and given mode.     *     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared     * @return the new node     */    private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure        Node pred = tail;        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        enq(node);        return node;    }

addWaiter方法传进来的参数是Node.EXCLUSIVE,看一下Node节点中对它的解释:

 static final class Node {        /** Marker to indicate a node is waiting in shared mode */        static final Node SHARED = new Node();        /** 标记Node节点是独占模式等待状态 */        static final Node EXCLUSIVE = null;        ..........}

T2虽然抢锁失败了,但是T2将会以独占模式节点的状态继续等待。T2进来addWaiter方法时,会包装为一个Node节点。假设T2进入时,此时AQS的tail节点尚未初始化。也就是说AQS对象的tail和head两个头尾Node节点都是NULL。那么就会进入enq方法先初始化:

  /**     * Inserts node into queue, initializing if necessary. See picture above.     * @param node the node to insert     * @return node's predecessor     */    private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) { // Must initialize                if (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }

可以看到tail赋值给t,t此时为null’就进行头结点的初始化了,然后把初始化的head节点引用给了tail。

继续for循环,tail就不为空了,tail赋值给t也不为空,就是初始化的那个节点。将其赋值给T2独占等待状态节点的prev节点。

然后将t,也就是尾结点TAIL更新为T2独占等待节点。

这样以来T2独占等待节点就成了AQS队列中的一个尾节点。接下来最重要的获取AQS队列节点的操作来了。

acquireQueued方法

 /**     * Acquires in exclusive uninterruptible mode for thread already in     * queue. Used by condition wait methods as well as acquire.     *     * @param node the node     * @param arg the acquire argument     * @return {@code true} if interrupted while waiting     */    final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

方法传递进来的参数就是T2独占等待节点,根据前边的描述,它的前一个节点就是哨兵节点也就是head节点,此时T2虽然抢锁失败而进入队列,但是它并不死心,它觉得此时T1可能已经释放锁了,所以想再次tryAcquire一次,如果真的如T2所愿,T1执行结束真的unlock的话,就会执行以下代码并返回interrupted值为false。

 if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted; } /** */将T2独占等待节点指针设置给head节点,也就是T2节点将从队列中出队。设置thread和prev都为null */ private void setHead(Node node) {        head = node;        node.thread = null;        node.prev = null;    }

但如果是T2想的美呢?也就是T1根本就没结束还在持有锁,那么tryAcquire获取的state值就不是0而且持有锁的线程还是T1.那么不好意思,T2线程Acquire失败了,此时将尝试park住T2线程。

  /**     * Checks and updates status for a node that failed to acquire.     * Returns true if thread should block. This is the main signal     * control in all acquire loops.  Requires that pred == node.prev.     *     * @param pred node's predecessor holding status     * @param node the node     * @return {@code true} if thread should block     */    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            /*             * This node has already set status asking a release             * to signal it, so it can safely park.             */            return true;        if (ws > 0) {            /*             * Predecessor was cancelled. Skip over predecessors and             * indicate retry.             */            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            /*             * waitStatus must be 0 or PROPAGATE.  Indicate that we             * need a signal, but don't park yet.  Caller will need to             * retry to make sure it cannot acquire before parking.             */            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }

通过debug来走读下这段代码吧!T2进来之后,先判断pred节点,也就是T2独占等待节点的prev节点的waitStatus状态,可以看到值为0,回顾下开始时提到的waitStatus:

waitStatus:表示当前节点线程的状态。比如:CANCELLED(1)SIGNAL(-1)CONDITION(-2)PROPAGATE(-3).

所以都不满足,于是就使用CAS更新prev节点waitStatus的状态(更新为SIGNAL=-1):

 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

然后继续执行acquireQueued方法中的下一次for循环,仍然进入了shouldParkAfterFailedAcquire方法中,但这一次由于waitStatus=-1满足第一个if条件,于是返回true。再继续判断parkAndCheckInterrupt方法是否满足条件:

 private final boolean parkAndCheckInterrupt() {        LockSupport.park(this);        return Thread.interrupted();    }

当T2线程进入调用LockSupport.park方法时,T2线程阻塞在这里了。此时此刻,T1线程阻塞在:

System.in.read()

T2线程阻塞在LockSupport.park方法。通过调用jstack方法可以观察到此时的线程stack信息:

而T1线程则是RUNABLE状态:

当在控制台输入一行字符串时,T2线程返回了。因为T1线程此时unlock了。

然后T2线程调用Thread.interruped方法检测线程T2是否被中断,本次返回了false。

这里如果检测到T2被中断则清除中断标识返回true,如果返回了true则会调Thread.currentThread().interrupt();重新还原T2的中断标识。

但是由于此时T1线程已经unlock,AQS的state变量值变为0了。T2线程成功的获取到了锁,返回true。

unlock方法

执行完业务逻辑之后,在finally中通常会调用比如ReentranLock的unlock方法,这里我们就看下ReentranLock的unlock方法。

/**     * Attempts to release this lock.     *     * <p>If the current thread is the holder of this lock then the hold     * count is decremented.  If the hold count is now zero then the lock     * is released.  If the current thread is not the holder of this     * lock then {@link IllegalMonitorStateException} is thrown.     *     * @throws IllegalMonitorStateException if the current thread does not     *         hold this lock     */    public void unlock() {        sync.release(1);    }

同样是调用了AQS的release方法,其中可以看到tryRelease这个方法,这个方法是ReentranLock提供的。

/**     * Releases in exclusive mode.  Implemented by unblocking one or     * more threads if {@link #tryRelease} returns true.     * This method can be used to implement method {@link Lock#unlock}.     *     * @param arg the release argument.  This value is conveyed to     *        {@link #tryRelease} but is otherwise uninterpreted and     *        can represent anything you like.     * @return the value returned from {@link #tryRelease}     */    public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }

确切的说,实现了AQS的对象很多都提供了这个方法。而AQS抽象类中提供了一个抛出异常的实现:

   protected boolean tryRelease(int arg) {        throw new UnsupportedOperationException();    }
//ReentranLock提供 protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }

这里的逻辑很明显了,就是要释放state和独占线程这两个资源给其他线程用了。回到AQS的release方法,如果tryRelease执行成功,则会对AQS队列中下一个Node进行unparkSuccessor操作。AQS提供了实现:

 /**     * Wakes up node's successor, if one exists.     *     * @param node the node     */    private void unparkSuccessor(Node node) {        /*         * If status is negative (i.e., possibly needing signal) try         * to clear in anticipation of signalling.  It is OK if this         * fails or if status is changed by waiting thread.         */        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        /*         * Thread to unpark is held in successor, which is normally         * just the next node.  But if cancelled or apparently null,         * traverse backwards from tail to find the actual         * non-cancelled successor.         */        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        if (s != null)            LockSupport.unpark(s.thread);    }

可以看到,在找到了后继者之后唤醒了这个节点的线程:

LockSupport.unpark(s.thread);

公平锁和非公平锁的区别

代码一贴,疑惑全解。

    /**     * Sync object for fair locks     */    static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;        final void lock() {            acquire(1);        }        /**         * Fair version of tryAcquire.  Don't grant access unless         * recursive call or no waiters or is first.         */        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }

其实公平锁的实现FairSync的tryAcquire与非公平实现NonfairSync的区别仅仅在于一行代码:

//抢占锁的时候先判断队列中有没有元素,如果有的话那直接返回false了。!hasQueuedPredecessors()

但是这行代码对应方法的注释挺长!

  /**     * Queries whether any threads have been waiting to acquire longer     * than the current thread.     *     * <p>An invocation of this method is equivalent to (but may be     * more efficient than):     *  <pre> {@code     * getFirstQueuedThread() != Thread.currentThread() &&     * hasQueuedThreads()}</pre>     *     * <p>Note that because cancellations due to interrupts and     * timeouts may occur at any time, a {@code true} return does not     * guarantee that some other thread will acquire before the current     * thread.  Likewise, it is possible for another thread to win a     * race to enqueue after this method has returned {@code false},     * due to the queue being empty.     *     * <p>This method is designed to be used by a fair synchronizer to     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.     * Such a synchronizer's {@link #tryAcquire} method should return     * {@code false}, and its {@link #tryAcquireShared} method should     * return a negative value, if this method returns {@code true}     * (unless this is a reentrant acquire).  For example, the {@code     * tryAcquire} method for a fair, reentrant, exclusive mode     * synchronizer might look like this:     *     *  <pre> {@code     * protected boolean tryAcquire(int arg) {     *   if (isHeldExclusively()) {     *     // A reentrant acquire; increment hold count     *     return true;     *   } else if (hasQueuedPredecessors()) {     *     return false;     *   } else {     *     // try to acquire normally     *   }     * }}</pre>     *     * @return {@code true} if there is a queued thread preceding the     *         current thread, and {@code false} if the current thread     *         is at the head of the queue or the queue is empty     * @since 1.7     */    public final boolean hasQueuedPredecessors() {        // The correctness of this depends on head being initialized        // before tail and on head.next being accurate if the current        // thread is first in queue.        Node t = tail; // Read fields in reverse initialization order        Node h = head;        Node s;        return h != t &&            ((s = h.next) == null || s.thread != Thread.currentThread());    }

如果当前线程在队头或者队列是空的时候则返回false,才可以对state状态进行CAS更新操作。否则直接抢锁失败,返回false!

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » 源码分析: AbstractQueuedSynchronizer

评论 抢沙发

8 + 4 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮