我们再来学习如何从跳跃表中查询数据,跳跃表本质上是一个链表,但它允许我们像数组一样定位某个索引区间内的节点,并且与数组不同的是,跳跃表允许我们将头节点L0层的前驱节点(即跳跃表分值最小的节点)zsl->header.level[0].forward当成索引0的节点,尾节点zsl->tail(跳跃表分值最大的节点)当成索引zsl->length-1的节点,索引按分值从小到大递增查询;也允许我们将尾节点当成索引0的节点,头节点L0层的前驱节点当做索引zsl->length-1的节点,索引按分值从大到小递增查询。当我们调用下面的方法按照索引区间来查询时,会把我们的索引转换成跨度,然后查找落在跨度的第一个节点,之后根据reverse(逆向状态)决定是要正向查询还是逆向查询。
假设我们要进行正向查询(即:索引按分值从小到大递增查询),给定的索引区间是[0,2],那么我们要找到跨度为1的节点,然后从跨度为1的节点L0层逐个递进,直到停留在跨度为3的节点,头节点L0层的前驱节点zsl->header.level[0].forward在跳跃表的索引为0,跨度为1,在跨度为1的节点从L0层逐个递进,一直递进到跨度为3的节点,这样便完成了索引区间[0,2]的查询。如果我们要进行逆向查询(即:索引按分值从大到小递增查询),索引区间依旧是[0,2],那么我们要找到跨度为跨度为zsl->length-0=zsl->length的节点,那自然是尾节点,找到跨度区间的第一个节点后,我们通过backward指针逐个后退,一直后退到跨度为zsl->length-2的节点,如此便完成查询。
void zrangeGenericCommand(client *c, int reverse) { robj *key = c->argv[1]; robj *zobj; int withscores = 0; long start; long end; long llen; long rangelen; //读取起始索引和终止索引,如果存在一个索引读取失败,则退出 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; //判断是否要返回分值 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr, "withscores")) { withscores = 1; } else if (c->argc >= 5) { addReply(c, shared.syntaxerr); return; } //判断key是否存在,如果不存在则退出,如果存在但类型不为ZSET也退出。 if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL || checkType(c, zobj, OBJ_ZSET)) return; /* * Sanitize indexes. * 审查索引,这里主要针对传入索引为负数的情况,大家都知道,如果一个 * 跳跃表的节点个数为N,我们要从起始节点查询到末尾节点,可以用[0,N-1] * 或者[0,-1],当传入的end<0时,这里会重新规正end的索引,llen为zset的长度, * 因此查询[0,-1],这里会规正为[0,N-1]。同理,start也会被规正,如果我们查询 * [-5,-3],即代表查询有序集合倒数第5个节点至倒数第三个节点,前提是N>=5这个 * 查询才有意义。如果我们的起始索引传入的是一个绝对值>N的负数,那么llen + start的 * 结果也为负数,如果判断start<0,则start会被规正为0。 * */ llen = zsetLength(zobj); if (start < 0) start = llen + start; if (end < 0) end = llen + end; if (start < 0) start = 0; /* * Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. * 如果起始索引大于终止索引,或者起始索引大于等于有序集合节点数量,则直接 * 返回空数组。 * */ if (start > end || start >= llen) { addReply(c, shared.emptyarray); return; } //如果判断终止索引大于等于节点数,则规整为llen-1 if (end >= llen) end = llen - 1; //计算要返回的节点数 rangelen = (end - start) + 1; //…… if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { //压缩列表逻辑…… } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; sds ele; /* * Check if starting point is trivial, before doing log(N) lookup. * 这里会根据给定的开始索引,查找该索引对应的节点,并将ln指向该节点。 * 需要注意的一点是:平常我们都认为header.level[0].forward指向的节点,在跳跃表 * 中的索引为0,但有时候跳跃表的末尾节点zsl->tail的索引值也有可能为0,这里就要提到 * 逆向查询。 * 当我们使用ZRANGE key min max [WITHSCORES]命令查询时,成员的位置是按照其分值 * 从小到大来排序,这时候header.level[0].forward的索引值为0, * header.level[0].forward.level[0].forward的索引值为1。而zls->tail的索引值 * 为zls->length-1。 * 当我们使用ZREVRANGE key start stop [WITHSCORES]命令查询时,成员的位置是按照 * 其分值从大到小来排序,这时候zls->tail的索引值为0,header.level[0].forward的 * 索引值为zls->length-1,header.level[0].forward.level[0].forward的索引值为 * zls->length-2。当reverse为1时,本次查询即为逆向查询。 * 我们注意到不管是if还是else分值,只要start>0,最终都会执行zslGetElementByRank() * 将ln定位到起始节点。当start为0时,如果是逆向查询,则索引0的位置是尾节点zsl->tail, * 如果是正向查询,索引0的位置则是zsl->header->level[0].forward。 * 那么(llen - start)和(start + 1)又代表什么含义呢?为什么zslGetElementByRank() * 可以根据这两个公式的计算结果,定位到索引对应的节点呢?其实这两个公式计算的是跨度,而 * zslGetElementByRank()则是根据给定的跳跃表和跨度查找节点而已。 * 如果是正常查询,假设起始索引为0,则跨度为start(0)+1=1,刚好为头节点L0层到达第一个节点的 * 跨度为1;如果起始索引为1,则跨度为start(1)+1=2,刚好是头节点到达索引值为1的节点的跨度。 * 如果是逆向查询,索引值为0代表尾节点,而llen-start(0)=llen为头节点到达尾节点的跨度;同理, * 倒数第二个节点的索引值为1,头节点到达倒数第二个节点的跨度为llen-start(1)=llen-1。 * */ if (reverse) { ln = zsl->tail; if (start > 0) ln = zslGetElementByRank(zsl, llen - start); } else { ln = zsl->header->level[0].forward; if (start > 0) ln = zslGetElementByRank(zsl, start + 1); } //定位到起始节点后,根据逆向状态,不为0时后退查询(ln-backward),为0时递进查询(ln->level[0].forward) while (rangelen--) { serverAssertWithInfo(c, zobj, ln != NULL); ele = ln->ele; if (withscores && c->resp > 2) addReplyArrayLen(c, 2); addReplyBulkCBuffer(c, ele, sdslen(ele)); if (withscores) addReplyDouble(c, ln->score); ln = reverse ? ln->backward : ln->level[0].forward; } } else { serverPanic("Unknown sorted set encoding"); } }
查找到跨度对应的节点,查找到跨度对应的节点,则在<1>处返回,如果我们传入的跨度大于头节点到尾节点的跨度,则返回NULL。
/* * Finds an element by its rank. The rank argument needs to be 1-based. * */ zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0;//累计跨度 int i; x = zsl->header; /* * 从头节点的最高层出发,如果基于当前层能够递进到前一个节点, * 则把当前节点的跨度加到traversed。 */ for (i = zsl->level - 1; i >= 0; i--) { while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { traversed += x->level[i].span; x = x->level[i].forward; } /* * 如果累计跨度与调用方传入的跨度相等,则代表x已经前进到调用方 * 所要求达到的跨度的节点,返回x。 */ if (traversed == rank) { return x;//<1> } } //如果传入的跨度大于头节点到尾节点的跨度,则返回NULL。 return NULL; }
跳跃表除了可以根据索引区间来查询,还可以根据分值区间来查询,这里我们又见到了结构体zrangespec。当我们需要判断一个节点是否落在我们指定的分值区间内,需要调用zslValueGteMin()和zslValueLteMax(),当传入一个指定的分值和区间,zslValueGteMin()和zslValueLteMax()的结果不为0,则表明节点落在分值区间内。此外,这两个方法还可以判断一个跳跃表是否和区间有交集,比如调用zslValueGteMin()时,传入尾节点(跳跃表分值最大的节点)及一个指定区间,如果尾节点没有落在指定区间,代表此区间都大于尾节点,此时我们不需要遍历跳跃表即可返回一个空数组,告诉客户端在指定区间内找不到任何节点;同理,调用zslValueLteMax()时传入头节点L0层的前驱节点(跳跃表分值最小节点)没有落在区间内,则表明区间小于跳跃表,同样不需要遍历跳跃表即可返回空数组给客户端,告诉客户端在指定区间内找不到任何节点。
/* * Struct to hold a inclusive/exclusive range spec by score comparison. * 此结构体用于表示一个指定区间,minex为0时表示在进行最小值比较时,要包含最小值本身 * 同理maxex为0时表示进行最大值比较时,要包含最大值本身。 * 比如:min=2,max=9 * 当minex=0,maxex=0时,区间为:[2,9] * 当minex=1,maxex=0时,区间为:(2,9] * 当minex=0,maxex=1时,区间为:[2,9) * 当minex=1,maxex=1时,区间为:(2,9) * */ typedef struct { double min, max; int minex, maxex; /* are min or max exclusive? */ } zrangespec; /* * 如果spec->minex不为0,返回分值是否大于区间最小值的比较结果, * 为0则返回分值是否大于等于区间最小值的比较结果。 * 如果传入一个跳跃表尾节点的分值zsl->tail.score(即:跳跃表最大分值)和区间返回结果为0, * 则表示跳跃表和区间没有交集。 * 这里分两种情况: * spec->minex不为0:区间要查询分值大于spec->min的元素, * zsl->tail.score<=spec->min代表跳跃表最大分值小于等于min,返回结果为0。 * spec->minex为0:区间要查询分值大于等于spec->min的元素, * zsl->tail.score<spec->min代表跳跃表最大分值小于min,返回结果为0。 */ int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } /* * 如果spec->maxex不为0,返回分值是否小于区间最大值的比较结果,为0则返回分值 * 是否小于等于区间最大值的比较结果。 * 如果传入一个跳跃表头节点L0层指向节点的分值 * zsl->header.level[0].forward.score(即:跳跃表最小分值)和 * 区间返回结果为0,则表示跳跃表和区间没有交集。 * 这里分两种情况: * spec->maxex不为0:区间要查询分值小于spec->max的元素, * zsl->header.level[0].forward.score>=spec->max代表跳跃表最小分值大于等于区间 * 最大分值,返回结果为0。 * spec->maxex为0:区间要查询分值小于等于spec->max的元素, * zsl->header.level[0].forward.score>spec->max代表跳跃表最小分值大于区间最大分值, * 返回结果为0。 */ int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); }
在真正根据分值区间查询跳跃表前,会校验区间是否有效,如果我们输入一个区间[a,b],但a>b,那么这个区间肯定是无效区间,无须遍历跳跃表;如果a=b,如果区间的开闭状态出现:(a,b)、(a,b]、[a,b)这三种情况,也是无效区间,只有[a,b]才会去查询节点,表示需要查找分值为a(或者b)的节点。当校验完区间是有效后,还会调用zslValueGteMin()和zslValueLteMax()判断跳跃表和区间是否存在交集,即区间是否整体大于跳跃表或整体小于跳跃表,如果出现这两种情况则表明区间和跳跃表无交集,也就不需要遍历。
/* * Returns if there is a part of the zset is in range. * 判断跳跃表和区间是否存在交集 * */ int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; /* * Test for ranges that will always be empty. * 校验区间范围是否有效,无效则返回0表示查询结果为空: * 1.如果最小值大于最大值,则无效。 * 2.如果最小值等于最大值,且区间为:(min,max)、(min,max]、[min,max)则无效。 * */ if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex))) return 0; /* * 如果尾节点不为NULL,则把跳跃表最大分值zsl->tail.score与区间比较, * 如果range->minex不为0,则查询分值大于range->min的元素,如果跳跃表 * 最大分值zsl->tail.score小于等于range->min,则表示跳跃表和区间没有交集, * 无须遍历跳跃表查询;同理如果range->minex为0,则查询分值大于等于range->min * 的元素,如果zsl->tail.score小于range->min,则表示跳跃表和区间没有交集, * 也无须遍历跳跃表查询。 */ x = zsl->tail; if (x == NULL || !zslValueGteMin(x->score, range)) return 0; /* * 如果头节点L0层的前驱节点不为NULL,则把跳跃表最小分值zsl->header->level[0].forward.score * 与区间比较,如果range->maxex不为0,则查询分值小于range->maxex的元素,如果跳跃表最小 * 分值zsl->header->level[0].forward.score大于等于range->max,则表示跳跃表和区间没有交集, * 无须遍历跳跃表查询;同理如果range->maxex为0,则查询分值小于等于range->maxex的元素,如果跳跃表 * 最小分值zsl->header->level[0].forward.score大于range->maxex,则表示跳跃表和区间没有交集, * 也无须遍历跳跃表查询。 */ x = zsl->header->level[0].forward; if (x == NULL || !zslValueLteMax(x->score, range)) return 0; //跳跃表和区间存在交集,需要遍历跳跃表查询。 return 1; }
跳跃表允许我们根据分值区间进行正向查询(分值从小到大)或逆向查询(分值从大到小),如果是正向查询,则调用zslFirstInRange()方法,先判断跳跃表和指定区间是否存在交集,如果存在则查找指定区间内的分值最小的节点并返回。
/* * Find the first node that is contained in the specified range. * Returns NULL when no element is contained in the range. * 查找落在指定区间的第一个节点,如果没有元素落在这个区间则返回NULL。 * */ zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; int i; /* * If everything is out of range, return early. * 如果跳跃表和区间没有交集则无须遍历,直接返回NULL。 * */ if (!zslIsInRange(zsl, range)) return NULL; //从头节点的最高层开始遍历 x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { /* * Go forward while *OUT* of range. * 如果x->level[i].forward不为NULL,根据其分值x->level[i].forward->score和 * range->minex判断前驱节点是否能前进。 * 这里分两种情况: * range->minex不为0:判断前驱节点的分值是否大于range->min,如果小于等于的话 * expression=zslValueGteMin(x->level[i].forward->score, range)为0, * 代表需要前进,找到大于min的节点,而!expression为1,while条件成立,x会 * 前进到它的前驱节点。当x的前驱节点的分值大于min,就会停止循环,x会停留在区间内 * 第一个节点的后继节点。 * range->minex为0:判断前驱节点的分值是否大于等于range->min,如果小于的话, * expression=zslValueGteMin(x->level[i].forward->score, range),expression为0, * 需要前进,找到大于等于min的节点,而!expression为1,while条件成立,x会 * 前进到它的前驱节点。当x的前驱节点的分值大于等于min,就会停止循环,x会停留在区间内 * 第一个节点的后继节点。 * */ while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score, range)) x = x->level[i].forward; } /* * This is an inner range, so the next node cannot be NULL. * 上面的循环会让x停留在区间内第一个节点的后继节点,为了达到区间内的 * 第一个节点,x要在L0层前进到它的前驱节点。 * */ x = x->level[0].forward; serverAssert(x != NULL); /* * Check if score <= max. * range->maxex不为0:如果区间内第一个节点的分值大于等于spec->max, * expression=zslValueLteMax(x->score, range),expression结果为0 * !expression为1,表示查询异常,返回NULL。 * range->maxex为0:如果区间内第一个节点的分值大于spec->max, * 则expression=zslValueLteMax(x->score, range),expression结果为0 * !expression为1,表示查询异常,返回NULL。 * */ if (!zslValueLteMax(x->score, range)) return NULL; return x; }
如果要进行逆向查询,则调用zslLastInRange(),这里同样先判断跳跃表是否和区间存在交集,只有存在交集才会进行下一步的判断,查找指定区间内分值最大的节点并返回。
/* * Find the last node that is contained in the specified range. * Returns NULL when no element is contained in the range. * 查找落在指定区间的最后一个节点,如果没有元素落在这个区间则返回NULL。 * */ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; int i; /*If everything is out of range, return early.*/ if (!zslIsInRange(zsl, range)) return NULL;<br> //从头节点的最高层开始遍历 x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { /* * Go forward while *IN* range. * 根据区间range和前驱节点的分值判断是否前进,如果x->level[i].forward * 不为NULL,根据range->maxex判断前驱节点是否能前进。 * 这里分两种情况: * 如果range->maxex不为0,且前驱节点的分值小于range->max,则可以前进。 * 如果range->maxex为0,且前驱节点的分值小于等于range->max,则可以前进。 * */ while (x->level[i].forward && zslValueLteMax(x->level[i].forward->score, range)) x = x->level[i].forward; } /* This is an inner range, so this node cannot be NULL. */ serverAssert(x != NULL); /* * Check if score >= min. * 如果range->minex不为0,x的分值小于或等于range->min,代表查询出现异常,则返回NULL。 * 如果range->minex为0,x的分值小于range->min,代表查询出现异常,则返回NULL。 * */ if (!zslValueGteMin(x->score, range)) return NULL; return x; }
在了解完上面的内容后,下面我们要步入正题:如何根据分值区间进行正向或逆向查找节点。在下面代码<1>处,会根据逆向状态选择ln是指向区间分值最大的节点,或是分值最小的节点。在定位到起始节点后,会在<2>处的while循环对节点进行偏移,如果到达偏移位置后的ln不为NULL,则会进入<3>处的while循环,查找分值落在区间内的节点,这里会根据逆向状态是否不为0,决定是用backward指针后退,还是向L0层的前驱节点递进,一直到分值不落在区间内跳出while循环,或者ln为NULL,又或者limit为0结束while循环。如果我们没有指定偏移(offset)和返回数量(limit),则不会进行偏移,limit默认值为-1,limit--永远不为0,这里会返回落在区间内的所有节点,能结束while循环只有遇到分值不落在区间内的节点,或者是ln为NULL。
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ void genericZrangebyscoreCommand(client *c, int reverse) { zrangespec range;//指定区间 robj *key = c->argv[1]; robj *zobj; long offset = 0, limit = -1;//偏移和结果返回数量 int withscores = 0; unsigned long rangelen = 0; void *replylen = NULL; int minidx, maxidx; /* * Parse the range arguments. * 解析范围参数 * ZRANGEBYSCORE key min max和ZREVRANGEBYSCORE key max min两个命令 * 都是此函数实现的,如果客户端输入的命令为ZRANGEBYSCORE,则reverse为0,按 * 从小到大查找分值及元素,分值小的在前,分值大的在后。如果客户端输入的命令为 * ZREVRANGEBYSCORE,则reverse不为0,按从大到小查找分值及元素,分值大的在前, * 分值小的在后。 * * */ if (reverse) { /* Range is given as [max,min] */ maxidx = 2; minidx = 3; } else { /* Range is given as [min,max] */ minidx = 2; maxidx = 3; } if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) { addReplyError(c, "min or max is not a float"); return; } /* * Parse optional extra arguments. Note that ZCOUNT will exactly have * 4 arguments, so we'll never enter the following code path. * 遍历可选参数,这里会判断是否要返回分值(withscores),是否要对查询结果进行偏移(offset)和数量(limit)的限制 * */ if (c->argc > 4) { int remaining = c->argc - 4; int pos = 4; while (remaining) { if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr, "withscores")) { pos++; remaining--; withscores = 1; } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr, "limit")) { if ((getLongFromObjectOrReply(c, c->argv[pos + 1], &offset, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[pos + 2], &limit, NULL) != C_OK)) { return; } pos += 3; remaining -= 3; } else { addReply(c, shared.syntaxerr); return; } } } /* * Ok, lookup the key and get the range * 如果key所对应的zobj不存在,或者zobj的类型不为zset,则退出。 * */ if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL || checkType(c, zobj, OBJ_ZSET)) return; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { //压缩列表流程... } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//zobj类型为跳跃表 zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; /* * If reversed, get the last node in range as starting point. * 如果是逆向查询,ln会指向区间分值最大的节点,如果是正向查询,ln则指向区间分值最小的节点。 * */ if (reverse) { ln = zslLastInRange(zsl, &range); } else { ln = zslFirstInRange(zsl, &range); } /* * No "first" element in the specified interval. * 如果没有落在区间的开始节点则退出 * */ if (ln == NULL) { addReply(c, shared.emptyarray); return; } /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ replylen = addReplyDeferredLen(c); /* * If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. * <2>如果有偏移量,则根据reverse状态选择是后退还是递进,以达到偏移量。 * 如果客户端有传入偏移,则offset不为0,这里会循环到offset为0或ln为NULL时跳出循环。 * 否则offset默认值为0,不会进入此循环。 * */ while (ln && offset--) { if (reverse) { ln = ln->backward; } else { ln = ln->level[0].forward; } } /* * <3>如果客户端有传入偏移和数量,则limit不为0,此时会根据reverse状态后退或者前进至 * ln为NULL或者limit为0,否则limit默认值为-1,limit--永远为true(注:只要limit * 不为0,则永远为true,即便是负数),这里就会循环到ln为NULL时,获取所有分值符合区间 * 节点的。 * 除了limit为0,或者ln为NULL会跳出while循环,在<4>处还会根据reverse状态判断分值是否在区间, * 如果不在则跳出循环,如果分值符合区间,还会在<5>处根据reverse状态选择是后退到后一个节点(ln->backward), * 还是前进到前一个节点(ln->level[0].forward)。 */ while (ln && limit--) { /*Abort when the node is no longer in range.*/ if (reverse) {//<4> if (!zslValueGteMin(ln->score, &range)) break; } else { if (!zslValueLteMax(ln->score, &range)) break; } rangelen++; if (withscores && c->resp > 2) addReplyArrayLen(c, 2); addReplyBulkCBuffer(c, ln->ele, sdslen(ln->ele)); if (withscores) addReplyDouble(c, ln->score); /* Move to next node */ if (reverse) {//<5> ln = ln->backward; } else { ln = ln->level[0].forward; } } } else { serverPanic("Unknown sorted set encoding"); } if (withscores && c->resp == 2) rangelen *= 2; setDeferredArrayLen(c, replylen, rangelen); }
最后,我们要了解如何获取一个元素在跳跃表中的索引,其实这里面的逻辑也是非常的简单,我们先从字典上获取节点的分值,然后根据分值及元素获取其在跳跃表中的索引,这里依旧支持正向查询或逆向查询,如果是正向查询,分值越小,索引越小,如果分值相等,则元素越小,索引越小;如果是逆向查询,则分值越大,索引越小,如果分值相等,则元素越大,索引越小。
/* Given a sorted set object returns the 0-based rank of the object or * -1 if the object does not exist. * 返回元素在有序集合中的索引,如果返回-1则代表元素不在有序集合内。 * * For rank we mean the position of the element in the sorted collection * of elements. So the first element has rank 0, the second rank 1, and so * forth up to length-1 elements. * 在跳跃表中第一个元素的索引为0,第二个元素索引为1,以此类推,最后一个元素索引为length-1。 * * If 'reverse' is false, the rank is returned considering as first element * the one with the lowest score. Otherwise if 'reverse' is non-zero * the rank is computed considering as element with rank 0 the one with * the highest score. * 如果reverse为0,跳跃表索引从分值最小的节点开始,即zsl->header.level[0].forward索引为0、 * zsl->header.level[0].forward.level[0].forward索引为1,zsl->tail索引为zsl-length-1; * 如果reverse不为0,跳跃表索引从分值最大的节点开始,即zsl->tail索引为0,zsl->tail.backward索引 * 为1,zsl->header.level[0].forward索引为zsl->length-1 * */ long zsetRank(robj *zobj, sds ele, int reverse) { unsigned long llen; unsigned long rank; llen = zsetLength(zobj); if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { //压缩列表逻辑…… } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; dictEntry *de; double score; de = dictFind(zs->dict, ele); if (de != NULL) { /* * 如果元素存在在跳跃表上,则获取元素的分支,并根据 * 分支判断其在跳跃表中的跨度,根据跨度计算节点在跳跃表 * 中的索引。 * 如果是正向查询(reverse为0),则索引为跨度(rank)-1。 * 如果是逆向查询(reverse不为0),则索引为跳跃表长度(zsl->length)-跨度(rank)。 */ score = *(double *) dictGetVal(de); rank = zslGetRank(zsl, score, ele); /* Existing elements always have a rank. */ serverAssert(rank != 0); if (reverse) return llen - rank; else return rank - 1; } else {//如果元素不在跳跃表上,则返回-1 return -1; } } else { serverPanic("Unknown sorted set encoding"); } }
获取完分值后,需要定位节点在跳跃表中的跨度,然后根据逆向状态及跨度,计算节点在跳跃表中的索引。
/* Find the rank for an element by both score and key. * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. * 根据给定的分支和元素查找其节点在跳跃表中的跨度,返回0代表节点不存在。 * */ unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { zskiplistNode *x; unsigned long rank = 0; int i; //从头节点最高层遍历,如果能前进到前一个节点,则把当前节点的跨度加到rank上 x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) <= 0))) { rank += x->level[i].span; x = x->level[i].forward; } /* * x might be equal to zsl->header, so test if obj is non-NULL * x可能停留在头节点,此处判断是保证节点的元素不为NULL。 * */ if (x->ele && sdscmp(x->ele, ele) == 0) { return rank; } } return 0; }
至此,笔者和大家一起学习了Redis跳跃表的是如何插入节点、删除节点、更新节点以及如何对跳跃表中的节点进行不同维度(索引、分值)的查询。跳跃表是一种应用相当广泛的数据结构,很多场景下人们都用跳跃表代替B-Tree,因为跳跃表和B-Tree有着一样的查询时间复杂度O(logN),但跳跃表的实现却比B-Tree简单很多。而Redis正是借助了跳跃表的思路实现了有序集合,使得很多需要存储、排序海量数据的业务得以实现。
当然,由于笔者的时间精力有限,这里并没有完全介绍所有跳跃表命令的相关实现,但笔者相信能看到这里的人,基本已经掌握了跳跃表的整体脉络。