Condition的使用
创建
1. `ReentrantLock reentrantLock = new ReentrantLock(true);`
2. `Condition condition = reentrantLock.newCondition();`
reentrantLock.newCondition()方法返回的对象类型是ConditionObject类型,ConditionObject是AbstractQueuedSynchronizer的内部类,它对象的创建依赖于外部类的对象,在它里面可以调用外部类中的方法。
等待
1. `try{`
2. `reentrantLock.lock();`
3. `condition.await();`
4. `} catch (InterruptedException e) {`
5. `e.printStackTrace();`
6. `}finally {`
7. `if(reentrantLock.isLocked()){`
8. `reentrantLock.unlock();`
9. `}`
10. `}`
11.
12.
13. `还有的等待方法有:`
14. `condition.await(5, TimeUnit.SECONDS);`
15. `condition.awaitNanos(10000);`
16. `condition.awaitUninterruptibly();`
17. `condition.awaitUntil(new Date())`
唤醒
1. `condition.signal();`
2. `condition.signalAll();`
Condition实现类ConditionObject
1. `public class ConditionObject implements Condition, java.io.Serializable {`
2. `private static final long serialVersionUID = 1173984872572414699L;`
3. `/** First node of condition queue. */`
4. `private transient Node firstWaiter;`
5. `/** Last node of condition queue. */`
6. `private transient Node lastWaiter;`
- firstWaiter:指向condition队列的第一个节点;
- lastWaiter:指向condition队列的最后一个节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()方法:
1. `public final void await() throws InterruptedException {`
2. `// 如果线程中断了,则抛出异常`
3. `if (Thread.interrupted())`
4. `throw new InterruptedException();`
5. `// 将当前waitStatus为condition状态的节点添加到condition的waiter队列中(将会在其中clean掉已经处于canceled状态的节点`
6. `Node node = addConditionWaiter();`
7. `// 获取到当前aqs的state(当前线程对锁的所有占有权),释放掉当前线程对锁的所有占有权,并调用aqs的release方法唤醒当前aqs头节点的继承节点(如果有的话)`
8. `int savedState = fullyRelease(node);`
9. `int interruptMode = 0;`
10. `// 如果当前节点不在同步队列中`
11. `while (!isOnSyncQueue(node)) {`
12. `// 阻塞当前线程,进入等待状态`
13. `LockSupport.park(this);`
14. `if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)`
15. `break;`
16. `}`
17.
18. `if (acquireQueued(node, savedState) && interruptMode != THROW_IE)`
19. `interruptMode = REINTERRUPT;`
20. `if (node.nextWaiter != null) // clean up if cancelled`
21. `unlinkCancelledWaiters();`
22. `if (interruptMode != 0)`
23. `reportInterruptAfterWait(interruptMode);`
24. `}`
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter:
1. `/**`
2. `* Adds a new waiter to wait queue.`
3. `* @return its new wait node`
4. `*/`
5. `private Node addConditionWaiter() {`
6. `// 如果当前线程不是锁的独占线程,则抛出异常`
7. `if (!isHeldExclusively())`
8. `throw new IllegalMonitorStateException();`
9. `Node t = lastWaiter;`
10. `// If lastWaiter is cancelled, clean out.`
11. `if (t != null && t.waitStatus != Node.CONDITION) {`
12. `// 去掉已经处于canceled状态的waiters,在清理时可能会改变lastWaiter的指向`
13. `unlinkCancelledWaiters();`
14. `// 重新将可能更新过的lastWaiter赋值给t`
15. `t = lastWaiter;`
16. `}`
17. `// 新创建一个waitStatus状态为condition的节点`
18. `Node node = new Node(Node.CONDITION);`
19. `if (t == null)`
20. `// 如果t为null,则代表当前等待队列为空,firstWaiter就是当前节点`
21. `firstWaiter = node;`
22. `else`
23. `// t指向的是lastWaiter,所以这里将当前节点放在队列的最尾部`
24. `t.nextWaiter = node;`
25. `// 将当前ConditionObject的lastWaiter指向node`
26. `lastWaiter = node;`
27. `return node;`
28. `}`
- isHeldExclusively方法调用的是ConditionObject外部类AQS对象中的方法,我们看下ReentrantLock类中的isHeldExclusively方法:
1. `protected final boolean isHeldExclusively() {`
2. `// While we must in general read state before owner,`
3. `// we don't need to do so to check if current thread is owner`
4. `return getExclusiveOwnerThread() == Thread.currentThread();`
5. `}`
该方法的作用主要是判断当前锁的独占线程是不是当前线程。
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease:
1. `/**`
2. `* Invokes release with current state value; returns saved state.`
3. `* Cancels node and throws exception on failure.`
4. `* @param node the condition node for this wait`
5. `* @return previous sync state`
6. `*/`
7. `final int fullyRelease(Node node) {`
8. `try {`
9. `// 这里调用的是AQS的getState方法`
10. `int savedState = getState();`
11. `// 调用的是AQS的release方法,如果成功则会释放当前线程所占有的锁并唤醒头节点的继任节点(如果存在的话),当然在 release方法内部又会调用tryRelease方法,该方法会根据公平锁和非公平锁有不同的实现(这个在上一篇中已经分析过。`
12. `if (release(savedState))`
13. `return savedState;`
14. `// 如果release失败,则会抛出异常,然后在catch块中将节点waitStatus修改为canceled状态`
15. `throw new IllegalMonitorStateException();`
16. `} catch (Throwable t) {`
17. `// 出现了异常之后将节点的waitStatus置为canceled状态`
18. `node.waitStatus = Node.CANCELLED;`
19. `throw t;`
20. `}`
21. `}`
java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue:
1. `// 当一个节点以初始状态被放入了一个condition队列中,现在正处于AQS同步队列中等待重新获取锁时返回true`
2. `final boolean isOnSyncQueue(Node node) {`
3. `// condition节点的初始waitStatus为CONDITION`
4. `// 当节点等待状态为condition或者节点的前置节点为null时返回false`
5. `if (node.waitStatus == Node.CONDITION || node.prev == null)`
6. `return false;`
7. `// 如果节点拥有后续节点,则表明该节点已经在AQS的同步队列中了`
8. `if (node.next != null) // If has successor, it must be on queue`
9. `return true;`
10. `//节点的前置节点不为空但是可能还没有在队列中的原因是通过cas将它添加到队列中的操作可能会失败。所以我们必须反转遍历(从尾节点开始遍历)来确保它实际上完成了入队。在此方法的调用中,它将始终靠近尾部,并且除非CAS失败(这不太可能),否则它将是在离尾部很近的地方,所以我们几乎没有遍历`
11. `return findNodeFromTail(node);`
12. `}`
13.
14. `// 当从后面的tail节点往前搜索时节点在同步队列中时返回true;会在isOnSyncQueue方法有需要的时候进行调用`
15. `private boolean findNodeFromTail(Node node) {`
16. `// We check for node first, since it's likely to be at or near tail.`
17. `// tail is known to be non-null, so we could re-order to "save"`
18. `// one null check, but we leave it this way to help the VM.`
19. `for (Node p = tail;;) {`
20. `// 先校验当前节点,因为它可能就在tail节点附近。找到了就表明在已经在同步队列中了`
21. `if (p == node)`
22. `return true;`
23. `// tail 是不能为空的,所以当遍历到空时代表需要重新排序来保存`
24. `if (p == null)`
25. `return false;`
26. `// 从后向前查找`
27. `p = p.prev;`
28. `}`
29. `}`
这个isOnSyncQueue主要用来判断当前节点是否在同步队列中,新添加的condition节点是不会在同步队列中的,它只位于condition队列中。
这里有必要区分下condition的等待队列和AQS的同步队列,因为可以不止一次的调用lock.newCondition方法,这说明AQS中不止维护了一个等待队列。object监视器上只能拥有一个同步队列和一个等待队列,而AQS却拥有一个同步队列,多个等待队列。具体如下图:
注:图片来源于网络。图片中上面是AQS的同步队列,下面是一个或多个condition队列。
condition wait状态下的中断
1. `/*`
2. `* For interruptible waits, we need to track whether to throw`
3. `* InterruptedException, if interrupted while blocked on`
4. `* condition, versus reinterrupt current thread, if`
5. `* interrupted while blocked waiting to re-acquire.`
6. `*/`
7.
8. `/** Mode meaning to reinterrupt on exit from wait */`
9. `private static final int REINTERRUPT = 1;`
10. `/** Mode meaning to throw InterruptedException on exit from wait */`
11. `private static final int THROW_IE = -1;`
两种中断模式:
- REINTERRUPT:wait状态下退出时重新尝试中断;
- THROW_IE:wait状态下退出时抛出InterruptedException。
唤醒
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal方法
1. `// 移动等待时间最长的线程,从conditon的等待队列到AQS的同步队列`
2. `public final void signal() {`
3. `// 如果当前线程没有拥有独占权则抛出异常`
4. `if (!isHeldExclusively())`
5. `throw new IllegalMonitorStateException();`
6. `// 用本地变量保存firstWaiter`
7. `Node first = firstWaiter;`
8. `if (first != null)`
9. `// 执行signal操作`
10. `doSignal(first);`
11. `}`
12.
13. `/**`
14. `* Removes and transfers nodes until hit non-cancelled one or`
15. `* null. Split out from signal in part to encourage compilers`
16. `* to inline the case of no waiters.`
17. `* @param first (non-null) the first node on condition queue`
18. `*/`
19. `// 删除并转移condition队列中的节点到AQS队列中,直到击中不可取消的节点或空值。从signal中分离出来去让编译器在没有waiters的时候进行内联优化`
20. `private void doSignal(Node first) {`
21. `do {`
22. `// 将firstWaiter指向first节点的nextWaiter`
23. `if ( (firstWaiter = first.nextWaiter) == null)`
24. `// firstWaiter为null时将lastWaiter也指向null`
25. `lastWaiter = null;`
26. `// 将first节点的nextWaiter节点置为null`
27. `first.nextWaiter = null;`
28. `// 循环着尝试将节点从condition队列移到AQS的同步队列中`
29. `} while (!transferForSignal(first) &&`
30. `(first = firstWaiter) != null);// 这里需要注意的是,如果头节点迁移失败,存在firstWaiter时会尝试唤醒这个firstWaiter,循环会继续`
31. `}`
32.
33.
34. `// 将一个节点从condition队列转移到AQS同步队列中去。成功时返回true`
35. `final boolean transferForSignal(Node node) {`
36. `// 如果改变节点状态失败则代表节点状态已经被改变了,会继续doSignal中的循环,如果节点存在下一个waiter,则尝试对节点的下一个waiter进行transferForSignal`
37. `if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))`
38. `return false;`
39.
40. `// 拼接到队列上并尝试设置前任节点的waitStatus以指示线程(可能)正在等待。如果取消或者尝试设置`
41. `// waitStatus失败,唤醒线程去重新同步(在这种情况下,waitStatus可能是出现了短暂而无害的错误)`
42. `Node p = enq(node); // 将node节点入队并返回node节点的前置节点即之前AQS的前置节点`
43. `int ws = p.waitStatus;`
44. `// ws>0代表p节点处于canceled状态,即已经被取消`
45. `// 尝试设置p节点为SIGNAL状态,设置失败时也会直接唤醒当前线程`
46. `if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))`
47. `// 唤醒当前线程`
48. `LockSupport.unpark(node.thread);`
49. `return true;`
50. `}`
doSignal主要做了以下几件事:1、将头节点从等待队列移除(如果头节点迁移失败,存在firstWaiter时会尝试唤醒这个firstWaiter,循环会继续);2、尝试将头节点状态由CONDITION改为0,即初始状态,如果失败并且节点存在下一个waiter,则尝试对节点的下一个waiter进行transferForSignal;3、将节点从同步队列尾部插入;4、在迁移的过程中如果遇到前置节点处于canceled状态或者waitStatus执行CAS将前置节点的状态改为Node.SIGNAL时失败的直接唤醒当前节点线程。这个方法的基础作用是唤醒当前condition的condition队列中等待时间最久的那个线程。
那么,在大多数情况下,竞争比较多时,condition的await方法的线程park状态是在哪里被唤醒的呢?
一个是前置节点出现上面第4种情况时,在transferForSignal方法中被唤醒,一个是在AQS的release方法中唤醒。如await方法中调用的fullyRelease方法。这两种都是很少量情况下的唤醒,注意一下,在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。
这时我们再回过头来看之前的await方法的代码:
1. `public final void await() throws InterruptedException {`
2. `// 如果线程中断了,则抛出异常`
3. `if (Thread.interrupted())`
4. `throw new InterruptedException();`
5. `// 将当前waitStatus为condition状态的节点添加到condition的waiter队列中(将会在其中clean掉已经处于canceled状态的节点`
6. `Node node = addConditionWaiter();`
7. `// 获取到当前aqs的state(当前线程对锁的所有占有权),释放掉当前线程对锁的所有占有权,并调用aqs的release方法唤醒当前aqs头节点的继承节点(如果有的话)`
8. `int savedState = fullyRelease(node);`
9. `int interruptMode = 0;`
10. `// 如果当前节点不在同步队列中,则会直接让当前线程进入等待状态,等待signal方法或者AQS的release方法唤醒(节点释放锁时会唤醒后继节点线程),while循环的作用是防止虚假唤醒,这一点在之前有写过专门的文章介绍过。`
11. `while (!isOnSyncQueue(node)) {`
12. `// 阻塞当前线程,进入等待状态。注意当前节点是在condition队列中的,在调用signal方法后会将condition队列中的节点迁移至AQS队列中,然后在队列中竞争在AQS的锁所有权。`
13. `LockSupport.park(this);//注意这里Park的blocker是当前aqs对象`
14. `if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)`
15. `break;`
16. `}`
17. `// 如果节点不在AQS同步队列中,没有进入上面的while循环,或者是在上面的循环中阻塞后被唤醒之后会进入这里`
18. `// 尝试获取锁的所有权或者在AQS队列中等待,具体代码分析见上一篇文章`
19. `if (acquireQueued(node, savedState) && interruptMode != THROW_IE)`
20. `interruptMode = REINTERRUPT;`
21. `// 如果node的nextWaiter不为null的时候会清理掉等待队列中的cancelled状态的节点`
22. `if (node.nextWaiter != null) // clean up if cancelled`
23. `unlinkCancelledWaiters();`
24. `if (interruptMode != 0)`
25. `reportInterruptAfterWait(interruptMode);`
26. `}`
- 阻塞等待的地方主要有两个:1. while (!isOnSyncQueue(node))循环中的LockSupport.park(this);2. acquireQueued的时候如果不能获取到锁也会进行LockSupport.park操作。
- while循环中包含LockSupport.park操作时,在unpark之前while循环还在不停地判断。
- 注意一下,在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。await方法中的park操作结束,接下来就会进入到acquireQueued中尝试让被唤醒的节点重新来竞争锁的占有权。
接着来看java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting方法:
1. `/**`
2. `* Checks for interrupt, returning THROW_IE if interrupted`
3. `* before signalled, REINTERRUPT if after signalled, or`
4. `* 0 if not interrupted.`
5. `*/`
6. `private int checkInterruptWhileWaiting(Node node) {`
7. `return Thread.interrupted() ?`
8. `(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :`
9. `0;`
10. `}`
11.
12. `// 在取消等待后,如有必要,将节点迁移到AQS队列中。`
13. `final boolean transferAfterCancelledWait(Node node) {`
14. `// cas 操作成功时则将节点入列,然后返回true`
15. `if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {`
16. `// 入队列`
17. `enq(node);`
18. `return true;`
19. `}`
20. `/*`
21. `* If we lost out to a signal(), then we can't proceed`
22. `* until it finishes its enq(). Cancelling during an`
23. `* incomplete transfer is both rare and transient, so just`
24. `* spin.`
25. `*/`
26. `// 如果我们没有被signal()方法唤醒,那么直到它完成enq()之前我们无法继续进行。在不完整的传输过程中取消是罕见且短暂的,因此只需自旋。直到node被加入到了sync队列中(也就是AQS队列),或许下一个signal()方法可以做到这一点`
27. `while (!isOnSyncQueue(node))`
28. `Thread.yield();`
29. `return false;`
30. `}`
这个方法主要用来在await方法中线程被唤醒之后判断是否是因为有interrupted事件导致的等待中止。如果是因为中断事件导致的等待中止,则需要在取消等待后尝试将节点迁移到AQS队列中。有一个罕见情况是,如果迁移不成功,判断到节点仍然不在AQS的sync队列中,则让当前线程自旋,直到下一个signal方法或其他方式将节点迁移到队列中之后停止。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
1. `/**`
2. `* Moves all threads from the wait queue for this condition to`
3. `* the wait queue for the owning lock.`
4. `*`
5. `* @throws IllegalMonitorStateException if {@link #isHeldExclusively}`
6. `* returns {@code false}`
7. `*/`
8. `public final void signalAll() {`
9. `// 线程是否拥有独占锁`
10. `if (!isHeldExclusively())`
11. `throw new IllegalMonitorStateException();`
12. `Node first = firstWaiter;`
13. `if (first != null)`
14. `doSignalAll(first);`
15. `}`
16.
17. `/**`
18. `* Removes and transfers all nodes.`
19. `* @param first (non-null) the first node on condition queue`
20. `*/`
21. `private void doSignalAll(Node first) {`
22. `// 将当前condition的condition队列的firstWaiter和lastWaiter都置为null`
23. `lastWaiter = firstWaiter = null;`
24. `do {`
25. `// 从头节开始,将condition队列中的所有节点都尝试迁移到AQS的sync队列中去`
26. `Node next = first.nextWaiter;`
27. `first.nextWaiter = null;`
28. `transferForSignal(first);`
29. `first = next;`
30. `} while (first != null);`
31. `}`
signalAll方法的操作与signal方法的操作一样,只是一个调用的是doSignalAll方法,另一个调用的是doSignal方法。与doSignal方法只尝试将等待时间最长的节点唤醒。而doSignalAll是会尝试唤醒所有的condition队列中的节点。
- await方法除了会让当前线程进行等待状态外,还会释放掉当前锁的占有权。await的其他方法无非是在此基础上添加了等待的条件,比如时间和中断等。
- signalAll方法的操作与signal方法的操作一样,只是一个调用的是doSignalAll方法,另一个调用的是doSignal方法。与doSignal方法只尝试将等待时间最长的节点唤醒。而doSignalAll是会尝试唤醒所有的condition队列中的节点。
- 在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。await方法中的park操作结束,接下来就会进入到acquireQueued中尝试让被唤醒的节点重新来竞争锁的占有权。
- 示例:
1. `ReentrantLock reentrantLock = new ReentrantLock(true);`
2. `Condition condition = reentrantLock.newCondition();`
3. `new Thread(new Runnable() {`
4. `@Override`
5. `public void run() {`
6. `reentrantLock.lock();`
7. `System.out.println("===================wait start===========");`
8. `try {`
9. `condition.await();`
10. `} catch (InterruptedException e) {`
11. `e.printStackTrace();`
12. `}`
13. `System.out.println("=====================wait end==============");`
14. `reentrantLock.unlock();`
15. `}`
16. `}).start();`
17.
18. `new Thread(new Runnable() {`
19. `@Override`
20. `public void run() {`
21. `reentrantLock.lock();`
22. `System.out.println("===================signal start===========");`
23. `condition.signal();`
24. `System.out.println("=====================signal end==============");`
25. `reentrantLock.unlock();`
26. `}`
27. `}).start();`
可以debug下源码进行分析。
