CountDownLatch 类似于 Thread 的 join 方法,使用时,先构造 CountDownLatch 对象,构造函数传线程数据数 n,表示等待这 n 个线程都完成后再执行主线程代码。
主线程使用 await 方法阻塞等待 n 个线程执行完成;n 个线程执行完成后调用 countDown() 方法,表示完成了一个线程,当 n 个线程都调用了一次 countDown() 方法后,await 阻塞的主线程就会被唤醒继续执行。
Java8 的 CompletableFuture 也可以用来代替 CountDownLatch。
代码示例:
@Test public void test() throws InterruptedException { // 定义一个线程池 ExecutorService executor = Executors.newFixedThreadPool(2); // CountDownLatch 用来作为一个开关,控制主线程阻塞,直到几个异步线程都完成后,主线程再接着执行 // CountDownLatch 的构造函数传参是一个 int 型数据,等待几个异步线程,就传异步线程的数量即可 // 比如,当前线程需要等待 3 个异步线程先执行完成后,再执行,那么传参为 3 int count = 3; CountDownLatch countDownLatch = new CountDownLatch(count); IntStream.range(0, count).forEach(i -> { executor.submit(() -> { System.out.println("条件" + (i + 1) + "正在执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("条件" + (i + 1) + "执行完成"); // 线程执行完成后,需要调用 countDown() 方法,告诉主线程当前有一个线程完成了 countDownLatch.countDown(); }); }); System.out.println("主线程阻塞等待条件完成"); // 主线程等待条件完成,当所有线程都调用了 countDown() 后阻塞的主线程就会被唤醒,继续往下执行 countDownLatch.await(); System.out.println("条件都已完成,继续执行主线程逻辑"); executor.shutdown(); /** * 执行结果: * * 主线程阻塞等待条件完成 * 条件2正在执行 * 条件1正在执行 * 条件1执行完成 * 条件3正在执行 * 条件2执行完成 * 条件3执行完成 * 条件都已完成,继续执行主线程逻辑 */ }
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 共享线程数 Sync(int count) { setState(count); } int getCount() { return getState(); } // 尝试获取共享锁 protected int tryAcquireShared(int acquires) { // state == 0 时返回 1,否则返回 -1 return (getState() == 0) ? 1 : -1; } // 尝试释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync;
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
AQS 共享锁参考 ReentrantLock。
// CountDownLatch 构造方法,将传的参数 count 赋值给 state // 每次调用 countDown 方法时,会释放一个锁,即 state 减一 // 如果主线程调用了 await 时会去判断,如果 state > 0,那么主线程就会被阻塞 // 等待 count 个线程都调用了 countDown 方法后,才会唤醒主线程 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
// 线程释放一个共享锁,state 减一 public void countDown() { // 会去唤醒阻塞的主线程 // 主线程被唤醒后,如果条件不符合(state > 0)主线程还是会继续阻塞,直到最后一个线程调用了 countDown // 此时 state == 0,主线程被唤醒后不会再被阻塞 sync.releaseShared(1); }