我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。下图演示了这一过程。
在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。
1 //同步操作锁 2 private final ReentrantLock lock = new ReentrantLock(); 3 //线程拦截器 4 private final Condition trip = lock.newCondition(); 5 //每次拦截的线程数 6 private final int parties; 7 //换代前执行的任务 8 private final Runnable barrierCommand; 9 //表示栅栏的当前代 10 private Generation generation = new Generation(); 11 //计数器 12 private int count; 13 14 //静态内部类Generation 15 private static class Generation { 16 boolean broken = false; 17 }
上面贴出了CyclicBarrier所有的成员变量,可以看到CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。接下来我们看看它的构造器。
1 //构造器1 2 public CyclicBarrier(int parties, Runnable barrierAction) { 3 if (parties <= 0) throw new IllegalArgumentException(); 4 this.parties = parties; 5 this.count = parties; 6 this.barrierCommand = barrierAction; 7 } 8 9 //构造器2 10 public CyclicBarrier(int parties) { 11 this(parties, null); 12 }
CyclicBarrier有两个构造器,其中构造器1是它的核心构造器,在这里你可以指定本局游戏的参与者数量(要拦截的线程数)以及本局结束时要执行的任务,还可以看到计数器count的初始值被设置为parties。CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。
1 //非定时等待 2 public int await() throws InterruptedException, BrokenBarrierException { 3 try { 4 return dowait(false, 0L); 5 } catch (TimeoutException toe) { 6 throw new Error(toe); 7 } 8 } 9 10 //定时等待 11 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { 12 return dowait(true, unit.toNanos(timeout)); 13 }
可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法,只不过是传入的参数不同而已。下面我们就来看看dowait方法都做了些什么。
1 //核心等待方法 2 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 final Generation g = generation; 7 //检查当前栅栏是否被打翻 8 if (g.broken) { 9 throw new BrokenBarrierException(); 10 } 11 //检查当前线程是否被中断 12 if (Thread.interrupted()) { 13 //如果当前线程被中断会做以下三件事 14 //1.打翻当前栅栏 15 //2.唤醒拦截的所有线程 16 //3.抛出中断异常 17 breakBarrier(); 18 throw new InterruptedException(); 19 } 20 //每次都将计数器的值减1 21 int index = --count; 22 //计数器的值减为0则需唤醒所有线程并转换到下一代 23 if (index == 0) { 24 boolean ranAction = false; 25 try { 26 //唤醒所有线程前先执行指定的任务 27 final Runnable command = barrierCommand; 28 if (command != null) { 29 command.run(); 30 } 31 ranAction = true; 32 //唤醒所有线程并转到下一代 33 nextGeneration(); 34 return 0; 35 } finally { 36 //确保在任务未成功执行时能将所有线程唤醒 37 if (!ranAction) { 38 breakBarrier(); 39 } 40 } 41 } 42 43 //如果计数器不为0则执行此循环 44 for (;;) { 45 try { 46 //根据传入的参数来决定是定时等待还是非定时等待 47 if (!timed) { 48 trip.await(); 49 }else if (nanos > 0L) { 50 nanos = trip.awaitNanos(nanos); 51 } 52 } catch (InterruptedException ie) { 53 //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程 54 if (g == generation && ! g.broken) { 55 breakBarrier(); 56 throw ie; 57 } else { 58 //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作 59 Thread.currentThread().interrupt(); 60 } 61 } 62 //如果线程因为打翻栅栏操作而被唤醒则抛出异常 63 if (g.broken) { 64 throw new BrokenBarrierException(); 65 } 66 //如果线程因为换代操作而被唤醒则返回计数器的值 67 if (g != generation) { 68 return index; 69 } 70 //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 71 if (timed && nanos <= 0L) { 72 breakBarrier(); 73 throw new TimeoutException(); 74 } 75 } 76 } finally { 77 lock.unlock(); 78 } 79 }
上面贴出的代码中注释都比较详细,我们只挑一些重要的来讲。可以看到在dowait方法中每次都将count减1,减完后立马进行判断看看是否等于0,如果等于0的话就会先去执行之前指定好的任务,执行完之后再调用nextGeneration方法将栅栏转到下一代,在该方法中会将所有线程唤醒,将计数器的值重新设为parties,最后会重新设置栅栏代次,在执行完nextGeneration方法之后就意味着游戏进入下一局。如果计数器此时还不等于0的话就进入for循环,根据参数来决定是调用trip.awaitNanos(nanos)还是trip.await()方法,这两方法对应着定时和非定时等待。如果在等待过程中当前线程被中断就会执行breakBarrier方法,该方法叫做打破栅栏,意味着游戏在中途被掐断,设置generation的broken状态为true并唤醒所有线程。同时这也说明在等待过程中有一个线程被中断整盘游戏就结束,所有之前被阻塞的线程都会被唤醒。线程醒来后会执行下面三个判断,看看是否因为调用breakBarrier方法而被唤醒,如果是则抛出异常;看看是否是正常的换代操作而被唤醒,如果是则返回计数器的值;看看是否因为超时而被唤醒,如果是的话就调用breakBarrier打破栅栏并抛出异常。这里还需要注意的是,如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒。下面贴出nextGeneration方法和breakBarrier方法的具体代码。
1 //切换栅栏到下一代 2 private void nextGeneration() { 3 //唤醒条件队列所有线程 4 trip.signalAll(); 5 //设置计数器的值为需要拦截的线程数 6 count = parties; 7 //重新设置栅栏代次 8 generation = new Generation(); 9 } 10 11 //打翻当前栅栏 12 private void breakBarrier() { 13 //将当前栅栏状态设置为打翻 14 generation.broken = true; 15 //设置计数器的值为需要拦截的线程数 16 count = parties; 17 //唤醒所有线程 18 trip.signalAll(); 19 }