回顾
- 前面我们讲解 JUC 中两个核心的基础工具 CAS 和 AQS,下面这篇文章我们聊聊 JUC 是如何使用这两大核心组件实现同步器
- 【多线程系列】高效的 CAS (Compare and Swap)
- 【多线程系列】CAS 常见的两个升级版本 CLH、MCS
- 【多线程系列】JUC 中的另一重要大杀器 AQS 抽象队列同步器
导读
- 了解如何基于 AQS 实现自己的同步器
- ReentrantLock、ReentrantReadWriteLock 实现原理
基于 AQS 实现的同步器
- JUC 并发包中一部分同步器都是基于 AQS 实现,前面介绍 AQS 时提到过模版方法,而同步器的不同特性主要通过重写这些模板方法实现:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
- 下面是基于 AQS 同步器的主要流程,标注了实现公平锁、非公平锁、可重入、共享等特性的拓展点:
ReentrantLock
- ReentrantLock 是基于 AQS 实现的可重入式独占锁,支持公平锁和非公平锁两种模式。
简单使用示例
public class MainTest {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
System.out.println("线程1获取锁");
// 条件等待 释放锁
System.out.println("线程1条件等待、释放锁");
condition.awaitUninterruptibly();
System.out.println("线程1重新获取锁");
lock.unlock();
System.out.println("线程1释放锁");
}).start();
new Thread(() -> {
lock.lock();
System.out.println("线程2获取锁");
// 条件等待 释放锁
System.out.println("线程唤醒条件队列的的一个锁");
condition.signal();
lock.unlock();
System.out.println("线程2释放锁");
}).start();
}
}
// 运行结果:
线程1获取锁
线程1条件等待、释放锁
线程2获取锁
线程唤醒条件队列的的一个锁
线程2释放锁
线程1重新获取锁
线程1释放锁
公平锁和非公平锁
- ReentrantLock 通过继承 AQS 实现 FairSync、NonfairSync 两种模式,默认提供非公平锁(整体性能更高):
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new ReentrantLock.NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync();
}
公平锁和非公平锁具体实现
锁获取
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平锁:当互斥资源未被占用,需要先判断等待队列中是否有线程,若有,先唤醒等待队列中线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 支持重入:当互斥资源被占用时,判断持有线程是否当前线程,若为当前线程,重入次数 +1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
/* 判断等待队列中是否存在等待中的线程 */
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 判断是否存在先等待的线程 具体分析如下
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
- hasQueuedPredecessors() 图示展示
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 非公平锁多次 CAS 直接尝试竞争锁 尽可能避免阻塞带来的上下文切换
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 非公平锁:直接竞争锁,不管等待是否存在等待节点
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
锁释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 获取独占锁的线程才可以释放线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state == 0 表示所有锁已释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可重入
- 当互斥资源被占用时,判断持有线程是否当前线程,若为当前线程,重入次数 +1
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 支持重入:当互斥资源被占用时,判断持有线程是否当前线程,若为当前线程,重入次数 +1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantReadWriteLock
- ReentrantLock 是独占式锁,当互斥资源读多写少时,性能较差;ReentrantReadWriteLock 通过 AQS 实现 ReadLock 和 WriteLock,实现了读写分离,从而达到读写互斥、读读不互斥,提高了线程并发性能。
- ReentrantReadWriteLock 同样支持公平锁和非公平锁(默认非公平锁),以及可重入的特性。
- 支持锁降级,但不支持锁升级。
锁降级指当一个线程获取写锁,再获取读锁,此时再释放写锁,这个过程称为锁降级。
锁升级指一个线程获取读锁,然后获取写锁(获取失败:获取读锁后无法同时再去获取写锁)。
使用示例
// 不支持锁升级验证
public class MainTest {
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
new Thread(()->{
readLock.lock();
System.out.println("子线程获取读锁");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
readLock.unlock();
System.out.println("子线程释放读锁");
}).start();
Thread.sleep(2000);
System.out.println("主线程开始获取写锁");
writeLock.lock();
System.out.println("主线程获取写锁成功");
writeLock.unlock();
System.out.println("主线程释放写锁成功");
}
}
// 输出
子线程获取读锁
主线程开始获取写锁
子线程释放读锁
主线程获取写锁成功
主线程释放写锁成功
如何记录读锁写锁的重入次数
- 我们都知道 AQS 使用 state 字段记录锁的重入次数,而 ReentrantReadWriteLock 创意性的将 state 字段的高 16 位用于表示读状态,低 16 位表示写状态,把两个写操作合并到一个 CAS 操作。
ReadLock
获取锁
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 存在写锁且不是当前线程
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
// 当前线程持有写锁或仅有读锁或无锁
int r = sharedCount(c);
// 根据公平锁和非公平锁重写 readerShouldBlock()
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第一次获取 初始化 firstReader、firstReaderHoldCount 不是第一次获取 对 readHolds 对应线程计数+1
if (r == 0) {
// 第一次添加读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// firstReader 为当前线程
firstReaderHoldCount++;
} else {
// 否则更新 readHolds 对应线程读锁计数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) readHolds.set(rh);
rh.count++;
}
return 1;
}
// 自旋尝试获取读锁(只要满足获取读锁条件)
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
// 自旋 不挂起线程
for (; ; ) {
int c = getState();
if (exclusiveCount(c) != 0) {
// 非当前线程获取到写锁 获取失败
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// 走到这里说明没有写锁被占有 判断是否存在重入
// Make sure we're not acquiring read lock reentrantly
// 当前线程为 firstReader 走下面 CAS
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
// cachedHoldCounter 没有缓存或缓存的不是当前线程
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
// 说明上一行是初始化 移除上面产生的初始化
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 是否已经达到读锁获取次数上限
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS 获取锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁初始化和计数
if (sharedCount(c) == 0) {
// 第一次添加读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// firstReader 为当前线程
firstReaderHoldCount++;
} else {
// 否则更新 readHolds 对应线程读锁计数
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
释放锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 根据共享锁次数来设置 firstReader 不存在并发修改问题
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// CAS 更新 state
for (; ; ) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
WriteLock
获取锁
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 存在读锁或写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 存在读锁或存在写锁但不是当前线程持有获取失败
if (w == 0 || current != getExclusiveOwnerThread()) return false;
// 获取锁是否超过上限
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 走到这里说明当前线程持有写锁 重入
setState(c + acquires);
return true;
}
// 不存在锁 判断公平非公平阻塞策略 || 进行 CAS 尝试获取锁
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;
setExclusiveOwnerThread(current);
return true;
}
释放锁
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
预告
- 下一篇我们将基于 AQS 实现自己的锁。
如果文章对大家有帮助,希望大家多多点赞关注,你的点赞关注是我更新的最大动力,感谢大家