本文主要是介绍C++多生产者多消费者模型,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
// 多生产者多消费者模型
// 需要了解以下概念
// thread 线程
// mutex 互斥锁
// atomic 原子操作
// condition_variable 条件变量
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <queue>
#include <random>
#include <ctime>
#include <vector>
// 随机数生成,用于模拟生产消费的耗时
std::default_random_engine e(time(0));
std::uniform_int_distribution<int> distribution(3000, 7000);
class Factory
{
public:
Factory(int production_target, int producer_count,
int consumer_count, int line_capacity);
void produce(int tag);
void consume(int tag);
void start_working();
private:
bool do_produce(int tag);
bool do_consume(int tag);
private:
const int production_target; // 生产目标,计划生产的数目
const int producer_count; // 生产者数量
const int consumer_count; // 消费者数量
const int line_capacity; // 产品暂存区最大存放量
std::atomic<int> produced_count = 0; // 已生产产品数量
std::atomic<int> consumed_count = 0; // 已消费产品数量
std::atomic_flag produce_done = ATOMIC_FLAG_INIT; // 产品已经生产完毕
std::atomic_flag consume_done = ATOMIC_FLAG_INIT; // 产品已经消费完毕
std::queue<int> product_line; // 已生产未消费产品存放区
std::mutex mtx; // 互斥锁
std::condition_variable line_not_full; // 消费者消耗了一个产品
std::condition_variable line_not_empty; // 生产者产出了一个产品
};
Factory::Factory(int production_target, int producer_count,
int consumer_count, int line_capacity) :
production_target(production_target),
producer_count(producer_count),
consumer_count(consumer_count),
line_capacity(line_capacity)
{
// do nothing
}
bool Factory::do_produce(int tag)
{
// 假设生产总是成功的
std::this_thread::sleep_for(std::chrono::duration(
std::chrono::milliseconds(distribution(e))));
return true;
}
bool Factory::do_consume(int tag)
{
// 假设消费总是成功的
std::this_thread::sleep_for(std::chrono::duration(
std::chrono::milliseconds(distribution(e))));
return true;
}
void Factory::produce(int tag)
{
while(produced_count < production_target)
{
std::unique_lock<std::mutex> lock(mtx);
if(produced_count < production_target && product_line.size() < line_capacity)
{
int id = ++produced_count; // 提前将计数器加上,防止生产超额
std::cout << "[P" << tag << "] prodecing " << id << "\n";
lock.unlock(); // 具体的生产过程会比较耗时,将锁释放,使用其它线程生产/消费
do_produce(tag); // 如果生产可能失败,则需要重新设计此处逻辑
lock.lock();
product_line.push(id);
std::cout << "[P" << tag << "] prodeced " << id << "\n";
line_not_empty.notify_one(); // 只生产了一个产品
if(product_line.size() == line_capacity && produced_count < production_target)
{
std::cout << "[" << tag << "] product line full\n";
line_not_full.wait(lock);
}
}
}
if(produce_done.test_and_set()) // 生产结束,需要唤醒所有消费者
line_not_empty.notify_all(); // 消费者在生产线为空后一直等待
std::cout << "producer " << tag << " done\n";
}
void Factory::consume(int tag)
{
while(consumed_count < production_target)
{
std::unique_lock<std::mutex> lock(mtx);
if(consumed_count < production_target && !product_line.empty())
{
int id = product_line.front();
++consumed_count;
product_line.pop();
std::cout << "[C" << tag << "] consuming " << id << "\n";
lock.unlock();
do_consume(tag);
lock.lock();
std::cout << "[C" << tag << "] consumed " << id << "\n";
line_not_full.notify_one();
if(product_line.empty() && consumed_count < production_target)
{
std::cout << "[" << tag << "] product line empty\n";
line_not_empty.wait(lock);
}
}
}
if(consume_done.test_and_set())
line_not_full.notify_all();
std::cout << "consumer " << tag << " done\n";
}
void Factory::start_working()
{
std::vector<std::thread> threads;
for(int i = 0; i < producer_count; ++i)
threads.emplace_back(&Factory::produce, this, i); // 使用类成员函数时需要加上this指针
for(int i = producer_count; i < producer_count + consumer_count; ++i)
threads.emplace_back(&Factory::consume, this, i);
for(int i = 0; i < producer_count + consumer_count; ++i)
threads[i].join();
}
int main()
{
Factory factory(10, 3, 3, 3);
factory.start_working();
std::cout << "done\n";
}
// TODO
// 1. 为每条打印信息加上时间戳
// chrono打印时间戳有点麻烦
// 2. 处理生产/消费失败的情形
// 使用两个变量,一个记录已生产的数量,一个记录正在生产的数量(包含已生产数量)
这篇关于C++多生产者多消费者模型的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!