java.util.concurrent在并发编程中使用的工具类:并发包,并发原子包,并发lock包
进程:运行在后台的某个程序
线程:概念性的东西就不重复了,,,可参考OS书上的解释
并行:在某一个时刻只有一个线程在工作
并发:多个线程同一时间点抢争同一个资源
线程的6个状态:NEW,RUNNABLE就绪,WAITING等待,BLOCKED阻塞,TIMED_WAITING等待超时, TERMINATED
线程的sleep和wait会导致线程进入到BLOCKED,但是sleep并不释放锁,进入锁池或者等待池,唤醒之后可以主动竞争锁;而wait会释放锁,需要唤醒
在高内聚低耦合的前提下,线程操纵(通过资源类对外暴露的方法供线程使用)资源类
package com.hz.juc; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; // 定义资源类 class Ticket { private int number = 100; private Lock lock = new ReentrantLock(); // 可重入的互斥锁 public void sale(){ lock.lock(); try{ if(number > 0){ System.out.println(Thread.currentThread().getName() + " 卖出第:" + (number--) + "还剩下:" + number); }else{ System.out.println("票已抢完..."); } }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } /** * 在高内聚低耦合的前提下,线程操纵(对外暴露调用方法供其他线程使用)调用资源类 * */ public class SaleTicket { public static void main(String[] args) { Ticket ticket = new Ticket(); // Thread(Runnable target, String name) // 匿名内部类 // new Thread(new Runnable() { // @Override // public void run() { // for (int i = 0; i < 110; i++) { // ticket.sale(); // } // } // }, "t1").start(); // new Thread(new Runnable() { // @Override // public void run() { // for (int i = 0; i < 110; i++) { // ticket.sale(); // } // } // }, "t2").start(); // new Thread(new Runnable() { // @Override // public void run() { // for (int i = 0; i < 110; i++) { // ticket.sale(); // } // } // }, "t3").start(); // lambda表达式 new Thread(() -> { for (int i = 0; i < 110; i++) { ticket.sale(); }}, "t1").start(); new Thread(() -> { for (int i = 0; i < 110; i++) { ticket.sale(); }}, "t2").start(); new Thread(() -> { for (int i = 0; i < 110; i++) { ticket.sale(); }}, "t3").start(); } }
两个线程,可以操作初始值为0的一个变量,实现一个线程对该变量加1,一个线程对该变量-1。实现交替,重复10轮;
判断资源是否还有 -> 操作资源的方法 -> 通知线程
class Mutex{ private int number = 0; public synchronized void product() throws InterruptedException { // 判断当前互斥变量是否为0 第一次初始值为0 则跳过if 如果不为0 则当前线程进行等待 while(number != 0) { this.wait(); } // 使用资源 number++; System.out.println(Thread.currentThread().getName() + " 生产资源:" + number); // 通知 由于使用wait方法等待会释放锁 所以使用资源结束需要唤醒进程(判断条件不能用if 只能用while) this.notifyAll(); } public synchronized void consumer() throws InterruptedException { while(number == 0){ this.wait(); } number--; System.out.println(Thread.currentThread().getName() + " 消费资源:" + number); this.notifyAll(); } } public class ProducerAndCustomer { public static void main(String[] args) { Mutex mutex = new Mutex(); new Thread(() -> { for (int i = 1; i <=10; i++) { try{ mutex.product(); }catch (Exception e){ e.printStackTrace(); } } }, "t1").start(); new Thread(() -> { for (int i = 1; i <= 10; i++) { try{ mutex.consumer(); }catch (Exception e){ e.printStackTrace(); } } }, "t2").start(); } }
控制台打印输出:
可以正确的输出1,0,1,0…证明使用了synchronized+wait+notify(/notifyAll)可以解决线程并发问题。
在JDK1.8版本之后,JCU中出现了新的方法来解决JCU中的lock替代之前的synchronized,Condition中的方法await和signal替换之前的wait和notify(或notifyAll)
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Mutex2{ private int number = 0; final Lock lock = new ReentrantLock(); final Condition condition = lock.newCondition(); public void producer(){ lock.lock(); try{ while(number != 0){ condition.await(); } number++; System.out.println(Thread.currentThread().getName() + " 生产资源:" + number); condition.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void consumer(){ lock.lock(); try{ while(number == 0){ condition.await(); } number --; System.out.println(Thread.currentThread().getName() + " 消费资源:" + number); condition.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } public class ProducerAndCustomer_2 { public static void main(String[] args) { Mutex2 mutex = new Mutex2(); new Thread(() -> { for (int i = 1; i <=10; i++) { try{ mutex.producer(); }catch (Exception e){ e.printStackTrace(); } } }, "t1").start(); new Thread(() -> { for (int i = 1; i <= 10; i++) { try{ mutex.consumer(); }catch (Exception e){ e.printStackTrace(); } } }, "t2").start(); } }
但是新方法的特性可以解决以前多线程交互中的虚假唤醒问题,如果想要实现:
多线程之间按顺序调用实现 A -> B -> C
lock配合Condition使用:精确通知 精准唤醒
class ShareResources{ private int mutex = 1; private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void print5(){ lock.lock(); try{ while(mutex != 1){ condition1.await(); } for (int i = 1; i <= 5; i++) { System.out.println(Thread.currentThread().getName() + " 第" + i + "次"); } // 通知 精确唤醒2 修改标志位 mutex = 2; condition2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print10(){ lock.lock(); try{ while(mutex != 2){ condition2.await(); } for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName() + " 第" + i + "次"); } // 通知 精确唤醒1 修改标志位 mutex = 3; condition3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print15(){ lock.lock(); try{ while(mutex != 3){ condition3.await(); } for (int i = 1; i <= 15; i++) { System.out.println(Thread.currentThread().getName() + " 第" + i + "次"); } // 通知 精确唤醒 修改标志位 mutex = 1; condition1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } public class ProducerAndCustomer_3 { public static void main(String[] args) { ShareResources resources = new ShareResources(); new Thread( () -> { for (int i = 0; i < 10; i++) { resources.print5(); } }, "t1").start(); new Thread( () -> { for (int i = 0; i < 10; i++) { resources.print10(); } }, "t2").start(); new Thread( () -> { for (int i = 0; i < 10; i++) { resources.print15(); } }, "t3").start(); } }
这里对于三个方法设置了3个不同的condition相当于三把钥匙,对于打印方法1,判断条件为当前互斥变量mutex是否等于1,不等于1则当前线程进入到while条件里,进项等待,反之进行打印输出,并且通知下一个需要工作的线程(也就是修改标志位),并且唤醒下一个线程(做到了精确唤醒);2和3的流程类似,形成按照顺序A->B->C->A的执行链。
class Phone{ public static synchronized void sendEmail(){ try { TimeUnit.SECONDS.sleep(4); // 暂停4秒钟 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发送邮件..."); } public static synchronized void sendWechat(){ System.out.println("发微信..."); } public void watchMovie(){ System.out.println("看电影..."); } } public class Lock8 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone(); // Phone phone2 = new Phone(); new Thread(() -> { try{ phone1.sendEmail(); }catch (Exception e){ e.printStackTrace(); } }, "t1").start(); Thread.sleep(100); new Thread(() -> { try{ phone1.sendWechat(); // phone1.watchMovie(); // phone2.sendWechat(); }catch (Exception e){ e.printStackTrace(); } }, "t2").start(); } }
标准访问,先打印邮件再执行发微信
邮件先暂停4秒,先打印邮件再打印微信
以上两种锁较为简单,一个类里面如果有多个synchronized普通方法,某一个时刻内,只能有一个线程去调用其中的一个synchronized方法,其他的线程只能等待;锁的是当前访问对象this(调用者),被锁定后,其他实例对象的线程都不能进入到当前对象的其他synchronized方法
新增一个普通方法,邮件先暂停4秒,先打印看电影再打印发邮件:由于新增加的方法并没有加上synchronized上锁,并不需要和发邮件方法去抢占资源,所以可以直接调用watchMovie方法
两个资源类,两个同步普通方法,邮件暂停4秒,先执行发微信再执行发邮件: sendEmail方法锁的是当前对象phone1,而phone2是另一个对象,不是同一把锁,各用各的,所以phone2的方法先执行
两个静态同步方法,同一个资源,邮件暂停4秒,先打印邮件再打印微信
两个静态同步方法,两个资源,邮件暂停4秒,先打印邮件再打印微信
由于静态同步方法锁的是所在的整个类模板(类锁),加锁的对象从实例对象变成类,无论有多少资源,同一个时刻只能有一个线程去调用方法,与情况4相反
普通同步方法锁的是当前new出来的实例对象(this),而静态同步方法锁的是这个类模板(.class),一个是对象锁一个是类锁,互不影响,所以微信方法执行;对于两个资源,一个实例对象也是一个新的对象锁,与类锁也不会竞争
举例说明list是线程不安全的:线程数增加时,出现java.util.ConcurrentModificationException也就是常说的并发修改异常
public class ListAndMap { public static void main(String[] args) { // 当线程数增多时会出现异常:java.util.ConcurrentModificationException并发修改异常 // 导致原因:add方法并没有加线程安全 List<String> list = new ArrayList<>(); for (int i = 0; i < 30; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(list); }, String.valueOf(i)).start(); } } }
出现的原因是:对于ArrayList的源码中的add方法并没有加锁导致,同一时刻可以有多个线程进行调用
解决方案:
测试打印输出:程序可以正常运行
关于HashSet也是同理解决线程不安全的问题,HashSet底层就是HashMap,add方法就是调用HashMap中的put方法,而对于value是一个固定的常量PRESENT
同理,使用使用Collections工具类中的synchronizedSet方法将HashSet加锁;或者CopyOnWriteArraySet;
Collections.synchronizedMap()或者ConcurrentHashMap来解决map的线程不安全问题。
HashMap的底层是数组+链表+红黑树,默认初始值16,负载因子0.75(可修改),当size达到12时,map会进行扩容(翻倍),插入数据时,比较hash地址,如果地址相同,并且插入的key与之前在该位置的key相同,则替换之前的value,如果不相同,该hash地址则进行链表存储(链地址法);当size>8时单向链表变成红黑树存储
多线程中,获得多线程的方式:
创建Runnable实现类的实例,并以此实例作为Thread类的target来创建Thread对象,该Thread对象才是真正的线程对象。
callable和Runnable接口的区别:
由于Thread的构造方法中没有以Callable接口为参数的,而Callable接口中存在一个子接口RunnableFuture,该子接口的实现类有一个FutureTask,并且在该类的构造方法中存在FutureTask(Callable callable)。因此可以使用该类去包装Callable接口 才能作为Thread的target来创建线程:
Thread(Runnable target, String name) ->
Thread(RunnableFuture target, String name) ->
Thread(FutureTask(Callable) target, String name)
class MyThread1 implements Callable<Integer>{ @Override public Integer call() throws Exception { TimeUnit.SECONDS.sleep(4); System.out.println("call...."); return 999; } } class MyThread2 implements Runnable{ @Override public void run() { } } public class CallableDemo { public static void main(String[] args) throws Exception { MyThread1 thread1 = new MyThread1(); FutureTask task = new FutureTask(thread1); new Thread(task, "t1").start(); new Thread(task, "t2").start(); System.out.println(Thread.currentThread().getName() + "====完成===="); System.out.println("返回值:" + task.get()); // 获取callable中call方法的返回值 } }
控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务;
如果线程数量超过最大数量,超出数量的线程排队等候,等待其他线程处理完毕,再从队列中取出任务来执行特点
线程复用,控制最大并发数,管理线程
线程池通过Executor框架实现的,用到Executor,Executors(线程池工具类,类似Arrays和Collections),
ExecutorService和ThreadPoolExecutor
使用线程池工具类来创建线程池的3大方法:
三个方法的源码实现都是new一个<ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
而ThreadPoolExecutor构造函数中除了上述5个参数外,还包括threadFactory, defaultHandler
使用者提交任务 -> 线程池首先判断corePoolSize是否已满 -> 否 -> 创建线程执行任务;
反之查询阻塞队列workQueue是否已满 -> 是 -> 线程池进行扩容能够满足需求 -> 否 -> 按照拒绝策略处理无法执行的任务;
阻塞队列没有满,则将任务添加到队列中进行等待;
线程池扩容能够满足需要,则创建线程执行任务
线程池的创建方法?
线程池不允许使用Executors去创建,而是使用ThreadPoolExecutor来创建,由于Executors返回线程池对象的缺点有:
如何使用线程池?
import java.util.concurrent.*; public class ThreadPool_2 { public static void main(String[] args) { ExecutorService pool = new ThreadPoolExecutor( 2, 5, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); try{ for (int i = 1; i <= 9; i++) { pool.execute( () -> { System.out.println(Thread.currentThread().getName() + "====执行===="); }); } }catch (Exception e){ e.printStackTrace(); }finally { pool.shutdown(); } } public static void initPool(){ } }
这里再创建线程池使用的是默认拒绝策略AbortPolicy:当任务数 > maximumPoolSIze + 阻塞Queue的size时直接会抛出异常
CallerRunsPolicy:调用者运行一种机制,既不会抛出任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
DiscardPolicy:默认丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许丢失任务,是最好的一种策略
DiscardOldestPolicy:抛弃任务队列中等待最久的任务,然后将当前任务加入队列中,再次提交当前任务
如果是CPU密集型,设置maximumPoolSize为当前主机上的核数+1