本文参考源码版本为 redis6.2
前缀树是字符串查找时,经常使用的一种数据结构,能够在一个字符串集合中快速查找到某个字符串。
由于树中每个节点只存储字符串中的一个字符,故而有时会造成空间的浪费。Rax 的出现就是为了解决这一问题。Redis 对于 Rax 的解释为 A radix tree implement,基数树的一种实现。Rax中不仅可以存储字符串,同时还可以为这个字符串设置一个值,也就是key-value。
Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,其特点是,保存在树上的每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。
前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。
不同的是,rax 在前缀树上做了一些优化,来,继续往下看~
raxNode定义:
typedef struct raxNode { uint32_t iskey:1; //节点是否包含key uint32_t isnull:1; //节点的值是否为NULL uint32_t iscompr:1; //节点是否被压缩 uint32_t size:29; //节点大小 unsigned char data[]; //节点的实际存储数据 } raxNode;
该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。
这 4 个元数据就对应了压缩节点和非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用 29 bit。
另外,从 raxNode 结构体中,可以看到,除了元数据,该结构体中还有 char 类型数组 data。我们知道,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:
在 raxNode 的实现中,无论是非压缩节点还是压缩节点,具有两个特点:
而这两个特点给 Radix Tree 实际保存数据时的结构,带来了两个方面的变化。
如果你觉得晦涩不易懂,坚持,继续往下看~
这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符。
同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key 对应的 value 的指针。
结合 raxNode 结构来看看,如果是一个非压缩 node ,当我们有size个字符(bytes),就会有相对应size 个指向子节点 raxNode 的指针;
需要注意的是,字符不是存在子节点,而是存在于父节点中(可以思考下, 为什么不存在子节点中?)。
[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串。
和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个 key 对应的 value 的指针。
注意,压缩节点仅有一个子节点
[header iscompr=1][abc][c-ptr](value-ptr?)
假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在 [] 里面,反之写在 () 里面。
(f) "" \ (o) "f" \ (o) "fo" \ [t b] "foo" / \ "foot" (e) (a) "foob" / \ "foote" (r) (r) "fooba" / \ "footer" [] [] "foobar"
然而,这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种:
["foo"] "" | [t b] "foo" / \ "foot" ("er") ("ar") "foob" / \ "footer" [] [] "foobar"
这便是Radix Tree的模型图了。
不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:
(f) "" / (i o) "f" / \ "fi" ("rst") (o) "fo" / \ "first" [] [t b] "foo" / \ "foot" ("er") ("ar") "foob" / \ "footer" [] [] "foobar"
rax提供了查找key的接口raxFind,该接口用于获取key对应的value值,定义如下:
/* Find a key in the rax, returns raxNotFound special void pointer value * if the item was not found, otherwise the value associated with the * item is returned. */ void *raxFind(rax *rax, unsigned char *s, size_t len) { raxNode *h; int splitpos = 0; size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL); if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) return raxNotFound; return raxGetData(h); }
其中,该方法的作用有是,在rax中查找长度为 len 的字符串 s (s为rax中的key),找到之后返回 key 对应的 value。
可以看到,具体的查询逻辑交给了 raxLowWalk 去完成,该方法作为底层方法,在增删查改操作中均有调用,接口定义为:
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts)
其中,入参定义为:
该函数返回 s 的匹配长度,当 s ! = len 时,表示未查找到该key;当s == len时,需要检验 stopnode 是否为 key,并且当 stopnode 为压缩节点时,还需要检查splitpos是否为0(可能匹配到某个压缩节点中间的某个元素),具体实现如下:
raxNode *h = rax->head; // 从跟节点开始匹配 raxNode **parentlink = &rax->head; size_t i = 0; /* Position in the string. */ size_t j = 0; /* Position in the node children (or bytes if compressed).*/ while(h->size && i < len) { unsigned char *v = h->data; if (h->iscompr) { // 压缩节点,挨个比对,相同则继续往下找;反之,则退出,说明匹配到这就开始不同或者已经到末尾了 for (j = 0; j < h->size && i < len; j++, i++) { if (v[j] != s[i]) break; } if (j != h->size) break; } else { // 非压缩节点,只要找到一个相同的就可以继续往下,在子节点中查找了 /* Even when h->size is large, linear scan provides good * performances compared to other approaches that are in theory * more sounding, like performing a binary search. */ for (j = 0; j < h->size; j++) { if (v[j] == s[i]) break; } if (j == h->size) break; // 可以匹配到当前字符,继续往下匹配下一个字符,然后会移动到子节点进行匹配 i++; } // 记录路径信息 if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */ // 找到第一个子节点的地址 raxNode **children = raxNodeFirstChildPtr(h); if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */ // 将当前节点移动到其第j个子节点 memcpy(&h,children+j,sizeof(h)); parentlink = children+j; j = 0; /* If the new node is non compressed and we do not iterate again (since i == len) set the split position to 0 to signal this node represents the searched key. */ } if (stopnode) *stopnode = h; if (plink) *plink = parentlink; if (splitpos && h->iscompr) *splitpos = j; return i;
实现逻辑:
插入元素比较复杂,源码中也花了大量注释说明。
向 rax 中插入 key-value 对,对于已存在的 key, rax 提供了2种方案,覆盖或者不覆盖原有的 value,对应的接口分别为 raxInsert、raxTryInsert。
插入操作的真正实现函数 raxGenericInsert,方法定义如下:
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)
第一步,当完全匹配并且目标位置的节点不需要切割
size_t i; int j = 0; // 切割位置,当定位到具体的压缩节点时,j用于定位该压缩节点待插入的位置 raxNode *h, **parentlink; // 在插入元素前,会通过 raxLowWalk 判断 key 是否存在或者找到待插入位置。 i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL); // 1. i == len(代表已经匹配到了整个字符串) // 2. 如果不是压缩节点 或者 是压缩节点但该压缩节点不需要切割(j==0) // 注:对于压缩节点node,如果要在node中插入新元素 或者 该node中代表了这次要插入元素的key, 那么将会重新分配该node, 确保空间足够 if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) { /* Make space for the value pointer if needed. */ if (!h->iskey || (h->isnull && overwrite)) { h = raxReallocForData(h,data); if (h) memcpy(parentlink,&h,sizeof(h)); } if (h == NULL) { errno = ENOMEM; return 0; } // 如果key存在,根据方法入参进行处理(获取旧数据或者重写) if (h->iskey) { if (old) *old = raxGetData(h); if (overwrite) raxSetData(h,data); errno = 0; return 0; /* Element already exists. */ } // 给新增节点赋值 raxSetData(h,data); rax->numele++; return 1; /* Element inserted. */ }
第二步:对于需要切割的压缩节点,有几种情况:
假设我们当前所在节点 h 包含字符串 “ANNIBALE” ,h 有且仅有一个子节点(“SC”),并且该节点没有子节点,用 ‘0’ 表示,结构如下:
"ANNIBALE" -> "SCO" -> []
案例1: 插入 “ANNIENTARE”
|B| -> "ALE" -> "SCO" -> [] "ANNI" -> |-| |E| -> (... continue algo ...) "NTARE" -> []
需要将 “ANNIBALE” 拆分成3部分:压缩节点,非压缩节点,压缩节点
案例2: 插入 “ANNIBALI”
|E| -> "SCO" -> [] "ANNIBAL" -> |-| |I| -> (... continue algo ...) []
需要将 “ANNIBALE” 拆成2部分:压缩节点,非压缩节点。
案例3: 插入 “AGO”(和案例1类似,不过,需要在原节点中设置 iscompr = 0)
|N| -> "NIBALE" -> "SCO" -> [] |A| -> |-| |G| -> (... continue algo ...) |O| -> []
需要将 “ANNIBALE” 节点拆分为3部分:非压缩节点,非压缩节点,压缩节点。
案例4: 插入 “CIAO”
|A| -> "NNIBALE" -> "SCO" -> [] |-| |C| -> (... continue algo ...) "IAO" -> []
此时需要将 “ANNIBALE” 节点拆分为2部分:第一部分是非压缩节点,第二部分为压缩节点。
案例5: 插入 “ANNI”
"ANNI" -> "BALE" -> "SCO" -> []
将 “ANNIBALE” 拆分成2个压缩节点。
源码实现上,是用了两种算法覆盖以上5种案例:
算法1: 覆盖案例 1~4,出现字符不匹配的位置,是在某个压缩节点上。
SPLITPOS 代表压缩节点切割位置,例如 压缩节点包含 “ANNIBALE”,将插入 “ANNIENTARE”,那么 SPLITPOS = 4,也就是 未匹配元素的下标。
源码实现如下:
/* ------------------------- ALGORITHM 1 --------------------------- */ if (h->iscompr && i != len) { /* 1: Save next pointer. */ // 用 NEXT 保存 h的子节点指针 raxNode **childfield = raxNodeLastChildPtr(h); raxNode *next; memcpy(&next,childfield,sizeof(next)); if (h->iskey) { debugf("key value is %p\n", raxGetData(h)); } // 设置新增节点长度,这里 j 是压缩节点切割位置,因此压缩节点会被切割成两部分,长度分别是 j 和 postfixlen size_t trimmedlen = j; size_t postfixlen = h->size - j - 1; // 仔细想想?当压缩节点被切割成两部分,也就是说该压缩节点部分与 s 没有相同部分;如果该压缩节点本身 iskey, 那么切割后 split_node 自然也是 iskey (注意:iskey表示从根节点到压缩节点的父节点组成的key) int split_node_is_key = !trimmedlen && h->iskey && !h->isnull; size_t nodesize; /* 2: 创建 split node 节点,并分配空间 */ // 可以看到,这里会切割成三部分,当切割位置在第一个字符时(j==0),即trimmedlen=0 表示没有公共前缀,此时会切割成两部分 raxNode *splitnode = raxNewNode(1, split_node_is_key); // 切割位置 raxNode *trimmed = NULL; // 已匹配的前缀部分 raxNode *postfix = NULL; // 切割位置之后,未匹配的部分 // 如果匹配前缀大于0,分配新空间 if (trimmedlen) { nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+ sizeof(raxNode*); if (h->iskey && !h->isnull) nodesize += sizeof(void*); trimmed = rax_malloc(nodesize); } // 未匹配部分长度大于0,分配空间 if (postfixlen) { nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+ sizeof(raxNode*); postfix = rax_malloc(nodesize); } /* OOM? Abort now that the tree is untouched. */ // 分配失败 if (splitnode == NULL || (trimmedlen && trimmed == NULL) || (postfixlen && postfix == NULL)) { rax_free(splitnode); rax_free(trimmed); rax_free(postfix); errno = ENOMEM; return 0; } splitnode->data[0] = h->data[j]; // j == 0,意味着压缩节点只需拆分成两部分,split node 与 后缀部分已经拆分了,因此 parentlink 将指向 split node if (j == 0) { // 3a. 仔细想想?原压缩节点 iskey, 到这一步,split node 同样 iskey,因此将数据拷贝过去 if (h->iskey) { void *ndata = raxGetData(h); raxSetData(splitnode,ndata); } memcpy(parentlink,&splitnode,sizeof(splitnode)); } else { // 3b. 切割公共前缀,也就意味着,压缩节点将被分成三部分 trimmed->size = j; memcpy(trimmed->data,h->data,j); trimmed->iscompr = j > 1 ? 1 : 0; trimmed->iskey = h->iskey; trimmed->isnull = h->isnull; if (h->iskey && !h->isnull) { void *ndata = raxGetData(h); raxSetData(trimmed,ndata); } raxNode **cp = raxNodeLastChildPtr(trimmed); memcpy(cp,&splitnode,sizeof(splitnode)); memcpy(parentlink,&trimmed,sizeof(trimmed)); parentlink = cp; /* Set parentlink to splitnode parent. */ rax->numnodes++; } // 4. 创建 postfix 节点,即原压缩节点未匹配的后缀部分 if (postfixlen) { /* 4a: create a postfix node. */ postfix->iskey = 0; postfix->isnull = 0; postfix->size = postfixlen; postfix->iscompr = postfixlen > 1; memcpy(postfix->data,h->data+j+1,postfixlen); raxNode **cp = raxNodeLastChildPtr(postfix); memcpy(cp,&next,sizeof(next)); rax->numnodes++; } else { /* 4b: just use next as postfix node. */ postfix = next; } // 5. 设置 postfix 节点作为 splitnode 的第一个子节点 raxNode **splitchild = raxNodeLastChildPtr(splitnode); memcpy(splitchild,&postfix,sizeof(postfix)); // 6. 继续插入。以上只处理了原压缩节点,对于新插入key, 未匹配部分还未插入,源码稍后介绍;这里插入之后,splitnode将会多一个子节点 rax_free(h); h = splitnode; }
算法2:应对场景5,即在压缩节点某个位置处,完成了字符串 s 的匹配,此时,只需将压缩节点拆分成两部分即可。
例如,rax树中存在节点 “ANNIBALE”,将插入 “ANNI”,此时 SPLITPOS = 4。
源码实现如下:
else if (h->iscompr && i == len) { /* ------------------------- ALGORITHM 2 --------------------------- */ // 切割并分配 postfix 和 trimmed 节点 size_t postfixlen = h->size - j; size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+ sizeof(raxNode*); if (data != NULL) nodesize += sizeof(void*); raxNode *postfix = rax_malloc(nodesize); nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*); if (h->iskey && !h->isnull) nodesize += sizeof(void*); raxNode *trimmed = rax_malloc(nodesize); // 分配失败 if (postfix == NULL || trimmed == NULL) { rax_free(postfix); rax_free(trimmed); errno = ENOMEM; return 0; } /* 1: Save next pointer. */ raxNode **childfield = raxNodeLastChildPtr(h); raxNode *next; memcpy(&next,childfield,sizeof(next)); /* 2: Create the postfix node. */ postfix->size = postfixlen; postfix->iscompr = postfixlen > 1; postfix->iskey = 1; postfix->isnull = 0; memcpy(postfix->data,h->data+j,postfixlen); raxSetData(postfix,data); raxNode **cp = raxNodeLastChildPtr(postfix); memcpy(cp,&next,sizeof(next)); rax->numnodes++; /* 3: Trim the compressed node. */ trimmed->size = j; trimmed->iscompr = j > 1; trimmed->iskey = 0; trimmed->isnull = 0; memcpy(trimmed->data,h->data,j); memcpy(parentlink,&trimmed,sizeof(trimmed)); if (h->iskey) { void *aux = raxGetData(h); raxSetData(trimmed,aux); } /* Fix the trimmed node child pointer to point to * the postfix node. */ cp = raxNodeLastChildPtr(trimmed); memcpy(cp,&postfix,sizeof(postfix)); /* Finish! We don't need to continue with the insertion * algorithm for ALGO 2. The key is already inserted. */ rax->numele++; rax_free(h); return 1; /* Key inserted. */ }
到这里,还有一点工作需要完成, 那就是新key未匹配的部分,还需要插入rax树,源码实现如下:
while(i < len) { raxNode *child; // 这里就是判断要处理成压缩节点还是非压缩节点 if (h->size == 0 && len-i > 1) { debugf("Inserting compressed node\n"); size_t comprsize = len-i; if (comprsize > RAX_NODE_MAX_SIZE) comprsize = RAX_NODE_MAX_SIZE; raxNode *newh = raxCompressNode(h,s+i,comprsize,&child); if (newh == NULL) goto oom; h = newh; memcpy(parentlink,&h,sizeof(h)); parentlink = raxNodeLastChildPtr(h); i += comprsize; } else { debugf("Inserting normal node\n"); raxNode **new_parentlink; raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink); if (newh == NULL) goto oom; h = newh; memcpy(parentlink,&h,sizeof(h)); parentlink = new_parentlink; i++; } rax->numnodes++; h = child; } raxNode *newh = raxReallocForData(h,data); if (newh == NULL) goto oom; h = newh; if (!h->iskey) rax->numele++; raxSetData(h,data); memcpy(parentlink,&h,sizeof(h)); return 1; /* Element inserted. */
以上便是插入新key的全部过程。可以看出,实现上根据列举的5种案例进行处理。
rax 的删除操作主要有3个接口,可以删除 rax 中的某个 key,或者释放整个 rax,在释放 rax 时,还可以设置释放回调函数,在释放rax 的每个 key 时,都会调用这个回调函数,3个接口的定义如下:
// 在 rax 中删除长度为 len 的 s int raxRemove(rax *rax, unsigned char *s, size_t len, void **old); // 释放 rax void raxFree(rax *rax); // 释放 rax, 释放每个 key 时,都会调用 free_callback 方法 void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
其中,rax 的释放操作,采用的是深度优先算法。
下面重点介绍 raxRemove 方法:
第一步:当删除 rax 中的某个 key-value 对时,首先查找 key 是否存在,不存在则直接返回,存在则需要进行删除操作,实现如下:
raxNode *h; raxStack ts; raxStackInit(&ts); // 保存路径 int splitpos = 0; size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts); if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) { // 未找到目标 key, 直接退出 raxStackFree(&ts); return 0; }
第二步,如果 key 存在,则需要进行删除操作,删除操作完成后,rax 树可能需要进行压缩。具体可以分为下面 2 种情况,此处所说的压缩是指将某个节点与其子节点压缩成一个节点,叶子节点没有子节点,不能进行压缩。
案例1:
假设 rax 树存储了 “FOO” = 1 和 “FOOBAR” = 2,结构如下:
"FOO" -> "BAR" -> [] (2) (1)
删除 “FOO” ,压缩后,结构如下:
"FOOBAR" -> [] (2)
案例2:
假设 rax 树存储了 “FOOBAR” = 1 和 “FOOTER” = 2
|B| -> "AR" -> [] (1) "FOO" -> |-| |T| -> "ER" -> [] (2)
删除 “FOOTER” 后,结构如下:
"FOO" -> |B| -> "AR" -> [] (1)
可以压缩成的结果如下:
"FOOBAR" -> [] (1)
删除操作具体可以分为2个阶段,删除阶段以及压缩阶段:
1)删除阶段:
利用查找key时记录的匹配路径,依次向上直到无法删除为止。
if (h->size == 0) { debugf("Key deleted in node without children. Cleanup needed.\n"); raxNode *child = NULL; while(h != rax->head) { child = h; debugf("Freeing child %p [%.*s] key:%d\n", (void*)child, (int)child->size, (char*)child->data, child->iskey); rax_free(child); rax->numnodes--; h = raxStackPop(&ts); // 如果节点为 key,或者子节点 size != 1,则无法删除 if (h->iskey || (!h->iscompr && h->size != 1)) break; } if (child) { raxNode *new = raxRemoveChild(h,child); if (new != h) { raxNode *parent = raxStackPeek(&ts); raxNode **parentlink; if (parent == NULL) { parentlink = &rax->head; } else { parentlink = raxFindParentLink(parent,h); } memcpy(parentlink,&new,sizeof(new)); } // 删除后尝试进行压缩 if (new->size == 1 && new->iskey == 0) { trycompress = 1; h = new; } } } else if (h->size == 1) { // 可以尝试进行压缩 trycompress = 1; }
2)压缩阶段: 压缩过程可以细化为2步。
① 找到可以进行压缩的第一个元素,之后将所有可进行压缩的节点进行压缩。由于raxRowWalk函数已经记录了查找key的过程,压缩时只需从记录栈中不断弹出元素,即可找到可进行压缩的第一个元素,过程如下:
raxNode *parent; while(1) { parent = raxStackPop(&ts); if (!parent || parent->iskey || (!parent->iscompr && parent->size != 1)) break; h = parent; } // 可以进行压缩的第一个节点 raxNode *start = h;
② 找到第一个可压缩节点后,进行数据压缩。由于可压缩的节点都只有一个子节点,压缩过程只需要读取每个节点的内容,创建新的节点,并填充新节点的内容即可:
/* Scan chain of nodes we can compress. */ size_t comprsize = h->size; int nodes = 1; while(h->size != 0) { raxNode **cp = raxNodeLastChildPtr(h); memcpy(&h,cp,sizeof(h)); if (h->iskey || (!h->iscompr && h->size != 1)) break; /* Stop here if going to the next node would result into * a compressed node larger than h->size can hold. */ if (comprsize + h->size > RAX_NODE_MAX_SIZE) break; nodes++; comprsize += h->size; } if (nodes > 1) { /* If we can compress, create the new node and populate it. */ size_t nodesize = sizeof(raxNode)+comprsize+raxPadding(comprsize)+sizeof(raxNode*); raxNode *new = rax_malloc(nodesize); /* An out of memory here just means we cannot optimize this * node, but the tree is left in a consistent state. */ if (new == NULL) { raxStackFree(&ts); return 1; } new->iskey = 0; new->isnull = 0; new->iscompr = 1; new->size = comprsize; rax->numnodes++; /* Scan again, this time to populate the new node content and * to fix the new node child pointer. At the same time we free * all the nodes that we'll no longer use. */ comprsize = 0; h = start; while(h->size != 0) { memcpy(new->data+comprsize,h->data,h->size); comprsize += h->size; raxNode **cp = raxNodeLastChildPtr(h); raxNode *tofree = h; memcpy(&h,cp,sizeof(h)); rax_free(tofree); rax->numnodes--; if (h->iskey || (!h->iscompr && h->size != 1)) break; } /* Now 'h' points to the first node that we still need to use, * so our new node child pointer will point to it. */ raxNode **cp = raxNodeLastChildPtr(new); memcpy(cp,&h,sizeof(h)); /* Fix parent link. */ if (parent) { raxNode **parentlink = raxFindParentLink(parent,start); memcpy(parentlink,&new,sizeof(new)); } else { rax->head = new; } }
为了能够遍历 rax 中所有的 key, Redis 提供了迭代器操作。Redis 中实现的迭代器为双向迭代器,可以向前,也可以向后,顺序是按照 key 的字典序排列的。通过 rax 的结构图可以看出,如果某个节点为 key,则其子节点的 key 按照字典序比该节点的 key 大。另外,如果当前节点为非压缩节点,则其最左侧节点的key是其所有子节点的 key 中最小的。主要方法有:
void raxStart(raxIterator *it, rax *rt); int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len); int raxNext(raxIterator *it); int raxPrev(raxIterator *it);
1)raxStart 用于初始化 raxIterator 结构:
/* Initialize a Rax iterator. This call should be performed a single time * to initialize the iterator, and must be followed by a raxSeek() call, * otherwise the raxPrev()/raxNext() functions will just return EOF. */ void raxStart(raxIterator *it, rax *rt) { it->flags = RAX_ITER_EOF; /* No crash if the iterator is not seeked. */ it->rt = rt; it->key_len = 0; it->key = it->key_static_string; it->key_max = RAX_ITER_STATIC_LEN; it->data = NULL; it->node_cb = NULL; raxStackInit(&it->stack); }
2)raxSeek,在raxStart初始化迭代器后,必须调用raxSeek函数初始化迭代器的位置。
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
查找末尾元素可以直接在Rax中找到最右侧的叶子节点,查找首个元素被转换为查找大于等于空的操作。
处理大于、小于、等于等操作主要分为以下几步:
3)raxNext & raxPrev,主要用于迭代上一个或者下一个key
/* Go to the next element in the scope of the iterator 'it'. * If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is * returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */ int raxNext(raxIterator *it) { if (!raxIteratorNextStep(it,0)) { errno = ENOMEM; return 0; } if (it->flags & RAX_ITER_EOF) { errno = 0; return 0; } return 1; } /* Go to the previous element in the scope of the iterator 'it'. * If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is * returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */ int raxPrev(raxIterator *it) { if (!raxIteratorPrevStep(it,0)) { errno = ENOMEM; return 0; } if (it->flags & RAX_ITER_EOF) { errno = 0; return 0; } return 1; }
4)raxStop & raxEOF:
raxEOF用于标识迭代器迭代结束,raxStop用于结束迭代并释放相关资源:
/* Free the iterator. */ void raxStop(raxIterator *it) { if (it->key != it->key_static_string) rax_free(it->key); raxStackFree(&it->stack); } /* Return if the iterator is in an EOF state. This happens when raxSeek() * failed to seek an appropriate element, so that raxNext() or raxPrev() * will return zero, or when an EOF condition was reached while iterating * with raxNext() and raxPrev(). */ int raxEOF(raxIterator *it) { return it->flags & RAX_ITER_EOF; }
首先,要理解 rax 底层实现,第一步需要理解 rax 的结构;其次,要理解 压缩节点 和 非压缩节点 的区别
来,再回顾下 压缩节点 和 非压缩节点 的区别:
它们的相同之处在于:
不同之处在于:
相关参考:
redis readme.md
Redis · 引擎特性 · radix tree 源码解析