I/O 多路复用技术是为了解决进程或线程阻塞到某个 I/O 系统调用而出现的技术,使进程不阻塞于某个特定的 I/O 系统调用。
在IO多路复用技术描述前,先讲解下同步,异步,阻塞,非阻塞的概念。
linux网络IO中涉及到的模型如下:
(1)阻塞式IO
(2)非阻塞式IO
(3)IO多路复用
(4)信号驱动IO
(5)异步IO
今天不谈信号驱动IO,略过..
在学习IO模型的时候,我们必须明确一个概念,处理 IO 的时候,阻塞和非阻塞都是同步 IO。
只有使用了特殊的 API 才是异步 IO,例如Linux网络中的AIO。
再看下POSIX对同步和异步这两个术语的定义:
通俗的理解下同步和异步
同步:当执行系统调用read时,需要用户等待内核完成从内核缓冲区到用户缓冲区的数据拷贝。
异步:当执行异步IO操作例如aio_read
时,用户不需要等待,只需要接收内核完成操作的通知,由内核来完成数据的读取。
在知晓阻塞和非阻塞都是同步 IO后,阻塞和非阻塞就很好理解了
阻塞IO:由系统调用read,导致线程一直等待数据返回。
非阻塞IO:系统调用read后立即返回一个状态,当数据达到内核缓冲区之前都是非阻塞的,即返回一个系统调用状态。
ps:闪客的动图做的非常的形象,上述gif动图来源「低并发编程」
IO多路复用是一种同步IO模型,实现一个线程可以监视多个文件句柄;
select 是操作系统提供的系统调用函数,select()用来等待文件描述词(普通文件、终端、伪终端、管道、FIFO、套接字及其他类型的字符型)状态的改变。是一个轮循函数,循环询问文件节点,可设置超时时间,超时时间到了就跳过代码继续往下执行。
通过select,我们可以把一个文件描述符的数组发给操作系统, 让操作系统去遍历,确定哪个文件描述符可以读写, 然后告诉我们去处理:
头文件
#include <sys/select.h> #include <sys/time.h> #include <sys/types.h> #include <unistd.h>
拥塞函数,拥塞等待文件描述符事件的到来
int select(int maxfdp , fd_set *readset , fd_set *writeset , fd_set *exceptset ,struct timeval *timeout);
参数说明:
maxfdp:被监听的文件描述符的最大值,它比所有文件描述符集合中的文件描述符的最大值大1,因为文件描述符是从0开始计数的;
readfds、writefds、exceptset:分别指向可读、可写和异常等事件对应的描述符集合。
timeout:用于设置select函数的超时时间,即告诉内核select等待多长时间之后就放弃等待。timeout == NULL 表示等待无限长的时间,timeout == 0,select立即返回
struct timeval { long tv_sec; /*秒 */ long tv_usec; /*微秒 */ };
int FD_ZERO(int fd, fd_set *fdset); //一个 fd_set类型变量的所有位都设为 0 int FD_CLR(int fd, fd_set *fdset); //清除某个位时可以使用 int FD_SET(int fd, fd_set *fd_set); //设置变量的某个位置位 int FD_ISSET(int fd, fd_set *fdset); //测试某个位是否被置位
当声明了一个文件描述符集后,必须用FD_ZERO将所有位置零
调用 select函数,拥塞等待文件描述符事件的到来 ;如果超过设定的时间,则不再等待,继续往下执行
select返回后,用FD_ISSET测试给定位是否置位:
if(FD_ISSET(fd, &rset) { ... //do something }
fd_set其实这是一个数组的宏定义,实际上是一long类型的数组,每一个数组元素都能与一打开的文件句柄(socket、文件、管道、设备等)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪个句柄可读。
整个 select 的流程图如下:
Demo1:select示例
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <sys/types.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/wait.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/time.h> #include <sys/types.h> #define MAXBUF 1024 #define LISTEN_NUM 2 int main(int argc, char **argv) { int default_port = 8000; int optch = 0; while ((optch = getopt(argc, argv, "s:p:")) != -1) { switch (optch) { case 'p': default_port = atoi(optarg); printf("port: %s\n", optarg); break; case '?': printf("Unknown option: %c\n", (char)optopt); break; default: break; } } int sockfd, new_fd; socklen_t len; struct sockaddr_in my_addr, their_addr; char buf[MAXBUF + 1]; fd_set rfds; // select struct timeval tv; //超时时间 int retval, maxfd = -1; // select返回值 select监听句柄的最大数量 if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); exit(EXIT_FAILURE); } bzero(&my_addr, sizeof(my_addr)); my_addr.sin_family = PF_INET; my_addr.sin_port = htons(default_port); my_addr.sin_addr.s_addr = INADDR_ANY; if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) { perror("bind"); exit(EXIT_FAILURE); } if (listen(sockfd, LISTEN_NUM) == -1) { perror("listen"); exit(EXIT_FAILURE); } /* 数据处理 */ while (1) { printf("\n----wait for new connect port:%d\n",default_port); len = sizeof(struct sockaddr); if ((new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &len)) == -1) { perror("accept"); exit(errno); } else printf("server: got connection from %s, port %d, socket %d\n", inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd); while (1) { FD_ZERO(&rfds); FD_SET(0, &rfds); FD_SET(new_fd, &rfds); maxfd = new_fd; tv.tv_sec = 1; tv.tv_usec = 0; retval = select(maxfd + 1, &rfds, NULL, NULL, &tv); if (retval == -1) { perror("select"); exit(EXIT_FAILURE); } else if (retval == 0) { continue; } else { /*标准输入*/ if (FD_ISSET(0, &rfds)) { bzero(buf, MAXBUF + 1); fgets(buf, MAXBUF, stdin); if (!strncasecmp(buf, "quit", 4)) { printf("i will quit!\n"); break; } len = send(new_fd, buf, strlen(buf) - 1, 0); if (len > 0) printf("send successful,%d byte send..\n", len); else { printf("send failure!"); break; } } if (FD_ISSET(new_fd, &rfds)) { bzero(buf, MAXBUF + 1); len = recv(new_fd, buf, MAXBUF, 0); if (len > 0) printf("recv success :'%s', %d byte recv..\n", buf, len); else { if (len < 0) printf("recv failure\n"); else { printf("the client close ,quit\n"); break; } } } } } close(new_fd); printf("need othe connecdt (no->quit)"); fflush(stdout); bzero(buf, MAXBUF + 1); fgets(buf, MAXBUF, stdin); if (!strncasecmp(buf, "no", 2)) { printf("quit!\n"); break; } } close(sockfd); return 0; }
makefile:
TARGET=server SRC = $(wildcard *.cpp *.c) OBJ = $(patsubst %.cpp *.c,%.o,$(SRC)) DEFS = CFLAGS = -g CC =g++ LIBS = -lpthread $(TARGET):$(OBJ) $(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS) .PHONY: clean: rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ make g++ -g -o server select.c -lpthread ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ ./server ----wait for new connect port:8000
#include <stdio.h> #include <string.h> #include <errno.h> #include <sys/socket.h> #include <resolv.h> #include <stdlib.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <sys/time.h> #include <sys/types.h> #define MAXBUF 1024 int main(int argc, char **argv) { int sockfd, len; struct sockaddr_in dest; char buffer[MAXBUF + 1]; fd_set rfds; struct timeval tv; int retval, maxfd = -1; int optch,ret = -1; const char*server_addr; int default_port = 8000; /*判断是否为合法输入 必须传入一个参数:服务器Ip*/ if(argc<3) { printf("usage:tcpcli <IPaddress>"); return 0; } while((optch = getopt(argc, argv, "s:p:")) != -1) { switch (optch) { case 's': server_addr = optarg; break; case 'p': default_port = atoi(optarg); printf("port: %s\n", optarg); break; case '?': printf("Unknown option: %c\n",(char)optopt); break; default: break; } } if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Socket"); exit(EXIT_FAILURE); } bzero(&dest, sizeof(dest)); dest.sin_family = AF_INET; dest.sin_port = htons(default_port); if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0) { perror(server_addr); exit(EXIT_FAILURE); } if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0) { perror("Connect "); exit(EXIT_FAILURE); } printf("\nget ready message chat:\n"); while (1) { FD_ZERO(&rfds); FD_SET(0, &rfds); FD_SET(sockfd, &rfds); maxfd = sockfd; tv.tv_sec = 1; tv.tv_usec = 0; retval = select(maxfd + 1, &rfds, NULL, NULL, &tv); if (retval == -1) { printf("select %s", strerror(errno)); break; } else if (retval == 0) continue; else { if (FD_ISSET(sockfd, &rfds)) { bzero(buffer, MAXBUF + 1); len = recv(sockfd, buffer, MAXBUF, 0); if (len > 0) printf ("recv message:'%s', %d byte recv..\n",buffer, len); else { if (len < 0) printf ("message recv failure\n"); else { printf("server close ,quit\n"); break; } } } if (FD_ISSET(0, &rfds)) { bzero(buffer, MAXBUF + 1); fgets(buffer, MAXBUF, stdin); if (!strncasecmp(buffer, "quit", 4)) { printf("i will quit\n"); break; } len = send(sockfd, buffer, strlen(buffer) - 1, 0); if (len < 0) { printf ("message send failure"); break; } else printf ("send success,%d byte send..\n",len); } } } close(sockfd); return 0; }
TARGET=server SRC = $(wildcard *.cpp *.c) OBJ = $(patsubst %.cpp *.c,%.o,$(SRC)) DEFS = CFLAGS = -g CC =g++ LIBS = -lpthread $(TARGET):$(OBJ) $(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS) .PHONY: clean: rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ make g++ -g -o client client.c -lpthread ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ ./client -s 0.0.0.0 get ready message chat:
#include<stdio.h> #include<sys/types.h> #include<stdlib.h> #include<sys/socket.h> #include<sys/select.h> #include<netinet/in.h> #include<arpa/inet.h> #include<unistd.h> #include<string.h> #define _BACKLOG_ 5 //监听队列里允许等待的最大值 #define MAX_CONNECT 20 int fds[MAX_CONNECT]; //用来存放需要处理的IO事件 int listen_sock = -1; int creat_sock(int port) { int sock = socket(AF_INET,SOCK_STREAM,0); if(sock < 0){ perror("creat_sock error"); exit(1); } struct sockaddr_in local; local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; //inet_addr(0.0.0.0) // 设置允许socket立即重用 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&sock, sizeof(sock)); if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0){ perror("bind"); exit(2); } if(listen(sock,_BACKLOG_) < 0 ){ perror("listen"); exit(4); } return sock; } int accept_sock(){ struct sockaddr_in client; socklen_t len = sizeof(client); int accept_sock = accept(listen_sock, (struct sockaddr *)&client, &len); if (accept_sock < 0) { perror("accept"); exit(5); } printf("connect by a client, ip:%s port:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port)); size_t i = 0; for (; i < MAX_CONNECT; ++i) //将新接受的描述符存入集合中 { if (fds[i] == -1) { fds[i] = accept_sock; break; } } if (i == MAX_CONNECT) { printf("accept is upper limit..\n"); close(accept_sock); } } int groupChat(int sockFd,void* pBuf,int iSize){ for(int index=0;index<MAX_CONNECT;index++){ if(fds[index] == sockFd || fds[index] == listen_sock) { continue; } if(fds[index]!=-1){ printf("write fd:%d..socketFd:%d\n",fds[index],sockFd); write(fds[index],pBuf,iSize); } } } int handle_read(int* socketFd) { int socket = *socketFd; char buf[1024]; memset(buf, '\0', sizeof(buf)); ssize_t size = read(socket, buf, sizeof(buf) - 1); if (size < 0) { perror("read"); exit(6); } else if (size == 0) { printf("client close..\n"); close(socket); *socketFd = -1; } else { printf("client say: %s\n", buf); groupChat(socket, buf, size); } } int main(int argc,char* argv[]) { int default_port = 8000; int optch = 0; while ((optch = getopt(argc, argv, "s:p:")) != -1) { switch (optch) { case 'p': default_port = atoi(optarg); printf("port: %s\n", optarg); break; case '?': printf("Unknown option: %c\n", (char)optopt); break; default: break; } } listen_sock = creat_sock(default_port); size_t fds_num = sizeof(fds)/sizeof(fds[0]); size_t i = 0; for(;i < fds_num;++i) { fds[i] = -1; } int max_fd = listen_sock; fds[0] = listen_sock; fd_set rset; while(1) { FD_ZERO(&rset); FD_SET(listen_sock,&rset); struct timeval timeout = {10 , 0}; size_t i = 0; for(;i < fds_num;++i) { if(fds[i] > 0 ){ FD_SET(fds[i] ,&rset); if(max_fd < fds[i]) { max_fd = fds[i]; } } } switch(select(max_fd+1,&rset,NULL,NULL,&timeout)) { case -1: perror("select"); break; case 0: printf("time out..\n"); break; default: { size_t i = 0; for(;i < fds_num;++i) { //连接请求 //当为 listen_socket 事件就绪的时候,就表明有新的连接请求 if(FD_ISSET(fds[i],&rset) && fds[i] == listen_sock) { accept_sock(); } //普通请求 else if(FD_ISSET(fds[i],&rset) && (fds[i] > 0)) { handle_read(&fds[i]); } else{} } } break; } } return 0; }
makfeile
TARGET=server SRC = $(wildcard *.cpp *.c) OBJ = $(patsubst %.cpp *.c,%.o,$(SRC)) DEFS = CFLAGS = -g CC =g++ LIBS = -lpthread $(TARGET):$(OBJ) $(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS) .PHONY: clean: rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ make g++ -g -o server select.c -lpthread ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select$ ./server connect by a client, ip:127.0.0.1 port:42964
#include <stdio.h> #include <string.h> #include <errno.h> #include <sys/socket.h> #include <resolv.h> #include <stdlib.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <sys/time.h> #include <sys/types.h> #define MAXBUF 1024 #define MAXNAME 64 int main(int argc, char **argv) { int sockfd, len; struct sockaddr_in dest; char buffer[MAXBUF + 1]; fd_set rfds; struct timeval tv; int retval, maxfd = -1; int optch,ret = -1; const char*server_addr; int default_port = 8000; char* clientName = "佚名"; /*判断是否为合法输入 必须传入一个参数:服务器Ip*/ if(argc<3) { printf("usage:tcpcli <IPaddress>"); return 0; } while((optch = getopt(argc, argv, "s:p:n:")) != -1) { switch (optch) { case 's': server_addr = optarg; break; case 'p': default_port = atoi(optarg); printf("port: %s\n", optarg); break; case 'n': clientName = optarg; printf("client Name: %s\n", optarg); break; case '?': printf("Unknown option: %c\n",(char)optopt); break; default: break; } } if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Socket"); exit(EXIT_FAILURE); } bzero(&dest, sizeof(dest)); dest.sin_family = AF_INET; dest.sin_port = htons(default_port); if (inet_aton(server_addr, (struct in_addr *) &dest.sin_addr.s_addr) == 0) { perror(server_addr); exit(EXIT_FAILURE); } if (connect(sockfd, (struct sockaddr *) &dest, sizeof(dest)) != 0) { perror("Connect "); exit(EXIT_FAILURE); } printf("get ready message chat:\n"); while (1) { FD_ZERO(&rfds); FD_SET(0, &rfds); FD_SET(sockfd, &rfds); maxfd = sockfd; tv.tv_sec = 1; tv.tv_usec = 0; retval = select(maxfd + 1, &rfds, NULL, NULL, &tv); if (retval == -1) { printf("select %s", strerror(errno)); break; } else if (retval == 0) continue; else { if (FD_ISSET(sockfd, &rfds)) { bzero(buffer, MAXBUF + 1); len = recv(sockfd, buffer, MAXBUF, 0); if (len > 0) printf ("recv byte %d, %s\n",len, buffer); else { if (len < 0) printf ("message recv failure\n"); else { printf("server close ,quit\n"); break; } } } if (FD_ISSET(0, &rfds)) { char name_msg[MAXNAME + MAXBUF]; bzero(buffer, MAXBUF + 1); fgets(buffer, MAXBUF, stdin); if (!strncasecmp(buffer, "quit", 4)) { printf("i will quit\n"); break; } sprintf(name_msg, "[%s]: %s", clientName, buffer); len = send(sockfd, name_msg, strlen(name_msg) - 1, 0); if (len < 0) { printf ("message send failure"); break; } else printf("send success,%d byte send..\n",len); } } } close(sockfd); return 0; }
makefile
TARGET=server SRC = $(wildcard *.cpp *.c) OBJ = $(patsubst %.cpp *.c,%.o,$(SRC)) DEFS = CFLAGS = -g CC =g++ LIBS = -lpthread $(TARGET):$(OBJ) $(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS) .PHONY: clean: rm -rf *.o $(TARGET)
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/select/client$ ./client -s 0.0.0.0 -n 梦凡 client Name: 梦凡 get ready message chat:
Poll就是监控文件是否可读的一种机制,作用与select一样。
#include <poll.h> int poll(struct pollfd fds[], nfds_t nfds, int timeout);
参数说明
fds:是一个struct pollfd结构类型的数组,列出了我们需要poll()检查的文件描述符
typedef struct pollfd { int fd; /* 需要被检测或选择的文件描述符*/ short events; /* 对文件描述符fd上感兴趣的事件 */ short revents; /* 文件描述符fd上当前实际发生的事件*/ } pollfd_t;
events:想要监听的事件
revents:实际上发生的事件
POLLIN POLLOUT POLLPRI POLLRDHUB POLLHUP POLLERR
指定了fds中元素的个数,nfds_t为无符号整形
决定阻塞行为,一般如下:
-1:一直阻塞到fds数组中有一个达到就绪态或者捕获到一个信号
0:不会阻塞,立即返回
>0:阻塞时间
>0:数组fds中准备好读、写或出错状态的那些socket描述符的总数量;
==0:数组fds中没有任何socket描述符准备好读、写,或出错;此时poll超时
-1: poll函数调用失败
#include <stdio.h> #include <poll.h> #include <string.h> int main() { int timeout = 0; char buf[1024]; struct pollfd fd_poll[1]; //设置只有一个事件 while(1){ fd_poll[0].fd = 0; fd_poll[0].events = POLLIN; fd_poll[0].revents = 0; memset(buf, '\0', sizeof(buf)); switch( poll(fd_poll, 1, -1) ){ case 0: perror("timeout!"); break; case -1: perror("poll"); break; default: { if( fd_poll[0].revents & POLLIN ) { gets(buf); printf("buf : %s\n",buf); } } break; } } return 0; }
makefile
tcp_poll:tcp_poll.c gcc -o $@ $^ .PHONY:clean clean: rm -f tcp_poll
epoll没有对描述符数目的限制,它所支持的文件描述符上限是整个系统最大可以打开的文件数目,例如,在1GB内存的机器上,这个限制大概为10万左右。
epoll只有 epoll_create
、epoll_ctl
和 epoll_wait
这三个系统调用。
第一步,创建一个 epoll 句柄
第二步,向内核添加、修改或删除要监控的文件描述符。
第三步,发起了 select() 调用
其定义如下:
#include <sys/epoll.h> int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
#include <sys/epoll.h> int epoll_create(int size);
调用epoll_create方法创建一个epoll的句柄,使用完epoll后使用close函数进行关闭
#include <sys/epoll.h> int epoll_ctl(int epfd //第一个参数epfd:epoll_create函数的返回值。 , int op //第二个参数events:表示动作类型。有三个宏来表示 , int fd //第三个参数fd:需要监听的fd。 , struct epoll_event *event);//第四个参数event:告诉内核需要监听什么事件。
op:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从 epfd 中删除一个 fd。
fd:需要注册监视对象文件描述符
// 感兴趣的事件和被触发的事件 struct epoll_event { __uint32_t events; // Epoll events epoll_data_t data; // User data variable }; // 保存触发事件的某个文件描述符相关的数据 typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
EPOLLIN:表示对应的文件描述符可读(包括对端Socket);
EPOLLOUT:表示对应的文件描述符可写;
EPOLLPRI:表示对应的文件描述符有紧急数据可读(带外数据);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered),这是相对于水平触发(Level Triggered)而言的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket,需要再次添加
例如:
struct epoll_event ep_ev; int accept_sock = accept(listen_sock,(struct sockaddr*)&remote,&len); ep_ev.events = EPOLLIN | EPOLLET; ep_ev.data.fd = accept_sock; epoll_ctl(epoll_fd,EPOLL_CTL_ADD,accept_sock,&ep_ev)
收集在epoll监控的事件中已经发生的事件
#include <sys/epoll.h> int epoll_wait(int epfd //第一个参数epfd:epoll_create函数的返回值。 , struct epoll_event *events , int maxevents , int timeout); //超时时间(毫秒)
第一个参数epfd:epoll_create函数的返回值。
第二个参数events:是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据赋值到这个event数组中,不会去帮助我们在用户态分配内存)
第三个参数maxevents:maxevents告诉内核这个events数组有多大,这个maxevents的值不能大于创建epoll_create时的size。
第四个参数:是超时时间(毫秒),如果函数调用成功,则返回对应IO上已准备好的文件描述符数目,如果返回0则表示已经超时。
基于epoll的简单回显服务器
#include<stdio.h> #include<unistd.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<sys/epoll.h> #include<fcntl.h> #include<stdlib.h> #include<string.h> int listen_sock = -1; int epoll_fd = -1; //设置非阻塞 int set_noblock(int sock) { int opts = fcntl(sock,F_GETFL); return fcntl(sock,F_SETFL,opts | O_NONBLOCK); } int creat_socket(int port) { int sock = socket(AF_INET,SOCK_STREAM,0); if(sock < 0){ perror("socket"); exit(2); } //调用setsockopt使当server先断开时避免进入 TIME_WAIT 状态,\ 将其属性设定为SO_REUSEADDR,使其地址信息可被重用 int opt = 1; if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)) < 0){ perror("setsockopt"); exit(3); } struct sockaddr_in local; local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; //inet_addr("0.0.0.0"); if( bind(sock,(struct sockaddr*)&local,sizeof(local)) < 0 ){ perror("bind"); exit(4); } if(listen(sock,5) < 0){ perror("listen"); exit(5); } printf("listen port %d..\n",port); return sock; } int accept_socket(){ struct sockaddr_in remote; socklen_t len = sizeof(remote); int accept_sock = accept(listen_sock, (struct sockaddr *)&remote, &len); if (accept_sock < 0) { perror("accept"); return -1; } printf("accept a client..[ip]: %s,[port]: %d\n", inet_ntoa(remote.sin_addr), ntohs(remote.sin_port)); //将新的事件添加到epoll集合中 struct epoll_event ep_ev; ep_ev.events = EPOLLIN | EPOLLET; // edge边沿触发,只触发一次 ep_ev.data.fd = accept_sock; set_noblock(accept_sock); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_sock, &ep_ev) < 0) { perror("epoll_ctl"); close(accept_sock); return -1; } return 0; } int handle_request(int socketFd){ //申请空间同时存文件描述符和缓冲区地址 char buf[102400]; memset(buf, '\0', sizeof(buf)); ssize_t _s = recv(socketFd, buf, sizeof(buf) - 1, 0); if (_s < 0) { perror("recv"); return -1; } else if (_s == 0) { printf("remote close..\n"); //远端关闭了,进行善后 epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL); close(socketFd); } else { //读取成功,输出数据 printf("client# %s", buf); fflush(stdout); //将事件改写为关心事件,进行回写 struct epoll_event ep_ev; ep_ev.data.fd = socketFd; ep_ev.events = EPOLLOUT | EPOLLET; //在epoll实例中更改同一个事件,触发socket可写事件 epoll_ctl(epoll_fd, EPOLL_CTL_MOD, socketFd, &ep_ev); } return 0; } int handle_response(int socketFd){ const char *msg = "HTTP/1.1 200 OK \r\n\r\n<h1> hi girl </h1>\r\n"; send(socketFd, msg, strlen(msg), 0); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socketFd, NULL); close(socketFd); } int main(int argc,char *argv[]) { int default_port = 8000; int optch = 0; while ((optch = getopt(argc, argv, "s:p:")) != -1) { switch (optch) { case 'p': default_port = atoi(optarg); printf("port: %s\n", optarg); break; case '?': printf("Unknown option: %c\n", (char)optopt); break; default: break; } } listen_sock = creat_socket(default_port); epoll_fd = epoll_create(256); if(epoll_fd < 0){ perror("epoll creat"); exit(6); } struct epoll_event ep_ev; ep_ev.events = EPOLLIN; //数据的读取 ep_ev.data.fd = listen_sock; //添加关心的事件 if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ep_ev) < 0){ perror("epoll_ctl"); exit(7); } struct epoll_event ready_ev[128]; //申请空间来放就绪的事件。 int maxnum = 128; int timeout = -1; //设置超时时间,若为-1,则永久阻塞等待。 int ret = 0; int done = 0; while(!done){ switch(ret = epoll_wait(epoll_fd,ready_ev,maxnum,timeout)){ case -1: perror("epoll_wait"); break; case 0: printf("time out...\n"); break; default://至少有一个事件就绪 { int i = 0; for(;i < ret;++i) { //判断是否为监听套接字,是的话 accept int fd = ready_ev[i].data.fd; if((fd == listen_sock) && (ready_ev[i].events & EPOLLIN)){ accept_socket(); } else{//普通IO if(ready_ev[i].events & EPOLLIN){ handle_request(fd); }else if(ready_ev[i].events & EPOLLOUT){ handle_response(fd); } } } } break; } } close(listen_sock); return 0; }
makefile
TARGET=server SRC = $(wildcard *.cpp *.c) OBJ = $(patsubst %.cpp *.c,%.o,$(SRC)) DEFS = CFLAGS = -g CC =g++ LIBS = -lpthread $(TARGET):$(OBJ) $(CC) $(CFLAGS) $(DEFS) -o $@ $^ $(LIBS) .PHONY: clean: rm -rf *.o $(TARGET)
make
ubuntu@VM-16-5-ubuntu:~/learnbase/IO复用/epoll$ ./server listen port 8000
浏览器输入:http://服务器ip:8000/例如,http://49.234.35.128:8000/