本章主要和大家聊聊之前的阻塞队列,并且聊聊他如何使用,以及JUC中常用的一些工具,例如【CountDownLatch】、【Semaphore】、【CyclicBarrier】这些都是控制线程的一些工具,我们会聊聊他们如何使用,以及实现原理。
【LinkedTransferQueue】:由一个链表构建的一个无界阻塞队列(实际上是一个Interger.maxvalue实际上还是有大小的,只不过他真的足够大):他既是一个队列,也有一个生产者对应一个消费者的功能,因为继承了AbstractQueue并且实现了TransferQueue(这个在SynchronousQueue中有使用,他的特性就是一个生产者对应一个消费者),我们可以认为他是一个链表和SynchronousQueue特性和合体。
【LinkedBlockingDeque】:是一个双向链表的队列,他可以支持两端的插入和移除(这就在一定程度上解决了多线程添加元素的竞争问题,因为这样可以减少一半的竞争),只不过增加了几个方法(借鉴网上的图片)
我们能想到用异步mq解决问题的地方,都可以使用阻塞队列。大家都了解责任链模式,想当于一个流水线,每个环节处理相关的请求,但是这个责任链点如果太多,势必返回的时间就就会很长,我们就可以利用阻塞队列来提升一下这个性能,减少同步请求带来的损耗。这里写一个责任链中中使用阻塞队列的demo去削峰
首先我们定义四个责任链点,我们知道,普通的责任链点是某个链路点运行完成他自己的任务后,才把责任交给下一个链路,咱们这里,让每个责任链路直接放行,把任务放到阻塞队列中,用一个自旋去消费,这就可以达到削峰的目的,切记,这个只能是不需要立马返回值的业务情况。这些阻塞队列的底层大部分都使用了lock和condition万变不离其宗
//检验数据责任链 public class ValidateProcessor extends Thread implements IRequestProcessor { public ValidateProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一个执行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //当某个请求经过我这里的时候,我先不处理,把他放在队列中,然后放行(流量削峰) requests.add(request); } @Override public void run() { // 这里不断的对数据进行消费 while (true) { try { // 异步进行请求的处理,其实这里的底层都是使用的lock Request request = requests.take(); System.out.println(this.getClass().getSimpleName() + "处理" + request.getName()); if (iRequestProcessor != null) { iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } // 保存数据责任链 public class SaveProcessor extends Thread implements IRequestProcessor { public SaveProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一个执行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //当某个请求经过我这里的时候,我先不处理,把他放在队列中,然后放行(流量削峰) requests.add(request); } @Override public void run() { while (true) { try { // 异步进行请求的处理 Request request = requests.take(); System.out.println(this.getClass().getSimpleName()+"处理"+request.getName()); if (iRequestProcessor!=null){ iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } // 打印数据责任链 public class PrintProcessor extends Thread implements IRequestProcessor { public PrintProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一个执行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //当某个请求经过我这里的时候,我先不处理,把他放在队列中,然后放行(流量削峰) requests.add(request); } @Override public void run() { while (true) { try { // 异步进行请求的处理 Request request = requests.take(); System.out.println(this.getClass().getSimpleName()+"处理"+request.getName()); if (iRequestProcessor!=null){ iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } //最后一个责任链 public class FinalProcessor extends Thread implements IRequestProcessor { @Override public void doMyDuty(Request request) { // you can do whatever you want here } }对责任链进行组装和测试
public class ChainExample { public static void main(String[] args) { FinalProcessor finalProcessor = new FinalProcessor(); finalProcessor.start(); SaveProcessor saveProcessor = new SaveProcessor(finalProcessor); saveProcessor.start(); PrintProcessor printProcessor = new PrintProcessor(saveProcessor); printProcessor.start(); ValidateProcessor validateProcessor = new ValidateProcessor(printProcessor); validateProcessor.start(); Request request=new Request(); request.setName("Glen"); // 这里就把问的请求传递给每个消费者,那我们就可以使用 validateProcessor.doMyDuty(request); } }这些责任链都需要实现同一个接口,同时有一个dao去传递数据
public interface IRequestProcessor { void doMyDuty(Request request); } @Data public class Request { String name; }
【CountDownLatch】:是一个同步工具,允许一个或者多个线程一直等待。然后通过某个线程的执行完毕而唤醒其他等待中的线程。他主要提供两方法【await】【countdown】,简而言之他就是一个倒计时的计数器,我们定义一个数字,比如三,那有三个线程都调用他的countdown方法,他的底层是每次一个线程调用一下countdown方法,他体内的数字就减去一,直到数字为0,则被阻塞的线程被唤醒。demo(其实他的作用点像JOIN)->我们看到,其实他就类似于一个信号,当一个线程执行后,告诉下个线程我执行完了,然后在总数中减一,当总数为零则唤醒被阻塞的线程
public class CountDownExample { static CountDownLatch countDownLatch=new CountDownLatch(3); static class Thread1 extends Thread{ @Override public void run() { System.out.println("作为自己的事情"+Thread.currentThread().getName()); countDownLatch.countDown(); } } static class Thread2 extends Thread{ @Override public void run() { System.out.println("作为自己的事情"+Thread.currentThread().getName()); countDownLatch.countDown(); } } static class Thread3 extends Thread{ @Override public void run() { System.out.println("作为自己的事情"+Thread.currentThread().getName()); countDownLatch.countDown(); } } public static void main(String[] args) throws InterruptedException { Thread thread1=new Thread1(); thread1.start(); Thread thread2=new Thread2(); thread2.start(); Thread thread3=new Thread3(); thread3.start(); countDownLatch.await(); System.out.println("执行main,所有线程执行玩成"); } }我们可以用它来做服务校验,当所有我们依赖的服务都正常启动后,我们在启动我们的主线程。这里我们使用一个模板模式来模拟这个流程。
首先我们定义一个模板方法类,在这个类中使用线程调用各个子类的验证服务方法。
@Data public abstract class BaseHealthChecker implements Runnable { String serviceName; CountDownLatch countDownLatch; public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch) { this.serviceName = serviceName; this.countDownLatch=countDownLatch; } abstract void verifyService() throws InterruptedException; //异步验证 @Override public void run() { try { //调用子类的方法 verifyService(); //对计数器进行减一操作 countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }这里是各个子类的执行逻辑(线程休眠这里只是模拟他去发包和收包的过程)
public class CacheServiceChecker extends BaseHealthChecker { public CacheServiceChecker(CountDownLatch countDownLatch) { super("CacheServiceChecker",countDownLatch); } @Override void verifyService() throws InterruptedException { System.out.println("checking..."+this.getServiceName()); Thread.sleep(3000); System.out.println(this.getServiceName()+"all things are ok"); } } public class DataBaseServiceChecker extends BaseHealthChecker { public DataBaseServiceChecker(CountDownLatch countDownLatch) { super("DataBaseServiceChecker",countDownLatch); } @Override void verifyService() throws InterruptedException { System.out.println("checking..."+this.getServiceName()); Thread.sleep(3000); System.out.println(this.getServiceName()+"all things are ok"); } }这里去组装以及启动各个模板子类的线程
public class ApplicationStartUp { static List<BaseHealthChecker> checkers; private static CountDownLatch countDownLatch=new CountDownLatch(2); static { checkers = new ArrayList<>(); checkers.add(new CacheServiceChecker(countDownLatch)); checkers.add(new DataBaseServiceChecker(countDownLatch)); } static ApplicationStartUp INSTANCE = new ApplicationStartUp(); ApplicationStartUp getInstance() { return INSTANCE; } static boolean CheckDependentServices() throws InterruptedException { for (BaseHealthChecker checker : checkers) { //每一个服务都采用线程去执行 new Thread(checker).start(); } //这里对主线程进行阻塞 countDownLatch.await(); return true; } }这里对外暴露一个服务,我们的主线程去调用他进行校验各个服务的可用性
public class StartUpMain { public static void main(String[] args) { try { ApplicationStartUp.CheckDependentServices(); } catch (InterruptedException e) { e.printStackTrace(); } //可以打印这句话就证明主线前面的各个校验线程已经执行完成 System.err.println("all dependent services are checked and are all available"); } }阻塞多个线程的意思就是,我们在每个线程执行完成后都调用【await】在main中调用【countDown】,然后给countdownlatch初始为0,这样main就相当于一个发令枪,当mian执行了countdown,所有被阻塞的线程也就活了。
CountDownLatch总结:
这就是一种共享锁,可以允许多个线程同时抢到锁,然后等到计数器归零,则同时唤醒。我们来看一下他类的关系图,我们看到他的底层实际上还是用AQS实现的,只不过他走的是一个共享锁
,大概流程就是,
- 他维护了一个state的数字,每个线程执行完成后,他的state就--
- 直到state为0,他则唤醒队列中的所有线程,
- 这点和我们之前讲到的不同,之前讲到的是只唤醒头节点后的下一个节点
await(源码解析)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果他小于零(计数器不为零),那当前线程就应该被阻塞 if (tryAcquireShared(arg) < 0) //这里进行共享锁的抢占 doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //把当前线程加入到一个节点中,在这里面构建一个双向链 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //如果当前节点是头结点那就进行抢占 if (p == head) { //共享锁的方式进行抢占,如果大于零则说明抢占锁成功 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //判断自己是否应该挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }countDown(源码解析)
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; //每次对原来的数字减一 int nextc = c-1; //如果数字为零,则唤醒所有节点 if (compareAndSetState(c, nextc)) return nextc == 0; } } } //对节点进行唤醒 private void doReleaseShared() { //自旋唤醒 for (;;) { //不断的取下一个节点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //只要节点状态是signal则进行唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //这里进行唤醒,唤醒后就回到await中上次线程被阻塞的地方,在await的自旋中队列中的线程逐个被唤醒 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }【Semaphore】:我们可以把它叫做新信号灯,实际上我们可以把它理解成一个限流器,它可以限制资源的访问,就是限流,例如【sentinel】,抢占一个令牌,抢占到就通信,没有抢占到就阻塞,它里面主要有两个方法
- acquire(抢占令牌数):这里是抢占一个令牌,一次可以抢占多个,没有传递参数则只抢占一个
- realise:释放令牌
如何使用semaphore(这里模拟一个停车场,一共有10个车位,也就是一次只能运行10个线程,当10个中的一个释放,其他才能进行资源的获取)
public class SemaphoreExample { public static void main(String[] args) { // 限制资源访问的并发数量 Semaphore semaphore=new Semaphore(10); for (int i = 0; i <20 ; i++) { new Thread(new car(i,semaphore)).start(); } } static class car extends Thread{ private int num; private Semaphore semaphore; public car(int num, Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } @Override public void run() { try { //获得一个令牌 semaphore.acquire(); System.out.println("第"+num+"车抢到一个车位"); TimeUnit.SECONDS.sleep(2); System.out.println("第"+num+"释放车位!"); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 释放一个令牌 semaphore.release(); } } } }结果:
【Semaphore】实现原理:
acquire(总数-1):
- 当总数为0的时候则阻塞,就类似咱们上面的场景(车库满了)
- 可能同阻塞n个线程(别的车就无法进入车库)
realist(总数+1):
- 有令牌就阻塞的线程中唤醒(那肯定有一个队列来存储这些个被阻塞的队列)
源码解析(我们发现它这里还用的是aqs,,所以这里就不赘述了,他底层用的是共享锁):
问题:为什么他要使用共享锁呢?这里好像用同步锁更好点,其实想一下,用共享锁就意味着他可以一次唤醒多个线程,那就意味着多个线程可以同时执行,那就提升了性能。
CyclicBarrier:他是一个可以重复的栅栏,简而言之就是他有类似与一个阀门,当到达一个极值或者顶点的时候,允许多个线程同时运行,这就有点像【countdownlaunch】多个线程等于调用了【await】方法,然后一个线程使用【countdown】方法去唤醒,就等于说是一个投票,当所有人投完票,才能公布结果。一个例子来看一下如何使用。
使用方法:
public class CyclicBarrierExample { public static void main(String[] args) { int n=3; CyclicBarrier cyclicBarrier=new CyclicBarrier(n,()->{ System.out.println("所有线程执行完成"); }); for (int i = 0; i <n ; i++) { new ballot(cyclicBarrier).start(); } } static class ballot extends Thread{ CyclicBarrier cyclicBarrier; public ballot(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"投票完成,等待其他人进行投票。。"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } }实现原理(基于【ReentrantLock】和【Condition】实现):
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); //参与的线程数 this.parties = parties; //用于记录当前已经执行的数 this.count = parties; //执行完成后的毁掉方法 this.barrierCommand = barrierAction; }private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //首先加锁,保证线程安全性 lock.lock(); try { //这里就是栅栏,比如当你的计数器成为0,它有可以回到你的初始数值 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //如果计数器为零 if (index == 0) { // tripped boolean ranAction = false; try { //这里执行咱们传递过去的action final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) //trip是一个condition trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { //这里是signal breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }