#include <sys/types.h> #include <unistd.h> /* Clone the calling process, creating an exact copy. Return -1 for errors, 0 to the new process, and the process ID of the new process to the old process. */ extern __pid_t fork (void) __THROWNL;
该函数的每次调用都返回两次,在父进程中返回的是子进程的PID,在子进程中则返回0。 该返回值是后续代码判断当前进程是父进程还是子进程的依据。fork调用失败时返回-1,并设置ermo。
fork函数复制当前进程,在内核进程表中创建一个新的进程表项。新的程表项有很多属性和原进程相同,比如堆指针、栈指针和标志寄存器的值。但也有许多属性被赋予了新的值,比如该进程的PPID被设置成原进程的PID,信号位图被清除(原进程设置的信号处理函数不再对新进程起作用)。
子进程的代码与父进程完全相同,同时它还会复制父进程的数据(堆数据、栈数据和静态数据)。数据的复制采用的是所谓的写时复制(copy on writte),即只有在任一进程(父进程或子进程)对数据执行了写操作时,复制才会发生(先是缺页中断,然后操作系统给子进程分配内存并复制父进程的数据)。即便如此,如果我们在程序中分配了大量内存,那么使用fork时也应当十分谨慎,尽量避免没必要的内存分配和数据复制。
此外,创建子进程后,父进程中打开的文件描述符默认在子进程中也是打开的,且文件描述符的引用计数加1。不仅如此,父进程的用户根目录、当前工作目录等变量的引用计数均会加1。
有时我们需要在子进程中执行其他程序,即替换当前进程映像,这就需要使用如下exec
系列函数之一:
#include <unistd.h> /* Replace the current process, executing PATH with arguments ARGV and environment ENVP. ARGV and ENVP are terminated by NULL pointers. */ extern int execve (const char *__path, char *const __argv[], char *const __envp[]) __THROW __nonnull ((1, 2)); /* Execute PATH with arguments ARGV and environment from `environ'. */ extern int execv (const char *__path, char *const __argv[]) __THROW __nonnull ((1, 2)); /* Execute PATH with all arguments after PATH until a NULL pointer, and the argument after that for environment. */ extern int execle (const char *__path, const char *__arg, ...) __THROW __nonnull ((1, 2)); /* Execute PATH with all arguments after PATH until a NULL pointer and environment from `environ'. */ extern int execl (const char *__path, const char *__arg, ...) __THROW __nonnull ((1, 2)); /* Execute FILE, searching in the `PATH' environment variable if it contains no slashes, with arguments ARGV and environment from `environ'. */ extern int execvp (const char *__file, char *const __argv[]) __THROW __nonnull ((1, 2)); /* Execute FILE, searching in the `PATH' environment variable if it contains no slashes, with all arguments after FILE until a NULL pointer and environment from `environ'. */ extern int execlp (const char *__file, const char *__arg, ...) __THROW __nonnull ((1, 2));
path参数指定可执行文件的完整路径,file 参数可以接受文件名,该文件的具体位置则在环境变量PATH中搜寻。arg 接受可变参数,argv 则接受参数数组,它们都会被传递给新程序(path或file指定的程序)的main兩数。envp参数用于设置新程序的环境变量。
如果未设置它,则新程序将使用由全局变量environ指定的环境变量。一般情况下,exec函数是不返回的,除非出错。它出错时返回-1,并设置没出错,则原程序中exec调用之后的代码都不会执行,因为此时原程序已经被exec的参数指定的程序完全替换(包括代码和数据)。
exec函数不会关闭原程序打开的文件描述符,除非该文件描述符被设置了类似SOCK_CLOEXEC的属性。
对于多进程程序而言,父进程一般需要跟踪子进程的退出状态。因此,当子进程结束运行时,内核不会立即释放该进程的进程表表项,以满足父进程后续对该子进程退出信息的查询(如果父进程还在运行)。在子进程结束运行之后,父进程读取其退出状态之前,我们称该子进程处于僵尸态。另外-一种使子进程进人僵尸态的情况是:父进程结束或者异常终止,而子进程继续运行。此时子进程的PPID将被操作系统设置为1,即init进程。init 进程接管了该子进程,并等待它结束。在父进程退出之后,子进程退出之前,该子进程处于僵尸态。由此可见,无论哪种情况,如果父进程没有正确地处理子进程的返回信息,子进程都将停留在僵尸态,并占据着内核资源。这是绝对不能容许的,毕竟内核资源有限。下面这对函数在父进程中调用,以等待子进程的结束,并获取子进程的返回信息,从而避免了僵尸进程的产生,或者使子进程的僵尸态立即结束:
/* Wait for a child to die. When one does, put its status in *STAT_LOC and return its process ID. For errors, return (pid_t) -1. This function is a cancellation point and therefore not marked with __THROW. */ extern __pid_t wait (int *__stat_loc); /* Wait for a child matching PID to die. If PID is greater than 0, match any process whose process ID is PID. If PID is (pid_t) -1, match any process. If PID is (pid_t) 0, match any process with the same process group as the current process. If PID is less than -1, match any process whose process group is the absolute value of PID. If the WNOHANG bit is set in OPTIONS, and that child is not already dead, return (pid_t) 0. If successful, return PID and store the dead child's status in STAT_LOC. Return (pid_t) -1 for errors. If the WUNTRACED bit is set in OPTIONS, return status for stopped children; otherwise don't. This function is a cancellation point and therefore not marked with __THROW. */ extern __pid_t waitpid (__pid_t __pid, int *__stat_loc, int __options);
wait函数将阻塞进程,直到该进程的某个子进程结束运行为止。它返回结束运行的子进程的PID,并将该子进程的退出状态信息存储于stat_loc 参数指向的内存中。sys/wait.h
头文件中定义了几个宏来帮助解释子进程的退出状态信息,如表所示。
wait函数的阻塞特性显然不是服务器程序期望的,而waitpid函数解决了这个问题。waitpid只等待由pid参数指定的子进程。如果pid取值为-1,那么它就和wait函数相同,即等待任意一个子进程结束。statloc参数的含义和wait函数的statloc参数相同。options参数可以控制waitpid函数的行为。该参数最常用的取值是WNOHANG。当options的取值是WNOHANG时,waitpid调用将是非阻塞的:如果pid指定的目标子进程还没有结束或意外终止,则waitpid立即返回0;如果目标子进程确实正常退出了,则waitpid返回该子进程的PID。waitpid 调用失败时返回-1并设置errno。
要在事件已经发生的情况下执行非阻塞调用才能提高程序的效率。对waitpid函数而言,我们最好在某个子进程退出之后再调用它。那么父进程从何得知某个子进程已经退出了呢?这正是SIGCHLD信号的用途。当一个进程结束时,它将给其父进程发送一个SIGCHLD信号。因此,我们可以在父进程中捕获SIGCHLD信号,并在信号处理函数中调用waitpid函数以"彻底结束"一个子进程。
static void handle_child(int sig) { pid_t pid; int stat; while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) { /*对结束的子进程善后处理*/ } }
管道能在父、子进程间传递数据,利用的是fork调用之后两个管道文件描述符( fd[0]和fd[1])都保持打开。一对这样的文件描述符只能保证父、子进程间一个方向的数据传输,父进程和子进程必须有一个关闭fd[0],另一个关闭fd[1]。比如,我们要使用管道实现从父进程向子进程写数据,就应该按照图所示来操作。
显然,如果要实现父、子进程之间的双向数据传输,就必须使用两个管道。socket 编程接口提供了一个创建全双工管道的系统调用: socketpair,以实现在父进程和日志服务子进程之间传递日志信息。
当多个进程同时访问系统上的某个资源的时候,比如同时写一个数据库的某条记录,或者同时修改某个文件,就需要考虑进程的同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问。通常,程序对共享资源的访问的代码只是很短的一段,但就是这一段代码引发了进程之间的竞态条件。我们称这段代码为关键代码段,或者临界区。对进程同步,也就是确保任一时刻只有一个进程能进人关键代码段。
要编写具有通用目的的代码,以确保关键代码段的独占式访问是非常困难的。有两个名为Dekker算法和Peterson算法的解决方案,它们试图从语言本身(不需要内核支持)解决并发问题。但它们依赖于忙等待,即进程要持续不断地等待某个内存位置状态的改变。这种方式下CPU利用率太低,显然是不可取的。
Dijkstra提出的信号量( Semaphore)概念是并发编程领域迈出的重要一步。信号量是一种特殊的变量,它只能取自然数值并且只支持两种操作:等待(wait) 和信号(signal)。 不过在Linux/UNIX中,“等待”和“信号”都已经具有特殊的含义,所以对信号量的这两种操作更常用的称呼是P、V操作。这两个字母来自于荷兰语单词passeren (传递,就好像进人临界区)和vrijgeven (释放,就好像退出临界区)。假设有信号量SV,则对它的P、V操作
信号量的取值可以是任何自然数。但最常用的、最简单的信号量是二进制信号量,它只能取0和1这两个值。本书仅讨论二进制信号量。使用二进制信号量同步两个进程,以确保关键代码段的独占式访问的一个典型例子如图所示。
在图中,当关键代码段可用时,二进制信号量SV的值为1,进程A和B都有机会进人关键代码段。如果此时进程A执行了P(SV)操作将SV减1,则进程B若再执行P(SV)操作就会被挂起。直到进程A离开关键代码段,并执行V(SV)操作将SV加1,关键代码段才重新变得可用。如果此时进程B因为等待SV而处于挂起状态,则它将被唤醒,并进入关键代码段。同样,这时进程A如果再执行P(SV)操作,则也只能被操作系统挂起以等待进程B退出关键代码段。
Linux信号量的API都定义在sys/sem.h头文件中,主要包含3个系统调用:semget,semop和semctl。它们都被设计为操作一组信号量,即信号量集,而不是单个信号量,因此这些接口看上去多少比我们期望的要复杂一点。
semget系统调用创建一个新的信号量集,或者获取-一个已经存在的信号量集。其定义如下:
#include <sys/sem.h> /* Get semaphore. */ extern int semget (key_t __key, int __nsems, int __semflg) __THROW;
key参数是一个键值,用来标识一个全局唯一的信号量集,就像文件名全局唯一地标识一个文件一样。要通过信号量通信的进程需要使用相同的键值来创建/获取该信号量。
num_sems参数指定要创建/获取的信号量集中信号量的数目。如果是创建信号量,则该值必须被指定;如果是获取已经存在的信号量,则可以把它设置为0。
sem_flags参数指定一组标志。它低端的9个比特是该信号量的权限,其格式和含义都与系统调用open的mode参数相同。此外,它还可以和IPC_CREAT标志做按位“或”运算以创建新的信号量集。此时即使信号量已经存在,semget也不会产生错误。我们还可以联合使用IPC_CREAT和IPC_EXCL标志来确保创建一组新的、唯一的信号量集。在这种情况下,如果信号量集已经存在,则semget返回错误并设置errno为EEXIST。这种创建信号量的行为与用O_CREAT和O_EXCL标志调用open来排他式地打开一个文件相似。
semget成功时返回一个正整数值, 它是信号量集的标识符; semget 失败时返回-1,并设置errno。如果semget用于创建信号量集,则与之关联的内核数据结构体semid_ds将被创建并初始化。semid_ds 结构体的定义如下:
/* Data structure used to pass permission information to IPC operations. It follows the kernel ipc64_perm size so the syscall can be made directly without temporary buffer copy. However, since glibc defines the MODE field as mode_t per POSIX definition (BZ#18231), it omits the __PAD1 field (since glibc does not export mode_t as 16-bit for any architecture). */ struct ipc_perm { __key_t __key; /* Key. */ __uid_t uid; /* Owner's user ID. */ __gid_t gid; /* Owner's group ID. */ __uid_t cuid; /* Creator's user ID. */ __gid_t cgid; /* Creator's group ID. */ __mode_t mode; /* Read/write permission. */ unsigned short int __seq; /* Sequence number. */ unsigned short int __pad2; __syscall_ulong_t __glibc_reserved1; __syscall_ulong_t __glibc_reserved2; };
/* Data structure describing a set of semaphores. */ struct semid_ds { struct ipc_perm sem_perm; /* operation permission struct */ __SEM_PAD_TIME (sem_otime, 1); /* last semop() time */ __SEM_PAD_TIME (sem_ctime, 2); /* last time changed by semctl() */ __syscall_ulong_t sem_nsems; /* number of semaphores in set */ __syscall_ulong_t __glibc_reserved3; __syscall_ulong_t __glibc_reserved4; };
用于改变信号量的值,即执行P,V操作。对信号量的操作实际上就是对这些内核变量的操作。semop的定义如下:
/* Operate on semaphore. */ extern int semop (int __semid, struct sembuf *__sops, size_t __nsops) __THROW;
sem_id 参数是由semget调用返回的信号量集标识符,用以指定被操作的目标信号量集。sem_ops 参数指向一个sembuf结构体类型的数组,sembuf 结构体的定义如下:
struct sembuf { unsigned short int sem_num; /* semaphore number */ short int sem_op; /* semaphore operation */ short int sem_flg; /* operation flag */ };
其中,sem_num成员是信号量集中信号量的编号,0表示信号量集中的第一个信 号量。sem_op成员指定操作类型,其可选值为正整数、0和负整数。每种类型的操作的行为又受到sem_flg成员的影响。sem_flg 的可选值是IPC_NOWAIT和SEM_UNDO。IPC_NOWAIT的含义是,无论信号量操作是否成功,semop调用都将立即返回,这类似于非阻塞I/O操作。SEM_UNDO的含义是,当进程退出时取消正在进行的semop操作。
semop系统调用的第3个参数num_sem_ops 指定要执行的操作个数,即sem_ops 数组中元素的个数。semop 对数组sem_ops 中的每个成员按照数组顺序依次执行操作,并且该过程是原子操作,以避免别的进程在同一时刻按照不同的顺序对该信号集中的信号量执行semop操作导致的竞态条件。
semop成功时返回0,失败则返回-1并设置ermo。失败的时候,sem_ _ops 数组中指定的所有操作都不被执行。
semctl系统调用允许调用者对信号量进行直接控制。其定义如下:
/* Semaphore control operation. */ extern int semctl (int __semid, int __semnum, int __cmd, ...) __THROW;
sem_id参数是由semget调用返回的信号量集标识符,用以指定被操作的信号量集。参数指定被操作的信号量在信号量集中的编号。command参数指定要执行的命令。有的命令需要调用者传递第4个参数。第4个参数的类型由用户自己定义,但sys/sem.h
头文件给出了它的推荐格式,具体如下:
/* The user should define a union like the following to use it for arguments for `semctl'. union semun { int val; <= value for SETVAL struct semid_ds *buf; <= buffer for IPC_STAT & IPC_SET unsigned short int *array; <= array for GETALL & SETALL struct seminfo *__buf; <= buffer for IPC_INFO }; Previous versions of this file used to define this union but this is incorrect. One can test the macro _SEM_SEMUN_UNDEFINED to see whether one must define the union or not. */
struct seminfo { int semmap; /* Linux内核没有使用*/ int semmni; /*系统最多可以拥有的信号量集数目*/ int semmns; /*系统最多可以拥有的信号量数目*/ int semmnu; /* Linux内核没有使用*/ int semmsl; /*一个信号量集最多允许包含的信号量数目*/ int semopm; /* semop一次最多能执行的sem_ op操作数目*/ int semume; /* Linux 内核没有使用*/ int semusz; /* sem undo结构体的大小*/ int semvmx; /*最大允许的信号量值*/ int semaem; /*最多允许的UNDO次数(带SEM _UNDO标志的semop操作的次数) */ };
semctl成功时的返回值取决于command参数,如表所示。semctl 失败时返回-1,并设置errmo。
semget的调用者可以给其key参数传递一个特殊的键值IPC_ PRIVATE (其值为0),这样无论该信号量是否已经存在,semget 都将创建一个新的信 号量。使用该键值创建的信号量并非像它的名字声称的那样是进程私有的。其他进程,尤其是子进程,也有方法来访问这个信号量。所以semget的man手册的BUGS部分上说,使用名字IPC_PRIVATE有些误导(历史原因),应该称为IPC_NEW比如下面的代码 就在父、子进程间使用一个 IPC_PRIVATE信号量来同步。
#include <sys/sem.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/wait.h> union semun { int val; struct semid_ds *buf; unsigned short int *array; struct seminfo *__buf; }; void pv(int sem_id, int op) { struct sembuf sem_b; sem_b.sem_num = 0; sem_b.sem_op = op; sem_b.sem_flg = SEM_UNDO; semop(sem_id, &sem_b, 1); } int main(int argc, char *argv[]) { int sem_id = semget(IPC_PRIVATE, 1, 0666); union semun sem_un; sem_un.val = 1; semctl(sem_id, 0, SETVAL, sem_un); pid_t id = fork(); if (id < 0) { return 1; } else if (id == 0) { printf("child try to get binary sem\n"); pv(sem_id, -1); printf("child get the sem and would release it after 5 seconds\n"); sleep(5); pv(sem_id, 1); exit(0); } else { printf("parent try to get binary sem\n"); pv(sem_id, -1); printf("parent get the sem and would release it after 5 seconds\n"); sleep(5); pv(sem_id, 1); } waitpid(id, NULL, 0); semctl(sem_id, 0, IPC_RMID, sem_un); return 0; }
共享内存是最高效的IPC机制,因为它不涉及进程之间的任何数据传输。这种高效率带来的问题是,我们必须用其他辅助手段来同步进程对共享内存的访问,否则会产生竞态条件。因此,共享内存通常和其他进程间通信方式一起使用。Linux共享内存的API都定义在sys/shm.h
头文件中,包括4个系统调用:shmget,shmat、 shmdt和shmctl。
shmget系统调用创建一段新的共享内存,或者获取段一已经存在的共享内存。其定义如下:
#include <sys/shm.h> /* Get shared memory segment. */ extern int shmget (key_t __key, size_t __size, int __shmflg) __THROW;
和semget系统调用一样,key 参数是一个键值,用来标识- -段全局唯一的共享内存。size参数指定共享内存的大小,单位是字节。如果是创建新的共享内存,则size值必须被指定。如果是获取已经存在的共享内存,则可以把size设置为0。
shmflg参数的使用和含义与semget系统调用的sem_fags 参数相同。不过shmget支持两个额外的标志一SHM_HUGETLB和SHM_NORESERVE。 它们的含义如下:
shmget成功时返回-一个正整数值,它是共享内存的标识符。shmget 失败时返回-1,并设置errmo。
如果shmget用于创建共享内存,则这段共享内存的所有字节都被初始化为0,与之关联的内核数据结构shmid_ds 将被创建并初始化。shmid_ds 结构体的定义如下:
/* Data structure describing a shared memory segment. */ struct shmid_ds { struct ipc_perm shm_perm; /* operation permission struct */ #if !__SHM_SEGSZ_AFTER_TIME size_t shm_segsz; /* size of segment in bytes */ #endif __SHM_PAD_TIME (shm_atime, 1); /* time of last shmat() */ __SHM_PAD_TIME (shm_dtime, 2); /* time of last shmdt() */ __SHM_PAD_TIME (shm_ctime, 3); /* time of last change by shmctl() */ #if __SHM_PAD_BETWEEN_TIME_AND_SEGSZ unsigned long int __glibc_reserved4; #endif #if __SHM_SEGSZ_AFTER_TIME size_t shm_segsz; /* size of segment in bytes */ #endif __pid_t shm_cpid; /* pid of creator */ __pid_t shm_lpid; /* pid of last shmop */ shmatt_t shm_nattch; /* number of current attaches */ __syscall_ulong_t __glibc_reserved5; __syscall_ulong_t __glibc_reserved6; };
共享内存被创建/获取之后,我们不能立即访问它,而是需要先将它关联到进程的地址空间中。使用完共享内存之后,我们也需要将它从进程地址空间中分离。这两项任务分别由如下两个系统调用实现:
/* Attach shared memory segment. */ extern void *shmat (int __shmid, const void *__shmaddr, int __shmflg) __THROW; /* Detach shared memory segment. */ extern int shmdt (const void *__shmaddr) __THROW;
其中,shm_id参数是由shmget调用返回的共享内存标识符。shm_addr 参数指定将共享内存关联到进程的哪块地址空间,最终的效果还受到shmfg参数的可选标志SHM_RND的影响:
[shm_addr-(shm_addr%SHMLBA)]
。SHMLBA的含义是“段低端边界地址倍数”(Segment Low Boundary Address Multiple),它必须是内存页面大小(PAGE_SIZE)的整数倍。现在的Linux内核中,它等于一个内存页大小。SHM_RND的含义是圆整(round),即将共享内存被关联的地址向下圆整到离shm_addr最近的SHMLBA的整数倍地址处。除了SHM_ RND标志外,shmflg 参数还支持如下标志:
shmat成功时返回共享内存被关联到的地址,失败则返回(void*)-1并设置ermo。shmat成功时,将修改内核数据结构shmid_ds的部分字段,如下:
shmdt函数将关联到shm_addr 处的共享内存从进程中分离。它成功时返回0,失败则返回-1并设置errno。shmdt 在成功调用时将修改内核数据结构shmid_ds 的部分字段,如下:
shmctl系统调用控制共享内存的某些属性。其定义如下:
/* Shared memory control operation. */ extern int shmctl (int __shmid, int __cmd, struct shmid_ds *__buf) __THROW;
其中,shm_id 参数是由shmget调用返回的共享内存标识符。command参数指定要执行的命令。shmctl 支持的所有命令如表所示。
shmctl成功时的返回值取决于command参数,如表13-3所示。shmetl 失败时返回-1,并设置errno。
Linux 提供了另外一种利用mmap在无关进程之间共享内存的方式。这种方式无须任
何文件的支持,但它需要先使用如下函数来创建或打开一个POSIX共享内存对象:
#include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> /* Open shared memory segment. */ extern int shm_open (const char *__name, int __oflag, mode_t __mode);
shm_open的使用方法与open系统调用完全相同。
name参数指定要创建/打开的共享内存对象。从可移植性的角度考虑,该参数应该使用“/somename"的格式:以“/”开始,后接多个字符,且这些字符都不是“/”;以“\0”结尾,长度不超过NAME_MAX (通常是255)。
ofag参数指定创建方式。它可以是下列标志中的一个或者多个的按位或:
shm_open调用成功时返回一个文件描述符。该文件描述符可用于后续的mmap调用,从而将共享内存关联到调用进程。shm_open失败时返回-1,并设置ermo。
和打开的文件最后需要关闭一样,由shm_open 创建的共享内存对象使用完之后也需要被删除。这个过程是通过如下函数实现的:
/* Remove shared memory segment. */ extern int shm_unlink (const char *__name);
该函数将name参数指定的共享内存对象标记为等待删除。当所有使用该共享内存对象的进程都使用ummap将它从进程中分离之后,系统将销毁这个共享内存对象所占据的资源。如果代码中使用了上述POSIX共享内存函数,则编译的时候需要指定链接选项-Irt。
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <signal.h> #include <sys/wait.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #define USER_LIMIT 5 #define BUFFER_SIZE 1024 #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define PROCESS_LIMIT 65536 struct client_data { sockaddr_in address; int connfd; pid_t pid; int pipefd[2]; }; static const char *shm_name = "/my_shm"; int sig_pipefd[2]; int epollfd; int listenfd; int shmfd; char *share_mem = 0; client_data *users = 0; int *sub_process = 0; int user_count = 0; bool stop_child = false; int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(sig_pipefd[1], (char *)&msg, 1, 0); errno = save_errno; } void addsig(int sig, void (*handler)(int), bool restart = true) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = handler; if (restart) { sa.sa_flags |= SA_RESTART; } sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void del_resource() { close(sig_pipefd[0]); close(sig_pipefd[1]); close(listenfd); close(epollfd); shm_unlink(shm_name); delete[] users; delete[] sub_process; } void child_term_handler(int sig) { stop_child = true; } int run_child(int idx, client_data *users, char *share_mem) { epoll_event events[MAX_EVENT_NUMBER]; int child_epollfd = epoll_create(5); assert(child_epollfd != -1); int connfd = users[idx].connfd; addfd(child_epollfd, connfd); int pipefd = users[idx].pipefd[1]; addfd(child_epollfd, pipefd); int ret; addsig(SIGTERM, child_term_handler, false); while (!stop_child) { int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if ((sockfd == connfd) && (events[i].events & EPOLLIN)) { memset(share_mem + idx * BUFFER_SIZE, '\0', BUFFER_SIZE); ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0); if (ret < 0) { if (errno != EAGAIN) { stop_child = true; } } else if (ret == 0) { stop_child = true; } else { send(pipefd, (char *)&idx, sizeof(idx), 0); } } else if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) { int client = 0; ret = recv(sockfd, (char *)&client, sizeof(client), 0); if (ret < 0) { if (errno != EAGAIN) { stop_child = true; } } else if (ret == 0) { stop_child = true; } else { send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); } } else { continue; } } } close(connfd); close(pipefd); close(child_epollfd); return 0; } int main(int argc, char *argv[]) { if (argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char *ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); user_count = 0; users = new client_data[USER_LIMIT + 1]; sub_process = new int[PROCESS_LIMIT]; for (int i = 0; i < PROCESS_LIMIT; ++i) { sub_process[i] = -1; } epoll_event events[MAX_EVENT_NUMBER]; epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd); assert(ret != -1); setnonblocking(sig_pipefd[1]); addfd(epollfd, sig_pipefd[0]); addsig(SIGCHLD, sig_handler); addsig(SIGTERM, sig_handler); addsig(SIGINT, sig_handler); addsig(SIGPIPE, SIG_IGN); bool stop_server = false; bool terminate = false; shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); assert(shmfd != -1); ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE); assert(ret != -1); share_mem = (char *)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0); assert(share_mem != MAP_FAILED); close(shmfd); while (!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { printf("errno is: %d\n", errno); continue; } if (user_count >= USER_LIMIT) { const char *info = "too many users\n"; printf("%s", info); send(connfd, info, strlen(info), 0); close(connfd); continue; } users[user_count].address = client_address; users[user_count].connfd = connfd; ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); assert(ret != -1); pid_t pid = fork(); if (pid < 0) { close(connfd); continue; } else if (pid == 0) { close(epollfd); close(listenfd); close(users[user_count].pipefd[0]); close(sig_pipefd[0]); close(sig_pipefd[1]); run_child(user_count, users, share_mem); munmap((void *)share_mem, USER_LIMIT * BUFFER_SIZE); exit(0); } else { close(connfd); close(users[user_count].pipefd[1]); addfd(epollfd, users[user_count].pipefd[0]); users[user_count].pid = pid; sub_process[pid] = user_count; user_count++; } } else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { int sig; char signals[1024]; ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { continue; } else if (ret == 0) { continue; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGCHLD: { pid_t pid; int stat; while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) { int del_user = sub_process[pid]; sub_process[pid] = -1; if ((del_user < 0) || (del_user > USER_LIMIT)) { printf("the deleted user was not change\n"); continue; } epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); close(users[del_user].pipefd[0]); users[del_user] = users[--user_count]; sub_process[users[del_user].pid] = del_user; printf("child %d exit, now we have %d users\n", del_user, user_count); } if (terminate && user_count == 0) { stop_server = true; } break; } case SIGTERM: case SIGINT: { printf("kill all the clild now\n"); //addsig( SIGTERM, SIG_IGN ); //addsig( SIGINT, SIG_IGN ); if (user_count == 0) { stop_server = true; break; } for (int i = 0; i < user_count; ++i) { int pid = users[i].pid; kill(pid, SIGTERM); } terminate = true; break; } default: { break; } } } } } else if (events[i].events & EPOLLIN) { int child = 0; ret = recv(sockfd, (char *)&child, sizeof(child), 0); printf("read data from child accross pipe\n"); if (ret == -1) { continue; } else if (ret == 0) { continue; } else { for (int j = 0; j < user_count; ++j) { if (users[j].pipefd[0] != sockfd) { printf("send data to child accross pipe\n"); send(users[j].pipefd[0], (char *)&child, sizeof(child), 0); } } } } } } del_resource(); return 0; }
msgget系统调用创建一个消 息队列,或者获取-一个已有的消息队列。其定义如下:
#include <sys/msg.h> /* Get messages queue. */ extern int msgget (key_t __key, int __msgflg) __THROW;
和semget系统调用一样,key 参数是一个键值,用来标识一个全局唯一的消息队列。msgflg参数的使用和含义与semget系统调用的sem_flags 参数相同。msgget成功时返回一个正整数值,它是消息队列的标识符。msgget 失败时返回-1,并设置errno。
如果msgget用于创建消息队列,则与之关联的内核数据结构msqid_ds 将被创建并初始化。msqid_ds 结构体的定义如下:
/* Structure of record for one message inside the kernel. The type `struct msg' is opaque. */ struct msqid_ds { struct ipc_perm msg_perm; /* structure describing operation permission */ __MSQ_PAD_TIME (msg_stime, 1); /* time of last msgsnd command */ __MSQ_PAD_TIME (msg_rtime, 2); /* time of last msgrcv command */ __MSQ_PAD_TIME (msg_ctime, 3); /* time of last change */ __syscall_ulong_t __msg_cbytes; /* current number of bytes on queue */ msgqnum_t msg_qnum; /* number of messages currently on queue */ msglen_t msg_qbytes; /* max number of bytes allowed on queue */ __pid_t msg_lspid; /* pid of last msgsnd() */ __pid_t msg_lrpid; /* pid of last msgrcv() */ __syscall_ulong_t __glibc_reserved4; __syscall_ulong_t __glibc_reserved5; };
msgsnd系统调用把- -条消息添加到消息队列中。其定义如下:
/* Send message to message queue. This function is a cancellation point and therefore not marked with __THROW. */ extern int msgsnd (int __msqid, const void *__msgp, size_t __msgsz, int __msgflg);
msqid参数是由msgget调用返回的消息队列标识符。
msg_ ptr参数指向- -个准备发送的消息,消息必须被定义为如下类型:
/* Template for struct to be used as argument for `msgsnd' and `msgrcv'. */ struct msgbuf { __syscall_slong_t mtype; /* type of received/sent message */ char mtext[1]; /* text of the message */ };
其中,mtype 成员指定消息的类型,它必须是一个正整数。mtext 是消息数据。msg_sz参数是消息的数据部分(mtext) 的长度。这个长度可以为0,表示没有消息数据。
msgfg参数控制msgsnd的行为。它通常仅支持IPC_NOWAIT标志,即以非阻塞的方式发送消息。默认情况下,发送消息时如果消息队列满了,则msgsnd将阻塞。若IPC_NOWAIT标志被指定,则msgsnd将立即返回并设置ermno为EAGAIN。
处于阻塞状态的msgsnd调用可能被如下两种异常情况所中断:
msgsnd成功时返回0,失败则返回-1并设置errmo。msgsnd 成功时将修改内核数据结构msqid_ds的部分字段,如下所示:
msgrcv系统调用从消息队列中获取消息。其定义如下:
/* Receive message from message queue. This function is a cancellation point and therefore not marked with __THROW. */ extern ssize_t msgrcv (int __msqid, void *__msgp, size_t __msgsz, long int __msgtyp, int __msgflg);
msqid参数是由msgget调用返回的消息队列标识符。
msg_ ptr 参数用于存储接收的消息,msg. _sz 参数指的是消息数据部分的长度。
msgtype参数指定接收何种类型的消息。我们可以使用如下几种方式来指定消息类型:
参数msgfg控制msgrev函数的行为。它可以是如下一-些标志的按位或:
处于阻塞状态的msgrcv调用还可能被如下两种异常情况所中断:
msgrcv成功时返回0,失败则返回-1并设置errmo。 msgrcv 成功时将修改内核数据结构msqid_ds 的部分字段,如下所示:
msqid参数是由msgget调用返回的共享内存标识符。__cmd 参数指定要执行的命令。
msgctl支持的所有命令如表所示。
/* Message queue control operation. */ extern int msgctl (int __msqid, int __cmd, struct msqid_ds *__buf) __THROW;
msgctl成功时的返回值取决于__cmd参数,如表所示。msgctl函数失败时返回-1
并设置ermo。
由于fork调用之后,父进程中打开的文件描述符在子进程中仍然保持打开,所以文件描述符可以很方便地从父进程传递到子进程。需要注意的是,传递一个文件描述符并不是传递一个文件描述符的值,而是要在接收进程中创建一个新的文件描述符,并且该文件描述符和发送进程中被传递的文件描述符指向内核中相同的文件表项。
#include <sys/socket.h> #include <fcntl.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <assert.h> #include <string.h> static const int CONTROL_LEN = CMSG_LEN(sizeof(int)); void send_fd(int fd, int fd_to_send) { struct iovec iov[1]; struct msghdr msg; char buf[0]; iov[0].iov_base = buf; iov[0].iov_len = 1; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; cmsghdr cm; cm.cmsg_len = CONTROL_LEN; cm.cmsg_level = SOL_SOCKET; cm.cmsg_type = SCM_RIGHTS; *(int *)CMSG_DATA(&cm) = fd_to_send; msg.msg_control = &cm; msg.msg_controllen = CONTROL_LEN; sendmsg(fd, &msg, 0); } int recv_fd(int fd) { struct iovec iov[1]; struct msghdr msg; char buf[0]; iov[0].iov_base = buf; iov[0].iov_len = 1; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; cmsghdr cm; msg.msg_control = &cm; msg.msg_controllen = CONTROL_LEN; recvmsg(fd, &msg, 0); int fd_to_read = *(int *)CMSG_DATA(&cm); return fd_to_read; } int main() { int pipefd[2]; int fd_to_pass = 0; int ret = socketpair(PF_UNIX, SOCK_DGRAM, 0, pipefd); assert(ret != -1); pid_t pid = fork(); assert(pid >= 0); if (pid == 0) { close(pipefd[0]); fd_to_pass = open("test.txt", O_RDWR, 0666); send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0); close(fd_to_pass); exit(0); } close(pipefd[1]); fd_to_pass = recv_fd(pipefd[0]); char buf[1024]; memset(buf, '\0', 1024); read(fd_to_pass, buf, 1024); printf("I got fd %d and data %s\n", fd_to_pass, buf); close(fd_to_pass); }