/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;// 32-3=29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 2的29次方-1,0001... 低29位表示线程数最大数,高3位表示executors状态 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }// 运行状态,即上面的 RUNNING等 private static int workerCountOf(int c) { return c & CAPACITY; } // worker即工人数量 private static int ctlOf(int rs, int wc) { return rs | wc; }// runState 与 workerCount的和
private final ReentrantLock mainLock = new ReentrantLock();// 访问works的锁 private final HashSet<Worker> workers = new HashSet<Worker>();
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
自旋+CAS:增加 ctl内 worker数量
new Worker,再加入 works
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))// 数量限制与workers数量比较,决定能否新增worker return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
public void run() { runWorker(this); }
1、取任务,来自 firstTask 或者 getTask()
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true;// 突然中断,如果while条件未满足则非突然的,其他都是突然的 try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
1、是否淘汰(核心线程运行超时 或 worker数量大于corePoolSize)
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 是否淘汰 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 需要淘汰,超时时间内获取任务,此处使用空闲时间 workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
1、完成任务计数 更新
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();// 是突然的,ctl的work计数未调整,此处调整 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;// 完成任务计数 更新 workers.remove(w);// 移除当前worker } finally { mainLock.unlock(); } tryTerminate();// 不太清楚有什么用 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) {// 正常完成 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许核心线程空闲超时时死亡,则线程池最小线程数为0;否则最小线程数是corePoolSize if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
int corePoolSize 核心线程数 int maximumPoolSize 最大线程数 queueSize 队列长度
每秒请求数(QPS,如 100~1000)+
corePoolSize = QPS/(1/COST) = QPS/2 = 50~500
其中 (1/COST) 可以理解为单个线程 1s内可以完成的请求数 n(0<n<无限大),此处为 2,即 1s内一个线程能完成 2个请求
哦,网上还说了个什么 8020原则,貌似希望核心线程数满足 80% 的最大请求数,那么此处应该就是 400
queueSize = (MAXRSP-COST) * (max(QPS)-corePoolSize*(1/COST)) = 1.5 * (1000-800) = 300
最大响应时间 2s - 请求耗时 0.5s = 最长待 1.5秒,即 1.5s内核心线程数可以堆积的任务数
maximumPoolSize = max(QPS)/(1/COST) = 500
太大,创建过多线程,OOM;应该大于corePoolSize=400 但是小于最大 QPS 所需线程数=500