typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;
Redis中dict结构体包含了两个ditcht,这是为了rehash。
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
提供了dictType,我认为这是用C语言实现的编译时多态(C语言是不存在多态的,这里准确的说我认为Redis通过dictType这个结构体,复用了dict结构体,很像C++编译时多态的思想) ,在创建dict时需要将dictType传入,不同的dictType可以提供不同的hashFunction、keyDup、keyCompare函数。
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
dicht的结构中有dictEntry **table
这是一个指针数组,可以理解为是哈希表的array部分。
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
这是dict中每个entry的结构,除了k和不同类型的v以外,还有struct dictEntry *next;
体现了这是一个链哈希,即如果发生哈希冲突,就通过链表指针来组织起所有位于同一个哈希槽的entry。
/* Returns the index of a free slot that can be populated with
* an hash entry for the given 'key'.
* If the key already exists, -1 is returned. */
static int _dictKeyIndex(dict *ht, const void *key) {
unsigned int h;
dictEntry *he;
/* Expand the hashtable if needed */
if (_dictExpandIfNeeded(ht) == DICT_ERR)
return -1;
/* Compute the key hash value */
h = dictHashKey(ht, key) & ht->sizemask;
/* Search if this slot does not already contain the given key */
he = ht->table[h];
while(he) {
if (dictCompareHashKeys(ht, key, he->key))
return -1;
he = he->next;
}
return h;
}
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
在dictExpand
函数中,有
_dictInit(&n, ht->type, ht->privdata);
n.size = realsize;
n.sizemask = realsize-1;
n.table = calloc(realsize,sizeof(dictEntry*));
所以,一个key应该插入到ht的哪个槽呢?就是_dictKeyIndex
中的这一句
h = dictHashKey(ht, key) & ht->sizemask;
可以保证h的值在[0,size-1]之间,而这些槽已经被初始化了:
n.table = calloc(realsize,sizeof(dictEntry*));
常规的dictAdd、dictDelete比较简单。
值得一提的是rehash过程。
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
整体来看就是ht[0]的尺寸太小了,为了效率,需要把ht[0]的所有元素都搬运到扩展了尺寸的ht[1]中。
返回值为1说明rehash还没有完成。
返回值为0说明rehash已经完成。并且已经交换了ht[0]和ht[1],之后的命令写入可以往ht[0]里写了。
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
实际上的rehash是在databasesCron函数里做的,incrementallyRehash指定了每次进行rehash的dict和时长(1 ms)。而dicthash()又设置了每次最多进行n个槽和n*10个空槽的遍历。