Java教程

并发编程-线程池的设计原理

本文主要是介绍并发编程-线程池的设计原理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

并发编程-线程池的设计原理

这一篇,我们来聊一聊线程池,和他的设计原理,以及一些关键参数如何动态设计线程参数,很多东西咱们在之前的文章中已经讨论过,这一篇就相对轻松点,run。

线程池

创建线程遇见的麻烦

  • 线程的频繁创建和销毁需要消耗cpu的资源 
  • 线程数量越多,cpu的资源的开销就越大
    • 如果线程的数量,远远大于cpu的核心数量的话,那么cpu就需要不断的进行上下文切换(比如线程从A切换到B那程序要记住上次执行的线程A执行的位置),而如果他不断的进行切换的话,那从某个方面就耗费了性能,因为他就没有空做正事了,而全身心的都在线程切换上了。

  那我们就自然而然的想到了线程的复用,这里金引入了【池化技术】:连接池、对象池、内存池、线程池这些都是池化技术的体现,那池化技术的核心就是复用,把一些复杂的对象,创建过程中需要消耗很多资源的对象,放置在一个池子中。

线程池】:我可以提前创建一系列的现场保存在池子中,有任务要执行的时候,就从池子中获取一个线程去执行

java中提供的线程池

java中提供一个Executors去操纵线程,其中提供这么几个方法进行线程池的构建(他们都是使用的是【ThreadPoolExecutor):

  • newFixedThreadPool】:固定线程数量,根据你传递的数构建相应的线程数
    •   
      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
      }

       

  • newSingleThreadExecutor】:只有一个线程的线程池
    • public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }

       

  • newCachedThreadPool】:可以缓存的线程池(有多少请求,此线程即可以创建多少个线程来处理请求)
    •   public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }

       

  • newScheduledThreadPool】:提供了按照周期执行的线程池(只是顶层对定时器做了一层封装,底层还是调用的)
    • public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
          return new DelegatedScheduledExecutorService
              (new ScheduledThreadPoolExecutor(1));
      }

 线程池的设计思考

【需求】:

  实现线程的重复利用

【问题】:

  • 如何实现线程的复用】:让线程实现复用的唯一办法就是让线程不结束,结束的点有:interrupt方法,stop方法,以及程序的正常运行,那可把他们放在一个循环中
  • 如何让线程执行指派的新任务】:使用共享内存可以做到(多线程访问一个公共的资源),比如定义一个全局的list,线程去不断的从这个里面取数据(其实就是一个生产者,和消费者的模型,类似于阻塞队列
  • 【如果队列满了怎么办】:究其根本就是数据量太多,线程太少,消费的速度比不上生产的速度,那我们就需要增加线程,增减消费的速度
    •  【如果扩容后,依然无法消费庞大的数据怎么办】:【拒绝策略我们都知道线程的数量是由cpu决定的,如果你创建了很多线程那从某种情况下就导致cpu负荷,降低了效率,所以我们做相应的处理(报错、直接丢弃、直接调用run方法、或者把队列中等待时间最久的线程抛弃、也可以让用户自定义)
    •  【请求峰过去了如何销毁线程】:前面我们介绍阻塞队列常用方法的时候有一个【poll方法(如果超时还没有产生数据,那就返回null)我们就可以使用这个方法去判断,然后把当前的自旋结束
  • 线程一直处于运行状态是否合理】:有任务来的时候运行,没有任务的时候对线程进行阻塞。

需要的参数:

    线程数量、最大线程数量、选择哪种拒绝策略、任务队列(阻塞队列)、扩容后的线程需要的存活时间和存活单位这个为pool(因为一旦请求峰过去 了,那就不需要这么多线程去处理任务了,就需要设置他们可以存活多久)

public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                          int maximumPoolSize, //最大线程数
                          long keepAliveTime,//存活时间
                          TimeUnit unit,//存活单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列
                          ThreadFactory threadFactory //线程工厂(如果想自定义线程池中线程的名字),
                          RejectedExecutionHandler handler) //拒绝策略{
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

