网络服务器端
,用了处理高并发网络 IO
请求的一种编程模型
。
处理 3 类事件:
连接事件
:客户端→服务器的连接请求,对应服务端的连接事件写事件
:客户端→服务器的读请求,服务端处理后要写回客户端,对应服务端的写事件读事件
:服务端要从客户端读取请求内容,对应服务端的读事件3 个关键角色:
acceptor
:处理连接事件,接收连接、创建 handlerhandler
:处理读写事件reactor
:专门监听和分配事件,连接请求 → acceptor、读写请求 → handler事件驱动框架就是 Reactor 的具体实现。包括:
事件初始化
:创建要监听的事件类型,及该类事件对应的 handler事件捕获、分发和处理主循环
:
实现代码:
ae.h
ae.c
Redis 的事件驱动框架定义了 2 类事件:
IO 事件
时间事件
下面介绍 IO 事件 aeFileEvent 的数据结构:
/* File event structure */ typedef struct aeFileEvent { // 事件类型的掩码,AE_(READABLE|WRITABLE|BARRIER) int mask; // AE_READABLE 事件的处理函数 aeFileProc *rfileProc; // AE_WRITABLE 事件的处理函数 aeFileProc *wfileProc; // 指向客户端私有数据 void *clientData; } aeFileEvent;
是在 Redis 初始化时调用的,详见 Redis 源码简洁剖析 07 - main 函数启动。
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; // 循环调用 while (!eventLoop->stop) { // 核心函数,处理事件的逻辑 aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } }
代码非常简单,就是循环调用 aeProcessEvents 函数。aeMain 是在 main 函数中被调用的:
// 事件驱动框架,循环处理各种触发的事件 aeMain(server.el); // 循环结束,删除 eventLoop aeDeleteEventLoop(server.el);
主体有 3 个 if 分支:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* 若没有事件处理,则立刻返回*/ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /*如果有 IO 事件发生,或者紧急的时间事件发生,则开始处理*/ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { … } /* 检查是否有时间事件,若有,则调用 processTimeEvents 函数处理 */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); /* 返回已经处理的文件或时间*/ return processed; }
核心是第 2 个 if 语句:
// 有 IO 事件发生 || 紧急时间事件发生 if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { …… // 调用 aeApiPoll 捕获事件 numevents = aeApiPoll(eventLoop, tvp); …… }
aeApiPoll 函数如何捕获事件?依赖于操作系统底层提供的 IO 多路复用
机制,实现事件捕获,检查是否有新的连接、读写事件的发生。为了适配不同的操作系统,Redis 对不同操作系统实现网络 IO 多路复用函数,进行统一封装,封装后的代码在 4 个文件中实现:
ae_epoll.c
中 aeApiPoll 函数的实现,核心是调用了 epoll_wait
函数,并将 epoll 返回的事件信息保存起来。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 调用 epoll_wait 获取监听到的事件 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1); if (retval > 0) { int j; // 获取监听到的事件数量 numevents = retval; // 处理每个事件 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events + j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE | AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE | AE_READABLE; // 保存事件信息 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }
在 Mac 上查看源码,aeApiPoll 方法会进入 ae_kqueue.c
中:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; if (tvp != NULL) { struct timespec timeout; timeout.tv_sec = tvp->tv_sec; timeout.tv_nsec = tvp->tv_usec * 1000; retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout); } else { retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL); } if (retval > 0) { int j; /* Normally we execute the read event first and then the write event. * When the barrier is set, we will do it reverse. * * However, under kqueue, read and write events would be separate * events, which would make it impossible to control the order of * reads and writes. So we store the event's mask we've got and merge * the same fd events later. */ for (j = 0; j < retval; j++) { struct kevent *e = state->events+j; int fd = e->ident; int mask = 0; if (e->filter == EVFILT_READ) mask = AE_READABLE; else if (e->filter == EVFILT_WRITE) mask = AE_WRITABLE; addEventMask(state->eventsMask, fd, mask); } /* Re-traversal to merge read and write events, and set the fd's mask to * 0 so that events are not added again when the fd is encountered again. */ numevents = 0; for (j = 0; j < retval; j++) { struct kevent *e = state->events+j; int fd = e->ident; int mask = getEventMask(state->eventsMask, fd); if (mask) { eventLoop->fired[numevents].fd = fd; eventLoop->fired[numevents].mask = mask; resetEventMask(state->eventsMask, fd); numevents++; } } } return numevents; }
main 函数中调用了 createSocketAcceptHandler
:
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) { serverPanic("Unrecoverable error creating TCP socket accept handler."); }
而 createSocketAcceptHandler
创建接收连接的 handler:
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) { int j; for (j = 0; j < sfd->count; j++) { if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) { /* Rollback */ for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); return C_ERR; } } return C_OK; }
其主要是调用了 aeCreateFileEvent
,aeCreateFileEvent 就是实现事件和处理函数注册的核心函数。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { // 错误处理 if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; // 核心 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
Linux 提供了 epoll_ctl API,用于增加新的观察事件。而 Redis 在此基础上,封装了 aeApiAddEvent 函数,对 epoll_ctl 进行调用,注册希望监听的事件和相应的处理函数。
ae_epoll.c 中 aeApiAddEvent 实现如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; // 增加新的观察事件 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
注册的函数 acceptTcpHandler 在 network.c
中:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); // 每次处理 1000 个 while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } anetCloexec(cfd); serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); } }
Redis 处理连接、客户端请求是单线程的,但是这单个线程能够处理上千个客户端,就是因为 Redis 是基于 Reactor 模型的。通过事件驱动框架,Redis 可以使用一个循环不断捕获、分发、处理客户端产生的网络连接、数据读写事件。
当然这里有一个前提,就是 Redis 几乎所有数据读取和处理都是在内存中操作的,服务端对单个客户端的读写请求处理时间极短。
最简洁的 Redis 源码剖析系列文章
Java 编程思想-最全思维导图-GitHub 下载链接,需要的小伙伴可以自取~
原创不易,希望大家转载时请先联系我,并标注原文链接。