Java教程

Java AQS原理和AQS的同步组件总结

本文主要是介绍Java AQS原理和AQS的同步组件总结,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

AQS 简单介绍

AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。

 

AQS 就是一个抽象类,主要用来构建锁和同步器。

1 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
2 }

 

AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基于 AQS 的。

 

 

AQS 原理

AQS 原理概览

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

看个 AQS(AbstractQueuedSynchronizer)原理图:

 

AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。

1 private volatile int state;//共享变量,使用volatile修饰保证线程可见性

 

状态信息通过 protected 类型的getState()setState()compareAndSetState() 进行操作

 1 /返回同步状态的当前值
 2 protected final int getState() {
 3         return state;
 4 }
 5  // 设置同步状态的值
 6 protected final void setState(int newState) {
 7         state = newState;
 8 }
 9 //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
10 protected final boolean compareAndSetState(int expect, int update) {
11         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
12 }

 

AQS 对资源的共享方式

AQS 定义两种资源共享方式

1) Exclusive(独占)

只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍:

  • 公平锁 :按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁 :当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。

 

下面来看 ReentrantLock 中相关的源代码:

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)

1 /** Synchronizer providing all implementation mechanics */
2 private final Sync sync;
3 public ReentrantLock() {
4     // 默认非公平锁
5     sync = new NonfairSync();
6 }
7 public ReentrantLock(boolean fair) {
8     sync = fair ? new FairSync() : new NonfairSync();
9 }

 

ReentrantLock 中公平锁的 lock 方法

 1 static final class FairSync extends Sync {
 2     final void lock() {
 3         acquire(1);
 4     }
 5     // AbstractQueuedSynchronizer.acquire(int arg)
 6     public final void acquire(int arg) {
 7         if (!tryAcquire(arg) &&
 8             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 9             selfInterrupt();
10     }
11     protected final boolean tryAcquire(int acquires) {
12         final Thread current = Thread.currentThread();
13         int c = getState();
14         if (c == 0) {
15             // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
16             if (!hasQueuedPredecessors() &&
17                 compareAndSetState(0, acquires)) {
18                 setExclusiveOwnerThread(current);
19                 return true;
20             }
21         }
22         else if (current == getExclusiveOwnerThread()) {
23             int nextc = c + acquires;
24             if (nextc < 0)
25                 throw new Error("Maximum lock count exceeded");
26             setState(nextc);
27             return true;
28         }
29         return false;
30     }
31 }

 

非公平锁的 lock 方法:

 1 static final class NonfairSync extends Sync {
 2     final void lock() {
 3         // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
 4         if (compareAndSetState(0, 1))
 5             setExclusiveOwnerThread(Thread.currentThread());
 6         else
 7             acquire(1);
 8     }
 9     // AbstractQueuedSynchronizer.acquire(int arg)
10     public final void acquire(int arg) {
11         if (!tryAcquire(arg) &&
12             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
13             selfInterrupt();
14     }
15     protected final boolean tryAcquire(int acquires) {
16         return nonfairTryAcquire(acquires);
17     }
18 }
19 /**
20  * Performs non-fair tryLock.  tryAcquire is implemented in
21  * subclasses, but both need nonfair try for trylock method.
22  */
23 final boolean nonfairTryAcquire(int acquires) {
24     final Thread current = Thread.currentThread();
25     int c = getState();
26     if (c == 0) {
27         // 这里没有对阻塞队列进行判断
28         if (compareAndSetState(0, acquires)) {
29             setExclusiveOwnerThread(current);
30             return true;
31         }
32     }
33     else if (current == getExclusiveOwnerThread()) {
34         int nextc = c + acquires;
35         if (nextc < 0) // overflow
36             throw new Error("Maximum lock count exceeded");
37         setState(nextc);
38         return true;
39     }
40     return false;
41 }

 

总结:

公平锁和非公平锁只有两处不同:

  1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
  2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

 

2) Share(共享)

多个线程可同时执行,如 Semaphore/CountDownLatchSemaphoreCountDownLatCh、 CyclicBarrierReadWriteLock 我们都会在后面讲到。

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。

 

AQS 底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:

1 protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
2 protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
3 protected boolean tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
4 protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
5 protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

 

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:用Java8 改造后的模板方法模式真的是 yyds!open in new window。

除了上面提到的钩子方法之外,AQS 类中的其他方法都是 final ,所以无法被其他类重写。

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1 。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 state=0 ),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

