多线程并发是Java语言中非常重要的一块内容,同时,也是Java基础的一个难点。说它重要是因为多线程是日常开发中频繁用到的知识,说它难是因为多线程并发涉及到的知识点非常之多,想要完全掌握Java的并发相关知识并非易事。也正因此,Java并发成了Java面试中最高频的知识点之一。本系列文章将从Java内存模型、volatile关键字、synchronized关键字、ReetrantLock、Atomic并发类以及线程池等方面来系统的认识Java的并发知识。通过本系列文章的学习你将深入理解volatile关键字的作用,了解到synchronized实现原理、AQS和CLH队列锁,清晰的认识自旋锁、偏向锁、乐观锁、悲观锁…等等一系列让人眼花缭乱的并发知识。
本文是Java并发系列的第五篇文章,将深入分析Java的唤醒与等待机制。
这一次,彻底搞懂Java内存模型与volatile关键字
这一次,彻底搞懂Java中的synchronized关键字
这一次,彻底搞懂Java中的ReentranLock实现原理
这一次,彻底搞懂Java并发包中的Atomic原子类
深入理解Java线程的等待与唤醒机制(一)
深入理解Java线程的等待与唤醒机制(二)
上篇文章我们从“生产者-消费者”模型出发,深入的分析了wait和notify/notifyAll的底层实现。并且了解到生产者线程与消费者线程在调用wait时都会被加入到synchronized锁对象monitor的WaitSet队列中。那么在唤醒线程的时候就无法准确的唤醒某一类线程。而在这一次,彻底搞懂Java中的ReentranLock实现原理这一篇文章中我们认识了更为灵活地显式锁ReentranLock。ReentranLock与synchronized类似,也有一套类似wait与notify/notifyAll的等待唤醒机制–Condition。本篇文章我们就来深入的认识ReentranLock的Condition与线程的等待与唤醒机制。
在[这一次,彻底搞懂Java中的ReentranLock实现原理]中关于Condition其实也有所提及,在使用Lock来保证线程同步时,我们可以使用Condition来协调线程间的协作。相比synchronize的监视器锁,Condition提供了更加灵活和精确的线程控制。它的最大特点是可以为不同的线程建立多个Condition,从而达到精确控制某一些线程的休眠与唤醒。
Condition是一个接口,内部主要提供了一些线程休眠与唤醒相关的方法,代码如下:
public interface Condition { // 使当前线程进入等待状态,可以相应中断请求 void await() throws InterruptedException; // 使当前线程进入等待状态,不响应中断请求 void awaitUninterruptibly(); // 使当前线程进入等待状态,直到被唤醒或中断,或者经过指定的等待时间。nanosTimeout单位纳秒 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同awaitNanos方法,可以指定时间单位 boolean await(long time, TimeUnit unit) throws InterruptedException; // 使线程进入等待状态,直到被被唤醒或者中断,或者到截止的时间 boolean awaitUntil(Date deadline) throws InterruptedException; // 唤醒一个等待在Condition上的线程,与notify功能类似 void signal(); // 唤醒所有等待在Condition上的线程,与notifyAll类似 void signalAll(); }
Condition的实现类是在AQS中的ConditionObject,关于ConditionObject我们后边再看,接下来看下如何使用Condition来实现线程的等待与唤醒。
仍然以“生产者-消费者”模式来看Condition的使用,沿用上篇文章生产面包的例子,稍加改动后的面包容器类如下:
public class BreadContainer { LinkedList<Bread> list = new LinkedList<>(); private final static int CAPACITY = 10; Lock lock = new ReentrantLock(); private final Condition providerCondition = lock.newCondition(); private final Condition consumerCondition = lock.newCondition(); public void put(Bread bread) { try { lock.lock(); while (list.size() == CAPACITY) { try { // 如果容器已满,则阻塞生产者线程 providerCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(bread); // 面包生产成功后通知消费者线程 consumerCondition.signalAll(); System.out.println(Thread.currentThread().getName() + " product a bread" + bread.toString() + " size = " + list.size()); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void take() { try { lock.lock(); while (list.isEmpty()) { try { // 如果容器为空,则阻塞消费者线程 consumerCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } Bread bread = list.removeFirst(); // 消费后通知生产者生产面包 providerCondition.signalAll(); System.out.println("Consumer " + Thread.currentThread().getName() + " consume a bread" + bread.toString() + " size = " + list.size()); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
可以看到,在上述代码中我们声明了两个Condition,一个生产者Condition,一个消费者Condition。在put方法中使用ReentranLock来实现同步,同时,当容器满时调用生产者Condition的await方法使生产者线程进入等待状态。如果生成成功,则调用消费者Condition的signalAll方法来唤醒消费者线程。take方法与put类似,不再赘述。这里要注意的是在使用Condition前必须先获得锁。
生产者消费者类与synchronize的实现一致,代码如下:
// 生产者 public class Producer implements Runnable { private final BreadContainer container; public Producer(BreadContainer container) { this.container = container; } @Override public void run() { container.put(new Bread()); } } // 消费者 public class Consumer implements Runnable { private final BreadContainer container; public Consumer(BreadContainer container) { this.container = container; } @Override public void run() { container.take(); } }
那接下来测试类我们仍然实例化多个生产者线程与多个消费者线程,如下:
public class Test { public static void main(String[] args) { BreadContainer container = new BreadContainer(); new Thread(() -> { for (int i = 0; i < 100; i++) { new Thread(new Producer(container)).start(); } }).start(); new Thread(() -> { for (int i = 0; i < 100; i++) { new Thread(new Consumer(container)).start(); } }).start(); } }
运行后生产者线程与消费者线程可以很好的实现线程协作。与使用synchronized不同的是这里有两个Condition,分别来控制生产者和消费者。
接下来,我们分析一下Condition的实现原理
上一章中我们已经知道Condition仅仅是一个接口,它的具体实现是在AQS的内部类ConditionObject中。调用ReentranLock的newCondition实际上就是实例化了一个ConditionObject,代码如下:
// ReentranLock#Sync final ConditionObject newCondition() { return new ConditionObject(); }
可见,在第一章BreadContainer中的providerCondition与consumerCondition是两个不同的ConditionObject实例。
ConditionObject的类结构如下:
public class ConditionObject implements Condition, java.io.Serializable { // 指向等待队列的头结点 private transient Node firstWaiter; // 指向等待队列的尾结点 private transient Node lastWaiter; public ConditionObject() { } }
ConditionObject的结构比较简单,它内部维护了一个Node类型等待队列(这里注意与AQS中的同步队列区分)。其中firstWaiter指向队列的头结点,而lastWaiter指向队列的尾结点。关于Node节点,在ReentranLock那篇文章中已经详细介绍过了,它封装的是一个线程的节点,这里也不再赘述。在线程中调用了Condition的await方法后,线程就会被封装成一个Node节点,并将Node的waitStatus设置成CONDITION状态,然后插入到这个Condition的等待队列中。等到收到singal或者被中断、超时就会被从等待队列中移除。其结构示意图如下:
接下来我们从源码的角度来分析Condition的实现。
public final void await() throws InterruptedException { // 如果线程被标记位中断状态,则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程封装成一个Node节点,并添加到等待队列 Node node = addConditionWaiter(); // 释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // 判断当前node是否在同步队列中,注意如果不在同步队列,则是一个阻塞的死循环 while (!isOnSyncQueue(node)) { // 不在同步队列中,则挂起线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 到这里说明节点已加入到同步队列中,调用acquireQueued开始竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // 清理被标记为CANCLLED状态的节点 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
在wait方法中首先会调用addConditionWaiter方法将线程封装成一个Node节点,并加入到等待队列中。addConditionWaiter的代码如下:
private Node addConditionWaiter() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node t = lastWaiter; // 清除CANCLLED状态的lastWaiter节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 实例化一个Node节点,并标记为CONDITION状态 Node node = new Node(Node.CONDITION); // 将node加入到等待队列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
addConditionWaiter方法的逻辑比较简单,就是将线程封装成Node并加入等待队列的操作。加入队列后,await方法又调用了fullyRelease去释放锁,在fullyRelease方法中会将state置为0,代码如下:
final int fullyRelease(Node node) { try { // 获取AQS中的state int savedState = getState(); // 调用release释放锁 if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { // 释放失败则将节点置为CANCELLED状态 node.waitStatus = Node.CANCELLED; throw t; } }
这个方法主要是调用了release方法来释放锁,如果释放失败,则将节点置为CANCELLED状态。关于release这个方法在ReentranLock中已经分析过,这里不再赘述。
释放锁之后,开启while来调用isOnSyncQueue方法,这个方法是用来判断当前节点是否在同步队列中。如果不在同步队列,则会进入自旋,并阻塞线程,等待节点进入同步队列。isOnSyncQueue的代码如下:
final boolean isOnSyncQueue(Node node) { // 如果waitStatus是CONDITION状态或者node的前驱节点是null,说明该节点在等待队列中,而非同步队列。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果node.next不为null,则一定在同步队列 if (node.next != null) return true; // 如果前面没有确定node是否在同步队列,则遍历同步队列查看是否存在node节点 return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { // tail即同步队列的队尾,从队尾遍历并与node对比 for (Node p = tail;;) { if (p == node) return true; if (p == null) return false; p = p.prev; } }
如果isOnSyncQueue返回了true,那么说明该node节点已经进入同步队列中了,则会结束自旋并调用acquireQueued,关于acquireQueued在ReentranLock文章中已经详细分析过了,即一个获取锁的操作。
总的来说,调用await方法会让线程进入等待队列,并释放锁。当等待队列中的节点被唤醒时,会将节点移入到同步队列,然后await结束自旋,并调用acquireQueued来获取锁。
这里我们选用signal方法来分析,signal方法类似Object中的notify方法,调用signal方法会将等待队列的首节点移入同步队列并唤醒。它的实现相比await来说比较简单,看下代码:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 唤醒等待队列的第一个节点 doSignal(first); } final boolean transferForSignal(Node node) { Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
在signal中会拿到等待队列的首节点并调用doSignal方法将其唤醒,doSignal代码如下:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 尝试唤醒等待队列的首节点,如果唤醒失败则继续尝试 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal方法中是一个循环唤醒等待队列首节点的操作,核心方法是transferForSignal,代码如下:
final boolean transferForSignal(Node node) { // 如果当前节点状态为CONDITION,则CAS将状态改为0,准备加入同步队列,如果状态不为CONDITION,则说明线程被中断,返回false,然后唤醒当前节点的后继节点 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; // 将节点加入到同步队列,并返回同步队列的先驱节点 Node p = enq(node); int ws = p.waitStatus; // waitStatus>0为取消状态,则CAS尝试修改成SINGAL状态 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 如果修改状态失败,那么久直接唤醒当前线程 LockSupport.unpark(node.thread); return true; } private Node enq(Node node) { for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); } } }
transferForSignal实际上就是做了一个队列的转移,将node从等待队列移动到了同步队列。进入同步队列后,在wait方法中的自旋操作便能检测到node节点的状态,从而执行acquireQueued方法拿锁。
总的来说signal方法会从等待队列的队首开始,尝试唤醒队首线程,如果该节点是CANCELLED状态,则继续唤醒下一个。当节点被唤醒后会将其加入到同步队列,接着wait方法停止自旋执行acquireQueued方法。
通过对Condition的await与signal方法的分析,可以看得出来这两个方法并非独立存在,而是一个相互配合的关系。await方法会将执行的线程封装成Node加入到等待队列,然后开启一个循环检测这个node看是否被加入到了同步队列,如果被加入到同步队列,那么调用acquireQueued继续竞争锁,如果没有被加入同步队列,则会一直等待。而signal方法则是将等待队列中的队首元素移动到同步队列,这样就出发了await方法的循环终结,继而能够执行acquireQueued方法。其流程如下图所示:
关于Java线程的等待与唤醒机制,到这里就全部结束了,通过本篇文章的学习,更加深入的了解了线程等待与唤醒的原理,其实可以看得出来无论synchronized监视器锁的等待与唤醒还是Lock锁的等待与唤醒都有着类似的原理,只不过synchronized是虚拟机底层实现,而ReentranLock是基于Java层的实现。