//Thread.java //调用start启动线程,进入Runnable状态,等待系统调度执行 public synchronized void start(){//synchronized同步执行 if (threadStatus != 0) //0 代表new状态,非0则抛出错误 throw new IllegalThreadStateException(); ... start0(); //本地方法方法 private native void start0() ... } //Running状态,新线程执行的代码方法,可被子类重写 public void run() { if (target != null) { //target是Runnable,new Thread(Runnable)时传入 target.run(); } }
//Thread.java @Deprecated public final void stop(); //中断线程 public void interrupt() //判断的是当前线程是否处于中断状态 public static boolean interrupted()
interrupt函数中断线程,但它不一定会让线程退出的。它比stop函数优雅,可控制
//Thread.java //阻塞等待其他线程 public final synchronized void join(final long millis) //暂时让出CPU执行 public static native void yield(); //休眠一段时间 public static native void sleep(long millis) throws InterruptedException;
start与run方法的区别
Thread.sleep与Object.wait区别
//ThreadPoolExecutor.java public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
//ThreadPoolExecutor.java public void execute(Runnable command) { ... if (workerCountOf(c) < corePoolSize) { //plan A if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //plan B int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //addWorker(command, false) false代表可创建非核心线程来执行任务 else if (!addWorker(command, false)) //plan C reject(command); // //plan D }
操作方法 | 抛出异常 | 返回特殊值 | 阻塞线程 | 超时退出 |
---|---|---|---|---|
插入元素 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除元素 | remove() | poll() | take() | pull(timeout, unit) |
检查 | element() | peek() | 无 | 无 |
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
DelayQueue
SynchronousQueue
LinkedTransferQueue
LinkedBlockingDeque
//Executors.java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
//Executors.java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
//Executors.java public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); } //指定延迟执行时间 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
//Executors.java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); //无界队列 }
public List<Runnable> shutdownNow() { ... final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //加锁 try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); //interrupt关闭线程 tasks = drainQueue(); //未执行任务 ...
先考虑下为啥线程池的线程不会被释放,它是怎么管理线程的生命周期的呢
//ThreadPoolExecutor.Worker.class final void runWorker(Worker w) { ... //工作线程会进入一个循环获取任务执行的逻辑 while (task != null || (task = getTask()) != null) ... } private Runnable getTask(){ ... Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //线程会阻塞挂起等待任务, ... }
可以看出,无任务执行时,线程池其实是利用阻塞队列的take方法挂起,从而维持核心线程的存活
//Worker class,一个worker一个线程 Worker(Runnable firstTask) { //禁止新线程未开始就被中断 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } final void runWorker(Worker w) { .... //对应构造Worker是的setState(-1) w.unlock(); // allow interrupts boolean completedAbruptly = true; .... w.lock(); //加锁同步 .... try { ... task.run(); afterExecute(task, null); } finally { .... w.unlock(); //释放锁 }
worker继承AQS的意义:A 禁止线程未开始就被中断;B 同步runWorker方法的处理逻辑
A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.
如果任务被拒绝了,则由提交任务的线程执行此任务
当线程执行完自己deque的任务,且其他线程deque还有多的任务,则会启动窃取策略,从其他线程deque队尾获取线程
//该demo代码是引用他人的,如有侵权,请联系我 public class ForkJoinPoolTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); for (int i = 0; i < 10; i++) { ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i)); System.out.println(task.get()); } } static class Fibonacci extends RecursiveTask<Integer> { int n; public Fibonacci(int n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) { return n; } Fibonacci fib1 = new Fibonacci(n - 1); fib1.fork(); //相当于开启新线程执行 Fibonacci fib2 = new Fibonacci(n - 2); fib2.fork(); //相当于开启新线程执行 return fib1.join() + fib2.join(); //合并返回结果 } } }
https://juejin.im/post/5f016b...