【多线程系列】终于懂了 Java 中的各种锁

社区Java
源码版本
  • JDK 8
前言
  • Java 中提供了种类丰富的锁,每种锁因有不同的特性在不同的场景能够展现出较高的性能,本文在概念的基础上结合源码 + 使用场景进行举例,让读者对 Java 中的锁有更加深刻的认识,Java 中按照是否包含某一特性来定义锁,下面是本文中介绍的锁的分类图:

picture.image

乐观锁 & 悲观锁
  • 乐观锁和悲观锁是一种广义上的概念,体现了线程对互斥资源进行同步的两种不同的态度,在 Java 和数据中都有实际的运用。

概念

  • 对一个互斥资源的同步操作,悲观锁认为自己访问时,一定有其它线程来修改,因此在访问互斥资源时悲观锁会先加锁;而乐观锁认为自己在访问时不会有其它线程来修改,访问时不加锁,而是在更新数据时去判断有无被其他线程修改,若没被修改则写入成功,若被其他线程修改则进行重试或报错。

picture.image

适应场景

  • 由上面我们可以看出,乐观锁适用于读操作多的场景,而悲观锁适用于写操作多的场景。

源码分析

  • 我们常见的synchronized、ReentrantLock 都属于悲观锁,而AtomicInteger.incrementAndGet 则属于乐观锁。
        // ----------------- 悲观锁 -------------------------
        synchronized (MUTEX) {
            // 同步代码块
        }

        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        // 同步代码块
        lock.unlock();

        // ----------------- 乐观锁 -------------------------

        AtomicInteger atomicInteger = new AtomicInteger(0);
        atomicInteger.incrementAndGet();

// 悲观锁的实现方式很直观,先进行加锁,然后访问互斥资源,最后释放锁;那么乐观锁时如何实现的呢?我们通过介绍乐观锁主要的实现方式 CAS 来为大家解惑。
// 这里简单给大家回顾一下 CAS ,有需要了解更多的读者请去阅读 CAS 章节。

CAS全称 Compare And Swap(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞)的情况下实现多线程之间的同步。
CAS算法涉及到三个操作数:当前内存值 V、原始值 A、要写入的新值 B。
当且仅当 V 的值等于 A 时,CAS通过原子方式用新值B来更新V的值(“比较+更新”整体是一个原子操作),否则不会执行任何操作。

//  atomicInteger.incrementAndGet() 使用上述方式实现:

    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            // v 表示获取到的内存中的当前值
            v = getIntVolatile(o, offset);
        // compareAndSwapInt() 是一个原子操作、进行比较更新
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }
阻塞 & 非阻塞
  • 了解阻塞和非阻塞前,大家需要知道唤醒和阻塞一个Java线程需要操作系统进行用户态到内核态的切换,这种切换是十分耗时处理器时间的,如果同步代码块的内容过于简单,状态转换消耗的时间可能比用户代码执行时间还长,这是十分不划算的,因此我们引入了非阻塞的概念。

概念

  • 从上面的介绍中我们其实已经可以了解到阻塞和非阻塞的概念。多线程访问互斥资源时,当互斥资源已被占用,阻塞线程,当互斥释放时,唤醒线程进行竞争称为阻塞式同步;而当互斥资源被占用时,不进行线程阻塞而通过自旋等待其它线程释放锁或直接返回错误的方式称为非阻塞式同步,自旋方式又可以分为普通自旋和自适应自旋。

picture.image

使用场景

  • 非阻塞自旋的方式本身是有缺点的,不能完全代替阻塞同步,非阻塞自旋虽然避免了线程切换的开销但是会占用处理器的时间,如果锁被占用的时间很短,那么自旋等待的效果很好,如果锁被占用时间很长那么只会白白浪费处理器时间。所以自旋一般会设置一定限制,比如Java中默认是10次(使用-XX:PreBlockSpin来修改)。
  • 自适应自旋意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。
  • 因此,阻塞式同步适用于同步代码块执行时间比较长,线程获取锁时间间隔比较长的场景,而非阻塞式同步适用于同步代码块执行比较短,线程获取锁时间间隔比较短的场景。

