阅读redis的源码永远也绕不过它的启动。我们来看下redis的启动流程。不想看代码可以直接看最后的流程图。
以下源码分析是redis的5.0分支 源码注释:https://github.com/yxkong/redis/commits/5.0
这是启动流程的核心代码。
int main(int argc, char **argv) { //申请空间oom后的处理器 zmalloc_set_oom_handler(redisOutOfMemoryHandler); //哨兵模式 server.sentinel_mode = checkForSentinelMode(argc,argv); //初始化服务器配置(默认值) initServerConfig(); if (argc >= 2) { //将配置文件的内容填充到server中 loadServerConfig(configfile,options); } //守护进程 int background = server.daemonize && !server.supervised; if (background) daemonize(); //初始化server服务 initServer(); InitServerLast(); aeMain(server.el); }
initServerConfig中只是给变量赋值了默认值
void initServerConfig(void) { //初始化锁 pthread_mutex_init(&server.next_client_id_mutex,NULL); pthread_mutex_init(&server.lruclock_mutex,NULL); pthread_mutex_init(&server.unixtime_mutex,NULL); //数据库配置填充 //淘汰策略 //持久化配置 //将预定义的命令填充到server.commands populateCommandTable();
将配置文件的内容填充到server中覆盖初始化变量 在config.c中
void loadServerConfig(char *filename, char *options) { //读取配置到config while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL) config = sdscat(config,buf); //将配置填充到server,这里是各种if else逻辑 loadServerConfigFromString(config); }
这里是重头戏
void initServer(void) { //创建共享对象 createSharedObjects(); //创建事件监听器,maxclients默认10000个 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); //socket监听,绑定几个ip,就监听几个,默认监听ipv6和ipv4 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); //初始化化数据库的属性 for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); } /** * @brief 创建时间处理器,并将serverCron放入处理器里(重要) * 在这里创建了aeTimeEvent并扔给了eventLoop->timeEventHead */ if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } /** * @brief 重点 ########## * 监听多少个tcp就创建多少个 */ for (j = 0; j < server.ipfd_count; j++) { //将acceptTcpHandler 放入文件监听器里, if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } //慢日志初始化 slowlogInit(); //monitor初始化 latencyMonitorInit(); }
在createSharedObjects()里主要创建了共享对象,包括一些异常提示,服务交互指令,共享int等
在ae.c中,我们看下aeCreateEventLoop
/** * @brief 创建事件监听器 * * @param setsize 比配置的最大链接数要多96,为了安全处理(比如有的处理完了,还没有释放,多创建的就相当于缓冲队列了) * @return aeEventLoop* */ aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; //分配空间 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; //创建对应数量的aeFileEvent空间 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); //创建对应数量的aeFiredEvent空间,此处承载的是触发事件(从epoll里读取后会放入这里) eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); //时间事件链表头节点(只有一个) eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; //创建一个aeApiState 给eventLoop(不同的系统实现不同,aeApiState也不同) if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
在ae.c中,看下aeCreateFileEvent
** * @brief 创建文件事件监听器并放入到eventLoop->events中 * 注册acceptTcpHandler处理AE_READABLE和AE_WRITABLE * @param eventLoop * @param fd 对应tcp socket的fd值 * @param mask * @param proc 处理器acceptTcpHandler * @param clientData * @return int */ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } // 将创建的aeFileEvent 赋值给了&eventLoop->events[fd] aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; // 将处理器给rfileProc和wfileProc(后续会拿着这个直接执行) if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
void InitServerLast() { bioInit(); server.initial_memory_usage = zmalloc_used_memory(); } bioInit() 在 bio.c中 /** * @brief 初始化后台线程 * */ void bioInit(void) { /** * @brief 根据后台线程的数量创建线程 * */ for (j = 0; j < BIO_NUM_OPS; j++) { //初始化一个NUll 的互斥锁 pthread_mutex_init(&bio_mutex[j],NULL); //新任务条件变量 pthread_cond_init(&bio_newjob_cond[j],NULL); //下一个执行的条件变量 pthread_cond_init(&bio_step_cond[j],NULL); //创建一个list bio_jobs[j] = listCreate(); //待处理 bio_pending[j] = 0; } /** * @brief 创建后端线程,并放入线程组 * */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; //创建线程,并启动bioProcessBackgroundJobs if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } }
在ae.c中
/** * @brief 执行eventLoop * * @param eventLoop */ void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; //只要没有停止,就循环执行,这个是主线程 while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } /** * @brief 处理eventLoop里等待的 定时任务,文件事件,过期事件 * @param eventLoop * @param flags 事件类型, * 从main中过来是所有的事件, * 从networking过来是文件事件 * @return int */ int aeProcessEvents(aeEventLoop *eventLoop, int flags){ //先从eventLoop取下一个要执行的定时器,如果下一次定时还有一定的时间间隔,那么就让epoll阻塞间隔的时间获取数据,否则则立即执行 /** * @brief 从epoll里拿到numevents数量的数据 * 有时间,就阻塞指定的时间,没有时间,直到有数据 */ numevents = aeApiPoll(eventLoop, tvp); /** * @brief 执行触发的事件 * */ for (j = 0; j < numevents; j++) { //获取一个事件处理器 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; //处理读事件,此处的rfileProc和wfileProc 可以直接理解为acceptTcpHandler if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } } //处理定时任务 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); //处理完以后进入另一次循环 return processed; /* return the number of processed file/time events */ }
ae_kqueue.c中
/** * @brief 单位时间内获取事件数量 * * @param eventLoop * @param tvp 在单位时间内 * @return int 返回待处理的事件数量 */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; if (tvp != NULL) { struct timespec timeout; timeout.tv_sec = tvp->tv_sec; timeout.tv_nsec = tvp->tv_usec * 1000; retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout); } else { /** * @brief 获取已经就绪的文件描述符数量 * timeout指针为空,那么kevent()会永久阻塞,直到事件发生 */ retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL); } //将事件填充到eventLoop->fired中 if (retval > 0) { int j; numevents = retval; for(j = 0; j < numevents; j++) { int mask = 0; struct kevent *e = state->events+j; if (e->filter == EVFILT_READ) mask |= AE_READABLE; if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->ident; eventLoop->fired[j].mask = mask; } } return numevents; }
终极图