推荐两篇 AQS 原理和相关源码分析的文章:

  • Java并发之AQS详解
  • Java并发包基石-AQS详解

 

Semaphore(信号量)

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

示例代码如下:

 1 /**
 2  *
 3  * @author Snailclimb
 4  * @date 2018年9月30日
 5  * @Description: 需要一次性拿一个许可的情况
 6  */
 7 public class SemaphoreExample1 {
 8   // 请求的数量
 9   private static final int threadCount = 550;
10 
11   public static void main(String[] args) throws InterruptedException {
12     // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
13     ExecutorService threadPool = Executors.newFixedThreadPool(300);
14     // 一次只能允许执行的线程数量。
15     final Semaphore semaphore = new Semaphore(20);
16 
17     for (int i = 0; i < threadCount; i++) {
18       final int threadnum = i;
19       threadPool.execute(() -> {// Lambda 表达式的运用
20         try {
21           semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
22           test(threadnum);
23           semaphore.release();// 释放一个许可
24         } catch (InterruptedException e) {
25           // TODO Auto-generated catch block
26           e.printStackTrace();
27         }
28 
29       });
30     }
31     threadPool.shutdown();
32     System.out.println("finish");
33   }
34 
35   public static void test(int threadnum) throws InterruptedException {
36     Thread.sleep(1000);// 模拟请求的耗时操作
37     System.out.println("threadnum:" + threadnum);
38     Thread.sleep(1000);// 模拟请求的耗时操作
39   }
40 }

 

执行 acquire() 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire() 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。

当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:

1 semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
2 test(threadnum);
3 semaphore.release(5);// 释放5个许可

 

除了 acquire() 方法之外,另一个比较常用的与之对应的方法是 tryAcquire() 方法,该方法如果获取不到许可就立即返回 false。

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

 

Semaphore 对应的两个构造方法如下:

1 public Semaphore(int permits) {
2         sync = new NonfairSync(permits);
3     }
4 
5 public Semaphore(int permits, boolean fair) {
6         sync = fair ? new FairSync(permits) : new NonfairSync(permits);
7     }

 

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

Semaphore 与 CountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release() 方法,release() 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits 数量的线程能自旋成功,便限制了执行任务线程的数量。

 

CountDownLatch (倒计时器)

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

CountDownLatch 的两种典型用法

1、某一线程在开始运行前等待 n 个线程执行完毕。

将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

2、实现多个线程开始执行任务的最大并行性。

注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 (new CountDownLatch(1)),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。

 

CountDownLatch 的使用示例

 1 public class CountDownLatchExample1 {
 2   // 请求的数量
 3   private static final int threadCount = 550;
 4 
 5   public static void main(String[] args) throws InterruptedException {
 6     // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
 7     ExecutorService threadPool = Executors.newFixedThreadPool(300);
 8     final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 9     for (int i = 0; i < threadCount; i++) {
10       final int threadnum = i;
11       threadPool.execute(() -> {// Lambda 表达式的运用
12         try {
13           test(threadnum);
14         } catch (InterruptedException e) {
15           // TODO Auto-generated catch block
16           e.printStackTrace();
17         } finally {
18           countDownLatch.countDown();// 表示一个请求已经被完成
19         }
20 
21       });
22     }
23     countDownLatch.await();
24     threadPool.shutdown();
25     System.out.println("finish");
26   }
27 
28   public static void test(int threadnum) throws InterruptedException {
29     Thread.sleep(1000);// 模拟请求的耗时操作
30     System.out.println("threadnum:" + threadnum);
31     Thread.sleep(1000);// 模拟请求的耗时操作
32   }
33 }

 

