Java教程

手写一个JAVA线程池和手写一个JAVA生产消费

本文主要是介绍手写一个JAVA线程池和手写一个JAVA生产消费,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

手写一个JAVA线程池 

package LearnCases;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created with IntelliJ IDEA.
 *
 * @Auther: suibin
 * @Date: 2021/06/03/21:04
 * @Description:手写一个线程池。
 * 原理1:当线程池没有任务的时候阻塞,当有任务且线程池空的时候插入新任务,当线程池满了的时候阻塞。
 * 原理2:添加Runnable任务线程到BlockingQueue中,它存放5个Runnable。
 * 原理3:定义3个线程,每个线程去BlockingQueue中拉取任务Runnable并执行。
 *
 * 用BlockingQueue实现,阻塞队列,它是线程安全;他的作用
 * 1、当阻塞队列为空时,从队列中获取元素的操作将会被阻塞。
 * 2、当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。
 *
 * ====为什么用? 有什么好处?
 * 在多线程领域:所谓阻塞,在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
 *
 * ====为什么需要阻塞队列
 * 好处是我们不需要关心什么时候需要阻塞线程,什么时候唤醒线程,因为这一切BlockingQueue都给你一手包办了。
 *
 * ====阻塞队列的种类分析
 * ArrayBlockingQueue:由数组结构组成的有界阻塞队列,
 * LinkedBlockingQueue:由链表结构组成的有界(但是默认值大小为Integer,MAX_VALUE)阻塞队列。
 * PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
 * DelayQueue:使用优先级队列支持延迟的无界阻塞队列。
 * SynchronousQueue:不存储元素的队列,也即单个元素的队列。没有容量,每一个put操作必须等待一个take操作,否则不能添加元素,反之亦然。
 * LinkedTransferQueue:由链表结构组成的无界阻塞队列。
 * LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
 *
 *================API说明=================
 * |它的API|抛出异常  |返回true.false|阻塞  |超时
 * |插入   |add(e)   |offer(e)      |put(e)|offer(e,time,unit)
 * |移除   |remove() |poll()       |take()|poll(time,unit)
 * |检查   |element()|peek()      |无    |无
 * add:当队列满时,抛出异常;
 * remove:当队列空时,抛出异常;
 * offer:插入失败,返回false
 * poll:队列是空,返回null;否则返回对象
 * put:队列是满时,阻塞。等空了,才能放入
 * take:队列是空时,阻塞等。
 * offer(e,time,unit):超时时间过了,返回false
 * poll(time,unit):超时时间过了,返回false
 *
 */
public class MyFixedSizeTreadPool {

    private BlockingQueue<Runnable> tasks;//定义一个阻塞队列,作为线程池
    List<Worker> workers;//定义一个工作线程数组,用来
    volatile boolean working =true;//定义一个是否正在工作的状态,用来在shutdown时判断工作线程是否正在执行;需要可见性,区分不同线程的working

    //poolSize是多少个任务可以并发执行。queueSize是阻塞队列的长度。
    public MyFixedSizeTreadPool(int poolSize,int queueSize) {
        tasks=new LinkedBlockingQueue(queueSize);//创建一个阻塞队列,大小是6
        workers=new ArrayList<>(poolSize);//创建一个工作线程数组,大小是3,也就是说会开3个线程去处理阻塞队列中的任务
        for (int i=0;i<poolSize;i++){//这里并发是3,执行3次
            Worker w=new Worker(this);//创建一个工作线程,入参是阻塞队列
            workers.add(w);//记录3个线程到数组里,这3个线程是系统实实在在开的3个
            w.start();//分别启动3个线程,在这个线程里会while循环执行放在LinkedBlockingQueue里存放的submit来的Runnable对象。
        }
    }
    //提交任务给线程池API
    public boolean submit(Runnable task){
        if(working){//如果工作状态
            return tasks.offer(task);//工作线程插入阻塞队列
        }
        return false;
    }
    //回收线程池,等待线程池中所有任务都执行完之后,结束线程池。必须回收3个线程,不然会造成线程僵死。
    public void shutdown(){
        //1、不能接受新任务
        this.working=false;//设置不在工作状态,则run while不会继续执行,也就不会继续run新任务
        //2、已有的任务执行完;
        for(Thread t: workers){
            System.out.println(t.getState());
            //线程的6种状态:
            //Thread.State.NEW 创建一个线程
            //Thread.State.RUNNABLE start开始一个线程
            //Thread.State.TERMINATED 线程执行完
            if(t.getState()==Thread.State.TIMED_WAITING //线程执行中,带时间的
                ||t.getState()==Thread.State.WAITING //线程执行中,不带时间的
                    ||t.getState()==Thread.State.BLOCKED //抢锁抢不到的状态
            ){
                t.interrupt();//(注意:执行shutdown这里中断的是3个线程本身,因为3个线程已经开启了5个Worker线程,它关闭不影响5个线程的执行,主线程关掉子线程还继续执行完会自动关闭
                // 如果这里不强制中断3个线程,他们可能还会被占用成为僵尸线程)包含上面3种状态,就中断该线程
            }
        }

    }
    //定义一个工作线程类,从阻塞队列中取出工作线程并执行run。
    static class Worker extends Thread{
        private MyFixedSizeTreadPool pool;

        public Worker(MyFixedSizeTreadPool pool) {
            this.pool=pool;
        }
        public void run(){
            while (pool.working||pool.tasks.size()>0){//循环遍历:当满足1、工作状态,2、或者阻塞队列有元素的时候,去阻塞队列里取出一个工作线程run。否则停止遍历。
                Runnable task=null;
                try {
                    if(pool.working){//(注意:执行shutdown如果在这个run之前执行,是不需要阻塞队列直接poll;如果take则后续没有shutdown释放它)
                        task=  pool.tasks.take();//取出一个线程对象,然后锁队列不能继续取值;阻塞队列这个位置的线程被占用,不能操作;
                    }else{
                        task=  pool.tasks.poll();//取出一个线程对象,不锁队列(注意:当working期间shutdown)
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(task!=null){
                    task.run();//当取出的线程对象不为空,执行该线程对象
                }
            }
        }
    }
    //开始使用线程池
    public static void main(String[] args) throws InterruptedException {
        //poolSize是线程数,不设置过大,会增加系统的线程开销;计算任务设置CPU*2;IO任务根据IO阻塞时长设置,比如tomcat是200线程数;
        MyFixedSizeTreadPool pool=new MyFixedSizeTreadPool(3,6);//创建一个线程池
        //给线程池加入5个工作线程(任务),它们会依次插入到阻塞队列当中
        for(int i=0;i<5;i++){
            int finalI = i;
            pool.submit(()->{
                //这里sleep代表模拟执行逻辑
                try {
                    Thread.sleep(1200L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(finalI +"任务执行完毕");
            });
        }
        Thread.sleep(100L);
        pool.shutdown();//关闭线程池中的3个线程
    }
}

手写一个JAVA生产消费:

class MyShare {
    private volatile boolean FLAG = true;//默认开启,进行生产和消费
    AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<String> blockingQueue = null;

    public MyShare(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws Exception {
        String data = null;
        boolean result;
        while (FLAG) {
            data = String.valueOf(atomicInteger.incrementAndGet());
            result = blockingQueue.offer(data, 2L, TimeUnit.MICROSECONDS);
            if (result) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列data:" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列data:" + data + "失败");
            }
            //一秒生产一个
            try {TimeUnit.SECONDS.sleep(1);}catch (InterruptedException e){e.printStackTrace();}


        }
        System.out.println(Thread.currentThread().getName() + "\t 生产被叫停,Flag为false");

    }

    public void myConsumer() throws Exception {
        String value = null;
        while (FLAG) {
            value = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (value == null || value.equalsIgnoreCase("")) {
//                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 超过两秒没收到消息");
                System.out.println("");
                System.out.println("");
                System.out.println("");
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 消费队列value:" + value + "成功");
        }

    }

    public void stop() {
        FLAG = false;
    }
}

/**
 * @author liujian
 * @descripts 生产者消费者阻塞队列版
 * @create 2019-06-24 22:43
 */
public class ProductConsumer_BlockingQueueDemo {
    public static void main(String[] args) {
        MyShare myShare = new MyShare(new ArrayBlockingQueue<>(10));
        new Thread(() -> {
            System.out.println("生产线程启动");
            try {
                myShare.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "product").start();

        new Thread(() -> {
            System.out.println("消费线程启动");
            try {
                System.out.println("");
                System.out.println("");
                System.out.println("");
                myShare.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, "consumer").start();
        //休眠五秒
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        myShare.stop();
    }


}

 

这篇关于手写一个JAVA线程池和手写一个JAVA生产消费的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!