产生背景:
经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
解决思路:
提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具。
package com.tian; import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 测试自定义线程池 */ @Slf4j(topic = "c.TestPool") public class TestPool { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(3, 1000, TimeUnit.MILLISECONDS, 5, (queue, task) -> { // 1. 死等 // queue.put(task); // 2) 带超时等待 queue.offer(task, 1500, TimeUnit.MILLISECONDS); // 3) 让调用者放弃任务执行 // log.debug("放弃{}", task); // 4) 让调用者抛出异常 抛出异常后 下面的任务都不会继续执行 // throw new RuntimeException("任务执行失败 " + task); // 5) 让调用者自己执行任务 // task.run(); }); for (int i = 0; i < 10; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } /** * 拒绝策略: 函数式接口 */ @FunctionalInterface interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); } /** * 线程池 */ @Slf4j(topic = "c.ThreadPool") class ThreadPool { // 任务队列 private BlockingQueue<Runnable> taskQueue; // 线程集合 private final HashSet<Worker> workers = new HashSet<>(); // 核心线程数 private final int coreSize; // 获取任务时的超时时间 private final long timeout; private final TimeUnit timeUnit; private final RejectPolicy<Runnable> rejectPolicy; // 执行任务 public void execute(Runnable task) { // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker(task); log.debug("新增 worker{}, {}", worker, task); workers.add(worker); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { log.debug("正在执行...{}", task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { log.debug("worker 被移除{}", this); workers.remove(this); } } } } /** * 任务(阻塞)队列 * * @param <T> */ @Slf4j(topic = "c.BlockingQueue") class BlockingQueue<T> { // 1. 任务队列 private final Deque<T> queue = new ArrayDeque<>(); // 2. 锁 private final ReentrantLock lock = new ReentrantLock(); // 3. 生产者条件变量 private final Condition fullWaitSet = lock.newCondition(); // 4. 消费者条件变量 private final Condition emptyWaitSet = lock.newCondition(); // 5. 容量 private final int capacity; public BlockingQueue(int capacity) { this.capacity = capacity; } // 带超时阻塞获取 public T poll(long timeout, TimeUnit unit) { lock.lock(); try { // 将 timeout 统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { // 返回值是剩余时间 if (nanos <= 0) { return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞获取 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } // 阻塞添加 public void put(T task) { lock.lock(); try { while (queue.size() == capacity) { try { log.debug("等待加入任务队列 {} ...", task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 带超时时间阻塞添加 public boolean offer(T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capacity) { try { if (nanos <= 0) { return false; } log.debug("等待加入任务队列 {} ...", task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { // 判断队列是否满 if (queue.size() == capacity) { rejectPolicy.reject(this, task); } else { // 有空闲 log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }
运行结果: