【多线程系列】基于 AQS 实现的同步器源码精讲(ReentrantLock、ReentrantRe

社区Java
回顾
导读
  • 了解如何基于 AQS 实现自己的同步器
  • ReentrantLock、ReentrantReadWriteLock 实现原理
基于 AQS 实现的同步器
  • JUC 并发包中一部分同步器都是基于 AQS 实现,前面介绍 AQS 时提到过模版方法,而同步器的不同特性主要通过重写这些模板方法实现:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
  • 下面是基于 AQS 同步器的主要流程,标注了实现公平锁、非公平锁、可重入、共享等特性的拓展点:

picture.image

ReentrantLock
  • ReentrantLock 是基于 AQS 实现的可重入式独占锁,支持公平锁和非公平锁两种模式。

简单使用示例

public class MainTest {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            System.out.println("线程1获取锁");
            // 条件等待 释放锁
            System.out.println("线程1条件等待、释放锁");
            condition.awaitUninterruptibly();
            System.out.println("线程1重新获取锁");
            lock.unlock();
            System.out.println("线程1释放锁");
        }).start();

        new Thread(() -> {
            lock.lock();
            System.out.println("线程2获取锁");
            // 条件等待 释放锁
            System.out.println("线程唤醒条件队列的的一个锁");
            condition.signal();
            lock.unlock();
            System.out.println("线程2释放锁");
        }).start();
    }
}

// 运行结果:
线程1获取锁
线程1条件等待、释放锁
线程2获取锁
线程唤醒条件队列的的一个锁
线程2释放锁
线程1重新获取锁
线程1释放锁

公平锁和非公平锁

  • ReentrantLock 通过继承 AQS 实现 FairSync、NonfairSync 两种模式,默认提供非公平锁(整体性能更高):
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new ReentrantLock.NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync();
    }

公平锁和非公平锁具体实现

锁获取

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 公平锁:当互斥资源未被占用,需要先判断等待队列中是否有线程,若有,先唤醒等待队列中线程
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // 支持重入:当互斥资源被占用时,判断持有线程是否当前线程,若为当前线程,重入次数 +1
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

    /* 判断等待队列中是否存在等待中的线程 */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // 判断是否存在先等待的线程 具体分析如下
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }

  • hasQueuedPredecessors() 图示展示

picture.image

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // 非公平锁多次 CAS 直接尝试竞争锁 尽可能避免阻塞带来的上下文切换
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 非公平锁:直接竞争锁,不管等待是否存在等待节点
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

锁释放

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 获取独占锁的线程才可以释放线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // state == 0 表示所有锁已释放
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

可重入

  • 当互斥资源被占用时,判断持有线程是否当前线程,若为当前线程,重入次数 +1
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // 支持重入:当互斥资源被占用时,判断持有线程是否当前线程,若为当前线程,重入次数 +1
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
ReentrantReadWriteLock
  • ReentrantLock 是独占式锁,当互斥资源读多写少时,性能较差;ReentrantReadWriteLock 通过 AQS 实现 ReadLock 和 WriteLock,实现了读写分离,从而达到读写互斥、读读不互斥,提高了线程并发性能。
  • ReentrantReadWriteLock 同样支持公平锁和非公平锁(默认非公平锁),以及可重入的特性。
  • 支持锁降级,但不支持锁升级。
锁降级指当一个线程获取写锁,再获取读锁,此时再释放写锁,这个过程称为锁降级。
锁升级指一个线程获取读锁,然后获取写锁(获取失败:获取读锁后无法同时再去获取写锁)。

使用示例

// 不支持锁升级验证
public class MainTest {
    public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

        new Thread(()->{
            readLock.lock();
            System.out.println("子线程获取读锁");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            readLock.unlock();
            System.out.println("子线程释放读锁");
        }).start();

        Thread.sleep(2000);
        System.out.println("主线程开始获取写锁");
        writeLock.lock();
        System.out.println("主线程获取写锁成功");
        writeLock.unlock();
        System.out.println("主线程释放写锁成功");
    }
}

// 输出
子线程获取读锁
主线程开始获取写锁
子线程释放读锁
主线程获取写锁成功
主线程释放写锁成功

如何记录读锁写锁的重入次数

  • 我们都知道 AQS 使用 state 字段记录锁的重入次数,而 ReentrantReadWriteLock 创意性的将 state 字段的高 16 位用于表示读状态,低 16 位表示写状态,把两个写操作合并到一个 CAS 操作。

