2021SC@SDUSC
本周主要研究server的启动。
在构造了一个server后,start:
int WFServerBase::start(int family, const char *host, unsigned short port, const char *cert_file, const char *key_file) { struct addrinfo hints = { .ai_flags = AI_PASSIVE, // key .ai_family = family, .ai_socktype = SOCK_STREAM, }; struct addrinfo *addrinfo; char port_str[PORT_STR_MAX + 1]; int ret; ... snprintf(port_str, PORT_STR_MAX + 1, "%d", port); getaddrinfo(host, port_str, &hints, &addrinfo); start(addrinfo->ai_addr, (socklen_t)addrinfo->ai_addrlen, cert_file, key_file); freeaddrinfo(addrinfo); ... }
此时start的是:
int ::start(const struct sockaddr *bind_addr, socklen_t addrlen, const char *cert_file, const char *key_file) { ... this->init(bind_addr, addrlen, cert_file, key_file); this->scheduler->bind(this); ... }
这里init了WFServerBase,然后bend,进入server的基本流程。
bind部分代码:
int Communicator::bind(CommService *service) { struct poller_data data; sockfd = this->nonblock_listen(service); service->listen_fd = sockfd; service->ref = 1; data.operation = PD_OP_LISTEN; data.fd = sockfd; data.accept = Communicator::accept; data.context = service; data.result = NULL; mpoller_add(&data, service->listen_timeout, this->mpoller); }
从代码中可以看出,bind和listen操作打包到一起了,相关代码如下:
int Communicator::nonblock_listen(CommService *service) { int sockfd = service->create_listen_fd(); __set_fd_nonblock(sockfd) __bind_and_listen(sockfd, service->bind_addr, service->addrlen); }
static int __bind_and_listen(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { ... bind(sockfd, addr, addrlen); ... return listen(sockfd, SOMAXCONN); }
然后将listen操作加入epoll监听
static void *__poller_thread_routine(void *arg) { ... case PD_OP_LISTEN: __poller_handle_listen(node, poller); break; ... }
epoll检测到listen时
static void __poller_handle_listen(struct __poller_node *node, poller_t *poller) { ... while (1) { ... // 1. 这里调用了accept建立连接 sockfd = accept(node->data.fd, (struct sockaddr *)&ss, &len); // data.accept = Communicator::accept; // 2. 调用Communicator::accept,初始化 p = node->data.accept((const struct sockaddr *)&ss, len, sockfd, node->data.context); res->data = node->data; res->data.result = p; res->error = 0; res->state = PR_ST_SUCCESS; // .callback = Communicator::callback, /* void Communicator::callback(struct poller_result *res, void *context) { Communicator *comm = (Communicator *)context; msgqueue_put(res, comm->queue); } */ // 放回结果到msgqueue中 poller->cb((struct poller_result *)res, poller->ctx); res = (struct __poller_node *)malloc(sizeof (struct __poller_node)); node->res = res; if (!res) break; } if (__poller_remove_node(node, poller)) return; ... }
epoll检测到listen 时,将epoll中listen事件带的callback执行,然后将结果写入msgqueue中。然后到了消费者流程
void Communicator::handler_thread_routine(void *context) { case PD_OP_LISTEN: comm->handle_listen_result(res); break; }
void Communicator::handle_listen_result(struct poller_result *res) { CommService *service = (CommService *)res->data.context; ... case PR_ST_SUCCESS: target = (CommServiceTarget *)res->data.result; entry = this->accept_conn(target, service); res->data.operation = PD_OP_READ; res->data.message = NULL; timeout = target->response_timeout; ... if (res->data.operation != PD_OP_LISTEN) { res->data.fd = entry->sockfd; res->data.ssl = entry->ssl; res->data.context = entry; if (mpoller_add(&res->data, timeout, this->mpoller) >= 0) { if (this->stop_flag) mpoller_del(res->data.fd, this->mpoller); break; } } ... }
整个流程产生CommConnEntry,然把read事件放进epoll进行监听,因为建立连接,需要等待对方发消息。
以下代码产生CommConnEntry,并将信息保存下来。
struct CommConnEntry *Communicator::accept_conn(CommServiceTarget *target, CommService *service) { __set_fd_nonblock(target->sockfd); size = offsetof(struct CommConnEntry, mutex); entry = (struct CommConnEntry *)malloc(size); entry->conn = service->new_connection(target->sockfd); entry->seq = 0; entry->mpoller = this->mpoller; entry->service = service; entry->target = target; entry->ssl = NULL; entry->sockfd = target->sockfd; entry->state = CONN_STATE_CONNECTED; entry->ref = 1; }