#include <pthread.h> /* Create a new thread, starting with execution of START-ROUTINE getting passed ARG. Creation attributed come from ATTR. The new handle is stored in *NEWTHREAD. */ extern int pthread_create (pthread_t *__restrict __newthread, const pthread_attr_t *__restrict __attr, void *(*__start_routine) (void *), void *__restrict __arg) __THROWNL __nonnull ((1, 3));
_newthread是新线程的标识符,后续pthread*函数通过它来引用新线程。其类型pthread_t的定义如下:
#include <bits/pthreadtypes.h> /* Thread identifiers. The structure of the attribute type is not exposed on purpose. */ typedef unsigned long int pthread_t;
attr参数用于设置新线程的属性。给它传递NULL表示使用默认线程属性。start_routine 和arg参数分别指定新线程将运行的丽数及其参数。
pthread_create 成功时返回0,失败时返回错误码。一个用户可以打开的线程数量不能超过RLIMIT NPROC软资源限制。此外,系统上所有用户能创建的线程总数也不得超过/proc/sys/kermnel/threads-max
内核参数所定义的值。
线程一旦被创建好,内核就可以调度内核线程来执行start_routine函数数指针所指向的函数了。线程函数在结束时最好调用如下函数,以确保安全、干净地退出:
/* Terminate calling thread. The registered cleanup handlers are called via exception handling so we cannot mark this function with __THROW.*/ extern void pthread_exit (void *__retval) __attribute__ ((__noreturn__));
pthread_exit 函数通过retval参数向线程的回收者传递其退出信息。它执行完之后不会返回到调用者,而且永远不会失败。
一个进程中的所有线程都可以调用pthread_join 函数来回收其他线程( 前提是目标线程是可回收的,见后文),即等待其他线程结束,这类似于回收进程的wait和waitpid系统调用。pthread_join 的定义如下:
/* Make calling thread wait for termination of the thread TH. The exit status of the thread is stored in *THREAD_RETURN, if THREAD_RETURN is not NULL. This function is a cancellation point and therefore not marked with __THROW. */ extern int pthread_join (pthread_t __th, void **__thread_return);
__th参数是目标线程的标识符,__thread_return 参数则是目标线程返回的退出信息。该函数会一直阻塞,直到被回收的线程结束为止。该函数成功时返回0,失败则返回错误码。可能的错误码如表所示。
有时候我们希望异常终止-一个线程,即取消线程,它是通过如下函数实现的:
/* Cancel THREAD immediately or at the next possibility. */ extern int pthread_cancel (pthread_t __th);
__th参数是目标线程的标识符。该函数成功时返回0,失败则返回错误码。不过,接收到取消请求的目标线程可以决定是否允许被取消以及如何取消,这分别由如下两个函数完成:
/* Set cancelability state of current thread to STATE, returning old state in *OLDSTATE if OLDSTATE is not NULL. */ extern int pthread_setcancelstate (int __state, int *__oldstate); /* Set cancellation state of current thread to TYPE, returning the old type in *OLDTYPE if OLDTYPE is not NULL. */ extern int pthread_setcanceltype (int __type, int *__oldtype);
这两个函数的第一个参数分别用于设置线程的取消状态(是否允许取消)和取消类型(如何取消),第二个参数则分别记录线程原来的取消状态和取消类型。state 参数有两个可选值:
type参数也有两个可选值:
pthread_setcancelstate和pthread_setcanceltype 成功时返回0,失败则返回错误码。
# define __SIZEOF_PTHREAD_ATTR_T 56 union pthread_attr_t { char __size[__SIZEOF_PTHREAD_ATTR_T]; long int __align; }; #ifndef __have_pthread_attr_t typedef union pthread_attr_t pthread_attr_t; # define __have_pthread_attr_t 1 #endif
extern int pthread_attr_init (pthread_attr_t *__attr) __THROW __nonnull ((1)); /* Destroy thread attribute *ATTR. */ extern int pthread_attr_destroy (pthread_attr_t *__attr) __THROW __nonnull ((1)); /* Get detach state attribute. */ extern int pthread_attr_getdetachstate (const pthread_attr_t *__attr, int *__detachstate) __THROW __nonnull ((1, 2)); /* Set detach state attribute. */ extern int pthread_attr_setdetachstate (pthread_attr_t *__attr, int __detachstate) __THROW __nonnull ((1)); /* Get the size of the guard area created for stack overflow protection. */ extern int pthread_attr_getguardsize (const pthread_attr_t *__attr, size_t *__guardsize) __THROW __nonnull ((1, 2)); /* Set the size of the guard area created for stack overflow protection. */ extern int pthread_attr_setguardsize (pthread_attr_t *__attr, size_t __guardsize) __THROW __nonnull ((1)); /* Return in *PARAM the scheduling parameters of *ATTR. */ extern int pthread_attr_getschedparam (const pthread_attr_t *__restrict __attr, struct sched_param *__restrict __param) __THROW __nonnull ((1, 2)); /* Set scheduling parameters (priority, etc) in *ATTR according to PARAM. */ extern int pthread_attr_setschedparam (pthread_attr_t *__restrict __attr, const struct sched_param *__restrict __param) __THROW __nonnull ((1, 2)); /* Return in *POLICY the scheduling policy of *ATTR. */ extern int pthread_attr_getschedpolicy (const pthread_attr_t *__restrict __attr, int *__restrict __policy) __THROW __nonnull ((1, 2)); /* Set scheduling policy in *ATTR according to POLICY. */ extern int pthread_attr_setschedpolicy (pthread_attr_t *__attr, int __policy) __THROW __nonnull ((1)); /* Return in *INHERIT the scheduling inheritance mode of *ATTR. */ extern int pthread_attr_getinheritsched (const pthread_attr_t *__restrict __attr, int *__restrict __inherit) __THROW __nonnull ((1, 2)); /* Set scheduling inheritance mode in *ATTR according to INHERIT. */ extern int pthread_attr_setinheritsched (pthread_attr_t *__attr, int __inherit) __THROW __nonnull ((1)); /* Return in *SCOPE the scheduling contention scope of *ATTR. */ extern int pthread_attr_getscope (const pthread_attr_t *__restrict __attr, int *__restrict __scope) __THROW __nonnull ((1, 2)); /* Set scheduling contention scope in *ATTR according to SCOPE. */ extern int pthread_attr_setscope (pthread_attr_t *__attr, int __scope) __THROW __nonnull ((1)); /* Return the previously set address for the stack. */ extern int pthread_attr_getstackaddr (const pthread_attr_t *__restrict __attr, void **__restrict __stackaddr) __THROW __nonnull ((1, 2)) __attribute_deprecated__; /* Set the starting address of the stack of the thread to be created. Depending on whether the stack grows up or down the value must either be higher or lower than all the address in the memory block. The minimal size of the block must be PTHREAD_STACK_MIN. */ extern int pthread_attr_setstackaddr (pthread_attr_t *__attr, void *__stackaddr) __THROW __nonnull ((1)) __attribute_deprecated__; /* Return the currently used minimal stack size. */ extern int pthread_attr_getstacksize (const pthread_attr_t *__restrict __attr, size_t *__restrict __stacksize) __THROW __nonnull ((1, 2)); /* Add information about the minimum stack size needed for the thread to be started. This size must never be less than PTHREAD_STACK_MIN and must also not exceed the system limits. */ extern int pthread_attr_setstacksize (pthread_attr_t *__attr, size_t __stacksize) __THROW __nonnull ((1));
#include <semaphore.h> /* Initialize semaphore object SEM to VALUE. If PSHARED then share it with other processes. */ extern int sem_init (sem_t *__sem, int __pshared, unsigned int __value) __THROW __nonnull ((1)); /* Free resources associated with semaphore object SEM. */ extern int sem_destroy (sem_t *__sem) __THROW __nonnull ((1)); /* Wait for SEM being posted. This function is a cancellation point and therefore not marked with __THROW. */ extern int sem_wait (sem_t *__sem) __nonnull ((1)); /* Test whether SEM is posted. */ extern int sem_trywait (sem_t *__sem) __THROWNL __nonnull ((1)); /* Post SEM. */ extern int sem_post (sem_t *__sem) __THROWNL __nonnull ((1));
这些函数的第一个参数sem指向被操作的信号量。
上面这些函数成功时返回0,失败则返回-1并设置errno。
互斥锁(也称互斥量)可以用于保护关键代码段,以确保其独占式的访问,这有点像一个二进制信号量。当进入关键代码段时,我们需要获得互斥锁并将其加锁,这等价于二进制信号量的P操作;当离开关键代码段时,我们需要对互斥锁解锁,以唤醒其他等待该互斥锁的线程,这等价于二进制信号量的V操作。
/* Initialize a mutex. */ extern int pthread_mutex_init (pthread_mutex_t *__mutex, const pthread_mutexattr_t *__mutexattr) __THROW __nonnull ((1)); /* Destroy a mutex. */ extern int pthread_mutex_destroy (pthread_mutex_t *__mutex) __THROW __nonnull ((1)); /* Try locking a mutex. */ extern int pthread_mutex_trylock (pthread_mutex_t *__mutex) __THROWNL __nonnull ((1)); /* Lock a mutex. */ extern int pthread_mutex_lock (pthread_mutex_t *__mutex) __THROWNL __nonnull ((1)); //上面这些函数成功时返回0,失败则返回错误码。
这些函数的第一个参数mutex指向要操作的目标互斥锁,互斥锁的类型是pthread_mutex_t结构体。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
宏PTHREAD_MUTEX_INITIALIZER
实际上只是把互斥锁的各个字段都初始化为0。/* Data structures for mutex handling. The structure of the attribute type is not exposed on purpose. */ typedef union { char __size[__SIZEOF_PTHREAD_MUTEXATTR_T]; int __align; } pthread_mutexattr_t; /* Initialize mutex attribute object ATTR with default attributes (kind is PTHREAD_MUTEX_TIMED_NP). */ extern int pthread_mutexattr_init (pthread_mutexattr_t *__attr) __THROW __nonnull ((1)); /* Destroy mutex attribute object ATTR. */ extern int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr) __THROW __nonnull ((1)); /* Get the process-shared flag of the mutex attribute ATTR. */ extern int pthread_mutexattr_getpshared (const pthread_mutexattr_t * __restrict __attr, int *__restrict __pshared) __THROW __nonnull ((1, 2)); /* Set the process-shared flag of the mutex attribute ATTR. */ extern int pthread_mutexattr_setpshared (pthread_mutexattr_t *__attr, int __pshared) __THROW __nonnull ((1)); #if defined __USE_UNIX98 || defined __USE_XOPEN2K8 /* Return in *KIND the mutex kind attribute in *ATTR. */ extern int pthread_mutexattr_gettype (const pthread_mutexattr_t *__restrict __attr, int *__restrict __kind) __THROW __nonnull ((1, 2)); /* Set the mutex kind attribute in *ATTR to KIND (either PTHREAD_MUTEX_NORMAL, PTHREAD_MUTEX_RECURSIVE, PTHREAD_MUTEX_ERRORCHECK, or PTHREAD_MUTEX_DEFAULT). */ extern int pthread_mutexattr_settype (pthread_mutexattr_t *__attr, int __kind) __THROW __nonnull ((1)); #endif
互斥锁的两种常用属性: pshared 和type。互斥锁属性pshared指定是否允许跨进程共享互斥锁,其可选值有两个:
互斥锁属性type指定互斥锁的类型。Linux 支持如下4种类型的互斥锁:
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。条件变量的相关函数主要有如下5个:
typedef union { struct __pthread_cond_s __data; char __size[__SIZEOF_PTHREAD_COND_T]; __extension__ long long int __align; } pthread_cond_t; /* Initialize condition variable COND using attributes ATTR, or use the default values if later is NULL. */ extern int pthread_cond_init (pthread_cond_t *__restrict __cond, const pthread_condattr_t *__restrict __cond_attr) __THROW __nonnull ((1)); /* Destroy condition variable COND. */ extern int pthread_cond_destroy (pthread_cond_t *__cond) __THROW __nonnull ((1)); /* Wake up one thread waiting for condition variable COND. */ extern int pthread_cond_signal (pthread_cond_t *__cond) __THROWNL __nonnull ((1)); /* Wake up all threads waiting for condition variables COND. */ extern int pthread_cond_broadcast (pthread_cond_t *__cond) __THROWNL __nonnull ((1)); /* Wait for condition variable COND to be signaled or broadcast. MUTEX is assumed to be locked before. This function is a cancellation point and therefore not marked with __THROW. */ extern int pthread_cond_wait (pthread_cond_t *__restrict __cond, pthread_mutex_t *__restrict __mutex) __nonnull ((1, 2)); //上面这些函数成功时返回0,失败则返回错误码。
这些函数的第一个参数cond指向要操作的目标条件变量,条件变量的类型是pthread_cond_t结构体。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
宏PTHREAD_COND_INITIALIZER
实际上只是把条件变量的各个字段都初始化为0。如果一个丽数能被多个线程同时调用且不发生竞态条件,则我们称它是线程安全的(thread safe),或者说它是可重人函数。Linux 库函数只有一小部分是不可重人的,比如inet_ntoa 函数,以及getservbyname和getservbyport函数。这些库函数之所以不可重人,主要是因为其内部使用了静态变量。不过Linux对很多不可重人的库函数提供了对应的可重入版本,这些可重人版本的函数名是在原函数名尾部加上_r。 比如,函数localtime对应的可重入函数是localtime_ r。在多线程程序中调用库函数,一定要使用其可重人版本,否则可能导致预想不到的结果。
思考这样一个问题:如果一个多线程程序的某个线程调用了fork 函数,那么新创建的子进程是否将自动创建和父进程相同数量的线程呢?答案是“否”,正如我们期望的那样。子进程只拥有一个执行线程,它是调用fork的那个线程的完整复制。并且子进程将自动继承父进程中互斥锁(条件变量与之类似)的状态。也就是说,父进程中已经被加锁的互斥锁在子进程中也是被锁住的。这就引起了一个问题:子进程可能不清楚从父进程继承而来的互斥锁的具体状态(是加锁状态还是解锁状态)。这个互斥锁可能被加锁了,但并不是由调用fork函数的那个线程锁住的,而是由其他线程锁住的。如果是这种情况,则子进程若再次对该互斥锁执行加锁操作就会导致死锁。
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <wait.h> pthread_mutex_t mutex; void *another(void *arg) { printf("in child thread, lock the mutex\n"); pthread_mutex_lock(&mutex); sleep(5); pthread_mutex_unlock(&mutex); } void prepare() { pthread_mutex_lock(&mutex); } void infork() { pthread_mutex_unlock(&mutex); } int main() { pthread_mutex_init(&mutex, NULL); pthread_t id; pthread_create(&id, NULL, another, NULL); //pthread_atfork( prepare, infork, infork ); sleep(1); int pid = fork(); if (pid < 0) { pthread_join(id, NULL); pthread_mutex_destroy(&mutex); return 1; } else if (pid == 0) { printf("I anm in the child, want to get the lock\n"); /*子进程从父进程继承了互斥锁mutex的状态,该互斥锁处于锁住的状态,由于父进程中子线程pthread_mutex_lock引起的,因此,下面这句加锁操作会一直阻塞,尽管从逻辑上它是不应该阻塞的。*/ pthread_mutex_lock(&mutex); printf("I can not run to here, oop...\n"); pthread_mutex_unlock(&mutex); exit(0); } else { pthread_mutex_unlock(&mutex); wait(NULL); } pthread_join(id, NULL); pthread_mutex_destroy(&mutex); return 0; }
不过,pthread 提供了一个专门的函数pthread_ atfork, 以确保fork 调用后父进程和子进程都拥有一个清楚的锁状态。该函数的定义如下:
/* Install handlers to be called when a new process is created with FORK. The PREPARE handler is called in the parent process just before performing FORK. The PARENT handler is called in the parent process just after FORK. The CHILD handler is called in the child process. Each of the three handlers can be NULL, meaning that no handler needs to be called at that point. PTHREAD_ATFORK can be called several times, in which case the PREPARE handlers are called in LIFO order (last added with PTHREAD_ATFORK, first called before FORK), and the PARENT and CHILD handlers are called in FIFO (first added, first called). */ extern int pthread_atfork (void (*__prepare) (void), void (*__parent) (void), void (*__child) (void)) __THROW; //该函数成功时返回0,失败则返回错误码。
该函数将建立3个fork句柄来帮助我们清理互斥锁的状态。prepare句柄将在fork调用创建出子进程之前被执行。它可以用来锁住所有父进程中的互斥锁。parent 句柄则是fork调用创建出子进程之后,而fork返回之前,在父进程中被执行。它的作用是释放所有在prepare句柄中被锁住的互斥锁。child 句柄是fork返回之前,在子进程中被执行。和parent句柄一样,child 句柄也是用于释放所有在prepare句柄中被锁住的互斥锁。
/* Modify the signal mask for the calling thread. The arguments have the same meaning as for sigprocmask(2). */ extern int pthread_sigmask (int __how, const __sigset_t *__restrict __newmask, __sigset_t *__restrict __oldmask)__THROW; //成功返回0,失败返回错误码
由于进程中的所有线程共享该进程的信号,所以线程库将根据线程掩码决定把信号发送给哪个具体的线程。因此,如果我们在每个子线程中都单独设置信号掩码,就很容易导致逻辑错误。此外,所有线程共享信号处理函数。也就是说,当我们在一个线 程中设置了某个信号的信号处理函数后,它将覆盖其他线程为同-一个信号设置的信号处理函数。这两点都说明,我们应该定义一个专门的线程来处理所有的信号。这可以通过如下两个步骤来实现:
# ifdef __USE_POSIX199506 /* Select any of pending signals from SET or wait for any to arrive. This function is a cancellation point and therefore not marked with __THROW. */ extern int sigwait (const sigset_t *__restrict __set, int *__restrict __sig) __nonnull ((1, 2)); # endif /* Use POSIX 1995. */
set参数指定需要等待的信号的集合。我们可以简单地将其指定为在第1步中创建的信号掩码,表示在该线程中等待所有被屏蔽的信号。参数sig指向的整数用于存储该函数返回的信号值。sigwait 成功时返回0,失败则返回错误码。一旦sigwait正确返回,我们就可以对接收到的信号做处理了。很显然,如果我们使用了sigwait, 就不应该再为信号设置信号处理函数了。这是因为当程序接收到信号时,二者中只能有一个起作用。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <signal.h> #include <errno.h> /* Simple error handling functions */ #define handle_error_en(en, msg) \ do \ { \ errno = en; \ perror(msg); \ exit(EXIT_FAILURE); \ } while (0) static void * sig_thread(void *arg) { printf("yyyyy, thread id is: %ld\n", pthread_self()); sigset_t aset; int s, sig; sigemptyset(&aset); sigaddset(&aset, SIGQUIT); sigaddset(&aset, SIGUSR1); //s = pthread_sigmask(SIG_BLOCK, &aset, NULL); sigset_t *set = (sigset_t *)arg; for (;;) { s = sigwait(set, &sig); if (s != 0) handle_error_en(s, "sigwait"); printf("Signal handling thread got signal %d\n", sig); } } static void handler(int arg) { printf("xxxxx, thread id is: %ld\n", pthread_self()); } int main(int argc, char *argv[]) { pthread_t thread; sigset_t set; int s; /* Block SIGINT; other threads created by main() will inherit * a copy of the signal mask. */ signal(SIGQUIT, handler); // if (s != 0) // handle_error_en(s, "pthread_sigmask"); s = pthread_create(&thread, NULL, &sig_thread, (void *)&set); sigemptyset(&set); sigaddset(&set, SIGQUIT); sigaddset(&set, SIGUSR1); //s = pthread_sigmask(SIG_BLOCK, &set, NULL); if (s != 0) handle_error_en(s, "pthread_create"); printf("sub thread with id: %ld\n", thread); /* Main thread carries on to create other threads and/or do * other work */ pause(); /* Dummy pause so we can test program */ }
最后, pthread还提供了下面的方法,使得我们可以明确地将一个信号发送给指定的线程:
/* Send signal SIGNO to the given thread. */ extern int pthread_kill (pthread_t __threadid, int __signo) __THROW;
其中,thread 参数指定目标线程,sig 参数指定待发送的信号。如果sig为0,则pthread_kill不发送信号,但它任然会执行错误检查。我们可以利用这种方式来检测目标线程是否存在。pthread_kill 成功时返回0,失败则返回错误码。