Redis使用哈希表来存放键值对数据,在插入新键值对数据时,会先按照”key“来计算哈希值,再根据哈希值和哈希表的sizemask来计算出该”key“在对于哈希数组中的索引值,然后将键值对数据封装成dictEntry对象并放入到索引值对应的哈希数组中。
不同的Key经过相同哈希函数计算后可能得到相同的哈希值和索引值,当两个或多个Key被分配到哈希数组相同的索引位置时,成为哈希冲突,Redis使用单向链表的方式来解决哈希冲突。最后增加的键值对数据会被放置在单向链表的头部位置。
/* Add an element to the target hash table */ int dictAdd(dict *d, void *key, void *val) { /* 使用dictAddRaw来创建一个新的entry */ dictEntry *entry = dictAddRaw(d,key,NULL); if (!entry) return DICT_ERR; /* 设置新entry的value数据 */ dictSetVal(d, entry, val); return DICT_OK; } /* Low level add or find: * This function adds the entry but instead of setting a value returns the * dictEntry structure to the user, that will make sure to fill the value * field as he wishes. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value, example: * * entry = dictAddRaw(dict,mykey,NULL); * if (entry != NULL) dictSetSignedIntegerVal(entry,1000); * * Return values: * * If key already exists NULL is returned, and "*existing" is populated * with the existing entry if existing is not NULL. * * If key was added, the hash entry is returned to be manipulated by the caller. */ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) { long index; dictEntry *entry; dictht *ht; if (dictIsRehashing(d)) _dictRehashStep(d); /* Get the index of the new element, or -1 if * the element already exists. */ /* 获取到新增key在哈希数组中的索引位置 */ if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) return NULL; /* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ /* 找到当前使用的哈希表, * 如果处于rehash状态则新增的键值对数据放入到ht[1]中 * 否则将新增的键值对数据放入到ht[0]中 */ ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; /* 创建新的entry对象 */ entry = zmalloc(sizeof(*entry)); /* 将当前位置上数据赋值给新entry的next指针 */ entry->next = ht->table[index]; /* 将心得entry对象赋值给当前位置,即将新enrty放到单向链表的头部 */ ht->table[index] = entry; /* 增加哈希表的已使用值 */ ht->used++; /* Set the hash entry fields. */ /* 设置新entry的key数据 */ dictSetKey(d, entry, key); return entry; }
在按照key进行哈希查找时,会先按照key来计算出对应的哈希值和索引值,再按照索引值找到哈希数组对应位置上的单向链表,再依次遍历单向链表中的每个entry对象并使用查找的key和entry的key值进行对比,从而找到特定key的entry对象。
dictEntry *dictFind(dict *d, const void *key) { dictEntry *he; uint64_t h, idx, table; if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */ /* 如果当前字典处于rehash状态,则辅助完成1条数据迁移工作 */ if (dictIsRehashing(d)) _dictRehashStep(d); /* 计算key对应的hash值 */ h = dictHashKey(d, key); for (table = 0; table <= 1; table++) { /* 计算key对应的索引值 */ /* 在rehash过程中,ht[0]和ht[1]的sizemask不同,因此需要分别计算 */ idx = h & d->ht[table].sizemask; /* 获取到哈希数组上指定索引位置上的单向链表 */ he = d->ht[table].table[idx]; /* 遍历单向链表 */ while(he) { /* 针对单向链表上每个entry,对比entry存储的key和要查找的key */ if (key==he->key || dictCompareKeys(d, key, he->key)) /* 找到后立即返回,同一个key只会存在一份数据 */ return he; he = he->next; } /* 如果未处于rehas状态,处理完ht[0]后无需再处理ht[1] */ if (!dictIsRehashing(d)) return NULL; } return NULL; }
在每个哈希表对象中,使用size字段来存放该哈希表的长度,使用used字段来存放哈希表的已使用量:
/* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing, for the old to the new table. */ typedef struct dictht { /* 使用数组方式来存放哈希表数据 */ dictEntry **table; /* 哈希表长度 */ unsigned long size; /* 哈希表长度掩码,用来计算哈希值对应的数组下标 */ unsigned long sizemask; /* 哈希表上已使用量,即当前存储的entry总数量 */ unsigned long used; } dictht;
当 used * 100 / size < HASHTABLE_MIN_FILL(10)
时,哈希数组资源浪费严重,需要对哈希表进行缩容,将哈希数表长度扩展为第一个大于等于used*2的2的n次幂。
当 used / size > dict_force_resize_ratio(5)
时,哈希数组上哈希冲突严重,需要对哈希表进行扩容,将哈希数表长度缩小为第一个大于等于used的2的n次幂。
在Redis执行命令请求增加新键时,会调用函数dictAddRaw或dictAdd-->dictAddRaw来进行插入,在函数dictAddRaw中会使用_dictKeyIndex --> _dictExpandIfNeeded 来判断当前哈希表是否需要扩容。
static int dict_can_resize = 1; static unsigned int dict_force_resize_ratio = 5; /* Expand the hash table if needed */ static int _dictExpandIfNeeded(dict *d) { /* Incremental rehashing already in progress. Return. */ if (dictIsRehashing(d)) return DICT_OK; /* If the hash table is empty expand it to the initial size. */ if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); /* If we reached the 1:1 ratio, and we are allowed to resize the hash * table (global setting) or we should avoid it but the ratio between * elements/buckets is over the "safe" threshold, we resize doubling * the number of buckets. */ if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) { return dictExpand(d, d->ht[0].used*2); } return DICT_OK; }
哈希表的扩容操作主要由Redis插入新键值对数据的命令来触发,如:
在判断哈希表是否需要缩容时,通过函数tryResizeHashTables来:
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL * we resize the hash table to save memory */ void tryResizeHashTables(int dbid) { if (htNeedsResize(server.db[dbid].dict)) dictResize(server.db[dbid].dict); if (htNeedsResize(server.db[dbid].expires)) dictResize(server.db[dbid].expires); } /* This is the initial size of every hash table */ #define DICT_HT_INITIAL_SIZE 4 /* Hash table parameters */ /* Minimal hash table fill 10% */ #define HASHTABLE_MIN_FILL 10 int htNeedsResize(dict *dict) { long long size, used; size = dictSlots(dict); used = dictSize(dict); /* 当前使用量小于当前长度的10%时,进行缩容 */ return (size > DICT_HT_INITIAL_SIZE && (used*100/size < HASHTABLE_MIN_FILL)); } /* Resize the table to the minimal size that contains all the elements, * but with the invariant of a USED/BUCKETS ratio near to <= 1 */ int dictResize(dict *d) { int minimal; if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR; /* 按照当前使用量进行缩容 */ minimal = d->ht[0].used; if (minimal < DICT_HT_INITIAL_SIZE) minimal = DICT_HT_INITIAL_SIZE; return dictExpand(d, minimal); }
Redis按照redisServer.hz参数控制的频率执行定时任务执行过程中,会调用serverCron-->databasesCron函数,在databasesCron函数执行过程中,如果当前未执行AOF重写或RDB备份操作,则:
/* This function handles 'background' operations we are required to do * incrementally in Redis databases, such as active key expiring, resizing, * rehashing. */ void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (server.masterhost == NULL) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); } } /* Defrag keys gradually. */ if (server.active_defrag_enabled) activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ /* 在没有RDB备份或AOF日志重写时,才会触发rehash操作 */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; /* Resize */ for (j = 0; j < dbs_per_call; j++) { tryResizeHashTables(resize_db % server.dbnum); resize_db++; } /* Rehash */ if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { int work_done = incrementallyRehash(rehash_db); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ break; } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum; } } } } }
无论时缩容还是扩容,都会调用dictExpand函数来处理,按照新的hash表容量(unsigned long size)计算出一个接近且大于size的2^N的值,并以此值来初始化dictht对象,再赋值给d->ht[1],同时设置rehash状态值d->rehashidx =0。
/* Expand or create the hash table */ int dictExpand(dict *d, unsigned long size) { /* the size is invalid if it is smaller than the number of * elements already inside the hash table */ if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; dictht n; /* the new hash table */ unsigned long realsize = _dictNextPower(size); /* Rehashing to the same table size is not useful. */ if (realsize == d->ht[0].size) return DICT_ERR; /* Allocate the new hash table and initialize all pointers to NULL */ n.size = realsize; n.sizemask = realsize-1; n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ if (d->ht[0].table == NULL) { d->ht[0] = n; return DICT_OK; } /* Prepare a second hash table for incremental rehashing */ d->ht[1] = n; d->rehashidx = 0; return DICT_OK; } /* Our hash table capability is a power of two */ /* 哈希表的长度必须是2的N次方,因此必须选择一个接近且大于指定长度的2^N值 */ static unsigned long _dictNextPower(unsigned long size) { unsigned long i = DICT_HT_INITIAL_SIZE; if (size >= LONG_MAX) return LONG_MAX + 1LU; while(1) { if (i >= size) return i; i *= 2; } }
函数dictExpand仅仅是初始化新哈希表ht[1]并设置dict的状态为rehash状态(rehashidx = 0),为不严重阻塞正常命令请求,采用渐进式哈希(rehashing)的机制来完成数据从ht[0]到ht[1]的迁移。rehash操作以哈希桶为基本单位调用dictRehash函数来完成。
rehash操作按照触发方式可以分为:
在函数incrementallyRehash中,当哈希表处于rehash状态(rehashidx不等于-1)时,会调用dictRehashMilliseconds来进行增量处理,按照每批次出100个键的方式循环执行1ms(硬编码在程序中),避免rehash操作执行时间过长阻塞其他客户请求。
/* Our hash table implementation performs rehashing incrementally while * we write/read from the hash table. Still if the server is idle, the hash * table will use two tables for a long time. So we try to use 1 millisecond * of CPU time at every call of this function to perform some rehahsing. * * The function returns 1 if some rehashing was performed, otherwise 0 * is returned. */ int incrementallyRehash(int dbid) { /* Keys dictionary */ if (dictIsRehashing(server.db[dbid].dict)) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */ } /* Expires */ if (dictIsRehashing(server.db[dbid].expires)) { dictRehashMilliseconds(server.db[dbid].expires,1); return 1; /* already used our millisecond for this loop... */ } return 0; } /* 按照rehashidx是否不等于-1来判断当前dict对象是否处于rehas状态 */ #define dictIsRehashing(d) ((d)->rehashidx != -1) /* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */ int dictRehashMilliseconds(dict *d, int ms) { long long start = timeInMilliseconds(); int rehashes = 0; while(dictRehash(d,100)) { rehashes += 100; if (timeInMilliseconds()-start > ms) break; } return rehashes; }
在dictFind等对dict的增删改查的操作过程中,如果当前dict处于rehash状态,则调用_dictRehashStep(d)来迁移1个哈希桶的数据。
dictEntry *dictFind(dict *d, const void *key) { if (dictIsRehashing(d)) _dictRehashStep(d); } /* This function performs just a step of rehashing, and only if there are * no safe iterators bound to our hash table. When we have iterators in the * middle of a rehashing we can't mess with the two hash tables otherwise * some element can be missed or duplicated. * * This function is called by common lookup or update operations in the * dictionary so that the hash table automatically migrates from H1 to H2 * while it is actively used. */ static void _dictRehashStep(dict *d) { if (d->iterators == 0) dictRehash(d,1); }
由于惰性rehash的操作仅迁移1个哈希桶的数据,因此不会对操作产生明显的性能影响。
无论时主动rehash还是惰性rehash操作,都会严格控制rehash操作的耗时,不会阻塞正常命令请求,但:
zcalloc(realsize*sizeof(dictEntry*))
来为新的哈希表ht[1]申请内存。当涉及到的哈希表size较大时,需要申请或释放较大的内存资源,造成Redis服务器的内存使用量明显变化,同时影响到客户端命令请求的执行时间。如果请求执行过程中触发,则可能导致简单命令耗时较长被记录到慢日志中。
Redis的内部扩容机制
Redis关键点(rehash)
浅谈Redis中的Rehash机制