& 是所有的2进制位数“与”出的最终结果,“与”的规则是两者都为1时才得1,否则就得0
~(A) = -(A+1)
例如:1 << 2 向左移2位
0001 -> 0100 = 4
Integer.SIZE 为何是32?
Integer 值的大小范围为 -231~231-1,int类型数据占4字节,1一个 字节8个bit位。
-1的二进制表示
先取1的原码:00000000 00000000 00000000 00000001
得反码: 11111111 11111111 11111111 11111110
得补码: 11111111 11111111 11111111 11111111
newFixedThreadPool newSingleThreadExecutor newCachedThreadPool
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) /** 参数: corePoolSize : 核心线程数 maximumPoolSize:最大线程数 keepAliveTime:非核心线程数最大的空闲时间 TimeUnit:时间单位 workQueue:工作队列 1. 同步队列 SynchronousQueue 长度为0 2. 有界队列 ArrayBlockingQueue 长度自己指定 3. 无界队列 LinkedBlockingQueue 无界 threadFactory:线程工厂,用来创建线程池中的线程的,可以自己实现这个线程工厂。 handler:拒绝策略,可以自己实现拒绝策略。 jdk自身实现了4种: (1)ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出 RejectedExecutionException 异常。默认 (2)ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用线程执行该任务。 (3)ThreadPoolExecutor.DiscardOldestPolicy : 抛弃队列最前面的任务,然后重新尝试执行任务。 (4)ThreadPoolExecutor.DiscardPolicy,丢弃任务,不抛出异常。 注:当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。 **/
几种常用写法(入口调用不同方法,底层都是调用的ThreadPoolExecutor的execute方法)
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ThreadTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 100, 60, TimeUnit.MICROSECONDS, new SynchronousQueue<>(), new MyThreadFactory(), new MyRejectedExecutionHandler()); try { // for (int i = 1; i < 100; i++) { // threadPoolExecutor.execute(new MyTask(i)); // } // // for (int i = 1; i < 100; i++) { // threadPoolExecutor.submit(new MyCallable(i)); // } List<MyCallable> tasks = new ArrayList<MyCallable>(); for (int i = 1; i <= 105; i++) { tasks.add(new MyCallable(i)); } threadPoolExecutor.invokeAll(tasks); } catch (Exception e) { e.printStackTrace(); } finally { threadPoolExecutor.shutdown(); } } } class MyTask extends Thread { private int i; public MyTask(int i) { this.i = i; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "=" + i); } } class MyCallable implements Callable<Object> { private int i; public MyCallable(int i) { this.i = i; } @Override public Object call() throws Exception { System.out.println("Callable" + Thread.currentThread().getName() + "=" + i); return i; } } // 自定义线程工厂 class MyThreadFactory implements ThreadFactory{ private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; private final Long stackSize; MyThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "myPool" + poolNumber.getAndIncrement() + "-thread-"; stackSize = 1024L; } /** * 创建线程时指定线程所在组,线程预期栈大小,优先级,是否为守护线程,名字等。 * * @param r * @return */ public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), stackSize); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } // 自定义任务拒绝策略 class MyRejectedExecutionHandler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("任务被拒绝"); } }
任务添加顺序:
核心线程–》工作队列–》非核心线程。见源码解析
任务执行顺序:
核心线程–》非核心线程–》工作队列。见源码解析
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // ctl = running private static final int COUNT_BITS = Integer.SIZE - 3; //COUNT_BITS = 29; private static final int CAPACITY = (1 << COUNT_BITS) - 1; //0010 - 1= 2^29-1 // runState is stored in the high-order bits 线程池运行状态存储在高3位,剩下的低29位用来记录当前工作线程数量,高明之处就是一个int类型的值,去记录线程池中几个维度的状态和数据 // 能接受新任务以及处理已添加的任务 private static final int RUNNING = -1 << COUNT_BITS; // 1110 // 不接受新任务,可以处理已经添加的任务,调用shutdown方法时 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0000 // 不接受新任务,不处理已经添加的任务,并且中断正在处理的任务 调用shutdownnow方法时 private static final int STOP = 1 << COUNT_BITS; // 0010 // 所有的任务已经终止,ctl记录的“任务数量” 为0 ,ctl负责记录线程池的运行状态与活动线程数量 private static final int TIDYING = 2 << COUNT_BITS; // 0100 // 线程池彻底终止,则线程池转变为terminated 状态 private static final int TERMINATED = 3 << COUNT_BITS; // 0110 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 获取ctl的值,初始值是running的值 int c = ctl.get(); // 获取到工作线程的数量 和 核心线程数进行比较,小于核心线程数,则增加ctl的值+1 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池是不是running状态,同时往队列里面添加任务是否成功 if (isRunning(c) && workQueue.offer(command)) { // int recheck = ctl.get(); // 再次判断是不是running状态,不是的话,直接移除任务 if (! isRunning(recheck) && remove(command)) // 拒绝任务 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 核心线程已满 队列已满,就走下面 else if (!addWorker(command, false)) reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 除了加入的队列的任务之外,其他的任务进来都会把ctl这个值加1,代表线程池的工作线程数加1 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 池中线程启动的地方 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 这里体现了线程池中线程的复用,这里就有一个问题,这里和调addWorker的地方是异步的,所以这里可能是核心线程触发的,但是会读到后续添加到队列中的任务 6 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 我们传递的任务被执行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; ****** w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { // 是否异常结束的 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 减少线程池中工作线程数量 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 非原子操作,固加锁 completedTaskCount += w.completedTasks; // 从线程池中移除当前工作线程 workers.remove(w); } finally { // 解锁 mainLock.unlock(); } // 尝试中断线程池中的线程,假如当前线程池不处于running状态 tryTerminate(); // 获取ctl值 int c = ctl.get(); // 判断线程池的状态是不是小于stop的 就是 running,shutdown ,假如上一步将线程池的状态改为了TIDYING 或者 TERMINATED 状态下面就不会走了 if (runStateLessThan(c, STOP)) { // 很显然这里是true if (!completedAbruptly) { // min = corePoolSize 一般不会0,看自己设置的是多大 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 假如为0,同时工作队列不是空的,那么min = 1; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果线程池的工作线程 大于或等于了 min (我觉得就可以理解为核心线程数) 直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); // ? } } // 这个方法 只要return null, 这个工作线程就会死掉,同时线程池中的工作线程数就减一 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // The pool is stopped. The pool is shutdown and the queue is empty. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // 核心线程是不是开启了超时设置 工作线程是不是大于核心线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize). if ((wc > maximumPoolSize || (timed && timedOut)) // 7 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 1 timed 为true 时,一般来说就是工作线程大于了核心线程,非核心线程就需要就等待最长空闲时间,2.timed为false时,只有核心线程,然后就一直wait 主线程往队列里面添加任务,一有任务就开始处理,直到shutdown Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); // 8 // poll 是等待这个时间,没有的话就返回null,take 是 阻塞 if (r != null) return r; // 当r为null时,代表非核心线程超时了,再次循环,7的地方,就直接返回null了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } // 由于每次都是核心线程最开始执行,非核心线程后开始执行,所以当池中只有核心线程时,timed 就是 false , 当池中有非核心线程时,timed 就是 true ,所以上面 8处就是来处理空闲的非核心线程的
总结:我们传递给线程池的任务task,最后在worker(线程池中的线程)的run方法中被执行,而且是以task.run()的方法进行直接调用的。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
疑问:
核心线程和非核心线程都是用ctl这个值来维护的,
是的,都是由ctl这个值来维护。
线程池如何区分池中的线程是核心的还是非核心的呢?没有看到worker携带相关标识
不用区分,在没有设置核心线程超时的情况时,非核心线程是优先被清理的。
workers的数量何时减少,我们是设置了一个空闲等待时间,啥时候触发?
由于每次都是核心线程最开始执行,非核心线程后开始执行,所以当池中只有核心线程时,timed 就是 false , 当池中有非核心线程时,timed 就是 true ,所以上面 8处就是来处理空闲的非核心线程的
什么时候从队列中获取任务并执行的?
当工作线程(包括核心非核心)执行完首次分配的任务后,都会从队列中获取任务
线程的复用在哪里体现了?
第6处
为何设置核心线程数可以超时时,当核心线程空闲时间超过空闲时间时,线程池会自动退出?
因为线程池的运行状态 也是由内部的工作线程维持的,当线程池中没有工作线程是,自然就停止了。
源码中所用设计模式: