目录
线程状态
线程池-基本原理
线程池 - Executors默认线程池
线程池 - ThreadPoolExecutor
线程池参数-拒绝策略
volatile
原子性
原子性 - AtomicInteger
AtomicInteger - 内存解析
AtomicInteger - 源码解析
悲观锁和乐观锁
并发工具类 - Hashtable
并发工具类 - ConcurrentHashMap基本使用
ConcurrentHashMap1.7原理
ConcurrentHashMap1.8原理
并发工具类 - CountDownLatch
并发工具类 - Semaphore
如何验证线程的状态有几种?
API中查询Thread.State内部类, 可以看到虚拟机中线程的六种状态
NEW
RUNNABLE
BLOCKED
WAITING
TIMED_WAITING
TERMINATED
虚拟机中线程的六种状态?
NEW 创建对象
RUNNABLE start()
BLOCKED 无法获得锁对象
WAITING wait()
TIMED_WAITING sleep()
TERMINATED 所有代码执行完毕
吃饭摔碗故事的解决思路?
1. 找一个柜子放碗, 此时柜子是空的
2. 第一次吃饭, 买一次碗
3. 吃完后将碗放入柜子
4. 第二次吃饭, 就不需要买碗了, 直接从柜子中拿
5. 吃完再次将碗放回柜子
之前使用多线程存在类似的问题? -> 效率问题
1. 用到线程就要创建
2. 用完之后线程就消亡了
如何解决?
1. 创建一个池子(线程池), 刚开始是空的
2. 有任务要执行时, 才会创建线程对象, 任务执行完毕, 将线程对象归还给池子
3. 所有任务都执行完毕, 关闭连接池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务。执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
简单来说,线程池就相当于是线程的集合。
Executors默认线程池使用思路?
1. 创建一个池子(线程池), 刚开始是空的 -> 使用Executors的静态方法 -> 查阅API
2. 有任务要执行时, 才会创建线程对象, 任务执行完毕, 将线程对象归还给池子 -> submit();
3. 所有任务都执行完毕, 关闭连接池 -> shutdown();
Executors类创建线程池的方法?
1. newCachedThreadPool();
创建一个可根据需要创建新线程的线程池, 最大容纳int的MAX_VALUE个线程
2. newFixedThreadPool(int 最大容量);创建一个可重用固定线程数的线程池
创建线程池:newCachedThreadPool()
public class ThreadPoolDemo01 { public static void main(String[] args) throws InterruptedException { // 创建线程池服务对象, 可以控制线程池 ExecutorService executorService = Executors.newCachedThreadPool(); // 通过线程池服务对象, 创建线程 executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + "线程在执行了"); }); Thread.sleep(1000); // 通过线程池服务对象创建线程 executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + "线程在执行了"); }); // 关闭线程池 executorService.shutdown(); ExecutorService executorService = Executors.newFixedThreadPool(2); System.out.println(executorService); // } }
创建指定线程数量的线程池:newFixedThreadPool()
Executors类创建线程池的方法? 1. newCachedThreadPool(); 创建线程池, 默认是空的, 最大容纳int的MAX_VALUE 2. newFixedThreadPool(int 最大容量); 如果查询默认的线程数量? 1. debug查看, workers表示线程数量, 初始size=0 2. 调用ThreadPoolExecutor对象的getPoolSize方法 代码示例: public class ThreadPoolDemo02 { public static void main(String[] args) { // 创建线程服务对象 ExecutorService executorService = Executors.newFixedThreadPool(10); // 强转executorService ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService; System.out.println(executor.getPoolSize()); //0 // 启动线程1 executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + "线程在执行了"); }); // 启动线程2 executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + "线程在执行了"); }); System.out.println(executor.getPoolSize()); //2 // 释放资源 executorService.shutdown(); } }
上述方式都是java帮我们创建线程池对象, 如果我们想自己创建怎么办?
跟进上述两个方法的源码 1. newCachedThreadPool(); public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 2. newFixedThreadPool(int 最大容量); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 我们发现底层都new了ThreadPoolExecutor, 自己创建的话可以使用该类!
餐厅员工服务举例:
1. 正式员工数量 核心线程数量
2. 餐厅最大员工数 最大线程数量
3. 临时员工被辞退空闲时长(值) 空闲时间(值)
4. 临时员工被辞退空闲时长(单位) 空闲时间(单位)
5. 排队客户 任务队列
6. 从哪里招人 创建线程的方式
7. 当排队人数过多, 拒绝策略 要执行的任务过多时的解决方案
ThreadPoolExecutor pool = new ThreadPoolExecutor(
核心线程数量, //不能小于0
最大线程数量, //不能小于等于0, 最大数量 ?= 核心线程数量
空闲线程最大存活时间, //不能小于0
时间单位, //时间单位
任务队列, //不能为null
创建线程工厂, //不能为null
拒绝策略 //不能为null
);
public class ThreadPoolDemo03 { public static void main(String[] args) { // 创建线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor( 3, //核心线程数量 5, //最大线程数量 2, //空闲线程最大存活时间 TimeUnit.SECONDS, //秒 new ArrayBlockingQueue<>(10), //任务队列(阻塞队列) Executors.defaultThreadFactory(), //默认工厂 new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略(停止) ); // 创建线程对象 pool.submit(new MyRunnable()); pool.submit(new MyRunnable()); // 释放资源 pool.shutdown(); } } class MyRunnable implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + "线程执行了"); } }
1. 什么时候拒绝?
提交的任务 > 线程池最大容量 + 任务队列容量的时候
2. 如何拒绝?
2.1 ThreadPoolExecutor.AbortPolicy(); 丢弃任务抛出异常(默认策略)
2.2 ThreadPoolExecutor.DiscardPolicy(); 丢弃任务不抛出异常(不推荐)
2.3 ThreadPoolExecutor.DiscardOldestPolicy(); 抛弃队列中等待时间最久的,将当前任务加入
2.4 ThreadPoolExecutor.CallerRunsPolicy(); 调用任务的run()绕过线程池执行
默认策略代码示例:
public class ThreadPoolDemo04 { public static void main(String[] args) { // 创建线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor( 2, //核心线程数量 5, //最大线程数量 2, //空闲线程最大存活时间 TimeUnit.SECONDS, //秒 new ArrayBlockingQueue<>(10), //任务队列: 让任务在队列中等有线程空闲了,再从队列中获取任务并执行 Executors.defaultThreadFactory(), //默认工厂: 底层会按照默认方式创建线程对象 new ThreadPoolExecutor.AbortPolicy() //拒绝策略(默认): 提交任务 > 最大线程数量 + 任务队列时拒绝 ); // ThreadPoolExecutor.AbortPolicy() 丢弃任务抛出异常(默认策略) // 如果提交任务 > 最大线程数量 + 任务队列, 抛出异常 for (int i = 1; i <= 16; i++) { pool.submit(new MyRunnable()); // java.util.concurrent.RejectedExecutionException } // 释放资源 pool.shutdown(); } }
非默认任务拒绝策略:
ThreadPoolExecutor.DiscardPolicy(); 丢弃任务不抛出异常(不推荐)
只能看到线程池最大线程数量 + 任务队列容量条结果, 没有报错提示
ThreadPoolExecutor.DiscardOldestPolicy(); 抛弃队列中等待时间最久的,将当前任务加入
只能看到线程池最大线程数量 + 任务队列容量条结果, 其中最后一条是最的任务
ThreadPoolExecutor.CallerRunsPolicy(); 调用任务的run()绕过线程池执行
只能看到线程池最大线程数量 + 任务队列容量条结果, main方法帮我们执行了其他任务
当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。
使用volatile关键字会强制将修改的值立即写入主存! 后续再出详细的讲解
class Test { public static void main(String[] args) { //创建线程对象 Girl g = new Girl(); g.setName("女孩"); Boy b = new Boy(); b.setName("男孩"); //开启线程 g.start(); b.start(); } } public class Money { // 通过volatile关键字解决(修饰共享数据) // 强制要求每一次使用共享数据前, 先去堆看一下共享数据, 及时更新自己的[变量副本]为最新值 public static volatile int money = 100000; } public class Boy extends Thread{ @Override public void run() { try { Thread.sleep(10); // 修改金额 Money.money = 90000; } catch (InterruptedException e) { e.printStackTrace(); } } } public class Girl extends Thread{ @Override public void run() { while (Money.money == 100000){ } System.out.println("金额发生变动, 停止循环"); } }
volatile关键字不能保证原子性(同步锁可以),
volatile关键字只能保证当前线程栈中, 变量副本的值是新的
送100次100个冰淇淋代码示例: class Test { public static void main(String[] args) { // 创建MyAtomThread对象 MyAtomThread mt = new MyAtomThread(); // 送100次100个冰淇淋 for (int i = 1; i <= 100; i++) { Thread t = new Thread(mt); t.start(); } } } public class MyAtomThread implements Runnable { private int count = 0; // 冰淇淋的个数 @Override public void run() { // 送100个冰淇淋 for (int i = 1; i <= 100; i++) { /* 代码运行结果可能错误! 问题分析: count++不是一个原子性的操作, 每一步操作都有可能被抢执行权! 1.从共享数据中读取数据到本线程栈中 2.修改本线程栈中的变量副本 3.将本线程栈中的变量副本的值,赋值给共享数据 */ //Thread.sleep(10); count++; System.out.println(Thread.currentThread().getName() + "送了第" + count + "个冰激凌"); } } }
原子性是指是指在一次或者多次操作过程中,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉。即使在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰。我们先来看看哪些是原子操作,哪些不是原子操作,有一个直观的印象:int a = 10; //1
a++; //2
int b=a; //3
a = a+1; //4
上面这四个语句中只有第1个语句是原子操作,将10赋值给线程工作内存的变量a
语句2(a++),实际上包含了三个操作:1. 读取变量a的值;2:对a进行加一的操作;3.将计算后的值再赋值给变量a,而这三个操作无法构成原子操作。
对语句3,4的分析同理可得这两条语句不具备原子性。
在JDK1.5之后提供了一个在util包下的comcurrent包下的atomic包(原子包)
里面提供对各种类的原子操作
AtomicInteger常用方法
构造方法:
public AtomicInteger(); 初始值为 0
的新 AtomicInteger
public AtomicInteger(int initialValue); 给定初始值的新 AtomicInteger
成员方法:
int get(); 获取值
int getAndIncrement(); 以原子方式将当前值+1, 返回自增前的值
int incrementAndGet(); 以原子方式将当前值+1, 返回自增后的值
int addAndGet(int data); 以原子方式将当前值和参数相加, 返回相加结果
int getAndSet(int value); 以原子方式设置为参数的值, 返回旧值
public class Demo01_Constructor { public static void main(String[] args) { // 空参构造 public AtomicInteger(); AtomicInteger ac = new AtomicInteger(); System.out.println(ac); //0 // 带参构造 public AtomicInteger(int initialValue); AtomicInteger ac2 = new AtomicInteger(10); System.out.println(ac2); //10 } } public class Demo02_Method { public static void main(String[] args) { // 创建对象 AtomicInteger ac = new AtomicInteger(10); // int get(); 获取值 System.out.println(ac.get()); //获取值 10 // int getAndIncrement(); 以原子方式将当前值+1, 返回自增前的值 System.out.println(ac.getAndIncrement()); //返回自增前的值 10 System.out.println(ac.get()); //10 + 1 = 11 // int incrementAndGet(); 以原子方式将当前值+1, 返回自增后的值 System.out.println(ac.incrementAndGet()); //11 + 1 = 12 // int addAndGet(int data); 以原子方式将当前值和参数相加, 返回相加结果 System.out.println(ac.addAndGet(20)); //返回相加结果 32 // int getAndSet(int value); 以原子方式设置为参数的值, 返回旧值 System.out.println(ac.getAndSet(50)); //返回旧值 32 System.out.println(ac.get()); //设置为参数的值 50 } }
首先我们通过AtomicInteger将代码实现: class Test { public static void main(String[] args) { MyAtomThread mt = new MyAtomThread(); for (int i = 1; i <= 100; i++) { Thread t = new Thread(mt); t.start(); } } } public class MyAtomThread implements Runnable { //2. AtomicInteger(效率高,线程安全) AtomicInteger ac = new AtomicInteger(); //或者给0 @Override public void run() { for (int i = 1; i <= 100; i++) { // count++; //数据错误, 原因count++不具备原子性 // int incrementAndGet(); 以原子方式将当前值+1, 返回自增后的值 int count = ac.incrementAndGet(); System.out.println(Thread.currentThread().getName() + "送了第" + count + "个冰激凌"); } } } AtomicInteger原理: CAS + 自旋 有三个操作数据 (内存值V, 旧值A, 要修改的值B) 当旧值A == 内存值, 证明在当前线程操作时, 没有其他线程来过, 此时可以修改, 将V改为B 当旧值A != 内存值, 证明在当前线程操作时, 有其他线程来过, 此时不能修改, 进行自旋 自旋: 重新获取现在的最新内存值V, 继续进行上述判断 简单理解: 在修改共享数据时, 将修改前的旧值记录下来 如果现在的内存值, 和原来的旧值一样, 证明没有其他县城操作过内存值, 则可以修改内存值 如果现在的内存值, 和原来的旧值不一样, 证明有其他线程操作过内存值, 则不能修改内存值 继续获取最新的内存值, 再次进行上述操作(自旋)
源码解析: public AtomicInteger(int initialValue) { value = initialValue; } // 先自增, 然后返回自增后的结果 public final int incrementAndGet() { // this: 当前的AtomicInteger对象 // 1: 自增一次 // +1: 最后的就是自增后的结果 return U.getAndAddInt(this, VALUE, 1) + 1; } @HotSpotIntrinsicCandidate public final int getAndAddInt(Object o, long offset, int delta) { // o: 内存值 // v + delta: 修改后的值 // v: 旧值 int v; // do while: 自旋, 不断获取旧值 do { v = getIntVolatile(o, offset); // 判断条件: weakCompareAndSetInt方法比较内存中的值和旧值是否相等 // 情况1: 比较内存值和旧值是否相等, 相等就修改, 返回true结束循环 // 情况2: 比较内存值和旧值是否相等, 不相等, 不能修改, 返回false继续循环(自旋转) } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; }
synchronized和CAS的区别:
1.相同点:
在多线程的情况下, 都可以保证共享数据的安全性
2.不同点
synchronized是从最坏的角度出发, 认为每次获取数据的时候, 别人都有可能修改, 所以每次操作共享数据之前, 都会上锁 (悲观锁)
CAS时候从乐观的角度出发, 认为每次获取数据的时候, 别人都不会修改, 所以不会上锁. 只不过在修改共享数据的时候, 检查一下别人有没有操作过这个数据 (乐观锁)
如果操作了, 那么再次获取最新的值
如果没有操作, 那么直接修改共享数据的值
HashMap是线程不安全的, 在多线程环境下会存在问题
为了保证线程安全, 我们可以使用Hashtable, 但是Hashtable效率低
Hashtable底层是哈希表结构, 由数组 + 链表组成
数组默认长度16, 加载因子0.75 (存满12个要扩容)
链表是当计算当前元素要存入的位置有元素时, 先判断内容
如果一样则不存
如果不一样老元素挂在新元素下, 形成链表 (哈希桶)
Hashtable效率低的原因:
通过查看底层代码我们发现, 底层使用悲观锁(synchronized), 每一次操作都会讲整张表锁起来
HashMap: 线程不安全
Hashtable: 线程安全, 效率低 (底层悲观锁锁整张表)
ConcurrentHashMap: 线程安全, 效率高(分析JDK7和8的底层区别)
创建ConcurrentHashMap对象时:
创建一个长度为16的大数组, 加载因子是0.75 (Segment[])
创建一个长度为2的小数组, 将地址值赋值给0索引处, 其他索引位置都为null (HashEntry[])
0索引处数组是用来当做模板使用,当新元素添加进来会以0索引数组为模板来创建长度为2的小数组
添加元素时, 根据键的哈希值来计算出在大数组中的位置
如果为null, 按照模板创建小数组
创建完毕, 会二次哈希计算出在小数组中应存入的位置, 由于第一次都是null所以直接存入
如果不为null, 会二次哈希, 计算出在小数组中应存入的位置
如果小数组需要扩容, 则扩容为2倍 (存到索引1的地方)
如果不需要扩容, 则会判断小数组当前索引位置是否为null
如果为null代表没有元素, 直接存入
如果不为null代表有元素, 则根据equals方法比较属性值
一样则不存
不一样则将老元素挂在新元素下, 形成链表 (哈希桶)
综上所述, 如果这个大数组Segment[]存满了, 就是一个16*16的大哈希表
为什么效率高?
因为每一次操作只会锁小表 (小数组HashEntry[]), 不会锁大表
所以在JDK1.7之前, 某一时刻最多允许16个线程同时访问
相关快捷键:
Alt + 7: 底层中展示所有方法
Ctrl + Alt + Shift + U: 底层中展示继承结构
ConcurrentHashMap在JDK1.8底层分析:
结构: 哈希表 (数组 + 链表 + 红黑树)
线程安全: CAS机制 + synchronized同步代码块
1. 如果使用空参构造创建ConcurrentHashMap对象时, 则什么都不做 (查看空参构造及父类的空参)
2. 在第一次添加元素时 (调用put方法时) 创建哈希表 (initTable方法)
计算当前元素应存入的索引位置
如果为null, 代表没有元素, 则通过CAS算法, 将本节点添加到数组中
如果不为null, 代表有元素, 则利用volatile获得当前索引位置最新的节点地址挂在它下面, 形成链表, 链表长度大于等于8的时候, 自动转为红黑树
3. 每次操作, 会以链表或者树的头结点为锁对象, 配合悲观锁(synchronized) 保证多线程操作集合时的安全问题
CountDownLatch同步计数器,当计数器数值减为0时,所有受其影响而等待的线程将会被激活,这样保证模拟并发请求的真实性。
CountDownLatch应用场景
让一条线程等待其他线程执行完毕后再执行
CountDownLatch相关方法
1. CountDownLatch(int count); (构造方法)表示要等待的线程数量
2. public void await(); 让线程等待
3. public void countDown(); 表示当前线程执行完毕
案例: 使用代码实现, 妈妈等三个孩子吃饺子, 吃完收拾碗筷 妈妈 等待 await(); sout("收拾碗筷"); 孩子*3 补齐带参构造 sout(name+吃饺子); 说一声自己吃完了 countDown() 测试类 开启4条线程 创建CountDownLatch()对象传递给4条线程 代码示例 public class CountDownLatchTest { public static void main(String[] args) { //创建CountDownLatch()对象传递给4条线程 CountDownLatch countDownLatch = new CountDownLatch(3); //要等待三个(孩子)线程 //开启4条线程 Mother mother = new Mother(countDownLatch); mother.start(); Child01 c1 = new Child01(countDownLatch); Child02 c2 = new Child02(countDownLatch); Child03 c3 = new Child03(countDownLatch); c1.start(); c2.start(); c3.start(); } } //孩子1 class Child01 extends Thread { //补齐带参构造 private CountDownLatch countDownLatch; public Child01(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { //打印结果 for (int i = 1; i < 10; i++) { System.out.println(getName() + "在吃饺子" + ",吃了" + i + "个"); } //说一声自己吃完了 countDownLatch.countDown(); } } //孩子2 class Child02 extends Thread { //补齐带参构造 private CountDownLatch countDownLatch; public Child02(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { //打印结果 for (int i = 1; i < 15; i++) { System.out.println(getName() + "在吃饺子" + ",吃了" + i + "个"); } //说一声自己吃完了 countDownLatch.countDown(); } } //孩子3 class Child03 extends Thread { //补齐带参构造 private CountDownLatch countDownLatch; public Child03(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { //打印结果 for (int i = 1; i < 20; i++) { System.out.println(getName() + "在吃饺子" + ",吃了" + i + "个"); } //说一声自己吃完了 countDownLatch.countDown(); } } //妈妈 class Mother extends Thread { //补齐带参构造 private CountDownLatch countDownLatch; public Mother(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { //等待 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //打印结果 System.out.println("妈妈正在收拾碗筷~"); } }
通常用于限制可以访问某些资源(物理或逻辑的)的线程数目
也可以比作同行证,给一个或多个线程发同行证,到出口(或执行完毕)时再把通行证归还,再发给下一个或多个线程
Semaphore构造方法
Semaphore(int 最大通行数量);
创建具有给定的许可数和非公平的公平设置的 Semaphore
Semaphore(int
permits, boolean fair
);
创建具有给定的许可数和给定的公平设置的 Semaphore
Semaphore使用步骤分析
1. 需要有人管理这个通道 - 创建Semaphore对象
2. 有车子进来, 发放通行证 - acquire();发
3. 有车子出去, 收回通行证 - release();收
4. 如果通行证都发出去了, 那么只允许车子等待 - 自动完成
代码示例 (实现Runnable接口实现多线程) public class SemaphoreTest { public static void main(String[] args) { //创建MyRannable对象 MyRannable mr = new MyRannable(); //多次启动线程 for (int i = 1; i <= 50 ; i++) { new Thread(mr).start(); } } } class MyRannable implements Runnable{ //1. 需要有人管理这个通道 - 创建Semaphore对象 Semaphore semaphore = new Semaphore(2); //最大通行数量 @Override public void run() { //2. 有车子进来, 发放通行证 - acquire();发 try { semaphore.acquire(); System.out.println("获取通行证,车子进来!"); semaphore.release(); System.out.println("归还通行证,车子出去了!"); } catch (InterruptedException e) { e.printStackTrace(); } } }