获取线程ID
#include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
pthread_attr_t意为线程属性,比如线程优先级等,pthread_create执行成功返回0
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> void* func(void* arg) { printf("thread: pid = %d, tid = %ld\n", getpid(), pthread_self()); return NULL; } int main(int argc, char** argv) { printf("main: pid = %d, tid = %ld\n", getpid(), pthread_self()); pthread_t tid; int ret = pthread_create(&tid, NULL, func, NULL); if(ret != 0) { perror("pthread_create fail"); exit(1); } sleep(1); return 0; }
man page时看到pthread_create编译时要加上-pthread,所以编译时用
$ gcc pthread.c -phread -o pthread
循环创建多个线程时可能出现问题,例子如下:
void* func(void* arg) { printf("thread %d: pid = %d, tid = %ld\n", *(int*)arg,getpid(), pthread_self()); return NULL; } int main(int argc, char** argv) { printf("main: pid = %d, tid = %ld\n", getpid(), pthread_self()); pthread_t tid; int i; for(i = 0; i < 5; i++) { int ret = pthread_create(&tid, NULL, func, (void*)&i); // 这步会出现问题! if(ret != 0) { perror("pthread_create fail"); exit(1); } } sleep(5); return 0; }
看到执行结果如下:
main: pid = 21374, tid = 139621297563456 thread 1: pid = 21374, tid = 139621289035520 thread 2: pid = 21374, tid = 139621280642816 thread 5: pid = 21374, tid = 139621255464704 thread 5: pid = 21374, tid = 139621263857408 thread 5: pid = 21374, tid = 139621272250112
传递的是i的地址,子线程在执行过程中i值发生变化,子线程拿到的值就不是想要的值了
退出当前线程
void pthread_exit(void *retval);
这里来看三个返回:
a. return : 返回到调用者
b. pthread_exit :退出线程,如果将main函数的return拿掉,添加pthread_exit,那么主线程退出,子线程依旧在执行
c. exit :退出进程
阻塞等待并回收线程
int pthread_join(pthread_t thread, void **retval);
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <string.h> struct Thrd_ret{ int n; char str[128]; }; void* func(void* arg) { struct Thrd_ret* ret; ret = malloc(sizeof(struct Thrd_ret)); ret->n = 100; strcpy(ret->str, "Hello Thread"); printf("Thread : pid = %d, tid = %ld\n", getpid(), pthread_self()); return (void*)ret; } int main(int argc, char** argv) { pthread_t tid; int ret = pthread_create(&tid, NULL, func, NULL); if(ret != 0) { fprintf(stderr, "pthread_create fail, error = %s\n", strerror(ret)); exit(1); } struct Thrd_ret* thrd_ret; pthread_join(tid, (void*)&thrd_ret); printf("ret->n = %d, ret->str = %s\n", thrd_ret->n, thrd_ret->str); pthread_exit(NULL); }
杀死线程,需要一个取消点,如果没有取消点,可以通过pthread_testcancel添加取消点
int pthread_cancel(pthread_t thread);
实现线程分离,线程结束时不会残留资源在内核当中,分离出去的线程执行结束之后自动回收,不需要pthread_join去回收
线程属性初始化
设置线程属性为分离属性
销毁线程属性占用资源
线程同步指一个线程发出某一功能调用时,在没有得到结果之前,调用不会回,同时其他线程为保证数据的一致性,不能调用该功能
每个线程对资源操作前都尝试先加锁,成功加锁才能才做,操作结束解锁;同一时刻只能有一个线程持有锁。
下面有一种情况,A线程对某个全局变量加锁访问;B线程访问前尝试加锁,拿不到锁B线程阻塞;C线程不加锁,直接访问该全局变量,依然可以访问但是会出现数据混乱。
互斥锁是操作系统提供的一把建议锁,建议程序中有多线程加锁访问共享资源,不具有强制性
以下函数成功返回0,错误返回errno
pthread_mutex_init
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex);
pthread_mutex_lock:加锁失败会阻塞,直到持有该互斥量的其他线程解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
pthread_mutex_trylock:尝试加锁,加锁失败返回错误号EBUSY,不阻塞
int pthread_mutex_trylock(pthread_mutex_t *mutex);
pthread_mutex_unlock:主动解锁,将阻塞在该锁上的线程全部唤醒。唤醒顺序取决于优先级、调度,默认先阻塞先唤醒
int pthread_mutex_unlock(pthread_mutex_t *mutex);
pthread_mutex_t类型
下面举个例子:
#include <stdio.h> #include <pthread.h> #include <unistd.h> #include <string.h> #include <stdlib.h>void* func(void* arg) { while(1) { printf("hello "); sleep(rand()%3); printf("world\n"); sleep(rand()%3); } return NULL; } int main(int argc, char** argv) { srand(time(NULL)); pthread_t thread; int ret = pthread_create(&thread, NULL, func, NULL); if(ret != 0) { fprintf(stderr, "pthread create fail, errno = %s\n", strerror(ret)); exit(1); } while(1) { printf("HELLO "); sleep(rand()%3); // sleep 1 sec, let child thread get CPU printf("WORLD\n"); sleep(rand()%3); // let child thread get CPU } pthread_join(thread, NULL); }
在没有加锁之前,输出都是混乱的,随机sleep用于模拟长时间操作,让CPU去执行其他线程工作。
HELLO hello WORLD HELLO WORLD HELLO WORLD world HELLO WORLD hello world hello HELLO world hello WORLD world HELLO WORLD
加锁之后就会得到预期的结果了
#include <stdio.h> #include <pthread.h> #include <unistd.h> #include <string.h> #include <stdlib.h> pthread_mutex_t mutex; void* func(void* arg) { while(1) { pthread_mutex_lock(&mutex); printf("hello "); sleep(rand()%3); printf("world\n"); pthread_mutex_unlock(&mutex); sleep(rand()%3); } return NULL; } int main(int argc, char** argv) { pthread_mutex_init(&mutex, NULL); srand(time(NULL)); pthread_t thread; int ret = pthread_create(&thread, NULL, func, NULL); if(ret != 0) { fprintf(stderr, "pthread create fail, errno = %s\n", strerror(ret)); exit(1); } while(1) { pthread_mutex_lock(&mutex); printf("HELLO "); sleep(rand()%3); // sleep 1 sec, let child thread get CPU printf("WORLD\n"); pthread_mutex_unlock(&mutex); sleep(rand()%3); // let child thread get CPU } pthread_mutex_destroy(&mutex); pthread_join(thread, NULL); }
注意事项:
尽量保证锁的粒度,越小越小(访问共享数据前加锁,访问结束立即解锁)
如果pthread_mutex_unlock放在第二个sleep之后,会发现有一直数据大写或者一直输出小写的情况
a. 对于同一个锁反复lock
b. 线程1拥有A锁,请求B锁;线程2拥有B锁,请求A锁
读写锁拥有更高的并行性:写独占,读共享;写锁优先级高;锁只有一把
pthread_rwlock_t
pthread_rwlock_init
pthread_rwlock_destory
pthread_rwlock_rdlock
pthread_rwlock_rwlock
写模式加锁时,解锁前,所有对该锁加锁的线程都会被阻塞
读模式加锁时,如果线程以读模式对其加锁会成功,写模式加锁会则色
镀膜室加锁时,既有试图以写模式加锁的线程,也有试图以读模式加锁的线程,那么读写锁会阻塞随后的读模式锁清秋,优先满足写锁
pthread_rwlock_unlock
条件变量并不是锁,但是要结合锁来使用
pthread_cond_init
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
或者使用静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
pthread_cond_destroy
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_wait:
a. 阻塞等待一个条件变量
b. 释放已经掌握的互斥锁(解锁互斥量) a b两步为原子操作
c. 当被唤醒,pthread_cond_wait函数返回时,解除阻塞并重新申请获取互斥锁
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
pthread_cond_timewait
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
pthread_cond_signal:唤醒阻塞在条件变量上的一个线程
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_broadcast:唤醒阻塞在条件变量上的所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);
生产者消费者模型:
生产者: 消费者:
1、创建锁 1、加锁
2、生成数据 2、pthread_cond_wait(&cond, &mutex)
3、加锁,将数据放置到公共区域 1)阻塞等待条件变量 2)解锁 3)等待 4)加锁
4、解锁 3、访问共享数据
5、通知阻塞在条件变量上的线程 4、解锁
6、循环生产后续数据
这里写了一个demo来模拟:
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <string.h> typedef struct MSG { int num; struct MSG *next; } MSG; MSG *head = NULL; pthread_mutex_t mutex; pthread_cond_t cond; void* func_product(void* arg) { while(1) { pthread_mutex_lock(&mutex); MSG* msg = (MSG*)malloc(sizeof(MSG)); if(head != NULL) { MSG** tmp = &head->next; while(*tmp != NULL) { tmp = &(*tmp)->next; } msg->num = random() % 1000; msg->next = NULL; *tmp = msg; printf("----productor create child node, num = %d\n", (*tmp)->num); } else{ head = msg; head->num = random() % 1000 ; head->next = NULL; printf("----productor create header node, num = %d\n", head->num); } pthread_mutex_unlock(&mutex); pthread_cond_signal(&cond); // 这个放在if判断中是不是更好 sleep(rand()%2); } return NULL; } void* func_comsume(void* arg) { while(1) { pthread_mutex_lock(&mutex); if(head == NULL) pthread_cond_wait(&cond, &mutex); while(head != NULL) { MSG* msg = head->next; printf("comsumer release node, num = %d\n", head->num); free(head); head = msg; } pthread_mutex_unlock(&mutex); sleep(5); } return NULL; } int main(int argc, char** argv) { pthread_t thrd_productor, thrd_comsumer; int ret; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond, NULL); srand(time(NULL)); ret = pthread_create(&thrd_productor, NULL, func_product, NULL); if(ret != 0) { fprintf(stderr, "pthread_create fail, %s\n", strerror(ret)); exit(1); } ret = pthread_create(&thrd_comsumer, NULL, func_comsume, NULL); pthread_join(thrd_productor, NULL); pthread_join(thrd_comsumer, NULL); return 0; }
按我的理解,这个模型在实际使用中生产者会一直生产,生产需要时间,但是不是所有步骤都要占用锁,所以消费者可以有机会拿到锁来消费;同样的消费者拿到锁也不会一直占着锁不让生产者生产,由于消费处理可能需要消耗一定时间,所以更好的是将生产的内容copy一份,然后就把锁释放掉,让生产者继续生产。
当有多个消费者时,生产者发出信号,一个消费者争抢到锁,其他消费者会阻塞在锁上;当争抢到锁的消费者释放锁时,其他消费者可以拿到锁,但是这时候条件变量不满足就会出现问题,这时候要再判断一下条件变量,把if改成while就可以
void* func_comsume(void* arg) { while(1) { pthread_mutex_lock(&mutex); while(head == NULL) pthread_cond_wait(&cond, &mutex); while(head != NULL) { MSG* msg = head->next; printf("comsumer release node, num = %d\n", head->num); free(head); head = msg; } pthread_mutex_unlock(&mutex); sleep(5); } return NULL; }
进化版的互斥锁,互斥锁导致并发性下降,信号量既能保证同步又能提高线程并发。可以用于线程和进程同步
以下为信号量相关方法,成功返回0,失败返回-1,并且设置errorno
sem_t
sem_init:
初始化信号量变量,第二个参数0表示用于线程同步,1表示进程同步,第三个参数表示指定最大同时访问的线程数
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_destroy
int sem_destroy(sem_t *sem);
sem_wait:相当于加锁操作,并不一定会阻塞,访问线程数上限之后才会阻塞
int sem_wait(sem_t *sem);
sem_trywait
int sem_trywait(sem_t *sem);
sem_timedwait
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); // 这里的abs_timeout是相对于1970.1.1的绝对时间,使用前要先获取当前时间 time_t cur = time(NULL); struct timespec t t.tv_sec = cur + 1; t.tv_nsec = t.tv_sec + 100;
sem_post:相当于解锁操作
int sem_post(sem_t *sem);
信号量同样可以实现生产者 - 消费者模型,但是和条件变量的实现方法有很大不同,譬如以下demo:
#include <stdio.h> #include <semaphore.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <unistd.h> #define NUM 5 int arr[NUM]; sem_t prod; sem_t coms; void* func_product(void* arg) { int i = 0; while(1) { sem_wait(&prod); arr[i] = rand()%1000 + 1; printf("--- product value %d\n", arr[i]); i = (i + 1) % NUM; sleep(rand()%1); sem_post(&coms); } return NULL; } void* func_comsume(void* arg) { int i = 0; while(1) { sem_wait(&coms); printf("tid = %ld, comsume value %d\n", pthread_self(), arr[i]); arr[i] = 0; i = (i+1)%NUM; sem_post(&prod); sleep(rand()%3); } return NULL; } int main(int argc, char** argv) { pthread_t thrd_productor, thrd_comsumer; memset(arr, 0, sizeof(arr)); sem_init(&prod, 0, NUM); sem_init(&coms, 0, 0); int ret; ret = pthread_create(&thrd_productor, NULL, func_product, NULL); ret = pthread_create(&thrd_comsumer, NULL, func_comsume, NULL); pthread_join(thrd_productor, NULL); pthread_join(thrd_comsumer, NULL); return 0; }
首先生产者每次wait并不一定会阻塞,等arr中的数据写满之后,才会阻塞等待;每次有数据写入到arr中,会post通知消费者线程,这时候消费者就可以消费了。
消费者线程wait等待,生产者post解除阻塞,消费者拿到数据消费,消费完成之后post通知生产者,消费结束可以再生产。
上面这个例子是两个线程同步,暂时未改成更多线程同步。如果在上面的例子直接多加几个消费者线程,会发现消费者读出0,这并不是预期的。