【conclusion】:那基于上面的猜想,我们可以使用阻塞队列让其在整个线程池可见(公共资源),并且可以让线程一直处于活着的状态。当有任务的时候,每个线程自己进行自旋,充当一个消费者,对任务进行执行 ,并且前面也说了阻塞队列是线程安全的,如果到达数据高峰期则进行扩容(线程增加),请求峰期过去后,对某些线程进行释放

 源码分析

!!! 线程池中的线程是延迟加载的

  • 那首先肯定就是初始化核心线程 调用阻塞队列的方法,
  • 把任务传递进去(offer()会返回是否存储成功和失败)
    • true:说明当前核心线程可以支持当前的访问量
    • false:增减工作线程
      •   如果添加失败,则说明当前的工作线程达到了你传递的最大的线程数,直接调用你传递的拒绝策略
 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
         //判断当前工作的线程是否小于核心线程数(延迟初始化)
        if (workerCountOf(c) < corePoolSize) {
            //添加工作线程,并且执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
         //添加到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
         //如果阻塞队列满了,则添加工作线程
        else if (!addWorker(command, false))
            //调用拒绝策略
            reject(command);
    }

添加任务到阻塞队列中,因为是延迟初始化,需要创建线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //通过原子操作增加线程数量
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //判断是否可以进行创建(是否是最大数)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //通过原子操作增加线程数量
            if (compareAndIncrementWorkerCount(c))
                //直接走到最上面向下执行
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //初始化工作线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //构建一个工作线程,并传入任务(但是这里还没有启动)
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //如果不是中断状态则添加他到容器中这是存储工作线程的线程容器(HashSet<Worker> workers = new HashSet<Worker>();)
                    workers.add(w);
                    int s = workers.size();
                    //重新更新最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //这里添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果失败,把添加的线程释放,然后回归元素
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

运行线程去获取任务

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //保证当前线程不结束,直到task为空
        while (task != null || (task = getTask()) != null) {
            //worker继承了Aqs 这里实现了一个互斥锁,表示当前线程正在运行一个任务,如果其他地方呀shutdown则必须保证线程执行完成
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //这里是空的实现,我们可以自定义自己的扩展
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //这里是空的实现,我们可以自定义自己的扩展
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

进行任务获取

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //如果线程池已经是结束状态,,则清理所有的线程
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //是否允许超时
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //如果当前线程大于核心线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //通过cas减少工作线程数量
            if (compareAndDecrementWorkerCount(c))
                //表示要销毁当前工作线程
                return null;
            continue;
        }

        //进行任务的获取
        try {
            //如果当前阻塞没有任务,当前线程会阻塞在这里
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

 线程数量的动态修改

线程池提供了修改线程的数量,我们可以通过配置中心来动态配置,然而我们需要设置多少线程呢,其实也没有一个准确的数字,因为系统中可能不止你一个线程池。常见的计算线程数的算法

IO密集型 = 2Ncpu(数据库数据交互、文件上传下载、网络数据传输等等)

计算密集型 = Ncpu(复杂算法)

下面给出动态改变线程数和队列数的代码。队列数是final修饰的,我们只有复制原来的队列出来,给里面加一个方法

public class DynamicChangeNumber {

    static ThreadPoolExecutor executorService =
            new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ReSizeLinkedBlockingQueue<>(30));

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                try {
                    //模拟任务的执行
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        printThreadStatus("改变之前");
        executorService.setCorePoolSize(4);
        executorService.setMaximumPoolSize(10);
        ReSizeLinkedBlockingQueue reSizeLinkedBlockingQueue = (ReSizeLinkedBlockingQueue) executorService.getQueue();
        reSizeLinkedBlockingQueue.setCapacity(100);
        printThreadStatus("改变之后");
    }

    public static void printThreadStatus(String name) {
        ReSizeLinkedBlockingQueue reSizeLinkedBlockingQueue = (ReSizeLinkedBlockingQueue) executorService.getQueue();
        System.out.println(name+"------>核心线程数" + executorService.getPoolSize() +
                "最大线程数" + executorService.getMaximumPoolSize() +
                "队列容量" + (reSizeLinkedBlockingQueue.size() + reSizeLinkedBlockingQueue.remainingCapacity()));

    }
}

结果

 

这篇关于并发编程-线程池的设计原理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!