线程池这东西,用了几次还是不得其解,简直是:求之不得,寤寐思服。悠哉悠哉,辗转反侧。
线程池,好东西啊,它有一池子的线程,所以叫线程池。
为什么说它是好东西呢?有的人会觉得,那一池子线程,放在那边又不用,不浪费资源?
其实这笔账很好算的:假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
当 T1+T3 > T2 && 这种线程被多次调度的时候,你还会觉得浪费资源吗?况且线程池内部又不是缺乏管理,相反,线程池内部管理很严格,吃白饭的线程很难有立足之地,用不上就裁员呗。
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目
看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
线程池的组成部分如下:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务; 2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务; 3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等; 4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池的外部支持还有:
5、锁 6、条件变量
这,就是线程池。
还是配上代码来讲,不然我自己也晕。
//E_Pthread_Pool.h 我的代码的头文件 #ifndef E_EPOLL_POOL_H #define E_EPOLL_POOL_H #include <pthread.h> #include <unistd.h> #include <list> //据说list不安全,不安全就不安全吧,更不安全的都忍了 #include "d_pthread_cond.h" //封装过的条件变量类,继承自封装的mutex锁类,所以具有锁和条件变量的双重属性 using namespace std; class Task //任务接口,每个任务必须实现的接口,以供工作线程调度任务的执行 { public: Task(){} virtual ~Task(){} virtual int run()=0; //留给子类实现 }; typedef list<Task *> list_task; //任务队列,用于暂存等待处理的任务,等待线程唤醒时处理,提供一种缓冲机制。 class E_PThread_Pool //线程池类 { public: E_PThread_Pool(unsigned int max=100,unsigned int min=10,unsigned int wait=60); ~E_PThread_Pool(); void addTask(Task *task); // 往任务队列中添加新线程 private: static void *taskThread(void *arg);// 工作线程 void createThread(); // 新建一个线程 void destroyThread(); // 销毁一个线程池 unsigned int maxcount; // 最大线程数 unsigned int mincount; // 最小线程数 unsigned int count; // 当前线程池中线程数 unsigned int waitcount; // 等待线程数 unsigned int waitsec; // 等待时间 list_task taskList; //任务队列 D_Pthread_Cond taskCond; //任务锁,线程接任务时使用 D_Pthread_Cond cond; //线程锁,创建线程时使用 bool Stop; //线程池是否被允许运作,初始化线程池对象时置0,线程池销毁时置为1 }; #endif
//我的线程池源文件 #include "e_pthread_pool.h" //开放接口1 E_PThread_Pool::E_PThread_Pool(unsigned int max,unsigned int min,unsigned int wait) { //配置基本参数 count = 0; //当前线程池为空 waitcount = 0; //没有等待线程 mincount = min; //核心线程数(出厂配置) maxcount = max; //最大线程数(能承受的最高配置) waitsec = wait; //线程保活时长(过了时长还没接到任务,那就裁掉) Stop = false; //允许运作 //上锁,创建一定数量的线程作为初始线程池 cond.lock(); for (unsigned i = 0; i < mincount; i++) { createThread(); //跳转到这个函数的实现->->->->-> } cond.unlock(); } E_PThread_Pool::~E_PThread_Pool() { destroyThread(); //销毁线程池 } void E_PThread_Pool::createThread() { pthread_t tid; int ret = pthread_create(&tid, NULL, taskThread, (void *)this); //以执行taskThread()为目的创建线程,跳转到taskThread()函数的实现 ->->->->-> if (ret < 0) perror("pthread create error"); else count++; } // 工作线程 void * E_PThread_Pool::taskThread(void *arg) { pthread_detach(pthread_self()); //设置线程自分离属性 E_PThread_Pool *pool=(E_PThread_Pool *)arg; while(1) { pool->cond.lock(); //如果没有工作线程在等待 if (pool->taskList.empty()) { if(pool->Stop) //当收到线程池停止运行的消息时 { pool->count--; //线程数减一 pool->cond.unlock(); pthread_exit(NULL); //本线程强制退出 } pool->waitcount++; //等待任务的线程数加一 bool bSignal = pool->cond.timewait(pool->waitsec); //新任务等待被唤醒 pool->waitcount--; //没等到,没事干,喝西北风了 // 删除无用线程 if (!bSignal && pool->count > pool->mincount) //如果没事干 && 有多余线程 { pool->count--; //先裁员一个,不要一次做绝了,反正是在while循环里面,没事干裁员机会多得是 pool->cond.unlock(); pthread_exit(NULL); } } pool->cond.unlock(); //记得要释放锁 //如果有工作线程在等待 if (!pool->taskList.empty()) { pool->taskCond.lock(); //上任务锁 Task *t = pool->taskList.front(); //获取任务队列中最前端的任务并执行 pool->taskList.pop_front(); //移除被领取的任务 pool->taskCond.unlock();//记得解锁 t->run(); //任务开始 delete t; //弄完就删了 } } pthread_exit(NULL); } //开放接口2,向任务队列中添加任务 void E_PThread_Pool::addTask(Task *task) { if (Stop) //线程池是否停止工作 return; //向任务队列中添加新任务 taskCond.lock(); //上任务锁 taskList.push_back(task); //添加任务 taskCond.unlock(); //记得解锁 cond.lock(); //上线程锁 if(waitcount) //如果有空闲线程 { cond.signal(); //唤醒一个线程 } else if(count < maxcount) //如果没有空闲线程,一般来说,走到这里面来,那这个线程池的设计是有点失败了 { createThread(); //那就创建一个 cond.signal(); //然后唤醒 } cond.unlock(); } void E_PThread_Pool::destroyThread() { printf("destroy?\n"); #if 0 //强行清理 list_task::iterator it = taskList.begin(); for (; it! = taskList.end(); it++) { Task *t = *it; delete t; t = NULL; } taskList.clear(); #endif // 等待所有线程执行完毕 Stop = true; while (count > 0) { cond.lock(); cond.broadcast(); //广播 cond.unlock(); sleep(1); } }
代码注释已经够详尽了,如果还有不明白的,可以底下评论。