上一篇文章我们介绍了 AQS
的信号量 Semaphore
,接下来应该轮到 CountDownLatch
了。
CountDownLatch
是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就减1,当计数器的值为 0
时,表示所有线程都执行完毕,然后在闭锁上(调用await方法的线程)等待的线程就可以恢复工作了。
CountDownLatch
可以用来干什么呢?有什么应用场景?实际项目中有应用的场景吗?这应该才是大家比较关心的。我们先来看看官网提供的例子是如何进行应用的https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html 官方提供了两个 demo
我直接把它转成了图片顺带推荐下这个代码转图片的网址https://www.dute.org/code-snapshot 还挺好用的。
★
The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed; The second is a completion signal that allows the driver to wait until all workers have completed.
”
第一个开始信号( startSignal
)会阻止任何工人( worker
)开始工作,在司机到来之前。说白了就是司机没来工人就不能干活。
第二个是完成信号 ( doneSignal
),允许司机 Driver
等待,直到所有的工人完成.说白了就是司机要等到所有工人完工为止。
★
Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await.
”
另一种典型的用法就是把一个大任务拆分N个部分,让多个线程(Worker)执行,每个线程(Worker)执行完自己的部分计数器就减1,当所有子部分都完成后,Driver 才继续向下执行才继续执行。就好比富士康手机加工的流水线一样,组装一步手机需要一条条的流水线来相互配合完成。一条条流水线(Worker),每条线都干自己的活。有的流水线是贴膜的,有的流水线是打螺丝的,有的流水线是质检的、有的流水线充电的、有的流水线贴膜的。等这些流水线都干完了就把一部手机组装完成了。
上面两个就是官方提供的demo,下面我再来两个我们平时开发中可以用到的栗子:
有时候我们写了接口想去压测下它,看看它的最大并发数大概是多少。当然我们可以使用 Jmeter
来进行压测,但是有时候我们不想去下载工具,其实就可以借助 CountDownLatch
来实现。
/** * @author: 公众号:java金融 */ public class TestCountDownLatch1 { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { new Thread(() -> { try { //所有请求都阻塞在这,等待 countDownLatch.await(); // 调用测试接口 System.out.println(Thread.currentThread().getName() + "开始执行……"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 让请求都准备好 Thread.sleep(2000); // 让所有请求统一请求 countDownLatch.countDown(); } }
我们通过 CountDownLatch.await()
,让多个参与者线程启动后阻塞等待,然后在主线程 调用CountDownLatch.countdown()
将计数减为 0
,让所有线程一起往下执行;以此实现了多个线程在同一时刻并发执行,来模拟并发请求的目的。
/** * @author: 公众号:java金融 */ public class TestCountDownLatch1 { public static void main(String[] args) throws InterruptedException { int count = 3; CountDownLatch countDownLatch = new CountDownLatch(count); for (int i = 0; i < count; i++) { final int index = i; new Thread(() -> { try { Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000)); System.out.println("finish" + index + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); }finally{ countDownLatch.countDown(); } }).start(); } countDownLatch.await();// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。 System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); } }
这种场景应该是用的最多了,比如我们打开一个电商的个人中心页面,我们需要调用,用户信息接口、用户订单接口、用户会员信息等接口,然后合并后一起给到前端,假设每个接口最长耗时为 1s
,如果我们同步调用的话最大耗时时间是3s,如果我们采用异步调用然后合并结果,所以最大的耗时时间是 3s
。每个接口调用返回数据后调用 countDown
方法,让计数器进行减1,当把计数器减为0时的这个线程会去唤醒主线程,让它继续往下走。
CountDownLatch
是通过 AQS
的 state
字段来实现的一个计数器,计数器的初始值( state
的值)为 new CountDownLatch
设置的数量,每次调用 countDown
的时候,state的值会进行减1,最后某个线程将state值减为0时,会把调用了 await()
进行阻塞等待的线程进行唤醒。 CountDownLatch
重写了 tryReleaseShared
这个方法,只有当 state
这个字段被设置为0时,也就是 tryReleaseShared
返回 true
的情况就会执行 doReleaseShared
方法,把调用了await的线程进行唤醒。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
CountDownLatch
的其他源码就不进行分析了
CountDownLatch
不能重新初始化或者修改 CountDownLatch
内部计数器的值。
CountDownLatch Semaphore AQS
CountDownLatch join() join
CountDownLatch n countDown
join
() 的实现原理是不停检查join线程是否存活,如果 join
线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。