创建线程的三种方式
继承thread类
- 重写run()方法,编写线程执行体
- 创建线程对象,调用start()方法启动线程
实现Runable接口
实现Callable接口(了解)
- 需要返回值类型
- 重写call方法,需要抛出异常
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class CallableTest { public static void main(String[] args) { new Thread().start(); //启动callable CallableThread thread = new CallableThread(); //适配类 FutureTask futureTask = new FutureTask(thread); //结果会被缓存,效率高 new Thread(futureTask,"A").start(); new Thread(futureTask,"B").start(); try { //callable的返回值,这个get方法可能会产生阻塞,把他放在最后 Object o = futureTask.get(); //或者使用异步通信来处理 System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } class CallableThread implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("测试callable"); return 1; } }PS:有缓存,结果可能需要等待会阻塞
thread.setDaemon(true)
- 线程分为
用户线程
和守护线程
- 虚拟机必须确保用户线程执行完毕
- 虚拟机不用等待守护线程执行完毕
并发:
同一对象
被多个线程
同时操作例:抢票,同时取钱
多个线程访问同一个对象,并且某个线程想修改对象,这时需要
线程同步
,
线程同步其实就是一种等待机制
,多个需要同时访问此对象的线程进入这个对象的等待池
形成队列。等待前面的线程完成,下一个线程再使用。
锁机制
Synchronized,当一个线程获得对象的排它锁,独占资源,其他线程必须等待,使用后释放锁即可。存在以下问题:
- 一个线程持有锁会导致其他所有需要此锁的线程挂起
- 在多线程竞争下,加锁,释放锁会导致比较多的上下文切换和调度延时,引起性能问题
- 如果一个线程的优先级高等待一个优先级低的线程释放锁会导致优先级倒置,引起性能问题
synchronized方法控制对”对象“的访问,每个对象对应一把锁,每个synchronized方法都必须获得调用该方法的对象的锁才能执行,否则线程会阻塞,方法一旦执行,就独占资源,直到该方法返回才释放锁,后面被阻塞的线程才能获得这个锁,继续执行
缺陷
:若将一个大的方法声明为synchronized将会影响效率方法里面需要修改的内容才需要锁,锁太多浪费资源
同步方法,锁的是this
锁的对象就是变量的量,需要增删改查的对象
多个线程各自占有一些共享资源﹐并且互相等待其他线程占有的资源才能运行﹐而导致两个或者多个线程都在等待对方释放资源﹐都停止执行的情形.某一个同步块同时拥有
“两个以上对象的锁”
时﹐就可能会发生“死锁”的问题.
/** * 死锁:多个线程互相抱着对方需要的资源,然后形成僵持 * 解决:一个锁只锁一个对象 */ class DeadLock { public static void main(String[] args) { Makeup makeup = new Makeup(0, "灰姑娘"); Makeup makeup1 = new Makeup(1, "白雪公主"); makeup.start(); makeup1.start(); } } //口红 class Lipstick { } //镜子 class Mirror { } class Makeup extends Thread { /** * 需要的资源只有一份,用static保证只有一份 */ static Lipstick lipstick = new Lipstick(); static Mirror mirror = new Mirror(); /** * 选择 */ int choice; /** * 使用化妆品的人 */ String girlName; public Makeup(int choice, String girlName) { this.choice = choice; this.girlName = girlName; } @Override public void run() { //化妆 try { makeup(); } catch (InterruptedException e) { e.printStackTrace(); } } private void makeup() throws InterruptedException { if (choice == 0) { //获得口红的锁 synchronized (lipstick) { System.out.println(this.girlName + "获得口红的锁"); //一秒钟后想获得镜子 Thread.sleep(1000); synchronized (mirror) { System.out.println(this.girlName + "获得镜子的锁"); } } } else { //获得口红镜子 synchronized (mirror) { System.out.println(this.girlName + "获得镜子的锁"); Thread.sleep(2000); //二秒钟后想获得的锁 synchronized (lipstick) { System.out.println(this.girlName + "获得口红的锁"); } } } } }
解决:
/** * 死锁:多个线程互相抱着对方需要的资源,然后形成僵持 * 解决:一个锁只锁一个对象 */ class DeadLock { public static void main(String[] args) { Makeup makeup = new Makeup(0, "灰姑娘"); Makeup makeup1 = new Makeup(1, "白雪公主"); makeup.start(); makeup1.start(); } } //口红 class Lipstick { } //镜子 class Mirror { } class Makeup extends Thread { /** * 需要的资源只有一份,用static保证只有一份 */ static Lipstick lipstick = new Lipstick(); static Mirror mirror = new Mirror(); /** * 选择 */ int choice; /** * 使用化妆品的人 */ String girlName; public Makeup(int choice, String girlName) { this.choice = choice; this.girlName = girlName; } @Override public void run() { //化妆 try { makeup(); } catch (InterruptedException e) { e.printStackTrace(); } } private void makeup() throws InterruptedException { if (choice == 0) { //获得口红的锁 synchronized (lipstick) { System.out.println(this.girlName + "获得口红的锁"); //一秒钟后想获得镜子 Thread.sleep(1000); } synchronized (mirror) { System.out.println(this.girlName + "获得镜子的锁"); } } else { //获得口红镜子 synchronized (mirror) { System.out.println(this.girlName + "获得镜子的锁"); Thread.sleep(2000); //二秒钟后想获得的锁 } synchronized (lipstick) { System.out.println(this.girlName + "获得口红的锁"); } } } }
死锁避免方法:破坏一个或多个以下的必要条件
产生死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个进程使用
- 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:进程已获得的资源,在未使用完之前,不能强行剥夺
- 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系
java.util.concurrent.locks包下常用的类与接口(lock是jdk 1.5后新增的)
Lock 接口支持那些语义不同(重入、公平等)的锁规则,可以在非阻塞式结构的上下文(包括 hand-over-hand 和锁重排算法)中使用这些规则。主要的实现是 ReentrantLock。
Lock 实现提供了比 synchronized 关键字 更广泛的锁操作,它能以更优雅的方式处理线程同步问题。也就是说,Lock提供了比synchronized更多的功能。
默认:非公平锁,可以插队
公平锁:先来后到
public interface Lock { //获取锁方式1:最常用的方式。如果当前锁不可用,当前线程无法调度并进入休眠状态直到获取到锁 void lock(); //获取锁方式2:获取锁(除非当前线程被中断)。如果当前锁不可用,当前线程无法调度并进入休眠状态直到当前线程获取到锁或者其它线程中断了当前的线程。注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。因为调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。 void lockInterruptibly() throws InterruptedException; //获取锁方式3:获取锁(仅当锁在调用时处于空闲状态时才获取锁)。如果成功获取锁,返回true,否则,返回false;无论如何都会立即返回。在拿不到锁时不会一直在那等待。 boolean tryLock(); //获取锁方式4:获取锁(在规定的等待时间内且线程没有被中断,如果锁处于空闲状态时则获取锁)。如果成功获取锁,返回true,否则,返回false; //如果当前锁不可用,当前线程无法调度并进入休眠状态直到(1)当前线程获取到锁(2)当前线程被其它线程中中断(3)等待时间结束 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //释放锁 void unlock(); //返回绑定到此 Lock 实例的新 Condition 实例 Condition newCondition(); }
实现,买票问题
import java.util.concurrent.locks.ReentrantLock; public class BuyTickets { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(ticket,"张三").start(); new Thread(ticket,"李四").start(); new Thread(ticket,"王五").start(); } } class Ticket implements Runnable{ private Integer ticketNum = 1000; /** * 定义锁 */ private final ReentrantLock lock = new ReentrantLock(); @Override public void run() { while (true){ //上锁 lock.lock(); try { if(ticketNum<=0){ break; } Thread.sleep(1); System.out.println(Thread.currentThread().getName()+"拿到"+ ticketNum--); }catch (Exception e){ e.printStackTrace(); }finally { //解锁 lock.unlock(); } } } }
- Lock是显式锁(手动开启和关闭锁,
别忘记关闭锁
)synchronized是隐式锁,出了作用域自动释放- Lock只有代码块锁,synchronized有代码块锁和方法锁
使用Lock锁,JVM将花费较少的时间来调度线程,性能更好。并且具有更好的扩展性(提供更多的子类)- 优先使用顺序:
Lock >同步代码块(已经进入了方法体,分配了相应资源)>同步方法(在方法体之外)
ps:JUC安全集合List,CopyOnWriteArrayList(线程安全)
应用场景:生产者消费者问题
- 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费.
- 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止.
- 如果仓库中放有产品﹐则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止.
分析:生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件
- 对于生产者,没有生产产品之前,要通知消费者等待﹒而生产了产品之后﹐又需要马上通知消费者消费
- 对于消费者﹐在消费之后,要通知生产者已经结束消费﹐需要生产新的产品
以供消费.- 在生产者消费者问题中,仅有synchronized是不够的
- synchronized可阻止并发更新同一个共享资源,实现了同步
- synchronized不能用来实现不同线程之间的消息传递(通信)
方式一:并发协作模型“生产者/消费者模式”--> 管程法
- 生产者︰负责生产数据的模块(可能是方法﹐对象﹐线程﹐进程)
- 消费者︰负责处理数据的模块(可能是方法﹐对象﹐线程﹐进程)
- 缓冲区∶消费者不能直接使用生产者的数据﹐他们之间有个“缓冲区生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据
实现
/** * @ClassName Communication * @Author ZC * @Date 2022/7/23 9:59 * @Version 1.0 * @Description 管程法解决通信 ->生产者- 缓存区 - 消费者 */ public class Communication { public static void main(String[] args) { CacheArea cacheArea = new CacheArea(); new Producer(cacheArea).start(); new Consumer(cacheArea).start(); } } /** * 生产者 */ class Producer extends Thread{ private CacheArea cacheArea; public Producer(CacheArea cacheArea){ this.cacheArea = cacheArea; } @Override public void run() { for (int i = 0; i <100 ; i++) { //生产者将生产的产品放到缓存区中 cacheArea.push(new Product(i)); System.out.println("生产者生产了->>"+i+"号商品"); } } } /** * 消费者 */ class Consumer extends Thread{ private CacheArea cacheArea; public Consumer(CacheArea cacheArea) { this.cacheArea = cacheArea; } @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("消费者消费了->>"+cacheArea.pop().getId()+"号商品"); } } } /** * 产品 */ class Product { private Integer id; public Product(Integer id) { this.id = id; } public Integer getId() { return id; } } /** * 缓存区 */ class CacheArea { /** * 缓存容器大小 */ Product[] products = new Product[10]; //容器计数器 int count = 0; //生产者放入产品 public synchronized void push(Product product) { //如果容器满了,需要等待消费者消费 /*如果是if的话,假如消费者1消费了最后一个,这是index变成0此时释放锁被消费者2拿到而不是生产者拿到,这时消费者的wait是在if里所以它就直接去消费index-1下标越界,如果是while就会再去判断一下index得值是不是变成0了*/ while (count == products.length) { //通知消费者消费,等待生产 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //如果没有满,需要丢入产品 products[count] = product; count++; //通知消费者消费 this.notifyAll(); } //消费者消费产品 public synchronized Product pop() { //判断是否能消费 while (count <= 0) { //等待生产者生产 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //如果可以消费 count--; Product product = products[count]; //吃完了 通知生产者生产 this.notifyAll(); return product; } }
方式二: 并发协作模型“生产者/消费者模式”--> 信号法
可以利用一个标志位标志
- 背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
- 思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。
可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具.。- 好处:
- 提高响应速度(减少了创建新线程的时间)
- 降低资源消耗(重复利用线程池中线程,不需要每次都创建)
- 便于线程管理
- corePoolSize:核心池的大小
- maximumPoolSize:最大线程数
- keenAliveTime:线程没有任各时最多保持多长时间后终止
JDK 5.0起提供了线程池相关API: ExecutorService 和Executors ExecutorService:真正的线程池接口。常见子类ThreadPoolExecutor void execute(Runnable command) :执行任务/命令,没有返回值,一般用来执行Runnable <T> Future<T> submit(allable<T> task):执行任务,有返回值,- -般又来执行Callable void shutdown() :关闭连接池 Executors: 工具类、线程池的工厂类,用于创建并返回不同类型的线程池
import java.lang.reflect.Executable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestThreadPool { public static void main(String[] args) { //创建线程池 ExecutorService service = Executors.newFixedThreadPool(10); service.execute(new MyThread()); service.execute(new MyThread()); service.execute(new MyThread()); service.execute(new MyThread()); //关闭线程池 service.shutdown(); } } class MyThread extends Thread { @Override public void run() { System.out.println("今天你进步了?"); } }
JUC是java.util.concurrent包的简称,在Java5.0添加,目的就是为了更好的支持高并发任务。让开发者进行多线程编程时减少竞争条件和死锁的问题!
进程 : 一个运行中的程序的集合,一个进程往往可以包含多个线程,至少包含一个线程
java默认有几个线程? 两个
main线程 gc线程
线程 : 线程(thread)是操作系统能够进行运算调度的最小单位。
并发(多线程操作同一个资源,交替执行)
CPU一核, 模拟出来多条线程,快速交替
核心
:充分利用cpu资源并行(多个人一起行走, 同时进行)
CPU多核,多个线程同时进行 ; 使用线程池操作
- 来自不同的类
- wait:object类
- sleep:线程类
- 关于锁的释放
- wait:释放锁
- sleep:不释放锁
- 使用的范围不同
- wait:必须在同步代码块中
- sleep:可以在任何地方睡眠
又称信号量三组工具类,包含有
- CountDownLatch(闭锁) 是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
- CyclicBarrier(栅栏) 之所以叫barrier,是因为是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 ,并且在释放等待线程后可以重用
- Semaphore(信号量) 是一个计数信号量,它的本质是一个“共享锁“。信号量维护了一个信号量许可集。线程可以通过调用 acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可
Java里面线程池的顶级接口,但它只是一个执行线程的工具,真正的线程池接口是ExecutorService,里面包含的类有:
- ScheduledExecutorService 解决那些需要任务重复执行的问题
- ScheduledThreadPoolExecutor 周期性任务调度的类实现
JDK提供的一组原子操作类,
包含有
AtomicBoolean
、AtomicInteger
、AtomicIntegerArray
等原子变量类,他们的实现原理大多是持有它们各自的对应的类型变量value,而且被volatile关键字修饰了。这样来保证每次一个线程要使用它都会拿到最新的值。
JDK提供的锁机制,相比synchronized关键字来进行同步锁,功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁包含的实现类有:
- ReentrantLock 它是独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁
- ReentrantReadWriteLock 它包括子类ReadLock和WriteLock。ReadLock是共享锁,而WriteLock是独占锁
- LockSupport 它具备阻塞线程和解除阻塞线程的功能,并且不会引发死锁
主要是提供线程安全的集合
- ArrayList对应的高并发类是CopyOnWriteArrayList
- HashSet对应的高并发类是 CopyOnWriteArraySet
- HashMap对应的高并发类是ConcurrentHashMap等等
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Test { public static void main(String[] args) { Business business = new Business(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.increment(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.decrement(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.increment(); } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.decrement(); } }, "D").start(); } } class Business { private int num = 0; private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); //condition.await(); 等待 condition.signalAll();唤醒 public void increment() { lock.lock(); try { while (num != 0) { condition.await(); } num++; System.out.println(Thread.currentThread().getName() + "-->" + num); condition.signalAll(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } public void decrement() { lock.lock(); try { //防止虚假唤醒if换成while while (num == 0) { condition.await(); } num--; System.out.println(Thread.currentThread().getName() + "-->" + num); condition.signalAll(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }
A-B-C依次执行,达到一定条件之后通过condition监视等待/唤醒指定业务
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class TestCondition { public static void main(String[] args) { Business1 business = new Business1(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { business.printC(); } }, "C").start(); } } class Business1{ private int num = 1; private ReentrantLock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); private Condition conditionC = lock.newCondition(); public void printA(){ lock.lock(); try { while (num!=1){ conditionA.await(); } System.out.println("A执行"); num=2; conditionB.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void printB(){ lock.lock(); try { while (num!=2){ conditionB.await(); } System.out.println("B执行"); num=3; conditionC.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void printC(){ lock.lock(); try { while (num!=3){ conditionC.await(); } System.out.println("C执行"); num=1; conditionA.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
1、以下代码,先执行打电话还是发短信?
先执行发短信,使用synchronize锁,锁的对象是方法的调用者。同一个对象调用方法,谁先拿到锁谁先执行
import java.util.concurrent.TimeUnit; /** * 1、先执行打电话还是发短信 * 发短信? */ public class Test01 { public static void main(String[] args) { Phone phone = new Phone(); //phone同一个对象 new Thread(()->{phone.sendsms();},"A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } new Thread(()->{phone.call();},"B").start(); } } class Phone{ //打电话 /** * sychronized 锁的对象是方法的调用者 * 同一个对象调用,谁先拿到谁先执行 */ public synchronized void call(){ System.out.println("打电话"); } //发短信 public synchronized void sendsms(){ try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("发短信"); } }两个对象,以下代码先执行打电话
import java.util.concurrent.TimeUnit; public class Test01 { public static void main(String[] args) { Phone phone = new Phone(); Phone phone1 = new Phone(); //phone同一个对象 new Thread(()->{phone.sendsms();},"A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } new Thread(()->{phone1.call();},"B").start(); } } class Phone{ public synchronized void call(){ System.out.println("打电话"); } //发短信 public synchronized void sendsms(){ try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("发短信"); } public void hello(){ System.out.println("未加锁,不受锁的影响"); } }jstatic静态修饰的synchronized,静态方法在一加载时就有 这是锁的就是class模板对象(class模板对象唯一,属于同一对象)
package com.example.juc.lock8; import java.util.concurrent.TimeUnit; /** * static修饰的synchronized 锁对象,锁的是class模板对象 */ public class Test02 { public static void main(String[] args) { Phone1 phone = new Phone1(); Phone1 phone1 = new Phone1(); //phone同一个对象 new Thread(()->{phone.sendsms();},"A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } new Thread(()->{phone1.call();},"B").start(); } } class Phone1{ /** * static修饰 锁的是class模板对象唯一 */ public static synchronized void call(){ System.out.println("打电话"); } //发短信 public static synchronized void sendsms(){ try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("发短信"); } }PS: synchronized作用在普通方法上,锁是方法调用者(对象),作用在静态方法上锁是Class类模板
并发下ArrayList集合不安全
public class TestList { public static void main(String[] args) { List<Object> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 6)); System.out.println(list); }, String.valueOf(i)).start(); } } }以上代码出现异常
java.util.ConcurrentModificationException【并发修改异常】
Vector
Collections.synchronizedList()
集合工具类转换
CopyOnWriteArrayList
:写入时复制(避免写入时覆盖)
COW 计算机程序设计领域的一种优化策略
多个线程调用时,list读取的时候固定,写入时可能覆盖
读写分离
#部分源码 private transient volatile Object[] array; /** * Sets the array. */ final void setArray(Object[] a) { array = a; } /** * Creates an empty list. */ public CopyOnWriteArrayList() { setArray(new Object[0]); }ps:vector的add使用了synchronized而CopyOnWriteArrayList的add使用的lock锁
- Set
CopyOnWriteArraySet
ps:同CopyOnWriteArrayList原理一样
hashSet底层原理基于hashmap
public HashSet() { map = new HashMap<>(); } //set的add就是map的key public boolean add(E e) { return map.put(e, PRESENT)==null; }
hashmap的了解HashMap实现原理
- loadFactor (加载因子)默认0.75
- initialCapacity(初始容量)16
1、Collections.synchronizedMap(new HashMap<>());
2、
ConcurrentHashMap
- 原理:查看ConcurrentHashMap原理
import java.util.concurrent.CountDownLatch; /** * 减法计数器-线程执行时countDownLatch.countDown(); * countDownLatch.await();归零后再向下执行 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //总数是6 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"go out"); //进行-1 countDownLatch.countDown(); },String.valueOf(i)).start(); } //等待计数器归零,然后向下执行 countDownLatch.await(); System.out.println("关闭"); } }
原理:
- countDown()数量减1
- await()等待计数器归0,然后再向下执行
每次有线程执行时调用countDown()数量减1,当都执行完成后计数器归0,则countDownLatch.await()唤醒,继续执行
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { public static void main(String[] args) { //指定一个阀值,当线程执行达到这个阀值后才执行 CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{ System.out.println("可以兑换5块钱纸币"); }); for (int i = 1; i <=5 ; i++) { final int temp = i; new Thread(()->{ System.out.println("收集到个1块钱硬币"); try { cyclicBarrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } },String.valueOf(i)).start(); } } }
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * 模拟抢车位 * 10辆车5个车位 * 应用实例:限流等 */ public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(5); for (int i = 1; i <= 10; i++) { new Thread(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"离开了车位"); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { semaphore.release(); } },String.valueOf(i)).start(); } } }
原理:
- semaphore. acquire() 获得,假设如果已经满了, 等待,等待被释放为止!
- semaphore. release() 释放 ,会将当前的信号量释放+ 1 ,然后唤醒等待的线程!
作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!
对于锁更加的细致
独占锁(写锁) 一次只能被一个线程占有
共享锁(读锁) 可以同时被多个线程占有import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo { public static void main(String[] args) { ReadWrite readWrite = new ReadWrite(); //写 for (int i = 1; i < 5; i++) { final int temp = i; new Thread(()->{ readWrite.put(temp+"","写"); },String.valueOf(i)).start(); } //读 for (int i = 1; i < 5; i++) { final int temp = i; new Thread(()->{ readWrite.get(temp+""); },String.valueOf(i)).start(); } } } class ReadWrite{ private Map<String,Object> map = new ConcurrentHashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put(String key,Object value){ readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+"写入"); map.put(key,value); System.out.println(Thread.currentThread().getName()+"写入完成"); }catch (Exception e){ e.printStackTrace(); }finally { readWriteLock.writeLock().unlock(); } } public void get(String key){ readWriteLock.readLock().lock(); try { map.get(key); System.out.println(Thread.currentThread().getName()+"读取成功"); }catch (Exception e){ e.printStackTrace(); }finally { readWriteLock.readLock().unlock(); } } }
写入:队满时,阻塞等待
读取:对空时,阻塞等待
4种方式
方式 | 抛出异常 | 不抛异常 | 一直阻塞 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(Object 0,long timeout, TimeUnit unit) |
移除 | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
取队列首 | element() | peek() |
- 抛出异常
public static void test1() { //阀值3队列长度 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3); arrayBlockingQueue.add("a"); arrayBlockingQueue.add("b"); arrayBlockingQueue.add("c"); //添加第四个的时候抛出异常 Queue full // arrayBlockingQueue.add("d"); System.out.println("队头元素"+arrayBlockingQueue.element()); arrayBlockingQueue.remove(); arrayBlockingQueue.remove(); arrayBlockingQueue.remove(); //队空的情况下还移除 抛出NoSuchElementException // arrayBlockingQueue.remove(); }
- 不抛异常
public static void test2() { //阀值3队列长度 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3); arrayBlockingQueue.offer("a"); arrayBlockingQueue.offer("b"); arrayBlockingQueue.offer("c"); arrayBlockingQueue.offer("d"); System.out.println("队头元素"+arrayBlockingQueue.peek()); arrayBlockingQueue.poll(); arrayBlockingQueue.poll(); arrayBlockingQueue.poll(); arrayBlockingQueue.poll(); }
- 一直阻塞
public static void test3() throws InterruptedException { //阀值3队列长度 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3); arrayBlockingQueue.put("a"); arrayBlockingQueue.put("b"); arrayBlockingQueue.put("c"); //队满添加元素时 程序一直处于等待状态 arrayBlockingQueue.put("d"); System.out.println("队头元素"+arrayBlockingQueue.peek()); arrayBlockingQueue.take(); arrayBlockingQueue.take(); arrayBlockingQueue.take(); //队空移除元素时 程序一直处于等待状态 arrayBlockingQueue.take(); }
- 超时等待
public static void test4() throws InterruptedException { //阀值3队列长度 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3); System.out.println(arrayBlockingQueue.offer("a")); System.out.println(arrayBlockingQueue.offer("b")); System.out.println(arrayBlockingQueue.offer("c")); //添加元素发现队满后等待一定时间然后再向下执行 System.out.println(arrayBlockingQueue.offer("d",3, TimeUnit.SECONDS)); System.out.println("队头元素"+arrayBlockingQueue.peek()); System.out.println("--------------------------------"); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); //移除元素发现队空后等待一定时间返回null然后再向下执行 System.out.println(arrayBlockingQueue.poll(3,TimeUnit.SECONDS)); }
- put()
- take()
一次只能添加一个元素,添加后必须消费,下个才能添加和消费
ps:同步队列不保留元素
- Executors.newSingleThreadExecutor(); //单个线程
- Executors.newFixedThreadPool(4);//指定线程池大小
- Executors.newCachedThreadPool();//由性能决定
底层实现都是创建new ThreadPoolExecutor()
不推荐使用Executors创建线程池
- Executors各个方法的弊端:
- newFixedThreadPool和newSingleThreadExecutor
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM- newCachedThreadPool和newScheduledThreadPool
主要问题是线程数最大数是Integer.MAX_VALUE(约为21亿),可能会创建数量非常多的线程,甚至OOM
自定义线程池的7大参数
# new ThreadPoolExecutor()部分源码-分析 public ThreadPoolExecutor(int corePoolSize, //核心线程池大小 int maximumPoolSize,//最大核心线程池大小 long keepAliveTime,//超时等待时间,没有就会释放 TimeUnit unit,//超时单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂,创建线程的,一般不动 RejectedExecutionHandler handler) {//拒绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }4种拒绝策略
当线程池达到最大承载时执行
- new ThreadPoolExecutor.AbortPolicy() 当还有哦任务时不处理抛出异常RejectedExecutionException
- new ThreadPoolExecutor.CallerRunsPolicy() 对满时不抛出异常,丢掉任务
- new ThreadPoolExecutor.DiscardPolicy() 不接收,哪来回哪去
- new ThreadPoolExecutor.DiscardOldestPolicy() 队满尝试和最早的竞争,不抛出异常
例:
import java.util.concurrent.*; public class ThreadPoolDemo { public static void main(String[] args) { //Runtime.getRuntime().availableProcessors() //cpu核数 ExecutorService pool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy() ); //最大承载:队列+max值 for (int i = 0; i < 9; i++) { //使用了线程池之后,使用线程池来创建线程 pool.execute(() -> { System.out.println(Thread.currentThread().getName() + "ok"); }); } //线程池用完,程序结束,关闭线程池 pool.shutdown(); } }最大核心线程池数量如何定义?
cpu密集型
几核就是几,保证cpu的效率最高
Runtime.getRuntime().availableProcessors()IO密集型
并行执行任务,提高效率
特点:工作窃取(例如两个任务,一个执行完另一个没有执行完,执行完的会去获取未执行完的执行)
import java.util.concurrent.RecursiveTask; public class ForkJoinDemo extends RecursiveTask<Long> { /** * 开始值 */ private Long start; /** * 末值 */ private long end; /** * 临界值 */ private long temp = 1_0000_0000; public ForkJoinDemo(Long start, long end) { this.start = start; this.end = end; } /** * 计算1-10000的累加和 */ public Long compute() { long sum = 0; if (end - start < temp) { for (long i = start; i <= end; i++) { sum += i; } return sum; } else { //中间值 long middle = (start+end)/2; ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end); task2.fork(); return task1.join()+task2.join(); } } }
#ForkJoin并行计算累加和 public static void calculate2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task= new ForkJoinDemo(1l,10_0000_0000); ForkJoinTask<Long> submit = pool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum="+sum+"\t耗时"+(end-start)); }
Future :可以让调用方立即返回,然后它自己会在后面慢慢处理,此时调用者拿到的仅仅是一个凭证,调用者可以先去处理其它任务,在真正需要用到调用结果的场合,再使用凭证去获取调用结果。这个凭证就是这里的Future。
证需要具有以下特点:
在将来某个时间点,可以通过凭证获取任务的结果;
可以支持取消。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class FetureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { test2(); } /** * 没有返回值 * * @throws ExecutionException * @throws InterruptedException */ public static void test1() throws ExecutionException, InterruptedException { /** * CompletableFuture<Void> 没有返回值得淘汰 */ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "测试CompletableFuture"); } catch (InterruptedException e) { throw new RuntimeException(e); } }); System.out.println("no return"); //获取结果 voidCompletableFuture.get(); } /** * 有返回值得回调 */ public static void test2() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "测试有返回值的回调"); //测试出现错误时异步回调 int i = 1024 / 0; return "1024"; }); /** * 获取异步回调的结果 * 成功后的回调 */ String s = completableFuture.whenComplete((t, u) -> { System.out.println(t);//回调成功的返回结果 System.out.println(u);//异常信息 }).exceptionally(//失败后的错误回调 (e) -> { System.out.println("--->" + e.getMessage()); return "出现错误信息,请查看"; } ).get(); //返回值得异步回调信息 System.out.println("------------------------"); System.out.println(s); } }
什么是JMM
java的内存模型,一种约定,概念
1、线程在解锁前,必须把共享的变量立刻刷新回主存!
2、线程在加锁前,必须读取主存中最新的值到工作内存(线程有自己的工作内存)中!
3、加锁和解锁是同一把锁
- lock (锁定) - 作用于主内存的变量,它把一个变量标识为一条线程独占的状态。【加琐时的操作】
- unlock (解锁) - 作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
- read (读取) - 作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用。
- write (写入) - 作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值放入主内存的变量中。
- load (载入) - 作用于工作内存的变量,它把 read 操作从主内存中得到的变量值放入工作内存的变量副本中。
- use (使用) - 作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值得字节码指令时就会执行这个操作【运算】。
- assign (赋值) - 作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作【赋值】。
- store (存储) - 作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后 write 操作使用。
ps:执行流程:主内存->read->load->线程工作内存->use->业务方法->assign->工作内存->store->write->主内存
Volatile
Java虚拟机提供的轻量级的同步机制
- 保证可见性
public class VolatileDemo { /** * 未添加volatile num发生变化时候 线程监听不到 * 添加了volatile 线程发现主内存中值改变 停止(volatile的可见性) */ private static volatile int num = 0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while (num==0){ } }).start(); TimeUnit.SECONDS.sleep(2); num = 1; System.out.println(num); } }
不保证原子性
public class VDemo01 { /** * 会发现每次执行结果都不相同 */ private static volatile int num = 0; public static void add(){ num++; } public static void main(String[] args) { for (int i =0;i<1000;i++){ new Thread(()->{ add(); }).start(); } System.out.println(Thread.currentThread()+"->"+num); while (Thread.activeCount()>2){ Thread.yield(); } } }解决方式
- lock
- synchronized
使用原子类
- 由于内存屏障(竞争指令顺序交换),禁止指令重排
- 保证特定的操作执行循序
- 可以保证某些变量的内存可见性(利用这些特性,保证valitale实现了可见性)
源码程序顺序和执行的顺序不一致,重排序的对象是指令。指令重排序是编译器处于性能考虑,在不影响程序(单线程程序)正确性的条件下进行重新排序。指令重排序不是必然发生的,指令重排序会导致线程安全问题。指令重排序也被称为处理器的乱序执行,在这种情况下尽管指令的执行顺序可能没有完全按照程序顺序执行,但是由于指令的执行结果的提交(反应到寄存器和内存中),仍然是按照程序顺序来的,因此处理器的指令重排序并不会对单线程的正确性产生影响。指令重排序不会对单线程程序的正确性产生影响,但他可能导致多线程程序出现非预期结果。
public class SingletonH { /** * 一开始就创建对象 */ private SingletonH(){ } private final static SingletonH singletonH = new SingletonH(); public static SingletonH getInstance(){ return singletonH; } }
public class LazySingleton { private volatile static LazySingleton lazySingleton; private LazySingleton(){ } /** * 需要的时候才被创建 * 单线程模式模式下,存在问题 * @return */ private static LazySingleton getInstance(){ if(lazySingleton==null){ lazySingleton = new LazySingleton(); } return lazySingleton; } }
public class LazySingleton { private volatile static LazySingleton lazySingleton; private LazySingleton(){ } private static LazySingleton getInstance(){ if(lazySingleton==null){ synchronized (LazySingleton.class){ if(lazySingleton==null){ lazySingleton = new LazySingleton(); } } } return lazySingleton; } }
//静态内部类 public class Holder { private Holder() { } public static Holder getInstance() { return InnerClass.HOLDER; } public static class InnerClass { private static final Holder HOLDER = new Holder(); } }
因为反射,单例模式不安全,采用枚举(反射不能破坏枚举,尝试反射的话会报Cannot reflectively create enum objects)
public enum EnumSingle { INSTANCE; public EnumSingle getInstance() { return INSTANCE; } }
CAS【CAS是英文单词Compare and Swap的缩写,翻译过来就是比较并替换】
CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B
ps:CAS的缺点:
- CPU开销过大
在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很到的压力。
- 不能保证代码块的原子性
CAS机制所保证的知识一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用synchronized了。
- ABA问题
例如:两个业务同时操作一个数据
- 业务二已经对数据进行了修改并还原,这时业务一操作数据
- 业务一不知道数据被修改过
public class CASDemo { public static void main(String[] args) { //原子类 AtomicInteger atomicInteger = new AtomicInteger(2022); //模拟ABA问题 //业务一 new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("--------"+Thread.currentThread().getName()+"操作--------"); System.out.println(atomicInteger.compareAndSet(2022, 2019)); System.out.println(atomicInteger.get()); },"1").start(); //业务二 new Thread(()->{ System.out.println("--------"+Thread.currentThread().getName()+"操作--------"); System.out.println(atomicInteger.compareAndSet(2022, 2019)); System.out.println(atomicInteger.compareAndSet(2019, 2022)); System.out.println(atomicInteger.get()); },"2").start(); } }发现都是true,希望一个业务修改还原后,希望另一个业务操作出现false
new AtomicStampedReference<>()解决这个问题,原理同乐观锁
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicStampedReference; public class CASDemo { public static void main(String[] args) { /** * initialRef 初始内容 * initialStamp 初始版本号 */ AtomicStampedReference<Integer> integerAtomicStampedReference = new AtomicStampedReference<>(1, 1); //模拟ABA问题 //业务一 new Thread(()->{ System.out.println("--------"+Thread.currentThread().getName()+"操作-s-------"); //获取版本号 int stamp = integerAtomicStampedReference.getStamp(); System.out.println("a-初始->版本:"+stamp); try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { throw new RuntimeException(e); } boolean b = integerAtomicStampedReference.compareAndSet(1, 3, stamp, stamp + 1); System.out.println(b); System.out.println("--------"+Thread.currentThread().getName()+"操作-e-------"); },"a").start(); //业务二 new Thread(()->{ System.out.println("--------"+Thread.currentThread().getName()+"操作-s-------"); int stamp = integerAtomicStampedReference.getStamp(); System.out.println("b-初始-版本:"+stamp); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } integerAtomicStampedReference.compareAndSet(1,2,stamp,stamp+1); int stamp1 = integerAtomicStampedReference.getStamp(); System.out.println("b-first->版本:"+stamp1); integerAtomicStampedReference.compareAndSet(2,1,stamp1,stamp1+1); int stamp2 = integerAtomicStampedReference.getStamp(); System.out.println("b-second->版本:"+stamp2); System.out.println("--------"+Thread.currentThread().getName()+"操作-e-------"); },"b").start(); } }