这里不需要重点看,主要看下cancelReplicationHandshake就行:
#include "server.h" #include "cluster.h" #include "bio.h" #include <sys/time.h> #include <unistd.h> #include <fcntl.h> #include <sys/socket.h> #include <sys/stat.h> void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); void putSlaveOnline(client *slave); int cancelReplicationHandshake(int reconnect); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case * the instance is configured to have no persistence. */ int RDBGeneratedByReplication = 0; /* --------------------------- Utility functions ---------------------------- */ /* Return the pointer to a string representing the slave ip:listening_port * pair. Mostly useful for logging, since we want to log a slave using its * IP address and its listening port which is more clear for the user, for * example: "Closing connection with replica 10.1.2.3:6380". */ char *replicationGetSlaveName(client *c) { static char buf[NET_HOST_PORT_STR_LEN]; char ip[NET_IP_STR_LEN]; ip[0] = '\0'; buf[0] = '\0'; if (c->slave_addr || connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1) { char *addr = c->slave_addr ? c->slave_addr : ip; if (c->slave_listening_port) anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port); else snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",addr); } else { snprintf(buf,sizeof(buf),"client id #%llu", (unsigned long long) c->id); } return buf; } /* Plain unlink() can block for quite some time in order to actually apply * the file deletion to the filesystem. This call removes the file in a * background thread instead. We actually just do close() in the thread, * by using the fact that if there is another instance of the same file open, * the foreground unlink() will only remove the fs name, and deleting the * file's storage space will only happen once the last reference is lost. */ int bg_unlink(const char *filename) { int fd = open(filename,O_RDONLY|O_NONBLOCK); if (fd == -1) { /* Can't open the file? Fall back to unlinking in the main thread. */ return unlink(filename); } else { /* The following unlink() removes the name but doesn't free the * file contents because a process still has it open. */ int retval = unlink(filename); if (retval == -1) { /* If we got an unlink error, we just return it, closing the * new reference we have to the file. */ int old_errno = errno; close(fd); /* This would overwrite our errno. So we saved it. */ errno = old_errno; return -1; } bioCreateCloseJob(fd); return 0; /* Success. */ } } /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); server.repl_backlog = zmalloc(server.repl_backlog_size); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ server.repl_backlog_off = server.master_repl_offset+1; } /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to both update the * server.repl_backlog_size and to resize the buffer and setup it so that * it contains the same data as the previous one (possibly less data, but * the most recent bytes, or the same data and more free space in case the * buffer is enlarged). */ void resizeReplicationBacklog(long long newsize) { if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (server.repl_backlog_size == newsize) return; server.repl_backlog_size = newsize; if (server.repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. * The reason is that copying a few gigabytes adds latency and even * worse often we need to alloc additional space before freeing the * old buffer. */ zfree(server.repl_backlog); server.repl_backlog = zmalloc(server.repl_backlog_size); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; /* Next byte we have is... the next since the buffer is empty. */ server.repl_backlog_off = server.master_repl_offset+1; } } void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); zfree(server.repl_backlog); server.repl_backlog = NULL; } /* Add data to the replication backlog. * This function also increments the global replication offset stored at * server.master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ void feedReplicationBacklog(void *ptr, size_t len) { unsigned char *p = ptr; server.master_repl_offset += len; /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ while(len) { size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; if (thislen > len) thislen = len; memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); server.repl_backlog_idx += thislen; if (server.repl_backlog_idx == server.repl_backlog_size) server.repl_backlog_idx = 0; len -= thislen; p += thislen; server.repl_backlog_histlen += thislen; } if (server.repl_backlog_histlen > server.repl_backlog_size) server.repl_backlog_histlen = server.repl_backlog_size; /* Set the offset of the first byte we have in the backlog. */ server.repl_backlog_off = server.master_repl_offset - server.repl_backlog_histlen + 1; } /* Wrapper for feedReplicationBacklog() that takes Redis string objects * as input. */ void feedReplicationBacklogWithObject(robj *o) { char llstr[LONG_STR_SIZE]; void *p; size_t len; if (o->encoding == OBJ_ENCODING_INT) { len = ll2string(llstr,sizeof(llstr),(long)o->ptr); p = llstr; } else { len = sdslen(o->ptr); p = o->ptr; } feedReplicationBacklog(p,len); } int canFeedReplicaReplBuffer(client *replica) { /* Don't feed replicas that only want the RDB. */ if (replica->flags & CLIENT_REPL_RDBONLY) return 0; /* Don't feed replicas that are still waiting for BGSAVE to start. */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; return 1; } /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication * stream. Instead if the instance is a slave and has sub-slaves attached, * we use replicationFeedSlavesFromMasterStream() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ if (server.masterhost != NULL) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. */ if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; } else { int dictid_len; dictid_len = ll2string(llstr,sizeof(llstr),dictid); selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr)); } /* Add the SELECT command into the backlog. */ if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (!canFeedReplicaReplBuffer(slave)) continue; addReply(slave,selectcmd); } if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; /* Write the command to the replication backlog if any. */ if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); /* We need to feed the buffer with the object as a bulk reply * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } /* Write the command to every slave. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (!canFeedReplicaReplBuffer(slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ /* Add the multi bulk length. */ addReplyArrayLen(slave,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } }