简单来讲I/O多路复用就是用一个进程来监听多个文件描述符(fd),我们将监听的fd通过系统调用注册到内核中,如果有一个或多个fd可读或可写,内核会通知应用程序来对这些fd做读写操作,select、poll、epoll都是用于处理此类问题的系统API,只不过注册和调用的方式略有不同。
例如telnet命令的操作,telnet命令从shell读入数据然后写到socket fd上,同时也需要从socket fd上读数据写到shell上。telnet server需要从socket读出命令并发送给shell,再将命令执行结果返回给telnet客户端。此时对于telnet命令来说,需要接收用户输入和sockfd的输入,也需要输出给用户和socket fd,这两种输入和输出是无序的,不能单纯的阻塞某一个读操作,如何处理这种场景?
这样父进程读入用户数据后会发送给socketfd到telenetd,子进程读入telnetd数据后发送给用。当用户终止父进程时,需要发送信号给子进程。当子进I/O结束终止时,父进程也需要接收子进程的结束信号。使用多线程同样需要一些复杂的线程间同步操作。
以上三种方法在读写连接少的时候没什么问题,当一个server进程需要维护成千上万条通信连接时就会出问题。第1种会无端浪费cpu,第2种就算使用线程\进程池来避免上下文切换的开销,当连接数量过多的时候,会占用大量的内存,第3种使用异步I/O显然信号类型肯定是不够用的。所以为了应对此类问题,有了I/O多路复用的技术。
先看一下select的创建函数
#include <sys/select.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); // 监听描述符数目 // readfds、writefds、exceptfds表示可读、可写、异常事件对应的fd // timeout表示select阻塞多长时间后返回,NULL为一直阻塞、0为立即返回、或指定超时时间 /* 返回值: 0表示超时时间内没有就绪的fds 成功时返回就绪fds总数(读、写、异常) 失败返回-1并设置errno,如果select等待期间被信号中断则立即返回-1并设置errno为EINTR */
fd_set
是一个字节数组,每一位标识一个fd。所以通常nfds设置为最大的fd的值+1,在sys/selct.h
中可以找到/* Number of descriptors that can fit in an fd_set'. */值为
#define __FD_SETSIZE 1024,系统默认单个进程打开最大fd数量
ulimit -n`为1024,所以select默认最大只能监听1024个fd。select通过以下四个宏来对fd_set置位:
void FD_CLR(int fd, fd_set *set); // 清除fd_set中的fd位 int FD_ISSET(int fd, fd_set *set); // 确认fd是否在fd_set中开启,非0值为开启,0为关闭 void FD_SET(int fd, fd_set *set); // 开启fd在fd_set中的位 void FD_ZERO(fd_set *set); // 清除fd_set的所有位
我们可以使用select的read_fds和exception_fds来接收普通数据和带外数据
#include <arpa/inet.h> #include <netinet/in.h> #include <sys/select.h> #include <sys/socket.h> #include <unistd.h> #include <cassert> #include <cstring> #include <iostream> #define BUFFERSIZE 1024 using namespace std; int main(int argc, char *argv[]) { if (argc < 3) { cout << "usage: " << argv[0] << " ip port" << endl; return 1; } // 设置TCP socket server struct sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(argv[2])); const char *ip = argv[1]; inet_pton(AF_INET, ip, &server_addr.sin_addr); int listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "error in create socket" << endl; return 1; } int ret = bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)); assert(ret != -1); ret = listen(listenfd, 6); assert(ret != -1); // 接收客户端连接 struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_addr_len); if (connfd < 0) { close(listenfd); cout << "accept connect error" << endl; return 1; } // 初始化要用到的select fd集 fd_set readfds; fd_set exceptionfds; FD_ZERO(&readfds); FD_ZERO(&exceptionfds); char buffer[BUFFERSIZE]; while (true) { // 如果是普通数据则触发readfds, 如果是oob数据触发exceptionfds FD_SET(connfd, &readfds); FD_SET(connfd, &exceptionfds); // 注册select, 不关心写fds设置为NULL,timeout NULL为阻塞 ret = select(connfd + 1, &readfds, NULL, &exceptionfds, NULL); if (ret < 0) { cout << "select error" << endl; break; } memset(buffer, '\0', BUFFERSIZE); if (FD_ISSET(connfd, &readfds)) { // 接收普通数据 int number = recv(connfd, buffer, BUFFERSIZE - 1, 0); if (number < 0) { cout << "recv normal data error" << endl; break; } else if (number == 0) { cout << "connection closed" << endl; break; } cout << "recv normal data " << number << " bytes: " << buffer << endl; } memset(buffer, '\0', BUFFERSIZE); if (FD_ISSET(connfd, &exceptionfds)) { // 接收带外数据 int number = recv(connfd, buffer, BUFFERSIZE - 1, MSG_OOB); if (number < 0) { cout << "recv oob data error" << endl; break; } else if (number == 0) { cout << "connection closed" << endl; break; } cout << "recv oob data " << number << " bytes: " << buffer << endl; } } close(listenfd); close(connfd); return 0; }
客户端截取部分发送内容
const char *oob_data = "abc"; const char *normal_data = "123"; send(sockfd, normal_data, strlen(normal_data), 0); send(sockfd, oob_data, strlen(oob_data), MSG_OOB); send(sockfd, normal_data, strlen(normal_data), 0); send(sockfd, normal_data, strlen(normal_data), 0); send(sockfd, normal_data, strlen(normal_data), 0);
运行结果如下,成功的接收到带外数据并处理:
getsockopt
和setsockopt
获取设置),socket可读,recv大于0。对端关闭连接,recv等于0。如果没有资源这次读取不成功recv返回小于0,并且错误码为EAGIN或EWOULDBLOCK errno,这种不算是错误,或许下次读取就可以成功。poll较select做出了改进,select使用bitmap来监视fds,而poll使用pollfd结构的数组来监视fds,突破了fds数量的限制,通过结构体将fd与events绑定,可以监视更多类型的事件
struct pollfd { int fd; /* file descriptor */ short events; /* requested events 注册的事件*/ short revents; /* returned events 实际发生的事件*/ };
poll的创建函数
int poll(struct pollfd *fds, nfds_t nfds, int timeout) // fds 是pollfd结构类型的数组 // nfds 指定fds的大小 // timeout 超时时间,-1阻塞,0立即返回 /* 返回值: 0表示超时时间内没有就绪的fds 成功时返回就绪fds总数(读、写、异常) 失败返回-1并设置errno,如果select等待期间被信号中断则立即返回-1并设置errno为EINTR */
监听两个文件的写入,输出到标准输出
#include <fcntl.h> #include <poll.h> #include <unistd.h> #include <cstdio> #include <cstring> #include <iostream> #define BUFFERSIZE 1024 using namespace std; // 存放pollfd结构数组 pollfd fds[2]; void setnonblocking(int fd) { int old_fd_option = fcntl(fd, F_GETFL); int new_fd_option = O_NONBLOCK | old_fd_option; fcntl(fd, F_SETFL, new_fd_option); } int main(int argc, char *argv[]) { if (argc < 2) { cout << "usage: " << argv[0] << "filename1 filename2" << endl; return 1; } // 打开创建好的文件 int fd1 = open(argv[1], O_RDONLY); int fd2 = open(argv[2], O_RDONLY); // 设置pollfd结构 fds[0].fd = fd1; fds[0].events = POLLIN | POLLERR; fds[0].revents = 0; fds[1].fd = fd2; fds[1].events = POLLIN | POLLERR; fds[1].revents = 0; // 设置fd为非阻塞,方便看读取的效果,否则会阻塞在read调用上 setnonblocking(fd1); setnonblocking(fd2); char buffer[BUFFERSIZE]; int number = 0; while (true) { // 创建poll int ret = poll(fds, 2, -1); if (ret < 0) { cout << "poll error" << endl; break; } for (int i = 0; i < 2; ++i) { pollfd fd = fds[i]; if (fd.revents & POLLERR) { cout << "poll error fd: " << fd.fd << endl; continue; // 如果fd可读 } else if (fd.revents & POLLIN) { // 每次poll事件清空缓冲区 bzero(buffer, BUFFERSIZE); while ((number = read(fd.fd, buffer, BUFFERSIZE)) > 0) { cout << "read " << number << " bytes from file " << argv[i + 1] << " content: " << buffer << endl; } } } } close(fd1); return 0; }
server端输出
epoll与select和poll有很大的差异,epoll将需要监视的fd放入内核的红黑树表中,通过epoll_ctl
函数来添加或删除该表中需要监视的fd,只复制已经就绪的fd集合返回给应用。
创建epoll:
int epoll_create(int size); // size:提示内核事件表的大小,不是硬限制 // 返回一个fd,所有其他的函数都操作该fd
操作事件:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epfd:epoll_create返回的fd /* op: EPOLL_CTL_ADD 添加fd到epfd,事件集合为event EPOLL_CTL_MOD 修改epfd中的fd事件,事件集合为event EPOLL_CTL_DEL 从epfd中删除fd,忽略event参数,一般设为NULL */ // 返回值:成功返回0,失败返回-1设置errno
获取就绪的事件集
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); // epfd:epoll_create返回的fd // events:就绪的事件数组,应用遍历它 // maxevents:指定最大监听的事件数目 // timeout:超时时间,-1阻塞,0立即返回 // 返回值:成功返回就绪fd的数目,失败返回-1设置errno
epoll支持两个模式LT(Level Trigger)和ET(Edge Trigger)
epoll_wait
系统调用的次数(上下文切换),所以这种模式也被称为高效的epoll模式我们说ET模式对于一个事件只会触发一次,如果是多线程的并发场景下,当前线程在读完socket上的数据后开始处理这些数据,在处理期间有新的数据到来,此时唤醒新的线程来处理新到来的数据,出现了两个线程操作同一fd的情况,可能会出现未知错误。EPOLLONESHOT事件可以保证,操作系统对该fd只触发一种事件,并且只触发一次,这样任何时刻只能有一个线程操作该fd。这样也会导致下次该事件无法触发,所以线程处理完毕后应当使用epoll_ctl
重置EPOLLONESHOT。
server的主线程与客户端建立TCP连接,建立好连接后将连接fd注册到epoll,如果该链接有请求数据就启动新的线程来处理。使用telnet作为客户端对比不使用EPOLLONESHOT和使用EPOLLONESHOT后server的行为
#include <arpa/inet.h> #include <fcntl.h> #include <netinet/in.h> #include <pthread.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <cassert> #include <cstring> #include <iostream> using namespace std; #define MAX_EVENT_NUMBER 1024 #define BUFFERSIZE 1024 static int epollfd = 0; void setnonblocking(int fd) { int old_fd_option = fcntl(fd, F_GETFL); int new_fd_option = old_fd_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_fd_option); } void register_epoll(int epollfd, int fd, bool newfd = false, bool oneshot = false) { epoll_event events; events.data.fd = fd; events.events = EPOLLIN | EPOLLET; // 读事件、ET工作模式 if (oneshot) { events.events |= EPOLLONESHOT; // 使用EPOLLONESHOT } if (newfd) { epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &events); } else { epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events); } setnonblocking(fd); } void *handle_connect(void *arg) { pid_t tid = gettid(); int connfd = *((int *)arg); cout << "use thread " << tid << " to handle connect " << connfd << endl; char buffer[BUFFERSIZE]; memset(buffer, '\0', BUFFERSIZE); while (true) { int bytes = recv(connfd, buffer, BUFFERSIZE - 1, 0); if (bytes == 0) { cout << "the other peer close connection" << endl; close(connfd); break; } else if (bytes < 0) { if (errno == EAGAIN) { cout << connfd << " Temporarily unavailable, read later" << endl; register_epoll(epollfd, connfd, false, true); // 重置该连接fd的EPOLLONESHOT break; } else { cout << "read " << connfd << " failure" << endl; close(connfd); } } else { cout << "thread " << tid << " recve " << bytes << " bytes from connection " << connfd << ", content: " << buffer << endl; sleep(10); } } cout << "thread " << tid << " end handle connect " << connfd << endl; } int main(int argc, char *argv[]) { if (argc < 3) { cout << "usage: " << argv[0] << " ip port" << endl; return 1; } // 创建server端socket const char *ip = argv[1]; struct sockaddr_in serv_addr; serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(atoi(argv[2])); inet_pton(AF_INET, ip, &serv_addr.sin_addr); int listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { cout << "create socket error" << endl; return 1; } int ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); // epoll_event数组,用来接收返回的就绪fd epoll_event events[MAX_EVENT_NUMBER]; // 创建epoll epollfd = epoll_create(5); if (epollfd < 0) { cout << "create epoll error" << endl; close(listenfd); return 1; } // listenfd 无需使用EPOLLONESHOT register_epoll(epollfd, listenfd, true, false); while (true) { // 等待事件触发 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if ((sockfd == listenfd) && (events[i].events & EPOLLIN)) { // 接收客户端连接 struct sockaddr cli_addr; socklen_t cli_addr_len = sizeof(cli_addr); int connfd = accept(sockfd, (struct sockaddr *)&cli_addr, &cli_addr_len); if (connfd < 0) { cout << "accept connect failure" << endl; continue; } // 新的连接使用EPOLLONESHOT属性 register_epoll(epollfd, connfd, true, true); // 新的连接不使用EPOLLONESHOT属性 // register_epoll(epollfd, connfd, true, false); } else if (events[i].events & EPOLLIN) { // 已建立的连接有数据请求 pthread_t thread; // 创建线程处理连接数据,传入sockfd参数以便重置EPOLLONESHOT pthread_create(&thread, NULL, handle_connect, &sockfd); } else { cout << "other errors" << endl; } } } close(listenfd); close(epollfd); return 0; }
server使用线程102108逐个处理 connect5的请求,对于connect6使用线程102109单独处理
修改代码
注释掉 // register_epoll(epollfd, connfd, false, // true); // 重置该连接fd的EPOLLONESHOT 不给connfd使用EPOLLONESHOT // 新的连接使用EPOLLONESHOT属性 // register_epoll(epollfd, connfd, true, true); // 新的连接不使用EPOLLONESHOT属性 register_epoll(epollfd, connfd, true, false);
编译运行
线程102137处理connfd 5,sleep的期间内,connfd5有新的请求到来,可以看到新起了线程来处理connfd5的新消息
FD_ISSET
判断事件是否就绪。(两次fd_set拷贝,两次fd_set遍历)__FD_SETSIZE 1024
宏,可修改该值重新编译内核来增加select可监视fd的数目/proc/sys/fs/file-max/
epoll_create
创建,返回一个fd来使用,维护着所有需要监视的fd。通过系统调用epoll_ctl
对fd对应的事件进行增、删、改。应用代码调用epoll_wait
来获取已经触发事件的fd,epoll会将就绪的epoll_event结构的fd放入数组并拷贝到用户态,应用直接遍历该数组即可拿到每一个触发事件的fd/proc/sys/fs/file-max/
学习自:
《Linux高性能服务器编程》
《UNIX环境高级编程》
《UNIX系统编程》