线程池底层维护了一个HashMap集合用来存放worker对象,这个worker类实现了runnable接口,代表它是一个可执行的任务,worker类中有两个重要属性:具体工作线程,第一次要执行的任务。
初始化worker类时,它会创建一个线程并将当先对象封装到线程中,也就是说当此线程启动时执行的是此对象的run方法。而worker类中的run方法,他其实调用了runworker()方法,此方法是真正实现线程复用的地方
此方法中用一个while循环用来不断地获得任务,交给当前线程去执行。
//存放工作线程的集合,线程池的底层实现,是ThreadPollExecutor类中的一个属性 private final HashSet<Worker> workers = new HashSet<Worker>();
Worker实现了Runnable接口,代表是一个可执行的任务,
final Thread thread;//具体工作的线程 Runnable firstTask;//第一次要执行的任务
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //getThreadFactory()得到一个线程工厂的方法 //newThread(this):传入Runnable任务得到一个线程,调用此线程的start()方法时,会调用此类中的run()方法 this.thread = getThreadFactory().newThread(this); }
public void run() { runWorker(this); }
//此方法中的主要代码 final void runWorker(Worker w) { Runnable task = w.firstTask;//辅助变量用来存放获取的任务 while (task != null || (task = getTask()) != null) {//不断地调用getTask()方法获得新的任务 try(){ task.run();//执行到此代码,就完成了一个任务,之后循环获得新的任务再次执行执行。 }catch (Throwable x) { thrown = x; throw new Error(x); } } }
worker类中的getTask()方法
getTask():获取线程任务,通过两种方式从任务队列中获得线程任务,一种是核心线程获取任务,一种是临时线程获取任务。
//getTask()方法中获取任务的核心方法 /** *workQueue:任务队列 *r = timed:判断此线程是否为核心线程 */ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://临时线程获取任务方法,超时获取就会退出循环,线程被回收 workQueue.take();//核心线程获取任务方法,获取不到就一直阻塞着 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
//execute()核心代码 int c = ctl.get(); //当前线程工作数与核心线程相比较,小于创建核心线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //尝试向工作队列中添加任务,添加成功则进入if if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//对线程池进行一个状态的检查 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //尝试创建临时线程,失败则进行拒绝策略的调用 else if (!addWorker(command, false)) reject(command); }
//创建Work工作任务并入workers队列 for (;;) { int wc = workerCountOf(c);//当前工作线程的数量 if (wc >= CAPACITY || //判断当前(核心)线程数是否大于(核心)最大线程数 wc >= (core ? corePoolSize : maximumPoolSize)) return false; } try{ w = new Worker(firstTask);//创建worker,并将任务传给worker,此类中创建工作线程 final Thread t = w.thread; if(t!=null){ workers.add(worker);//将新创建的worker添加到集合中 workerAdded = true;//添加成功 if (workerAdded) {//添加成功 t.start();//开启线程 workerStarted = true; } } }