事件循环池是一个单例类,管理着EventPoller
EventPollerPool::EventPollerPool() { auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true); InfoL << "创建EventPoller个数:" << size; }
根据s_pool_size的大小,实例化s_pool_size个EventPoller,在调用runLoop时创建不同的线程
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) { auto cpus = thread::hardware_concurrency(); size = size > 0 ? size : cpus; for (size_t i = 0; i < size; ++i) { EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority)); poller->runLoop(false, register_thread);//这里创建不同的线程 auto full_name = name + " " + to_string(i); poller->async([i, cpus, full_name]() { setThreadName(full_name.data()); setThreadAffinity(i % cpus); }); _threads.emplace_back(std::move(poller)); } return size; }
在调用runLoop时blocked传递的为false,因此是在函数else分支创建的线程,然后将runLoop函数传入线程,并将参数blocked 修改为true,
这样创建了不同的线程执行runLoop,事件循环以多路复用epoll的模型建立轮巡过程,epoll原理不在多描述
void EventPoller::runLoop(bool blocked,bool regist_self) { if (blocked) { ...... } else { _loop_thread = new thread(&EventPoller::runLoop, this, true, regist_self); _sem_run_started.wait(); } }
通过addEvent将需要监听的fd加入epoll管理,
如果是当前线程,则将事件加入_event_map去管理当epoll事件触发时,又会将事件epoll触发事件回调出去,也就是传入的PollEventCB
如果不是当前线程,异步处理,其实就是利用管道将事件在当前线程处理,管道处理过程可以操控2
int EventPoller::addEvent(int fd, int event, PollEventCB cb) { TimeTicker(); if (!cb) { WarnL << "PollEventCB 为空!"; return -1; } if (isCurrentThread()) { struct epoll_event ev = {0}; ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE; ev.data.fd = fd; int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (ret == 0) { _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb))); } return ret; } async([this, fd, event, cb]() { addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb))); }); return 0; }
程序对管道进行了包装,同时在构造函数中构建了管道
PipeWrap _pipe; PipeWrap::PipeWrap(){ //创建管道 if (pipe(_pipe_fd) == -1) { throw runtime_error(StrPrinter << "create posix pipe failed:" << get_uv_errmsg());\ } SockUtil::setNoBlocked(_pipe_fd[0],true); SockUtil::setNoBlocked(_pipe_fd[1],false); SockUtil::setCloExec(_pipe_fd[0]); SockUtil::setCloExec(_pipe_fd[1]); }
在EventPoller构造的时候,就已经将管道加入了epoll管理,并且回调为onPipeEvent
EventPoller::EventPoller(ThreadPool::Priority priority ) { ...... //添加内部管道事件 if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) { throw std::runtime_error("epoll添加管道失败"); } }
onPipeEvent定义如下,当触发时,会将_list_task交换到一个临时变量中,然后执行其中的任务,本质是处理异步任务的:在其他线程将任务添加进任务列表,同时写管道,epoll会在本线程监听到该管道的读事件,再从任务列表中取出任务去执行。从而达到异步处理任务的目的
inline void EventPoller::onPipeEvent() { TimeTicker(); char buf[1024]; int err = 0; do { if (_pipe.read(buf, sizeof(buf)) > 0) { continue; } err = get_uv_error(true); } while (err != UV_EAGAIN); decltype(_list_task) _list_swap; { lock_guard<mutex> lck(_mtx_task); _list_swap.swap(_list_task); } _list_swap.for_each([&](const Task::Ptr &task) { try { (*task)(); } catch (ExitException &) { _exit_flag = true; } catch (std::exception &ex) { ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what(); } }); }
程序启动会创建一个事件循环池,大小可以自定义,默认根据硬件性能自动分配,每个事件循环类拥有自己的独立的线程,事件循环是通过多路复用epoll模式去实现,监听sock的事件接收网络数据,监听管道的事件执行异步任务