countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
它是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
countDownLatch类中只提供了一个构造器:
//参数count为计数值 public CountDownLatch(int count) { };
三个重要方法:
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public void await() throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //将count值减1 public void countDown() { };
public void multiThreadTest() { final CountDownLatch begin = new CountDownLatch(1); // 控制线程开始 final CountDownLatch end = new CountDownLatch(30); // 控制线程结束 final ExecutorService exec = Executors.newFixedThreadPool(15); // 线程池管理线程 for (int j = 0; j < 30; j++) { exec.submit(new ServiceThread(begin, end, j)); // 多个线程运动员准备中 } begin.countDown(); // 发送开始号令,多线程运动员们开始运行 try { end.await(); // 等待子线程全都运行完成之后,执行后续动作 } catch (InterruptedException e) { log.error(e.getMessage()); } exec.shutdown(); System.out.println("Done"); } @Data @AllArgsConstructor public class ServiceThread implements Runnable { private CountDownLatch begin; private CountDownLatch end; private int j; @Override public void run() { try { begin.await(); // 线程运动员准备中,阻塞在这等待其它运动员准备完成,然后等待号令begin.countDown()之后开始运行 for (int i = 0; i < 10; i++) { System.out.println("j是" +j+",i是"+i); } } catch (Exception e) { } finally { /* * 一个子线程运动员完成了,但主线程会阻塞在end.await()这里, * 当end这个减为0时,将越过end.await()方法,执行后续操作。 * */ end.countDown(); } } }
1、创建计数器
当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数;
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);//创建同步队列,并设置初始计数器值 }
2、阻塞线程
当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
判断计数器是计数完毕,未完毕则把当前线程加入阻塞队列
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
构建阻塞队列的双向链表,挂起当前线程
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新建节点加入阻塞队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获得当前节点pre节点 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//返回锁的state if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //重组双向链表,清空无效节点,挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。
public void countDown() { //递减锁重入次数,当state=0时唤醒所有阻塞线程 sync.releaseShared(1); }
public final boolean releaseShared(int arg) { //递减锁的重入次数 if (tryReleaseShared(arg)) { doReleaseShared();//唤醒队列所有阻塞的节点 return true; } return false; } private void doReleaseShared() { //唤醒所有阻塞队列里面的线程 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始 continue; unparkSuccessor(h);//成功则唤醒线程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }