package LearnCases; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * Created with IntelliJ IDEA. * * @Auther: suibin * @Date: 2021/06/03/21:04 * @Description:手写一个线程池。 * 原理1:当线程池没有任务的时候阻塞,当有任务且线程池空的时候插入新任务,当线程池满了的时候阻塞。 * 原理2:添加Runnable任务线程到BlockingQueue中,它存放5个Runnable。 * 原理3:定义3个线程,每个线程去BlockingQueue中拉取任务Runnable并执行。 * * 用BlockingQueue实现,阻塞队列,它是线程安全;他的作用 * 1、当阻塞队列为空时,从队列中获取元素的操作将会被阻塞。 * 2、当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。 * * ====为什么用? 有什么好处? * 在多线程领域:所谓阻塞,在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。 * * ====为什么需要阻塞队列 * 好处是我们不需要关心什么时候需要阻塞线程,什么时候唤醒线程,因为这一切BlockingQueue都给你一手包办了。 * * ====阻塞队列的种类分析 * ArrayBlockingQueue:由数组结构组成的有界阻塞队列, * LinkedBlockingQueue:由链表结构组成的有界(但是默认值大小为Integer,MAX_VALUE)阻塞队列。 * PriorityBlockingQueue:支持优先级排序的无界阻塞队列。 * DelayQueue:使用优先级队列支持延迟的无界阻塞队列。 * SynchronousQueue:不存储元素的队列,也即单个元素的队列。没有容量,每一个put操作必须等待一个take操作,否则不能添加元素,反之亦然。 * LinkedTransferQueue:由链表结构组成的无界阻塞队列。 * LinkedBlockingDeque:由链表结构组成的双向阻塞队列。 * *================API说明================= * |它的API|抛出异常 |返回true.false|阻塞 |超时 * |插入 |add(e) |offer(e) |put(e)|offer(e,time,unit) * |移除 |remove() |poll() |take()|poll(time,unit) * |检查 |element()|peek() |无 |无 * add:当队列满时,抛出异常; * remove:当队列空时,抛出异常; * offer:插入失败,返回false * poll:队列是空,返回null;否则返回对象 * put:队列是满时,阻塞。等空了,才能放入 * take:队列是空时,阻塞等。 * offer(e,time,unit):超时时间过了,返回false * poll(time,unit):超时时间过了,返回false * */ public class MyFixedSizeTreadPool { private BlockingQueue<Runnable> tasks;//定义一个阻塞队列,作为线程池 List<Worker> workers;//定义一个工作线程数组,用来 volatile boolean working =true;//定义一个是否正在工作的状态,用来在shutdown时判断工作线程是否正在执行;需要可见性,区分不同线程的working //poolSize是多少个任务可以并发执行。queueSize是阻塞队列的长度。 public MyFixedSizeTreadPool(int poolSize,int queueSize) { tasks=new LinkedBlockingQueue(queueSize);//创建一个阻塞队列,大小是6 workers=new ArrayList<>(poolSize);//创建一个工作线程数组,大小是3,也就是说会开3个线程去处理阻塞队列中的任务 for (int i=0;i<poolSize;i++){//这里并发是3,执行3次 Worker w=new Worker(this);//创建一个工作线程,入参是阻塞队列 workers.add(w);//记录3个线程到数组里,这3个线程是系统实实在在开的3个 w.start();//分别启动3个线程,在这个线程里会while循环执行放在LinkedBlockingQueue里存放的submit来的Runnable对象。 } } //提交任务给线程池API public boolean submit(Runnable task){ if(working){//如果工作状态 return tasks.offer(task);//工作线程插入阻塞队列 } return false; } //回收线程池,等待线程池中所有任务都执行完之后,结束线程池。必须回收3个线程,不然会造成线程僵死。 public void shutdown(){ //1、不能接受新任务 this.working=false;//设置不在工作状态,则run while不会继续执行,也就不会继续run新任务 //2、已有的任务执行完; for(Thread t: workers){ System.out.println(t.getState()); //线程的6种状态: //Thread.State.NEW 创建一个线程 //Thread.State.RUNNABLE start开始一个线程 //Thread.State.TERMINATED 线程执行完 if(t.getState()==Thread.State.TIMED_WAITING //线程执行中,带时间的 ||t.getState()==Thread.State.WAITING //线程执行中,不带时间的 ||t.getState()==Thread.State.BLOCKED //抢锁抢不到的状态 ){ t.interrupt();//(注意:执行shutdown这里中断的是3个线程本身,因为3个线程已经开启了5个Worker线程,它关闭不影响5个线程的执行,主线程关掉子线程还继续执行完会自动关闭 // 如果这里不强制中断3个线程,他们可能还会被占用成为僵尸线程)包含上面3种状态,就中断该线程 } } } //定义一个工作线程类,从阻塞队列中取出工作线程并执行run。 static class Worker extends Thread{ private MyFixedSizeTreadPool pool; public Worker(MyFixedSizeTreadPool pool) { this.pool=pool; } public void run(){ while (pool.working||pool.tasks.size()>0){//循环遍历:当满足1、工作状态,2、或者阻塞队列有元素的时候,去阻塞队列里取出一个工作线程run。否则停止遍历。 Runnable task=null; try { if(pool.working){//(注意:执行shutdown如果在这个run之前执行,是不需要阻塞队列直接poll;如果take则后续没有shutdown释放它) task= pool.tasks.take();//取出一个线程对象,然后锁队列不能继续取值;阻塞队列这个位置的线程被占用,不能操作; }else{ task= pool.tasks.poll();//取出一个线程对象,不锁队列(注意:当working期间shutdown) } } catch (InterruptedException e) { e.printStackTrace(); } if(task!=null){ task.run();//当取出的线程对象不为空,执行该线程对象 } } } } //开始使用线程池 public static void main(String[] args) throws InterruptedException { //poolSize是线程数,不设置过大,会增加系统的线程开销;计算任务设置CPU*2;IO任务根据IO阻塞时长设置,比如tomcat是200线程数; MyFixedSizeTreadPool pool=new MyFixedSizeTreadPool(3,6);//创建一个线程池 //给线程池加入5个工作线程(任务),它们会依次插入到阻塞队列当中 for(int i=0;i<5;i++){ int finalI = i; pool.submit(()->{ //这里sleep代表模拟执行逻辑 try { Thread.sleep(1200L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(finalI +"任务执行完毕"); }); } Thread.sleep(100L); pool.shutdown();//关闭线程池中的3个线程 } }
class MyShare { private volatile boolean FLAG = true;//默认开启,进行生产和消费 AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyShare(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception { String data = null; boolean result; while (FLAG) { data = String.valueOf(atomicInteger.incrementAndGet()); result = blockingQueue.offer(data, 2L, TimeUnit.MICROSECONDS); if (result) { System.out.println(Thread.currentThread().getName() + "\t 插入队列data:" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + "\t 插入队列data:" + data + "失败"); } //一秒生产一个 try {TimeUnit.SECONDS.sleep(1);}catch (InterruptedException e){e.printStackTrace();} } System.out.println(Thread.currentThread().getName() + "\t 生产被叫停,Flag为false"); } public void myConsumer() throws Exception { String value = null; while (FLAG) { value = blockingQueue.poll(2, TimeUnit.SECONDS); if (value == null || value.equalsIgnoreCase("")) { // FLAG = false; System.out.println(Thread.currentThread().getName() + "\t 超过两秒没收到消息"); System.out.println(""); System.out.println(""); System.out.println(""); return; } System.out.println(Thread.currentThread().getName() + "\t 消费队列value:" + value + "成功"); } } public void stop() { FLAG = false; } } /** * @author liujian * @descripts 生产者消费者阻塞队列版 * @create 2019-06-24 22:43 */ public class ProductConsumer_BlockingQueueDemo { public static void main(String[] args) { MyShare myShare = new MyShare(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println("生产线程启动"); try { myShare.myProd(); } catch (Exception e) { e.printStackTrace(); } }, "product").start(); new Thread(() -> { System.out.println("消费线程启动"); try { System.out.println(""); System.out.println(""); System.out.println(""); myShare.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer").start(); //休眠五秒 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } myShare.stop(); } }