- 多线程编程,需特别小心,很容易发生错误。
- 多线程调试很困难。
- 把一个任务划分为两个部分,用两个线程在单处理器上运行时,不一定更快。除非能确定这两个部分能同时执行、且运行于多处理器上。
/************************************************************************************************************************* * 函数:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); * 功能:创建一个线程。同时指定该线程的属性、执行函数、执行函数的参数。通过参数1返回该线程的标识符。 * 参数: * thread - 指向新线程的标识符。通过该指针返回所创建线程的标识符 * attr - 设置新线程的属性。一般取默认属性,即传值NULL * start_routine - 线程处理函数。该函数的返回值类型和参数类型都是 void* * arg - 线程处理函数 start_routine 的参数 * 返回: * 成功 - 返回0 * 失败 - 返回错误码。注意:大部分pthread_开头的函数成功返回0,失败返回错误码(而不是-1)。 * 说明: * 使用fork()创建进程后,进程立马会启动,和父进程同时执行fork()后的代码。 * 使用pthread_create()创建线程后,新线程会马上启动,执行对应的线程处理函数。 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:void pthread_exit(void *retval); * 功能:终止调用的线程 * 参数: * retval - 线程返回值 * 返回:无 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_join(pthread_t thread, void **retval); * 功能:以阻塞的方式等待指定线程的结束 * 参数: * thread - 线程标识符 * retval - 指向该线程函数的返回值 * 返回: * 成功 - 返回0 * 失败 - 返回错误码。注意:大部分pthread_开头的函数成功返回0,失败返回错误码(而不是-1)。 **************************************************************************************************************************/
编译代码时,定义宏 _REENTRANT;即:gcc -D_REENTRANT
- 在编译时,指定线程库;即:gcc -lpthread
- 功能:
使用系统默认的NPTL线程库,即在默认路径中寻找库文件 libpthread.so
默认路径为 /usr/lib 和 /usr/local/lib- 当系统默认使用的不是NPTL线程库时(系统较老,2003以前):
指定:gcc -L/usr/lib/ntpl -lpthread
-L 指定库文件所在目录
-l 指定库文件名称(-lpthread,指定库文件名为 libpthread.so)
一般采用如下形式即可:gcc mythread.c -o mythread -D_REENTRANT -lpthread
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> int m_global = 0; void *my_thread_handle(void *arg) { int val = *((int*)arg); printf("new thread begin, arg=%d\n", val); m_global += val; sleep(3); pthread_exit(&m_global); // 不会执行 printf("new thread end!\n"); } int main(int argc, char *argv) { pthread_t ptid; // 线程id int arg = 100; void *ptret; // 线程返回值 m_global = 1000; printf("global = %d\n", m_global); int ret = pthread_create(&ptid, 0, my_thread_handle, &arg); if(ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(1); } printf("Wait for the thread[%ld] to finish.!\n", ptid); ret = pthread_join(ptid, &ptret); if(ret != 0) { fprintf(stderr, "pthread_join() - failed!\n"); exit(2); } printf("The thread[%ld] has ended, return value is %d!\n", ptid, *((int*)ptret)); printf("global = %d\n", m_global); return 0; }
- 同一个进程内多个线程之间的信号量。即POSIX信号量,而不是System V 信号量(用于进程间同步)。
信号量的表示:sem_t 名称- PV操作
P - 荷兰文passeren,通过的意思
V - 荷兰文vrijgeven,释放的意思
信号量相关函数头文件: #include <semaphore.h>
/************************************************************************************************************************* * 函数:int sem_init(sem_t *sem, int pshared, unsigned int value); * 功能:初始化指定的信号量 * 参数: * sem - 需要被初始化的信号量 * pshared - 指定共享方式是进程间共享还是进程内共享。传值: * 0 - 信号量在进程的线程之间共享,并且应该位于所有线程都可见的某个地址(例如,全局变量或堆上动态分配的变量)。 * 非0 - 信号量在进程之间共享,并且应该位于共享内存区域中(Linux不支持) * value - 信号量的初始值,大于等于0 * 返回: * 成功 - 返回0 * 失败 - 返回-1,并设置errno **************************************************************************************************************************/
/************************************************************************************************************************* * 函数: int sem_wait(sem_t *sem); * 功能:递减(锁定)sem指向的信号量。 * 参数: * sem - 需要被执行P操作的信号量 * 返回: * 成功 - 返回0 * 失败 - 信号量的值保持不变,返回-1,并设置errno **************************************************************************************************************************/
/************************************************************************************************************************* * 函数: int sem_post(sem_t *sem); * 功能:递增(解锁)sem指向的信号量。 * 参数: * sem - 需要被执行V操作的信号量 * 返回: * 成功 - 返回0 * 失败 - 信号量的值保持不变,返回-1,并设置errno **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int sem_destroy(sem_t *sem); * 功能:递增(解锁)sem指向的信号量。 * 参数: * sem - 待销毁的信号量 * 返回: * 成功 - 返回0 * 失败 - 返回-1,并设置errno **************************************************************************************************************************/
- test1.c
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #define BUFF_SIZE 80 // 线程共享全局变量,因此定义为全局变量 sem_t sem; char buffer[BUFF_SIZE]; static void *str_thread_handle(void *arg) { while(1) { if(sem_wait(&sem) != 0) { fprintf(stderr, "sem_wait() - failed!\n"); exit(1); } printf("string is[%ld]: %s\n", strlen(buffer), buffer); if(strncmp(buffer, "end", 3) == 0) { break; } } } int main() { pthread_t ptid; int ret = sem_init(&sem, 0, 0); if(ret != 0) { fprintf(stderr, "sem_init() - failed!\n"); exit(2); } ret = pthread_create(&ptid, NULL, str_thread_handle, NULL); if(ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(3); } while(1) { fgets(buffer, sizeof(buffer), stdin); // V if(sem_post(&sem) != 0) { fprintf(stderr, "sem_post() - failed!\n"); exit(4); } if(strncmp(buffer, "end", 3) == 0) { break; } } void *thread_ret; ret = pthread_join(ptid, &thread_ret); if(ret != 0) { fprintf(stderr, "pthread_join() - failed!\n"); exit(5); } ret = sem_destroy(&sem); if(ret != 0) { fprintf(stderr, "sem_destroy() - failed!\n"); exit(6); } return 0; }
- multi_thread.c
主线程每接收到一个字符串之后, 线程1就马上对该字符串进行处理。
示例代码 - multi_thread.c:
#include <time.h> #include <stdio.h> #include <fcntl.h> #include <errno.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #include <sys/stat.h> #include <sys/types.h> #include <semaphore.h> #define BUFF_SIZE 1024 /************************************************************ * TODO - 字符串输出到文件为空 *************************************************************/ typedef struct str_attribute { int letters; // 字母数量 time_t time; // 当前时间 }str_attribute_t; char gBuffer[BUFF_SIZE]; sem_t gInput, gProcess, gWrite; // 输入、处理字符串信号量、处理完写入文件信号量 str_attribute_t gStrAttr; // 此函数为统计字符串字母(a-z A-Z)长度和记录时间线程的线程处理函数 void* count_time_handle(void* arg) { while (1) { // 对gProcess执行P操作 if (sem_wait(&gProcess) != 0) { fprintf(stderr, "count_time_handle() - sem_wait() - gProcess failed!\n"); exit(13); } for (int i = 0; i < strlen(gBuffer); ++i) { if ((gBuffer[i] >= 'a' && gBuffer[i] <= 'z') || (gBuffer[i] >= 'A' && gBuffer[i] <= 'Z')) { ++gStrAttr.letters; } } gStrAttr.time = time(NULL); // 对gWrite执行V操作 if (sem_post(&gWrite) != 0) { fprintf(stderr, "count_time_handle() - sem_post() - gWrite failed!\n"); exit(14); } if (strncmp(gBuffer, "exit", 4) == 0) { break; } } } // 此函数将处理结果写入文件 void* write_file_handle(void* arg) { while (1) { // 对gWrite执行P操作 if (sem_wait(&gWrite) != 0) { fprintf(stderr, "write_file_handle() - sem_post() - gWrite failed!\n"); exit(15); } if (strncmp(gBuffer, "exit", 4) == 0) { break; } FILE *file = fopen("./result.txt", "a+"); if (!file) { fprintf(stderr, "write_file_handle() - fopen() - failed! reason: %s\n", strerror(errno)); exit(16); } char buffer[BUFF_SIZE + 128]; bzero(buffer, sizeof(buffer)); sprintf(buffer, "[%ld] - %d - %ld\n", strlen(gBuffer), gStrAttr.letters, gStrAttr.time); printf("buffer: %s", buffer); char* p = strchr(gBuffer, '\n'); if (p) { *p = '\0'; } int ret = fwrite(gBuffer, sizeof(char), strlen(gBuffer), file); if (ret < 0) { fprintf(stderr, "write_file_handle() - fwrite() - failed! reason: %s\n", strerror(errno)); exit(17); } ret = fwrite(buffer, sizeof(char), strlen(buffer), file); if (ret < 0) { fprintf(stderr, "write_file_handle() - fwrite() - failed! reason: %s\n", strerror(errno)); exit(17); } ret = fclose(file); if (ret == EOF) { fprintf(stderr, "write_file_handle() - close() - failed! reason: %s\n", strerror(errno)); exit(18); } if (sem_post(&gInput) != 0) { fprintf(stderr, "write_file_handle() - sem_post() - gWrite failed!\n"); exit(19); } } } int main(int argc, char* argv[]) { pthread_t process_id, write_id; int ret = 0; // 初始化字符串处理信号量 ret = sem_init(&gInput, 0, 1); if (ret != 0) { fprintf(stderr, "sem_init() - init gInput failed!\n"); exit(1); } ret = sem_init(&gProcess, 0, 0); if (ret != 0) { fprintf(stderr, "sem_init() - init gProcess failed!\n"); exit(2); } // 初始化写入文件信号量 ret = sem_init(&gWrite, 0, 0); if (ret != 0) { fprintf(stderr, "sem_init() - init gWrite failed!\n"); exit(3); } // 创建字符串处理线程 ret = pthread_create(&process_id, NULL, count_time_handle, NULL); if (ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(4); } // 创建写入文件线程 ret = pthread_create(&write_id, NULL, write_file_handle, NULL); if (ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(5); } // 主线程接收用户输入 while (1) { if (sem_wait(&gInput) != 0) { fprintf(stderr, "sem_post() - failed!\n"); exit(6); } bzero(gBuffer, sizeof(gBuffer)); printf("please input:"); fgets(gBuffer, sizeof(gBuffer), stdin); if (sem_post(&gProcess) != 0) { fprintf(stderr, "sem_post() - failed!\n"); exit(7); } if (strncmp(gBuffer, "exit", 4) == 0) { break; } } /* 等待子线程结束 */ ret = pthread_join(process_id, NULL); if (ret != 0) { fprintf(stderr, "thread_join() - process_id failed!\n"); exit(8); } ret = pthread_join(write_id, NULL); if (ret != 0) { fprintf(stderr, "thread_join() - write_id failed!\n"); exit(9); } /* 关闭信号量 */ ret = sem_destroy(&gInput); if (ret != 0) { fprintf(stderr, "sem_destroy() - gInput failed!\n"); exit(10); } ret = sem_destroy(&gProcess); if (ret != 0) { fprintf(stderr, "sem_destroy() - gProcess failed!\n"); exit(11); } ret = sem_destroy(&gWrite); if (ret != 0) { fprintf(stderr, "sem_destroy() - gWrite failed!\n"); exit(12); } return 0; }
- 输入预览
- 输出到文件预览
- 互斥量:即互斥锁,在效果上等同于初始值为1的信号量。它用来保证任一时刻,只有一个线程访问临界资源。
- 互斥量类型:pthread_mutex_t
- 头文件:#include <pthread.h>
- 如果在Linux Shell界面下输入:man pthread_mutex_init显示No manual entry for pthread_mutex_init则手动安装manpages
安装命令:sudo apt-get install manpages-posix-dev
/************************************************************************************************************************* * 函数:int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); * 功能:销毁并初始化互斥量 * 参数: * mutex - 需要初始化的互斥量 * attr - 互斥量的属性。一般取默认值(当一个线程已获取互斥量后,该线程再次获取该信号量,将导致死锁) * 返回: * 成功 - 返回0 * 失败 - 返回错误号 * 说明:亦可在定义时使用PTHREAD_MUTEX_INITIALIZER初始化。示例:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_mutex_lock(pthread_mutex_t *mutex); * 功能:对指定的互斥量上锁 * 参数: * mutex - 需要上锁的互斥量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_mutex_unlock(pthread_mutex_t *mutex); * 功能:对指定的互斥量解锁 * 参数: * mutex - 需要解锁的互斥量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_mutex_destroy(pthread_mutex_t *mutex); * 功能:销毁指定的互斥量 * 参数: * mutex - 需要销毁的互斥量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
- mutex.c
示例代码 - mutex.c:
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> pthread_mutex_t mutex_lock; int global_value = 10000; void *thread_handle(void *arg) { for(int i = 0; i < 10; ++i) { // 上锁 pthread_mutex_lock(&mutex_lock); if(global_value > 0) { // work //sleep(1); printf("The %d output of the child thread: %d\n", i + 1, global_value); } --global_value; // 解锁 pthread_mutex_unlock(&mutex_lock); sleep(1); } } int main(int argc, char *argv[]) { pthread_t ptid; void *thread_return; // 初始化互斥量 int ret = pthread_mutex_init(&mutex_lock, 0); if(ret != 0) { fprintf(stderr, "pthread_mutex_init() - failed!\n"); exit(1); } // 创建线程 ret = pthread_create(&ptid, NULL, thread_handle, NULL); if(ret != 0) { fprintf(stderr, "pthread_create() - failed!\n"); exit(2); } for(int i = 0; i < 10; ++i) { // 上锁 pthread_mutex_lock(&mutex_lock); if(global_value > 0) { // work //sleep(1); printf("The %d output of the parent thread: %d\n", i + 1, global_value); } --global_value; // 解锁 pthread_mutex_unlock(&mutex_lock); sleep(1); } // 等待子进程结束 ret = pthread_join(ptid, &thread_return); if(ret != 0) { fprintf(stderr, "pthread_join() - failed!\n"); exit(3); } // 删除互斥量 ret = pthread_mutex_destroy(&mutex_lock); if(ret != 0) { fprintf(stderr, "pthread_mutex_destroy() - failed!\n"); exit(4); } return 0; }
- 有以下代码,分析存在的隐患,并使用互斥量解决改隐患(不能删除、修改代码,只能添加代码)
示例代码1 - HiddenDanger.c
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <semaphore.h> void* thread_function(void* arg); #define WORK_SIZE 1024 char work_area[WORK_SIZE]; int time_to_exit = 0; int main() { int res; pthread_t a_thread; void* thread_result; res = pthread_create(&a_thread, NULL, thread_function, NULL); if (res != 0) { perror("Thread creation failed"); exit(EXIT_FAILURE); } printf("Input some text. Enter 'end' to finish\n"); while (!time_to_exit) { fgets(work_area, WORK_SIZE, stdin); while (1) { if (work_area[0] != '\0') { sleep(1); } else { break; } } } printf("\nWaiting for thread to finish...\n"); res = pthread_join(a_thread, &thread_result); if (res != 0) { perror("Thread join failed"); exit(EXIT_FAILURE); } printf("Thread joined\n"); exit(EXIT_SUCCESS); } void* thread_function(void* arg) { sleep(1); while (strncmp("end", work_area, 3) != 0) { printf("You input %ld characters\n", strlen(work_area) - 1); work_area[0] = '\0'; sleep(1); while (work_area[0] == '\0') { sleep(1); } } time_to_exit = 1; work_area[0] = '\0'; pthread_exit(0); }
示例代码2 - SolveHiddenDanger.c
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <semaphore.h> void *thread_function(void *arg); pthread_mutex_t work_mutex; /* protects both work_area and time_to_exit */ #define WORK_SIZE 1024 char work_area[WORK_SIZE]; int time_to_exit = 0; int main() { int res; pthread_t a_thread; void *thread_result; res = pthread_mutex_init(&work_mutex, NULL); if (res != 0) { perror("Mutex initialization failed"); exit(EXIT_FAILURE); } res = pthread_create(&a_thread, NULL, thread_function, NULL); if (res != 0) { perror("Thread creation failed"); exit(EXIT_FAILURE); } pthread_mutex_lock(&work_mutex); printf("Input some text. Enter 'end' to finish\n"); while(!time_to_exit) { fgets(work_area, WORK_SIZE, stdin); pthread_mutex_unlock(&work_mutex); while(1) { pthread_mutex_lock(&work_mutex); if (work_area[0] != '\0') { pthread_mutex_unlock(&work_mutex); sleep(1); } else { break; } } } pthread_mutex_unlock(&work_mutex); printf("\nWaiting for thread to finish...\n"); res = pthread_join(a_thread, &thread_result); if (res != 0) { perror("Thread join failed"); exit(EXIT_FAILURE); } printf("Thread joined\n"); pthread_mutex_destroy(&work_mutex); exit(EXIT_SUCCESS); } void *thread_function(void *arg) { sleep(1); pthread_mutex_lock(&work_mutex); while(strncmp("end", work_area, 3) != 0) { printf("You input %d characters\n", strlen(work_area) -1); work_area[0] = '\0'; pthread_mutex_unlock(&work_mutex); sleep(1); pthread_mutex_lock(&work_mutex); while (work_area[0] == '\0' ) { pthread_mutex_unlock(&work_mutex); sleep(1); pthread_mutex_lock(&work_mutex); } } time_to_exit = 1; work_area[0] = '\0'; pthread_mutex_unlock(&work_mutex); pthread_exit(0); }
- 与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。
- 条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。
- 条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
- 头文件:#include <pthread.h>
- 如果在Linux Shell界面下输入:man pthread_mutex_init显示No manual entry for pthread_mutex_init则手动安装manpages
安装命令:sudo apt-get install manpages-posix-dev
/************************************************************************************************************************* * 函数:int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); * 功能:销毁并初始化条件变量 * 参数: * cond - 需要初始化的条件变量 * attr - 条件变量的属性。 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 * 说明:亦可在定义时使用PTHREAD_COND_INITIALIZER初始化。示例: pthread_cond_t cond = PTHREAD_COND_INITIALIZER; **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_signal(pthread_cond_t *cond); * 功能:通知条件变量,唤醒一个等待者 * 参数: * cond - 条件变量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_broadcast(pthread_cond_t *cond); * 功能:解锁当前在指定条件变量cond上阻塞的所有线程。 * 参数: * cond - 条件变量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数: int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime); * 功能:等待条件变量或者超时 被唤醒 * 参数: * cond - 条件变量 * mutex - 互斥锁 * abstime - 等待时间(系统时间 + 等待时间) * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); * 功能:等待条件变量被唤醒 * 参数: * cond - 条件变量 * mutex - 互斥锁 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
/************************************************************************************************************************* * 函数:int pthread_cond_destroy(pthread_cond_t *cond); * 功能:销毁指定的条件变量,实际上变为未初始化的条件变量 * 参数: * cond - 待销毁的条件变量 * 返回: * 成功 - 返回0 * 失败 - 返回错误号 **************************************************************************************************************************/
- test.c
示例代码 - test.c
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> pthread_cond_t cond; pthread_mutex_t mutex; void* thread1_handle(void* arg) { while (1) { printf("thread 1 start running!\n"); // 上锁 pthread_mutex_lock(&mutex); printf("thread 1 mutex lock!\n"); // 等待条件变量 pthread_cond_wait(&cond, &mutex); printf("thread 1 applied the condition!\n"); // 解锁 pthread_mutex_unlock(&mutex); printf("thread 1 mutex unlock!\n"); sleep(1); } } void* thread2_handle(void* arg) { while (1) { printf("thread 2 start running!\n"); // 上锁 pthread_mutex_lock(&mutex); printf("thread 2 mutex lock!\n"); // 等待条件变量 pthread_cond_wait(&cond, &mutex); printf("thread 2 applied the condition!\n"); // 解锁 pthread_mutex_unlock(&mutex); printf("thread 2 mutex unlock!\n"); sleep(1); } } int main() { pthread_t ptid1, ptid2; // 初始化互斥锁 pthread_mutex_init(&mutex, NULL); // 初始化条件变量 pthread_cond_init(&cond, NULL); // 创建线程 pthread_create(&ptid1, NULL, thread1_handle, NULL); pthread_create(&ptid2, NULL, thread2_handle, NULL); while(1) { pthread_cond_signal(&cond); sleep(5); } return 0; }
- 并发编程:是指在同一台计算机上"同时处理多个任务"。并发同一实体上的多个事件。
- 当遇到以下情况被"阻塞"时,该使用什么解决方案?
- 忙于漫长的CPU密集型处理
- 读取文件,但文件尚未缓存,从硬盘中读取较为缓慢
- 不得不等待获取某个资源,如:硬件驱动、互斥锁、等待同步方式调用的数据库响应、网络上的请求和响应等。
- 使用多进程和多线程技术解决如上问题
缺陷:创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多。活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或"切换过度"而导致系统资源不足。- 解决方案:使用线程池技术
组件 | 说明 |
任务 | 待处理的工作,通常由标识、上下文和处理函数组成 |
任务队列 | 按顺序保持待处理的任务序列,等待线程池中的线程处理 |
线程池 | 由多个已启动的一组线程组成 |
条件变量 | 一组同步机制,允许线程挂起,直到共享数据上的某些条件得到满足 |
互斥锁 | 保证在任一时刻,只有一个线程访问该对象 |
#ifndef _DEMO_THREAD_H_INCLUDED_ #define _DEMO_THREAD_H_INCLUDED_ #include <stdio.h> #include <stdint.h> #include <stdlib.h> #include <sys/types.h> #include <pthread.h> #include <errno.h> #include <string.h> typedef intptr_t int_t; typedef uintptr_t uint_t; #define OK 0 #define ERROR -1 int thread_mutex_create(pthread_mutex_t *mtx); int thread_mutex_destroy(pthread_mutex_t *mtx); int thread_mutex_lock(pthread_mutex_t *mtx); int thread_mutex_unlock(pthread_mutex_t *mtx); int thread_cond_create(pthread_cond_t *cond); int thread_cond_destroy(pthread_cond_t *cond); int thread_cond_signal(pthread_cond_t *cond); int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx); #endif /* _DEMO_THREAD_H_INCLUDED_ */
#include "thread.h" int thread_mutex_create(pthread_mutex_t *mtx) { int err; pthread_mutexattr_t attr; err = pthread_mutexattr_init(&attr); if (err != 0) { fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno)); return ERROR; } err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); if (err != 0) { fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno)); return ERROR; } err = pthread_mutex_init(mtx, &attr); if (err != 0) { fprintf(stderr,"pthread_mutex_init() failed, reason: %s\n",strerror(errno)); return ERROR; } err = pthread_mutexattr_destroy(&attr); if (err != 0) { fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno)); } return OK; } int thread_mutex_destroy(pthread_mutex_t *mtx) { int err; err = pthread_mutex_destroy(mtx); if (err != 0) { fprintf(stderr,"pthread_mutex_destroy() failed, reason: %s\n",strerror(errno)); return ERROR; } return OK; } int thread_mutex_lock(pthread_mutex_t *mtx) { int err; err = pthread_mutex_lock(mtx); if (err == 0) { return OK; } fprintf(stderr,"pthread_mutex_lock() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_mutex_unlock(pthread_mutex_t *mtx) { int err; err = pthread_mutex_unlock(mtx); #if 0 ngx_time_update(); #endif if (err == 0) { return OK; } fprintf(stderr,"pthread_mutex_unlock() failed, reason: %s\n",strerror(errno)); return ERROR; }
#include "thread.h" int thread_cond_create(pthread_cond_t *cond) { int err; err = pthread_cond_init(cond, NULL); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_init() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_cond_destroy(pthread_cond_t *cond) { int err; err = pthread_cond_destroy(cond); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_destroy() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_cond_signal(pthread_cond_t *cond) { int err; err = pthread_cond_signal(cond); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_signal() failed, reason: %s\n",strerror(errno)); return ERROR; } int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx) { int err; err = pthread_cond_wait(cond, mtx); if (err == 0) { return OK; } fprintf(stderr, "pthread_cond_wait() failed, reason: %s\n",strerror(errno)); return ERROR; }
#ifndef _THREAD_POOL_H_INCLUDED_ #define _THREAD_POOL_H_INCLUDED_ #include "thread.h" #define DEFAULT_THREADS_NUM 4 #define DEFAULT_QUEUE_NUM 65535 typedef unsigned long atomic_uint_t; typedef struct thread_task_s thread_task_t; typedef struct thread_pool_s thread_pool_t; struct thread_task_s { thread_task_t *next; uint_t id; void *ctx; // 上下文 void (*handler)(void *data); // 指向执行任务的函数,data: 使用ctx传值 }; typedef struct { thread_task_t *first; thread_task_t **last; } thread_pool_queue_t; #define thread_pool_queue_init(q) \ (q)->first = NULL; \ (q)->last = &(q)->first struct thread_pool_s { pthread_mutex_t mtx; thread_pool_queue_t queue; int_t waiting; pthread_cond_t cond; char *name; uint_t threads; int_t max_queue; }; thread_task_t *thread_task_alloc(size_t size); int_t thread_task_post(thread_pool_t *tp, thread_task_t *task); thread_pool_t* thread_pool_init(); void thread_pool_destroy(thread_pool_t *tp); #endif /* _THREAD_POOL_H_INCLUDED_ */
#include "thread_pool.h" static void thread_pool_exit_handler(void *data); static void *thread_pool_cycle(void *data); static int_t thread_pool_init_default(thread_pool_t *tpp, char *name); static uint_t thread_pool_task_id; static int debug = 0; thread_pool_t* thread_pool_init() { int err; pthread_t tid; uint_t n; pthread_attr_t attr; thread_pool_t *tp=NULL; tp = calloc(1,sizeof(thread_pool_t)); if(tp == NULL){ fprintf(stderr, "thread_pool_init: calloc failed!\n"); } thread_pool_init_default(tp, NULL); thread_pool_queue_init(&tp->queue); if (thread_mutex_create(&tp->mtx) != OK) { free(tp); return NULL; } if (thread_cond_create(&tp->cond) != OK) { (void) thread_mutex_destroy(&tp->mtx); free(tp); return NULL; } err = pthread_attr_init(&attr); if (err) { fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno)); free(tp); return NULL; } err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (err) { fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno)); free(tp); return NULL; } for (n = 0; n < tp->threads; n++) { err = pthread_create(&tid, &attr, thread_pool_cycle, tp); if (err) { fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno)); free(tp); return NULL; } } (void) pthread_attr_destroy(&attr); return tp; } void thread_pool_destroy(thread_pool_t *tp) { uint_t n; thread_task_t task; volatile uint_t lock; memset(&task,'\0', sizeof(thread_task_t)); task.handler = thread_pool_exit_handler; task.ctx = (void *) &lock; for (n = 0; n < tp->threads; n++) { lock = 1; if (thread_task_post(tp, &task) != OK) { return; } while (lock) { sched_yield(); } //task.event.active = 0; } (void) thread_cond_destroy(&tp->cond); (void) thread_mutex_destroy(&tp->mtx); free(tp); } static void thread_pool_exit_handler(void *data) { uint_t *lock = data; *lock = 0; pthread_exit(0); } thread_task_t * thread_task_alloc(size_t size) { thread_task_t *task; task = calloc(1, sizeof(thread_task_t) + size); if (task == NULL) { return NULL; } task->ctx = task + 1; return task; } int_t thread_task_post(thread_pool_t *tp, thread_task_t *task) { if (thread_mutex_lock(&tp->mtx) != OK) { return ERROR; } if (tp->waiting >= tp->max_queue) { (void) thread_mutex_unlock(&tp->mtx); fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n", tp->name, tp->waiting); return ERROR; } //task->event.active = 1; task->id = thread_pool_task_id++; task->next = NULL; if (thread_cond_signal(&tp->cond) != OK) { (void) thread_mutex_unlock(&tp->mtx); return ERROR; } *tp->queue.last = task; tp->queue.last = &task->next; tp->waiting++; (void) thread_mutex_unlock(&tp->mtx); if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n", task->id, tp->name); return OK; } static void * thread_pool_cycle(void *data) { thread_pool_t *tp = data; int err; thread_task_t *task; if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name); for ( ;; ) { if (thread_mutex_lock(&tp->mtx) != OK) { return NULL; } tp->waiting--; while (tp->queue.first == NULL) { if (thread_cond_wait(&tp->cond, &tp->mtx) != OK) { (void) thread_mutex_unlock(&tp->mtx); return NULL; } } task = tp->queue.first; tp->queue.first = task->next; if (tp->queue.first == NULL) { tp->queue.last = &tp->queue.first; } if (thread_mutex_unlock(&tp->mtx) != OK) { return NULL; } if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n", task->id, tp->name); task->handler(task->ctx); if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name); task->next = NULL; //notify } } static int_t thread_pool_init_default(thread_pool_t *tpp, char *name) { if(tpp) { tpp->threads = DEFAULT_THREADS_NUM; tpp->max_queue = DEFAULT_QUEUE_NUM; tpp->name = strdup(name?name:"default"); if(debug)fprintf(stderr, "thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n", tpp->name, tpp->threads, tpp->max_queue); return OK; } return ERROR; }
#include <stdio.h> #include <unistd.h> #include <string.h> #include "thread_pool.h" typedef struct { int value; char* name; } ctx_t; void thread1_handle(void* arg) { printf("call thread1_handle!\n"); return; } void thread2_handle(void* arg) { printf("call thread2_handle!\n"); ctx_t* p = (ctx_t*)arg; printf("value: %d, name: %s\n", p->value, p->name ? p->name : "default"); } int main(int argc, char* argv[]) { const char* name = "Nginx Thread Pool"; thread_pool_t* pool = thread_pool_init(); // thread_test_alloc() - size:传参数大小 thread_task_t* task1 = thread_task_alloc(0); thread_task_t* task2 = thread_task_alloc(sizeof(ctx_t) + strlen(name) + 1); ctx_t* p = (ctx_t*)task2->ctx; p->name = (char*)(p + 1); strcpy(p->name, name); task1->handler = thread1_handle; task2->handler = thread2_handle; // 将任务添加到线程池队列 thread_task_post(pool, task1); thread_task_post(pool, task2); sleep(10); thread_pool_destroy(pool); return 0; }
#include "thread_pool.h" struct test{ int arg1; int arg2; }; void task_handler1(void* data){ static int index = 0; printf("Hello, this is 1th test.index=%d\r\n", index++); } void task_handler2(void* data){ static int index = 0; printf("Hello, this is 2th test.index=%d\r\n", index++); } void task_handler3(void* data){ static int index = 0; struct test *t = (struct test *) data; printf("Hello, this is 3th test.index=%d\r\n", index++); printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2); } int main(int argc, char **argv) { thread_pool_t* tp = NULL; int i = 0; tp = thread_pool_init(); //sleep(1); thread_task_t * test1 = thread_task_alloc(0); thread_task_t * test2 = thread_task_alloc(0); thread_task_t * test3 = thread_task_alloc(sizeof(struct test)); test1->handler = task_handler1; test2->handler = task_handler2; test3->handler = task_handler3; ((struct test*)test3->ctx)->arg1 = 666; ((struct test*)test3->ctx)->arg2 = 888; //for(i=0; i<10;i++){ thread_task_post(tp, test1); thread_task_post(tp, test2); thread_task_post(tp, test3); //} sleep(10); thread_pool_destroy(tp); return 0; }