redis集群源码分为两个部分来讲,第一部分主要分析一个命令在集群模式下是如何定位到一个server来处理该命令。第二部分主要分析集群故障转移部分的源码。
本篇文章主要包括
1、一个命令请求在集群模式下是如何被处理的
2、集群模式下我们要注意哪些问题?
我们知道在redis集群模式下,每个实例分担了部分的命令请求。比如说我们要查找一个key,那么查找这个key的命令究竟要在哪个示例上被处理呢?说到这里我们有必要先给出redis集群的数据结构。
// 集群模式下节点数据结构 struct clusterNode { // 创建节点的时间 mstime_t ctime; /* Node object creation time. */ // 节点的名字,由 40 个十六进制字符组成 // 例如 68eef66df23420a5862208ef5b1a7005b806f2ff char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ // 节点标识 // 使用各种不同的标识值记录节点的角色(比如主节点或者从节点), // 以及节点目前所处的状态(比如在线或者下线)。 int flags; /* REDIS_NODE_... */ // 节点当前的配置纪元,用于实现故障转移 uint64_t configEpoch; /* Last configEpoch observed for this node */ // 由这个节点负责处理的槽 // 一共有 REDIS_CLUSTER_SLOTS / 8 个字节长 // 每个字节的每个位记录了一个槽的保存状态 // 位的值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理 // 比如 slots[0] 的第一个位保存了槽 0 的保存情况 // slots[0] 的第二个位保存了槽 1 的保存情况,以此类推 unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ // 该节点负责处理的槽数量 int numslots; /* Number of slots handled by this node */ // 如果本节点是主节点,那么用这个属性记录从节点的数量 int numslaves; /* Number of slave nodes, if this is a master */ // 指针数组,指向各个从节点 struct clusterNode **slaves; /* pointers to slave nodes */ // 如果这是一个从节点,那么指向主节点 struct clusterNode *slaveof; /* pointer to the master node */ // 最后一次发送 PING 命令的时间 mstime_t ping_sent; /* Unix time we sent latest ping */ // 最后一次接收 PONG 回复的时间戳 mstime_t pong_received; /* Unix time we received the pong */ // 最后一次被设置为 FAIL 状态的时间 mstime_t fail_time; /* Unix time when FAIL flag was set */ // 最后一次给某个从节点投票的时间 mstime_t voted_time; /* Last time we voted for a slave of this master */ // 最后一次从这个节点接收到复制偏移量的时间 mstime_t repl_offset_time; /* Unix time we received offset for this node */ // 这个节点的复制偏移量 long long repl_offset; /* Last known repl offset for this node. */ // 节点的 IP 地址 char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ // 节点的端口号 int port; /* Latest known port of this node */ // 保存连接节点所需的有关信息 clusterLink *link; /* TCP/IP link with this node */ // 一个链表,记录了所有其他节点对该节点的下线报告 list *fail_reports; /* List of nodes signaling this as failing */ };
我们需要注意的是clusterNode节点中的 slots数组,这个数组中保存了该节点要处理的槽位。如果该位置为1表示该节点负责该槽位的处理,否则该节点不负责该槽位的处理。
关于槽位的解释:在集群模式下,redis总共划分16384个槽位,每个集群中的节点(注:在本文中 节点=实例)负责部分槽位的处理。
下面再给出在集群模式下集群状态的数据结构
// 集群状态,每个节点都保存着一个这样的状态,记录了它们眼中的集群的样子。 // 另外,虽然这个结构主要用于记录集群的属性,但是为了节约资源, // 有些与节点有关的属性,比如 slots_to_keys 、 failover_auth_count // 也被放到了这个结构里面。 typedef struct clusterState { // 指向当前节点的指针 clusterNode *myself; /* This node */ // 集群当前的配置纪元,用于实现故障转移 uint64_t currentEpoch; // 集群当前的状态:是在线还是下线 int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ // 集群中至少处理着一个槽的节点的数量。 int size; /* Num of master nodes with at least one slot */ // 集群节点名单(包括 myself 节点) // 字典的键为节点的名字,字典的值为 clusterNode 结构 dict *nodes; /* Hash table of name -> clusterNode structures */ // 节点黑名单,用于 CLUSTER FORGET 命令 // 防止被 FORGET 的命令重新被添加到集群里面 // (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?) dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ // 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点 // migrating_slots_to[i] = NULL 表示槽 i 未被迁移 // migrating_slots_to[i] = clusterNode_A 表示槽 i 要从本节点迁移至节点 A clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS]; // 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点 // importing_slots_from[i] = NULL 表示槽 i 未进行导入 // importing_slots_from[i] = clusterNode_A 表示正从节点 A 中导入槽 i clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; // 负责处理各个槽的节点 // 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理 clusterNode *slots[REDIS_CLUSTER_SLOTS]; // 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序 // 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便 // 具体操作定义在 db.c 里面 zskiplist *slots_to_keys; /* The following fields are used to take the slave state on elections. */ // 以下这些域被用于进行故障转移选举 // 上次执行选举或者下次执行选举的时间 mstime_t failover_auth_time; /* Time of previous or next election. */ // 节点获得的投票数量 int failover_auth_count; /* Number of votes received so far. */ // 如果值为 1 ,表示本节点已经向其他节点发送了投票请求 int failover_auth_sent; /* True if we already asked for votes. */ int failover_auth_rank; /* This slave rank for current auth request. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */ /* Manual failover state in common. */ /* 共用的手动故障转移状态 */ // 手动故障转移执行的时间限制 mstime_t mf_end; /* Manual failover time limit (ms unixtime). It is zero if there is no MF in progress. */ /* Manual failover state of master. */ /* 主服务器的手动故障转移状态 */ clusterNode *mf_slave; /* Slave performing the manual failover. */ /* Manual failover state of slave. */ /* 从服务器的手动故障转移状态 */ long long mf_master_offset; /* Master offset the slave needs to start MF or zero if stil not received. */ // 指示手动故障转移是否可以开始的标志值 // 值为非 0 时表示各个主服务器可以开始投票 int mf_can_start; /* If non-zero signal that the manual failover can start requesting masters vote. */ /* The followign fields are uesd by masters to take state on elections. */ /* 以下这些域由主服务器使用,用于记录选举时的状态 */ // 集群最后一次进行投票的纪元 uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ // 在进入下个事件循环之前要做的事情,以各个 flag 来记录 int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ // 通过 cluster 连接发送的消息数量 long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ // 通过 cluster 接收到的消息数量 long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/ } clusterState;
我们需要特别注意的是 clusterState结构中引用了clusterNode数据结构。在clusterState结构中我们也看到了一个slots数组,这个数组记录了槽i被指派给了哪个节点处理。所以clusterNode中的slots与clusterState中的slots数组不同之处在于:clusterNode.slots只记录了本节点需要处理的槽位信息,而clusterState.slots记录了槽位i分配给了哪个节点,之所以用两个数组是为了效率上的考虑,我们知道这点就行了。
下面进入正题,当在集群模式下,一个请求命令到底要被哪个server进行处理呢?这里大家记住,每一个命令请求都是从函数 processCommand(redisClient *c) 开始的
在这里插入代码片int processCommand(redisClient *c) { // 这里就是有关集群的入口 /* If cluster is enabled perform the cluster redirection here. * * 如果开启了集群模式,那么在这里进行转向操作。 * * However we don't perform the redirection if: * * 不过,如果有以下情况出现,那么节点不进行转向: * * 1) The sender of this command is our master. * 命令的发送者是本节点的主节点 * * 2) The command has no key arguments. * 命令没有 key 参数 */ if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot; // 集群已下线 if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK; // 集群运作正常 } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); // 不能执行多键处理命令 if (n == NULL) { flagTransaction(c); if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { /* The request spawns mutliple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else { redisPanic("getNodeByQuery() unknown error."); } return REDIS_OK; // 命令针对的槽和键不是本节点处理的,进行转向 } else if (n != server.cluster->myself) { flagTransaction(c); // -<ASK or MOVED> <slot> <ip>:<port> // 例如 -ASK 10086 127.0.0.1:12345 addReplySds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d\r\n", (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot,n->ip,n->port)); return REDIS_OK; } // 如果执行到这里,说明键 key 所在的槽由本节点处理 // 或者客户端执行的是无参数命令 } }
上面的代码段就是集群模式下处理命令请求的入口。其中getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code)这个函数就是找到处理命令对应的redis 实例。
我们看到入参有 error_code这个参数,这个参数会在此函数中被赋值。那么它有几个值呢?每个不同的code码,都是代表什么意思呢?下面给出几个枚举值
/* 由 getNodeByQuery() 函数返回的转向错误。 /
// 节点可以处理这个命令
#define REDIS_CLUSTER_REDIR_NONE 0 / Node can serve the request. /
// 键在其他槽
#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 / Keys in different slots. /
// 键所处的槽正在进行 reshard
#define REDIS_CLUSTER_REDIR_UNSTABLE 2 / Keys in slot resharding. /
// 需要进行 ASK 转向
#define REDIS_CLUSTER_REDIR_ASK 3 / -ASK redirection required. */
// 需要进行 MOVED 转向
#define REDIS_CLUSTER_REDIR_MOVED 4
我们这里先给出这几个值,后续源码中会有关于这几个值的解释。
我们开始分析getNodeByQuery()这个函数
```c /* 该函数返回处理命令的集群节点 Return the pointer to the cluster node that is able to serve the command. 集群处理的命令只能是1.单个key 2.多个key,但是这些key对应的redis 实例是同一个,并且集群是稳定的【未在进行重新分片】 * For the function to succeed the command should only target either: * * 1) A single key (even multiple times like LPOPRPUSH mylist mylist). * 2) Multiple keys in the same hash slot, while the slot is stable (no * resharding in progress). * 如果成功,该函数返回能够处理命令请求的redis 实例 * On success the function returns the node that is able to serve the request. * 如果节点不是 'myself',会执行重定向 * If the node is not 'myself' a redirection must be perfomed. * 重定向的方式有 ask和remvoed两种 The kind of * redirection is specified setting the integer passed by reference * 'error_code', which will be set to REDIS_CLUSTER_REDIR_ASK or * REDIS_CLUSTER_REDIR_MOVED. * 当处理命令的节点是 `myself`, error_code被赋值为REDIS_CLUSTER_REDIR_NONE * When the node is 'myself' 'error_code' is set to REDIS_CLUSTER_REDIR_NONE. *当这个命令不能处理时,返回NULL,error_code被赋值为原因 * If the command fails NULL is returned, and the reason of the failure is * provided via 'error_code', which will be set to: *当这个命令包含多个key并且这些key不在同一个slot中时, error_code被赋值为 REDIS_CLUSTER_REDIR_CROSS_SLOT * REDIS_CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that * don't belong to the same hash slot. * 当命令中含有多个key,并且这些key同属一个slot,但是集群正在resharding,error_code被赋值为RREDIS_CLUSTER_REDIR_UNSTABLE * REDIS_CLUSTER_REDIR_UNSTABLE if the request contains mutliple keys * belonging to the same slot, but the slot is not stable (in migration or * importing state, likely because a resharding is in progress). */ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { // 初始化为 NULL , // 如果输入命令是无参数命令,那么 n 就会继续为 NULL clusterNode *n = NULL; robj *firstkey = NULL; int multiple_keys = 0; multiState *ms, _ms; multiCmd mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0; /* Set error code optimistically for the base case. */ if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE; /* We handle all the cases as if they were EXEC commands, so we have * a common code path for everything */ // 集群可以执行事务, // 但必须确保事务中的所有命令都是针对某个相同的键进行的 // 这个 if 和接下来的 for 进行的就是这一合法性检测 if (cmd->proc == execCommand) { /* If REDIS_MULTI flag is not set EXEC is just going to return an * error. */ if (!(c->flags & REDIS_MULTI)) return myself; ms = &c->mstate; } else { /* In order to have a single codepath create a fake Multi State * structure if the client is not in MULTI/EXEC state, this way * we have a single codepath below. */ ms = &_ms; _ms.commands = &mc; _ms.count = 1; mc.argv = argv; mc.argc = argc; mc.cmd = cmd; } /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; int margc, *keyindex, numkeys, j; mcmd = ms->commands[i].cmd; margc = ms->commands[i].argc; margv = ms->commands[i].argv; // 定位命令的键位置 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys); // 遍历命令中的所有键 for (j = 0; j < numkeys; j++) { robj *thiskey = margv[keyindex[j]]; int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); if (firstkey == NULL) { // 这是事务中第一个被处理的键 // 获取该键的槽和负责处理该槽的节点 /* This is the first key we see. Check what is the slot * and node. */ firstkey = thiskey; slot = thisslot; n = server.cluster->slots[slot]; redisAssertWithInfo(c,firstkey,n != NULL); /* If we are migrating or importing this slot, we need to check * if we have all the keys in the request (the only way we * can safely serve the request, otherwise we return a TRYAGAIN * error). To do so we set the importing/migrating state and * increment a counter for every missing key. */ if (n == myself && server.cluster->migrating_slots_to[slot] != NULL) { migrating_slot = 1; } else if (server.cluster->importing_slots_from[slot] != NULL) { importing_slot = 1; } } else { /* If it is not the first key, make sure it is exactly * the same key as the first we saw. */ if (!equalStringObjects(firstkey,thiskey)) { if (slot != thisslot) { /* Error: multiple keys from different slots. */ getKeysFreeResult(keyindex); if (error_code) *error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT; return NULL; } else { /* Flag this request as one with multiple different * keys. */ multiple_keys = 1; } } } /* Migarting / Improrting slot? Count keys we don't have. */ if ((migrating_slot || importing_slot) && lookupKeyRead(&server.db[0],thiskey) == NULL) { missing_keys++; } } getKeysFreeResult(keyindex); } // end for /* No key at all in command? then we can serve the request * without redirections or errors. */ if (n == NULL) return myself; /* Return the hashslot by reference. */ if (hashslot) *hashslot = slot; /* This request is about a slot we are migrating into another instance? * Then if we have all the keys. */ /* If we don't have all the keys and we are migrating the slot, send * an ASK redirection. */ if (migrating_slot && missing_keys) { if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK; return server.cluster->migrating_slots_to[slot]; } /* If we are receiving the slot, and the client correctly flagged the * request as "ASKING", we can serve the request. However if the request * involves multiple keys and we don't have them all, the only option is * to send a TRYAGAIN error. */ if (importing_slot && (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING)) { if (multiple_keys && missing_keys) { if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE; return NULL; } else { return myself; } } /* Handle the read-only client case reading from a slave: if this * node is a slave and the request is about an hash slot our master * is serving, we can reply without redirection. */ if (c->flags & REDIS_READONLY && cmd->flags & REDIS_CMD_READONLY && nodeIsSlave(myself) && myself->slaveof == n) { return myself; } /* Base case: just return the right node. However if this node is not * myself, set error_code to MOVED since we need to issue a rediretion. */ if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED; // 返回负责处理槽 slot 的节点 n return n; }
以上就是 getQueryNode这个函数的功能,其实也比较简单,对着注释多看两遍就能看懂。无非就是找到处理命令的集群节点,如果找不到给error_code赋值原因。调用者根绝error_code做相应的处理。
2.在集群模式下我们应该注意哪些问题呢?
(1)在集群模式下应该尽量避免多key查询
(2)注意 key的散列与集群均衡
后记:参考书籍《redis设计与实现》《redis3.0源码》