回顾
- 前面我们讲解 JUC 中一个重要的基础工具 CAS, 今天我们来分享 JUC 中的另一重要工具 AQS
- 【多线程系列】高效的 CAS (Compare and Swap)
- 【多线程系列】CAS 常见的两个升级版本 CLH、MCS
导读
- AQS 是什么、底层原理(独占模式、共享模式实现)
- AQS 变种 CLH 相比于原始 CLH 的改变
版本及说明
AQS
- AQS 全称是 AbstractQueuedSynchronizer,是 Java 并发包中的一个抽象类,用于构建各种同步器和锁,如 ReentrantLock、CountDownLatch、Semaphore 等。
核心思想
- 基于 CAS 和 变种 CLH 实现对互斥资源的访问;访问互斥资源时,当互斥资源空闲时,通过 CAS 操作将互斥资源置为锁定状态,并将访问线程置为当前线程,当互斥资源被其他线程锁定时,通过变种 CLH 实现的逻辑 FIFO 队列实现对线程的阻塞以及资源释放时的唤醒机制。
结构
- AQS 使用 int 成员变量 state 表示同步状态,通过内置的 FIFO 等待队列 来完成获取资源线程的排队工作。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
private transient volatile Node tail;
// 使用 volatile 保证变量的可见性
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 提供 CAS 操作更新 state 的值
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
state 状态
状态名 | 描述 |
---|---|
SIGNAL(-1) | 表示该节点正常等待 |
PROPAGATE(-3) | 应将 releaseShared 传播到其他节点 |
CONDITION(-2) | 该节点位于条件队列,不能用于同步队列节点 |
CANCELLED(1) | 由于超时、中断或其他原因,该节点被取消 |
(0) | 节点初始状态 |
Node 节点
static final class Node {
/**
* Marker to indicate a node is waiting in shared mode
*/
static final Node SHARED = new Node();
/**
* Marker to indicate a node is waiting in exclusive mode
*/
static final Node EXCLUSIVE = null;
/**
* Status field, taking on only the values:
*/
volatile int waitStatus;
// 前置节点
volatile Node prev;
// 后置线程
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED.
*/
Node nextWaiter;
}
独占模式和共享模式
- AQS 支持两种资源共享方式:Exclusive(独占,只有一个线程能执行,如基于独占模式实现的 ReentrantLock)和 Share(共享,多个线程可同时执行,如基于共享模式实现的 Semaphore/CountDownLatch)。
独占模式
获取锁
/**
* 获取独占锁主流程:
* 1、阻塞获取锁,获取锁逻辑由具体同步器重写 tryAcquire() 实现
* 2、获取锁成功直接返回,获取锁失败进入 FIFO 线程进行线程的阻塞和唤醒
* 2.1、调用 addWaiter() 将当前线程封装为 Node 节点并入队
* 2.2、入队成功后在 acquireQueued() 方法尝试自旋获取锁或阻塞当前线程
*
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 将当前线程封装为 Node 节点并入队
*
* @param mode
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 当队尾节点不为 null 时使用 CAS 快速入队
// 这种写法可以借鉴,可以提高性能(减少小概率的临界值判断)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速入队失败 重新入队直到入队成功
enq(node);
return node;
}
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
// 队尾节点为空 初始化一个哨兵节点
// 作用:统一处理逻辑,首节点是哨兵节点或持有锁线程(正在持有或已释放唤起后续线程)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// CAS 入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Node 节点入 FIFO 等待队列后 进行 CAS 操作获取锁或线程阻塞
* @param node
* @param arg
* @return
*/
final boolean acquireQueued(final Node node, int arg) {
// 获取锁结果
boolean failed = true;
try {
// 是否被中断
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
// 当前节点为等待队列中的第二个节点 尝试 CAS 获取锁
// 前置可能节点为 哨兵节点 或 已经释放锁节点 尝试 CAS 获取锁
if (p == head && tryAcquire(arg)) {
// 获取成功设置当前线程为 头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 非第二节点/获取失败判断是否阻塞当前线程 & 阻塞线程并判断线程是否被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 超时、中断导致线程获取锁失败时 标记节点状态为 Cancel
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前置节点 处于等待状态 当前节点线程阻塞挂起
if (ws == Node.SIGNAL)
return true;
// 前置节点已取消 去除队列中已取消节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 前置节点状态只能是 0 或 PROPAGATE 可能需要等待
// 将前置节点 状态置为 SIGNAL 并 重新尝试是否可以获取锁
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒节点条件:pred == head或者pred.thread == null 第一个节点;
// ((ws = pred.waitStatus) != Node.SIGNAL 并且 (ws >0 || compareAndSetWaitStatus(pred, ws, Node.SIGNAL) == false)):前置节点突然释放锁
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 等待队列不为空 同时状态不为 初始状态(节点初始化已完成)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将头节点状态置为 初始状态 0
compareAndSetWaitStatus(node, ws, 0);
// 从尾到头查找到最早的入队可以唤醒的节点(不包括头节点)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒找到的节点
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式
获取锁
/**
* 获取共享锁主流程:
* 1、尝试获取贡献锁 获取锁逻辑由具体同步器重写 tryAcquire() 实现
* 2、获取锁成功直接返回,获取锁失败进入 FIFO 线程进行线程的阻塞和唤醒
* 2.1、调用 addWaiter() 将当前线程封装为 Node 节点并入队
*2.2、入队成功后尝试自旋获取锁(获取成功后走共享锁唤醒逻辑 setHeadAndPropagate)或阻塞当前线程
* @param arg
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头节点 & 共享锁传播唤醒逻辑
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 和独占锁一致
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 和独占锁一致
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置为头结点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点
if (s == null || s.isShared())
// 读锁唤醒往后传播(A 被唤醒获取锁唤醒 B ,B 被唤醒被获取锁唤醒 C...)
// 见释放锁
doReleaseShared();
}
}
释放锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 唤醒后一个等待节点
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
*/
for (;;) {
// 执行唤醒逻辑(如果从setHeadAndPropagate方法调用该方法,那么这里的head是新的头节点)
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// CAS原子操作,因为setHeadAndPropagate和releaseShared这两个方法都会调用doReleaseShared,避免多次unpark唤醒操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒节点
unparkSuccessor(h);
}
// 如果后继节点暂时不需要唤醒,那么当前头节点状态更新为PROPAGATE,确保后续可以传递给后继节点
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 防止其它线程设置了头节点,其它线程已经获取锁,交给其它线程处理
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将头节点状态置为 初始状态 0
compareAndSetWaitStatus(node, ws, 0);
// 从尾到头查找到最早的入队可以唤醒的节点(不包括头节点)
// 从尾到头的原因:避免已经入队但通过 next 节点查找不到(https://blog.csdn.net/foxException/article/details/108917338)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒找到的节点
if (s != null)
LockSupport.unpark(s.thread);
}
模版方法的使用
- AQS 使用了模板方法模式,当实现自定义同步器时需要重写下面几个 AQS 提供的钩子方法:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
AQS 条件队列 Condition
- 在 AQS 的基础上,Java 提供了一个更高级的同步工具 Condition,它允许线程在特定条件下等待和唤醒,以实现更复杂的线程间通信。
- 实现 synchronized 对象锁中的wait、notify、notifyAll, Condition 支持多条件,可以实现更细粒度的控制。
- 仅支持独占锁。
使用示例
public class MainTest {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
System.out.println("线程1获取锁");
// 条件等待 释放锁
System.out.println("线程1条件等待、释放锁");
condition.awaitUninterruptibly();
System.out.println("线程1重新获取锁");
lock.unlock();
System.out.println("线程1释放锁");
}).start();
new Thread(() -> {
lock.lock();
System.out.println("线程2获取锁");
// 条件等待 释放锁
System.out.println("线程唤醒条件队列的的一个锁");
condition.signal();
lock.unlock();
System.out.println("线程2释放锁");
}).start();
}
}
// 运行结果:
线程1获取锁
线程1条件等待、释放锁
线程2获取锁
线程唤醒条件队列的的一个锁
线程2释放锁
线程1重新获取锁
线程1释放锁
类图
ConditionObject 类结构
- Condition 接口提供了常见的标准方法,ConditionObject 类是具体实现。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
Condition 接口提供的方法
//响应线程中断的条件等待
void await() throws InterruptedException;
//不响应线程中断的条件等待
void awaitUninterruptibly();
//设置相对时间的条件等待(不进行自旋)
long awaitNanos(long nanosTimeout) throws InterruptedException;
//设置相对时间的条件等待(进行自旋)
boolean await(long time, TimeUnit unit) throws InterruptedException;
//设置绝对时间的条件等待
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒条件队列中的头结点
void signal();
//唤醒条件队列的所有结点
void signalAll();
核心方法
- 下面以 await、signal 两个核心方法介绍 Condition 的底层实现。
await()
public final void await() throws InterruptedException {
// 如果线程被中断 抛出中断异常
if (Thread.interrupted()) throw new InterruptedException();
// 将节点加入到条件队列
Node node = addConditionWaiter();
// 释放之前获取的锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当不再同步队列时才挂起线程(因为唤醒时会重新加入同步队列竞争锁)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// AQS acquireQueued 方法逻辑 加入同步队列后等待获取锁逻辑
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 中断逻辑处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 从后往前清理已经取消的线程
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程加入到条件队列中(获取互斥锁时执行 所有不用加锁)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// AQS 释放锁逻辑
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前节点的 state
int savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
final boolean isOnSyncQueue(Node node) {
// 说明在条件队列中,不再同步队列
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
// 从同步队列尾部开始遍历线程是否在同步队列中
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (; ; ) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
signal()
public final void signal() {
// 当前持有锁线程才可唤醒
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 存在需要唤醒的线程
if (first != null)
doSignal(first);
}
/**
* 遍历条件队列 从前往后尝试获取一个有效的线程(非取消)
*
* @param first
*/
private void doSignal(Node first) {
do {
// firstWaiter 头节点指向条件队列头的下一个节点
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 原来的头节点和同步队列断开
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 是否唤醒一个有效线程(从前往后依次尝试)
final boolean transferForSignal(Node node) {
// 判断节点是否已经在之前被取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 调用 enq 添加到 同步队列的尾部
Node p = enq(node);
int ws = p.waitStatus;
// 同步节点前置节点 修改为 SIGNAL 等待后续唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
// AQS 入同步队列逻辑
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
// 尾节点为空 需要初始化头节点,此时头尾节点是一个
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 不为空 循环赋值
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// signalAll() 区别在于会唤醒条件队列中的所有等待线程
private void doSignalAll(AbstractQueuedSynchronizer.Node first) {
lastWaiter = firstWaiter = null;
do {
AbstractQueuedSynchronizer.Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
补充
AQS 变种 CLH 改进点
- 将 CLH 自旋操作改为线程阻塞操作
- 扩展每个节点的状态、显式的维护前驱节点和后继节点以及出队节点显式设为 null 等辅助 GC 的优化来支持更多功能
参考
预告
- 下一篇文章将介绍基于 AQS 实现的同步器,也就是我们常常使用的 ReentrantLock 、ReentrantReadWriteLock 等等。
- 你的点赞收藏是我更新的最大动力,谢谢。