CyclicBarrier也是一种线程同步工具,用于多个线程之间的同步,也是适用于一个线程等待多个线程。和CountDownLatch相比,CyclicBarrier有多个改进:
1、CyclicBarrier可以循环利用
2、CyclicBarrier中的线程的同步更加严谨。CountDownLatch中的线程在countDown()后就会执行代码,而CyclicBarrier中的线程会一直阻塞,直到被同步的方法调用,所以说CyclicBarrier中的线程的同步程度更高。
package juc; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class TestCyclicBarrier { public static void main(String[] args) { final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("线程集合集合完毕"); } }); new Thread(){ @Override public void run() { while (true){ try { System.out.println("线程1开始执行"); Thread.sleep(5000); System.out.println("线程1执行完毕"); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }.start(); new Thread(){ @Override public void run() { while (true){ try { System.out.println("线程2开始执行"); Thread.sleep(5000); System.out.println("线程2执行完毕"); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }.start(); } }
如上述例子所述,CyclicBarrier中分为两种线程:
1、同步执行的n个线程
2、等待n个线程完成后才开始执行的线程,我们叫他最后线程吧
从上述结构图看,CyclicBarrier主要是利用ReentrantLock来保证同步的。
1、初始化
final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("线程集合集合完毕"); } }); public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); //对应的线程个数 this.parties = parties; //当前未阻塞的线程个数 this.count = parties; //执行的线程 this.barrierCommand = barrierAction; } private static class Generation { boolean broken = false; } //独占锁 private final ReentrantLock lock = new ReentrantLock(); //因为CyclicBarrier是重复可利用的,一轮代表一代 private Generation generation = new Generation(); //当调用await后,会被阻塞。 private final Condition trip = lock.newCondition();
初始化代码调用的构造方法需要传入一个count和一个Runnable对象。
count对应着线程数量,Runnable对应着最后待同步的线程!
await()
在阅读原码前,朋友们先明白CyclicBarrier的一个机制:CyclicBarrier是可循环利用的,每个循环对应着一个年代,对应着CyclicBarrier就内置了一个Generation类,用来描述当前年代的状态。
private static class Generation { //n+1个线程中只要有一个被中断或者出现超时,就将broken设置为true boolean broken = false; }
带着这个概念去阅读源码,思路就十分清晰了
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { final Generation g = generation; //有线程超时了,或者是被中断了 if (g.broken) throw new BrokenBarrierException(); //如果线程被中断了,将所有线程唤醒,可中断 if (Thread.interrupted()) { //唤醒所有等待的线程,重置count,并将g.broken=true breakBarrier(); //抛出异常 throw new InterruptedException(); } //await后调用后,count-- int index = --count; if (index == 0) { //count==0,说明所有线程都调用await()了,可以唤醒那n-1个线程了。 //是否执行最后线程了 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) //开始执行 command.run(); //执行最后线程了 ranAction = true; //释放所有等待在trip中condition queue中的线程 nextGeneration(); return 0; } finally { //如果执行ranAction失败,说明出现异常 if (!ranAction) //上述代码出现异常,执行breakBarrier breakBarrier(); } } //运行到这里的代码,都是n个线程中的n-1个线程 for (;;) { try { //阻塞式调用 if (!timed) //阻塞将线程挂起到condition queue trip.await(); //调用的是await(timeout),设置的有超时时间 else if (nanos > 0L) //最多阻塞nanos时间将线程挂起到condition queue nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //在等待的过程中,如果线程被中断,如果其他线程还未调用breakBarrier,那么当前线程就调用 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //如果其他线程已经breakBarrier,就直接自我中断就ok了 Thread.currentThread().interrupt(); } } //如果正常流程被打断,抛出BrokenBarrierException if (g.broken) throw new BrokenBarrierException(); //如果正常的更新换代的话,返回index if (g != generation) return index; //如果await超时的话,也会breakBarrier,抛出TimeoutException if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void breakBarrier() { //n+1个线程当中有线程被中断,或者await请求超时了,就将当前这个代打断,唤醒所有线程,各个线程自己单独处理这个异常 //设置broken为true generation.broken = true; //重置count count = parties; // 唤醒等待在trip的condition queue中的全部线程 trip.signalAll(); } private void nextGeneration() { //当前这轮线程全部执行成功,重置年代,开始下一轮同步 // 唤醒等待在trip的condition queue中的全部线程 trip.signalAll(); // count 重置为 parties count = parties; //generation也重新初始化 generation = new Generation(); }
1、CyclicBarrier的底层是通过一个ReentrantLock、一个int变量count、一个Condition、一个Generation来实现的。
流程:
1、首先初始化一个ReentranLock对象、将count变量赋值给parties,初始化一个Condition、初始化一个Generation,Generation中只有一个broken布尔两对应着当前的同步状态是否被破坏,将Runnable对象保存给barrierCommand。
2、当线程调用await()的时候。会先抢夺lock锁,然后将count--。如果count == 0,说明线程都已经执行完毕,可以开始执行barrierCommand了。并且通过SignalAll()来唤醒所有等待的线程,并且将Generation重新初始化,将count重新赋值为parties,开始下一轮的同步。
3、如果count!=0,说明还有其他线程未完成,就调用condition.await()方法,将线程挂起。
Generation.broken
当有线程被中断或者线程被挂起超时的时候,将generation的broken设置为true,代表同步被破坏了,会立即唤醒所有之前挂起的线程。
新来的线程在操作count--之前,会判断broken,如果broken=true,就会直接抛出异常!