【多线程系列】JUC 中的另一重要大杀器 AQS 抽象队列同步器

Java社区
回顾
导读
  • AQS 是什么、底层原理(独占模式、共享模式实现)
  • AQS 变种 CLH 相比于原始 CLH 的改变
版本及说明
AQS
  • AQS 全称是 AbstractQueuedSynchronizer,是 Java 并发包中的一个抽象类,用于构建各种同步器和锁,如 ReentrantLock、CountDownLatch、Semaphore 等。

核心思想

  • 基于 CAS 和 变种 CLH 实现对互斥资源的访问;访问互斥资源时,当互斥资源空闲时,通过 CAS 操作将互斥资源置为锁定状态,并将访问线程置为当前线程,当互斥资源被其他线程锁定时,通过变种 CLH 实现的逻辑 FIFO 队列实现对线程的阻塞以及资源释放时的唤醒机制。

结构

picture.image

  • AQS 使用 int 成员变量 state 表示同步状态,通过内置的 FIFO 等待队列 来完成获取资源线程的排队工作。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private transient volatile Node head;

    private transient volatile Node tail;

    // 使用 volatile 保证变量的可见性
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    // 提供 CAS 操作更新 state 的值
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
 }

state 状态

状态名描述
SIGNAL(-1)表示该节点正常等待
PROPAGATE(-3)应将 releaseShared 传播到其他节点
CONDITION(-2)该节点位于条件队列,不能用于同步队列节点
CANCELLED(1)由于超时、中断或其他原因,该节点被取消
(0)节点初始状态

Node 节点

static final class Node {
    /**
     * Marker to indicate a node is waiting in shared mode
     */
    static final Node SHARED = new Node();
    /**
     * Marker to indicate a node is waiting in exclusive mode
     */
    static final Node EXCLUSIVE = null;

    /**
     * Status field, taking on only the values:
     */
    volatile int waitStatus;


    // 前置节点
    volatile Node prev;

    // 后置线程
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.
     */
    Node nextWaiter;

}

独占模式和共享模式

  • AQS 支持两种资源共享方式:Exclusive(独占,只有一个线程能执行,如基于独占模式实现的 ReentrantLock)和 Share(共享,多个线程可同时执行,如基于共享模式实现的 Semaphore/CountDownLatch)。

独占模式

