JDK1.8 api文档对CountDownLatch的说明如下:
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
A CountDownLatch
用给定的计数初始化。 await
方法阻塞,直到由于countDown()
方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await
调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier
。
A CountDownLatch
是一种通用的同步工具,可用于多种用途。 一个CountDownLatch
为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await
在门口等待,直到被调用countDown()
的线程打开。 一个CountDownLatch
初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。
CountDownLatch
一个有用的属性是,它不要求调用countDown
线程等待计数到达零之前继续,它只是阻止任何线程通过await
,直到所有线程可以通过。
我认为阅读源码前,有两个必须要了解的:这个东西的有什么用,他大概是怎么实现的。先有一个整体的思路去分析源码会清晰很多。
我们假设有如下场景:
打开游戏前,需要检测游戏地图资源、游戏角色资源、游戏音乐资源;当这些必备资源都检测无误后方可进入游戏。
那我们可以先去检测地图资源,当地图资源加载完后再去检测角色资源、加载完毕后再去检测音乐资源,加载完毕后进入游戏。
当然如果拥有多线程就可以提高这些检测资源速度,我们开启三个线程分别去检测角色、音乐、地图,当这些都加载好了即可进入游戏。
那这个时候怎么去判断这些资源全部都检测好了呢?这时候就可以用CountDownLatch:
public class Game { public static void main(String[] args) { //定义容量为3 CountDownLatch countDownLatch=new CountDownLatch(3); //进入游戏的主线程 new Thread(()->{ System.out.println("等待资源检测中。。。"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("资源检测完毕,进入游戏"); },"游戏主线程").start(); //检测音乐线程 new Thread(()->{ System.out.println("正在检测音乐资源中。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("音乐资源检测无误"); countDownLatch.countDown(); },"检测音乐").start(); //检测角色线程 new Thread(()->{ System.out.println("正在检测角色资源中。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("角色资源检测无误"); countDownLatch.countDown(); },"检测角色").start(); //检测地图线程 new Thread(()->{ System.out.println("正在检测地图资源中。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("地图资源检测无误"); countDownLatch.countDown(); },"检测地图").start(); } }
等待资源检测中。。。 正在检测音乐资源中。。。 正在检测角色资源中。。。 正在检测地图资源中。。。 音乐资源检测无误 角色资源检测无误 地图资源检测无误 资源检测完毕,进入游戏
根据上面案例 我们CountDownLatch一共用了三个函数
在分析这三个函数前,我们带着这几个问题:
先看看构造函数:内部只有一个有参构造函数:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); //Sync这个类不陌生吧 在ReentrantLock里面也见过他。 this.sync = new Sync(count); }
同样的Sync也是继承了AQS类,AQS类这边我们就不丢代码了;需要对AQS最基本的了解:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
CountDownLatch构造函数在AQS里的state字段里设置了一个int类型的数。
在ReentrantLock这边,当持有锁的线程再次去请求锁的时候就会让state++,比如重入了5次,那我们就需要这个线程去释放5次锁让state=0的时候别的线程才能进来抢。
所以这边我们就可以大概分析出:countDown()方法就是让这个state-1,当state==0的时候就让await()执行。await()方法就是如果state!=0的时候就park起来。
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
我们先看看tryReleaseShared():
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 让state-1的一个操作,这边用了个死循环。如果return true的话就会执行doReleaseShared()去唤醒同步队列的节点 for (;;) { int c = getState(); if (c == 0) // 这边可能会有个疑问 为什么c==0的时候返回false? 下面nextc==0的时候就返回true? // 在这个循环里面只有当cas成功后才会退出,那如果进入到这个条件里,就代表这个state不是被因为该线程进行释放后等0的,那既然别的线程让state--后等于0,就会去唤醒。所以这边返回false,防止重复唤醒。 return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
当返回true后执行的唤醒:doReleaseShared();这个方法我们放到await()里讲。
这里大概的逻辑了解了,调用每次countDown会让state-1(不区别线程,同个线程调用两次countDown()就会让state-2)
当state=0的时候就需要去唤醒阻塞的线程 也就是调用await的线程。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //当state!=0的时候 需要挂起 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //addWaiter 将该节点加入到同步队列中 //这边跟ReentrantLock有所不同 ReentrantLock那边是addWaiter(Node.EXCLUSIVE) //这里设置成SHARED会有什么效果呢 带着疑问继续看下去 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //这里这个死循环很重要 for (;;) { //获取刚刚加入到aqs线程的node的前驱节点 final Node p = node.predecessor(); //如果他的前一个节点是头节点 if (p == head) { //再去判断一下state是不是为0 如果是state=0返回1 否则返回-1 int r = tryAcquireShared(arg); if (r >= 0) { //这时候就需要把当前节点设置成头节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //如果不是头节点的话 那就找到合适的位置插入,插入完毕后会将这个节点的前驱节点设置为SIGNAL 也就是-1 代表着这个节点如果释放时需要去唤醒下一个节点 //node在上面addWaite后在同步队列的末尾 //因为当线程取消或者中断的话 持有线程的node是不会执行的也就是无意义的,那就需要替换成有意义的节点 //例如 head->A(-1)<=>B(-1)<=>C(1)<=>D(1)<=>node(0)<-tail //括号里代表节点waitStatus,经过方法会变成: //head->A(-1)<=>B(-1)<=>node(0)<-tail 会将waitStatus>0的节点清空掉 //记住park不会释放资源,只会阻塞当前线程的执行,当调用unpark的时候,这个线程还会在这个循环题内。 //想要退出这个死循环 只有要么这个线程抛出中断异常 要么就是把这个线程设置为头节点 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //设置为头节点 setHead(node); //propagate在上面调用的方法中 传入的只能是1 所以直接进入if内 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //在CountDownLatch中加入的节点都是SHARED,那唤醒这类型的节点的时候还需要进行传播唤醒后续的节点 if (s == null || s.isShared()) doReleaseShared(); } }
final boolean isShared() { return nextWaiter == SHARED; }
我们看看他是如何去传播唤醒下一个节点的
private void doReleaseShared() { for (;;) { //这边这个死循环也很关键 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //调用unpark去唤醒当前节点的下一个节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //当头节点发生变化的时候就会继续唤醒下一个节点 //这时候就有疑问了 这个方法里也没有去改变h变量的操作呀? //其实当unpark后,队列里的第二个节点被唤醒 那就会继续执行park时候的下一行代码,也就是doAcquireSharedInterruptibly方法。当那个节点被唤醒的时候他还在那个for的死循环内,同样的判断到前驱节点是head跟state=0,就把自己设置成头节点,所以在这边head就会发生改变,就再次进入循环内继续唤醒下个节点。 if (h == head) // loop if head changed break; } }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒后会到park的方法内后继续执行,也就是doAcquireSharedInterruptibly里。 LockSupport.unpark(s.thread); }
保持前行!