上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");

与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。

 

再插一嘴:CountDownLatch 的 await() 方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:

1 for (int i = 0; i < threadCount-1; i++) {
2 .......
3 }

这样就导致 count 的值没办法等于 0,然后就会导致一直等待。

 

CountDownLatch 的不足

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

 

CountDownLatch 相常见面试题

  • CountDownLatch 怎么用?应用场景是什么?
  • CountDownLatch 和 CyclicBarrier 的不同之处?
  • CountDownLatch 类中主要的方法?

 

 

CyclicBarrier(循环栅栏)

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

再来看一下它的构造函数:

 1 public CyclicBarrier(int parties) {
 2     this(parties, null);
 3 }
 4 
 5 public CyclicBarrier(int parties, Runnable barrierAction) {
 6     if (parties <= 0) throw new IllegalArgumentException();
 7     this.parties = parties;
 8     this.count = parties;
 9     this.barrierCommand = barrierAction;
10 }

其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。

 

CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

 

CyclicBarrier 的使用示例

示例 1

 1 public class CyclicBarrierExample2 {
 2   // 请求的数量
 3   private static final int threadCount = 550;
 4   // 需要同步的线程数量
 5   private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
 6 
 7   public static void main(String[] args) throws InterruptedException {
 8     // 创建线程池
 9     ExecutorService threadPool = Executors.newFixedThreadPool(10);
10 
11     for (int i = 0; i < threadCount; i++) {
12       final int threadNum = i;
13       Thread.sleep(1000);
14       threadPool.execute(() -> {
15         try {
16           test(threadNum);
17         } catch (InterruptedException e) {
18           // TODO Auto-generated catch block
19           e.printStackTrace();
20         } catch (BrokenBarrierException e) {
21           // TODO Auto-generated catch block
22           e.printStackTrace();
23         }
24       });
25     }
26     threadPool.shutdown();
27   }
28 
29   public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
30     System.out.println("threadnum:" + threadnum + "is ready");
31     try {
32       /**等待60秒,保证子线程完全执行结束*/
33       cyclicBarrier.await(60, TimeUnit.SECONDS);
34     } catch (Exception e) {
35       System.out.println("-----CyclicBarrierException------");
36     }
37     System.out.println("threadnum:" + threadnum + "is finish");
38   }
39 
40 }

运行结果如下:

 1 threadnum:0is ready
 2 threadnum:1is ready
 3 threadnum:2is ready
 4 threadnum:3is ready
 5 threadnum:4is ready
 6 threadnum:4is finish
 7 threadnum:0is finish
 8 threadnum:1is finish
 9 threadnum:2is finish
10 threadnum:3is finish
11 threadnum:5is ready
12 threadnum:6is ready
13 threadnum:7is ready
14 threadnum:8is ready
15 threadnum:9is ready
16 threadnum:9is finish
17 threadnum:5is finish
18 threadnum:8is finish
19 threadnum:7is finish
20 threadnum:6is finish
21 ......

 

可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await() 方法之后的方法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。示例代码如下

 1 public class CyclicBarrierExample3 {
 2   // 请求的数量
 3   private static final int threadCount = 550;
 4   // 需要同步的线程数量
 5   private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
 6     System.out.println("------当线程数达到之后,优先执行------");
 7   });
 8 
 9   public static void main(String[] args) throws InterruptedException {
10     // 创建线程池
11     ExecutorService threadPool = Executors.newFixedThreadPool(10);
12 
13     for (int i = 0; i < threadCount; i++) {
14       final int threadNum = i;
15       Thread.sleep(1000);
16       threadPool.execute(() -> {
17         try {
18           test(threadNum);
19         } catch (InterruptedException e) {
20           // TODO Auto-generated catch block
21           e.printStackTrace();
22         } catch (BrokenBarrierException e) {
23           // TODO Auto-generated catch block
24           e.printStackTrace();
25         }
26       });
27     }
28     threadPool.shutdown();
29   }
30 
31   public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
32     System.out.println("threadnum:" + threadnum + "is ready");
33     cyclicBarrier.await();
34     System.out.println("threadnum:" + threadnum + "is finish");
35   }
36 
37 }

