条件变量是线程间同步的一种机制,本文分析条件变量的实现和使用。我们先看一下条件变量的定义。
typedef struct { int c_spinlock; /* Spin lock to protect the queue. */ struct _pthread_queue c_waiting; /* Threads waiting on this condition. */ } pthread_cond_t;
我们看到条件变量的定义很简单,条件变量通常配合互斥变量一起使用,大致流程如下
加锁 if (条件不满足) { 阻塞在条件变量 } 操作加锁的资源 解锁
其实机制也很简单,条件变量就是在条件不满足的时候,把线程插入等待队列,等待条件满足的时候再唤醒队列里的线程。我们看一下具体实现。
// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) { volatile pthread_t self = thread_self(); // 加锁操作队列 acquire(&cond->c_spinlock); // 插入条件的等待队列 enqueue(&cond->c_waiting, self); // 操作完释放锁 release(&cond->c_spinlock); // 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足 pthread_mutex_unlock(mutex); // 挂起等待条件满足后被唤醒 suspend_with_cancellation(self); // 被唤醒后重新获取互斥锁 pthread_mutex_lock(mutex); /* This is a cancellation point */ // 取消点,等待期间被取消了 if (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE) { /* Remove ourselves from the waiting queue if we're still on it */ acquire(&cond->c_spinlock); // 线程准备退出,从条件阻塞队列中移除 remove_from_queue(&cond->c_waiting, self); release(&cond->c_spinlock); pthread_exit(PTHREAD_CANCELED); } return 0; }
pthread_cond_wait函数是当条件不能满足时,线程调用的函数。调用完后线程会被挂起,等待被唤醒(如果不希望一直被阻塞可以调用pthread_cond_timedwait,pthread_cond_timedwait支持定时阻塞)。看一下挂起线程的逻辑。
static inline void suspend_with_cancellation(pthread_t self) { sigset_t mask; sigjmp_buf jmpbuf; // 获取当前的信号屏蔽码 sigprocmask(SIG_SETMASK, NULL, &mask); /* Get current signal mask */ // 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号 sigdelset(&mask, PTHREAD_SIG_RESTART); /* Unblock the restart signal */ /* No need to save the signal mask, we'll restore it ourselves */ /* 直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时, 收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里 */ if (sigsetjmp(jmpbuf, 0) == 0) { self->p_cancel_jmp = &jmpbuf; // 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒 if (! (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE)) { do { // 挂起等待restart信号 sigsuspend(&mask); /* Wait for a signal */ } while (self->p_signal != PTHREAD_SIG_RESTART); } self->p_cancel_jmp = NULL; } else { // 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号 sigaddset(&mask, PTHREAD_SIG_RESTART); /* Reblock the restart signal */ sigprocmask(SIG_SETMASK, &mask, NULL); } }
我们看到最终通过调用sigsuspend挂起线程。等待信号的唤醒,从while循环的条件我们可以看到,当收到PTHREAD_SIG_RESTART信号的时候线程才会真正被“唤醒”。接着我们看看当条件满足后,其他线程是如何唤醒被阻塞的线程的。
// 条件满足,唤醒线程 int pthread_cond_signal(pthread_cond_t *cond) { pthread_t th; acquire(&cond->c_spinlock); // 取出一个被被阻塞的线程 th = dequeue(&cond->c_waiting); release(&cond->c_spinlock); // 发送信号唤醒他 if (th != NULL) restart(th); return 0; } // 给pid进程发送唤醒信号 static inline void restart(pthread_t th) { kill(th->p_pid, PTHREAD_SIG_RESTART); }
我们看到pthread_cond_signal的函数非常简单,从阻塞队列中获取一个线程,然后给他发一个唤醒信号。另外线程库也支持唤醒所有线程。
// 条件满足,唤醒所有线程 int pthread_cond_broadcast(pthread_cond_t *cond) { pthread_queue tosignal; pthread_t th; acquire(&cond->c_spinlock); /* Copy the current state of the waiting queue and empty it */ tosignal = cond->c_waiting; // 重置阻塞队列 queue_init(&cond->c_waiting); release(&cond->c_spinlock); /* Now signal each process in the queue */ // 发送信号唤醒所有线程 while ((th = dequeue(&tosignal)) != NULL) restart(th); return 0; }
pthread_cond_broadcast就是给每一个等待的线程发送唤醒信号。这就是线程条件变量的原理和实现。最后我们看一下使用例子。
struct prodcons { int buffer[BUFFER_SIZE]; /* 环形数据缓冲区 */ pthread_mutex_t lock; /* 访问数据区的互斥锁 */ int readpos, writepos; /* 读写指针 */ pthread_cond_t notempty; /* 消费者使用的条件变量,非空即有数据消费 */ pthread_cond_t notfull; /* 生产者使用的条件变量,非满即可以生产数据 */ }; struct prodcons buffer; void init(struct prodcons * b) { pthread_mutex_init(&b->lock, NULL); pthread_cond_init(&b->notempty, NULL); pthread_cond_init(&b->notfull, NULL); b->readpos = 0; b->writepos = 0; } int main() { pthread_t th_a, th_b; void * retval; // 初始化线程间共享的数据结构 init(&buffer); // 创建两个线程 pthread_create(&th_a, NULL, producer, 0); pthread_create(&th_b, NULL, consumer, 0); pthread_join(th_a, &retval); pthread_join(th_b, &retval); return 0; }
我们分别看看生产者和消费者的逻辑
void * producer(void * data) { int n; for (n = 0; n < 10000; n++) { printf("%d --->\n", n); put(&buffer, n); } put(&buffer, OVER); return NULL; } void * consumer(void * data) { int d; while (1) { d = get(&buffer); if (d == OVER) break; printf("---> %d\n", d); } return NULL; }
我们看到生产者和消费者的逻辑很简单,就是一个往buffer里写数据,一个从buffer里读数据。问题在于如果没有空间可写或者没有数据可读怎么办?我们看get和put函数的逻辑。
void put(struct prodcons * b, int data) { // 操作共享数据需要加锁 pthread_mutex_lock(&b->lock); /* 写指针+1等于读指针,说明没有空闲可写了,等待空闲空间 */ while ((b->writepos + 1) % BUFFER_SIZE == b->readpos) { pthread_cond_wait(&b->notfull, &b->lock); } // pthread_cond_wait中被唤醒后会重新获得互斥锁,所以这里直接操作就行 b->buffer[b->writepos] = data; b->writepos++; // 到尾巴了,修正位置 if (b->writepos >= BUFFER_SIZE) b->writepos = 0; /* 有数据可消费了,通知等待的消费者 */ pthread_cond_signal(&b->notempty); pthread_mutex_unlock(&b->lock); }
接着看看get的实现。
int get(struct prodcons * b) { int data; pthread_mutex_lock(&b->lock); /* 读写指针相等说明没有数据读了,等待数据 */ while (b->writepos == b->readpos) { pthread_cond_wait(&b->notempty, &b->lock); } data = b->buffer[b->readpos]; b->readpos++; if (b->readpos >= BUFFER_SIZE) b->readpos = 0; /* 消费了数据,说明有空闲空间了,唤醒生产者 */ pthread_cond_signal(&b->notfull); pthread_mutex_unlock(&b->lock); return data; }
以上就是线程间同步机制:条件变量的实现和原理。