c++版本生产者消费者模型:
#include <mutex> #include <deque> #include <future> #include <iostream> template <typename T> class LoopQueue { public: LoopQueue() { } void Push(const T &element) { //这个lock用于对生产者间的同步 std::unique_lock<std::mutex> lock(m_mutex); m_deqDatas.push_back(element); //信号量用于生产者,消费者间的同步 // m_notEmpty.notify_one(); m_notEmpty.notify_all(); } T Pop() { //这个lock用于配合信号量的使用 std::unique_lock<std::mutex> lock(m_mutex); /*当后面的lambda返回false时,则信号量进入wait,此时自动释放lock *等待到信号量后,则再获取lock */ m_notEmpty.wait(lock, [this] { return !m_deqDatas.empty(); }); //取元素返回 T front = m_deqDatas.front(); m_deqDatas.pop_front(); return front; } bool Empty() { //这个lock用于对消费者,生产者线程的互斥 std::unique_lock<std::mutex> lock(m_mutex); return m_deqDatas.empty(); } size_t Size() { //这个lock用于对消费者,生产者线程的互斥 std::unique_lock<std::mutex> lock(m_mutex); return m_deqDatas.size(); } private: std::deque<T> m_deqDatas; //互斥量,对队列进行同步保护 std::mutex m_mutex; //用于限制消费者线程 std::condition_variable m_notEmpty; }; void producer(LoopQueue<int> *dep) { while (1) { std::this_thread::sleep_for(std::chrono::seconds(rand() % 3)); int num = rand() % 1000 + 1; dep->Push(num); std::cout << "producer, Push ==========: " << num << std::endl; // break; } } void consumer(LoopQueue<int> *dep) { while (1) { std::this_thread::sleep_for(std::chrono::seconds(rand() % 3)); int ret = dep->Pop(); std::cout << "consumer, Pop: ----------" << ret << std::endl; // break; } } int main() { srand(time(NULL)); LoopQueue<int> queue; std::future<void> fut1 = std::async(std::launch::async, producer, &queue); std::future<void> fut2 = std::async(std::launch::async, consumer, &queue); std::cout << "--------------------------------" << std::endl; fut1.wait(); // 等待异步结束 fut2.wait(); // 等待异步结束 std::cout << "================================" << std::endl; }
运行结果: