1. 存储结构
在 有序集合对象 ZSet 的介绍中已经提到 ZSet 集合的底层存储结构主要有两种,其结构示例如下:
OBJ_ENCODING_ZIPLIST
当 ziplist 作为 zset 的底层存储结构时,每个集合元素使用两个紧挨在一起的压缩列表节点来保存,第一个节点保存元素值,第二个元素保存元素的分值,而且分值小的靠近表头,大的靠近表尾
OBJ_ENCODING_SKIPLIST
底层实现是跳跃表结合字典。每个跳跃表节点都保存一个集合元素,并按分值从小到大排列,每个节点的 ele 属性保存了元素的值,score属性保存分值;字典的每个键值对保存一个集合元素,元素值包装为字典的键,元素分值保存为字典的值。注意集合的元素成员和分值是共享的,跳跃表和字典通过指针指向同一地址,不会浪费内存
2. 源码解析
2.1 存储过程分析
对有序集合 ZSet 的操作命令的处理函数在t_zset.c 文件中,向 ZSet 添加元素调用的函数为t_zset.c#zaddCommand()。从源码可以看到,这个函数其实主要调用了t_zset.c#zaddGenericCommand(),zaddGenericCommand() 的主要逻辑如下:
解析参数得到每个元素及其对应的分值
查找key对应的 ZSet 集合对象是否存在,不存在则创建。此处创建新的 ZSet 集合对象有两个配置需要注意,如果 zset_max_ziplist_entries(默认 128) 为 0 或者要存储的元素长度大于 zset_max_ziplist_value(默认 64),则创建底层结构为 OBJ_ENCODING_SKIPLIST 的有序集合对象,否则创建底层结构为 OBJ_ENCODING_ZIPLIST 的有序集合对象
对于所有的元素,循环调用 t_zset.c#zsetAdd()函数将其添加到有序集合中
void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); } void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; /* The following vars are used in order to track what the command actually * did during the execution, to reply to the client and to trigger the * notification of keyspace change. */ int added = 0; /* Number of new elements added. */ int updated = 0; /* Number of elements with updated score. */ int processed = 0; /* Number of elements processed, may remain zero with options like XX. */ /* Parse options. At the end 'scoreidx' is set to the argument position * of the score of the first score-element pair. */ scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } /* Turn options into simple to check vars. */ int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; /* After the options, we expect to have an even number of args, since * we expect any number of score-element pairs. */ elements = c->argc-scoreidx; if (elements % 2 || !elements) { addReply(c,shared.syntaxerr); return; } elements /= 2; /* Now this holds the number of score-element pairs. */ /* Check for incompatible options. */ if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } /* Start parsing all the scores, we need to emit any syntax error * before executing additions to the sorted set, as the command should * either execute fully or nothing at all. */ scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } /* Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { zobj = createZsetObject(); } else { zobj = createZsetZiplistObject(); } dbAdd(c->db,key,zobj); } else { if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } for (j = 0; j < elements; j++) { double newscore; score = scores[j]; int retflags = flags; ele = c->argv[scoreidx+1+j*2]->ptr; int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; score = newscore; } server.dirty += (added+updated); reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c,score); else addReplyNull(c); } else { /* ZADD. */ addReplyLongLong(c,ch ? added+updated : added); } cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } }
ZSet 集合对象的创建函数在 object.c
中,源码如下,可以看到其两种存储结构是有明显区别的
robj *createZsetObject(void) { zset *zs = zmalloc(sizeof(*zs)); robj *o; zs->dict = dictCreate(&zsetDictType,NULL); zs->zsl = zslCreate(); o = createObject(OBJ_ZSET,zs); o->encoding = OBJ_ENCODING_SKIPLIST; return o; } robj *createZsetZiplistObject(void) { unsigned char *zl = ziplistNew(); robj *o = createObject(OBJ_ZSET,zl); o->encoding = OBJ_ENCODING_ZIPLIST; return o; }
t_zset.c#zsetAdd()函数实现较长,但是逻辑比较清晰:
首先判断如果存储结构是 ziplist,在执行添加的过程中需要调用函数 t_zset.c#zzlFind() 区分元素存在和不存在两种情况。另外需注意,函数 t_zset.c#zzlInsert() 将元素插入到 ziplist 中是根据 score 分数有序插入的,分数小的在 ziplist 头部,分数大的在尾部,元素和分数分两次操作插入到 ziplist 中,并且紧挨在一起
存在的情况下如果元素 score 改变了,则先删除 ziplist 中的元素然后重新添加
不存在的情况下直接向 ziplist 添加元素,插入完成后需要考虑元素的长度是否超出限制或 ziplist 存储的元素个数是否超过最大限制,进而决定是否将底层结构转为 skiplist
如果存储结构是 skiplist,同样需要区分元素存在和不存在两种情况。需要注意,在跳跃表中元素是按照分数有序存储的,这样范围查找的时间复杂度为 O(logN);使用字典则能达到单个元素查找O(1)的时间复杂度,非常高效
存在的情况并且 score 分数发生变化的话,调用 t_zset.c#zslUpdateScore() 函数在跳跃表中查找到目标节点,将其删除并重新插入一个节点,完成后返回新的跳跃表节点用于更新字典中元素的分数
不存在情况下就直接调用t_zset.c#zslInsert() 函数往跳跃表添加节点,完成后同时往字典新增一个节点
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Turn options into simple to check vars. */ int incr = (*flags & ZADD_INCR) != 0; int nx = (*flags & ZADD_NX) != 0; int xx = (*flags & ZADD_XX) != 0; *flags = 0; /* We'll return our response flags. */ double curscore; /* NaN as input is an error regardless of all the other parameters. */ if (isnan(score)) { *flags = ZADD_NAN; return 0; } /* Update the sorted set according to its encoding. */ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { /* NX? Return, same element already exists. */ if (nx) { *flags |= ZADD_NOP; return 1; } /* Prepare the score for the increment if needed. */ if (incr) { score += curscore; if (isnan(score)) { *flags |= ZADD_NAN; return 0; } if (newscore) *newscore = score; } /* Remove and re-insert when score changed. */ if (score != curscore) { zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlInsert(zobj->ptr,ele,score); *flags |= ZADD_UPDATED; } return 1; } else if (!xx) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries || sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (newscore) *newscore = score; *flags |= ZADD_ADDED; return 1; } else { *flags |= ZADD_NOP; return 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; de = dictFind(zs->dict,ele); if (de != NULL) { /* NX? Return, same element already exists. */ if (nx) { *flags |= ZADD_NOP; return 1; } curscore = *(double*)dictGetVal(de); /* Prepare the score for the increment if needed. */ if (incr) { score += curscore; if (isnan(score)) { *flags |= ZADD_NAN; return 0; } if (newscore) *newscore = score; } /* Remove and re-insert when score changes. */ if (score != curscore) { znode = zslUpdateScore(zs->zsl,curscore,ele,score); /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. */ dictGetVal(de) = &znode->score; /* Update score ptr. */ *flags |= ZADD_UPDATED; } return 1; } else if (!xx) { ele = sdsdup(ele); znode = zslInsert(zs->zsl,score,ele); serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); *flags |= ZADD_ADDED; if (newscore) *newscore = score; return 1; } else { *flags |= ZADD_NOP; return 1; } } else { serverPanic("Unknown sorted set encoding"); } return 0; /* Never reached. */ }
2.2 数据存储结构 zskiplist
2.2.1 存储结构定义
Redis 中的有序集合 ZSet 有属于它自己的数据结构,其定义在server.h文件中,源码如下:
可以看到 OBJ_ENCODING_SKIPLIST 编码的 ZSet 以 zset 结构体封装数据,其内部包含了两个关键的数据结构
dict :字典,键为成员,值为分值,可用于支持 O(1) 复杂度的按元素取分值操作
zsl: 跳跃表,按分值排序元素,用于支持平均复杂度为 O(log N) 的按分值定位元素的操作以及范围操作
typedef struct zset { dict *dict; zskiplist *zsl; } zset;
数据结构 zskiplist 的定义如下,可以看到其包含的关键数据如下:
header: 跳跃表头节点
tail: 跳跃表尾节点
length: 即链表包含的节点总数。新创建的 skiplist 包含一个空的头指针,这个头指针不包含在 length 计数中
level: 表中层数最大的节点的层数
typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist;
数据结构 zskiplistNode
是zskiplist
的单个节点的定义,其包含的关键属性如下:
ele
: 动态字符串对象,用于存储元素score
: 元素的分值backward
: 当前节点的上一个节点level[]
: 跳跃表层数数组,保存了每一层指向的下一个节点
typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned long span; } level[]; } zskiplistNode;
2.2.2 关键函数
2.2.2.1 跳跃表的构建
t_zset.c#zslInsert() 是跳跃表中非常关键的函数,这里体现了跳跃表的构建过程及其查找策略。可以看到 skiplist 为每个节点随机出一个层数(level),比如一个节点随机出的层数是3,那么就把它链入到从第1层到第3层这三层链表中
首先通过分数 score 在跳跃表中查找到新增的节点应该插入的位置上的节点
调用 zslRandomLevel() 函数随机生成节点层数,使用该层数新建一个跳跃表节点
更新新增的节点应插入位置上的节点前后指针,将新增节点插入到跳跃表中
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; } int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
2.2.2.2 跳跃表的查找
在以上三层链表结构上,如果需要查找15,查找的路径是沿着图中的红色箭头所指向的方向进行的15 首先和 10 比较,比 10 大,继续与后继节点比较
10 的后继节点为 NULL,说明最上层的链表已经结束了,回到第二层链表继续比较。此时 10 的后继节点为 20,15 比 20 小,回到 10 的第一层链表
10 的一层链表后继节点为 15,待查数据找到了
以上过程中沿着最上层链表首先要比较的是10,发现待查数据比10大,接下来就只需要到 10 的后面去继续查找,从而跳过了 10 前面的所有节点。这样当链表足够长的时候,这种多层链表的查找方式就能跳过很多下层节点,大大加快查找的速度