步骤 1 :自定义拒绝策略接口
@FunctionalInterface // 线程池拒绝策略 interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); }
步骤 2 :自定义任务队列
@Slf4j(topic = "c.BlockingQueue") class BlockingQueue<T> { // 1. 任务队列, 双向队列 private final Deque<T> queue = new ArrayDeque<>(); // 2. 锁 private final ReentrantLock lock = new ReentrantLock(); // 3. 生产者条件变量 private final Condition fullWaitSet = lock.newCondition(); // 4. 消费者条件变量 private final Condition emptyWaitSet = lock.newCondition(); // 5. 队列容量 private final int capacity; public BlockingQueue(int capacity) { this.capacity = capacity; } // 容量大小 public int size() { lock.lock(); try { return this.capacity; } finally { lock.unlock(); } } // 阻塞获取 @Deprecated public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.wait(); // 阻塞等待 } catch (Exception e) { // 任务被打断 e.printStackTrace(); } } T t = this.queue.removeFirst(); fullWaitSet.signal(); // 唤醒 return t; } finally { lock.unlock(); } } // 带超时的阻塞获取方法 public T poll(long timeout, TimeUnit unit) { lock.lock(); try { // 将timeout统一转化为纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0) return null; // 返回的是剩余时间 nanos = emptyWaitSet.awaitNanos(nanos); // 阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } T t = this.queue.removeFirst(); fullWaitSet.signal(); // 唤醒 return t; } finally { lock.unlock(); } } // 阻塞添加 @Deprecated public void put(T element) { lock.lock(); try { while (queue.size() == this.capacity) { try { log.debug("任务等待加入任务队列: {}", element); fullWaitSet.await(); // 阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(element); log.debug("任务加入任务队列: {}", element); emptyWaitSet.signal(); // 唤醒 } finally { lock.unlock(); } } // 带超时时间的阻塞添加方法 public boolean offer(T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == this.capacity) { try { if (nanos <= 0) { return false; } log.debug("任务等待加入任务队列: {}", task); nanos = fullWaitSet.awaitNanos(nanos);// 阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(task); log.debug("任务加入任务队列: {}", task); emptyWaitSet.signal(); // 唤醒 return true; } finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capacity) { // 队列已满 rejectPolicy.reject(this, task); } else { // 有空闲 queue.addLast(task); log.debug("任务加入任务队列: {}", task); emptyWaitSet.signal(); // 唤醒 } } finally { lock.unlock(); } } }
步骤 3 :自定义线程池
@Slf4j(topic = "c.ThreadPool") class ThreadPool { // 任务队列 private final BlockingQueue<Runnable> taskQueue; // 线程集合 private final HashSet<Worker> workers = new HashSet<>(); // 线程池中核心线程数 private final int coreSize; // 获取任务的超时时间 private final long timeout; private final TimeUnit timeUnit; private final RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapacity); this.rejectPolicy = rejectPolicy; } // 执行任务 public void execute(Runnable task) { // 任务数未超过coreSize时, 直接交给 worker 对象执行 // 任务数超过 coreSize时, 加入任务队列缓存 synchronized (workers) { if (workers.size() < this.coreSize) { Worker worker = new Worker(task); log.debug("任务直接执行: {}, 新增 worker: {}", task, worker); workers.add(worker); worker.start(); } else { // taskQueue.put(task); // 死等 // 线程池的拒绝策略 // 1. 死等 // 2. 带超时等待 // 3. 放弃任务执行 // 4. 抛出异常 // 5. 让调用者自己执行任务 // ... // 策略模式 taskQueue.tryPut(rejectPolicy, task); } } } class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1. 当task不为null, 执行任务 // 2. 当task执行完毕, 从任务队列中获取task并执行 /*while (task != null || (task = taskQueue.take()) != null) {*/ while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { log.debug("worker: {}, 正在执行task: {}", this, task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } // 移除当前worker synchronized (workers) { workers.remove(this); log.debug("worker被移除: {}", this); } } } }
步骤 4 :测试
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @Slf4j(topic = "c.TestPool") public class TestPool { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool( 1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> { // 策略1. 死等 // queue.put(task); // 策略2, 带超时的等待 // queue.offer(task, 1500, TimeUnit.MILLISECONDS); // 策略3, 让调用者放弃任务执行 // log.debug("放弃任务执行...{}", task); // (无逻辑) // 策略4, 让调用者自己抛出异常 // throw new RuntimeException("任务执行失败... " + task); // 策略5, 让调用者自己执行任务 task.run(); }); for (int i = 0; i < 4; i++) { String taskName = "task-" + i; threadPool.execute(() -> { log.debug("{}", taskName); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
// c 为旧值, ctlOf 返回结果为新值 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))); // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们 private static int ctlOf(int rs, int wc) { return rs | wc; }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特点
评价 适用于任务量已知,相对耗时的任务
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j(topic = "c.TestThreadPoolExecutors") public class TestThreadPoolExecutors { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); pool.execute(() -> log.debug("task1")); pool.execute(() -> log.debug("task2")); pool.execute(() -> log.debug("task3")); pool.execute(() -> log.debug("task4")); pool.execute(() -> log.debug("task5")); } }
自定义线程工厂:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @Slf4j(topic = "c.TestThreadPoolExecutors") public class TestThreadPoolExecutors { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2, // 自定义线程工厂 new ThreadFactory() { private final AtomicInteger t = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, "myPool-t-" + t.getAndIncrement()); } } ); pool.execute(() -> log.debug("task1")); pool.execute(() -> log.debug("task2")); pool.execute(() -> log.debug("task3")); pool.execute(() -> log.debug("task4")); pool.execute(() -> log.debug("task5")); } }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特点
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.TestSynchronousQueue") public class TestSynchronousQueue { public static void main(String[] args) throws InterruptedException { SynchronousQueue<Integer> integers = new SynchronousQueue<>(); new Thread(() -> { try { log.debug("putting {} ", 1); integers.put(1); // 阻塞 log.debug("{} putted...", 1); log.debug("putting {} ", 2); integers.put(2); log.debug("{} putted...", 2); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { log.debug("taking {}", 1); Integer i = integers.take(); // 结束阻塞 System.out.println("i: " + i); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { log.debug("taking {}", 2); Integer i = integers.take(); System.out.println("i: " + i); } catch (InterruptedException e) { e.printStackTrace(); } }, "t3").start(); } }
输出
评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1 分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
使用场景:
希望多个任务排队执行。线程数固定为 1 ,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
区别:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j(topic = "c.TestExecutors") public class TestExecutors { public static void main(String[] args) { test2(); } private static void test2() { ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(() -> { log.debug("task: 1"); int i = 1 / 0; }); pool.execute(() -> log.debug("task: 2")); pool.execute(() -> log.debug("task: 3")); } }
// 执行任务 void execute(Runnable command); // 提交任务 task,用返回值 Future 获得任务执行结果 <T> Future<T> submit(Callable<T> task); // 提交 tasks 中所有任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 提交 tasks 中所有任务,带超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
submit
栗子:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; @Slf4j(topic = "c.TestSubmit") public class TestSubmit { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { log.debug("running..."); TimeUnit.SECONDS.sleep(2); log.debug("create a result!"); return "one fine"; } }); String result = future.get(); // 阻塞 log.debug("result: {}", result); } }
invokeAll
栗子:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; @Slf4j(topic = "c.TestInvokeAll") public class TestInvokeAll { public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> futures = pool.invokeAll(Arrays.asList( () -> { log.debug("begin-1"); TimeUnit.SECONDS.sleep(1); return "1"; }, () -> { log.debug("begin-2"); TimeUnit.SECONDS.sleep(2); return "2"; }, () -> { log.debug("begin-3"); TimeUnit.SECONDS.sleep(2); return "3"; } )); futures.forEach(f -> { try { String result = f.get(); log.debug("result: " + result); } catch (Exception e) { e.printStackTrace(); } }); } }
invokeAny
栗子:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.concurrent.*; @Slf4j(topic = "c.TestInvokeAny") public class TestInvokeAny { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(1); String result = pool.invokeAny(Arrays.asList( () -> { log.debug("begin-1"); TimeUnit.SECONDS.sleep(1); log.debug("end-1"); return "1"; }, () -> { log.debug("begin-2"); TimeUnit.SECONDS.sleep(2); log.debug("end-2"); return "2"; }, () -> { log.debug("begin-3"); TimeUnit.SECONDS.sleep(2); log.debug("end-3"); return "3"; } )); log.debug("{}", result); } }
定义:
/* 线程池状态变为 SHUTDOWN - 不会接收新任务 - 但已提交任务会执行完 - 此方法不会阻塞调用线程的执行 */ void shutdown();
实现:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); }
栗子:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.TestShutDown") public class TestShutDown { public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future<Integer> result1 = pool.submit(() -> { log.debug("task 1 running..."); TimeUnit.SECONDS.sleep(1); log.debug("task 1 finish!"); return 1; }); Future<Integer> result2 = pool.submit(() -> { log.debug("task 2 running..."); TimeUnit.SECONDS.sleep(1); log.debug("task 2 finish!"); return 2; }); Future<Integer> result3 = pool.submit(() -> { log.debug("task 3 running..."); TimeUnit.SECONDS.sleep(4); log.debug("task 3 finish!"); return 1; }); log.debug("shutdown start..."); pool.shutdown(); log.debug("shutdown end!"); boolean b = pool.awaitTermination(3, TimeUnit.SECONDS); // 阻塞3s log.debug("阻塞结束..."); // java.util.concurrent.RejectedExecutionException // Future<Integer> result4 = pool.submit(() -> { // log.debug("task 4 running..."); // TimeUnit.SECONDS.sleep(1); // log.debug("task 4 finish!"); // return 1; // }); } }
定义:
/* 线程池状态变为 STOP - 不会接收新任务 - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务 */ List<Runnable> shutdownNow();
实现:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终结 tryTerminate(); return tasks; }
栗子:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.TestShutDown") public class TestShutDown { public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future<Integer> result1 = pool.submit(() -> { log.debug("task 1 running..."); TimeUnit.MILLISECONDS.sleep(400); log.debug("task 1 finish!"); return 1; }); Future<Integer> result2 = pool.submit(() -> { log.debug("task 2 running..."); TimeUnit.SECONDS.sleep(2); log.debug("task 2 finish!"); return 2; }); Future<Integer> result3 = pool.submit(() -> { log.debug("task 3 running..."); TimeUnit.SECONDS.sleep(4); log.debug("task 3 finish!"); return 3; }); Future<Integer> result4 = pool.submit(() -> { log.debug("task 4 running..."); TimeUnit.SECONDS.sleep(4); log.debug("task 4 finish!"); return 4; }); TimeUnit.MILLISECONDS.sleep(500); log.debug("shutdownNow start..."); List<Runnable> runnables = pool.shutdownNow(); // 未执行的任务, 这里是task4 log.debug("shutdownNow end!"); log.debug("runnables: {}", runnables); } }
// 不在 RUNNING 状态的线程池,此方法就返回 true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
Worker Thread
-工作线程让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工
固定大小线程池会有饥饿现象
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j(topic = "c.TestStarvation") public class TestStarvation { static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅"); static Random RANDOM = new Random(); static String cooking() { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(() -> { log.debug("处理点餐..."); Future<String> f = executorService.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); /*executorService.execute(() -> { log.debug("处理点餐..."); Future<String> f = executorService.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } });*/ } }
输出
当注释取消后,可能的输出(注意:这里是饥饿,并不是死锁)
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j(topic = "c.TestDeadLock") public class TestDeadLock { static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅"); static Random RANDOM = new Random(); static String cooking() { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main(String[] args) { ExecutorService waiterPool = Executors.newFixedThreadPool(1); ExecutorService cookPool = Executors.newFixedThreadPool(1); waiterPool.execute(() -> { log.debug("处理点餐..."); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); waiterPool.execute(() -> { log.debug("处理点餐..."); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
输出
通常采用 cpu 核数 + 1
能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 10% = 40
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer
来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.TestTimer") public class TestTimer { public static void main(String[] args) { Timer timer = new Timer(); TimerTask task1 = new TimerTask() { @Override public void run() { log.debug("task 1"); // int i = 1 / 0; try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }; TimerTask task2 = new TimerTask() { @Override public void run() { log.debug("task 2"); } }; log.debug("main start..."); // 使用 timer 添加两个任务,希望它们都在 1s 后执行 // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行 timer.schedule(task1, 1000); timer.schedule(task2, 1000); } }
输出
使用 ScheduledExecutorService
改写:–延时执行任务
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.TestScheduledExecutorService") public class TestScheduledExecutorService { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); log.debug("main start..."); // 添加两个任务,希望它们都在 1s 后执行 executor.schedule(() -> { log.debug("task1 start..."); // int i = 1 / 0; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }, 1000, TimeUnit.MILLISECONDS); executor.schedule(() -> { log.debug("task2 start..."); }, 1000, TimeUnit.MILLISECONDS); } }
输出
scheduleAtFixedRate
例子:–定时执行任务
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.ScheduleAtFixedRateTest") public class ScheduleAtFixedRateTest { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("main start..."); // delay pool.scheduleAtFixedRate(() -> { log.debug("running..."); }, 1, 1, TimeUnit.SECONDS); } }
输出
scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.ScheduleAtFixedRateTest") public class ScheduleAtFixedRateTest { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("main start..."); // delay pool.scheduleAtFixedRate(() -> { log.debug("running..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 1, TimeUnit.SECONDS); // delay从上一次任务结束时开始算 /*pool.scheduleWithFixedDelay(() -> { log.debug("running..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 1, TimeUnit.SECONDS);*/ } }
输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
scheduleWithFixedDelay 例子:
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.ScheduleAtFixedRateTest") public class ScheduleAtFixedRateTest { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("main start..."); // delay /*pool.scheduleAtFixedRate(() -> { log.debug("running..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 1, TimeUnit.SECONDS);*/ // delay从上一次任务结束时开始算 pool.scheduleWithFixedDelay(() -> { log.debug("running..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 1, TimeUnit.SECONDS); } }
输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s
评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务
方法 1 :主动捉异常
private static void test1() { ExecutorService pool = Executors.newFixedThreadPool(1); pool.submit(() -> { try { log.debug("task1"); int i = 1 / 0; } catch (Exception e) { log.error("error:", e); } }); }
输出
方法 2 :使用 Future
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j(topic = "c.TestScheduleException") public class TestScheduleException { public static void main(String[] args) { // test1(); test2(); } private static void test2() { ExecutorService pool = Executors.newFixedThreadPool(1); Future<Boolean> f = pool.submit(() -> { log.debug("task1"); int i = 1 / 0; return true; }); Boolean result = null; try { result = f.get(); } catch (Exception e) { log.error("error: {}", e.getMessage()); } log.debug("result:{}", result); } private static void test1() { ExecutorService pool = Executors.newFixedThreadPool(1); pool.submit(() -> { try { log.debug("task1"); int i = 1 / 0; } catch (Exception e) { log.error("error: {}", e.getMessage()); } }); } }
输出
如何让每周四 18:00:00 定时执行任务?
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.time.DayOfWeek; import java.time.Duration; import java.time.LocalDateTime; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j(topic = "c.TestSchedule") public class TestSchedule { public static void main(String[] args) { // 获得当前时间 LocalDateTime now = LocalDateTime.now(); // 获取本周四 18:00:00.000 LocalDateTime thursday = now.with(DayOfWeek.THURSDAY) // 本周四 .withHour(18).withMinute(0).withSecond(0).withNano(0); // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000 if (now.compareTo(thursday) >= 0) { thursday = thursday.plusWeeks(1); // 下周四 } // 计算时间差,即延时执行时间 long initialDelay = Duration.between(now, thursday).toMillis(); // 计算间隔时间,即 1 周的毫秒值 long oneWeek = 7 * 24 * 3600 * 1000; ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); System.out.println("开始时间:" + new Date()); executor.scheduleAtFixedRate(() -> { System.out.println("执行时间:" + new Date()); }, initialDelay, oneWeek, TimeUnit.MILLISECONDS); } }
Tomcat 在哪里用到了线程池呢
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
源码 tomcat-7.0.42
public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full."); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent.isShutdown() ) throw new RejectedExecutionException( "Executor not running, can't force a command into the queue" ); return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected }
Connector 配置
Executor 线程配置
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; @Slf4j(topic = "c.TestForkJoin2") public class TestForkJoin { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); Integer result = pool.invoke(new MyTask(5)); log.debug("result: {}", result); } } // 求1-n之间整数的和 @Slf4j(topic = "c.MyTask") class MyTask extends RecursiveTask<Integer> { private final int n; public MyTask(int n) { this.n = n; } @Override public String toString() { return "{" + n + '}'; } // 做任务拆分逻辑 @Override protected Integer compute() { // 终止条件; 如果 n 已经为 1,可以求得结果了 if (n == 1) { log.debug("join() {}", n); return n; } // 将任务进行拆分(fork) MyTask t1 = new MyTask(n - 1); t1.fork(); // 让线程执行此任务 log.debug("fork(): {} + {}", n, t1); // 合并(join)结果 int result = n + t1.join(); // 当前结果 与 获取任务的结果 相加 log.debug("join(): {} + {} = {}", n, t1, result); return result; } }
结果
改进
package top.onefine.test.c8; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; @Slf4j(topic = "c.TestForkJoin2") public class TestForkJoin2 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); Integer result = pool.invoke(new MyTask2(1, 5)); log.debug("result: {}", result); } } // 求1-n之间整数的和 @Slf4j(topic = "c.MyTask2") class MyTask2 extends RecursiveTask<Integer> { int begin; int end; public MyTask2(int begin, int end) { this.begin = begin; this.end = end; } @Override public String toString() { return "{" + begin + "," + end + '}'; } @Override protected Integer compute() { // 5, 5 if (begin == end) { log.debug("join() {}", begin); return begin; } // 4, 5 if (end - begin == 1) { log.debug("join() {} + {} = {}", begin, end, end + begin); return end + begin; } // 1 5 int mid = (end + begin) / 2; // 3 MyTask2 t1 = new MyTask2(begin, mid); // 1,3 t1.fork(); MyTask2 t2 = new MyTask2(mid + 1, end); // 4,5 t2.fork(); log.debug("fork() {} + {} = ?", t1, t2); int result = t1.join() + t2.join(); log.debug("join() {} + {} = {}", t1, t2, result); return result; } }
结果
用图来表示