Linux下创建新进程的系统调用是fork
#include <sys/types.h> #include <unistd.h> pid fork(void);
该调用返回两次,父进程中返回子进程的pid,子进程中返回0,失败返回-1,并设置errno。
fork复制当前进程,在内核进程表中创建一个新的进程表项。新的进程表项有很多属性和原进程相同,比如堆指针、栈指针和标志寄存器的值。也有一些属性被赋予新的值,比如新进程的ppid会被设为原进程的pid,信号位图被清除(原进程设置的信号处理函数不再对新进程生效)。
子进程的代码与父进程完全相同,同时还会复制父进程的数据(堆、栈、静态数据)。数据的复制采用写时复制。
在父进程中打开的fd,在子进程中也是打开的,且其引用计数会加1,所以如果不需要的话,莫要忘记关闭。
exec系列系统调用定义如下:
#include <unistd.h> extern char** environ; int execl(const char* path, const char* arg, ...); int execlp(const char* file, const char* arg, ...); int execle(const char* path, const char* arg, ..., char* const envp[]); int execv(const char* path, char* const argv[]); int execvp(const char* file, char* const argv[]); int execve(const char* path, char* const argv[], char* const envp[]);
path参数指定可执行文件的完整路径,file参数可以接受文件名(可执行文件),该文件的具体位置则在环境变量PATH中搜寻。arg接受可变参数,argv则接受参数数组。无论是使用arg还是argv,由于参数数量都是任意的,所以需在参数最后传一个NULL来标记参数表的结束。envp参数用于设置新程序的环境变量,如果未设置,则新程序使用由全局变量environ指定的环境变量。
实际上这一组函数完成的是一样的功能,即执行指定程序来替换当前进程映像。什么意思呢,我们知道,调用fork后,子进程会继承父进程的很多东西,跟父进程有着千丝万缕的关系,但是如果,在fork后立即调用exec系系统调用,那么会用exec指定的程序“替换”子进程原本的程序,除了一些“表象”的,无关紧要的一些数据,如ppid,你可以认为新fork出来的进程与调用fork的进程基本没什么关系了。写时复制机制在这里发挥了它的作用。
exec系列调用成功后没有返回值(因为进程空间中的程序已被替换,不再是调用exec时的程序),失败返回-1。
#include <sys/types.h> #include <stdio.h> #include <unistd.h> #include <sys/wait.h> #include <stdlib.h> int main() { printf("main process %d\n", getpid()); pid_t pid = fork(); if (pid < 0) { printf("fork error\n"); } else if (pid == 0) { printf("sub process %d exec ls -l \n", getpid()); execlp("ls", "ls", "-l", NULL); printf("execlp error \n"); exit(1); } else { wait(NULL); } return 0; }
exec函数不会关闭原程序打开的fd,除非该fd被设置了类似SOCK_CLOEXEC的属性。
子进程先于父进程退出,而父进程并没有回收子进程,释放子进程占用的资源,此时称子进程为僵尸进程。如果父进程先于子进程退出,子进程会被init进程托管,此后由init进程负责子进程的回收的资源释放。
避免子进程成为僵尸进程的办法是在父进程中调用下面的函数以等待子进程的结束:
#include <sys/types.h> #include <sys/wait.h> pid_t wait(int* stat_loc); pid_t waitpid(pid_t pid, int* stat_loc, int options);
wait函数将阻塞进程,直到某个子进程结束运行,返回该子进程的pid,子进程的退出状态信息存储与stat_loc指向的内存中。
定义在sys/wait.h中的几个宏可以帮助解释子进程退出时的状态信息
waitpid只等待由pid指定的子进程。如果pid为-1,则等待任意子进程结束。options可以控制waitpid的行为,通常取值WNOHANG,表示waitpid将是非阻塞的:如果pid指定的目标子进程还没结束或意外终止,waitpid立即返回0,如果目标子进程已正常退出,则返回子进程pid。waitpid调用失败返回-1,并设置errno。
子进程在结束时会向其父进程发送一个SIGCHLD信号,父进程由此得知子进程已经退出了。下面是SIGCHLD信号的典型处理函数
static void handle_child(int sig) { pid_t pid; int stat; while((pid = waitpid(-1, &stat, WNOHANG)) > 0) { // 善后处理 } }
fork调用后,父进程中打开的fd在子进程中也是打开的,所以可以通过管道来实现父子进程的通信。
通常使用socketpair创建父子进程间通信的管道,以在两边都能读写。
信号量主要用于对进程同步,也就是确保任意时刻只有一个进程能进入临界区。
信号量是一种特殊的变量,它只能取自然数值并且只支持两种操作,在Linux/UNIX中,称为P、V操作。假设有信号量SV,则对它的PV操作含义如下
P(SV):如果SV的值大于0,就将它减1,如果SV的值为0,则挂起进程的执行
V(SV):如果有其他进程因为等待SV而挂起,则唤醒之,否则,将SV加1
可以看出,进程在进入临界区前需执行P操作,退出临界区前需执行V操作。
信号量的取值可以是任何自然数,但最常用的是二进制信号量,即只取0和1两个值。
注意:使用一个普通变量来模拟二进制信号量是行不通的,因为所有高级语言都没有一个原子操作可以同时完成如下两步操作:检测变量是否为true/false,如果是则再将它设置为false/true。
Linux信号量的API主要包含3个系统调用:semget、semop、semctl。它们都被设计为操作一组信号量,即信号量集,而不是单个信号量。
#include <sys/sem.h> int semget(key_t key, int num_sems, int sem_flags); int semop(int sem_id, struct sembuf* sem_ops, size_t num_sem_ops); int semctl(int sem_id, int sem_num, int command, ...);
semget用于创建一个新的信号量集或者获取一个已经存在的信号量集。key是一个键值,用来标识一个全局唯一的信号量集。num_sems指定要创建/获取的信号量集中信号量的数目,获取时可以设为0,创建时则必须指定值。sem_flags指定一组标志,其低9位时该信号量的权限,其格式和含义与系统调用open的mode参数相同。semget成功返回信号量集的标识符,一个正整数,失败返回-1,并设置errno。
semop是改变信号量的值,即执行P、V操作。sem_id是semget的返回值,用于指定要操作的信号量集。num_sem_ops指定要执行的操作个数,即sem_ops数组中元素的个数。sem_ops指向的sembuf定义如下:
struct sembuf { unsigned short int sem_num; //信号量集中信号量的编号,0是第一个 short int sem_op; //操作类型,可选正整数、0、负整数,每种类型的操作行为受sem_flg影响 short int sem_flg;//可选值是IPC_NOWAIT(无论操作是否成功,semop都立即返回,非阻塞)和SEN_UNDO(当进程退出时取消正在进行的semop操作) }
semop成功返回0,失败返回-1,设置errno。失败是sem_ops中指定的所有操作都不执行。
semctl对信号量进行直接控制。sem_num指定被操作的信号量在信号量集中的编号。command指定要执行的命令,第4个参数由用户自己定义,但sys/sem.h中给出了它的推荐格式:
union semun { int val; //用于SETVAL命令 struct semid_ds* buf; //用于IPC_STAT和IPC_SET命令 unsigned short* array; //用于GETALL和SETALL命令 struct seminfo* __buf; //用于IPC_INFO命令 }
semctl支持的所有命令如下
semctl成功时的返回值取决于command参数,失败返回-1,设置errno。
可以给semget传一个特殊的键值IPC_PRIVATE(值为0)来强制创建一个新的信号量,下面是书中代码清单13-3使用IPC_PRIVATE信号量的源码:
#include <sys/sem.h> #include <iostream> #include <stdlib.h> #include <unistd.h> #include <sys/wait.h> union semun { int val; struct semid_ds* buf; unsigned short int* array; struct seminfo* __buf; }; /* op为-1时执行P操作,op为1时执行V操作 */ void pv(int sem_id, int op) { struct sembuf sem_b; sem_b.sem_num = 0; //要操作的信号量在信号量集中的编号 sem_b.sem_op = op; sem_b.sem_flg = SEM_UNDO; semop(sem_id, &sem_b, 1); } int main(int argc, char* argv[]) { int sem_id = semget(IPC_PRIVATE, 1, 0666); union semun sem_un; sem_un.val = 1; semctl(sem_id, 0, SETVAL, sem_un); pid_t pid = fork(); if (pid < 0) { return 1; } else if (pid == 0) { //子进程 std::cout << "child try to get binary sem" << std::endl; //在父子进程间共享IPC_PRIVATE信号量的关键就在于二者都可以操作该信号量的标识符sem_id pv(sem_id, -1); std::cout << "child get the sem and would release it after 5s" << std::endl; sleep(5); pv(sem_id, 1); exit(0); } else { //父进程 std::cout << "parent try to get binary sem" << std::endl; pv(sem_id, -1); std::cout << "parent get the sem and would release it after 5s" << std::endl; sleep(50); pv(sem_id, 1); } waitpid(pid, NULL, 0); //删除信号量 semctl(sem_id, 0, IPC_RMID, sem_un); return 0; }
共享内存是最高效的IPC机制,因为它不涉及进程间的任何数据传输。但是需要其他手段来同步进程对共享内存的访问,防止产生竞态条件。
Linux共享内存的API定义如下:
#include <sys/shm.h> int shmget(key_t key, size_t size, int shmflg); void* shmat(int shm_id, const void* shm_addr, int shmflg); int shmdt(const void* shm_addr); int shmctl(int shm_id, int command, struct shmid_ds* buf);
shmget创建一段新的共享内存或者获取一段已经存在的共享内存,key是个键值,用于标识一段全局唯一的共享内存。size指定共享内存大小,单位是字节。shmflg参数的使用和含义与semget系统调用的sem_flags相同,不过shmget支持SHM_HUGETLB和SHM_NORESERVE。shmget成功返回一个正整数值,是共享内存的标识符,失败返回-1,并设置errno。
共享内存被创建后不能立即访问它,而需要先将它关联到进程的地址空间,用完后也需要分离。这两项任务由shmat和shmdt实现。参数shm_addr指定将共享内存关联到进程的哪块地址空间,推荐设为NULL,由操作系统选择。shmflg是标志。shmat成功返回被关联到的地址,失败返回(void*)-1,并设置errno。shmdt的参数shm_addr使用shmat返回值,它成功返回0,失败返回-1,设置errno。
shmctl用于控制共享内存的某些属性。command参数用于指定要执行的命令。支持的命令如下:
shmctl成功时的返回值取决于command,失败返回-1,并设置errno。
Linux提供了一种利用mmap在无关进程间共享内存的方式。这种方式无需任何文件的支持。下面的方法用于创建或打开一个POSIX共享内存对象以及删除这个对象
//以下两个是共享内存的POSIX方法 编译的时候需要指定链接选项 -lrt ( 实时库real time) #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> int shm_open(const char* name, int oflag, mode_t mode); int shm_unlink(const char* *name);
shm_open的使用方法与open系统调用完全相同。需要指出的是参数name指定要创建/打开的共享内存对象,考虑到可移植性,应该使用“/somename”的格式。shm_unlink将参数name指定的共享内存对象标记为等待删除,当所有使用该共享内存对象的进程都使用ummap将它从进程中分离后,系统将销毁这个共享内存对象所占据的资源。
下面是书中代码清单13-4使用共享内存的聊天室服务器程序
// shm_unlink和shm_open是POSIX共享内存函数,编译时候需要带上 -lrt 选项 #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <signal.h> #include <sys/wait.h> #include <sys/mman.h> #include <sys/stat.h> #define USER_LIMIT 5 #define BUFFER_SIZE 1024 #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 // 书上这里定义的是1024 严重怀疑会越界 #define PROCESS_LIMIT 65535 struct client_data { sockaddr_in address; int connfd; pid_t pid; // 处理这个连接的子进程id int pipefd[2]; // 和父进程通信用的管道 }; //共享内存名字 static const char* shm_name = "/my_shm"; int sig_pipefd[2]; int epollfd; int listenfd; int shmfd; char* share_mem = 0; client_data* users = 0; //子进程和客户链接的映射关系表,用进程id来索引这个数组,即可取得进程所处理的客户链接编号 int* sub_process = 0; //当前客户数量 int user_count = 0; bool stop_child = false; int setnonblocking(int fd) { int oldopt = fcntl(fd, F_GETFL); int newopt = oldopt | O_NONBLOCK; fcntl(fd, F_SETFL, newopt); return oldopt; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { printf("process %d recv sig %d\n", getpid(), sig); int save_errno = errno; int msg = sig; send(sig_pipefd[1], (char*)&msg, 1, 0); errno = save_errno; } void addsig(int sig, void(*handler)(int), bool restart = true) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = handler; if (restart) { sa.sa_flags |= SA_RESTART; } sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void del_resource() { close(sig_pipefd[0]); close(sig_pipefd[1]); close(listenfd); close(epollfd); shm_unlink(shm_name); delete[] users; delete[] sub_process; } // 停止一个子进程 void child_term_handler(int sig) { printf("process %d recv sig %d\n", getpid(), sig); stop_child = true; } // 子进程运行的数组,idx指出该子进程处理的客户连接的编号 int run_child(int idx, client_data* users, char* share_mem) { epoll_event events[MAX_EVENT_NUMBER]; //子进程使用epoll同时监听客户端连接和与父进程通信的管道fd int child_epollfd = epoll_create(5); assert(child_epollfd != -1); int connfd = users[idx].connfd; addfd(child_epollfd, connfd); int pipefd = users[idx].pipefd[1]; addfd(child_epollfd, pipefd); int ret; // 子进程设置自己的信号处理函数 addsig(SIGTERM, child_term_handler, false); while (!stop_child) { int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1); if (number < 0 && errno != EINTR) { printf("epoll failuer\n"); break; } for (int i = 0; i < number; ++i) { int sockfd = events[i].data.fd; if (sockfd == connfd && events[i].events & EPOLLIN) { memset(share_mem + idx * BUFFER_SIZE, '\0', BUFFER_SIZE); ret = recv(sockfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0); if (ret < 0) { if (errno != EAGAIN) { stop_child = true; } } else if (ret == 0) { stop_child = true; } else { // 成功读取客户数据后通知主进程 send(pipefd, (char*)&idx, sizeof(idx), 0); } } else if (sockfd == pipefd && events[i].events & EPOLLIN) { // 主进程通知子进程将第client个客户端的数据发送到本进程负责的客户端 int client = 0; ret = recv(sockfd, (char*)&client, sizeof(client), 0); if (ret < 0) { if (errno != EAGAIN) { stop_child = true; } } else if (ret == 0) { stop_child = true; } else { send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); } } else { continue; } } } close(connfd); close(child_epollfd); close(pipefd); return 0; } int main(int argc, char* argv[]) { if (argc <= 2) { printf("usage: %s ip port\n", argv[0]); return 0; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; address.sin_port = htons(port); inet_pton(AF_INET, ip, &address.sin_addr); listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); user_count = 0; users = new client_data[USER_LIMIT + 1]; sub_process = new int[PROCESS_LIMIT]; for (int i = 0; i < PROCESS_LIMIT; ++i) { sub_process[i] = -1; } epoll_event events[MAX_EVENT_NUMBER]; epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd); assert(ret != -1); setnonblocking(sig_pipefd[1]); addfd(epollfd, sig_pipefd[0]); addsig(SIGCHLD, sig_handler); addsig(SIGTERM, sig_handler); addsig(SIGINT, sig_handler); addsig(SIGPIPE, sig_handler); bool stop_server = false; bool terminate = false; // 创建共享内存,作为所有客户端socket连接的读缓存 shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); assert(shmfd != -1); // ftruncate会将参数fd指定的文件大小改为参数length指定的大小 ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE); assert(ret != -1); // mmap用于申请一段内存空间。我们可以将这段内存作为进程间通信的共享内存,也可以将文件直接映射到其中 // MAP_SHARED控制内存段内容被修改后程序的行为 为在进程间共享这段内存,对该内存的修改将反映到被映射的文件中 share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0); assert(share_mem != MAP_FAILED); close(shmfd); while (!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if (number < 0 && errno != EINTR) { printf("epoll failuer\n"); break; } for (int i = 0; i < number; ++i) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); if (connfd < 0) { printf("accept error %d\n", errno); continue; } if (user_count >= USER_LIMIT) { const char* info = "too many users\n"; printf("%s", info); send(connfd, info, strlen(info), 0); close(connfd); continue; } //保存第user_count个客户连接的相关数据 users[user_count].address = client_address; users[user_count].connfd = connfd; // 在主进程和子进程间建立管道,已传递必要的数据 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); assert(ret != -1); pid_t pid = fork(); if (pid < 0) { close(connfd); continue; } else if (pid == 0) { close(epollfd); close(listenfd); close(users[user_count].pipefd[0]); close(sig_pipefd[0]); close(sig_pipefd[1]); run_child(user_count, users, share_mem); munmap((void*)share_mem, USER_LIMIT * BUFFER_SIZE); exit(0); } else { printf("create new child process: %d\n", pid); close(connfd); //主进程用pipefd[0]端进行读写 子进程用pipefd[1] close(users[user_count].pipefd[1]); addfd(epollfd, users[user_count].pipefd[0]); users[user_count].pid = pid; sub_process[pid] = user_count; user_count++; } } // 处理信号事件 else if (sockfd == sig_pipefd[0] && events[i].events & EPOLLIN) { int sig; char signals[1024]; ret = recv(sockfd, signals, sizeof(signals), 0); if (ret == -1) { continue; } else if (ret == 0) { continue; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { // 子进程退出,表示有某个客户端关闭了连接 case SIGCHLD: { pid_t pid; int stat; /*-1表示等待任意子进程结束 WNOHANG指定waitpid调用是非阻塞的 *若没有子进程结束或意外终止则立即返回0,调用失败返回-1*/ while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) { int del_user = sub_process[pid]; sub_process[pid] = -1; if (del_user < 0 || del_user > USER_LIMIT) { continue; } //清除该客户连接使用的相关数据 epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); close(users[del_user].pipefd[0]); sub_process[users[del_user].pid] = del_user; } if (terminate && user_count == 0) { stop_server = true; } break; } case SIGTERM: case SIGINT: { //结束服务器进程 printf("kill all the child now, cur user_count: %d\n", user_count); if (user_count == 0) { stop_server = true; break; } for (int i = 0; i < user_count; ++i) { int pid = users[i].pid; printf("send SIGTERM %d to %d\n", SIGTERM, pid); kill(pid, SIGTERM); } terminate = true; break; } default: break; } } } } // 某个子进程向父进程写了数据 else if (events[i].events & EPOLLIN) { int child = 0; ret = recv(sockfd, (char*)&child, sizeof(child), 0); if (ret == -1) { continue; } else if (ret == 0) { continue; } else { //通知child以外的子进程有数据要写 for(int j = 0; j < user_count; ++j) { if (users[j].pipefd[0] != sockfd) { printf("send data to child accross pipe\n"); send(users[j].pipefd[0], (char*)&child, sizeof(child), 0); } } } } } } del_resource(); return 0; }
消息队列是在两个进程之间传递二进制块数据的一种简单有效的方式。每个数据块都有一个特定的类型,接收方可以根据类型来有选择的接收数据。下面是Linux消息队列的4个系统调用的定义
#include <sys/msg.h> int msgget(key_t key, int msgflg); int msgsnd(int msgid, const void* msg_ptr, size_t msg_sz, int msgflg); int msgrcv(int msgid, void* msg_ptr, size_t msg_sz, long int msgtype, int msgflg); int msgctl(int msgid, int command, struct msgid_ds* buf);
msgget用于创建或获取一个已有的消息队列,参数msgflg与semget系统调用的sem_flags参数相同。成功返回一个正整数值,为消息队列的标识符,失败返回-1,并设置errno。
msgsnd用于把一条消息添加到消息队列。msg_ptr指向一个准备发送的消息,消息必须被定义为如下msgbuf类型。msgflg控制调用行为,仅支持IPC_NOWAIT标志,表示已非阻塞方式发送消息。默认情况下若消息队列满了,则msgsnd将阻塞。
struct msgbuf { long mtype; //消息类型 char mtext[152]; //消息数据 }
msgrcv从消息队列中获取消息。msgtype用于指定接受何种类型的消息,可以使用如下几种方式来指定:
msgflg控制msgrcv的行为。它是如下一些标志的按位或
msgctl用于控制消息队列的属性,command指定要执行的命令,支持的命令如下
Linux提供了ipcs命令来查看当前系统上拥有哪些共享资源实例,提供了ipcrm命令来删除遗留在系统中的共享资源。
前面知道,fork调用后,父进程中打开的文件描述符在子进程中仍然保持打开,需要注意的是传递一个文件描述符并不是传递一个文件描述符的值,而是在接收进程中创建了一个新的文件描述符,并且该文件描述符和发送进程中被传递的fd指向内核中相同的文件表项。
Linux下可以利用UNIX域socket在进程间传递特殊的辅助数据。