redis DB KV组织是通过字典来实现的;hash结构当节点超过 512 个或者单个字符串长度大于 64 时,hash结构采用字典实现;
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
typedef struct dictht {
dictEntry **table;
unsigned long size;// 数组长度
unsigned long sizemask; //size-1
unsigned long used;//当前数组当中包含的元素
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2]; //ht[0]代表旧的hashtable、ht[1]代表新的hashtable
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) 用于安全遍历*/
} dict;

负载因子
负载因子 = used / size ; used 是数组存储元素的个数, size 是数组的长度;
负载因子越小,冲突越小;负载因子越大,冲突越大;
redis的负载因子是 1 ;

如果负载因子 > 1 ,则会发生扩容;扩容的规则是翻倍;
如果正在 fork (在rdb、aof复写以及rdb-aof混用情况下)时,会阻止扩容;但是此时若负载因子 > 5 ,索引效率大大降低, 则马上扩容;这里涉及到写时复制原理;

写时复制
写时复制核心思想:只有在不得不复制数据内容时才去复制数据内容;

如果负载因子 < 0.1 ,则会发生缩容;缩容的规则是恰好包含 used 的 ;
恰好的理解:假如此时数组存储元素个数为9,恰好包含该元素的就是 ,也就是 16;

当 hashtable 中的元素过多的时候,不能一次性 rehash 到 ht[1] ;这样会长期占用 redis ,
其他命令得不到响应;所以需要使用渐进式 rehash ;
rehash步骤: 将 ht[0] 中的元素重新经过hash函数生成64位整数,再对 ht[1] 长度进行取余,从而映射到ht[1] ;
渐进式规则:
面试:
处于渐进式rehash阶段时,是否会发生扩容缩容?不会!
提出疑问:扩容的情况 4 scan 0
scan cursor [MATCH pattern] [COUNT count] [TYPE type]

