Java教程

ZLMediaKit 服务器源码解读---事件循环

本文主要是介绍ZLMediaKit 服务器源码解读---事件循环,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一:事件循环池类

事件循环池是一个单例类,管理着EventPoller

1、EventPollerPool构造函数
EventPollerPool::EventPollerPool() {
    auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true);
    InfoL << "创建EventPoller个数:" << size;
}
2:循环池的添加EventPoller

根据s_pool_size的大小,实例化s_pool_size个EventPoller,在调用runLoop时创建不同的线程

size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) {
    auto cpus = thread::hardware_concurrency();
    size = size > 0 ? size : cpus;
    for (size_t i = 0; i < size; ++i) {
        EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority));
        poller->runLoop(false, register_thread);//这里创建不同的线程
        auto full_name = name + " " + to_string(i);
        poller->async([i, cpus, full_name]() {
            setThreadName(full_name.data());
            setThreadAffinity(i % cpus);
        });
        _threads.emplace_back(std::move(poller));
    }
    return size;
}

在调用runLoop时blocked传递的为false,因此是在函数else分支创建的线程,然后将runLoop函数传入线程,并将参数blocked 修改为true,
这样创建了不同的线程执行runLoop,事件循环以多路复用epoll的模型建立轮巡过程,epoll原理不在多描述

void EventPoller::runLoop(bool blocked,bool regist_self) {
    if (blocked) {
       	......
    } else {
        _loop_thread = new thread(&EventPoller::runLoop, this, true, regist_self);
        _sem_run_started.wait();
    }
}

二:EventPoller(事件轮巡)

通过epoll监听2种类型文件描述符
1、socket的描述符,主要是监听不同客户端发来的数据

通过addEvent将需要监听的fd加入epoll管理,
如果是当前线程,则将事件加入_event_map去管理当epoll事件触发时,又会将事件epoll触发事件回调出去,也就是传入的PollEventCB
如果不是当前线程,异步处理,其实就是利用管道将事件在当前线程处理,管道处理过程可以操控2

int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
    TimeTicker();
    if (!cb) {
        WarnL << "PollEventCB 为空!";
        return -1;
    }

    if (isCurrentThread()) {
        struct epoll_event ev = {0};
        ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
        ev.data.fd = fd;
        int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (ret == 0) {
            _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
        }
        return ret;
    }

    async([this, fd, event, cb]() {
        addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));
    });
    return 0;
}
2、内部的自定义事件,利用linux管道模拟文件,从而进行监听,

程序对管道进行了包装,同时在构造函数中构建了管道

PipeWrap _pipe;
PipeWrap::PipeWrap(){
	//创建管道
    if (pipe(_pipe_fd) == -1) {
        throw runtime_error(StrPrinter << "create posix pipe failed:" << get_uv_errmsg());\
    }
    SockUtil::setNoBlocked(_pipe_fd[0],true);
    SockUtil::setNoBlocked(_pipe_fd[1],false);
    SockUtil::setCloExec(_pipe_fd[0]);
    SockUtil::setCloExec(_pipe_fd[1]);
}

在EventPoller构造的时候,就已经将管道加入了epoll管理,并且回调为onPipeEvent

EventPoller::EventPoller(ThreadPool::Priority priority ) {
	......
    //添加内部管道事件
    if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
        throw std::runtime_error("epoll添加管道失败");
    }
}

onPipeEvent定义如下,当触发时,会将_list_task交换到一个临时变量中,然后执行其中的任务,本质是处理异步任务的:在其他线程将任务添加进任务列表,同时写管道,epoll会在本线程监听到该管道的读事件,再从任务列表中取出任务去执行。从而达到异步处理任务的目的

inline void EventPoller::onPipeEvent() {
    TimeTicker();
    char buf[1024];
    int err = 0;
    do {
        if (_pipe.read(buf, sizeof(buf)) > 0) {
            continue;
        }
        err = get_uv_error(true);
    } while (err != UV_EAGAIN);

    decltype(_list_task) _list_swap;
    {
        lock_guard<mutex> lck(_mtx_task);
        _list_swap.swap(_list_task);
    }

    _list_swap.for_each([&](const Task::Ptr &task) {
        try {
            (*task)();
        } catch (ExitException &) {
            _exit_flag = true;
        } catch (std::exception &ex) {
            ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what();
        }
    });
}

三:总结

程序启动会创建一个事件循环池,大小可以自定义,默认根据硬件性能自动分配,每个事件循环类拥有自己的独立的线程,事件循环是通过多路复用epoll模式去实现,监听sock的事件接收网络数据,监听管道的事件执行异步任务

这篇关于ZLMediaKit 服务器源码解读---事件循环的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!