在 Java 5.0 提供了java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步IO和轻量级任务框架;还提供了设计用于多线程上下文中的Collection实现等。
进程:是程序的一次执行过程,或是正在运行的一个程序,是动态的。是资源分配的单位。
线程:是一个进程内部的一条执行路径,作为最小的调度和执行单位,每个线程有独立的运行栈和程序计数器(pc)。
要区分与线程的生命周期,与线程生命周期不同。
public enum State { /** * 新生 */ NEW, /** * 运行 */ RUNNABLE, /** * 阻塞 */ BLOCKED, /** * 等待,死死的等 */ WAITING, /** * 超时等待 */ TIMED_WAITING, /** * 终止 */ TERMINATED; }
1、synchronized关键字包裹同步区域。
2、Lock接口实现类。
其中Lock接口实现类ReentrantLock在java.util.concurrent.locks包下,synchronized依赖于JVM而ReentrantLock依赖于API。ReentrantLock 比 synchronized 增加了一些高级功能。
相比synchronized,ReentrantLock增加了一些高级功能。主要来说主要有三点:
测试代码:
public class TestJava20211228{ public static void main(String[] args){ TicketSalerReentrantLock saler1 = new TicketSalerReentrantLock(); TicketSalerReentrantLock saler2 = new TicketSalerReentrantLock(); TicketSalerReentrantLock saler3 = new TicketSalerReentrantLock(); saler1.start(); saler2.start(); saler3.start(); } } class TicketSalerReentrantLock extends Thread{ private static final Lock lock = new ReentrantLock(); private static final Random rand = new Random(); private static int ticket = 100; public void run(){ while(true){ try{ lock.lock(); if(ticket <= 0) break; Thread.sleep(rand.nextInt(100)); ticket--; System.out.println("TICKET IS " + ticket + "."); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } }
条件变量,是为了解决等待同步需求,实现线程间协作通信的一种机制。条件变量用来自动阻塞一个线程,直到某特殊情况发生唤醒该线程为止。通常条件变量和锁机制同时使用。
Java中提供的条件变量:
1、synchronized + Object对象 实现方法 – [ wait()、notify()、notifyAll() ]
2、ReentrantLock + Condition对象 实现方法 – [ await()、signal()、signalAll() ]
使用条件变量实现交替打印奇偶数:
测试代码(synchronized + Object对象):
public class TestJava20211228{ public int num = 10; public static void main(String[] args){ TestJava20211228 test = new TestJava20211228(); PrinterOne one = new PrinterOne(test); PrinterTwo two = new PrinterTwo(test); one.start(); two.start(); } } class PrinterOne extends Thread{ private TestJava20211228 test; public PrinterOne(TestJava20211228 test){ this.test = test; } public void run(){ try{ while(test.num > 0){ synchronized(test){ if(test.num%2 == 0){ test.wait(); }else{ System.out.println(Thread.currentThread().getName() + " -> " + test.num--); test.notify(); } } } }catch(Exception e){ e.printStackTrace(); } } } class PrinterTwo extends Thread{ private TestJava20211228 test; public PrinterTwo(TestJava20211228 test){ this.test = test; } public void run(){ try{ while(test.num > 0){ synchronized(test){ if(test.num%2 != 0){ test.wait(); }else{ System.out.println(Thread.currentThread().getName() + " -> " + test.num--); test.notify(); } } } }catch(Exception e){ e.printStackTrace(); } } }
测试代码(ReentrantLock + Condition对象):
public class TestJava20211228{ public int num = 10; public static void main(String[] args){ TestJava20211228 test = new TestJava20211228(); Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); PrinterOne one = new PrinterOne(test, lock, condition); PrinterTwo two = new PrinterTwo(test, lock, condition); one.start(); two.start(); } } class PrinterOne extends Thread{ private Lock lock; private TestJava20211228 test; private Condition condition; public PrinterOne(TestJava20211228 test, Lock lock, Condition condition){ this.test = test; this.lock = lock; this.condition = condition; } public void run(){ while(test.num > 0){ lock.lock(); try{ if(test.num%2 == 0){ condition.await(); }else{ System.out.println(Thread.currentThread().getName() + " -> " + test.num--); condition.signal(); } }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } } class PrinterTwo extends Thread{ private Lock lock; private TestJava20211228 test; private Condition condition; public PrinterTwo(TestJava20211228 test, Lock lock, Condition condition){ this.test = test; this.lock = lock; this.condition = condition; } public void run(){ while(test.num > 0){ lock.lock(); try{ if(test.num%2 != 0){ condition.await(); }else{ System.out.println(Thread.currentThread().getName() + " -> " + test.num--); condition.signal(); } }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } }
作用:
1、保证变量的可见性:
在 JDK1.2之前,Java的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的。而在当前的Java内存模型下,线程可以把变量保存本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。要解决这个问题,就需要把变量声明为volatile,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。
2、防止JVM的指令重排:
重排序是指编译器和处理器为了优化程序性能而对指令序列进行排序的一种手段。重排序需要遵守一定规则: (1)、重排序操作不会对存在数据依赖关系的操作进行重排序。比如:a=1;b=a; 这个指令序列,由于第二个操作依赖于第一个操作,所以在编译时和处理器运行时这两个操作不会被重排序。
(2)、重排序是为了优化性能,但是不管怎么重排序,单线程下程序的执行结果不能被改变。比如:a=1;b=2;c=a+b这三个操作,第一步(a=1)和第二步(b=2)由于不存在数据依赖关系, 所以可能会发生重排序,但是c=a+b这个操作是不会被重排序的,因为需要保证最终的结果一定是c=a+b=3。
1、什么是ThreadLocal
ThreadLocal 叫做本地线程变量,意思是说,ThreadLocal 中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。
2、ThreadLocal 使用方法
public class TestJava20211228{ public int num = 10; public static void main(String[] args){ MyThread th1 = new MyThread(); MyThread th2 = new MyThread(); th1.start(); th2.start(); } } class MyThread extends Thread{ private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>(); public void run(){ int value = new Random().nextInt(); threadLocal.set(value); System.out.println(Thread.currentThread().getName() + " save value is " + value); System.out.println(Thread.currentThread().getName() + " value is " + threadLocal.get()); } }
ThreadLocal类中有set(value)方法,get()方法以及remove()方法可以用来操作ThreadLocal。当值被设置之后,会为每一个线程单独开辟一个位置来存储这个值。底层实现方式为哈希表。
ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。获取数据和添加数据都是使用同一个锁对象。
LinkedBlockingQueue:是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为整形最大值。底层使用链表来维护队列,在添加和删除队列中的元素的时候,会创建和销毁节点对象,在高并发和大量数据的时候,GC压力很大。获取数据和添加数据使用不同的锁对象。
PriorityBlockingQueue:是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。继承Comparable类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
DelayQueue:是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
SynchronousQueue:是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。
LinkedTransferQueue:是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法,如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻传输给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法,tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
LinkedBlockingDeque:LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在工作窃取模式中。
/** * add remove element * add 不可以加空,向已满的队列中添加数据抛出异常。 * remove 移除元素,若队列为空,则抛出异常。 * element 检查队列是否存在某元素,若队列为空则抛出异常。 * * offer poll peek * offer 向队列中添加一个元素,不可以添加null值,添加成功则返回true,添加失败则返回false。 * poll 从队列中取出一个元素,若队列为空则返回null。 * peek 检查队列中是否有元素,有则返回该元素的值,没有则返回空。 * * put take * put 插入一个元素,若队列已满则阻塞插入元素的线程。 * take 从队列中获取一个元素,若队列为空则阻塞当前获取元素的线程。 */
测试代码:
线程池是指在初始化一个多线程应用程序过程中创建的一个线程集合,线程池在任务未到来之前,会创建一定数量的线程放入空闲队列中.这些线程都是处于睡眠状态,即均未启动,因此不消耗CPU,只是占用很小的内存空间.当请求到来之后,线程池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。
当预先创建的线程都处于运行状态时,线程池可以再创建一定数量的新线程,用于处理更多的任务请求。如果线程池中的最大线程数使用满了,则会抛出异常,拒绝请求.当系统比较清闲时,也可以通过移除一部分一直处于停用状态的线程,线程池中的每个线程都有可能被分配多个任务,一旦任务完成,线程回到线程池中并等待下一次分配任务。
如果程序中有大量短时间任务的线程任务,由于创建和销毁线程需要和底层操作系统交互,大量时间都耗费在创建和销毁线程上,因而比较浪费时间,系统效率很低。而且线程的创建和销毁相比于普通的对象更为消耗资源,线程池技术的引入,就是为了解决这一问题。
线程池里的每一个线程任务结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用,因而借助线程池可以提高程序的执行效率。
// 用于接收创建的特定线程池对象 java.util.concurrent.ExecutorService // 线程池工厂类 java.util.concurrent.Executors
newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
1)FixedThreadPool和SingleThreadPool允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool和 ScheduledThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
ThreadPoolExecutor类相关属性:
/** * 线程池属性 * corePoolSize:线程池中最小的工作线程数量 * maximumPoolSize:线程池最大线程数 * keepAliveTime:空闲线程等待执行任务的超时时间(纳秒) * TimeUnit:时间单位 * workQueue:任务缓存队列,用来存放等待执行的任务 * ThreadFactory:线程工厂 * handler:任务拒绝策略 */ ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小 Runtime.getRuntime().availableProcessors(), // 线程池最大线程数 取CPU个数 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // 根据业务设置队列大小,队列大小一定要设置 Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.AbortPolicy() //拒绝策略 );
1)、创建一个线程池,在还没有任务提交的时候,默认线程池里面是没有线程的。当然,你也可以调用prestartCoreThread方法,来预先创建一个核心线程。
2)、线程池里还没有线程或者线程池里存活的线程数小于核心线程数corePoolSize时,这时对于一个新提交的任务,线程池会创建一个线程去处理提交的任务。当线程池里面存活的线程数小于等于核心线程数corePoolSize时,线程池里面的线程会一直存活着,就算空闲时间超过了keepAliveTime,线程也不会被销毁,而是一直阻塞在那里一直等待任务队列的任务来执行。
3)、当线程池里面存活的线程数已经等于corePoolSize了,这是对于一个新提交的任务,会被放进任务队列workQueue排队等待执行。而之前创建的线程并不会被销毁,而是不断的去拿阻塞队列里面的任务,当任务队列为空时,线程会阻塞,直到有任务被放进任务队列,线程拿到任务后继续执行,执行完了过后会继续去拿任务。这也是为什么线程池队列要是用阻塞队列。
4)、当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列也满了,这里假设maximumPoolSize>corePoolSize(如果等于的话,就直接拒绝了),这时如果再来新的任务,线程池就会继续创建新的线程来处理新的任务,直到线程数达到maximumPoolSize,就不会再创建了。这些新创建的线程执行完了当前任务过后,在任务队列里面还有任务的时候也不会销毁,而是去任务队列拿任务出来执行。在当前线程数大于corePoolSize过后,线程执行完当前任务,会有一个判断当前线程是否需要销毁的逻辑,如果能从任务队列中拿到任务,那么继续执行,如果拿任务时阻塞(说明队列中没有任务),那超过keepAliveTime时间就直接返回null并且销毁当前线程,直到线程池里面的线程数等于corePoolSize之后才不会进行线程销毁。
5)、如果当前的线程数达到了maximumPoolSize,并且任务队列也满了,这种情况下还有新的任务过来,那就直接采用拒绝的处理器进行处理。默认的处理器逻辑是抛出一个RejectedExecutionException异常。你也就可以指定其他的处理器,或者自定义一个拒绝处理器来实现拒绝逻辑的处理(比如将这些任务存储起来)。JDK提供了四种拒绝策略处理类:
均为ThreadPoolExecutor类中的静态内部类:
AbortPolicy(抛出一个异常,默认)
DiscardPolicy(直接丢弃任务)
DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
CallerRunsPolicy(交给线程池调用所在的线程进行处理)
测试代码:
import java.util.concurrent.*; public class ThreadPoolExecutorTest{ public static void main(String[] args){ ExecutorService executor = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), 2L, TimeUnit.SECONDS, new LinkedBlockingQueue(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); Future<Integer> result = executor.submit(new MyTask2()); try{ System.out.println("result is " + result.get()); }catch(Exception e){ e.printStackTrace(); } executor.execute(new MyTask()); } } class MyTask extends Thread{ @Override public void run(){ System.out.println("MyTask executed"); } } class MyTask2 implements Callable<Integer>{ @Override public Integer call() throws Exception{ System.out.println("MyTask2 executed"); return 1; } }
List列表: java.util包 List<String> list = new Vector<>(); List<String> list = Collections.synchronizedList(new ArrayList<>()); java.util.concurrent包 List<String> list = new CopyOnWriteArrayList<>(); Set集合: java.util包 Set<String> set= Collections.synchronizedSet(new HashSet<>()); java.util.concurrent包 Set<String> set=new CopyOnWriteArraySet<>(); Map集合: java.util包 HashTable Map<String,String> map= Collections.synchronizedMap(new HashMap<>()); java.util.concurrent包 Map<String,String> map= new ConcurrentHashMap<>();
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。如图所示:
fork/join优秀的地方就在于这个算法,假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一 一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
ForkJoinTask类:是任务本身,使用它来创建任务,提供fork(),join(),compute()等核心方法。
提供两个实现子类:
RecursiveTask:需要返回值时。
RecursiveAction:不需要返回值时。
ForkJoinPool类:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
计算1-1001累加后的和:
import java.util.concurrent.*; public class ForkJoinTest0118{ public static void main(String[] args){ // 使用ForkJoinPool执行任务 ForkJoinPool forkJoinPool = new ForkJoinPool(); // 类似于向线程池提交任务 ForkJoinTask<Integer> task = forkJoinPool.submit(new MyForkJoinTask(1, 1001)); try{ Integer result = task.get(); System.out.println(result); }catch(Exception e){ e.printStackTrace(); } } } // 创建一个任务类 继承RecursiveTask或RecursiveAction类 class MyForkJoinTask extends RecursiveTask<Integer>{ private static final Integer MAX = 200; private Integer startValue; private Integer endValue; public MyForkJoinTask(Integer startValue, Integer endValue){ this.startValue = startValue; this.endValue = endValue; } // 重写compute方法 @Override protected Integer compute(){ // 当满足一定条件时执行计算 if(endValue - startValue < MAX) { System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue); Integer totalValue = 0; for(int index = this.startValue ; index <= this.endValue ; index++) { totalValue += index; } return totalValue; }else { // 任务过大则进行Fork操作 // 具体流程 创建俩个新的ForkJoinTask 调用fork方法 MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2); subTask1.fork(); MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue); subTask2.fork(); // 返回时进行任务合并 调用join方法 return subTask1.join() + subTask2.join(); } } }