转载:醍醐灌顶全方位击破C++线程池及异步处理 - 知乎 (zhihu.com)
重点:
转载的代码有点乱,他有两种方法,只测试了第二种方法。
代码是看了,但无法验证这个线程池的暂停是否有效。等后续再想想,测试暂停的有效性。
Threadpool.h
#pragma once #include <functional> #include <thread> #include <queue> #include <condition_variable> #include <future> using namespace std; using Task = function<void()>; class ThreadPool { public: ThreadPool(size_t size = 4); ~ThreadPool(); public: template<typename T, typename...Args> auto Commit(T&& t, Args&&...args)->future<decltype(t(args...))> { if (m_stop.load()) { throw runtime_error("task has closed commit"); } using ResType = decltype(t(args...)); auto task = make_shared<packaged_task<ResType()>>( bind(forward<T>(t), forward<Args>(args)...)); unique_lock<mutex> lock(mu); m_tasks.emplace([task]() { (*task)(); }); m_cv.notify_all(); //唤醒等待线程 future<ResType> fu = task->get_future(); return fu; } public: void ShutDown(); //停止任务提交 void Restart(); //重启任务提交 private: Task GetOneTask();//获取一个待执行的task void Schedual(); //任务调度 private: vector<thread> m_pool; mutex mu; queue<Task> m_tasks; condition_variable m_cv; atomic<bool> m_stop; };
ThreadPool.cpp
#include "ThreadPool.h" #include <future> ThreadPool::ThreadPool(size_t size) :m_stop{false} { size = size < 1 ? 1 : size; for (size_t i=0;i<size;++i) { m_pool.emplace_back(&ThreadPool::Schedual, this); } } ThreadPool::~ThreadPool() { for (auto&t:m_pool) { t.detach(); //让线程自身自灭 //t.join(); //等任务结束,前提:线程一定会执行完 } } void ThreadPool::ShutDown() { m_stop.store(true);//对内存进行访问memory_order_seq_cst,采用store } void ThreadPool::Restart() { m_stop.store(false);//对内存进行访问memory_order_seq_cst,采用store } Task ThreadPool::GetOneTask() { unique_lock<mutex> lock(mu); m_cv.wait(lock, [this] {return !m_tasks.empty(); }); Task task(move(m_tasks.front())); m_tasks.pop(); return task; } void ThreadPool::Schedual() { while (true) { if (Task task =GetOneTask()) { task(); } else { return; //结束 } } }
Test.cpp
// Test.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 // #include <iostream> #include <future> #include "ThreadPool.h" using namespace std; void fun() { for (int i = 0; i < 100000; ++i) { cout << "hello"<<i << endl; } } struct Gan { int operator()() { cout << "hello,gan" << endl; return 42; } }; int main() { try { ThreadPool task(10); future<void> ff = task.Commit(fun); future<int> fg = task.Commit(Gan()); future<string> fs = task.Commit([]()->string { return "hello,fs"; }); task.ShutDown(); ff.get(); cout << "fg.get : " << fg.get ()<< endl; this_thread::sleep_for(chrono::seconds(5)); task.Restart(); //重启任务 cout << "end " << endl; return 0; } catch (const std::exception& e) { cout << "soming is wrong "<< e.what() << endl; } return 0; }