CyclicBarrier 也可以实现类似 CountDownLatch 的功能,而且比 CountDownLatch 更强大,因为 CyclicBarrier 可以重复被使用。
代码示例:
@Test public void test() throws InterruptedException { int parties = 3; // 定义一个线程池 // CyclicBarrier 中线程执行完成会阻塞等待其它线程到达屏障点,所以可用线程至少需要 parties 个 ExecutorService executor = Executors.newFixedThreadPool(6); CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> { System.out.println("任务都到达了屏障点,主线程继续执行任务"); }); IntStream.range(0, parties).forEach(i -> { executor.submit(() -> { System.out.println("条件" + (i + 1) + "正在执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("条件" + (i + 1) + "执行完成,到达屏障点"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); // parties 个线程到达屏障点后,接着执行构造方法传递的 barrierAction 任务 // 然后会调用 nextGeneration() 方法,唤醒所有到达屏障点的线程,再重置 count 的值为 parties // 所以可以接着执行下面的条件4 ~ 条件6的线程 // 但是没有控制线程执行的顺序,所以并不是条件1~条件3这三个线程先执行,条件4 ~ 条件6这三个线程后执行的 IntStream.range(parties, parties + parties).forEach(i -> { executor.submit(() -> { System.out.println("条件" + (i + 1) + "正在执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("条件" + (i + 1) + "执行完成,到达屏障点"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); Thread.sleep(10000); executor.shutdown(); /** * 可能的执行结果: * * 条件1正在执行 * 条件2正在执行 * 条件3正在执行 * 条件4正在执行 * 条件6正在执行 * 条件5正在执行 * 条件6执行完成,到达屏障点 * 条件5执行完成,到达屏障点 * 条件3执行完成,到达屏障点 * 条件4执行完成,到达屏障点 * 任务都到达了屏障点,主线程继续执行任务 * 条件1执行完成,到达屏障点 * 条件2执行完成,到达屏障点 * 任务都到达了屏障点,主线程继续执行任务 */ }
reset() 方法代码示例:
@Test public void test2() throws InterruptedException { int parties = 3; // 定义一个线程池 // CyclicBarrier 中线程执行完成会阻塞等待其它线程到达屏障点,所以可用线程至少需要 parties 个 ExecutorService executor = Executors.newFixedThreadPool(6); CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> { System.out.println("任务都到达了屏障点,主线程继续执行任务"); }); IntStream.range(0, parties).forEach(i -> { executor.submit(() -> { System.out.println("条件" + (i + 1) + "正在执行"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("条件" + (i + 1) + "执行完成,到达屏障点"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 调用 reset 方法会将当前代终止,如果主线程调用 reset 方法时,有线程阻塞在 dowait 方法中 // 那么已经被阻塞的线程被唤醒时,会抛出 BrokenBarrierException 异常,未阻塞的线程不会跑出异常 // 测试 reset() 方法 System.out.println("执行 reset 方法"); cyclicBarrier.reset(); IntStream.range(parties, parties + parties).forEach(i -> { executor.submit(() -> { System.out.println("条件" + (i + 1) + "正在执行"); try { Thread.sleep(i * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("条件" + (i + 1) + "执行完成,到达屏障点"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); Thread.sleep(10000); executor.shutdown(); /** * 可能的执行结果 1: * * 条件1正在执行 * 条件3正在执行 * 条件2正在执行 * 执行 reset 方法 * 条件3执行完成,到达屏障点 * 条件1执行完成,到达屏障点 * java.util.concurrent.BrokenBarrierException * at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250) * at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362) * at com.github.wuchao.webdemo.core.concurrent.CyclicBarrierDemo.lambda$null$6(CyclicBarrierDemo.java:116) * at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) * at java.util.concurrent.FutureTask.run(FutureTask.java:266) * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) * at java.lang.Thread.run(Thread.java:748) * 条件5正在执行 * 条件6正在执行 * 条件4正在执行 * 条件2执行完成,到达屏障点 * 条件4执行完成,到达屏障点 * 任务都到达了屏障点,主线程继续执行任务 * 条件5执行完成,到达屏障点 * 条件6执行完成,到达屏障点 * * * 可能的执行结果2: * * 条件1正在执行 * 条件3正在执行 * 条件2正在执行 * 执行 reset 方法 * 条件4正在执行 * 条件5正在执行 * 条件2执行完成,到达屏障点 * 条件3执行完成,到达屏障点 * 条件1执行完成,到达屏障点 * 任务都到达了屏障点,主线程继续执行任务 * 条件6正在执行 * 条件4执行完成,到达屏障点 * 条件5执行完成,到达屏障点 * 条件6执行完成,到达屏障点 * 任务都到达了屏障点,主线程继续执行任务 */ }
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ // 任务线程数(这些线程全部执行完成后(即到达屏障点),才能继续执行主线程任务) private final int parties; /* The command to run when tripped */ // 每当 parties 个任务线程都完成后(即 parties 个线程都到达了屏障点),会执行一次 barrierCommand 任务 // 构造函数如果不传 barrierCommand,则不执行 private final Runnable barrierCommand; /** The current generation */ // 当前代 private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ // parties 线程中,当前正在执行的任务线程数(即当前还有多少线程未到达屏障点) // 每执行完一个任务后,count 减一 private int count;
private static class Generation { boolean broken = false; } // 创建下一代 private void nextGeneration() { // 唤醒上一代所有阻塞在 trip 条件的线程 trip.signalAll(); // 重置 count 为 parties count = parties; // 重置当前代 generation = new Generation(); }
// parties 表示有 parties 个线程任务全部执行完后(即到达屏障点),主线程才能继续执行 // barrierAction 表示所有 parties 线程都到达屏障点后,需要执行的一个任务 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
// 终止屏障当前代 private void breakBarrier() { // 设置当前代的终止标记为 true generation.broken = true; // 重置 count count = parties; // 唤醒所有阻塞在 trip 条件的线程 trip.signalAll(); }
// 阻塞当前线程(不带超时时间) public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // 阻塞当前线程(带超时时间) public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } // 真正阻塞线程的逻辑 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 先获取锁 lock.lock(); try { // 获取当前代 final Generation g = generation; // 当前代已被终止 if (g.broken) // 则抛出 BrokenBarrierException 异常 throw new BrokenBarrierException(); // 线程被中断 if (Thread.interrupted()) { // 则终止屏障 breakBarrier(); // 并抛出 InterruptedException 异常 throw new InterruptedException(); } // count 自减 1 int index = --count; // index 等于 0 表示 parties 个线程都已完成了(都到达了屏障点) if (index == 0) { // tripped boolean ranAction = false; try { // 获取构造方法传的 barrierCommand final Runnable command = barrierCommand; // 如果 barrierCommand 不等于 null if (command != null) // 则执行 barrierCommand 的 run() 方法(最后一个到达屏障点的线程执行的此方法) command.run(); ranAction = true; // 开始下一代 nextGeneration(); return 0; } finally { // 如果执行 barrierCommand 出现异常 if (!ranAction) // 则也终止屏障 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 自旋等待,除非所有线程都到达了屏障点、屏障被终止、线程被中断或者等待超时 for (;;) { try { // 没有设置超时时间 if (!timed) // 则阻塞等待 trip.await(); else if (nanos > 0L) // 设置了超时时间,超时等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 判断当前代是否已被终止 // 如果调用了 breakBarrier() 方法或 reset() 方法(reset 方法里也是调用了 breakBarrier 方法)会唤醒当前阻塞的线程,然后走到这一步,此时 g.broken 是 true if (g.broken) // 抛出 BrokenBarrierException 异常 throw new BrokenBarrierException(); // g != generation,表示已经创建了下一代 // 到达屏障点,会唤醒所有阻塞的线程,并调用 nextGeneration() 方法,然后走到这一步,此时 g != generation if (g != generation) return index; // 等待已超时 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放锁 lock.unlock(); } }
代码中有好几处都抛出了异常,总结一下:
// 判断屏障当前代是否已被终止 public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
public void reset() { final ReentrantLock lock = this.lock; // 先获取锁 lock.lock(); try { // 终止屏障当前代 breakBarrier(); // break the current generation // 开始下一代 nextGeneration(); // start a new generation } finally { // 释放锁 lock.unlock(); } }