picture.image

ReadLock

获取锁

    protected final int tryAcquireShared(int unused) {
        /*
         * Walkthrough:
         * 1. If write lock held by another thread, fail.
         * 2. Otherwise, this thread is eligible for
         *    lock wrt state, so ask if it should block
         *    because of queue policy. If not, try
         *    to grant by CASing state and updating count.
         *    Note that step does not check for reentrant
         *    acquires, which is postponed to full version
         *    to avoid having to check hold count in
         *    the more typical non-reentrant case.
         * 3. If step 2 fails either because thread
         *    apparently not eligible or CAS fails or count
         *    saturated, chain to version with full retry loop.
         */
        Thread current = Thread.currentThread();
        int c = getState();
        // 存在写锁且不是当前线程
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
        // 当前线程持有写锁或仅有读锁或无锁
        int r = sharedCount(c);
        // 根据公平锁和非公平锁重写 readerShouldBlock()
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
            // 如果是第一次获取 初始化 firstReader、firstReaderHoldCount 不是第一次获取 对 readHolds  对应线程计数+1
            if (r == 0) {
                // 第一次添加读锁
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // firstReader 为当前线程
                firstReaderHoldCount++;
            } else {
                // 否则更新 readHolds 对应线程读锁计数
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0) readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        // 自旋尝试获取读锁(只要满足获取读锁条件)
        return fullTryAcquireShared(current);
    }


    final int fullTryAcquireShared(Thread current) {
        /*
         * This code is in part redundant with that in
         * tryAcquireShared but is simpler overall by not
         * complicating tryAcquireShared with interactions between
         * retries and lazily reading hold counts.
         */
        HoldCounter rh = null;
        // 自旋 不挂起线程
        for (; ; ) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                // 非当前线程获取到写锁 获取失败
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) { 
                // 走到这里说明没有写锁被占有 判断是否存在重入
                // Make sure we're not acquiring read lock reentrantly
                // 当前线程为 firstReader 走下面 CAS
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        // cachedHoldCounter 没有缓存或缓存的不是当前线程
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            // 说明上一行是初始化 移除上面产生的初始化
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            // 是否已经达到读锁获取次数上限
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // CAS 获取锁
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 读锁初始化和计数
                if (sharedCount(c) == 0) {
                    // 第一次添加读锁
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // firstReader 为当前线程
                    firstReaderHoldCount++;
                } else {
                    // 否则更新 readHolds 对应线程读锁计数
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

释放锁

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        // 根据共享锁次数来设置 firstReader 不存在并发修改问题
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            Sync.HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //  CAS 更新 state
        for (; ; ) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

WriteLock

获取锁

 protected final boolean tryAcquire(int acquires) {
        /*
         * Walkthrough:
         * 1. If read count nonzero or write count nonzero
         *    and owner is a different thread, fail.
         * 2. If count would saturate, fail. (This can only
         *    happen if count is already nonzero.)
         * 3. Otherwise, this thread is eligible for lock if
         *    it is either a reentrant acquire or
         *    queue policy allows it. If so, update state
         *    and set owner.
         */
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        // 存在读锁或写锁
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 存在读锁或存在写锁但不是当前线程持有获取失败
            if (w == 0 || current != getExclusiveOwnerThread()) return false;
            // 获取锁是否超过上限
            if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 走到这里说明当前线程持有写锁 重入
            setState(c + acquires);
            return true;
        }
        // 不存在锁 判断公平非公平阻塞策略 || 进行 CAS 尝试获取锁
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;
        setExclusiveOwnerThread(current);
        return true;
    }

释放锁

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
预告
  • 下一篇我们将基于 AQS 实现自己的锁。

如果文章对大家有帮助,希望大家多多点赞关注,你的点赞关注是我更新的最大动力,感谢大家

0
0
0
0
关于作者
相关资源
云原生机器学习系统落地和实践
机器学习在字节跳动有着丰富业务场景:推广搜、CV/NLP/Speech 等。业务规模的不断增大对机器学习系统从用户体验、训练效率、编排调度、资源利用等方面也提出了新的挑战,而 Kubernetes 云原生理念的提出正是为了应对这些挑战。本次分享将主要介绍字节跳动机器学习系统云原生化的落地和实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论