java.util.concurrent :concrrent包
java.util.concurrent.atomic :原子包
java.util.concurrent.;locks :锁lock包
package com.bin.concurrent; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Resources { private int num = 30; private Lock lock = new ReentrantLock();//可重入锁 public void subtractNum() { lock.lock(); try { if (num > 0) { System.out.println(Thread.currentThread().getName() + "售卖前:" + (num--) + "售卖后:num=" + num); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } /** * 1.高内聚低耦合的情况下: 线程 操作(对外暴露的调用方法) 资源类 * 2.判断 / 干活 / 通知 * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while 不能用if * * @Author: 邱成兵 * @Date: Created in 19:30 2021/4/27 */ public class SaleTicket { public static void main(String[] args) { Resources resources = new Resources(); new Thread(() -> { for (int i = 0; i < 40; i++) resources.subtractNum(); }, "A").start(); new Thread(() -> { for (int i = 0; i < 40; i++) resources.subtractNum(); }, "B").start(); new Thread(() -> { for (int i = 0; i < 40; i++) resources.subtractNum(); }, "C").start(); /*new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 40; i++) { resources.subtractNum(); } } },"A").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 40; i++) { resources.subtractNum(); } } },"B").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 40; i++) { resources.subtractNum(); } } },"C").start();*/ } }
package com.bin.concurrent; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class ShareResource { private int number=1;//A:1 B:2 C:3 private Lock lock=new ReentrantLock();//可重入锁 private Condition condition1=lock.newCondition(); private Condition condition2=lock.newCondition(); private Condition condition3=lock.newCondition(); public void print5(){ lock.lock(); try { //判断 if(number!=1){ condition1.await(); } //干活 for (int i = 1; i <=5 ; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=2; //通知 condition2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print10(){ lock.lock(); try { //判断 if(number!=2){ condition2.await(); } //干活 for (int i = 1; i <=10 ; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=3; //通知 condition3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print15(){ lock.lock(); try { //判断 if(number!=3){ condition3.await(); } //干活 for (int i = 1; i <=15 ; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=1; //通知 condition1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } /** * 精准唤醒 * 1.高内聚低耦合的情况下: 线程 操作(对外暴露的调用方法) 资源类 * 2.判断 / 干活 / 通知 * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while 不能用if * 4.标志位 * * @Author: 邱成兵 * @Date: Created in 11:03 2021/4/30 */ public class ThreadOrderAccess { public static void main(String[] args) { ShareResource shareResource=new ShareResource(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print5(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print10(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print15(); } }, "C").start(); } }
package com.bin.concurrent; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class AirConditioner{ private int num=0; private Lock lock=new ReentrantLock();//可重入锁 //代替 synchronized 里面的 wait notify notifyall 方法 awati signal signalAll private Condition condition=lock.newCondition(); public void addNum() throws InterruptedException { lock.lock(); try { while (num!=0){ // wait(); condition.await(); } num++; System.out.println(Thread.currentThread().getName() +"生产" +num); // notify(); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void subtractNum() throws InterruptedException { lock.lock(); try { //判断 while(num==0){ // wait(); condition.await(); } //干活 num--; System.out.println(Thread.currentThread().getName() +"消费"+num); // notify(); //通知 condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } /** * 老版本 * **/ /*public synchronized void addNum() throws InterruptedException { while (num>0){ wait(); } num++; System.out.println(Thread.currentThread().getName() +"生产" +num); notify(); } public synchronized void subtractNum() throws InterruptedException { while(num<=0){ wait(); } num--; System.out.println(Thread.currentThread().getName() +"消费"+num); notify(); }*/ } /** * 交替打印出消费数字 * 2.判断 / 干活 / 通知 * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while 不能用if * * @Author: 邱成兵 * @Date: Created in 11:18 2021/4/28 */ public class ThreadWaitNotifyDemo { public static void main(String[] args) { AirConditioner airConditioner=new AirConditioner(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { airConditioner.addNum(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "A").start(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { airConditioner.subtractNum(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "B").start(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { airConditioner.addNum(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "C").start(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { airConditioner.subtractNum(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "D").start(); } }
同步方法,锁是当前this
静态同步方法,锁是当前对象的.class
同步方法块,锁是()里传的对象,结束或异常必须释放锁
普通方法,互不影响
java.util.concurrentModificationException 并发修改异常
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
每次添加进来先拿到锁, 先拿到以前的集合,再获取长度,在扩容+1,把添加的数据放入扩容过后的集合,再放入以前的集合,并通知下一个-标志位
集合扩容是一半 map扩容是一倍2*n次方 每次加1
并发经验:如果能确认map的大小直接给确定值,避免后续再扩容操作
package com.bin.concurrent; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; /** * 线程安全 * 1.故障现象: java.util.ConCurrentModificationException * 2.导致原因: 线程不安全导致的 * 3.解决方案: 线程安全的集合或map 集合工具类 collections 并发包:concurrent * 4.优化建议: (同样的错误 不出现第二次) * * @Author: qcb * @Date: Created in 11:47 2021/5/6 */ public class NotSafeDemo { public static void mapNotSafe() { // 线程安全的map Map<Object, Object> collections = new ConcurrentHashMap<>();//Collections.synchronizedMap()//new HashMap<>(); for (int i = 0; i < 30; i++) { new Thread(() -> { collections.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0, 8)); System.out.println(collections); }, String.valueOf(i)).start(); } } public static void setNotSafe() { // 线程安全的set Set<String> collections = new CopyOnWriteArraySet(); //Collections.synchronizedSet(new HashSet<>())//new HashSet<>(); for (int i = 0; i < 30; i++) { new Thread(() -> { collections.add(UUID.randomUUID().toString().substring(0, 8)); System.out.println(collections); }, String.valueOf(i)).start(); } } public static void listNotSafe(){ // 线程安全的list List<String> collections= new CopyOnWriteArrayList();//Collections.synchronizedList()//new Vector<>();//new ArrayList<>(); for (int i = 0; i <30 ; i++) { new Thread(() -> { collections.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(collections); }, String.valueOf(i)).start(); } } }
- 传统有2种,java5之后增加了2种 总共4种
package com.bin.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("come in here"); return 1024; } } /** * @Author: 邱成兵 * @Date: Created in 15:15 2021/5/6 * 获取多线程的方式 * 1.继承Thread * 2.实现runnable * 3.实现callable<> * 4.线程池 */ public class CallableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask(new MyThread()); //实现原理 java 多态 new Thread(futureTask, "A").start(); //多次调用 走的是缓存 只会调用一次 //new Thread(futureTask, "B").start(); //获取结果一定要放到最后 不然会阻塞拿到结果在执行 System.out.println(futureTask.get()); } }
作用: 是一种通用的同步工具 等其他线程执行完了 主线程再关闭 关门走人
原理:
- CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞), * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
package com.bin.concurrent; import java.util.concurrent.CountDownLatch; /** * JUC辅助类 * @Author: 邱成兵 * @Date: Created in 16:09 2021/5/6 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //减少计数 开始初始 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进; CountDownLatch countDownLatch=new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(() -> { System.out.println("第"+Thread.currentThread().getName()+"个同学出教室 /t"); countDownLatch.countDown(); }, String.valueOf(i)).start(); } //第二个是完成信号,允许司机等到所有的工作人员完成。 countDownLatch.await(); System.out.println("关门"); } }
允许一组线程全部等待彼此达到共同屏障点的同步辅助 集齐7龙珠
原理:
- CyclicBarrier * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是, * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞, * 直到最后一个线程到达屏障时,屏障才会开门,所有 * 被屏障拦截的线程才会继续干活。 * 线程进入屏障通过CyclicBarrier的await()方法。
package com.bin.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 允许一组线程全部等待彼此达到共同屏障点的同步辅助 * * @Author: 邱成兵 * @Date: Created in 16:53 2021/5/6 */ public class CyclicBarrierDemo { public static void main(String[] args) { //参数1:多少个线程跳闸 参数2:跳闸之后执行的线程 CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{ System.out.println("集齐7颗龙珠"); }); for (int i = 0; i < 7; i++) { final int temInt=i; new Thread(() -> { System.out.println(Thread.currentThread().getName()+" /t 第" +temInt+"颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } } }
可做秒杀线程数量控制 不管前端请求多少个 只接受固定能接受的数量
主要用于并发的控制可资源的互斥 限流
如果设置semaphore 为1 等同于 synchronized (场景 一个线程持有一个资源多久 可以用这个实现)
原理:
在信号量上我们定义两种操作: * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1), * 要么一直等下去,直到有线程释放信号量,或超时。 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 * * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
package com.bin.concurrent; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * 信号等 * 用户并发的控制和资源的互斥 限流 * @Author: 邱成兵 * @Date: Created in 17:15 2021/5/6 */ public class SemaphoreDemo { public static void main(String[] args) { //模拟3个车位 Semaphore semaphore=new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(() -> { //资源减一 try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"/t 抢到了车位"); try { TimeUnit.SECONDS.sleep(3); }catch (Exception e){ } System.out.println(Thread.currentThread().getName()+"/t 退出停车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { //释放资源 semaphore.release(); } }, String.valueOf(i)).start(); } } }
读读能共存 读写不能共存 写写不能共存
package com.bin.concurrent; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; class MyCache{ private volatile Map<String,Object> map=new HashMap<>(); //读写锁 private ReadWriteLock lock=new ReentrantReadWriteLock(); public void put(String key,Object value){ try { lock.writeLock().lock(); System.out.println(key+" /t 开始写入数据"); TimeUnit.MILLISECONDS.sleep(3); map.put(key,value); System.out.println(key+" /t 写入完成"+value); }catch (Exception e){ e.printStackTrace(); }finally { lock.writeLock().unlock(); } } public void get(String key) { try { lock.readLock().lock(); System.out.println(key+" /t 开始读数据"); TimeUnit.MILLISECONDS.sleep(3); Object o = map.get(key); System.out.println(key+" /t 读取读数据"+o); }catch (Exception e){ e.printStackTrace(); }finally { lock.readLock().unlock(); } } } /** * 读写锁 * @Author: 邱成兵 * @Date: Created in 10:32 2021/5/7 */ public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache=new MyCache(); for (int i = 0; i < 5; i++) { myCache.put(i+"",i+""); } for (int i = 0; i < 5; i++) { myCache.get(i+""); } } }
不用手动阻塞、唤醒线程了 (wait/await notify/signal notifyall/signalAll) BlockingQueue全部自动实现了
package com.bin.concurrent; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 阻塞队列 * @Author: 邱成兵 * @Date: Created in 14:35 2021/5/7 */ public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(3); blockingQueue.add("a");//添加 超过界限会报异常 queue full :队列已满 blockingQueue.remove();//移除先进的第一位 没有会报异常 blockingQueue.element();//检查 没有会报异常 blockingQueue.offer("a"); //超出返回false blockingQueue.poll(); //没有移除的返回 null blockingQueue.peek();//检查 没有返回null blockingQueue.put("a");//添加 超出会阻塞 blockingQueue.take();//移除 没有会阻塞 blockingQueue.offer("a",3l, TimeUnit.SECONDS);//超出多少时间后直接退出 blockingQueue.poll(3l, TimeUnit.SECONDS);//超出多少时间后直接退出 } }
例子:10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。
线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:线程复用;控制最大并发数;管理线程。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
package com.bin.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 线程池三大主流使用方法 * @Author: 邱成兵 * @Date: Created in 15:59 2021/5/7 */ public class ThreadDemo { public static void main(String[] args) { //执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程 // ExecutorService executorService= Executors.newFixedThreadPool(5); //一个任务一个任务的执行,一池一线程 // ExecutorService executorService= Executors.newSingleThreadExecutor(); //执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强 ExecutorService executorService= Executors.newCachedThreadPool(); try { for (int i = 1; i <= 10; i++) { final int teamInt=i; executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+" /t 办理业务"+teamInt); }); } }catch (Exception e){ e.printStackTrace(); }finally { executorService.shutdown(); } } }
1、corePoolSize:线程池中的常驻核心线程数
2、maximumPoolSize:线程池中能够容纳同时
执行的最大线程数,此值必须大于等于1
3、keepAliveTime:多余的空闲线程的存活时间
当前池中线程数量超过corePoolSize时,当空闲时间
达到keepAliveTime时,多余线程会被销毁直到
只剩下corePoolSize个线程为止
4、unit:keepAliveTime的单位
5、workQueue:任务队列,被提交但尚未被执行的任务
6、threadFactory:表示生成线程池中工作线程的线程工厂,
用于创建线程,一般默认的即可
7、handler:拒绝策略,表示当队列满了,并且工作线程大于
等于线程池的最大线程数(maximumPoolSize)时如何来拒绝
请求执行的runnable的策略
1、在创建了线程池后,开始等待请求。
2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1、如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2、如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3、如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4、如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?
一个都不用,我们工作中只能使用自定义的
OOM(OutOfMemoryError? -JAVA内存溢出 虚拟机暴露故障
ExecutorService executorService=new ThreadPoolExecutor(2,//核心 5,//主线程 2L,//过期时间 TimeUnit.SECONDS,//过期单位 new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小 Executors.defaultThreadFactory(), //线程工厂 一般都是用默认的 new ThreadPoolExecutor.AbortPolicy());//拒绝策略
public static void main(String[] args) { //cpu的数量 int cpuNum = Runtime.getRuntime().availableProcessors(); //cpu密集型 内核数+1或+2 //IO密集型 1/总核数/阻塞系数 ExecutorService executorService=new ThreadPoolExecutor(2,//核心 5,//主线程 2L,//过期时间 TimeUnit.SECONDS,//过期单位 new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小 Executors.defaultThreadFactory(), //线程工厂 一般都是用默认的 new ThreadPoolExecutor.AbortPolicy());//拒绝策略 默认抛出异常 }
定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。
这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。(或者加1到2)
特点:
01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用
02:针对单台机器,最大线程数一般只需要设置为CPU核心数的线程个数就可以了
03:这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。
package pool; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Demo02 { public static void main(String[] args) { //自定义线程池! 工作中只会使用 ThreadPoolExecutor /** * 最大线程该如何定义(线程池的最大的大小如何设置!) * 1、CPU 密集型,几核,就是几,可以保持CPU的效率最高! */ //获取电脑CPU核数 System.out.println(Runtime.getRuntime().availableProcessors()); //8核 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, //核心线程池大小 Runtime.getRuntime().availableProcessors(), //最大核心线程池大小(CPU密集型,根据CPU核数设置) 3, //超时了没有人调用就会释放 TimeUnit.SECONDS, //超时单位 new LinkedBlockingDeque<>(3), //阻塞队列 Executors.defaultThreadFactory(), //线程工厂,创建线程的,一般不用动 new ThreadPoolExecutor.AbortPolicy()); //银行满了,还有人进来,不处理这个人的,抛出异常 try { //最大承载数,Deque + Max (队列线程数+最大线程数) //超出 抛出 RejectedExecutionException 异常 for (int i = 1; i <= 9; i++) { //使用了线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完,程序结束,关闭线程池 threadPool.shutdown(); //(为确保关闭,将关闭方法放入到finally中) } } }
定义:IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。
我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。
01:磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
02:网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。
IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU
时间片让出去,直到缓冲区写满。既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待):
CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU
空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率;线程 CPU
时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中
package pool; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Demo02 { public static void main(String[] args) { //自定义线程池! 工作中只会使用 ThreadPoolExecutor /** * 最大线程该如何定义(线程池的最大的大小如何设置!) * 2、IO 密集型 >判断你程序中十分耗IO的线程 * 程序 15个大型任务 io十分占用资源! (最大线程数设置为30) * 设置最大线程数为十分耗io资源线程个数的2倍 */ //获取电脑CPU核数 System.out.println(Runtime.getRuntime().availableProcessors()); //8核 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, //核心线程池大小 16, //若一个IO密集型程序有15个大型任务且其io十分占用资源!(最大线程数设置为 2*CPU 数目) 3, //超时了没有人调用就会释放 TimeUnit.SECONDS, //超时单位 new LinkedBlockingDeque<>(3), //阻塞队列 Executors.defaultThreadFactory(), //线程工厂,创建线程的,一般不用动 new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试和最早的竞争,也不会抛出异常 try { //最大承载数,Deque + Max (队列线程数+最大线程数) //超出 抛出 RejectedExecutionException 异常 for (int i = 1; i <= 9; i++) { //使用了线程池之后,使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完,程序结束,关闭线程池 threadPool.shutdown(); //(为确保关闭,将关闭方法放入到finally中) } } }
1:高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
2:并发不高、任务执行时间长的业务这就需要区分开看了:
a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,线程池中的线程数设置得少一些,减少线程上下文的切换
(其实从一二可以看出无论并发高不高,对于业务中是否是cpu密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)
3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,我们的项目使用的时redis作为缓存(这类非关系型数据库还是挺好的)。增加服务器是第二步(一般政府项目的首先,因为不用对项目技术做大改动,求一个稳,但前提是资金充足),至于线程池的设置,设置参考
2
。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件(任务时间过长的可以考虑拆分逻辑放入队列等操作)对任务进行拆分和解耦。三.:总结:
01:一个计算为主的程序(CPU密集型程序),多线程跑的时候,可以充分利用起所有的 CPU 核心数,比如说 8 个核心的CPU ,开8
个线程的时候,可以同时跑 8 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU
核心数量,反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。02:如果是一个磁盘或网络为主的程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候
CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起
CPU 来是很慢的。此时线程数等于CPU核心数的两倍是最佳的。
最大线程数量 引用地址
/*Function<String,Integer> function=new Function<String, Integer>() { @Override public Integer apply(String s) { return 1024; } };*/ //新写法 lambda 表达式写法 // Function<String,Integer> function=(s) -> {return 1024;}; //一个参数可以省略() 返回可以省略{return } Function<String,Integer> function=s -> 1024; System.out.println(function.apply("a"));
/*Predicate<String> predicate=new Predicate<String>() { @Override public boolean test(String s) { return false; } };*/ //lambda 表达式 // Predicate<String> predicate=s -> s.isEmpty(); //lambada 表达式+方法的引用 Predicate<String> predicate=String::isEmpty; System.out.println(predicate.test("qcb"));
/*Consumer<String> consumer=new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } };*/ //lambda 表达式 // Consumer consumer= s -> System.out.println(s); Consumer consumer= System.out::print; consumer.accept("a");
/*Supplier<String> supplier=new Supplier<String>() { @Override public String get() { return "aa"; } };*/ //lambda 表达式 // Supplier<String> supplier=() -> {return "aa";}; Supplier<String> supplier=() -> "aa"; System.out.println(supplier.get());
流(Stream) 到底是什么呢?
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。“集合讲的是数据,流讲的是计算!”
//题目:请按照给出数据,找出同时满足 * //偶数ID且年龄大于24且用户名转为大写且用户名字母倒排序 * // 最后只输出一个用户名字 User u1 = new User(11, "a", 23); User u2 = new User(12, "b", 24); User u3 = new User(13, "c", 22); User u4 = new User(14, "d", 28); User u5 = new User(16, "e", 26); List<User> list = Arrays.asList(u1, u2, u3, u4, u5); list.stream().filter(user -> { return user.getId() % 2 == 0 && user.getAge() > 24; //id偶数 并且 大于24的 })./*filter(user -> { return user.getAge() > 24; //Function<T,R> mapper) }).*/map(user -> { return user.getUserName().toUpperCase(); //把名称转为大写 }).sorted(/*(user1, user2) -> { return user2.compareTo(user1); //倒叙 }*/Collections.reverseOrder()).limit(1).forEach(System.out::println);//只查一条
原理:Fork:把一个复杂任务进行分拆,大事化小Join:把分拆任务的结果进行合并
ForkJoinPool:分支合并池 类比=> 线程池
ForkJoinTask:ForkJoinTask 类比=> FutureTask
RecursiveTask:递归任务:继承后可以实现递归(自己调自己)调用的任务
@Override protected Integer compute() { if((end-begin)<=ADJUST_VALUE){ for (int i = begin; i <= end; i++) { result=result+i; } }else { int middle = (end + begin) / 2; MyTask myTask1=new MyTask(begin,middle); //分支1 MyTask myTask2=new MyTask(middle+1,end);//分支2 myTask1.fork();//开启分支1 myTask2.fork();//开启分支2 result=myTask1.join()+myTask2.join();//合并 } return result; }
package com.bin.concurrent; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; //RecursiveTask 继承ForkJoinTask 继承 Future 可以调用callable接口 返回计算结果 class MyTask extends RecursiveTask<Integer> { private static final Integer ADJUST_VALUE=10; private int begin; private int end; private int result; public MyTask(Integer begin, Integer end) { this.begin = begin; this.end = end; } @Override protected Integer compute() { if((end-begin)<=ADJUST_VALUE){ for (int i = begin; i <= end; i++) { result=result+i; } }else { int middle = (end + begin) / 2; MyTask myTask1=new MyTask(begin,middle); MyTask myTask2=new MyTask(middle+1,end); myTask1.fork(); myTask2.fork(); result=myTask1.join()+myTask2.join(); } return result; } } /** * 分支合并框架 * @Author: 邱成兵 * @Date: Created in 18:04 2021/5/8 */ public class ForkJoinPollDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { MyTask myTask=new MyTask(0,100); ForkJoinPool forkJoinPool=new ForkJoinPool(); ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask); System.out.println(submit.get()); forkJoinPool.shutdown(); } }
package com.bin.concurrent; import java.util.concurrent.CompletableFuture; /** * 异步调用 * * @Author: 邱成兵 * @Date: Created in 14:37 2021/5/11 */ public class CompletableFutureDemo { public static void main(String[] args) throws Exception { /*CompletableFuture<Void> completableFuture= CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("a"); } });*/ //执行异步调用没有返回参数的 CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println("没有返回值cccccc"); }); completableFuture.get(); /*CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return 1024; } });*/ //执行异步调用有返回参数的 CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { // int a=1024/0; return 1024; }); System.out.println(completableFuture1.whenComplete((t, d) -> { System.out.println("t:" + t); System.out.println("d:" + d); }).exceptionally(f -> { System.out.println(f.getMessage()); return 4444; }).get()); } }
原理:https://blog.csdn.net/finalheart/article/details/87615546