本文主要是介绍线程池实现例子,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
ThreadPool接口
public interface ThreadPool {
//提交任务到线程池
void execute(Runnable runnable);
//关闭线程池
void shutdown();
//获取线程池的初始化大小
int getInitSize();
//获取线程池的核心线程数量
int getCoreSize();
//获取线程池的最大线程数量
int getMaxSize();
//获取线程池中用于缓存任务队列的大小
int getQueueSize();
//获取线程池中活跃的线程的数量
int getActiveCount();
//查看线程池是否已经被shutdown
boolean isShutdown();
}
ThreadFactory接口
/**
* 创建个性化线程
*
* ThreadFactory提供创建线程的接口,以便个性化定制Thread,比如Thread应该被加入到哪个
* Thread Group中,优先级、线程名称,以及是否为守护线程等
**/
@FunctionalInterface
public interface ThreadFactory {
Thread createThread(Runnable runnable);
}
RunnableQueue接口
/**
* 线程队列基本操作
*
* RunnableQueue主要用于存放提交的Runnable
* 该Runnable是一个BlockedQueue,并且有limit限制
**/
public interface RunnableQueue {
//当有新的任务进来时首先会offer到队列中
void offer(Runnable runnable);
//工作线程通过take方法获取Runnable
Runnable take() throws InterruptedException;
//获取任务队列中任务的数量
int size();
}
DenyPolicy接口
/**
* 线程池满时拒绝策略
**/
@FunctionalInterface
public interface DenyPolicy {
void reject(Runnable runnable,ThreadPool threadPool);
//该拒绝策略会直接将任务丢弃
class DiscardDenyPolicy implements DenyPolicy
{
@Override
public void reject(Runnable runnable,ThreadPool threadPool)
{
//do nothing
}
}
//该拒绝策略向任务提交者抛出异常
class AbortDenyPolicy implements DenyPolicy
{
@Override
public void reject(Runnable runnable,ThreadPool threadPool)
{
throw new RuntimeException("The runnable "+runnable+" will be abort.");
}
}
//该拒绝策略会使任务在提交者所在的线程中执行任务
class RunnerDenyPolicy implements DenyPolicy
{
@Override
public void reject(Runnable runnable,ThreadPool threadPool)
{
if(!threadPool.isShutdown())
{
runnable.run();
}
}
}
}
InternalTask
/**
* 不断从runnableQueue中取出Runnable并执行任务
**/
public class InternalTask implements Runnable{
private final RunnableQueue runnableQueue;
private volatile boolean running=true;
public InternalTask(RunnableQueue runnableQueue){
this.runnableQueue=runnableQueue;
}
@Override
public void run()
{
//如果当前任务为running且没有被中断,则将其不断地从queue中获取runnable,然后执行run
while(running && !Thread.currentThread().isInterrupted())
{
try
{
Runnable task=runnableQueue.take();
task.run();
}catch (InterruptedException e){
running=false;
break;
}
}
}
//停止当前任务,主要会在线程池的shutdown方法中使用
public void stop()
{
this.running=false;
}
}
LinkedRunnableQueue
/**
* 双向循环链表实现线程任务队列基本操作
**/
public class LinkedRunnableQueue implements RunnableQueue{
//任务队列的最大容量,在构造时传入
private final int limit;
//若任务队列中的任务已经满了,则需要执行拒绝策略
private final DenyPolicy denyPolicy;
//存放任务的队列
private final LinkedList<Runnable> runnableList = new LinkedList<>();
private final ThreadPool threadPool;
public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
this.limit = limit;
this.denyPolicy = denyPolicy;
this.threadPool = threadPool;
}
@Override
public void offer(Runnable runnable) {
synchronized (runnableList){
if (runnableList.size()>=limit){
//无法容纳新的任务时执行拒绝策略
denyPolicy.reject(runnable,threadPool);
}else {
//将任务加入到队尾,并且唤醒阻塞中的线程
runnableList.addLast(runnable);
runnableList.notifyAll();
}
}
}
@Override
public Runnable take() throws InterruptedException {
synchronized (runnableList){
while (runnableList.isEmpty()){
try {
//如果任务队列没有可执行任务,则当前线程会挂起,
//进入runnableList关联的monitor set中等待唤醒
runnableList.wait();
}catch (InterruptedException e){
//被中断时将异常抛出
throw e;
}
}
return runnableList.removeFirst();
}
}
@Override
public int size() {
synchronized (runnableList){
//返回当前任务队列的任务数
return runnableList.size();
}
}
}
RunnableDenyException
/**
* 错误抛出
*
* RunnableDenyException是RuntimeException的子类,主要通知人物提交者,任务队列
* 无法再接收新的任务
**/
public class RunnableDenyException extends RuntimeException{
public RunnableDenyException(String message)
{
super(message);
}
}
BasicThreadPool
/**
* 实现ThreadPool
*
* 线程池的初始化:数量控制属性、创建线程工厂、任务队列策略等功能
**/
public class BasicThreadPool extends Thread implements ThreadPool{
//初始化线程数量
private final int initSize;
//线程池最大线程数量
private final int maxSize;
//线程池核心线程数量
private final int coreSize;
//当前活跃的线程数量
private int activeCount;
//创建线程所需的工厂
private final ThreadFactory threadFactory;
//任务队列
private final RunnableQueue runnableQueue;
//线程池是否已经被shutdown
private volatile boolean isShutdown = false;
//工作线程队列
private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
private static final DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
private final long keepAliveTime;
private final TimeUnit timeUnit;
//构造线程时传参
public BasicThreadPool(int initSize,int maxSize,int coreSize,int queueSize){
this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,queueSize, DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
}
public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,
int queueSize,DenyPolicy denyPolicy,
long keepAliveTime, TimeUnit timeUnit) {
this.initSize = initSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
this.threadFactory = threadFactory;
this.runnableQueue = new LinkedRunnableQueue(queueSize,denyPolicy,this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.init();
}
//初始化时,先创建initSize个线程
private void init(){
start();
for (int i = 0; i < initSize; i++){
newThread();
}
}
@Override
public void execute(Runnable runnable) {
if (this.isShutdown){
throw new IllegalStateException("The thread pool is destroy");
}
//提交任务只是简单第往任务队列中插入Runnable
this.runnableQueue.offer(runnable);
}
private void newThread(){
//创建任务线程并且启动
InternalTask internalTask=new InternalTask(runnableQueue);
Thread thread=this.threadFactory.createThread(internalTask);
ThreadTask threadTask=new ThreadTask(thread,internalTask);
threadQueue.offer(threadTask);
this.activeCount++;
thread.start();
}
private void removeThread(){
//从线程池移除某个线程
ThreadTask threadTask=threadQueue.remove();
threadTask.internalTask.stop();
this.activeCount--;
}
@Override
public void run() {
//run方法继承自Thread,主要用于维护线程数量,比如扩容,回收
while (!isShutdown && !isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
}catch (InterruptedException e){
isShutdown=true;
break;
}
synchronized (this){
if (isShutdown){
break;
}
//当前队列中有任务尚未处理,并且activeCount<coreSize则继续扩容
if (runnableQueue.size()>0&&activeCount<coreSize){
for (int i=initSize;i<coreSize;i++){
newThread();
}
//continue的目的在于不想让线程的扩容直接达到maxSize
continue;
}
//当前队列中有任务尚未处理,并且activeCount<maxSize则继续扩容
if (runnableQueue.size()>0&&activeCount<maxSize){
for (int i=coreSize;i<maxSize;i++){
newThread();
}
}
//如果任务队列中没有任务,则需要回收,回收至coreSize即可
if (runnableQueue.size()==0&&activeCount>coreSize){
for (int i=coreSize;i<activeCount;i++){
removeThread();
}
}
}
}
}
@Override
public void shutdown() {
synchronized (this){
if (isShutdown)return;
isShutdown=true;
threadQueue.forEach(threadTask -> {
threadTask.internalTask.stop();
threadTask.thread.interrupt();
});
this.interrupt();
}
}
@Override
public int getInitSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.initSize;
}
@Override
public int getCoreSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.coreSize;
}
@Override
public int getQueueSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return runnableQueue.size();
}
@Override
public int getMaxSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.maxSize;
}
@Override
public int getActiveCount() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.activeCount;
}
@Override
public boolean isShutdown() {
return this.isShutdown;
}
private static class ThreadTask{
Thread thread;
InternalTask internalTask;
public ThreadTask(Thread thread, InternalTask internalTask) {
this.thread = thread;
this.internalTask = internalTask;
}
}
private static class DefaultThreadFactory implements ThreadFactory{
private static final AtomicInteger group_counter=new AtomicInteger(1);
private static final ThreadGroup group =
new ThreadGroup("myGroup-"+group_counter.getAndDecrement());
public static final AtomicInteger COUNTER =new AtomicInteger(0);
@Override
public Thread createThread(Runnable runnable) {
return new Thread(group,runnable,"thread-poll-"+COUNTER.getAndDecrement());
}
}
}
测试线程池
/**
* 一个简单的程序分别测试线程池的任务提交、线程池线程数量的动态扩展,以及线程池的销毁功能
*/
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
//定义线程池,初始化程数为2,核心或程数为4,最大程数为6.任务队列最多允许1000个任务
final ThreadPool threadPool=new BasicThreadPool(2,6,4,1000);
//定义20个任务并且提交蛤线程池
for (int i=0;i<20;i++){
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getName()+" is" +
" running and done.");
}catch (InterruptedException e){
e.printStackTrace();
}
});
}
for (; ; ){
//不断输出线程池的信息
System.out.println("getActiveCount = "+threadPool.getActiveCount());
System.out.println("getQueueSize = "+threadPool.getQueueSize());
System.out.println("getCoreSize = "+threadPool.getCoreSize());
System.out.println("getMaxSize = "+threadPool.getMaxSize());
System.out.println("==========================================");
TimeUnit.SECONDS.sleep(5);
}
}
}
这篇关于线程池实现例子的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!