在Redis的命令处理函数processCommand
(server.c)中有对集群节点的处理,满足以下条件时进入集群节点处理逻辑中:
server.cluster_enabled
判断条件三的判断条件有些绕,!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0
意味着命令中没有key参数,c->cmd->proc != execCommand
表示当前命令不是EXEC,然后对(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)
整体做了取反操作,那么看以下两种情况:
!cmdHasMovableKeys(c->cmd)
就已返回false,又因为对整体做了取反操作,所以条件成立,意味着收到命令中带有Key时需要执行重定向处理c->cmd->proc != execCommand
返回false,对整体取反变成true,所以条件也成立,意味着收到EXEC命令的时候执行重定向处理int processCommand(client *c) { // 省略... /* 如果启用了集群且发送命令的节点不是主节点,并且收到的命令中包含了key参数或者命令是EXEC时 */ if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && !(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { int hashslot; int error_code; // 查询节点 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } // 重定向 clusterRedirectClient(c,n,hashslot,error_code); c->cmd->rejected_calls++; return C_OK; } } // 省略... return C_OK; } /* 如果参数中有key将会返回1 */ static int cmdHasMovableKeys(struct redisCommand *cmd) { return (cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) || cmd->flags & CMD_MODULE_GETKEYS; }
上面说到,如果是EXEC
命令时,也会进入到集群节点处理逻辑,EXEC
命令一般与MULTI
结合使用,用于执行事务。比如以下例子中,使用MULTI
开启事务,执行对a账户增1,b账户减1的操作,可以看到返回结果为QUEUED
,命令被缓存起来,直到执行EXEC
命令,Redis才开始提交命令:
127.0.0.1:6379> MULTI OK 127.0.0.1:6379> INCR a:account QUEUED 127.0.0.1:6379> DECR b:account QUEUED 127.0.0.1:6379> EXEC 1) (integer) 1 2) (integer) -1
由于集群也需要对EXEC
命令处理,所以先看一下MULTI
命令的处理逻辑,MULTI
命令对应的执行函数为multiCommand
,可以看到它在处理的时候为客户端设置了CLIENT_MULTI
标记:
void multiCommand(client *c) { if (c->flags & CLIENT_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; } // 设置CLIENT_MULTI标记 c->flags |= CLIENT_MULTI; addReply(c,shared.ok); }
在Redis的命令处理函数中可以找到对CLIENT_MULTI
的处理逻辑,如果客户端标记中有CLIENT_MULTI,并且当前命令不是EXEC、DISCARD、MULTI、WATCH和RESET,将调用queueMultiCommand
函数,对命令进行缓存:
int processCommand(client *c) { // 省略... /* 处理MULTI命令 */ /* 如果客户端标记中有CLIENT_MULTI,并且当前命令不是EXEC、DISCARD、MULTI、WATCH和RESET */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != resetCommand) { queueMultiCommand(c); // 加入到multi队列中,先将命令缓存 addReply(c,shared.queued); } else { call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } return C_OK; }
在客户端结构体定义中,可以看到使用了multiState
缓存MULTI命令:
// 客户端 typedef struct client { // ... multiState mstate; /* 存储MULTI/EXEC命令的结构体 */ // ... }
multiState
MULTI命令对应的结构体为multiState
,multiState
中使用了multiCmd
结构体来缓存具体的命令:
typedef struct multiState { multiCmd *commands; /* MULTI命令数组 */ int count; /* 缓存的命令个数 */ int cmd_flags; /* 命令标记 */ int cmd_inv_flags; /* 与cmd_flags一致 */ } multiState; /* multi命令 */ typedef struct multiCmd { robj **argv; int argc; struct redisCommand *cmd; /* 命令 */ } multiCmd;
queueMultiCommand
对MULTI命令缓存的处理在queueMultiCommand函数中,它在multi.c文件中定义:
multiCmd
加入到缓存数组c->mstate.commands
中,对命令进行缓存multiCmd
中/* 将当前命令加入到MULTI命令中 */ void queueMultiCommand(client *c) { // MULTI命令 multiCmd *mc; int j; if (c->flags & CLIENT_DIRTY_EXEC) return; c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); // 到加入MULTI数组中 mc = c->mstate.commands+c->mstate.count; // 设置命令 mc->cmd = c->cmd; // 设置参数 mc->argc = c->argc; mc->argv = zmalloc(sizeof(robj*)*c->argc); memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); // 缓存的命令数加1 c->mstate.count++; // 设置客户端标记 c->mstate.cmd_flags |= c->cmd->flags; c->mstate.cmd_inv_flags |= ~c->cmd->flags; }
getNodeByQuery
getNodeByQuery函数用于根据KEY查询数据所在的节点,处理逻辑如下:
如果是EXEC命令,从客户端获取multiState,multiState中缓存了MULTI命令,如果不是MULTI命令,而是单个命令,同样使用multiState来存放命令,之后就可以统一使用multiState来获取请求中的命令
根据命令的个数进行遍历,处理每一个命令
(1)从命令中获取key的个数,处理每一个key
(2)查询每一个key所在的slot
(3)如果处理的是第一个key,根据所属slot获取所在的节点,记为n
,有以下三种情况:
情况一:未获取到节点(有可能节点已下线但是还未更新状态),记录错误信息为CLUSTER_REDIR_DOWN_UNBOUND
,表示key未绑定到slot,返回NULL
情况二:可以查找到节点,并且是当前节点自己,但是key所属slot正在做数据迁出操作(从当前节点迁出),此时将migrating_slot
置为1
情况三:可以查找到节点,并且不是当前节点自己,但是key所属slot正在迁入到当前节点,此时将importing_slot
置为1
(4)如果处理的不是第一个key,判断当前key所属的slot是否与第一个key的slot一致:
情况一:如果不一致,表示不同的key所属的slot不同,将error_code置为CLUSTER_REDIR_CROSS_SLOT
,返回NULL
情况二:如果一致,将multiple_keys
置为1,表示请求中有多个命令
(5)根据migrating_slot
和importing_slot
的值判断key所属slot是否正在迁出或者迁入,迁出意味着key对应的数据正在从当前节点迁出到其他节点,迁入意味着key对应的数据正在迁入到当前节点,由于数据未迁移完毕,所以这两种情况都需要检查key是否在当前节点的数据库中,如果不在意味着当前节点没有该key的数据,需要记录缺失的KEY的数量,missing_keys
增1
根据第二步查询后的结果,进行如下处理:
未查找到节点,也就是n
为空,返回当前节点自己
当前节点不处于正常状态(CLUSTER_OK)
(1)如果未开启allow_reads_when_down(在节点下线时允许读),error_code置为CLUSTER_REDIR_DOWN_STATE
,并返回NULL
(2)当前命令中有写标记,error_code置为CLUSTER_REDIR_DOWN_RO_STATE
,并返回NULL
(3)非以上两种情况,表示开启了allow_reads_when_down,并且是读操作,所以当前节点依旧可以处理请求,继续往下执行
如果数据正在迁出或者正在迁入,并且当前命令是MIGRATE数据迁移的命令,返回当前节点
如果key所在slot数据正在从当前节点迁出,并且当前节点数据库中有缺失的key,error_code置为CLUSTER_REDIR_ASK
并返回迁出到的那个节点
如果key所在slot正在迁入到当前节点,并且当前命令是ASK ,此时如果请求中有多个KEY并且当前节点存在缺失的KEY,表示有些key不在当前节点,error_code置为CLUSTER_REDIR_UNSTABLE
返回NULL,否则返回当前节点即可
如果客户端有只读标记、 当前命令不是写命令、当前节点是从节点并且它的主节点是根据key所属slot查找到的节点,返回当前节点,因为从节点数据是从master节点同步的,而master节点正是要查找的节点,从节点也可以处理读请求
如果查询到的节点不是当前节点,将error_code置为CLUSTER_REDIR_MOVED
,表示数据已经移动到其他节点,此时返回key所属slot对应的实际节点
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { // 集群节点 clusterNode *n = NULL; // 记录命令中的第一个KEY robj *firstkey = NULL; int multiple_keys = 0; multiState *ms, _ms; multiCmd mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0; if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) return myself; if (error_code) *error_code = CLUSTER_REDIR_NONE; /* 如果是EXEC命令 */ if (cmd->proc == execCommand) { /* 校验是否有CLIENT_MULTI标记 */ if (!(c->flags & CLIENT_MULTI)) return myself; // 获取multiState ms = &c->mstate; } else { /* 如果不是MULTI命令,而是单个命令,同样使用multiState来存储命令 */ ms = &_ms; _ms.commands = &mc; _ms.count = 1; // 命令个数设置为1 mc.argv = argv; mc.argc = argc; mc.cmd = cmd; // 设置命令 } /* 根据命令的个数进行遍历,处理每一个命令 */ for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; int margc, *keyindex, numkeys, j; mcmd = ms->commands[i].cmd; // 获取命令 margc = ms->commands[i].argc; margv = ms->commands[i].argv; getKeysResult result = GETKEYS_RESULT_INIT; // 从命令中获取key的个数 numkeys = getKeysFromCommand(mcmd,margv,margc,&result); keyindex = result.keys; // 遍历每一个key for (j = 0; j < numkeys; j++) { // 获取key robj *thiskey = margv[keyindex[j]]; // 查询key所在的slot int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); // 如果是第一个key if (firstkey == NULL) { /* 将第一个key记录在firstkey */ firstkey = thiskey; // 记录slot slot = thisslot; // 根据slot获取集群节点 n = server.cluster->slots[slot]; /* 如果未获取到节点(有可能节点已下线),记录错误信息,返回NULL */ if (n == NULL) { getKeysFreeResult(&result); if (error_code) *error_code = CLUSTER_REDIR_DOWN_UNBOUND; return NULL; } /* 如果根据slot查到的节点是当前节点自己,并且slot正在做数据迁出操作 */ if (n == myself && server.cluster->migrating_slots_to[slot] != NULL) { migrating_slot = 1; // migrating_slot置为1,标记正在做数据迁出操作 } else if (server.cluster->importing_slots_from[slot] != NULL) { // 如果key所属的slot正在做数据迁入操作,importing_slot置为1 importing_slot = 1; } } else { /* 如果不是第一个key*/ if (!equalStringObjects(firstkey,thiskey)) { // 如果和第一个key的slot不一致,error_code置为CLUSTER_REDIR_CROSS_SLOT if (slot != thisslot) { getKeysFreeResult(&result); if (error_code) *error_code = CLUSTER_REDIR_CROSS_SLOT; /* 不同的key所属不同的slot */ return NULL; } else { /* 标记请求中有多个KEY */ multiple_keys = 1; } } } /* 如果slot正在迁入或者迁出,检查key是否在当前节点的db中,如果不在记录缺失的KEY的数量 */ if ((migrating_slot || importing_slot) && lookupKeyRead(&server.db[0],thiskey) == NULL) { missing_keys++; } } getKeysFreeResult(&result); } /* 如果未查到,返回当前节点自己 */ if (n == NULL) return myself; /* 如果当前节点的状态不是CLUSTER_OK状态,节点可能处于异常状态,只有在开启了allow_reads_when_down(在节点下线时允许读)并且当前命令是读操作才继续往下处理,否则记录错误信息返回NULL */ if (server.cluster->state != CLUSTER_OK) { // 如果设置了节点下线时不允许读 if (!server.cluster_allow_reads_when_down) { /* 记录错误信息,返回NULL */ if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; } else if (cmd->flags & CMD_WRITE) { // 如果命令中有写标记 /* The cluster is configured to allow read only commands */ if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE; return NULL; } else { /* Fall through and allow the command to be executed: * this happens when server.cluster_allow_reads_when_down is * true and the command is not a write command */ } } /* 更新hashslot */ if (hashslot) *hashslot = slot; /* 如果数据正在迁出或者正在迁入,并且当前命令是MIGRATE数据迁移的命令,返回当前节点 */ if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand) return myself; /* 如果key所在slot数据正在迁出,并且当前节点数据库中有缺失的key*/ if (migrating_slot && missing_keys) { // error_code设置为CLUSTER_REDIR_ASK if (error_code) *error_code = CLUSTER_REDIR_ASK; // 返回迁出到的那个节点 return server.cluster->migrating_slots_to[slot]; } /* 如果key所在slot正在做数据迁入,并且当前命令是ASK */ if (importing_slot && (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING)) { // 如果请求中有多个KEY并且有当前节点数据库中有缺失的key if (multiple_keys && missing_keys) { if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; return NULL; } else { // 返回当前节点 return myself; } } /* 是否是写命令 */ int is_write_command = (c->cmd->flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); // 如果客户端有只读标记、当前命令不是写命令,当前节点是从节点并且它的主节点是根据key所属slot查找到节点 if (c->flags & CLIENT_READONLY && !is_write_command && nodeIsSlave(myself) && myself->slaveof == n) { // 返回当前节点即可 return myself; } /* 如果查询到的节点不是当前节点,将error_code置为CLUSTER_REDIR_MOVED,返回key所属slot对应的实际节点 */ if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED; return n; }
redisDb
Redis数据库对应的结构体定义为redisDb,里面有个字典类型的对象,存储键值对数据:
typedef struct redisDb { dict *dict; /* 存储的键值对数据 */ // 省略... } redisDb;
lookupKeyRead
lookupKeyRead函数用于从redisDb中根据key查找数据,最终是调用lookupKey函数完成的,根据Key从字典中查找并返回value:
robj *lookupKeyRead(redisDb *db, robj *key) { // 调用lookupKeyReadWithFlags查找 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); } robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { expireIfNeeded(db,key); // 调用lookupKey函数查找 return lookupKey(db,key,flags); } robj *lookupKey(redisDb *db, robj *key, int flags) { // 根据KEY从字典中进行查找 dictEntry *de = dictFind(db->dict,key->ptr); // 如果不为空 if (de) { robj *val = dictGetVal(de); if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { val->lru = LRU_CLOCK(); } } // 返回value return val; } else { return NULL; } }
clusterRedirectClient
clusterRedirectClient用于集群重定向处理,在getNodeByQuery函数中,根据查询节点的情况对error_code设置了不同的值,在clusterRedirectClient函数中可以看到对error_code的判断,根据error_code的不同,向客户端响应不同的内容:
CLUSTER_REDIR_CROSS_SLOT
,表示请求中有多个KEY,但是KEY所属slot不在同一个slot中CLUSTER_REDIR_UNSTABLE
,表示请求中有多个KEY并且在一个slot,但是数据可能正在迁入或迁出的过程中,节点中有缺失的KEY,slot处于一个不稳定的状态CLUSTER_REDIR_DOWN_STATE
,表示节点处于下线状态CLUSTER_REDIR_DOWN_RO_STATE
,表示节点处于下线状态,只接收读命令CLUSTER_REDIR_DOWN_UNBOUND
,标识key未绑定到节点,也就是根据key所属slot未查询到节点CLUSTER_REDIR_MOVED
或者CLUSTER_REDIR_ASK
:
CLUSTER_REDIR_MOVED
表示key所属slot已从当前节点迁出,此时向客户端响应MOVED命令并将迁出后slot以及所在节点ip和端口返回CLUSTER_REDIR_ASK
表示key所属slot正在从当前节点迁出的过程中,请求中的key有可能一部分还未迁出,一部分已经迁出完毕,此时向客户端返回ASK命令,并将slot以及迁出到的目标节点的ip和端口返回void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) { if (error_code == CLUSTER_REDIR_CROSS_SLOT) { // 如果是CLUSTER_REDIR_CROSS_SLOT,向客户端回复key不在同一个slot中 addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot"); } else if (error_code == CLUSTER_REDIR_UNSTABLE) { /* 请求中有多个key并且在一个slot,但是数据可能正在迁入或迁出的过程中,slot并不稳定 */ addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot"); } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { // 节点处于下线状态 addReplyError(c,"-CLUSTERDOWN The cluster is down"); } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { // 节点已经下线只接收读命令 addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands"); } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { // 如果是CLUSTER_REDIR_DOWN_UNBOUND,表示根据key所属slot未查询到节点 addReplyError(c,"-CLUSTERDOWN Hash slot not served"); } else if (error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK) { /* 如果是MOVED或者ASK,需要进行请求重定向处理,向客户端返回ASK或者MOVED命令,并将目标节点的ip和端口返回 */ int use_pport = (server.tls_cluster && c->conn && connGetType(c->conn) != CONN_TYPE_TLS); int port = use_pport && n->pport ? n->pport : n->port; // 返回响应,包括ASK或者MOVED命令、slot信息、目标节点的ip端口 addReplyErrorSds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot, n->ip, port)); } else { serverPanic("getNodeByQuery() unknown error."); } }
参考
极客时间 - Redis源码剖析与实战(蒋德钧)
Redis版本:redis-6.2.5