在一个系统中,如果线程数量较多,且功能分配比较明确,就可以将相同功能的线程放在同一个线程组(ThreadGroup) 中。
ThreadGroup有两个比较重要的功能:
activeCount()
方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值list()
方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助线程组使用举例:
public class TestThreadGroup { public static void main(String[] args) { ThreadGroup applesThread = new ThreadGroup("apples"); ThreadGroup bananasThread = new ThreadGroup("bananas"); Apple apple = new Apple(); Banana banana = new Banana(); for (int i = 0; i < 3; i++) { new Thread(applesThread, apple, "apple" + i).start(); } for (int i = 0; i < 3; i++) { new Thread(bananasThread, banana, "banana" + i).start(); } try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } // activeCount()方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值 System.out.println("当前活动线程总数: " + applesThread.activeCount()); // list()方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助 System.out.println("bananas组中的所有线程信息: "); bananasThread.list(); } } class Apple implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + ": I'm an apple"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Banana implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + ": I'm a banana"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
线程池是Java并发编程中一项重要的工具。
为解决此问题,我们可以使用JDK提供的工具——线程池。
JDK为线程池提供了一套Executor
框架,以帮助开发人员有效的进行线程控制
Executor框架的核心成员:
Interface Executor
interface ExecutorService extends Executor
abstract class AbstractExecutorService implements ExecutorService
class ThreadPoolExecutor extends AbstractExecutorService
:
- 实现了
Executor
接口,任何Runnable
的对象都可以被ThreadPoolExecutor
线程池调用- 是线程池工厂(Executors)重要的内部实现支持类。
interface ScheduledExecutorService extends ExecutorService
:
- 在
ExecutorService
接口之上扩展了在给定时间执行某任务的功能。- 如固定延时后执行、周期性执行等。
class Executors
:
- 线程池工厂,可以利用此工厂构建线程池
主要的工厂方法:
工厂方法 | 功能 |
---|---|
ExecutorService newFixedThreadPool(int nThreads) | 返回一个固定线程数量的线程池,当有新任务提交时,如有空闲线程则立即执行,否则暂存在任务队列中进行等待(先进先出)。 |
ExecutorService newSingleThreadExecutor() | 相当于newFixedThreadPool(1) 。 |
ExecutorService newCachedThreadPool() | 返回一个可根据实际情况调整的线程池,优先复用线程,线程不够时创建。 |
ScheduledExecutorService newSingleThreadScheduledExecutor() | 返回一个ScheduledExecutorService 对象,线程池大小为1。ScheduledExecutorService 接口在ExecutorService 接口之上扩展了在给定时间执行某任务的功能。 |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 与newSingleThreadScheduledExecutor() 类似,但此方法可以指定线程数量。 |
以 newFixedThreadPool(int nThreads) 为例,介绍线程池的使用:
public class TestPool { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable task = () -> { System.out.println(Thread.currentThread().getName() + ": hello thread pool!"); }; for (int i = 0; i < 4; i++) { executorService.submit(task); } // 执行完所有任务后,关闭线程池(不再接受新任务)。如果线程池已经关闭,则调用没有其他效果。 executorService.shutdown(); } }
Java线程池还提供了计划执行的功能,主要包括以下三个方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
方法会在给定的时间,对任务进行一次调度
... scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
对任务进行周期性的调度,任务的调度频率是唯一的(从第一次开始执行到第二次开始执行的时间是一定的)。
... scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
对任务进行周期性的调度,任务的调度频率不是唯一的(从第一次执行结束到第二次开始执行的时间是一定的)。
schedule举例:
public class ScheduledTask { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); Runnable runnable = () -> { System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法"); // 任务需要执行1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束"); }; System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start"); /* 延时2s后调用任务: schedule(Runnable command, long delay, TimeUnit unit) */ scheduledExecutorService.schedule(runnable, 2, TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } }
scheduleAtFixedRate举例:
public class ScheduledTask { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); Runnable runnable = () -> { System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法"); // 任务需要执行1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束"); }; System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start"); /* 每隔2s调用一次runnable任务: scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) */ scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS); } }
scheduleWithFixedDelay举例:
package com.ju.pool; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledTask { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); Runnable runnable = () -> { System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法"); // 任务需要执行1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束"); }; System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start"); /* 一开始延时5s开始执行任务,从前一次任务结束到下一次任务开始间隔2s: scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit) */ scheduledExecutorService.scheduleWithFixedDelay(runnable, 5, 2, TimeUnit.SECONDS); } }
除了计划执行外,其余三个工厂方法(newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
)的内部实现均使用了ThreadPoolExcutor
类。
ThreadPoolExcutor 类的一个参数较全的构造器如下所示:
public ThreadPoolExecutor(int corePoolSize, // 指定线程池中固定存在的线程数量 int maximumPoolSize, // 指定了线程池的最大线程数量 long keepAliveTime, // 指定了线程池中超过corePoolSize的空闲线程,在多长时间内会被销毁 TimeUnit unit, // keepAliveTime的单位 BlockingQueue<Runnable> workQueue, // 任务队列,由于存储已被提交但未被执行的任务 ThreadFactory threadFactory, // 线程工厂,由于创建线程,一般用默认的即可 RejectedExecutionHandler handler) // 拒绝策略:任务太多来不及处理时,如何拒绝任务
其中大部分的参数均比较容易理解,需要解释类型的有三个:
直接提交的队列(SynchronousQueue):
BlockingQueue
,没有容量,每一个插入操作都要等待一个相应的删除操作maximumPoolSize
),则执行拒绝策略有界的任务队列(ArrayBlockingQueue):
队列遵循先进先出规则
corePoolSize
有剩余则直接获得线程处理任务
corePoolSize
已满则将任务加入队列
如等待队列已满,在总线程不大于maximumPoolSize
的前提下创建新的线程执行任务
如大于maximumPoolSize
,则执行拒绝策略
无界的任务队列(LinkedBlockingQueue):
corePoolSize
有剩余则直接获得线程处理任务corePoolSize
已满则将任务加入队列优先任务队列(PriorityBlockingQueue):
用于创建线程池中的线程
可以自定义实现自己的线程工厂
public interface ThreadFactory { Thread newThread(Runnable r); }
AbortPolicy
: 直接抛出异常,阻之系统正常工作(默认的拒绝策略)CallerRunsPolicy
: 只要线程池未关闭,就直接在调用者线程中,运行当前被丢弃的任务DiscardOldestPolicy
: 丢弃 最“老” 的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务DiscardPolicy
:丢弃无法处理的任务,不予任何处理对于newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
方法,其方法内部均调用了下面的构造器。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public static ExecutorService newFixedThreadPool(int nThreads) { // 方法参数 nThreads,线程池中固定存在的线程数量 return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize 和 maximumPoolSize 均为 nThreads 0L, TimeUnit.MILLISECONDS, // 存活时间为0ms new LinkedBlockingQueue<Runnable>()); // 使用无界的任务队列(LinkedBlockingQueue) }
// 基本与 newFixedThreadPool 相同,只是限定了线程池中固定存在的线程数量为1 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // corePoolSize = 0; maximumPoolSize = Integer.MAX_VALUE; 60L, TimeUnit.SECONDS, // keepAliveTime = 60s; new SynchronousQueue<Runnable>()); // 直接提交的队列(SynchronousQueue) }
在默认的ThreadPoolExecutor 实现中,提供了空的beforeExecute()
、afterExecute()
和terminated()
方法。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。
beforeExecute()
:任务执行前被调用afterExecute()
:任务执行后被调用terminated()
:线程池退出前被调用public class MyPool { public static void main(String[] args) { ExecutorService myPool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(t.getName() + ": 任务执行前"); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("任务执行结束"); } @Override protected void terminated() { System.out.println("线程池退出"); } }; myPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "任务执行中..."); }); myPool.shutdown(); } }
“分而治之”是一个非常有效地处理大量数据的方法。
利用JDK提供的 Fork & Join 框架,我们可以十分方便的将一个大任务分解为多个子任务。当所有的子任务都执行完成后,在收集它们各自的结果,从而得到最终的结果。
使用示例:
// 利用fork&join,实现start到end的累加 public class TestForkJoinPool extends RecursiveTask<Long> { private static final int THRESHOLD = 30000; // 划分门槛 private static final int GROUP_SIZE = 10; // 每次划分的组数 private long start; // 计数起始值 private long end; // 计数终止值 public TestForkJoinPool(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0; if (end - start < THRESHOLD) { for (long i = start; i <= end; i++) { sum += i; } } else { // 将任务划分位多个子任务 long step = (end - start + 1) / GROUP_SIZE; ArrayList<TestForkJoinPool> subTasks = new ArrayList<>(); long head = start; long tail; for (int i = 0; i < GROUP_SIZE; i++) { tail = head + step; if (tail > end) { tail = end; } TestForkJoinPool subTask = new TestForkJoinPool(head, tail); head = tail + 1; subTasks.add(subTask); subTask.fork(); } // 收集子任务的计算结果 for (TestForkJoinPool subTask : subTasks) { sum += subTask.join(); } } return sum; } public static void main(String[] args) { long startTime = 0; long endTime = 0; long result = 0; ForkJoinPool forkJoinPool = new ForkJoinPool(); startTime = System.currentTimeMillis(); ForkJoinTask<Long> taskResult = forkJoinPool.submit(new TestForkJoinPool(0, 2000000000L)); try { result = taskResult.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } endTime = System.currentTimeMillis(); System.out.println("fork&join计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms"); startTime = System.currentTimeMillis(); result = 0; for (int i = 0; i <= 2000000000L; i++) { result += i; } endTime = System.currentTimeMillis(); System.out.println("普通计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms"); } }
参考书籍: Java高并发程序设计(第二版)