Redis在命令成功执行后,会先将命令追加到AOF日志中,然后再依次推送给每个从节点。
/* Propagate the specified command (in the context of the specified database id) * to AOF and Slaves. * * flags are an xor between: * + PROPAGATE_NONE (no propagation of command at all) * + PROPAGATE_AOF (propagate into the AOF file if is enabled) * + PROPAGATE_REPL (propagate into the replication link) * * This should not be used inside commands implementation since it will not * wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(), * preventCommandPropagation(), forceCommandPropagation(). * * However for functions that need to (also) propagate out of the context of a * command execution, for example when serving a blocked client, you * want to use propagate(). */ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); } /* Propagate expires into slaves and the AOF file. * When a key expires in the master, a DEL operation for this key is sent * to all the slaves and the AOF file if enabled. * * This way the key expiry is centralized in one place, and since both * AOF and the master->slave link guarantee operation ordering, everything * will be consistent even if we allow write operations against expiring * keys. */ void propagateExpire(redisDb *db, robj *key, int lazy) { robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; argv[1] = key; incrRefCount(argv[0]); incrRefCount(argv[1]); if (server.aof_state != AOF_OFF) feedAppendOnlyFile(server.delCommand,db->id,argv,2); replicationFeedSlaves(server.slaves,db->id,argv,2); decrRefCount(argv[0]); decrRefCount(argv[1]); }
先将切换数据库命令
写入repl_backlog中,再将切换数据库命令
发给每个从库执行。
先将执行成功命令
写入到repl_backlog中,再将执行成功命令
发送给每个从库执行。
/* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication * stream. Instead if the instance is a slave and has sub-slaves attached, * we use replicationFeedSlavesFromMaster() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ if (server.masterhost != NULL) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. */ if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; } else { int dictid_len; dictid_len = ll2string(llstr,sizeof(llstr),dictid); selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr)); } /* Add the SELECT command into the backlog. */ if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; addReply(slave,selectcmd); } if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; /* Write the command to the replication backlog if any. */ if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); /* We need to feed the buffer with the object as a bulk reply * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } /* Write the command to every slave. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ /* Add the multi bulk length. */ addReplyMultiBulkLen(slave,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } }
在将命令追加到AOF日志过程中,会对部分命令进行重写:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); robj *tmpargv[3]; /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */ if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; } if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { /* Translate SETEX/PSETEX to SET and PEXPIREAT */ tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else if (cmd->proc == setCommand && argc > 3) { int i; robj *exarg = NULL, *pxarg = NULL; /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */ buf = catAppendOnlyGenericCommand(buf,3,argv); for (i = 3; i < argc; i ++) { if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1]; if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1]; } serverAssert(!(exarg && pxarg)); if (exarg) buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1], exarg); if (pxarg) buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1], pxarg); } else { /* All the other commands don't need translation or need the * same translation already operated in the command vector * for the replication itself. */ buf = catAppendOnlyGenericCommand(buf,argc,argv); } /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ if (server.aof_state == AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); sdsfree(buf); }
CALL执行命令步骤:
/* Call() is the core of Redis execution of a command. * * The following flags can be passed: * CMD_CALL_NONE No flags. * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. * * The exact propagation behavior depends on the client flags. * Specifically: * * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set * in the call flags, then the command is propagated even if the * dataset was not affected by the command. * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP * are set, the propagation into AOF or to slaves is not performed even * if the command modified the dataset. * * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or * slaves propagation will never occur. * * Client flags are modified by the implementation of a given command * using the following API: * * forceCommandPropagation(client *c, int flags); * preventCommandPropagation(client *c); * preventCommandAOF(client *c); * preventCommandReplication(client *c); * */ void call(client *c, int flags) { long long dirty; ustime_t start, duration; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; server.fixed_time_expire++; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); } /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); redisOpArray prev_also_propagate = server.also_propagate; redisOpArrayInit(&server.also_propagate); /* Call the command. */ dirty = server.dirty; updateCachedTime(0); start = server.ustime; c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading && c->flags & CLIENT_LUA) flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ if (c->flags & CLIENT_LUA && server.lua_caller) { if (c->flags & CLIENT_FORCE_REPL) server.lua_caller->flags |= CLIENT_FORCE_REPL; if (c->flags & CLIENT_FORCE_AOF) server.lua_caller->flags |= CLIENT_FORCE_AOF; } /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } if (flags & CMD_CALL_STATS) { /* use the real command that was executed (cmd and lastamc) may be * different, in case of MULTI-EXEC or re-written commands such as * EXPIRE, GEOADD, etc. */ real_cmd->microseconds += duration; real_cmd->calls++; } /* Propagate the command into the AOF and replication link */ if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE; /* Check if the command operated changes in the data set. If so * set for replication / AOF propagation. */ if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); /* If the client forced AOF / replication of the command, set * the flags regardless of the command effects on the data set. */ if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; /* However prevent AOF / replication propagation if the command * implementations called preventCommandPropagation() or similar, * or if we don't have the call() flags to do so. */ if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL; if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; /* Call propagate() only if at least one of AOF / replication * propagation is needed. Note that modules commands handle replication * in an explicit way, so we never replicate them automatically. */ if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } /* Restore the old replication flags, since call() can be executed * recursively. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); c->flags |= client_old_flags & (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ if (server.also_propagate.numops) { int j; redisOp *rop; if (flags & CMD_CALL_PROPAGATE) { for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; int target = rop->target; /* Whatever the command wish is, we honor the call() flags. */ if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } } redisOpArrayFree(&server.also_propagate); } server.also_propagate = prev_also_propagate; server.fixed_time_expire--; server.stat_numcommands++; }