C/C++教程

ThreadPoolExecutor线程池原理+源码,了解,字节跳动技术整理

本文主要是介绍ThreadPoolExecutor线程池原理+源码,了解,字节跳动技术整理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 线程池状态 >= SHUTDOWN
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
    	// 内层自旋
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            // 工作中的线程数大于线程池的容量,或者已经大于等于核心线程数,或者大于等于最大线程数
            // core为true,表示要创建核心线程,false表示要创建非核心线程
            // 为什么大于等核心线程数的时候要返回false,因为要添加到缓冲队列,或者创建非核心线程来执行,不能创建核心线程了
            return false;
        if (compareAndIncrementWorkerCount(c))
        	// 以CAS的方式尝试把线程数加1
        	// 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程
        	// 成功后跳出内层自旋
            break retry;
        // CAS失败,再次获取ctl,检查线程池状态    
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
        	// 线程池状态被改变了,从外层自旋开始再次执行之前的逻辑
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
// 可以看到两层自旋 + CAS,仅仅是为了把线程池中的线程数加1,还没有新建线程

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
	// 把task包装成Worker
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            // 获取锁之后,再次检查线程池的状态
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                	// 检查线程状态
                    throw new IllegalThreadStateException();
                // 添加到worders
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                	// 维护largestPoolSize变量
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
        	// 解锁
            mainLock.unlock();
        }
        if (workerAdded) {
        	// 添加成功
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
    	// 执行worker的线程启动失败
        addWorkerFailed(w);
}
return workerStarted;

}

可以看到`addWorker`方法前一部分,用了外层**自旋**判断线程池的状态,内层**自旋 + CAS**给线程池中的线程数加1。后半部分用了`ReentrantLock`保证创建`Worker`对象,以及启动线程的线程安全。一个方法中三次获取了线程池的状态(不包含该方法调用的其他方法),因为每两次之间,线程池的状态都有可能被改变。

### runWorker

前文在介绍`Worker`内部类时说过,`Worker`会把自己传递给`ThreadFactory`创建的线程执行,最终执行`Worker`的`run`方法,而`Worker`类的`run`方法只有一行代码:

runWorker(this);

所以接下来看看`runWorker`方法是如何实现了

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许外部中断
w.unlock(); // allow interrupts
// 记录worker是不是异常退出的
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 自旋,如果task不为空,或者能从缓冲队列(阻塞队列)中获取任务就继续执行,不能就一直阻塞
// 加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 如果线程池正在停止,并且当前线程没有被中断,就中断当前线程
wt.interrupt();
try {
// 钩子函数,处理task执行前的逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数,处理task执行后的逻辑
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成的任务数加1
w.completedTasks++;
// 解锁
w.unlock();
}
}
// 运行到这里,说明worker没有异常退出
completedAbruptly = false;
} finally {
// 自旋操作被打断了,说明线程需要被回收
processWorkerExit(w, completedAbruptly);
}
}

第10行代码中,task为null时,会通过`getTask()`方法从缓冲队列中取任务,因为缓冲队列是阻塞队列,所以如果获取不到任务会一直被阻塞,接下来看看`getTask`方法的内部实现

### getTask

`getTask`用于**阻塞**式的从缓冲队列中获取任务。

private Runnable getTask() {
// 线程是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
	// 自旋
	// 获取线程池状态
    int c = ctl.get();
    int rs = runStateOf(c);

    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    	// 线程池终止了,或者线程池停止了,且缓冲队列中没有任务了
    	// 自旋 + CAS方式减少线程计数
        decrementWorkerCount();
        return null;
    }

    int wc = workerCountOf(c);

    // 根据allowCoreThreadTimeOut参数来判断,要不要给核心线程设置等待超时时间
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        // 当前线程数大于了maximumPoolSize(因为maximumPoolSize可以动态修改)或者当前线程设置了超时时间且已经超时了
        // 且线程数大于1或者缓冲队列为空
        // 这个条件的意思就是:当前线程需要被回收
        if (compareAndDecrementWorkerCount(c))
        	// 返回null后,上层runWorker方法中断循环,执行processWorkerExit方法回收线程
            return null;
        continue;
    }

    try {
    	// 从阻塞队列中获取任务
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
        	// 成功获取任务
            return r;
        // 没有获取到任务,超时
        timedOut = true;
    } catch (InterruptedException retry) {
    	// 线程被中断,重试
        timedOut = false;
    }
}

}

理解该方法的前提,是要理解**阻塞队列**提供的阻塞式API。
这个方法重点关注两点:

*   从缓冲队列取任务时,`poll`非阻塞,`take`阻塞,调用哪个由当前线程需不需要被回收来决定
*   该方法返回null之后,上层方法会回收**当前线程**

除了这几个核心方法之外,往线程池提交任务还有一个方法叫`submit`

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

`submit`方法可以接收线程池返回的结果,也就是`Futrue`对象,可以接收`Runnable`对象和`Callable`对象。
至于`Future`、`FutureTask`、`Runnable`、`Callable`之间的关系此处不再赘述。

至此`ThreadPoolExecutor`的核心方法的源码以及执行逻辑已经讲解完毕,再来看一些非核心方法,了解一下即可

*   `public void shutdown()`:关闭线程池,已经提交过的任务还会执行(线程池中未运行完毕的,缓冲队列中的)
*   `public List<Runnable> shutdownNow()`:停止线程池,试图停止正在执行的任务,暂停缓冲队列中的任务,并且返回
*   `public void allowCoreThreadTimeOut(boolean value)`:设置**核心线程**是否允许回收
*   `protected void beforeExecute(Thread t, Runnable r)`:钩子函数,处理线程执行任务前的逻辑,这里是**空实现**
*   `protected void afterExecute(Runnable r, Throwable t)`:钩子函数,处理线程执行任务后的逻辑,这里是**空实现**
*   `public int getActiveCount()`:返回正在执行任务的线程的**大致数量**
*   `public long getCompletedTaskCount()`:返回执行完成的任务的**大致数量**

除此之外还需要了解的是,构造方法中的七个参数,除了`BlockingQueue`是不能动态设置外,其余六个参数都可以动态设置,分别调用对于的`setXxx`方法即可,当然也可以通过对于的`getXxx`方法获取对应的信息。

鉴于此,我们再来看一个常见的问题

> Java有几种线程池?

JDK(准确的说是`java.util.concurrent.Executors`工具类)提供了四种线程池:

*   `CachedThreadPool`:缓冲线程池

    ```
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    ```

*   `FixedThreadPool`:固定线程数的线程池

    ```
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    ```

*   `SingleThreadExecutor`:单线程的线程池

    ```
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
    }

    ```

*   `ScheduledThreadPool`:可定时调度的线程池

    ```
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    // ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以super()还是调用ThreadPoolExecutor的构造方法
    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
    	public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
        }
    }

    ```

仔细看下这四种线程池,最终都调用了ThreadPoolExecutor的构造方法,只是传递的参数有所不同。

*   `CachedThreadPool`和`ScheculedThreadPool`设置的最大线程数都是`Integer.MAX_VALUE`,可能线程数过多而产生OOM
*   `SingleThreadExecutor`和`FixedThreadPool`使用的都是无界队列,最大元素个数为`Integer.MAX_VALUE`,可能缓冲队列中堆积的任务过多,而产生OOM

这两点正是**阿里巴巴代码规范**里禁止使用这四种线程池的原因。
想要使用线程池,必须通过`ThreadPoolExecutor`的方法来**创建线程池**。


这篇关于ThreadPoolExecutor线程池原理+源码,了解,字节跳动技术整理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!