线程同步是线程间有序访问共享数据。
同步:在调用后没有得到结果,该调用就不返回。一旦调用返回,就意味得到了返回值。由调用者等待调用的结果。
异步:调用后立即返回了,不等待返回结果。由被调用者在得到结果后,通知调用者(回调函数,等)处理这个调用。
在< thread >线程库中,没有获得线程执行结果的方法。
在并发编程中,虽然多线程互斥临界区条件变量,可以获取异步任务执行结果,但操作多容易引入bug。还会使用各种回调方法来处理异步返回的结果,让代码分散且难以维护。
C++ 11新增了< future >库函数为异步编程提供了很大的便利。
< future >库允许不同的线程访问共享数据。
<future> 头文件中包含了以下几个类和函数:
Providers 类:std::promise, std::package_task
Futures 类:std::future, shared_future.
Providers 函数:std::async()
其他类型:std::future_error, std::future_errc, std::future_status, std::launch.
promise 提供了一种线程同步的手段。promise 对象可以保存类型 T 的值,该值可被在其他线程中的 future 对象读取 。 promise 对象可以和一个std::future(一种共享状态)对象相关联,并可以在相关联的共享状态(std::future)上保存一个类型T 的值。
通过promise的成员函数 get_future 来与 future 对象关联,调用该函数之后,两个对象共享相同的共享状态(shared state)
promise 对象是异步 Provider,它可以在另一个线程通过set_value成员函数设置共享状态的值,使得共享状态标志变为 ready。
future 对象可以接收异步返回的共享状态的值,可以在必要的情况下,用wait函数阻塞调用者,等待共享状态标志变为 ready,后才能获取共享状态的值。
Prmomise和Future提供一种访问异步操作结果的机制,可以在线程之间传递数据和异常信息。
std::promise< T >与std::packaged_task< Func >提供了较丰富的异步编程工具,使用时要创建提供共享状态的对象(promise与packaged_task),还要创建访问共享状态的对象(future与shared_future)。
promise (const promise&) = delete; 拷贝构造函数,被禁用。
promise (promise&& x) noexcept; 有移动构造函数。
std::promise 的 operator= 没有拷贝语义,即 std::promise 普通的赋值操作被禁用,operator= 只有 move 语义,所以 std::promise 对象是禁止拷贝的
std::promise<int> pr;
std::thread t([](std::promise<int>& p){ p.set_value_at_thread_exit(9); },std::ref(pr));//这里promise拿到了9
std::future<int> f = pr.get_future();//设置关联的future
auto r = f.get();
#include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> #include <chrono> void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise) { int sum = std::accumulate(first, last, 0); Sleep(10); accumulate_promise.set_value(sum); //promise对象设置结果,内部会让共享状态变为就绪,以提醒future } int use_promise() { //用std::promise<int> 在线程间传递结果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::promise<int> accumulate_promise; std::future<int> accumulate_future = accumulate_promise.get_future(); //将future 对象和promise 对象关联上 std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise)); //把promise对象送入耗时work_thread线程内 accumulate_future.wait();//future对象在wait方法上阻塞,用以等待关联的promise对象设置结果 std::cout << "result=" << accumulate_future.get() << '\n'; work_thread.join();//阻塞等待work_thread线程执行完成 return 0; }
std::promise<int> promise1; void print_global_promise() { std::future<int> fut = promise1.get_future(); int x = fut.get(); std::cout << "value: " << x << '\n'; } int use_promise2() { std::thread th1(print_global_promise); std::this_thread::sleep_for(std::chrono::seconds(5)); promise1.set_value(10); th1.join(); promise1 = std::promise<int>(); // promise1 被move赋值为一个新的 promise 对象. std::thread th2(print_global_promise); std::this_thread::sleep_for(std::chrono::seconds(5)); promise1.set_value(20); th2.join(); return 0; }
返回与 promise 共享状态相关联的 future对象 。future 对象可以访问该 promise 对象设置在共享状态上的值或者异常对象。调用get_future函数后,promise 对象通常会在某个时间点准备好(设置一个值或一个异常对象),如果不设置值或者异常,promise 对象在析构时会自动地设置一个 future_error 异常,来设置其自身的准备状态。
用于设置共享状态的值,通过成员函数set_value可以设置std::promise中保存的值,调用后 promise 的共享状态标志变为 ready。该值最终会被与之关联的std::future::get函数读取到。需要注意的是:set_value只能被调用一次,多次调用会抛出std::future_error异常
promise会做出承诺:会设置T类型的值,future未来从这里获得承诺的T类型的值。
线程A可以对promise执行set_value(),传入对应产出的值,而另一个线程B则可以使用future的get()方法来获取他们共享的值,但这个线程B会阻塞在那,直到获得future与promise共享的值。这里有一个值得注意的地方:Future的一个重要属性在于它只能被赋值一次。
void runner(std::promise<int>* pPromise) { std::cout << "do some io task" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(5)); pPromise->set_value(3); } int use_promise3() { std::promise<int> _promise; std::thread t(runner, &_promise); int ret = _promise.get_future().get(); // Wait p.set_value() std::cout << ret << std::endl; t.join(); return 0; }
std::future提供了一种访问异步操作结果的机制。因为异步操作是不能马上获得结果的,只能在未来某个时候获取,我们以同步等待的方式来获取结果。future_status有三种状态:
deferred:异步操作还没开始
ready:异步操作已经完成
timeout:异步操作超时
获取future结果有三种方式:
get:等待异步操作结束并返回结果
wait:只是等待异步操作完成,没有返回值
wait_for:超时等待返回结果。
//获取状态
status = future.wait_for(std::chrono::seconds(1))
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
从异步调用的角度来说,future更像是执行函数的返回值,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future对象来代表这个事件。
异步调用往往不知道何时返回,如果需要使用异步调用的结果。这个时候就要用到future。
线程可以周期性的在这个future上等待一小段时间,检查future是否已经ready,如果没有,该线程可以先去做另一个任务,一旦future就绪,该future就无法复位(无法再次使用这个future等待这个事件),所以future代表的是一次性事件。
<future>库中声明了两种future,唯一future(std::future)和共享future(std::shared_future)前者的实例是仅有的一个指向其关联事件的实例,后者可以有多个实例指向同一个关联事件,当事件就绪时,所有指向同一事件的std::shared_future实例会变成就绪。
std::future是一个模板,例如std::future<int>,模板参数就是期待返回的类型,future被用于线程间通信。
future使用的时机是当你不需要立刻得到一个结果的时候,你可以开启一个线程帮你去做一项任务,并期待这个任务的返回,但是std::thread并没有提供这样的机制,这就需要用到std::async和std::future(都在<future>头文件中声明)
std::async返回一个std::future对象,而不是给一个确定的值。当你需要使用这个值的时候,对future使用get(),线程就会阻塞直到future就绪,然后返回该值。
int accumulate2(std::vector<int>::iterator first, std::vector<int>::iterator last) { int sum = std::accumulate(first, last, 0); std::this_thread::sleep_for(std::chrono::seconds(10)); return sum; } int use_packaged_task() { //用packaged_task 在线程间传递结果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::packaged_task<int(std::vector<int>::iterator, std::vector<int>::iterator)> accumulate_task(accumulate2); std::future<int> accumulate_future = accumulate_task.get_future(); std::thread work_thread(std::move(accumulate_task), numbers.begin(), numbers.end()); accumulate_future.wait(); //等待结果 std::cout << "result=" << accumulate_future.get() << '\n'; work_thread.join(); //阻塞等待线程执行完成 return 0; }
字面意思是:可共享的未来
shared_future对象的行为与future对象类似,但是shared_future对象可以被复制的,多个shared_future对象可以共享所有权。在共享状态的值在准备就绪状态ready后,可以多次获取值。
shared_future对象可以从future对象隐式转换(构造函数),也可以调用future::share显式获得。转换获取后,原future对象无效。
共享状态的生存期持续到与之关联的最后一个对象被销毁为止。
future的get()函数为转移语义数据,只能get一次;shared_future的get()函数为复制数据,可以get多次。
需要用std::shared_future< T >
std::future< T >提供了一个将future转换为shared_future的方法f.share(),但转换后原future状态失效。使用时需要留心。
使用全局变量传递数据,会使得不同函数间的耦合度较高,不利于模块化编程。后面两种方法分别通过函数参数与返回值来传递数据,可以降低函数间的耦合度,使编程和维护更简单快捷。
//修改后,使用shared_future的result_s可以get多次
std::shared_future<int> result_s(std::move(result)); //执行后result为空。
语句执行效果等同于下句:
//std::shared_future<int> result_s(mypt.get_future());
auto mythreadresult = result_s.get(); //mythreadresult=5
mythreadresult = result_s.get(); //mythreadresult=5
std::packaged_task是模板,是任务的封装。好处是不需要修改函数,直接利用return。
std::packaged_task的模板参数是函数签名。
packaged_task包装callable元素,将其作为任务,将callable对象的return结果传输到future对象,并允许异步的获取future对象。
调用packaged_task的get_future()方法,获得它绑定的函数的返回值类型的future。
例如 int add(int a, intb)的函数签名是int(int, int);
std::packaged_task<int(int, int)> task(add);
packaged_task对象包含两个元素:
一个是任务,它是可调用对象(如函数指针、指向成员或函数对象的指针)。
一个是共享状态,能够存储调用已存储任务return的结果,并可通过futrue未来对象在另个线程异步的访问。
通过调用packaged_task的成员函数get_future,将共享状态与future对象关联。使得两个对象共享相同的共享状态:packaged_task对象是异步提供程序,通过调用存储的任务,可以在某个时刻将共享状态设置为ready就绪。
future是异步返回对象,用于检索共享状态的值,在必要时它等待,直到变成准备就绪ready状态。共享状态的生存期持续到与之关联的最后一个对象被销毁为止。
int add(int a, int b) { std::this_thread::sleep_for(std::chrono::seconds(5)); return a + b; } int main() { std::packaged_task<int(int, int)> task(add); std::cout << "Hello World 11" << std::endl; std::future<int> result = task.get_future(); //task(1, 1); //必须要让任务执行,否则在get()获取future的值时会一直阻塞 std::thread th(std::move(task), 10, 5); std::cout << "Hello World 66" << std::endl; std::cout << "Hello World 77" << std::endl; std::cout << result.get() << std::endl; std::cout << "Hello World 88" << std::endl; return 0; }
future库封装了更高级别的函数std::async ,std::async是更简单的异步工具,使异步执行任务更方便。
std::future std::async(std::launch policy, Func, Args…)
std::async函数的返回值是std::futrue对象。可以很方便的获取线程函数的执行结果。
std::async会自动创建一个线程去调用线程函数,它返回一个std::future,这个future中存储了线程函数返回的结果,当我们需要线程函数的结果时,直接从future中获取。
std::launch::async | std::launch::deferred参数 是默认行为(可省略)。有了这个启动策略,它可以异步运行或不运行,这取决于系统的负载。
继续使用上面的程序示例,改为使用std::async传递结果,修改后的代码如下:
std::async()第一个参数std::launch policy是启动策略,控制std::async的异步行为,可以用三种不同的启动策略来创建std::async:
(1)std::launch::async保证异步执行,即传递函数将在单独的线程中执行;
std::launch::async 不需要get(),新线程也会创建并执行
std::future<int> result = std::async(std::launch::async,&A::mythread2, &a, tempar);
(2)std::launch::deferred表示线程入口函数,被延迟到std::future的wait()或者get()函数调用时,才执行;当其他线程调用std::future的get()/wait()来访问共享状态时,将执行非异步行为;
std::future<int> result = std::async(std::launch::deferred,&A::mythread2, &a, tempar);
如果在此之后不调用wait()或者get(),那么上述线程不会被执行。
如果在之后调用get()函数,并没有创建新线程,是在主线程中调用的线程入口函数。
(3)当不指定参数时,默认参数为:std::launch::deferred | std::launch::async,系统会自行选择一种模式。
Func是要调用的可调用对象(function, member function, function object, lambda),Args是传递给Func的参数。
使用std::async能在很大程度上简少编程工作量,我们可以使用std::async替代线程的创建,让它成为我们做异步操作的首选。
class CA { public: int funcHaoShi(int param) { std::cout << "threadid=" << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(5000)); return param * 3; } }; int use_async() { // 演示用 async 在线程间传递结果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::cout << "use_async ... " << std::endl; auto accumulate_future = std::async(std::launch::async, accumulate2, numbers.begin(), numbers.end()); std::cout << "result=" << accumulate_future.get() << '\n'; CA a; auto _future = std::async(&CA::funcHaoShi, &a, 12); //类的成员函数 //当不指定参数时,默认参数为:std::launch::deferred | std::launch::async,系统会自行选择一种模式 std::cout << "result=" << _future.get() << '\n'; return 0; }
int funUseTime(int i) { std::cout << " funUseTime running" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(5)); return i; } int use_async2() { std::future<int> _future = std::async(std::launch::async, funUseTime, 1); std::future_status status; do { status = _future.wait_for(std::chrono::seconds(1)); if (status == std::future_status::deferred) { std::cout << "deferred\n"; } else if (status == std::future_status::timeout) { std::cout << "timeout\n"; } else if (status == std::future_status::ready) { std::cout << "ready!\n"; } } while (status != std::future_status::ready); std::cout << "result is " << _future.get() << '\n'; return 0; }