获取锁

    /**
     * 获取独占锁主流程:
     * 1、阻塞获取锁,获取锁逻辑由具体同步器重写 tryAcquire() 实现
     * 2、获取锁成功直接返回,获取锁失败进入 FIFO 线程进行线程的阻塞和唤醒
     * 2.1、调用 addWaiter() 将当前线程封装为 Node 节点并入队
     * 2.2、入队成功后在 acquireQueued() 方法尝试自旋获取锁或阻塞当前线程
     *
     * @param arg
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 将当前线程封装为 Node 节点并入队
     *
     * @param mode
     * @return
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 当队尾节点不为 null 时使用 CAS 快速入队
        // 这种写法可以借鉴,可以提高性能(减少小概率的临界值判断)
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速入队失败 重新入队直到入队成功
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            // 队尾节点为空 初始化一个哨兵节点
            // 作用:统一处理逻辑,首节点是哨兵节点或持有锁线程(正在持有或已释放唤起后续线程)
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // CAS 入队
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Node 节点入 FIFO 等待队列后 进行 CAS 操作获取锁或线程阻塞
     * @param node
     * @param arg
     * @return
     */
    final boolean acquireQueued(final Node node, int arg) {
        // 获取锁结果
        boolean failed = true;
        try {
            // 是否被中断
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                // 当前节点为等待队列中的第二个节点 尝试 CAS 获取锁
                // 前置可能节点为 哨兵节点 或 已经释放锁节点 尝试 CAS 获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取成功设置当前线程为 头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 非第二节点/获取失败判断是否阻塞当前线程 & 阻塞线程并判断线程是否被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 超时、中断导致线程获取锁失败时 标记节点状态为 Cancel
            if (failed)
                cancelAcquire(node);
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前置节点 处于等待状态 当前节点线程阻塞挂起
        if (ws == Node.SIGNAL)
            return true;
        // 前置节点已取消 去除队列中已取消节点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 前置节点状态只能是 0 或 PROPAGATE 可能需要等待
            // 将前置节点 状态置为 SIGNAL 并 重新尝试是否可以获取锁
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 唤醒节点条件:pred == head或者pred.thread == null 第一个节点;
                // ((ws = pred.waitStatus) != Node.SIGNAL 并且 (ws >0 || compareAndSetWaitStatus(pred, ws, Node.SIGNAL) == false)):前置节点突然释放锁
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

释放锁

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 等待队列不为空 同时状态不为 初始状态(节点初始化已完成)
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            // 将头节点状态置为 初始状态 0
            compareAndSetWaitStatus(node, ws, 0);

        // 从尾到头查找到最早的入队可以唤醒的节点(不包括头节点)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒找到的节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享模式

获取锁

  /**
     * 获取共享锁主流程:
     * 1、尝试获取贡献锁 获取锁逻辑由具体同步器重写 tryAcquire() 实现
     * 2、获取锁成功直接返回,获取锁失败进入 FIFO 线程进行线程的阻塞和唤醒
     * 2.1、调用 addWaiter() 将当前线程封装为 Node 节点并入队
     *2.2、入队成功后尝试自旋获取锁(获取成功后走共享锁唤醒逻辑 setHeadAndPropagate)或阻塞当前线程
     * @param arg
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 设置头节点 & 共享锁传播唤醒逻辑
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 和独占锁一致
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 和独占锁一致
            if (failed)
                cancelAcquire(node);
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 设置为头结点
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点
            if (s == null || s.isShared())
                // 读锁唤醒往后传播(A 被唤醒获取锁唤醒 B ,B 被唤醒被获取锁唤醒 C...)
                // 见释放锁
                doReleaseShared();
        }
    }

释放锁

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 唤醒后一个等待节点
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
         */
        for (;;) {
            // 执行唤醒逻辑(如果从setHeadAndPropagate方法调用该方法,那么这里的head是新的头节点)
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // CAS原子操作,因为setHeadAndPropagate和releaseShared这两个方法都会调用doReleaseShared,避免多次unpark唤醒操作
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒节点
                    unparkSuccessor(h);
                }
                // 如果后继节点暂时不需要唤醒,那么当前头节点状态更新为PROPAGATE,确保后续可以传递给后继节点
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 防止其它线程设置了头节点,其它线程已经获取锁,交给其它线程处理
            if (h == head)                   // loop if head changed
                break;
        }
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            // 将头节点状态置为 初始状态 0
            compareAndSetWaitStatus(node, ws, 0);

        // 从尾到头查找到最早的入队可以唤醒的节点(不包括头节点)
        // 从尾到头的原因:避免已经入队但通过 next 节点查找不到(https://blog.csdn.net/foxException/article/details/108917338)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒找到的节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

模版方法的使用

  • AQS 使用了模板方法模式,当实现自定义同步器时需要重写下面几个 AQS 提供的钩子方法:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()

AQS 条件队列 Condition

  • 在 AQS 的基础上,Java 提供了一个更高级的同步工具 Condition,它允许线程在特定条件下等待和唤醒,以实现更复杂的线程间通信。
  • 实现 synchronized 对象锁中的wait、notify、notifyAll, Condition 支持多条件,可以实现更细粒度的控制。
  • 仅支持独占锁。

picture.image

使用示例

public class MainTest {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            System.out.println("线程1获取锁");
            // 条件等待 释放锁
            System.out.println("线程1条件等待、释放锁");
            condition.awaitUninterruptibly();
            System.out.println("线程1重新获取锁");
            lock.unlock();
            System.out.println("线程1释放锁");
        }).start();

        new Thread(() -> {
            lock.lock();
            System.out.println("线程2获取锁");
            // 条件等待 释放锁
            System.out.println("线程唤醒条件队列的的一个锁");
            condition.signal();
            lock.unlock();
            System.out.println("线程2释放锁");
        }).start();
    }
}

