线程池是一般服务端或者客户端编程经常要用到的一种管理线程的类,从网上找到一个比较好的线程池实现,主要运用C++11。记录一下理解过程,帮助学习线程池和C++11。
150行代码手写线程池 :https://www.bilibili.com/video/BV1yQ4y1o7zv/
threadPool.hpp
/* * @Author WangYubo * @Date 09/17/2018 * @Description 线程池实现的头文件 */ #ifndef THREAD_ _POOL_ _H #define THREAD_ _POOL_ _H #include <atomic> #include <condition_ variable> #include <functional> #include <future> #include <mutex> #include <queue> #include <string> #include <thread> #include <vector> class threadPool { using Task = std::function<void(void)>; private: std::vector<std::thread> m_pool; //线程池 std::atomic<int> m_idleThreadNum; //空闲线程数量 std::atomic<bool> m_stoped; //是否停止线程 std::mutex m_lock; //线程池锁 std::queue<Task> m_tasks; //待执行任务 std::condition_variable m_cv; //线程控制 int m_threadNum = 0; //线程总数 std::string m_poolName; //线程池名称 //线程执行函数 void run(); public: //构造函数 threadPool() : m_stoped(true) {} ~threadPool(); //添加线程函数 template <class F, class... Args> auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> { using RetType = decltype(f(args...)); if (m_stoped) { return std::future<RetType>(); } auto task = std::make_shared<std::packaged_task<RetType()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); auto ret = task->get_future(); { std::lock_guard<std::mutex> lock(m_lock); m_tasks.emplace([task] { (*task)(); }); } m_cv.notify_one(); return ret; } // 初始化 virtual void init(int threadNum, std::string name = "ThreadPool"); }; #endif
threadPool.cpp
/* * @Author WangYubo * @Date 09/17/2018 * @Description 线程池实现的cpp文件 */ #include "threadPool . hpp" #include "log . hpp" using namespace std; threadPool::~threadPool() { m_stoped = true; m_cv.notify_all(); for (auto &tmp : m_pool) { if (tmp.joinable()) tmp.join(); } } void threadPool::init(int threadNum, string name) { if (!m_stoped) { LOG_INFO("%s has been started, thread num %d", m_poolName.c_str(), m_threadNum); return; } { unique_lock<mutex> lock(m_lock); if (!m_stoped) { return; } m_stoped = false; // 清理旧的线程,初始化新的线程 m_pool.clear(); for (int i = 0; i < threadNum; i++) { m_pool.emplace_back(thread(&threadPool::run, this)); } m_threadNum = threadNum; m_poolName = name; } LOG_INFO("%s start thread num %d", m_poolName.c_str(), m_threadNum); } void threadPool::run() { while (true) { m_idleThreadNum--; Task task; { unique_lock<mutex> lock(m_lock); m_cv.wait(lock, [this] { return m_stoped || !(m_tasks.empty()); }); if (m_tasks.empty()) { return; } task = move(m_tasks.front()); m_tasks.pop(); } LOG_DEBUG("Handle one task"); task(); m_idleThreadNum++; } }
多线程实现从1累加到10000,使用原子变量实现线程同步
main.cpp
/* * @Author WangYubo * @Date 09/17/2018 * @Description */ #include "baseInstance . hpp " #include "threadPool . hpp" #include "log .hpp" #include <atomic> #include <vector> #include <future> using namespace std; class testPool : public threadPool, public BaseInstance<testPool> { public: void init(int threadNum) { threadPool::init(threadNum, "testPool"); } }; atomic<int> addNum; atomic<int> result; int test() { int add = 0; while (true) { add = addNum++; if (add > 10000) { return add; } result += add; } } int main(int argC, char *arg[]) { LOG_DEBUG("Hello"); testPool &pool = testPool::getInstance(); pool.init(5); vector<future<int>> ret; for (int i = 0; i < 20; i++) { ret.emplace_back(pool.commit(test)); } for (auto &tmp : ret) { tmp.wait(); } LOG_DEBUG("End, result %d", result.load()); return 0; }
乍一看很复杂,但是实现的功能却很强大,支持传入任意参数的任务函数。获取线程结果使用wait函数,充分运用C++11的各种新特性future、atomic(用来不加锁)、condition_variable等。
但是这个线程函数不可以传入引用,由于bind的特性,只允许传入指针才可以修改外部变量。传入的参数都将成为值拷贝的形式。
class threadPool { using Task = std::function<void(void)>; private: std::vector<std::thread> m_pool; //线程池 std::atomic<int> m_idleThreadNum; //空闲线程数量 std::atomic<bool> m_stoped; //是否停止线程 std::mutex m_lock; //线程池锁 std::queue<Task> m_tasks; //待执行任务 std::condition_variable m_cv; //线程控制 int m_threadNum = 0; //线程总数 std::string m_poolName; //线程池名称 //线程执行函数 void run(); public: //构造函数 threadPool() : m_stoped(true) {} ~threadPool(); //添加线程函数 template <class F, class... Args> auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> { using RetType = decltype(f(args...)); if (m_stoped) { return std::future<RetType>(); } auto task = std::make_shared<std::packaged_task<RetType()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); auto ret = task->get_future(); { std::lock_guard<std::mutex> lock(m_lock); m_tasks.emplace([task] { (*task)(); }); } m_cv.notify_one(); return ret; } // 初始化 virtual void init(int threadNum, std::string name = "ThreadPool"); };
入参为第一个是函数指针,剩余为可选参数,包括返回类型都由函数指针定义,实现任意函数任意类型的传入和返回。
auto类型的返回在C++11及以前是不支持的,在C++14才允许了auto类型返回值。
当前使用c++11需要在后面加了一个future<decltype(f(args…))>,就是用参数来推断返回值并且转成future类型
使用模板定义F,可以传入任意类型的函数指针,在C++中,函数也认为是一个类
这个定义可以直接实现任意参数和任意返回值的任务函数
模板函数只能在头文件定义,不然编译会报错,所以commit的实现写在了头文件中,具体讲解实现看后文
thread 线程类
thread是c++11封装的上层线程类,可以直接创建线程
等待线程退出使用join()函数
示例
#include <iostream> #include <thread> using namespace std; void th_func() { std::cout << "hello thread." << std::endl; } int main(int argc, char *argv[]) { std::thread t(th_func); t.join(); return 0; }
示例
#include <thread> #include <mutex> #include <vector> #include <iostream> #include <algorithm> std::mutex my_lock; void add(int &num, int &sum){ while(true){ std::lock_guard<std::mutex> lock(my_lock); if (num > 100){ break; } sum += num++; } } int main(){ int sum = 0; int num = 0; std::vector<std::thread> ver; //保存线程的vector for(int i = 0; i < 20; ++i){ std::thread t = std::thread(add, std::ref(num), std::ref(sum)); ver.emplace_back(std::move(t)); //保存线程 } std::for_each(ver.begin(), ver.end(), std::mem_fn(&std::thread::join)); //join std::cout << sum << std::endl; }
void threadPool::run() { while (true) { m_idleThreadNum--; Task task; { unique_lock<mutex> lock(m_lock); m_cv.wait(lock, [this] { return m_stoped || !(m_tasks.empty()); }); if (m_tasks.empty()) { return; } task = move(m_tasks.front()); m_tasks.pop(); } LOG_DEBUG("Handle one task"); task(); m_idleThreadNum++; } }
condition_variable需要和mutex一起用,使用的是mutex的锁特性将线程阻塞。
condition_variable::wait()参数为上述申请的锁,第二个参数可选
如果外部使用相同的condition_variable执行notify_one(),将会使一个执行wait阻塞的线程唤醒,并且去申请锁
wait第二个参数可选,在wait()之前会先判断条件是否为true,如果条件为true,不会调用wait,直接进行下面流程
示例
int test1(promise<int> &promisObj) { sleep(10); LOG_DEBUG("set promise value"); // promise变量赋值 promisObj.set_value(10); sleep(10); LOG_DEBUG("return"); return 0; } int main(int argC, char *arg[]) { LOG_DEBUG("Hello"); testPool &pool = testPool::getInstance(); pool.init(5); vector<future<int>> ret; // 声明promise promise<int> promisObj; // 起一个线程处理 ret.emplace_back(pool.commit(test1, ref(promisObj))); // 获取future变量 auto tmp = promisObj.get_future(); // 等待变量赋值,这里会阻塞 tmp.wait(); LOG_DEBUG("get data %d", tmp.get()); for (auto &tmp : ret) { tmp.wait(); } return 0; }
一般promise作为函数输入参数传入,然后调用get_future获取future变量
future变量调用wait会阻塞当前线程,直到promise变量调用了set_value函数赋值,才会返回
future直接调用get也会先调用wait再调用get
future本身可以当做函数返回值传入到线程中,当线程函数返回会赋值future,但是一般需要用到packaged_task来包装线程函数
线程的thread类参数中需要使用ref来传引用,由于线程传入参数是拷贝,隐式使用引用编译会报错,需要使用ref包裹来告诉编译器传入引用
//添加线程函数 template <class F, class... Args> auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> { using RetType = decltype(f(args...)); if (m_stoped) { return std::future<RetType>(); } auto task = std::make_shared<std::packaged_task<RetType()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); auto ret = task->get_future(); { std::lock_guard<std::mutex> lock(m_lock); m_tasks.emplace([task] { (*task)(); }); } m_cv.notify_one(); return ret; }
上述线程池实现中,将传入的函数外面包装一层packaged_task,类型为外部传入的函数类型
调用get_future()拿到返回值的future类,返回给外层调用,外层调用就可以使用此变量获取到函数返回值
由于返回值已经在调用前获取到了,所以线程函数中真实直接调用task,不需要关心返回值,一般线程创建函数都是没有返回值的,使用此类包装可以方便的把返回值带出去
这里的实现其实是将函数转成packaged_task用于获取future,然后外层包装智能指针,传给lambda表达式,然后整个表达式存放到task的队列中
lambda表达式,[]中相当于类的成员变量,()省略代表没有参数,内部实现没有返回值,编译时整个lambda表达式就相当于void(void)。符合m_tasks的定义
lambda函数只做一件事情,调用task,task的参数由上面的bind进行绑定,所以调用不需要加参数
void fun_2(int &a,int &b) { a++; b++; LOG_DEBUG("print a = %d, b = %d",a, b); // a = 4, b = 3 } int main(int argC, char *arg[]) { LOG_DEBUG("Hello"); int m = 2; int n = 3; auto f4 = std::bind(fun_2, n, placeholders::_1); //表示绑定fun_2的第一个参数为n, fun_2的第二个参数由调用f4的第一个参数(_1)指定。 f4(m); LOG_DEBUG("m %d", m); // 3 LOG_DEBUG("n %d", n); // 3 return 0; }
上面各个类的使用详解里面基本将整个线程池给介绍完了
关键点,将task通过bind变成通用的函数类型,使用C++的模板和deltype实现任意函数和参数任务
将传入的函数放到任务队列里面,启用一个线程处理任务。每提交一个任务通知一个线程唤醒,处理任务。
线程通过cv进行控制,没有任务时将会休眠等待外部唤醒处理任务。
最新C/C++linux服务器开发/架构师面试题、学习资料、教学视频和学习路线脑图(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享>>>>资料获取