采用高位进位加法的遍历顺序, rehash 后的槽位在遍历顺序上是相邻的;
遍历目标是:不重复,不遗漏 ;
# 只支持对最外层key过期;
expire key seconds
pexpire key milliseconds
ttl key
pttl key
惰性删除
分布在每一个命令操作时检查 key 是否过期;若过期删除 key ,再进行命令操作;
定时删除
在定时器中检查库中指定个数(25)个 key;
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */
/*The default effort is 1, and the maximum configurable effort * is 10. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now);
在redis实例中形成了很大的对象,比如一个很大的hash或很大的zset,这样的对象在扩容的时候,会一次性申请更大的一块内存,这会导致卡顿;如果这个大key被删除,内存会一次性回收,卡顿现象会再次产生;
如果观察到redis的内存大起大落,极有可能因为大key导致的;
# 每隔0.1秒 执行100条scan命令
redis-cli -h 127.0.0.1 --bigkeys -i 0.1 12
跳表(多层级有序链表)结构用来实现有序集合;鉴于redis需要实现 zrange 以及 zrevrange功能;需要节点间最好能直接相连并且增删改操作后结构依然有序;B+树时间复杂度为h*O(log2n);鉴于B+复杂的节点分裂操作;考虑其他数据结构;
有序数组通过二分查找能获得O(log2n)时间复杂度;平衡二叉树也能获得O(log2n)时间复杂度;

每隔一个节点生成一个层级节点;模拟二叉树结构,以此达到搜索时间复杂度为 O(log2n);
但是如果对理想跳表结构进行删除增加操作,很有可能改变跳表结构;如果重构理想结构,将是巨大的运算;考虑用概率的方法来进行优化;从每一个节点出发,每增加一个节点都有1/2 的概率增加一个层级,1/4 的概率增加两个层级, 1/8的概率增加3个层级,以此类推;经过证明,当数据量足够大(256)时,通过概率构造的跳表趋向于理想跳表,并且此时如果删除节点,无需重构跳表结构,此时依然趋向于理想跳表;此时时间复杂度为 (1-1/n^c)*O(log2n);
从节约内存出发,redis 考虑牺牲一点时间复杂度让跳表结构更加变扁平,就像二叉堆改成四叉堆结构;并且redis 还限制了跳表的最高层级为 32 ;
节点数量大于 128 或者有一个字符串长度大于 64 ,则使用跳表( skiplist );
数据结构
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele;
double score; // WRN: score 只能是浮点数
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span; // 用于 zrank
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length; // zcard
int level; // 最高层
} zskiplist;
typedef struct zset {
dict *dict; // 帮助快速索引到节点
zskiplist *zsl;
} zset;
结构图

前面实现的延时队列有很大的局限性;
解决:
# 在redis当中 zset结构 score = time + 5
dq_add "delay_queue" 5 msg
# 如果没有消息pop,则将此连接加入阻塞队列,然后在redis的定时器中 比较 zset 最小的值与当前 time的大小 如果满足条件,通知阻塞队列中的连接;
dq_bpop "delay_queue" timeout 60
Redis 通过对外提供一套 API 和一些数据类型,可以供开发者开发自己的模块并且加载到 redis中。通过API可以直接操作 redis 中的数据,也可以通过调用 redis 命令来操作数据(类似luascript)。通过编写模块可以注册自己的命令到redis中。事务 watch multi exec
不侵入 redis 源码基础上,提供一种高效的扩展数据结构的方式,也可以用来实现原子操作;
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
// RedisModule_Init 应该要是第一个被调用的函数
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver);
// RedisModule_CreateCommand 应该要是第二个被调用的函数
REDISMODULE_API int (*RedisModule_CreateCommand)(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep);
// 回调函数
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
redismodule.h
提供了丰富 API 加载模块,并且提供了 API 可以操作 redis 内部提供的数据结构;
需要掌握的接口:
下载编译
git clone https://github.com/RedisBloom/RedisBloom.git
cd RedisBloom
make
cp redisbloom.so /path/to
vi redis.conf
# loadmodules /path/to/redisbloom.so
# 为 key 所对应的布隆过滤器分配内存,参数是误差率以及预期元素,根据运算得出需要多少hash函数 以及所需位图大小
bf.reserve hotkey 0.0000001 10000
# 检测 key 所对应的布隆过滤器是否包含 field 字段
bf.exists key field
# 往 key 所对应的布隆过滤器中添加 field 字段
bf.add key field
BF.RESERVE hotkey 0.0000001 10000
BF.ADD hotkey mark
BF.ADD hotkey king
BF.ADD hotkey darren
BF.EXISTS hotkey mark

http://content.research.neustar.biz/blog/hll.html
作者:Philippe Flajolet.
在Redis实现中,每个键只使用 12kb 进行计数,使用 16384 ( ) 个桶子,标准误差为
0.8125% ,并且对可以计数的项目数没有限制,除非接近 个项目数(这似乎不太可能)。

hyperloglog提供合并两个集合的功能;

少量内存下统计一个集合中唯一元素数量的近似值;
给定一系列的随机整数 N,记录低位连续零位的最大长度 K;随机整数数量 N 和 最大长度 K 的关系?

-- for lua5.4
local function low_zeros(val)
for i=1, 32 do
if val >> i << i ~= val then
return i-1
end
end
return 31
end
math.randomseed(os.time(), 0)
local bit_record = {maxbits = 0}
function bit_record.random(self)
local val = math.random(1, 2^32-1)
local bits = low_zeros(val)
if bits > self.maxbits then
self.maxbits = bits
end
end
local test = {n = 0, record = bit_record}
function test.init(self, n)
self.record.maxbits = 0
self.n = n
end
function test.run(self)
for i=1, self.n do
self.record:random(i)
end
end
function test.dump(self)
print(self.n, ('%.2f'):format(math.log(self.n, 2)), self.record.maxbits)
end
for i=1000, 10000, 100 do
test:init(i)
test:run()
test:dump()
end
3400 11.73 11
3500 11.77 12
4000 11.97 12
4100 12.00 14
9100 13.15 13
9200 13.17 16
9700 13.24 12
9800 13.26 15
9900 13.27 13
10000 13.29 13

如何加权统计?
将计算分布在不同的桶中,然后通过计算平均数的方式来进行修改偏差;HLL 采用的是调和平均数(倒数的平均)。调和平均可以有效平滑离散值的影响;


相同元素通过hash函数会生成相同的 64 位整数,它会索引到相同的桶子中,累计0的个数也会相同,按照上面计算最长累计0的规则,它不会改变该桶子的最长累计0;
redis 中的 hyperloglog 存储分为稀疏存储和紧凑存储;
当元素很少的时候,redis采用节约内存的策略,hyperloglog采用稀疏存储方案;当存储大小超过3000 的时候,hyperloglog 会从稀疏存储方案转换为紧凑存储方案;紧凑存储不会转换为稀疏存储,因为hyperloglog数据只会增加不会减少(不存储具体元素,所以没法删除);
配置: hll-sparse-max-bytes 3000 ;
# hll1 对应着 Hyberloglog 对象
# 往 hll1 对应的 Hyberloglog 添加元素
PFADD hll1 mark king darren
# 计算 hll1 对应的 Hyberloglog 中唯一元素个数值
PFCOUNT hll1
# 往 hll1 对应的 Hyberloglog 添加重复元素mark
PFADD hll1 mark
# 计算 hll1 对应的 Hyberloglog 中唯一元素个数值,发现可以去重
PFCOUNT hll1
PFADD hll2 mark vico milo
# 合并 hll1 和 hll2 到 hll
PFMERGE hll hll1 hll2
# 发现也可以去重
PFCOUNT hll
前面介绍了时间窗口限流,方法简单,但是有很多缺陷;保存了所有元素,当量大的时候,不值得推荐;漏斗限流可以很好的限制元素的容量以及速率;
漏斗的结构特点:容量是有限的,当漏斗水装满,水将装不进去;水从漏嘴按一定速率流出,当流水速度大于灌水速度,那么漏斗永远无法装满;当流水速度小于灌水速度,一旦漏斗装满,需要阻塞等待漏斗腾出空间后再灌水;
所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,流水速度代表着系统允许该行为的最大频率;
Redis-Cell 是一个扩展模块,它采用 rust 编写;它采用了漏斗算法,并提供了原子的限流指令;
该模块只提供了一个指令 cl.throttle ;该指令描述了允许某用户某行为的频率为每 x 秒 n 次行为(流水速度);
# key 为某漏斗容器
# capacity 为某漏斗容器容量
# operations 为单位时间内某行为的操作次数
# seconds 为单位时间 operations / seconds = 流水速度
# quota 单次行为操作次数 默认值为 1
cl.thottle key capacity operations seconds quota
安装
# 源码安装 然后按照 readme.md 编译安装
git clone https://gitee.com/yaowenqiang/redis-cell.git
# bin安装 如下网址下载相应平台的 .so 动态库文件
https://github.com/brandur/redis-cell/releases
cp libredis_cell.so /path/to
vi redis.conf
# loadmodules /path/to/libredis_cell.so
测试
CL.THROTTLE cell 1 1 10 1
1) (integer) 0 # 0 标识允许 1 表示拒绝
2) (integer) 2 # 漏斗容量
3) (integer) 1 # 漏斗剩余空间
4) (integer) -1 # 如果被拒绝了,需要多长时间后再试(单位秒)
5) (integer) 10 # 多长时间后,漏斗完全空出来