线程,是进程内部的一个控制序列。即使不使用线程,进程内部也有一个执行线程。进程包含一个或多个线程。线程有自己的局部变量。同一个进程内的各个线程共享整个进程内的全局变量,即除了线程的局部变量外,其他资源都共享。
注意:单核处理器上,同一时刻只能运行单个线程。但是对于用户而言,感觉同时执行了多个线程(各线程在单核CPU上切换,在一段时间内,同时执行了多个线程)。
创建线程比创建进程开销要小。
- 多线程编程,需特别小心,很容易发生错误。
- 多线程调试很困难。
- 把一个任务划分为两个部分,用两个线程在单处理器上运行时,不一定更快。除非能确定这两个部分能同时执行、且运行于多处理器上。
/************************************************************************************************************************* * 函数:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); * 功能:创建一个线程。同时指定该线程的属性、执行函数、执行函数的参数。通过参数1返回该线程的标识符。 * 参数: * thread - 指向新线程的标识符。通过该指针返回所创建线程的标识符 * attr - 设置新线程的属性。一般取默认属性,即传值NULL * start_routine - 线程处理函数。该函数的返回值类型和参数类型都是 void* * arg - 线程处理函数 start_routine 的参数 * 返回: * 成功 - 返回0 * 失败 - 返回错误码。注意:大部分pthread_开头的函数成功返回0,失败返回错误码(而不是-1)。 * 说明: * 使用fork()创建进程后,进程立马会启动,和父进程同时执行fork()后的代码。 * 使用pthread_create()创建线程后,新线程会马上启动,执行对应的线程处理函数。 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:void pthread_exit(void *retval); * 功能:终止调用的线程 * 参数: * retval - 线程返回值 * 返回:无 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_join(pthread_t thread, void **retval); * 功能:以阻塞的方式等待指定线程的结束 * 参数: * thread - 线程标识符 * retval - 指向该线程函数的返回值 * 返回: * 成功 - 返回0 * 失败 - 返回错误码。注意:大部分pthread_开头的函数成功返回0,失败返回错误码(而不是-1)。 **************************************************************************************************************************/
编译代码时,定义宏 _REENTRANT;即:gcc -D_REENTRANT
功能:告诉编译器,编译时需要可重入功能。即使得在编译时,编译部分函数的可重入版本。
注意:在单线程程序中,整个程序都是顺序执行的,一个函数在同一时刻只能被一个函数调用,但在多线程中,由于并发性,一个函数可能同时被多个函数调用,此时这个函数就成了临界资源,很容易造成调用函数处理结果的相互影响,如果一个函数在多线程并发的环境中每次被调用产生的结果是不确定的,我们就说这个函数是"不可重入的"/"线程不安全"的。
- 在编译时,指定线程库;即:gcc -lpthread
- 功能:
使用系统默认的NPTL线程库,即在默认路径中寻找库文件 libpthread.so
默认路径为 /usr/lib 和 /usr/local/lib- 当系统默认使用的不是NPTL线程库时(系统较老,2003以前):
指定:gcc -L/usr/lib/ntpl -lpthread
补充:
-L 指定库文件所在目录
-l 指定库文件名称(-lpthread,指定库文件名为 libpthread.so)
一般采用如下形式即可:gcc mythread.c -o mythread -D_REENTRANT -lpthread
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> int m_global = 0; void *my_thread_handle(void *arg) { int val = *((int*)arg); printf("new thread begin, arg=%d\n", val); m_global += val; sleep(3); pthread_exit(&m_global); // 不会执行 printf("new thread end!\n"); } int main(int argc, char *argv) { pthread_t ptid; // 线程id int arg = 100; void *ptret; // 线程返回值 m_global = 1000; printf("global = %d\n", m_global); int ret = pthread_create(&ptid, 0, my_thread_handle, &arg); if(ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(1); } printf("Wait for the thread[%ld] to finish.!\n", ptid); ret = pthread_join(ptid, &ptret); if(ret != 0) { fprintf(stderr, "pthread_join() - failed!\n"); exit(2); } printf("The thread[%ld] has ended, return value is %d!\n", ptid, *((int*)ptret)); printf("global = %d\n", m_global); return 0; }
线程的互斥:指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排他性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
线程的同步:指在互斥的基础上(大多数情况),通过其他机制实现访问者对资源的有序访问。
同:协同、协助、互相配合
- 同一个进程内多个线程之间的信号量。即POSIX信号量,而不是System V 信号量(用于进程间同步)。
线程的信号量原理和进程间的信号量原理相同。都有P操作、V操作。
信号量的表示:sem_t 名称- PV操作
P - 荷兰文passeren,通过的意思
V - 荷兰文vrijgeven,释放的意思
信号量相关函数头文件: #include <semaphore.h>
编译时需要链接:-pthread
/************************************************************************************************************************* * 函数:int sem_init(sem_t *sem, int pshared, unsigned int value); * 功能:初始化指定的信号量 * 参数: * sem - 需要被初始化的信号量 * pshared - 指定共享方式是进程间共享还是进程内共享。传值: * 0 - 信号量在进程的线程之间共享,并且应该位于所有线程都可见的某个地址(例如,全局变量或堆上动态分配的变量)。 * 非0 - 信号量在进程之间共享,并且应该位于共享内存区域中(Linux不支持) * value - 信号量的初始值,大于等于0 * 返回: * 成功 - 返回0 * 失败 - 返回-1,并设置errno **************************************************************************************************************************/
/************************************************************************************************************************* * 函数: int sem_wait(sem_t *sem); * 功能:递减(锁定)sem指向的信号量。 * 参数: * sem - 需要被执行P操作的信号量 * 返回: * 成功 - 返回0 * 失败 - 信号量的值保持不变,返回-1,并设置errno **************************************************************************************************************************/
/************************************************************************************************************************* * 函数: int sem_post(sem_t *sem); * 功能:递增(解锁)sem指向的信号量。 * 参数: * sem - 需要被执行V操作的信号量 * 返回: * 成功 - 返回0 * 失败 - 信号量的值保持不变,返回-1,并设置errno **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int sem_destroy(sem_t *sem); * 功能:递增(解锁)sem指向的信号量。 * 参数: * sem - 待销毁的信号量 * 返回: * 成功 - 返回0 * 失败 - 返回-1,并设置errno **************************************************************************************************************************/
- test1.c
主线程输入从终端输入字符串(直到输入end)
子线程显示用户输入的字符串(直到输入end)
示例代码test.c
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #define BUFF_SIZE 80 // 线程共享全局变量,因此定义为全局变量 sem_t sem; char buffer[BUFF_SIZE]; static void *str_thread_handle(void *arg) { while(1) { if(sem_wait(&sem) != 0) { fprintf(stderr, "sem_wait() - failed!\n"); exit(1); } printf("string is[%ld]: %s\n", strlen(buffer), buffer); if(strncmp(buffer, "end", 3) == 0) { break; } } } int main() { pthread_t ptid; int ret = sem_init(&sem, 0, 0); if(ret != 0) { fprintf(stderr, "sem_init() - failed!\n"); exit(2); } ret = pthread_create(&ptid, NULL, str_thread_handle, NULL); if(ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(3); } while(1) { fgets(buffer, sizeof(buffer), stdin); // V if(sem_post(&sem) != 0) { fprintf(stderr, "sem_post() - failed!\n"); exit(4); } if(strncmp(buffer, "end", 3) == 0) { break; } } void *thread_ret; ret = pthread_join(ptid, &thread_ret); if(ret != 0) { fprintf(stderr, "pthread_join() - failed!\n"); exit(5); } ret = sem_destroy(&sem); if(ret != 0) { fprintf(stderr, "sem_destroy() - failed!\n"); exit(6); } return 0; }
- multi_thread.c
创建2个线程(共有主线程、线程1、线程2共3个线程)
主线程阻塞式等待用户输入字符串
主线程每接收到一个字符串之后, 线程1就马上对该字符串进行处理。
线程1的处理逻辑为:统计该字符串的个数,并记录当时的时间。
线程1把该字符串处理完后,线程2马上就把处理结果写入文件result.txt
直到用户输入exit.
示例代码 - multi_thread.c:
#include <time.h> #include <stdio.h> #include <fcntl.h> #include <errno.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #include <sys/stat.h> #include <sys/types.h> #include <semaphore.h> #define BUFF_SIZE 1024 /************************************************************ * TODO - 字符串输出到文件为空 *************************************************************/ typedef struct str_attribute { int letters; // 字母数量 time_t time; // 当前时间 }str_attribute_t; char gBuffer[BUFF_SIZE]; sem_t gInput, gProcess, gWrite; // 输入、处理字符串信号量、处理完写入文件信号量 str_attribute_t gStrAttr; // 此函数为统计字符串字母(a-z A-Z)长度和记录时间线程的线程处理函数 void* count_time_handle(void* arg) { while (1) { // 对gProcess执行P操作 if (sem_wait(&gProcess) != 0) { fprintf(stderr, "count_time_handle() - sem_wait() - gProcess failed!\n"); exit(13); } for (int i = 0; i < strlen(gBuffer); ++i) { if ((gBuffer[i] >= 'a' && gBuffer[i] <= 'z') || (gBuffer[i] >= 'A' && gBuffer[i] <= 'Z')) { ++gStrAttr.letters; } } gStrAttr.time = time(NULL); // 对gWrite执行V操作 if (sem_post(&gWrite) != 0) { fprintf(stderr, "count_time_handle() - sem_post() - gWrite failed!\n"); exit(14); } if (strncmp(gBuffer, "exit", 4) == 0) { break; } } } // 此函数将处理结果写入文件 void* write_file_handle(void* arg) { while (1) { // 对gWrite执行P操作 if (sem_wait(&gWrite) != 0) { fprintf(stderr, "write_file_handle() - sem_post() - gWrite failed!\n"); exit(15); } if (strncmp(gBuffer, "exit", 4) == 0) { break; } FILE *file = fopen("./result.txt", "a+"); if (!file) { fprintf(stderr, "write_file_handle() - fopen() - failed! reason: %s\n", strerror(errno)); exit(16); } char buffer[BUFF_SIZE + 128]; bzero(buffer, sizeof(buffer)); sprintf(buffer, "[%ld] - %d - %ld\n", strlen(gBuffer), gStrAttr.letters, gStrAttr.time); printf("buffer: %s", buffer); char* p = strchr(gBuffer, '\n'); if (p) { *p = '\0'; } int ret = fwrite(gBuffer, sizeof(char), strlen(gBuffer), file); if (ret < 0) { fprintf(stderr, "write_file_handle() - fwrite() - failed! reason: %s\n", strerror(errno)); exit(17); } ret = fwrite(buffer, sizeof(char), strlen(buffer), file); if (ret < 0) { fprintf(stderr, "write_file_handle() - fwrite() - failed! reason: %s\n", strerror(errno)); exit(17); } ret = fclose(file); if (ret == EOF) { fprintf(stderr, "write_file_handle() - close() - failed! reason: %s\n", strerror(errno)); exit(18); } if (sem_post(&gInput) != 0) { fprintf(stderr, "write_file_handle() - sem_post() - gWrite failed!\n"); exit(19); } } } int main(int argc, char* argv[]) { pthread_t process_id, write_id; int ret = 0; // 初始化字符串处理信号量 ret = sem_init(&gInput, 0, 1); if (ret != 0) { fprintf(stderr, "sem_init() - init gInput failed!\n"); exit(1); } ret = sem_init(&gProcess, 0, 0); if (ret != 0) { fprintf(stderr, "sem_init() - init gProcess failed!\n"); exit(2); } // 初始化写入文件信号量 ret = sem_init(&gWrite, 0, 0); if (ret != 0) { fprintf(stderr, "sem_init() - init gWrite failed!\n"); exit(3); } // 创建字符串处理线程 ret = pthread_create(&process_id, NULL, count_time_handle, NULL); if (ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(4); } // 创建写入文件线程 ret = pthread_create(&write_id, NULL, write_file_handle, NULL); if (ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(5); } // 主线程接收用户输入 while (1) { if (sem_wait(&gInput) != 0) { fprintf(stderr, "sem_post() - failed!\n"); exit(6); } bzero(gBuffer, sizeof(gBuffer)); printf("please input:"); fgets(gBuffer, sizeof(gBuffer), stdin); if (sem_post(&gProcess) != 0) { fprintf(stderr, "sem_post() - failed!\n"); exit(7); } if (strncmp(gBuffer, "exit", 4) == 0) { break; } } /* 等待子线程结束 */ ret = pthread_join(process_id, NULL); if (ret != 0) { fprintf(stderr, "thread_join() - process_id failed!\n"); exit(8); } ret = pthread_join(write_id, NULL); if (ret != 0) { fprintf(stderr, "thread_join() - write_id failed!\n"); exit(9); } /* 关闭信号量 */ ret = sem_destroy(&gInput); if (ret != 0) { fprintf(stderr, "sem_destroy() - gInput failed!\n"); exit(10); } ret = sem_destroy(&gProcess); if (ret != 0) { fprintf(stderr, "sem_destroy() - gProcess failed!\n"); exit(11); } ret = sem_destroy(&gWrite); if (ret != 0) { fprintf(stderr, "sem_destroy() - gWrite failed!\n"); exit(12); } return 0; }
- 输入预览
- 输出到文件预览
- 互斥量:即互斥锁,在效果上等同于初始值为1的信号量。它用来保证任一时刻,只有一个线程访问临界资源。
- 互斥量类型:pthread_mutex_t
- 头文件:#include <pthread.h>
- 如果在Linux Shell界面下输入:man pthread_mutex_init显示No manual entry for pthread_mutex_init则手动安装manpages
安装命令:sudo apt-get install manpages-posix-dev
/************************************************************************************************************************* * 函数:int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); * 功能:销毁并初始化互斥量 * 参数: * mutex - 需要初始化的互斥量 * attr - 互斥量的属性。一般取默认值(当一个线程已获取互斥量后,该线程再次获取该信号量,将导致死锁) * 返回: * 成功 - 返回0 * 失败 - 返回错误号 * 说明:亦可在定义时使用PTHREAD_MUTEX_INITIALIZER初始化。示例:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_mutex_lock(pthread_mutex_t *mutex); * 功能:对指定的互斥量上锁 * 参数: * mutex - 需要上锁的互斥量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_mutex_unlock(pthread_mutex_t *mutex); * 功能:对指定的互斥量解锁 * 参数: * mutex - 需要解锁的互斥量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_mutex_destroy(pthread_mutex_t *mutex); * 功能:销毁指定的互斥量 * 参数: * mutex - 需要销毁的互斥量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
- mutex.c
互斥量的简单使用
示例代码 - mutex.c:
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> pthread_mutex_t mutex_lock; int global_value = 10000; void *thread_handle(void *arg) { for(int i = 0; i < 10; ++i) { // 上锁 pthread_mutex_lock(&mutex_lock); if(global_value > 0) { // work //sleep(1); printf("The %d output of the child thread: %d\n", i + 1, global_value); } --global_value; // 解锁 pthread_mutex_unlock(&mutex_lock); sleep(1); } } int main(int argc, char *argv[]) { pthread_t ptid; void *thread_return; // 初始化互斥量 int ret = pthread_mutex_init(&mutex_lock, 0); if(ret != 0) { fprintf(stderr, "pthread_mutex_init() - failed!\n"); exit(1); } // 创建线程 ret = pthread_create(&ptid, NULL, thread_handle, NULL); if(ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(2); } for(int i = 0; i < 10; ++i) { // 上锁 pthread_mutex_lock(&mutex_lock); if(global_value > 0) { // work //sleep(1); printf("The %d output of the parent thread: %d\n", i + 1, global_value); } --global_value; // 解锁 pthread_mutex_unlock(&mutex_lock); sleep(1); } // 等待子进程结束 ret = pthread_join(ptid, &thread_return); if(ret != 0) { fprintf(stderr, "pthread_join() - failed!\n"); exit(3); } // 删除互斥量 ret = pthread_mutex_destroy(&mutex_lock); if(ret != 0) { fprintf(stderr, "pthread_mutex_destroy() - failed!\n"); exit(4); } return 0; }
- 有以下代码,分析存在的隐患,并使用互斥量解决改隐患(不能删除、修改代码,只能添加代码)
示例代码1 - HiddenDanger.c
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <semaphore.h> void* thread_function(void* arg); #define WORK_SIZE 1024 char work_area[WORK_SIZE]; int time_to_exit = 0; int main() { int res; pthread_t a_thread; void* thread_result; res = pthread_create(&a_thread, NULL, thread_function, NULL); if (res != 0) { perror("Thread creation failed"); exit(EXIT_FAILURE); } printf("Input some text. Enter 'end' to finish\n"); while (!time_to_exit) { fgets(work_area, WORK_SIZE, stdin); while (1) { if (work_area[0] != '\0') { sleep(1); } else { break; } } } printf("\nWaiting for thread to finish...\n"); res = pthread_join(a_thread, &thread_result); if (res != 0) { perror("Thread join failed"); exit(EXIT_FAILURE); } printf("Thread joined\n"); exit(EXIT_SUCCESS); } void* thread_function(void* arg) { sleep(1); while (strncmp("end", work_area, 3) != 0) { printf("You input %ld characters\n", strlen(work_area) - 1); work_area[0] = '\0'; sleep(1); while (work_area[0] == '\0') { sleep(1); } } time_to_exit = 1; work_area[0] = '\0'; pthread_exit(0); }
示例代码2 - SolveHiddenDanger.c
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <semaphore.h> void *thread_function(void *arg); pthread_mutex_t work_mutex; /* protects both work_area and time_to_exit */ #define WORK_SIZE 1024 char work_area[WORK_SIZE]; int time_to_exit = 0; int main() { int res; pthread_t a_thread; void *thread_result; res = pthread_mutex_init(&work_mutex, NULL); if (res != 0) { perror("Mutex initialization failed"); exit(EXIT_FAILURE); } res = pthread_create(&a_thread, NULL, thread_function, NULL); if (res != 0) { perror("Thread creation failed"); exit(EXIT_FAILURE); } pthread_mutex_lock(&work_mutex); printf("Input some text. Enter 'end' to finish\n"); while(!time_to_exit) { fgets(work_area, WORK_SIZE, stdin); pthread_mutex_unlock(&work_mutex); while(1) { pthread_mutex_lock(&work_mutex); if (work_area[0] != '\0') { pthread_mutex_unlock(&work_mutex); sleep(1); } else { break; } } } pthread_mutex_unlock(&work_mutex); printf("\nWaiting for thread to finish...\n"); res = pthread_join(a_thread, &thread_result); if (res != 0) { perror("Thread join failed"); exit(EXIT_FAILURE); } printf("Thread joined\n"); pthread_mutex_destroy(&work_mutex); exit(EXIT_SUCCESS); } void *thread_function(void *arg) { sleep(1); pthread_mutex_lock(&work_mutex); while(strncmp("end", work_area, 3) != 0) { printf("You input %d characters\n", strlen(work_area) -1); work_area[0] = '\0'; pthread_mutex_unlock(&work_mutex); sleep(1); pthread_mutex_lock(&work_mutex); while (work_area[0] == '\0' ) { pthread_mutex_unlock(&work_mutex); sleep(1); pthread_mutex_lock(&work_mutex); } } time_to_exit = 1; work_area[0] = '\0'; pthread_mutex_unlock(&work_mutex); pthread_exit(0); }
- 与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。
- 条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。
- 条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
- 头文件:#include <pthread.h>
- 如果在Linux Shell界面下输入:man pthread_mutex_init显示No manual entry for pthread_mutex_init则手动安装manpages
安装命令:sudo apt-get install manpages-posix-dev
/************************************************************************************************************************* * 函数:int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); * 功能:销毁并初始化条件变量 * 参数: * cond - 需要初始化的条件变量 * attr - 条件变量的属性。 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 * 说明:亦可在定义时使用PTHREAD_COND_INITIALIZER初始化。示例: pthread_cond_t cond = PTHREAD_COND_INITIALIZER; **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_signal(pthread_cond_t *cond); * 功能:通知条件变量,唤醒一个等待者 * 参数: * cond - 条件变量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_broadcast(pthread_cond_t *cond); * 功能:解锁当前在指定条件变量cond上阻塞的所有线程。 * 参数: * cond - 条件变量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数: int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime); * 功能:等待条件变量或者超时 被唤醒 * 参数: * cond - 条件变量 * mutex - 互斥锁 * abstime - 等待时间(系统时间 + 等待时间) * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); * 功能:等待条件变量被唤醒 * 参数: * cond - 条件变量 * mutex - 互斥锁 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_destroy(pthread_cond_t *cond); * 功能:销毁指定的条件变量,实际上变为未初始化的条件变量 * 参数: * cond - 待销毁的条件变量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
- test.c
条件变量的基础使用
示例代码 - test.c
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> pthread_cond_t cond; pthread_mutex_t mutex; void* thread1_handle(void* arg) { while (1) { printf("thread 1 start running!\n"); // 上锁 pthread_mutex_lock(&mutex); printf("thread 1 mutex lock!\n"); // 等待条件变量 pthread_cond_wait(&cond, &mutex); printf("thread 1 applied the condition!\n"); // 解锁 pthread_mutex_unlock(&mutex); printf("thread 1 mutex unlock!\n"); sleep(1); } } void* thread2_handle(void* arg) { while (1) { printf("thread 2 start running!\n"); // 上锁 pthread_mutex_lock(&mutex); printf("thread 2 mutex lock!\n"); // 等待条件变量 pthread_cond_wait(&cond, &mutex); printf("thread 2 applied the condition!\n"); // 解锁 pthread_mutex_unlock(&mutex); printf("thread 2 mutex unlock!\n"); sleep(1); } } int main() { pthread_t ptid1, ptid2; // 初始化互斥锁 pthread_mutex_init(&mutex, NULL); // 初始化条件变量 pthread_cond_init(&cond, NULL); // 创建线程 pthread_create(&ptid1, NULL, thread1_handle, NULL); pthread_create(&ptid2, NULL, thread2_handle, NULL); while(1) { pthread_cond_signal(&cond); sleep(5); } return 0; }
- 并发编程:是指在同一台计算机上"同时处理多个任务"。并发同一实体上的多个事件。
- 当遇到以下情况被"阻塞"时,该使用什么解决方案?
- 忙于漫长的CPU密集型处理
- 读取文件,但文件尚未缓存,从硬盘中读取较为缓慢
- 不得不等待获取某个资源,如:硬件驱动、互斥锁、等待同步方式调用的数据库响应、网络上的请求和响应等。
- 使用多进程和多线程技术解决如上问题
缺陷:创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多。活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或"切换过度"而导致系统资源不足。- 解决方案:使用线程池技术
线程池:由一个任务队列和一组处理队列的线程组成。一旦工作进程需要处理某个可能"阻塞"的操作,不用自己操作,将其作为一个任务放入线程池的队列,接着会被某个空闲线程提取处理。
组件 | 说明 |
---|---|
任务 | 待处理的工作,通常由标识、上下文和处理函数组成 |
任务队列 | 按顺序保持待处理的任务序列,等待线程池中的线程处理 |
线程池 | 由多个已启动的一组线程组成 |
条件变量 | 一组同步机制,允许线程挂起,直到共享数据上的某些条件得到满足 |
互斥锁 | 保证在任一时刻,只有一个线程访问该对象 |
#ifndef _DEMO_THREAD_H_INCLUDED_ #define _DEMO_THREAD_H_INCLUDED_ #include <stdio.h> #include <stdint.h> #include <stdlib.h> #include <sys/types.h> #include <pthread.h> #include <errno.h> #include <string.h> typedef intptr_t int_t; typedef uintptr_t uint_t; #define OK 0 #define ERROR -1 int thread_mutex_create(pthread_mutex_t *mtx); int thread_mutex_destroy(pthread_mutex_t *mtx); int thread_mutex_lock(pthread_mutex_t *mtx); int thread_mutex_unlock(pthread_mutex_t *mtx); int thread_cond_create(pthread_cond_t *cond); int thread_cond_destroy(pthread_cond_t *cond); int thread_cond_signal(pthread_cond_t *cond); int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx); #endif /* _DEMO_THREAD_H_INCLUDED_ */
#include "thread.h" int thread_mutex_create(pthread_mutex_t *mtx) { int err; pthread_mutexattr_t attr; err = pthread_mutexattr_init(&attr); if (err != 0) { fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno)); return ERROR; } err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); if (err != 0) { fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno)); return ERROR; } err = pthread_mutex_init(mtx, &attr); if (err != 0) { fprintf(stderr,"pthread_mutex_init() failed, reason: %s\n",strerror(errno)); return ERROR; } err = pthread_mutexattr_destroy(&attr); if (err != 0) { fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno)); } return OK; } int thread_mutex_destroy(pthread_mutex_t *mtx) { int err; err = pthread_mutex_destroy(mtx); if (err != 0) { fprintf(stderr,"pthread_mutex_destroy() failed, reason: %s\n",strerror(errno)); return ERROR; } return OK; } int thread_mutex_lock(pthread_mutex_t *mtx) { int err; err = pthread_mutex_lock(mtx); if (err == 0) { return OK; } fprintf(stderr,"pthread_mutex_lock() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_mutex_unlock(pthread_mutex_t *mtx) { int err; err = pthread_mutex_unlock(mtx); #if 0 ngx_time_update(); #endif if (err == 0) { return OK; } fprintf(stderr,"pthread_mutex_unlock() failed, reason: %s\n",strerror(errno)); return ERROR; }
#include "thread.h" int thread_cond_create(pthread_cond_t *cond) { int err; err = pthread_cond_init(cond, NULL); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_init() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_cond_destroy(pthread_cond_t *cond) { int err; err = pthread_cond_destroy(cond); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_destroy() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_cond_signal(pthread_cond_t *cond) { int err; err = pthread_cond_signal(cond); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_signal() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx) { int err; err = pthread_cond_wait(cond, mtx); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_wait() failed, reason: %s\n",strerror(errno)); return ERROR; }
#ifndef _THREAD_POOL_H_INCLUDED_ #define _THREAD_POOL_H_INCLUDED_ #include "thread.h" #define DEFAULT_THREADS_NUM 4 #define DEFAULT_QUEUE_NUM 65535 typedef unsigned long atomic_uint_t; typedef struct thread_task_s thread_task_t; typedef struct thread_pool_s thread_pool_t; struct thread_task_s { thread_task_t *next; uint_t id; void *ctx; // 上下文 void (*handler)(void *data); // 指向执行任务的函数,data: 使用ctx传值 }; typedef struct { thread_task_t *first; thread_task_t **last; } thread_pool_queue_t; #define thread_pool_queue_init(q) \ (q)->first = NULL; \ (q)->last = &(q)->first struct thread_pool_s { pthread_mutex_t mtx; thread_pool_queue_t queue; int_t waiting; pthread_cond_t cond; char *name; uint_t threads; int_t max_queue; }; thread_task_t *thread_task_alloc(size_t size); int_t thread_task_post(thread_pool_t *tp, thread_task_t *task); thread_pool_t* thread_pool_init(); void thread_pool_destroy(thread_pool_t *tp); #endif /* _THREAD_POOL_H_INCLUDED_ */
#include "thread_pool.h" static void thread_pool_exit_handler(void *data); static void *thread_pool_cycle(void *data); static int_t thread_pool_init_default(thread_pool_t *tpp, char *name); static uint_t thread_pool_task_id; static int debug = 0; thread_pool_t* thread_pool_init() { int err; pthread_t tid; uint_t n; pthread_attr_t attr; thread_pool_t *tp=NULL; tp = calloc(1,sizeof(thread_pool_t)); if(tp == NULL){ fprintf(stderr, "thread_pool_init: calloc failed!\n"); } thread_pool_init_default(tp, NULL); thread_pool_queue_init(&tp->queue); if (thread_mutex_create(&tp->mtx) != OK) { free(tp); return NULL; } if (thread_cond_create(&tp->cond) != OK) { (void) thread_mutex_destroy(&tp->mtx); free(tp); return NULL; } err = pthread_attr_init(&attr); if (err) { fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno)); free(tp); return NULL; } err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (err) { fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno)); free(tp); return NULL; } for (n = 0; n < tp->threads; n++) { err = pthread_create(&tid, &attr, thread_pool_cycle, tp); if (err) { fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno)); free(tp); return NULL; } } (void) pthread_attr_destroy(&attr); return tp; } void thread_pool_destroy(thread_pool_t *tp) { uint_t n; thread_task_t task; volatile uint_t lock; memset(&task,'\0', sizeof(thread_task_t)); task.handler = thread_pool_exit_handler; task.ctx = (void *) &lock; for (n = 0; n < tp->threads; n++) { lock = 1; if (thread_task_post(tp, &task) != OK) { return; } while (lock) { sched_yield(); } //task.event.active = 0; } (void) thread_cond_destroy(&tp->cond); (void) thread_mutex_destroy(&tp->mtx); free(tp); } static void thread_pool_exit_handler(void *data) { uint_t *lock = data; *lock = 0; pthread_exit(0); } thread_task_t * thread_task_alloc(size_t size) { thread_task_t *task; task = calloc(1, sizeof(thread_task_t) + size); if (task == NULL) { return NULL; } task->ctx = task + 1; return task; } int_t thread_task_post(thread_pool_t *tp, thread_task_t *task) { if (thread_mutex_lock(&tp->mtx) != OK) { return ERROR; } if (tp->waiting >= tp->max_queue) { (void) thread_mutex_unlock(&tp->mtx); fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n", tp->name, tp->waiting); return ERROR; } //task->event.active = 1; task->id = thread_pool_task_id++; task->next = NULL; if (thread_cond_signal(&tp->cond) != OK) { (void) thread_mutex_unlock(&tp->mtx); return ERROR; } *tp->queue.last = task; tp->queue.last = &task->next; tp->waiting++; (void) thread_mutex_unlock(&tp->mtx); if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n", task->id, tp->name); return OK; } static void * thread_pool_cycle(void *data) { thread_pool_t *tp = data; int err; thread_task_t *task; if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name); for ( ;; ) { if (thread_mutex_lock(&tp->mtx) != OK) { return NULL; } tp->waiting--; while (tp->queue.first == NULL) { if (thread_cond_wait(&tp->cond, &tp->mtx) != OK) { (void) thread_mutex_unlock(&tp->mtx); return NULL; } } task = tp->queue.first; tp->queue.first = task->next; if (tp->queue.first == NULL) { tp->queue.last = &tp->queue.first; } if (thread_mutex_unlock(&tp->mtx) != OK) { return NULL; } if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n", task->id, tp->name); task->handler(task->ctx); if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name); task->next = NULL; //notify } } static int_t thread_pool_init_default(thread_pool_t *tpp, char *name) { if(tpp) { tpp->threads = DEFAULT_THREADS_NUM; tpp->max_queue = DEFAULT_QUEUE_NUM; tpp->name = strdup(name?name:"default"); if(debug)fprintf(stderr, "thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n", tpp->name, tpp->threads, tpp->max_queue); return OK; } return ERROR; }
#include <stdio.h> #include <unistd.h> #include <string.h> #include "thread_pool.h" typedef struct { int value; char* name; } ctx_t; void thread1_handle(void* arg) { printf("call thread1_handle!\n"); return; } void thread2_handle(void* arg) { printf("call thread2_handle!\n"); ctx_t* p = (ctx_t*)arg; printf("value: %d, name: %s\n", p->value, p->name ? p->name : "default"); } int main(int argc, char* argv[]) { const char* name = "Nginx Thread Pool"; thread_pool_t* pool = thread_pool_init(); // thread_test_alloc() - size:传参数大小 thread_task_t* task1 = thread_task_alloc(0); thread_task_t* task2 = thread_task_alloc(sizeof(ctx_t) + strlen(name) + 1); ctx_t* p = (ctx_t*)task2->ctx; p->name = (char*)(p + 1); strcpy(p->name, name); task1->handler = thread1_handle; task2->handler = thread2_handle; // 将任务添加到线程池队列 thread_task_post(pool, task1); thread_task_post(pool, task2); sleep(10); thread_pool_destroy(pool); return 0; }
#include "thread_pool.h" struct test{ int arg1; int arg2; }; void task_handler1(void* data){ static int index = 0; printf("Hello, this is 1th test.index=%d\r\n", index++); } void task_handler2(void* data){ static int index = 0; printf("Hello, this is 2th test.index=%d\r\n", index++); } void task_handler3(void* data){ static int index = 0; struct test *t = (struct test *) data; printf("Hello, this is 3th test.index=%d\r\n", index++); printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2); } int main(int argc, char **argv) { thread_pool_t* tp = NULL; int i = 0; tp = thread_pool_init(); //sleep(1); thread_task_t * test1 = thread_task_alloc(0); thread_task_t * test2 = thread_task_alloc(0); thread_task_t * test3 = thread_task_alloc(sizeof(struct test)); test1->handler = task_handler1; test2->handler = task_handler2; test3->handler = task_handler3; ((struct test*)test3->ctx)->arg1 = 666; ((struct test*)test3->ctx)->arg2 = 888; //for(i=0; i<10;i++){ thread_task_post(tp, test1); thread_task_post(tp, test2); thread_task_post(tp, test3); //} sleep(10); thread_pool_destroy(tp); return 0; }