运行结果如下:

 1 threadnum:0is ready
 2 threadnum:1is ready
 3 threadnum:2is ready
 4 threadnum:3is ready
 5 threadnum:4is ready
 6 ------当线程数达到之后,优先执行------
 7 threadnum:4is finish
 8 threadnum:0is finish
 9 threadnum:2is finish
10 threadnum:1is finish
11 threadnum:3is finish
12 threadnum:5is ready
13 threadnum:6is ready
14 threadnum:7is ready
15 threadnum:8is ready
16 threadnum:9is ready
17 ------当线程数达到之后,优先执行------
18 threadnum:9is finish
19 threadnum:5is finish
20 threadnum:6is finish
21 threadnum:8is finish
22 threadnum:7is finish
23 ......

 

 

CyclicBarrier 的源码分析

 

当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

1 public int await() throws InterruptedException, BrokenBarrierException {
2   try {
3         return dowait(false, 0L);
4   } catch (TimeoutException toe) {
5         throw new Error(toe); // cannot happen
6   }
7 }

dowait(false, 0L)

 1 // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
 2     private int count;
 3     /**
 4      * Main barrier code, covering the various policies.
 5      */
 6     private int dowait(boolean timed, long nanos)
 7         throws InterruptedException, BrokenBarrierException,
 8                TimeoutException {
 9         final ReentrantLock lock = this.lock;
10         // 锁住
11         lock.lock();
12         try {
13             final Generation g = generation;
14 
15             if (g.broken)
16                 throw new BrokenBarrierException();
17 
18             // 如果线程中断了,抛出异常
19             if (Thread.interrupted()) {
20                 breakBarrier();
21                 throw new InterruptedException();
22             }
23             // cout减1
24             int index = --count;
25             // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
26             if (index == 0) {  // tripped
27                 boolean ranAction = false;
28                 try {
29                     final Runnable command = barrierCommand;
30                     if (command != null)
31                         command.run();
32                     ranAction = true;
33                     // 将 count 重置为 parties 属性的初始化值
34                     // 唤醒之前等待的线程
35                     // 下一波执行开始
36                     nextGeneration();
37                     return 0;
38                 } finally {
39                     if (!ranAction)
40                         breakBarrier();
41                 }
42             }
43 
44             // loop until tripped, broken, interrupted, or timed out
45             for (;;) {
46                 try {
47                     if (!timed)
48                         trip.await();
49                     else if (nanos > 0L)
50                         nanos = trip.awaitNanos(nanos);
51                 } catch (InterruptedException ie) {
52                     if (g == generation && ! g.broken) {
53                         breakBarrier();
54                         throw ie;
55                     } else {
56                         // We're about to finish waiting even if we had not
57                         // been interrupted, so this interrupt is deemed to
58                         // "belong" to subsequent execution.
59                         Thread.currentThread().interrupt();
60                     }
61                 }
62 
63                 if (g.broken)
64                     throw new BrokenBarrierException();
65 
66                 if (g != generation)
67                     return index;
68 
69                 if (timed && nanos <= 0L) {
70                     breakBarrier();
71                     throw new TimeoutException();
72                 }
73             }
74         } finally {
75             lock.unlock();
76         }
77     }

 

总结:CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

 

CyclicBarrier 和 CountDownLatch 的区别

下面这个是国外一个大佬的回答:

CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

这篇关于Java AQS原理和AQS的同步组件总结的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!