源码分析

  • ReentrantLock以及synchronized中的重量级锁都属于阻塞式同步,而 Java 中的原子操作类中的 CAS 失败后自旋则运用了非阻塞自旋的思想。
更正一下:网上很多文章说 synchronized中轻量级锁则运用了非阻塞自旋的思想,其实上是错误的;
实际上一次 CAS 尝试获取轻量级锁失败后直接升级为重量级锁,而不会自旋。
公平锁 & 非公平锁

概念

  • 公平锁和非公平锁指的是获取线程获取锁时的顺序。公平锁指按照锁申请的顺序来获取锁,线程直接进入队列中,队列中的第一个线程才能获取锁。非公平锁指多个线程获取锁时,直接尝试获取锁,只有当线程未获取到锁时才放入队列中。

picture.image

picture.image

适应场景

  • 公平锁的优点是不会造成饥饿,但整体性能会比非公平锁低,因为除等待队列中的第一个线程,其它线程都需要进行阻塞和唤醒操作。而非公平锁有几率直接获得锁,减少了线程阻塞和唤醒的次数,但可能会造成饥饿。因此在饥饿无影响或不会产生饥饿的场景下优先考虑非公平锁。

源码分析

  • ReentrantLock 提供了公平锁和非公平锁两种实现,默认使用非公平锁。

非公平锁

    final void lock() {
        // 多次尝试获取锁,避免将线程阻塞再唤醒
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
         else
            acquire(1);
    }
    
    public final void acquire(int arg) {
        // 尝试获取锁失败后再放入等待队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
        // 重点:不判断队列中是否有排队线程直接获取锁
        if (compareAndSetState(0, acquires)) {            
            setExclusiveOwnerThread(current);
            return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
            }
        return false;
    }
  • 我们可以注意到非公平锁实现中两次尝试使用compareAndSetState()来获取锁,其实这里就是类似自旋的作用,避免线程阻塞再唤醒的过程,从而提高性能。

公平锁

    final void lock() {
        acquire(1);
    }
        
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
        // 重点:当互斥资源未被占用时,先判断队列中是否存在等待线程,若无尝试竞争锁,若有或竞争失败则将当前线程放入队列中
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(current);
             return true;
            }
        }else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
            }
            return false;
        }
    }
    
    // 判断当前线程是否位于同步队列中的第一个。如果是则返回true,否则返回false。
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
可重入锁 & 不可重入锁

概念

  • 可重入锁又称递归锁,是指同一线程在外层获取锁后,进入内层方法再次获取同一锁时会自动获取锁。可重入锁的好处是可以一定程度避免死锁。

picture.image

源码分析

  • Java 中 ReentrantLock 和 synchronized 都是可重入锁,我们以 ReentrantLock 为例进行分析:
