什么是条件变量:
条件变量是利用线程间共享的全局变量进行同步的一种机制。主要包括两种动作:
这个怎么理解呢?举个例子:当你去超市买手机,问售货员有没有苹果13,售货员需要在苹果13到的时候才知道有没有苹果13,那么你需要等待售货员的通知,这个时候你是被挂起的,在wait售货员;同时当苹果13到店里的时候,售货员知道了有苹果13,条件成立了,售货员通知你苹果13到了,这个时候就是发信号signal的时候。
一般条件变量需要与互斥锁同时使用,那么互斥锁的用处在下面的例子有讲到。
使用条件变量可以以原子方式阻塞线程,直到某个特定条件为真为止,条件变量一般是与互斥锁一起使用的,对条件变量的测试一般是在互斥锁的保护下进行的。
初始化:
条件变量的初始化和创建进程初始化一摸一样,只是换了个名字,条件变量初始化也包括动态初始化和静态初始化
动态初始化: int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 参数: cond:要初始化的条件变量 attr:NULL--设置条件变量属性,一般设置为NULL即可 返回值: 成功返回0,失败返回错误码 静态初始化: pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
销毁:
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足:
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 参数: cond:要在这个条件变量上等待 mutex:互斥量,后面详细解释
唤醒等待:
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒所有进程 int pthread_cond_signal(pthread_cond_t *cond); //唤醒一个进程
思路:
创建一个master线程和3个worker线程,master线程用来发送信号signal,worker的每个线程接收到master线程信号开始做自己的动作。
主函数中的主线程工作:初始化(线程,条件变量),创建线程,等待线程,销毁。
两个新线程的动作:master状态就绪发送信号,唤醒等待队列中的线程,worker线程阻塞等待接收信号,再做动作。我们使用了pthread_cond_signal唤醒队列中的线程,那么唤醒的是哪一个呢?唤醒的是队列中第一个线程。
#include <iostream> #include <string> #include <pthread.h> #include <unistd.h> using namespace std; pthread_mutex_t mtx;//互斥锁 pthread_cond_t cond;//条件变量 void* ctrl(void* args) { std::string name=(char*)args; while(true) { std::cout<<"master say:begin work"<<std::endl; //pthread_cond_broadcast(&cond); pthread_cond_signal(&cond); sleep(1); } } void* work(void* args) { int number=*(int*)args; delete (int*)args; while(true) { pthread_cond_wait(&cond,&mtx); std::cout<<"worker:"<<number<<"is working..."<<std::endl; } } int main() { #define NUM 3 pthread_mutex_init(&mtx,nullptr); pthread_cond_init(&cond,nullptr); pthread_t master;//创建线程 pthread_t worker[NUM]; pthread_create(&master,nullptr,ctrl,(void*)"boss"); for(int i=0;i<NUM;i++) { int* number=new int(i); pthread_create(worker+i,nullptr,work,(void*)number); } for(int i=0;i<NUM;i++) { pthread_join(worker[i],nullptr); } pthread_join(master,nullptr); pthread_mutex_destroy(&mtx); pthread_cond_destroy(&cond); }
makefile文件:
myctrl:myctrl.cc g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: rm -f myctrl
运行结果:
这个例子也就说明了条件变量内部一定有一个等待队列。我们可以把条件变量看作一个结构体,结构体里面有一个队列用来存放等待的线程,还有一个状态变量,当进程被唤醒,这个状态变量status由0变1,然后再将等待队列的线程由D状态变成R状态,完成唤醒操作。
struct cond { int status; task_struct* q; }
条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须 要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件 变量上的线程。 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有 互斥锁就无法安全的获取和修改共享数据 既然上锁了,那么当一个线程条件不满足,使用条件变量等待,会将锁释放归还,然后继续执行下面的动作代码,这样不会因为条件不满足一直占用锁,做不了其它动作而浪费锁资源。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者 要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞)
我们用队列模拟实现一个生产者消费者模型,其中需要将Push函数和Pop函数自定义实现。那么Push函数的核心就是,向阻塞队列中push数据,当队列满的时候,就不能再push数据,需要return。 因为push操作和pop操作都要抢占队列,所以队列不是线程安全的。我们知道STL不是安全的,所以STL安全需要我们成需要自己去维护的。所以生产者和消费者需要通过枷锁来保证队列的安全。所以成员变量中bq_和cap_都是临界资源。我们要引入pthread.h中的锁。 当生产者不断在生产,加入生产者生产力特别强,不断地向队列里塞任务,使得队列被塞满了。当队列被塞满了,生产者还是在不断的申请锁,导致消费者得不到资源,进而导致消费者产生的饥饿问题。所以我们最理想的情况是: 当生产满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费 当消费空了,就不应该消费(不要竞争锁了),而应该让生产者来进行生产 这次我们就不想让某个线程来无脑抢锁,无脑的向队列里塞,此时就因该让他俩有顺序性,这个顺序性并不是你一下我一下的,我们可以让某个Push或Pop多做一点,或少做一点。,我们宏观意义上需要遵守上面两条规则。所以我们需要两个条件变量full_和empty_。
ps:const& 是输出型参数,*是输出型参数,&是输入输出型参数。
当我们push数据的时候,要保证队列数据不能为满,满了我们就不能放了。所以构造函数中我们初始化cap_,给一个定值。const int default_cap=5。当push满的时候做判断,所以我们需要设计一个接口isFull,这个接口不用暴露在外部,设置为private。 无论是判断队列是否为满,还是向队列中放数据,都是在访问临界资源,需要进行LockQueue(),将队列锁住;在放完数据之后还需要UnlockQueue(),将队列解锁,此时在枷锁和解锁直接访问才是安全的。者两个接口也不需要暴露在外面,设置为private。 构造函数中需要先初始化锁和条件变量,才能写这个锁函数。
伪代码:
void Push(const T& in) { LockQueue(); if(IsFull()) { return; } bq_.push(in); UnlockQueue(); }
因为push和pop操作都是非安全线程,需要我们自己加锁,在return之后,就会将锁归还,归还之后,可能因为push线程的锁竞争能力特别强,使得pop操作一直得不到锁,导致锁一直被push占用,浪费锁资源。所以就用到了条件变量pthread_cond_wait()操作,让push线程等待,等待别的线程给你发信号,可以push的时候,再继续push。
那么在pop函数中,同上,当队列是空的时候,pop操作需要等待,不能再继续pop了。等待需要一个等待信号,pop操作要等待队列满,那么这个等待信号我们命名为is_full,等待队列有数据的时候才能再次pop,那么就会让生产者开始生产,生产者在生产数据后,就可以继续让pop线程运行下去,用一个signal(is_full)信号来唤醒pop的wait(is_full)。
那么当我们使用pthread_cond_wait函数的时候,这个进程已经被停止,但是最开始我们还获取了一个锁,那么你wait等待的时候不能占用锁等待呀,这样会造成其它进程抢不到锁而产生死锁,所以wait函数中还要传一个锁参数,就是为了在线程挂起等待的时候,将锁归还。
在这个线程被signal信号唤起后,这个进程又参与到抢锁。
我么设置一个值,当队列元素个数大于一半的时候,就唤醒消费者pop操作,当队列元素小于整个队列的一半的时候,就唤醒生产者push操作。
所以只有生产者知道消费者再什么时候可以消费;只有消费者知道生产者什么时候来生产。
BlockQueue.hpp
#include <iostream> #include <queue> #include <pthread.h> #pragma once namespace ns_blockqueue { const int default_cap=5; template <class T> class BlockQueue { private: std::queue<T> bq_; //阻塞队列 int cap_; //记录队列元素上限 pthread_mutex_t mtx_; //保护临界资源 pthread_cond_t is_full_; //bq_满的,消费者在该条件变量下等待 pthread_cond_t is_empty_; //bq_空的,生产者在该条件变量下等待 private: bool IsFull() { return bq_.size() == cap_; } void LockQueue() { pthread_mutex_lock(&mtx_); } void UnlockQueue() { pthread_mutex_unlock(&mtx_); } void ProducterWait() { //pthread_cond_wait:1.调用的时候,会首先自动释放mtx_,然后再挂起自己 //2.返回的时候,会首先自动竞争锁,获取到锁之后才能返回 pthread_cond_wait(&is_empty_,&mtx_); } bool IsEmpty() { return bq_.size() == 0; } void ConsumerWait() { pthread_cond_wait(&is_full_,&mtx_); } void WakeUpConsumer(){ pthread_cond_signal(&is_full_); } void WakeUpProducter(){ pthread_cond_signal(&is_empty_); } public: BlockQueue(int cap=default_cap) :cap_(cap) { pthread_mutex_init(&mtx_,nullptr); pthread_cond_init(&is_empty_,nullptr); pthread_cond_init(&is_full_,nullptr); } ~BlockQueue() { pthread_mutex_destroy(&mtx_); pthread_cond_destroy(&is_empty_); pthread_cond_destroy(&is_full_); } public: //const& :输入 //* :输出 //& :输入输出 void Push(const T& in) { LockQueue(); if(IsFull()) { ProducterWait();//队列满了,生产者等待,等待空的cond } //向队列中放数据,生产函数 bq_.push(in); if(bq_.size()>cap_/2) WakeUpConsumer();//有数据了就可以消费 UnlockQueue(); } void Pop(T* out) { //从队列中拿数据,消费函数 LockQueue(); if(IsEmpty())//如果是空的,无法消费,应该让消费者去等待 { ConsumerWait();//等待满的条件变量 } *out=bq_.front(); bq_.pop(); if(bq_.size()<cap_/2)//当队列中元素小于一半,让生产者来生产 WakeUpProducter();//消费数据了就可以生产了 UnlockQueue(); } }; }
唤醒线程这个操作在加锁里面或者外面都可以:在锁外面的时候,已经把锁释放了,那么唤醒操作立马就能竞争到锁。在锁里面的时候,这个锁有可能被释放,也有可能没有被释放,没有被释放的时候,被唤醒也要重新申请锁,当把你唤醒的时候,你不在条件变量下等了,有可能被挂起到互斥锁当中去唤醒。但是没有关系,因为在锁里面的时候,马上就要被唤醒了,释放了就立即申请锁。
void Push(const T& in) { LockQueue(); if(IsFull()) { ProducterWait(); } //向队列中放数据,生产函数 bq_.push(in); WakeUpConsumer(); UnlockQueue(); // WakeUpConsumer(); }
CpTest.cc
主函数中,创建两个线程,消费者和生产者,消费者push随机数字,生产者pop队列头部front数据。通常生产者生产一个就会消费一个,我们自己设置一下,生产者生产一部分数据,消费者过了两秒再开始消费。
当把数据push到队列中后,需要将数据从队列中pop,pop函数的参数是输出型参数,我们要从里面拿出数据,所以我们用&data,我们就将队列中数据拿出来了。
我们控制一下生产者的速度,每两秒Push一个数据。在我们没有控制生产者速度的时候,只要队列有空间了,就Push数据。在我们控制速度后,Pop速度是受到Push限制的,pop操作必须跟着push节奏来,生产一条,消费一条,这就是阻塞队列。
还有一种控制办法,就是当队列元素超过一办,就在push函数中唤醒消费者;当队列元素少于一半,就在pop函数中唤醒生产者。
#include "BlockQueue.hpp" #include <iostream> #include <pthread.h> #include <time.h> #include <cstdlib> #include <unistd.h> using namespace ns_blockqueue; void* producter(void* args) { //先将阻塞队列参数拿到 BlockQueue<int>* bq=(BlockQueue<int>*)args; while(true)//不断向阻塞队列放任务 { sleep(2); //1.制造数据 int data = rand()%20 + 1; std::cout<<"生产者生产数据:" << data <<std::endl; bq->Push(data); } } void* consumer(void* args) { BlockQueue<int>* bq=(BlockQueue<int>*)args; while(true)//不断从阻塞队列中拿到任务 { int data=0; bq->Pop(&data); std::cout<<"消费者消费了一个数据:"<<data<<std::endl; } } int main() { srand((long long)time(nullptr)); BlockQueue<int>* bq = new BlockQueue<int>(); //创建两个线程 pthread_t c,p; //生产者和消费者线程 pthread_create(&c,nullptr,consumer,(void*)bq);//为了让两个线程产生联系,我们将阻塞队列传递给线程,这样两个线程拿到了同一个阻塞队列 pthread_create(&p,nullptr,producter,(void*)bq);//一个可以从procudter向阻塞队列放数据,一个可以从consumer从阻塞队列中拿数据 //创建线程主线程等待 pthread_join(c,nullptr); pthread_join(p,nullptr); return 0; }
makefile:
CpTest:CpTest.cc g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: rm -f CpTest
1.在队列满的时候,push操作会等待队列有空,需要进行ProducterWait()等待将线程挂起,那么我挂起失败了呢?如果我被异常或伪唤醒了呢?
将线程挂起,ProducterWait()是一个函数,运行函数调用就有可能失败;万一IsFull函数没有就绪,就将线程挂起操作伪唤醒了(这个情况在单核cpu中不会出现,如果是多核cpu,当很多线程都在条件变量下等待,虽然大家会排队,但是它就有可能向目标条件变量发送条件就绪的指令,最后导致当前线程被伪唤醒,总结来讲就是条件不满足了被唤醒了。再比如说,我生产了10个数据,那么就会WakeUpConsumer10次,那么在WakeUpConsumer10次的时候,它也在执行唤醒生产者这个操作,而当消费者消费的时候,可能根本不会进行生产操作,WakeUpProducter无效。而此时消费者一直醒着,那么它一直在消费,但是生产者一直被唤醒,发送的生产者通知信息量十一个过量的信息量。所以当进行等待的时候,历史上会收到通知,导致伪唤醒。总之加一个while就不会出现这些情况)。这两种情况出现了,说明我们的生产条件不具备。当队列是满的,还继续向下走,这时候就向队列中插入了不该插入的数据,说明使用if来进行判断是不太完善的,所以我们需要将判断条件代码做一下改善。
通过将if变成while,做轮询检测。通过Push操作举例子,如果判断成功(判断是满的),那么就挂起线程,如果挂起失败,while循环再继续判断一次,直到成功挂起线程。
2.上面生产者生产的数据是我们自己制造出来的,排除这种情况,生产者的数据是从哪来的?消费者拿到数据就是把数据从交易场所拷贝到自己的上下文就结束了吗?
生产和消费,传输数据只是第一步,那么数据怎么来的呢?耗时吗?数据又是怎么处理的?它耗时吗?
再举一个例子,当生产者获取了+-*/的任务,消费者要将这个+-*/的任务拿到自己的上下文来处理,这个就是我们用模板的好处,不仅可以处理int数据,还可以处理自定义任务。那么下面我们写一个计算任务
主函数的线程不变,但是线程操作producter和consumer要改变,因为要执行任务,所以我们让生产者生产数据,消费者拿到数据并做处理。生产数据通过Task任务的有参构造函数来构造任务,之后让消费者拿到任务处理,这个处理方法,我们在Task任务类中协议给Run函数,让消费者调用即可。
同时我们可以将run函数封装成一个仿函数,直接调用对象+()即可
Task.hpp
#pragma once #include <iostream> #include <pthread.h> namespace ns_task { class Task { private: int x_; int y_; char op_; //+-*/% public: Task() {} Task(int x, int y, char op) : x_(x), y_(y), op_(op) { } ~Task() {} int Run() { int res = 0; switch (op_) { case +: res = x_ + y_; break; case -: res = x_ - y_; break; case *: res = x_ * y_; break; case /: res = x_ / y_; break; case %: res = x_ % y_; break; default: std::cout << "bug??" << std::endl; break; } std::cout<<"当前任务正在被:"<<pthread_self()<<"处理:" <<x_<<op_<<y_<<"="<<res<<std::endl; return res; } int operator()() { return Run(); } }; }
同时与上面的CpTest.cc代码不同,上面是单生产单消费,那么我们也可以多生产多消费。所以要再创建一批pthread_t的线程。
CpTest.cc
#include "BlockQueue.hpp" #include "Task.hpp" #include <iostream> #include <pthread.h> #include <time.h> #include <cstdlib> #include <unistd.h> using namespace ns_blockqueue; using namespace ns_task; void* producter(void* args) { BlockQueue<Task>* bq= (BlockQueue<Task>*)args; std::string ops="+-*/%"; while(true) { //1.制造数据 int x = rand()%20+1; //[1-20] int y = rand()%10+1; //[1-10] char op = ops[rand()%5]; Task t(x,y,op); std::cout<<"生产者派发了一个任务:"<<x<<op<<y<<"?"<<std::endl; bq->Push(t); sleep(1); } } void* consumer(void* args) { BlockQueue<Task>* bq=(BlockQueue<Task>*)args; while(true) { Task t; bq->Pop(&t); t(); } } int main() { srand((long long)time(nullptr)); BlockQueue<Task>* bq = new BlockQueue<Task>(); //创建两个线程 pthread_t c,p; pthread_t c1,c2,c3,c4; //生产者和消费者线程 pthread_create(&c,nullptr,consumer,(void*)bq);//为了让两个线程产生联系,我们将阻塞队列传递给线程,这样两个线程拿到了同一个阻塞队列 pthread_create(&c1,nullptr,consumer,(void*)bq); pthread_create(&c2,nullptr,consumer,(void*)bq); pthread_create(&c3,nullptr,consumer,(void*)bq); pthread_create(&c4,nullptr,consumer,(void*)bq); pthread_create(&p,nullptr,producter,(void*)bq);//一个可以从procudter向阻塞队列放数据,一个可以从consumer从阻塞队列中拿数据 //创建线程主线程等待 pthread_join(c,nullptr); pthread_join(c1,nullptr); pthread_join(c2,nullptr); pthread_join(c3,nullptr); pthread_join(c4,nullptr); pthread_join(p,nullptr); return 0; }
a.信号量本质就是一把计数器,描述临界资源中资源数目的大小。也就是最多能有多少资源分配给线程。
b.信号量本质和电影院概念相似。举个例子,比如说电影院有100张票,当有票的时候,买走一张票就会tickets--,当票数小于或等于0的时候,就不能再卖票。那么是不是我们买完票,人坐在座位上,这个座位才属于我呢?并不是,只要在申请买到票的时候,这个座位已经属于自己。所以买票的本质是:预定资源。
假设这里有一个阻塞队列,有七个空间,虽然整体称为临界资源,但是可以让不同线程访问临界资源的不同区域,此时就可以让多个线程在一个大的临界资源里直接并行访问小的资源,只要各个线程访问的位置不同就可以。
所以临界资源如果可以被划分为一个一个的小资源,如果处理得当,我们也有可能让多个线程同时访问临界资源的不同却与,从而实现并发。
以上就是多线程预定资源的手段。
那么每个线程想访问临界资源,都要先申请信号量资源,申请到了,就一定会有一小块资源属于自己。
假如有一个计数器count,count有5个,也就是有五个信号量。申请信号量就会进行count--操作,释放信号量就会执行count++操作归还信号量。那么在计算count++/count--操作的时候要经过三个步骤,将数据从内存加载到cpu,在cpu进行运算,将计算后的结果返回内存。那么在者中间很多线程会抢占cpu资源,导致count本身就不是安全的。所以我们需要加锁。下面来看一段伪代码
申请信号量:如果申请失败,解锁并返回到开始,如果申请成功就count--。
start; lock(); if(count <= 0) { //挂起 unlock(); goto start; } else { count--; } unlock();
释放信号量:
lock(); count++; unlock();
用这种伪代码的方式来保证信号量操作是原子的。上面的count--就是P操作,count++就是V()操作。
初始化信号量
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); 参数: pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值,你想让计数器是多少就写多少 返回值:成功0.错误返回错误码
销毁信号量
int sem_destroy(sem_t *sem);
信号量各个操作
SYNOPSIS #include <semaphore.h> int sem_wait(sem_t *sem); -- 信号量申请成功继续执行,不成功用挂起等待,P()操作 int sem_post(sem_t *sem); -- unlock a semaphore,V()操作
从一个起点开始放数据,当放完一圈,会从开头处再继续生成,再继续放就会覆盖数据。
环形队列什么时候空,什么时候满?最开始tail和head是在统一位置,表示环形队列是空的,那么在tail不消耗数据的情况下,head一直放数据,最终会又走到和tail一样的位置,当head再次走到tail位置,就会将tail位置数据覆盖。那么说明放!=拿的时候,说明有数据。
那么判断满的方法
1.生产者消费者环形队列基本原理
多线程的情况,来进行环形队列的并发访问,实现一个基于环形队列的生产消费模型。当队列为空的时候,生产者head和消费者tail指针指向同一个位置;生产者和消费者在为满的时候也会指向同一个位置。那么当队列不为空,不为满的时候,生产者和消费者一定指向的不是同一个位置。这个时候生产者和消费者就可以并发执行。
当我们为空/满的时候,要保证一定不能让生产和消费同时进行,也就要表现出一定的互斥特性。那么当队列为空的时候,应该让生产者执行;当队列为满的时候,应该让消费者先访问。
2.生产者消费者环形队列基本实现思想:
放的人就是生产者,拿的人是消费者。生产者和消费者最关心什么资源呢?生产者关心环形队列中的空位置,消费者关心环形队列中的数据,需要从环形队列中拿数据。空的位置和数据我们把它统称为资源。
所以环形队列的实现需要遵守以下规则:
规则1:生产者不能把消费者再套一个圈。意思是,当生产者生产数量已经追上消费者消费的量,也就是生产者已经和消费者再次重叠,不能生产出第二圈数量,必须等消费者消费之后有空余空间,再生产。 规则2:消费者不能超过生产者 规则3:当指向同一个位置的时候,要根据空/满状态,来判定谁先执行,除此之外,生产和消费可以并发执行。
3.实现伪代码
在最开始队列为空的时候,假设队列大小为10,空位置有10个,有数据的位置有0个,所以我们定义两个信号量,一个用来表示空位置数量,一个用来表示有数据的位置的数量。
sem blank = 10;
sem data = 0;
那么接下来需要两个动作,生产和消费。那么生产的前提条件是先申请空格子P(blank),有了空格子才能生产数据;同理,消费者也需要提前申请数据格子P(data),有了数据才能消费。在生产了一个数据之后,消费者不能立马消费数据,因为data还是为0,需要先申请信号量再消费。那么在生产一个数据之后,就要data++——V(data),一旦V(data),消费者立马醒来,可以开始消费了
通过以上理论,我们来实现以下代码。
主函数ring_cp.cc中,和以往一样,我们先创建两个线程,并对这两个线程分别进行生产producter和消费pop操作。那么我们将基于生产者消费者模型的环形队列封装成一个类。因为环形队列是通过数组来实现,所以成员变量中使用vector表示环形队列,当生产者或消费者任意一个走到数组最大位置,那么继续向下走,通过取模的方式再次走到最开始的位置,循环往复。那么还可以定义一个cap_来记录队列元素的上限,我们通过定义一个全局变g_cap_default(g代表global全局)量来初始化。
那么下面我们来实现push和pop。那么进行操作就要先预定,可以不立即就用上,但是我们需要先预定资源。需要用到semaphore.h下的信号量函数。
在构造函数中初始化,在析构函数中销毁。在push函数中先申请空信号量,申请之后我们将数据放到哪里呢?信号量只是帮我们预定位置,但是放数据这个事情是需要我们程序员去做。我们传参来确定位置,但是这样的话顶层数据就暴露给用户,而用户并不知道底层,所以我们最好封装在RingQueue类中,将位置变量变成成员变量,定义c_step(消费位置),p_step(生产位置),并在构造函数中初始化。
push中每在p_setp_位置放入数据,就++一次p_step_,表示下次申请到位置后,就在这个位置来放入数据。在我们申请位置,并放入数据后,完成P()操作之后,这代已经有了数据,可以让消费者来消费,所以就要用sem_post(&data_sem_)给消费者发信号,此时sem_wait就等到有数据的位置了,可以开始消费操作了。
pop函数中,同样,我们需要先申请数据,看到数据之后,才能将数据拿出来。将数据拿出来后,数据没了,格子就露出来了,V操作将格子归还,然后对消费者位置++,消费者消费完一个,就继续向下消费。因为怕位置越界,对位置取模。
我们看一下单消费单生产版本
ring_queue.hpp
#pragma once #include <iostream> #include <vector> #include <semaphore.h> namespace ns_ring_queue { const int g_cap_default = 5; template <class T> class RingQueue { private: std::vector<T> ring_queue_; //环形队列 int cap_; //队列元素上限 sem_t blank_sem_; //生产者关心空位置资源 sem_t data_sem_; //消费者关心有数据资源 int c_step_; int p_step_; //生产者位置下标 public: RingQueue(int cap = g_cap_default) : ring_queue_(cap), cap_(cap) { sem_init(&blank_sem_,0,cap);//不共享设置为0,设置信号量大小为cap,最开始有cap个空位置‘ sem_init(&data_sem_,0,0);//不共享,最开始没有数据信号量为0 c_step_ = p_step_ = 0; } ~RingQueue() { sem_destroy(&blank_sem_); sem_destroy(&data_sem_); } public: //目前高优先级的先实现单生产和单消费 void Push(const T& in) { sem_wait(&blank_sem_);//P(空位置) ring_queue_[p_step_] = in; sem_post(&data_sem_); //V(数据) p_step_++; p_step_%=cap_; } void Pop(T* out) { //消费接口 sem_wait(&data_sem_);//P(data) *out=ring_queue_[c_step_]; sem_post(&blank_sem_);//V(blank) c_step_++; c_step_%=cap_; } }; }
ring_cp.cc
#include "ring_queue.hpp" #include <pthread.h> #include <time.h> #include <unistd.h> using namespace ns_ring_queue; void* consumer(void* args) { RingQueue<int>* rq=(RingQueue<int>*)args; while(true) { int data = 0; rq->Pop(&data); std::cout << "消费数据是:" << data <<std::endl; sleep(1); } } void* producter(void* args) { RingQueue<int>* rq=(RingQueue<int>*)args; while(true) { int data = rand()%20 + 1; std::cout<<"生产数据是:"<<data<<std::endl; rq->Push(data); } } int main() { srand((long long)time(nullptr)); RingQueue<int>* rq = new RingQueue<int>(); pthread_t c,p; pthread_create(&c,nullptr,consumer,(void*)rq); pthread_create(&p,nullptr,producter,(void*)rq); pthread_join(c,nullptr); pthread_join(p,nullptr); return 0; }
上面单生产单消费版本,我们只关心生产者与消费者的关系,他们的关系是竞争并同步,同步就是排队。多生产的多消费版本,我们除了生产者与消费者的关系,还需要维护生产者与生产者,消费者与消费者之间的关系。生产者与生产者也是竞争关系,但他们是互斥的,一个线程正在生产,其他线程想要再生产竞争锁来保证一个线程生产是原子性的。消费者与消费者也是竞争且互斥的关系的。
那么我们就要定义两把互斥锁,那么我们加锁应该在申请信号量之前还是在申请信号量之后呢?如果在信号量之前加锁,它也不会出错,但这不是我们想要的,这样的话每次在申请锁成功之后才能申请信号量资源,当生产者不断生产,消费者不去消费,等生产者生产满了,消费者要申请信号量之前都要申请锁,这样效率降低,如果在申请信号量之后加锁,如果还有空位置,那么就可以申请,没有空位置,生产者信号量申请不成功,就会等待,不会有更多的消耗。前面那种情况一直申请锁会消耗资源,后面这种情况就是先申请信号量,预先分配给各个线程,所有人有资格竞争锁的前提是必须先申请信号量,如果有多个生产者,那么就预先把所有的信号量全部分配给生产者,当锁一旦释放,其他人立即申请锁。在申请锁的同时,别人就可以并行把信号量准备好。所以将锁放到信号量后面效率会高一点点。
在单消费单生产模型,位置变量p_step_/c_step_不是临界资源,因为只有一个生产者执行p_step_,而多消费多生产模型,p_step_/c_step_就是临界资源。
多生产和多消费的优势,最主要的不在于放数据和拿数据,而在于并发的获取和处理任务。
代码:
我们复用上面的Task.hpp代码,向队列中push/pop任务
ring_cp.cc
#include "ring_queue.hpp" #include <pthread.h> #include <time.h> #include <unistd.h> #include "Task.hpp" using namespace ns_task; using namespace ns_ring_queue; void* consumer(void* args) { RingQueue<Task>* rq=(RingQueue<Task>*)args; while(true) { Task t; rq->Pop(&t); t(); sleep(1); } } void* producter(void* args) { RingQueue<Task>* rq=(RingQueue<Task>*)args; const std::string ops = "+-*/%"; while(true) { int x = rand()%20 + 1; int y = rand()%10 + 1; char op = ops[rand()%ops.size()]; Task t(x,y,op); std::cout<<"生产数据是:"<<t.Show()<<"我是:"<<pthread_self()<<std::endl; rq->Push(t); } } int main() { srand((long long)time(nullptr)); RingQueue<Task>* rq = new RingQueue<Task>(); pthread_t c0,c1,c2,c3,p0,p1,p2; pthread_create(&c0,nullptr,consumer,(void*)rq); pthread_create(&c1,nullptr,consumer,(void*)rq); pthread_create(&c2,nullptr,consumer,(void*)rq); pthread_create(&c3,nullptr,consumer,(void*)rq); pthread_create(&p0,nullptr,producter,(void*)rq); pthread_create(&p1,nullptr,producter,(void*)rq); pthread_create(&p2,nullptr,producter,(void*)rq); pthread_join(c0,nullptr); pthread_join(c1,nullptr); pthread_join(c2,nullptr); pthread_join(c3,nullptr); pthread_join(p0,nullptr); pthread_join(p1,nullptr); pthread_join(p2,nullptr); return 0;