C/C++教程

并发编程-阻塞队列&JUC常用工具

本文主要是介绍并发编程-阻塞队列&JUC常用工具,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

并发编程-阻塞队列&JUC常用工具

本章主要和大家聊聊之前的阻塞队列,并且聊聊他如何使用,以及JUC中常用的一些工具,例如【CountDownLatch】、【Semaphore】、【CyclicBarrier】这些都是控制线程的一些工具,我们会聊聊他们如何使用,以及实现原理。

阻塞队列(之前聊过一下,现在再次补充)

LinkedTransferQueue】:由一个链表构建的一个无界阻塞队列(实际上是一个Interger.maxvalue实际上还是有大小的,只不过他真的足够大):他既是一个队列,也有一个生产者对应一个消费者的功能,因为继承了AbstractQueue并且实现了TransferQueue(这个在SynchronousQueue中有使用,他的特性就是一个生产者对应一个消费者),我们可以认为他是一个链表和SynchronousQueue特性和合体。

LinkedBlockingDeque】:是一个双向链表的队列,他可以支持两端的插入和移除(这就在一定程度上解决了多线程添加元素的竞争问题,因为这样可以减少一半的竞争),只不过增加了几个方法(借鉴网上的图片)

阻塞队列的使用(实际上他相当于异步mq)

我们能想到用异步mq解决问题的地方,都可以使用阻塞队列。大家都了解责任链模式,想当于一个流水线,每个环节处理相关的请求,但是这个责任链点如果太多,势必返回的时间就就会很长,我们就可以利用阻塞队列来提升一下这个性能,减少同步请求带来的损耗。这里写一个责任链中中使用阻塞队列的demo去削峰

首先我们定义四个责任链点,我们知道,普通的责任链点是某个链路点运行完成他自己的任务后,才把责任交给下一个链路,咱们这里,让每个责任链路直接放行,把任务放到阻塞队列中,用一个自旋去消费,这就可以达到削峰的目的,切记,这个只能是不需要立马返回值的业务情况。这些阻塞队列的底层大部分都使用了lock和condition万变不离其宗

//检验数据责任链
public class ValidateProcessor extends Thread implements IRequestProcessor {
        public ValidateProcessor(IRequestProcessor iRequestProcessor) {
                this.iRequestProcessor = iRequestProcessor;
        }

        BlockingQueue<Request> requests = new LinkedBlockingDeque<>();

        //下一个执行者
        IRequestProcessor iRequestProcessor;

        @Override
        public void doMyDuty(Request request) {
                //当某个请求经过我这里的时候,我先不处理,把他放在队列中,然后放行(流量削峰)
                requests.add(request);
        }

        @Override
        public void run() {
                // 这里不断的对数据进行消费
                while (true) {
                        try {
                                // 异步进行请求的处理,其实这里的底层都是使用的lock
                                Request request = requests.take();
                                System.out.println(this.getClass().getSimpleName() + "处理" + request.getName());
                                if (iRequestProcessor != null) {
                                        iRequestProcessor.doMyDuty(request);
                                }
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
        }

}
// 保存数据责任链
public class SaveProcessor extends Thread implements IRequestProcessor {
    public SaveProcessor(IRequestProcessor iRequestProcessor) {
        this.iRequestProcessor = iRequestProcessor;
    }

    BlockingQueue<Request> requests = new LinkedBlockingDeque<>();

    //下一个执行者
    IRequestProcessor iRequestProcessor;
    @Override
    public void doMyDuty(Request request) {
        //当某个请求经过我这里的时候,我先不处理,把他放在队列中,然后放行(流量削峰)
        requests.add(request);
    }
    @Override
    public void run() {
        while (true) {
            try {
                // 异步进行请求的处理
                Request request = requests.take();
                System.out.println(this.getClass().getSimpleName()+"处理"+request.getName());
                if (iRequestProcessor!=null){
                    iRequestProcessor.doMyDuty(request);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
// 打印数据责任链
public class PrintProcessor extends Thread implements IRequestProcessor {
    public PrintProcessor(IRequestProcessor iRequestProcessor) {
        this.iRequestProcessor = iRequestProcessor;
    }

    BlockingQueue<Request> requests = new LinkedBlockingDeque<>();

    //下一个执行者
    IRequestProcessor iRequestProcessor;
    @Override
    public void doMyDuty(Request request) {
        //当某个请求经过我这里的时候,我先不处理,把他放在队列中,然后放行(流量削峰)
        requests.add(request);
    }
    @Override
    public void run() {
        while (true) {
            try {
                // 异步进行请求的处理
                Request request = requests.take();
                System.out.println(this.getClass().getSimpleName()+"处理"+request.getName());
                if (iRequestProcessor!=null){
                    iRequestProcessor.doMyDuty(request);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
//最后一个责任链
public class FinalProcessor  extends Thread implements IRequestProcessor {

    @Override
    public void doMyDuty(Request request) {
        // you can do whatever you want here
    }

}

对责任链进行组装和测试

public class ChainExample {
    public static void main(String[] args) {
        FinalProcessor finalProcessor = new FinalProcessor();
        finalProcessor.start();
        SaveProcessor saveProcessor = new SaveProcessor(finalProcessor);
        saveProcessor.start();
        PrintProcessor printProcessor = new PrintProcessor(saveProcessor);
        printProcessor.start();
        ValidateProcessor validateProcessor = new ValidateProcessor(printProcessor);
        validateProcessor.start();
        Request request=new Request();
        request.setName("Glen");

        // 这里就把问的请求传递给每个消费者,那我们就可以使用
        validateProcessor.doMyDuty(request);
    }
}

这些责任链都需要实现同一个接口,同时有一个dao去传递数据

public interface IRequestProcessor {
    void  doMyDuty(Request request);
}
@Data
public class Request {
    String name;
}

JUC常用并发工具

【CountDownLatch】:是一个同步工具,允许一个或者多个线程一直等待。然后通过某个线程的执行完毕而唤醒其他等待中的线程。他主要提供两方法【await】【countdown】,简而言之他就是一个倒计时的计数器,我们定义一个数字,比如三,那有三个线程都调用他的countdown方法,他的底层是每次一个线程调用一下countdown方法,他体内的数字就减去一,直到数字为0,则被阻塞的线程被唤醒。demo(其实他的作用点像JOIN)->我们看到,其实他就类似于一个信号,当一个线程执行后,告诉下个线程我执行完了,然后在总数中减一,当总数为零则唤醒被阻塞的线程

public class CountDownExample {
    static CountDownLatch countDownLatch=new CountDownLatch(3);
    static class Thread1 extends Thread{
        @Override
        public void run() {
            System.out.println("作为自己的事情"+Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }
    static class Thread2 extends Thread{
        @Override
        public void run() {
            System.out.println("作为自己的事情"+Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }
    static class Thread3 extends Thread{
        @Override
        public void run() {
            System.out.println("作为自己的事情"+Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1=new Thread1();
        thread1.start();
        Thread thread2=new Thread2();
        thread2.start();
        Thread thread3=new Thread3();
        thread3.start();
        countDownLatch.await();
        System.out.println("执行main,所有线程执行玩成");
    }
}

我们可以用它来做服务校验,当所有我们依赖的服务都正常启动后,我们在启动我们的主线程。这里我们使用一个模板模式来模拟这个流程。

首先我们定义一个模板方法类,在这个类中使用线程调用各个子类的验证服务方法。

@Data
public abstract  class  BaseHealthChecker implements Runnable {
    String serviceName;
    CountDownLatch countDownLatch;

    public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch) {
        this.serviceName = serviceName;
        this.countDownLatch=countDownLatch;
    }

    abstract void verifyService() throws InterruptedException;

    //异步验证
    @Override
    public void run() {
        try {
        //调用子类的方法
            verifyService();
        //对计数器进行减一操作
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这里是各个子类的执行逻辑(线程休眠这里只是模拟他去发包和收包的过程)

public class CacheServiceChecker extends BaseHealthChecker {


    public CacheServiceChecker(CountDownLatch countDownLatch) {
        super("CacheServiceChecker",countDownLatch);
    }

    @Override
    void verifyService() throws InterruptedException {
        System.out.println("checking..."+this.getServiceName());
        Thread.sleep(3000);
        System.out.println(this.getServiceName()+"all things are ok");
    }
}
public class DataBaseServiceChecker extends BaseHealthChecker {
    public DataBaseServiceChecker(CountDownLatch countDownLatch) {
        super("DataBaseServiceChecker",countDownLatch);
    }

    @Override
    void verifyService() throws InterruptedException {
        System.out.println("checking..."+this.getServiceName());
        Thread.sleep(3000);
        System.out.println(this.getServiceName()+"all things are ok");
    }
}

这里去组装以及启动各个模板子类的线程

public class ApplicationStartUp {
    static List<BaseHealthChecker> checkers;
    private static CountDownLatch countDownLatch=new CountDownLatch(2);
    static {
        checkers = new ArrayList<>();
        checkers.add(new CacheServiceChecker(countDownLatch));
        checkers.add(new DataBaseServiceChecker(countDownLatch));
    }
    static ApplicationStartUp INSTANCE = new ApplicationStartUp();
    ApplicationStartUp getInstance() {
        return INSTANCE;
    }

    static boolean CheckDependentServices() throws InterruptedException {
        for (BaseHealthChecker checker : checkers) {
            //每一个服务都采用线程去执行
            new Thread(checker).start();
        }
    //这里对主线程进行阻塞
        countDownLatch.await();
        return true;
    }
}

这里对外暴露一个服务,我们的主线程去调用他进行校验各个服务的可用性

public class StartUpMain {
    public static void main(String[] args) {
        try {
            ApplicationStartUp.CheckDependentServices();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    //可以打印这句话就证明主线前面的各个校验线程已经执行完成
        System.err.println("all dependent services are checked and are all available");

    }
}

阻塞多个线程的意思就是,我们在每个线程执行完成后都调用【await】在main中调用【countDown】,然后给countdownlatch初始为0,这样main就相当于一个发令枪,当mian执行了countdown,所有被阻塞的线程也就活了。

CountDownLatch总结:

这就是一种共享锁,可以允许多个线程同时抢到锁,然后等到计数器归零,则同时唤醒。我们来看一下他类的关系图,我们看到他的底层实际上还是用AQS实现的,只不过他走的是一个共享锁

大概流程就是

  • 他维护了一个state的数字,每个线程执行完成后,他的state就--
  • 直到state为0,他则唤醒队列中的所有线程,
  • 这点和我们之前讲到的不同,之前讲到的是只唤醒头节点后的下一个节点

 await(源码解析)

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果他小于零(计数器不为零),那当前线程就应该被阻塞
    if (tryAcquireShared(arg) < 0)
        //这里进行共享锁的抢占
        doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //把当前线程加入到一个节点中,在这里面构建一个双向链
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果当前节点是头结点那就进行抢占
            if (p == head) {
                //共享锁的方式进行抢占,如果大于零则说明抢占锁成功
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //判断自己是否应该挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown(源码解析)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        //每次对原来的数字减一
        int nextc = c-1;
        //如果数字为零,则唤醒所有节点
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
}
//对节点进行唤醒
private void doReleaseShared() {
    //自旋唤醒
    for (;;) {
        //不断的取下一个节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //只要节点状态是signal则进行唤醒
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; 
                //这里进行唤醒,唤醒后就回到await中上次线程被阻塞的地方,在await的自旋中队列中的线程逐个被唤醒
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

 【Semaphore】:我们可以把它叫做新信号灯,实际上我们可以把它理解成一个限流器,它可以限制资源的访问,就是限流,例如【sentinel】,抢占一个令牌,抢占到就通信,没有抢占到就阻塞,它里面主要有两个方法

  • acquire(抢占令牌数):这里是抢占一个令牌,一次可以抢占多个,没有传递参数则只抢占一个
  • realise:释放令牌

如何使用semaphore(这里模拟一个停车场,一共有10个车位,也就是一次只能运行10个线程,当10个中的一个释放,其他才能进行资源的获取)

public class SemaphoreExample {
    public static void main(String[] args) {
        // 限制资源访问的并发数量
        Semaphore semaphore=new Semaphore(10);
        for (int i = 0; i <20 ; i++) {
            new Thread(new car(i,semaphore)).start();
        }
    }
    static  class  car extends Thread{
        private int num;
        private Semaphore semaphore;

        public car(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                //获得一个令牌
                semaphore.acquire();
                System.out.println("第"+num+"车抢到一个车位");
                TimeUnit.SECONDS.sleep(2);
                System.out.println("第"+num+"释放车位!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 释放一个令牌
                semaphore.release();
            }
        }
    }
}

结果:

Semaphore】实现原理:

acquire(总数-1):

  • 当总数为0的时候则阻塞,就类似咱们上面的场景(车库满了)
  • 可能同阻塞n个线程(别的车就无法进入车库)

realist(总数+1):

  • 有令牌就阻塞的线程中唤醒(那肯定有一个队列来存储这些个被阻塞的队列)

源码解析(我们发现它这里还用的是aqs,,所以这里就不赘述了,他底层用的是共享锁):

 

 

 问题:为什么他要使用共享锁呢?这里好像用同步锁更好点,其实想一下,用共享锁就意味着他可以一次唤醒多个线程,那就意味着多个线程可以同时执行,那就提升了性能。

 CyclicBarrier:他是一个可以重复的栅栏,简而言之就是他有类似与一个阀门,当到达一个极值或者顶点的时候,允许多个线程同时运行,这就有点像【countdownlaunch】多个线程等于调用了【await】方法,然后一个线程使用【countdown】方法去唤醒,就等于说是一个投票,当所有人投完票,才能公布结果。一个例子来看一下如何使用。 

使用方法:

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int n=3;
        CyclicBarrier cyclicBarrier=new CyclicBarrier(n,()->{
            System.out.println("所有线程执行完成");
        });
        for (int i = 0; i <n ; i++) {
            new ballot(cyclicBarrier).start();
        }
    }
    static  class  ballot extends Thread{
        CyclicBarrier cyclicBarrier;
        public ballot(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName()+"投票完成,等待其他人进行投票。。");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

实现原理(基于【ReentrantLock】和【Condition】实现):

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        //参与的线程数
        this.parties = parties;
        //用于记录当前已经执行的数
        this.count = parties;
        //执行完成后的毁掉方法
        this.barrierCommand = barrierAction;
    }
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    //首先加锁,保证线程安全性
    lock.lock();
    try {
        //这里就是栅栏,比如当你的计数器成为0,它有可以回到你的初始数值
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        //如果计数器为零
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                //这里执行咱们传递过去的action
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
            try {
                if (!timed)
                    //trip是一个condition
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
              
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
          //这里是signal
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

 

这篇关于并发编程-阻塞队列&JUC常用工具的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!