// ReentrantLock FairSync
// 获取锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 重点:在已经获取锁的情况下,对比当前线程ID和占用锁线程ID是否一致,若一致锁计数器 +1
    // 不可重入的情况下,则无此判断
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// 释放锁
protected final boolean tryRelease(int releases) {
    // 每次释放时进行-1
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 直到计数器为 0 代表锁释放
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
排它锁 & 共享锁

概念

  • 排它锁和共享锁的主要区别在于互斥资源锁是否能被多个线程同时持有。同时只能被一个线程持有称为排它锁;当能够被多个线程同时持有称为共享锁。

作用

  • 进一步细化加锁粒度,提高并发性能。比如我们常见读写锁,实现读读不互斥,高效并发读,而读写、写读、写写的过程互斥。

源码分析

  • 我们以 ReentrantReadWriteLock 读写锁为例,ReentrantReadWriteLock 中有两把锁 ReadLock 和 WriteLock ,一个是读锁为共享锁,一个是写锁为排它锁:
  • 当前线程已获取读锁无写锁,其它线程可以获取读锁;当前线程已获取写锁,仅当前线程可以获取读锁。
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    // 读锁
    ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    // 写锁
    ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    // 读锁 公平锁
    public void lock() {
        sync.acquireShared(1);
    }


    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
    }


    protected final int tryAcquireShared(int unused) {
        /*
         * Walkthrough:
         * 1. If write lock held by another thread, fail.
         * 2. Otherwise, this thread is eligible for
         *    lock wrt state, so ask if it should block
         *    because of queue policy. If not, try
         *    to grant by CASing state and updating count.
         *    Note that step does not check for reentrant
         *    acquires, which is postponed to full version
         *    to avoid having to check hold count in
         *    the more typical non-reentrant case.
         * 3. If step 2 fails either because thread
         *    apparently not eligible or CAS fails or count
         *    saturated, chain to version with full retry loop.
         */
        Thread current = Thread.currentThread();
        int c = getState();
        // 存在写锁且不是当前线程
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
        // 当前线程持有写锁或仅有读锁或无锁
        int r = sharedCount(c);
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
            // 如果是第一次获取 初始化 firstReader、firstReaderHoldCount 不是第一次获取 对 readHolds  对应线程计数+1
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0) readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }


    final int fullTryAcquireShared(Thread current) {
        /*
         * This code is in part redundant with that in
         * tryAcquireShared but is simpler overall by not
         * complicating tryAcquireShared with interactions between
         * retries and lazily reading hold counts.
         */
        HoldCounter rh = null;
        // 自旋 不挂起线程
        for (; ; ) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                // 非当前线程获取到写锁获取失败
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) { 
                // 走到这里说明没有写锁被占有 判断是否存在重入
                // Make sure we're not acquiring read lock reentrantly
                // 当前线程为 firstReader 走下面 CAS
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        // cachedHoldCounter 没有缓存或缓存的不是当前线程
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            // 说明上一行是初始化 移除上面产生的初始化
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            // 是否已经达到读锁获取次数上限
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // CAS 获取锁
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 读锁初始化和计数
                if (sharedCount(c) == 0) {
                    // 第一次添加读锁
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // firstReader 为当前线程
                    firstReaderHoldCount++;
                } else {
                    // 否则更新 readHolds 对应线程读锁计数
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

    // 无法获取读锁,将获取读锁线程放入等待队列中
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted) selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
            }
        } finally {
            if (failed) cancelAcquire(node);
        }
    }

    // 写锁 公平锁
    public void lock() {
        sync.acquire(1);
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
    }


    protected final boolean tryAcquire(int acquires) {
        /*
         * Walkthrough:
         * 1. If read count nonzero or write count nonzero
         *    and owner is a different thread, fail.
         * 2. If count would saturate, fail. (This can only
         *    happen if count is already nonzero.)
         * 3. Otherwise, this thread is eligible for lock if
         *    it is either a reentrant acquire or
         *    queue policy allows it. If so, update state
         *    and set owner.
         */
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        // 存在读锁或写锁
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 存在读锁或存在写锁但不是当前线程持有获取失败
            if (w == 0 || current != getExclusiveOwnerThread()) return false;
            // 获取锁是否超过上限
            if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 走到这里说明当前线程持有写锁 重入
            setState(c + acquires);
            return true;
        }
        // 不存在锁 判断队列阻塞策略 并进行 CAS 尝试获取锁
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;
        setExclusiveOwnerThread(current);
        return true;
    }

  • ReentrantReadWriteLock 巧妙的将AQS中的state一分为二高16位为读计数,低16为为写计数,将两个原子性操作(读竞争和写竞争)合并为一个原子操作。

picture.image

synchronized 中的无锁、偏向锁、轻量级锁、重量级锁
  • synchronized 中的无锁、偏向锁、轻量级锁、重量级锁是 synchronized 特有的概念,参考 volatile & synchronized 章节。
0
0
0
0
关于作者
相关资源
火山引擎边缘云游戏行业解决方案:“加速”游戏体验升级
《“加速”游戏体验升级,火山引擎边缘云游戏行业解决方案》 许思安 | 火山引擎边缘云高级总监
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论