// 运行结果:
线程1获取锁
线程1条件等待、释放锁
线程2获取锁
线程唤醒条件队列的的一个锁
线程2释放锁
线程1重新获取锁
线程1释放锁

类图

picture.image

ConditionObject 类结构

  • Condition 接口提供了常见的标准方法,ConditionObject 类是具体实现。
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
    }

Condition 接口提供的方法

 //响应线程中断的条件等待
   void await() throws InterruptedException;
   
   //不响应线程中断的条件等待
   void awaitUninterruptibly();
   
   //设置相对时间的条件等待(不进行自旋)
   long awaitNanos(long nanosTimeout) throws InterruptedException;
   
   //设置相对时间的条件等待(进行自旋)
   boolean await(long time, TimeUnit unit) throws InterruptedException;
   
   //设置绝对时间的条件等待
   boolean awaitUntil(Date deadline) throws InterruptedException;
   
   //唤醒条件队列中的头结点
   void signal();
   
   //唤醒条件队列的所有结点
   void signalAll();

核心方法

  • 下面以 await、signal 两个核心方法介绍 Condition 的底层实现。

await()

  public final void await() throws InterruptedException {
        // 如果线程被中断 抛出中断异常
        if (Thread.interrupted()) throw new InterruptedException();
        // 将节点加入到条件队列
        Node node = addConditionWaiter();
        // 释放之前获取的锁资源
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 当不再同步队列时才挂起线程(因为唤醒时会重新加入同步队列竞争锁)
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // AQS acquireQueued 方法逻辑 加入同步队列后等待获取锁逻辑
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 中断逻辑处理
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 从后往前清理已经取消的线程
            unlinkCancelledWaiters();
            t = lastWaiter;
        }

        // 将当前线程加入到条件队列中(获取互斥锁时执行 所有不用加锁)
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // AQS 释放锁逻辑
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取当前节点的 state
            int savedState = getState();
            // 释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    final boolean isOnSyncQueue(Node node) {
        // 说明在条件队列中,不再同步队列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

    // 从同步队列尾部开始遍历线程是否在同步队列中
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (; ; ) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

signal()

  public final void signal() {
        // 当前持有锁线程才可唤醒
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // 存在需要唤醒的线程
        if (first != null)
            doSignal(first);
    }


    /**
     * 遍历条件队列 从前往后尝试获取一个有效的线程(非取消)
     *
     * @param first
     */
    private void doSignal(Node first) {
        do {
            // firstWaiter 头节点指向条件队列头的下一个节点
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // 原来的头节点和同步队列断开
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

    // 是否唤醒一个有效线程(从前往后依次尝试)
    final boolean transferForSignal(Node node) {
        // 判断节点是否已经在之前被取消
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 调用 enq 添加到 同步队列的尾部
        Node p = enq(node);
        int ws = p.waitStatus;
        // 同步节点前置节点 修改为 SIGNAL 等待后续唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    // AQS 入同步队列逻辑
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            // 尾节点为空 需要初始化头节点,此时头尾节点是一个
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 不为空 循环赋值
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // signalAll() 区别在于会唤醒条件队列中的所有等待线程
    private void doSignalAll(AbstractQueuedSynchronizer.Node first) {
        lastWaiter = firstWaiter = null;
        do {
            AbstractQueuedSynchronizer.Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
补充

AQS 变种 CLH 改进点

  • 将 CLH 自旋操作改为线程阻塞操作
  • 扩展每个节点的状态、显式的维护前驱节点和后继节点以及出队节点显式设为 null 等辅助 GC 的优化来支持更多功能
参考
预告
  • 下一篇文章将介绍基于 AQS 实现的同步器,也就是我们常常使用的 ReentrantLock 、ReentrantReadWriteLock 等等。
  • 你的点赞收藏是我更新的最大动力,谢谢。
0
0
0
0
关于作者
相关资源
火山引擎边缘云游戏行业解决方案:“加速”游戏体验升级
《“加速”游戏体验升级,火山引擎边缘云游戏行业解决方案》 许思安 | 火山引擎边缘云高级总监
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论