阅读Redis主从复制部分的源码时,对其中某些逻辑不是很确定。梳理了Redis非阻塞connect的大致实现之后,自己写了一个简单的版本来验证。
主要流程:
/*
 * Minimal non-blocking TCP connect driven by epoll, modelled on how a Redis
 * replica connects to its master:
 *   1. start connect(2) on a non-blocking socket (EINPROGRESS is expected);
 *   2. wait with epoll until the socket is reported writable;
 *   3. read SO_ERROR: writability only means the connect attempt finished,
 *      not that it succeeded (a refused connection is reported writable too,
 *      together with EPOLLERR/EPOLLHUP).
 *
 * Try it with `nc -l 8888` as the server.
 */
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define AF_ERR (-1)
#define MAX_EVENTS 1024

/* Register fd with the epoll instance epfd for the event mask evts.
 * Returns 0 on success, -1 with errno set on failure. Errors are returned
 * instead of asserted so they are still detected when built with NDEBUG. */
static int epoll_ctl_add(int epfd, int fd, int evts)
{
    struct epoll_event ev;

    memset(&ev, 0, sizeof(ev));
    ev.events = evts;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Check the outcome of a non-blocking connect once fd is reported writable.
 * Returns 0 if the connection is established, -1 otherwise (reason printed). */
int connectionEstablished(int fd)
{
    int sockerr = 0;
    socklen_t errlen = sizeof(sockerr);

    /* If getsockopt itself fails, report its errno as the socket error. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr != 0) {
        fprintf(stderr, "connection failed: %s\n", strerror(sockerr));
        return -1;
    }
    printf("connection done.\n");
    return 0;
}

/* Handle one epoll event for the connecting socket.
 * epoll always reports EPOLLERR/EPOLLHUP even if not requested, so those are
 * treated like EPOLLOUT: all of them mean the connect attempt has finished.
 * Returns 0 on success or when the event is unrelated, -1 if the connect
 * failed. epfd is currently unused. */
int handle_events(struct epoll_event *e, int epfd)
{
    (void)epfd;

    printf("events %d: ", e->data.fd);
    if (!(e->events & (EPOLLOUT | EPOLLERR | EPOLLHUP))) {
        printf("\n");
        return 0;
    }
    if (e->events & EPOLLOUT)
        printf("EPOLLOUT ");
    if (e->events & EPOLLERR)
        printf("EPOLLERR ");
    if (e->events & EPOLLHUP)
        printf("EPOLLHUP ");
    return connectionEstablished(e->data.fd);
}

/* Start a non-blocking TCP connect to ip:port (IPv4 dotted quad).
 * Returns the socket fd when the connection completed immediately or is in
 * progress (EINPROGRESS); completion must then be detected via EPOLLOUT plus
 * SO_ERROR. Returns AF_ERR with errno set on failure.
 * Not named `connect`, which would clash with connect(2) from <sys/socket.h>. */
static int nonblock_connect(const char *ip, int port)
{
    struct sockaddr_in address;

    if (port < 1 || port > 65535) {
        errno = EINVAL;
        return AF_ERR;
    }

    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &address.sin_addr) != 1) {
        errno = EINVAL;
        return AF_ERR;
    }

    int s = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (s == -1)
        return AF_ERR;

    /* EINPROGRESS is the normal result for a non-blocking socket. */
    if (connect(s, (struct sockaddr *)&address, sizeof(address)) == -1 &&
        errno != EINPROGRESS) {
        int saved_errno = errno; /* close() may overwrite errno */
        close(s);
        errno = saved_errno;
        return AF_ERR;
    }
    return s;
}

int main(void)
{
    struct epoll_event events[MAX_EVENTS];
    int ret = EXIT_FAILURE;
    int epfd = -1;
    int n;

    int fd = nonblock_connect("127.0.0.1", 8888);
    if (fd == AF_ERR) {
        fprintf(stderr, "connect failed: %s\n", strerror(errno));
        return EXIT_FAILURE;
    }

    epfd = epoll_create1(0);
    if (epfd == -1) {
        perror("epoll_create1");
        goto out;
    }
    if (epoll_ctl_add(epfd, fd, EPOLLOUT) == -1) {
        perror("epoll_ctl");
        goto out;
    }

    /* Retry if a signal interrupts the wait. */
    do {
        n = epoll_wait(epfd, events, MAX_EVENTS, -1);
    } while (n == -1 && errno == EINTR);
    if (n == -1) {
        perror("epoll_wait");
        goto out;
    }

    ret = EXIT_SUCCESS;
    for (int i = 0; i < n; i++) {
        if (handle_events(&events[i], epfd) != 0)
            ret = EXIT_FAILURE;
    }

out:
    if (epfd != -1)
        close(epfd);
    close(fd);
    return ret;
}
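使用 `nc -l 8888` 作为服务端进行测试,发现确实可以通过监听EPOLLOUT事件来得知connect已经完成。需要注意的是:连接失败(例如服务端没有监听该端口)时,套接字同样会变为可写,epoll也会报告EPOLLOUT(通常同时带有EPOLLERR/EPOLLHUP)。因此收到EPOLLOUT后,还必须用getsockopt(SO_ERROR)检查套接字错误,才能确认连接真正建立成功。Redis的syncWithMaster正是这样做的: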
/* src/replication.c (excerpt) */

/* Start the replica's connection to its master.
 * The replica acts as a client and begins a non-blocking connect(2) through
 * anetTcpNonBlockBindConnect(); completion is handled later by
 * syncWithMaster(), which is registered for both readable and writable events.
 * Returns REDIS_OK on success, REDIS_ERR if the connect could not be started
 * or the file event could not be registered (the fd is closed in that case). */
int connectWithMaster(void) {
    int fd;

    /* The replica acts as a client: start a non-blocking connect(2) to the master. */
    fd = anetTcpNonBlockBindConnect(NULL,
        server.masterhost,server.masterport,REDIS_BIND_ADDR);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    /* Watch both read and write events, with syncWithMaster as the callback. */
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    /* syncWithMaster dispatches on repl_state; start in the CONNECTING state. */
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

/* File-event callback driving the replica side of the handshake (excerpt;
 * the rest of the function is elided by "// ...").
 * Steps visible here:
 *  - return immediately if replication was cancelled (REDIS_REPL_NONE);
 *  - read SO_ERROR to learn whether the non-blocking connect succeeded;
 *  - in REDIS_REPL_CONNECTING: drop the writable event, switch to
 *    REDIS_REPL_RECEIVE_PONG and send PING;
 *  - in REDIS_REPL_RECEIVE_PONG: read the reply (elided). */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    /* Check the socket status: a writable socket only means connect(2)
     * finished, SO_ERROR tells whether it succeeded. If getsockopt itself
     * fails, its errno is used as the error. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        /* NOTE(review): the `error:` label is in the elided part of the function. */
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    /* Once connected, PING the master first to make sure both directions work
     * and the master can process commands. Because the replica registered both
     * READABLE and WRITABLE, and a non-blocking connect(2) triggers EPOLLOUT,
     * this branch is the one that runs first. */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        /* After this step the WRITABLE event is no longer needed. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* Handling of the readable event. */
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];
        // ...
    }
}
继续看一下Redis中非阻塞connect的实现(src/anet.c):
// src/netdb.h #define ANET_CONNECT_NONE 0 #define ANET_CONNECT_NONBLOCK 1 static int anetTcpGenericConnect(char *err, char *addr, int port, char *source_addr, int flags) { // ... for (p = servinfo; p != NULL; p = p->ai_next) { if (connect(s,p->ai_addr,p->ai_addrlen) == -1) { /* If the socket is non-blocking, it is ok for connect() to * return an EINPROGRESS error here. */ if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) goto end; close(s); s = ANET_ERR; continue; } goto end; // ...