来源:【尚硅谷】大厂必备技术之JUC并发编程2021最新版
在 Java 中,线程部分是一个重点,本篇文章说的 JUC 也是关于线程的。 JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包, JDK 1.5 开始出现的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xzXLRLL8-1627717457873)(assets2/1627185831660.png)]
**进程(Process)**是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。
**线程(thread)**是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
总结来说:
进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程—资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程—程序执行的最小单位。
java.lang.Thread.State
public enum State { // 新生 NEW, // 新生 RUNNABLE, // 阻塞 BLOCKED, //等待,死等 WAITING, //超时等待 TIMED_WAITING, // 阻塞 TERMINATED; }
串行表示所有任务都一一按先后顺序进行。串行意味着必须先装完一车柴才能运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤。
串行是一次只能取得一个任务,并执行这个任务。
并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU。
并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,并发的重点在于它是一种现象, 并发描述的是多进程同时运行的现象。但实际上,对于单核心CPU来说,同一时刻只能运行一个线程。所以,这里的"同时运行"表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占CPU的,而是执行一会停一会。
要解决大并发问题,通常是将大任务分解成多个小任务,由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。这可能会出现一些现象:
并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:春运抢票电商秒杀…
并行:多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yFS2JkgK-1627717457877)(assets2/1627187007847.png)]
管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现),但是这样并不能保证进程以设计的顺序执行。
JVM中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程(monitor)对象,管程(monitor)会随着java对象一同创建和销毁。
执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程。
用户线程:平时用到的普通线程,自定义线程。
守护线程:运行在后台,是一种特殊的线程,比如垃圾回收。
当主线程结束后,用户线程还在运行,JVM存活。
如果没有用户线程,都是守护线程,JVM结束。
package com.liuyh; //演示用户线程和守护线程 public class Main { public static void main(String[] args) { Thread aa = new Thread(() -> { // Thread.currentThread().isDaemon()判断线程是否是守护线程 System.out.println(Thread.currentThread().getName() + "::" + Thread.currentThread().isDaemon()); while (true) { } }, "aa"); aa.start(); System.out.println(Thread.currentThread().getName()+" over"); } }
运行结果:线程阻塞中
main over aa::false
修改上述代码,设置aa线程为守护线程
package com.liuyh; //演示用户线程和守护线程 public class Main { public static void main(String[] args) { Thread aa = new Thread(() -> { // Thread.currentThread().isDaemon()判断线程是否是守护线程 System.out.println(Thread.currentThread().getName() + "::" + Thread.currentThread().isDaemon()); while (true) { } }, "aa"); //设置守护线程 aa.setDaemon(true); aa.start(); System.out.println(Thread.currentThread().getName()+" over"); } }
运行结果:线程运行结束
main over aa::true Process finished with exit code 0
synchronized是Java中的关键字,是一种同步锁。它修饰的对象有以下几种:
修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象;
修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;
虽然可以使用synchronized来定义方法,但synchronized并不属于方法定义的一部分,因此,synchronized关键字不能被继承。如果在父类中的某个方法使用了synchronized关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上synchronized关键字才可以。当然,还可以在子类方法中调用父类中相应的方法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了。
修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象;
修改一个类,其作用的范围是synchronized后面括号括起来的部分,作用主的对象是这个类的所有对象。
package com.liuyh.sync; //第一步 创建资源类,定义属性和和操作方法 class Ticket { //票数 private int number = 30; //操作方法:卖票 public synchronized void sale() { //判断:是否有票 if(number > 0) { System.out.println(Thread.currentThread().getName()+" : 卖出:"+(number--)+" 剩下:"+number); } } } public class SaleTicket { //第二步 创建多个线程,调用资源类的操作方法 public static void main(String[] args) { //创建Ticket对象 Ticket ticket = new Ticket(); //创建三个线程 new Thread(new Runnable() { @Override public void run() { //调用卖票方法 for (int i = 0; i < 40; i++) { ticket.sale(); } } },"AA").start(); new Thread(new Runnable() { @Override public void run() { //调用卖票方法 for (int i = 0; i < 40; i++) { ticket.sale(); } } },"BB").start(); new Thread(new Runnable() { @Override public void run() { //调用卖票方法 for (int i = 0; i < 40; i++) { ticket.sale(); } } },"CC").start(); } }
如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:
那么如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。
因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过Lock就可以办到。
Lock锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock提供了比synchronized更多的功能。
Lock与的Synchronized区别
package java.util.concurrent.locks; import java.util.concurrent.TimeUnit; public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}
块中进行,并且将释放锁的操作放在finally
块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock
来进行同步的话,是以下面这种形式去使用的:
Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); }
关键字synchronized与wait()/notify()这两个方法一起使用可以实现等待/通知模式,Lock锁的newContition()方法返回Condition对象,Condition类也可以实现等待/通知模式。
用notify()通知时,JVM会随机唤醒某个等待的线程,使用Condition类可以进行选择性通知,Condition比较常用的两个方法:
注意:在调用Condition的await()/signal()方法前,也需要线程持有相关的Lock锁,调用await()后线程会释放这个锁,在singal()调用后会从当前Condition对象的等待队列中,唤醒一个线程,唤醒的线程尝试获得锁,一旦获得锁成功就继续执行。
ReentrantLock,意思是“可重入锁”,关于可重入锁的概念将在后面讲述。
ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。下面通过一些实例看具体看一下如何使用。
package com.liuyh.lock; import java.util.concurrent.locks.ReentrantLock; //第一步 创建资源类,定义属性和和操作方法 class LTicket { //票数量 private int number = 30; //创建可重入锁 private final ReentrantLock lock = new ReentrantLock(true); //卖票方法 public void sale() { //上锁 lock.lock(); try { //判断是否有票 if(number > 0) { System.out.println(Thread.currentThread().getName()+" :卖出"+(number--)+" 剩余:"+number); } } finally { //解锁 lock.unlock(); } } } public class LSaleTicket { //第二步 创建多个线程,调用资源类的操作方法 //创建三个线程 public static void main(String[] args) { LTicket ticket = new LTicket(); new Thread(()-> { for (int i = 0; i < 40; i++) { ticket.sale(); } },"AA").start(); new Thread(()-> { for (int i = 0; i < 40; i++) { ticket.sale(); } },"BB").start(); new Thread(()-> { for (int i = 0; i < 40; i++) { ticket.sale(); } },"CC").start(); } }
ReadWriteLock也是一个接口,在它里面只定义了两个方法:
package java.util.concurrent.locks; public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口。
ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。
假如有多个线程要同时进行读操作的话,先看一下synchronized达到的效果:
package com.liuyh.readwrite; public class Test { public static void main(String[] args) { final Test test = new Test(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); } public synchronized void get(Thread thread) { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName() + "正在进行读操作"); } System.out.println(thread.getName() + "读操作完毕"); } }
而改成用读写锁的话:
package com.liuyh.readwrite; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test2 { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final Test test = new Test(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); } public void get(Thread thread) { rwl.readLock().lock(); try{ long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName() + "正在进行读操作"); } System.out.println(thread.getName() + "读操作完毕"); }finally { rwl.readLock().unlock(); } } }
说明thread1和thread2在同时进行读操作。这样就大大提升了读操作的效率。
注意:
Lock和synchronized有以下几点不同:
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。
线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KZGK7x4U-1627717457880)(assets2/1627194056006.png)]
场景—两个线程,一个线程对当前数值加1,另一个线程对当前数值减1,要求用线程间通信
package com.liuyh.sync; //第一步 创建资源类,定义属性和操作方法 class Share { //初始值 private int number = 0; //+1的方法 public synchronized void incr() throws InterruptedException { //第二步 判断 干活 通知 while(number != 0) { //判断number值是否是0,如果不是0,等待 此处使用while防止虚假唤醒 this.wait(); //在哪里睡,就在哪里醒 } //如果number值是0,就+1操作 number++; System.out.println(Thread.currentThread().getName()+" :: "+number); //通知其他线程 this.notifyAll(); } //-1的方法 public synchronized void decr() throws InterruptedException { //判断 while(number != 1) { this.wait(); } //干活 number--; System.out.println(Thread.currentThread().getName()+" :: "+number); //通知其他线程 this.notifyAll(); } } public class ThreadDemo1 { //第三步 创建多个线程,调用资源类的操作方法 public static void main(String[] args) { Share share = new Share(); //创建线程 new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.incr(); //+1 } catch (InterruptedException e) { e.printStackTrace(); } } },"AA").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.decr(); //-1 } catch (InterruptedException e) { e.printStackTrace(); } } },"BB").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.incr(); //+1 } catch (InterruptedException e) { e.printStackTrace(); } } },"CC").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.decr(); //-1 } catch (InterruptedException e) { e.printStackTrace(); } } },"DD").start(); } }
package com.liuyh.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //第一步 创建资源类,定义属性和操作方法 class Share { private int number = 0; //创建Lock private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); //+1 public void incr() throws InterruptedException { //上锁 lock.lock(); try { //判断 while (number != 0) { condition.await(); } //干活 number++; System.out.println(Thread.currentThread().getName()+" :: "+number); //通知 condition.signalAll(); }finally { //解锁 lock.unlock(); } } //-1 public void decr() throws InterruptedException { lock.lock(); try { while(number != 1) { condition.await(); } number--; System.out.println(Thread.currentThread().getName()+" :: "+number); condition.signalAll(); }finally { lock.unlock(); } } } public class ThreadDemo2 { public static void main(String[] args) { Share share = new Share(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.incr(); } catch (InterruptedException e) { e.printStackTrace(); } } },"AA").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.decr(); } catch (InterruptedException e) { e.printStackTrace(); } } },"BB").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.incr(); } catch (InterruptedException e) { e.printStackTrace(); } } },"CC").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { share.decr(); } catch (InterruptedException e) { e.printStackTrace(); } } },"DD").start(); } }
问题: A线程打印5次A,B线程打印10次B,C线程打印15次C,按照此顺序循环10轮
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xOma4aMU-1627717457883)(assets2/1627194322413.png)]
package com.liuyh.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //第一步 创建资源类 class ShareResource { //定义标志位 private int flag = 1; // 1 AA 2 BB 3 CC //创建Lock锁 private Lock lock = new ReentrantLock(); //创建三个condition private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); //打印5次,参数第几轮 public void print5(int loop) throws InterruptedException { //上锁 lock.lock(); try { //判断 while(flag != 1) { //等待 c1.await(); } //干活 for (int i = 1; i <=5; i++) { System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop); } //通知 flag = 2; //修改标志位 2 c2.signal(); //通知BB线程 }finally { //释放锁 lock.unlock(); } } //打印10次,参数第几轮 public void print10(int loop) throws InterruptedException { lock.lock(); try { while(flag != 2) { c2.await(); } for (int i = 1; i <=10; i++) { System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop); } //修改标志位 flag = 3; //通知CC线程 c3.signal(); }finally { lock.unlock(); } } //打印15次,参数第几轮 public void print15(int loop) throws InterruptedException { lock.lock(); try { while(flag != 3) { c3.await(); } for (int i = 1; i <=15; i++) { System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop); } //修改标志位 flag = 1; //通知AA线程 c1.signal(); }finally { lock.unlock(); } } } public class ThreadDemo3 { public static void main(String[] args) { ShareResource shareResource = new ShareResource(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { shareResource.print5(i); } catch (InterruptedException e) { e.printStackTrace(); } } },"AA").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { shareResource.print10(i); } catch (InterruptedException e) { e.printStackTrace(); } } },"BB").start(); new Thread(()->{ for (int i = 1; i <=10; i++) { try { shareResource.print15(i); } catch (InterruptedException e) { e.printStackTrace(); } } },"CC").start(); } }
package com.liuyh.lock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * list集合线程不安全 */ public class ThreadDemo4 { public static void main(String[] args) { //创建ArrayList集合 List<String> list = new ArrayList<>(); for (int i = 0; i <30; i++) { new Thread(()->{ //向集合添加内容 list.add(UUID.randomUUID().toString().substring(0,8)); //从集合获取内容 System.out.println(list); },String.valueOf(i)).start(); } } }
运行时产生异常java.util.ConcurrentModificationException
问题:为什么会出现并发修改异常?
查看ArrayList的add方法源码,add
方法没有synchronized
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ public boolean add(E e) { ensureCapacityInternal(size + 1); // Increments modCount!! elementData[size++] = e; return true; }
那么我们如何去解决List类型的线程安全问题?
Vector是矢量队列,它是JDK1.0版本添加的类。继承于AbstractList,实现了List,RandomAccess,Cloneable这些接口。Vector继承了AbstractList,实现了List;所以,它是一个队列,支持相关的添加、删除、修改、遍历等功能。Vector实现了RandmoAccess接口,即提供了随机访问功能。
RandmoAccess是java中用来被List实现,为List提供快速访问功能的。在Vector中,我们即可以通过元素的序号快速获取元素对象;这就是快速随机访问。Vector实现了Cloneable接口,即实现clone()函数。它能被克隆。
和ArrayList不同,Vector中的操作是线程安全的。
package com.liuyh.lock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * list集合线程安全 */ public class ThreadDemo4 { public static void main(String[] args) { // Vector解决 List<String> list = new Vector<>(); for (int i = 0; i <30; i++) { new Thread(()->{ //向集合添加内容 list.add(UUID.randomUUID().toString().substring(0,8)); //从集合获取内容 System.out.println(list); },String.valueOf(i)).start(); } } }
现在没有运行出现并发异常,为什么?
查看Vector的add方法,add方法被synchronized同步修饰,线程安全!因此没有并发异常
public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; }
Collections提供了方法synchronizedList保证list是同步线程安全的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBzbjTCc-1627717457886)(assets2/1627196615122.png)]
package com.liuyh.lock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * list集合线程安全 */ public class ThreadDemo4 { public static void main(String[] args) { //Collections解决 List<String> list = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i <30; i++) { new Thread(()->{ //向集合添加内容 list.add(UUID.randomUUID().toString().substring(0,8)); //从集合获取内容 System.out.println(list); },String.valueOf(i)).start(); } } }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7s04IZPW-1627717457887)(assets2/1627197549419.png)]
package com.liuyh.lock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; /** * list集合线程安全 */ public class ThreadDemo4 { public static void main(String[] args) { // CopyOnWriteArrayList解决 List<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i <30; i++) { new Thread(()->{ //向集合添加内容 list.add(UUID.randomUUID().toString().substring(0,8)); //从集合获取内容 System.out.println(list); },String.valueOf(i)).start(); } } }
首先我们对CopyOnWriteArrayList进行学习,其特点如下:
它相当于线程安全的ArrayList。和ArrayList一样,它是个可变数组;但是和ArrayList不同的时,它具有以下特性:
原理分析:
独占锁效率低:采用读写分离思想解决
写线程获取到锁,其他写线程阻塞
复制思想:
当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来得及写会内存,其他的线程就会读到了脏数据。
这就是CopyOnWriteArrayList的思想和原理。就是拷贝一份。
原因分析(重点):动态数组与线程安全
下面从“动态数组”和“线程安全”两个方面进一步对CopyOnWriteArrayList的原理进行说明。
“动态数组”机制
它内部有个“volatile数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”,这就是它叫做CopyOnWriteArrayList的原因。
由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很低;但是单单只是进行遍历查找的话,效率比较高。
“线程安全”机制
Set 和 List 同理可得:多线程情况下,普通的 Set 集合是线程不安全的;
解决方案有两种;
package com.liuyh.lock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; /** * list集合线程安全 */ public class ThreadDemo4 { public static void main(String[] args) { //演示Hashset // Set<String> set = new HashSet<>(); Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 0; i <30; i++) { new Thread(()->{ //向集合添加内容 set.add(UUID.randomUUID().toString().substring(0,8)); //从集合获取内容 System.out.println(set); },String.valueOf(i)).start(); } } }
package com.liuyh.lock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; /** * list集合线程安全 */ public class ThreadDemo4 { public static void main(String[] args) { //演示HashMap // Map<String,String> map = new HashMap<>(); Map<String,String> map = new ConcurrentHashMap<>(); for (int i = 0; i <30; i++) { String key = String.valueOf(i); new Thread(()->{ //向集合添加内容 map.put(key,UUID.randomUUID().toString().substring(0,8)); //从集合获取内容 System.out.println(map); },String.valueOf(i)).start(); } } }
线程安全与线程不安全集合
集合类型中存在线程安全与线程不安全的两种,常见例如:
ArrayList-----Vector
HashMap----HashTable
但是以上都是通过synchronized关键字实现,效率较低。
Collections构建的线程安全集合
java.util.concurrent并发包下
CopyOnWriteArrayList
、CopyOnWriteArraySet
类型,通过动态数组与线程安全各方面保证线程安全。
package com.liuyh.sync; import java.util.concurrent.TimeUnit; class Phone { public synchronized void sendSMS() throws Exception { //停留4秒 TimeUnit.SECONDS.sleep(4); System.out.println("------sendSMS"); } public synchronized void sendEmail() throws Exception { System.out.println("------sendEmail"); } public void getHello() { System.out.println("------getHello"); } } /** * @Description: 8锁 * 1 标准访问,先打印短信还是邮件 (//TimeUnit.SECONDS.sleep(4) --注释; phone.sendEmail(); --打开) ------sendSMS ------sendEmail 2 停4秒在短信方法内,先打印短信还是邮件 (TimeUnit.SECONDS.sleep(4) --打开; phone.sendEmail(); --打开) ------sendSMS ------sendEmail 3 新增普通的hello方法,是先打短信还是hello(TimeUnit.SECONDS.sleep(4) --打开; phone.sendEmail(); --注释 phone.getHello();--打开) ------getHello ------sendSMS 4 现在有两部手机,先打印短信还是邮件(TimeUnit.SECONDS.sleep(4) --打开; //Phone phone2 = new Phone(); --打开; phone2.sendEmail(); --打开) ------sendEmail ------sendSMS 5 两个静态同步方法(sendSMS与sendEmail方法添加static关键字),1部手机,先打印短信还是邮件(TimeUnit.SECONDS.sleep(4) --打开; //Phone phone2 = new Phone(); --注释; phone.sendEmail(); --打开) ------sendSMS ------sendEmail 6 两个静态同步方法(sendSMS与sendEmail方法添加static关键字),2部手机,先打印短信还是邮件(TimeUnit.SECONDS.sleep(4) --打开; //Phone phone2 = new Phone(); --打开; phone2.sendEmail(); --打开) ------sendSMS ------sendEmail 7 1个静态同步方法(sendSMS添加static关键字),1个普通同步方法(sendEmail方法无static),1部手机,先打印短信还是邮件(TimeUnit.SECONDS.sleep(4) --打开; phone.sendEmail(); --打开) ------sendEmail ------sendSMS 8 1个静态同步方法(sendSMS添加static关键字),1个普通同步方法(sendEmail方法无static),2部手机,先打印短信还是邮件(TimeUnit.SECONDS.sleep(4) --打开; phone2.sendEmail(); --打开) ------sendEmail ------sendSMS */ public class Lock_8 { public static void main(String[] args) throws Exception { Phone phone = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "AA").start(); //Thread.sleep(100); new Thread(() -> { try { phone.sendEmail(); // phone.getHello(); //phone2.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "BB").start(); } }
结论:
一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法
锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法。
加个普通方法后发现和同步锁无关。
换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象。
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tlsIbYmL-1627717457889)(assets2/1627208012472.png)]
package com.liuyh.sync; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //可重入锁 public class SyncLockDemo { public synchronized void add() { add(); } public static void main(String[] args) { //Lock演示可重入锁 Lock lock = new ReentrantLock(); //创建线程 new Thread(()->{ try { //上锁 lock.lock(); System.out.println(Thread.currentThread().getName()+" 外层"); try { //上锁 lock.lock(); System.out.println(Thread.currentThread().getName()+" 内层"); }finally { //释放锁 lock.unlock(); } }finally { //释放做 lock.unlock(); } },"t1").start(); //创建新线程 new Thread(()->{ lock.lock(); System.out.println("aaaa"); lock.unlock(); },"aa").start(); // new SyncLockDemo().add(); // synchronized // Object o = new Object(); // new Thread(()->{ // synchronized(o) { // System.out.println(Thread.currentThread().getName()+" 外层"); // // synchronized (o) { // System.out.println(Thread.currentThread().getName()+" 中层"); // // synchronized (o) { // System.out.println(Thread.currentThread().getName()+" 内层"); // } // } // } // // },"t1").start(); } }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rMCO6NV6-1627717457891)(assets2/1627209066445.png)]
package com.liuyh.sync; import java.util.concurrent.TimeUnit; /** * 演示死锁 */ public class DeadLock { //创建两个对象 static Object a = new Object(); static Object b = new Object(); public static void main(String[] args) { new Thread(()->{ synchronized (a) { System.out.println(Thread.currentThread().getName()+" 持有锁a,试图获取锁b"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (b) { System.out.println(Thread.currentThread().getName()+" 获取锁b"); } } },"A").start(); new Thread(()->{ synchronized (b) { System.out.println(Thread.currentThread().getName()+" 持有锁b,试图获取锁a"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a) { System.out.println(Thread.currentThread().getName()+" 获取锁a"); } } },"B").start(); } }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nTMI3e2v-1627717457892)(assets2/1627210273866.png)]
目前我们学习了有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,
Java中提供了Callable接口。
创建线程的第三种方案—Callable接口
Callable接口的特点如下(重点)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mfZ4rxpL-1627717457894)(assets2/1627211012396.png)]
Java库具有具体的FutureTask类型,该类型实现Runnable和Future,并方便地将两种功能组合在一起。可以通过为其构造函数提供Callable来创建FutureTask。然后,将FutureTask对象提供给Thread的构造函数以创建Thread对象。因此,间接地使用Callable创建线程。
核心原理:(重点)
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成
package com.liuyh.callable; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; //比较两个接口 //实现Runnable接口 class MyThread1 implements Runnable { @Override public void run() { } } //实现Callable接口 class MyThread2 implements Callable { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName()+" come in callable"); return 200; } } public class Demo1 { public static void main(String[] args) throws ExecutionException, InterruptedException { //Runnable接口创建线程 new Thread(new MyThread1(),"AA").start(); //Callable接口,报错 // new Thread(new MyThread2(),"BB").start(); //FutureTask FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2()); //lam表达式 FutureTask<Integer> futureTask2 = new FutureTask<>(()->{ System.out.println(Thread.currentThread().getName()+" come in callable"); return 1024; }); //创建一个线程 new Thread(futureTask2,"lucy").start(); new Thread(futureTask1,"mary").start(); // while(!futureTask2.isDone()) { // System.out.println("wait....."); // } //调用FutureTask的get方法 System.out.println(futureTask2.get()); System.out.println(futureTask1.get()); System.out.println(Thread.currentThread().getName()+" come over"); //FutureTask原理 未来任务 /** * 1、老师上课,口渴了,去买水不合适,讲课线程继续。 * 单开启线程找班上班长帮我买水,把水买回来,需要时候直接get水 * * 2、4个同学, 1同学 1+2...+5 , 2同学 10+11+12....+50, 3同学 60+61+62, 4同学 100+200 * 第2个同学计算量比较大, * FutureTask单开启线程给2同学计算,先汇总 1 3 4 ,最后等2同学计算位完成,统一汇总 * * 3、考试,做会做的题目,最后看不会做的题目 * * 汇总一次 * */ } }
当call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用Future对象。
将Future视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable返回)。Future基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写5种方法,这里列出了重要的方法,如下:
public boolean cancel(boolean mayInterrupt)
:用于停止任务。
如果尚未启动,它将停止任务。如果已启动,则仅在mayInterrupt为true时才会中断任务。
V get() throws InterruptedException, ExecutionException;
:用于获取任务的结果。
如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。
public boolean isDone()
:如果任务完成,则返回true,否则返回false
可以看到Callable和Future做两件事Callable与Runnable类似,因为它封装了要在另一个线程上运行的任务,而Future用于存储从另一个线程获得的结果。实际上,future也可以与Runnable一起使用。
要创建线程,需要Runnable。为了获得结果,需要Future。
JUC中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时Lock锁的频繁操作。这三种辅助类为:
CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。
场景:6个同学陆续离开教室后值班同学才可以关门。
package com.liuyh.juc; import java.util.concurrent.CountDownLatch; //演示 CountDownLatch public class CountDownLatchDemo { //6个同学陆续离开教室之后,班长锁门 public static void main(String[] args) throws InterruptedException { //创建CountDownLatch对象,设置初始值 CountDownLatch countDownLatch = new CountDownLatch(6); //6个同学陆续离开教室之后 for (int i = 1; i <=6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+" 号同学离开了教室"); //计数 -1 countDownLatch.countDown(); },String.valueOf(i)).start(); } //等待 countDownLatch.await(); System.out.println(Thread.currentThread().getName()+" 班长锁门走人了"); } }
cyclic [ˈsaɪklɪk]:adj. 循环的;周期的
barrier [ˈbæriər] :n 障碍;屏障;阻力;关卡;分界线;隔阂
CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后
的语句。可以将CyclicBarrier理解为加1操作。
场景:集齐7颗龙珠就可以召唤神龙
package com.liuyh.juc; import java.util.concurrent.CyclicBarrier; //集齐7颗龙珠就可以召唤神龙 public class CyclicBarrierDemo { //创建固定值 private static final int NUMBER = 7; public static void main(String[] args) { //创建CyclicBarrier CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER,()->{ System.out.println("*****集齐7颗龙珠就可以召唤神龙"); }); //集齐七颗龙珠过程 for (int i = 1; i <=7; i++) { new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" 星龙被收集到了"); //等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
semaphore [ˈseməfɔːr]
Semaphore的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用acquire方法获得许可证,release方法释放许可。
场景:抢车位,6部汽车3个停车位
package com.liuyh.juc; import java.util.Random; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; //6辆汽车,停3个车位 public class SemaphoreDemo { public static void main(String[] args) { //创建Semaphore,设置许可数量 Semaphore semaphore = new Semaphore(3); //模拟6辆汽车 for (int i = 1; i <=6; i++) { new Thread(()->{ try { //抢占 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+" 抢到了车位"); //设置随机停车时间 TimeUnit.SECONDS.sleep(new Random().nextInt(5)); System.out.println(Thread.currentThread().getName()+" ------离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放 semaphore.release(); } },String.valueOf(i)).start(); } } }
悲观锁与乐观锁
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TpsMiloQ-1627717457896)(assets2/1627712750560.png)]
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-w4rciyTN-1627717457897)(assets2/1627221547075.png)]
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。
而读写锁有以下三个重要的特性:
reentrant [rɪˈentrənt] :可重入的;可再入的;可重新进入;可重入程序
ReentrantReadWriteLock类的整体结构
package java.util.concurrent.locks; import java.util.concurrent.TimeUnit; import java.util.Collection; public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; /** Inner class providing readlock 读锁*/ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock 写锁*/ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; /** * Creates a new {@code ReentrantReadWriteLock} with * default (nonfair) ordering properties. 使用默认(非公平)的排序属性创建一个新的ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy 使用给定的公平策略创建一个新的ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /**返回用于写入操作的锁*/ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /**返回用于读取操作的锁*/ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } /** * Synchronization implementation for ReentrantReadWriteLock. * Subclassed into fair and nonfair versions. */ abstract static class Sync extends AbstractQueuedSynchronizer {} /** * Nonfair version of Sync */ static final class NonfairSync extends Sync {} /** * Fair version of Sync */ static final class FairSync extends Sync {} /** * The lock returned by method {@link ReentrantReadWriteLock#readLock}. */ public static class ReadLock implements Lock, java.io.Serializable {} /** * The lock returned by method {@link ReentrantReadWriteLock#writeLock}. */ public static class WriteLock implements Lock, java.io.Serializable {} }
可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。
场景:使用ReentrantReadWriteLock对一个hashmap进行读和写操作
package com.liuyh.readwrite; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; //资源类 class MyCache { //创建map集合 private volatile Map<String,Object> map = new HashMap<>(); //创建读写锁对象 private ReadWriteLock rwLock = new ReentrantReadWriteLock(); //放数据 public void put(String key,Object value) { //添加写锁 rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+" 正在写操作"+key); //暂停一会 TimeUnit.MICROSECONDS.sleep(300); //放数据 map.put(key,value); System.out.println(Thread.currentThread().getName()+" 写完了"+key); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放写锁 rwLock.writeLock().unlock(); } } //取数据 public Object get(String key) { //添加读锁 rwLock.readLock().lock(); Object result = null; try { System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key); //暂停一会 TimeUnit.MICROSECONDS.sleep(300); result = map.get(key); System.out.println(Thread.currentThread().getName()+" 取完了"+key); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放读锁 rwLock.readLock().unlock(); } return result; } } public class ReadWriteLockDemo { public static void main(String[] args) throws InterruptedException { MyCache myCache = new MyCache(); //创建线程放数据 for (int i = 1; i <=5; i++) { final int num = i; new Thread(()->{ myCache.put(num+"",num+""); },String.valueOf(i)).start(); } TimeUnit.MICROSECONDS.sleep(300); //创建线程取数据 for (int i = 1; i <=5; i++) { final int num = i; new Thread(()->{ myCache.get(num+""); },String.valueOf(i)).start(); } } }
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
原因:当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u6vjHdWy-1627717457899)(assets2/1627221572057.png)]
当队列是空的,从队列中获取元素的操作将会被阻塞。
当队列是满的,从队列中添加元素的操作将会被阻塞。
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。
常用的队列主要有以下两种:
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。
为什么需要BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
方式 | 抛出异常 | 不会抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(timenum,timeUnit) |
移出 | remove | poll | take | poll(timenum,timeUnit) |
判断队首元素 | element | peek | - | - |
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BSQAC5BM-1627717457901)(assets2/1627698753422.png)]
BlockingQueue 的核心方法:
1.放入数据
2.获取数据
package com.liuyh.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; //阻塞队列 public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { //创建阻塞队列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); //第一组 // System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //System.out.println(blockingQueue.element()); //System.out.println(blockingQueue.add("w")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //第二组 // System.out.println(blockingQueue.offer("a")); // System.out.println(blockingQueue.offer("b")); // System.out.println(blockingQueue.offer("c")); // System.out.println(blockingQueue.offer("www")); // // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); //第三组 // blockingQueue.put("a"); // blockingQueue.put("b"); // blockingQueue.put("c"); // //blockingQueue.put("w"); // // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); //第四组 System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS)); } }
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外, ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析, ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。 Doug Lea 之 所以没这样去做,也许是因为ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其 在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元 素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC 的影 响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制 对象的内部锁是否采用公平锁,默认采用非公平锁。
一句话总结: 由数组结构组成的有界阻塞队列。
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回; 只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过 构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份 } } 数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。 而LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生 产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发 的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
一句话总结: 由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列。
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到 该元素。 DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
一句话总结: 使用优先级队列实现的延迟无界阻塞队列。
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来 决定), 但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而 只会在没有可消费的数据时,阻塞数据的消费者。 因此使用的时候要特别注意, 生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。 在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。
一句话总结: 支持优先级排序的无界阻塞队列。
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得 产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
一句话总结: 不存储元素的阻塞队列,也即单个元素的队列。
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列, LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如 果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
一句话总结: 由链表组成的无界阻塞队列。
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队 列的两端插入和移除元素。
对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况
一句话总结: 由链表组成的双向阻塞队列。
线程池(英语: thread pool):一种线程使用模式。线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
例子: 10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球, CPU 需 要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换 效率高。
线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量, 超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K3cmESeh-1627717457902)(assets2/1627701005951.png)]
本次介绍 5 种类型的线程池
线程池中,有三个重要的参数,决定影响了拒绝策略: corePoolSize - 核心线 程数,也即最小的线程数。 workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻 塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话, 当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程。
特点:
创建方式:
/** * corePoolSize 线程池的核心线程数 * maximumPoolSize 能容纳的最大线程数 * keepAliveTime 空闲线程存活时间 * unit 存活的时间单位 * workQueue 存放提交但未执行任务的队列 * threadFactory 创建线程的工厂类:可以省略 * handler 等待队列满后的拒绝策略:可以省略 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较 短,任务多的场景。
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这 些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线 程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中 等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线 程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池 中的线程将一直存在。
特征:
创建方式:
/** * 固定长度线程池 * @return */ public static ExecutorService newFixedThreadPool(){ /** * corePoolSize 线程池的核心线程数 * maximumPoolSize 能容纳的最大线程数 * keepAliveTime 空闲线程存活时间 * unit 存活的时间单位 * workQueue 存放提交但未执行任务的队列 * threadFactory 创建线程的工厂类:可以省略 * handler 等待队列满后的拒绝策略:可以省略 */ return new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景。
作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该 线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程, 那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即 可使用其他的线程。
特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
创建方式:
/** * 单一线程池 * @return */ public static ExecutorService newSingleThreadExecutor(){ /** * corePoolSize 线程池的核心线程数 * maximumPoolSize 能容纳的最大线程数 * keepAliveTime 空闲线程存活时间 * unit 存活的时间单位 * workQueue 存放提交但未执行任务的队列 * threadFactory 创建线程的工厂类:可以省略 * handler 等待队列满后的拒绝策略:可以省略 */ return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景。
作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参 数,最大线程数为整形的最大数的线程池。
特征: (1) 线程池中具有指定数量的线程,即便是空线程也将保留 (2)可定时或者 延迟执行线程活动
创建方式:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
场景: 适用于需要多个后台线程执行周期任务的场景。
jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务。
创建方式:
public static ExecutorService newWorkStealingPool(int parallelism) { /** * parallelism:并行级别,通常默认为 JVM 可用的处理器个数 * factory:用于创建 ForkJoinPool 中使用的线程。 * handler:用于处理工作线程未处理的异常,默认为 null * asyncMode:用于控制WorkQueue 的工作模式:队列---反队列 */ return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
场景: 适用于大耗时,可并行执行的场景。
场景: 火车站 3 个售票口, 10 个用户买票
package com.liuyh.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; //演示线程池三种常用分类 public class ThreadPoolDemo1 { public static void main(String[] args) { //一池五线程 ExecutorService threadPool1 = Executors.newFixedThreadPool(5); //5个窗口 //一池一线程 ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); //一个窗口 //一池可扩容线程 ExecutorService threadPool3 = Executors.newCachedThreadPool(); //10个顾客请求 try { for (int i = 1; i <=10; i++) { //执行 threadPool3.execute(()->{ System.out.println(Thread.currentThread().getName()+" 办理业务"); }); } }catch (Exception e) { e.printStackTrace(); }finally { //关闭 threadPool3.shutdown(); } } }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BMdE80iH-1627717457904)(assets2/1627703748220.png)]
在创建了线程池后,线程池中的线程数为零
当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程
池会启动饱和拒绝策略来执行。
当一个线程完成任务时,它会从队列中取下一个任务来执行
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
4.2所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6qRRGn7m-1627717457905)(assets2/1627703906005.png)]
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor的7个参数,自定义线程池。
创建线程池推荐适用 ThreadPoolExecutor 及其 7个参数手动创建
o corePoolSize 线程池的核心线程数
o maximumPoolSize 能容纳的最大线程数
o keepAliveTime 空闲线程存活时间
o unit 存活的时间单位
o workQueue 存放提交但未执行任务的队列
o threadFactory 创建线程的工厂类
o handler 等待队列满后的拒绝策略
为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BztRdiR8-1627717457906)(assets2/1627704019301.png)]
Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子 任务结果合并成最后的计算结果,并进行输出。 Fork/Join 框架要完成两件事情:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3DQ89ByA-1627717457907)(assets2/1627714378271.png)]
在 Java 的 Fork/Join 框架中,使用两个类完成上述操作
Fork/Join 框架的实现原理
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而ForkJoinWorkerThread 负责执行这些任务。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UZZG9FCY-1627717457908)(assets2/1627714825430.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y9t2IvLW-1627717457909)(assets2/1627715003453.png)]
Fork 方法的实现原理: 当我们调用ForkJoinTask的 fork 方法时,程序会把 任务放在 ForkJoinWorkerThread 的 pushTask的workQueue 中,异步地执行这个任务,然后立即返回结果。
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回 什么结果,任务状态有 4 种:
已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出 现异常(EXCEPTIONAL)
让我们分析一下 doJoin 方法的实现
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
在doJoin()方法流程如下:
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的
getException 方法获取异常。
getException 方法返回Throwable 对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回 null。
场景: 生成一个计算任务,计算 1+2+3 …+1000,==每 100 个数切分一个子任务
package com.liuyh.forkjoin; import java.util.concurrent.*; class MyTask extends RecursiveTask<Integer> { //拆分差值不能超过10,计算10以内运算 private static final Integer VALUE = 10; private int begin ;//拆分开始值 private int end;//拆分结束值 private int result ; //返回结果 //创建有参数构造 public MyTask(int begin,int end) { this.begin = begin; this.end = end; } //拆分和合并过程 @Override protected Integer compute() { //判断相加两个数值是否大于10 if((end-begin)<=VALUE) { //相加操作 for (int i = begin; i <=end; i++) { result = result+i; } } else {//进一步拆分 //获取中间值 int middle = (begin+end)/2; //拆分左边 MyTask task01 = new MyTask(begin,middle); //拆分右边 MyTask task02 = new MyTask(middle+1,end); //调用方法拆分 task01.fork(); task02.fork(); //合并结果 result = task01.join()+task02.join(); } return result; } } public class ForkJoinDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建MyTask对象 MyTask myTask = new MyTask(0,100); //创建分支合并池对象 ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask); //获取最终合并之后结果 Integer result = forkJoinTask.get(); System.out.println(result); //关闭池对象 forkJoinPool.shutdown(); } }
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞, 可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可 以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future 接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类。
在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提 交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:
不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
不支持进一步的非阻塞调用
通过 Future 的get方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能
不支持链式调用
对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。
不支持多个 Future 合并
比如我们有 10 个 Future 并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过 Future 实现的。
不支持异常处理
Future 的API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。
场景:主线程里面创建一个 CompletableFuture, 然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。
package com.liuyh.completable; import java.util.concurrent.CompletableFuture; //异步调用和同步调用 public class CompletableFutureDemo { public static void main(String[] args) throws Exception { //同步调用 CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread().getName()+" : CompletableFuture1"); }); completableFuture1.get(); //mq消息队列 //异步调用 CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+" : CompletableFuture2"); //模拟异常 int i = 10/0; return 1024; }); completableFuture2.whenComplete((t,u)->{ System.out.println("------t="+t); System.out.println("------u="+u); }).get(); } }