在这个篇章,我们将对数据库、RDB持久化、AOF持久化、事件做下介绍。 这边还是一样,会参照redis源码,对redis特性进行分析。
这里我们将介绍Redis存储内容的一些代码
Redis服务器将将所有数据库都保存在服务器状态redis.h/redisServer结构的db数组中
struct redisServer {
//...
//一个数组保存服务器所有数据库
redisDb *db;
//服务器的数据库数量,默认16
int dbnum;
//...
}
默认redis创建0到15号数据库,默认使用0号数据库
struct client {
int fd;
sds querybuf;
int argc;
robj **argv;
//客户端正在使用的数据库
redisDb *db;
int flags;
list *reply;
// ... many other fields ...
char buf[PROTO_REPLY_CHUNK_BYTES];
}
client 是redis客户端结构,以前叫redisClient。 现在client.db指针指向redisServer.db数组中的其中一个元素,指向的就是client正在使用的数据库
select 2 (切换到2号数据库,实际就是修改client.db指针指向2号数据库)
Redis Flushdb 命令用于清空当前数据库中的所有 key(不能执行)。
Redis是一个键值对(key-value pair)数据库服务器,数据库由redis.h/redisDb表示,redisDb.dict字典保存了数据库中的所有键值对,这个字典称为键空间(key space)
typedef struct redisDb {
//...
dict *dict; /* The keyspace for this DB */ 指针指向一个map
dict *expires; /* Timeout of keys with a timeout set */指针指向一个map 有过期时间的key才会添加,value=long long 类型整数(过期时间 毫秒精度时间戳)
int id; /* Database ID */
//...
} redisDb;
键空间的键都是字符串,就是数据库里定义的key,而键空间的值也就是数据库的值,实际就redisObject(string,list,hash,set,zset)
TTL(Time To Live):key 剩下时间单位秒
SETEX:添加一个string键值对,同时设置过期时间(只有string可以使用,其他类型不支持)
PERSIST 命令用于移除给定 key 的过期时间,使得 key 永不过期(EXPIRE 键过期)
删除过期key策略
(1)定时器删除: 惰性删除由db.c/expireIfNeeded()方法实现,在执行读写操作时,会先调用这个函数
(2)惰性删除(长期放任不管,使用到该key,检查是否过期。过期删除):server.c/serverCron()定时方法调用expire.c/activeExpireCycle()方法实现,分多次随机检查一部分过期时间减少CPU占用
(3)定期删除(每隔一段时间程序检查数据库):
RDB全称redis database,在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时 直接将快照文件直接读到内存里
AOF全称是append only file. AOF 持久化的方式是通过redis服务器记录保存下所有的写命令到AOF文件存放在磁盘上
redis主服务器遇到过期键会删除,并通知从服务器;而从服务器遇到过期不管
redis通知功能由notify.c/notifyKeyspaceEvent(int type, char *event, robj *key, int dbid)函数实现,
两个命令生成RDB文件save和bgsave
save:会阻塞Redis服务进程,直到rdb文件创建完毕,服务进程阻塞,服务器不能处理任何命令请求
bgsave(background save):bgsave派生出子进程,由子进程创建rdb文件,服务进程可以继续处理请求
struct saveparam {
time_t seconds;
int changes;
};
save 300 10 // 300s内10次更新数据触发触发RDB文件生成
修改redis.conf文件可以配置自动触发触发RDB文件生成
struct redisServer {
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */ 命令修改了多少次数据库dirty次数加1,save后重置为0
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
time_t lastsave; /* Unix time of last successful save */
}
RDB(Redis DataBase)文件是 Redis 在指定时间点对当前数据库的数据进行快照(Snapshot)并以二进制格式保存的文件。RDB 文件用于持久化数据,允许在 Redis 服务重新启动时将数据加载回内存,以保证数据的持久性。
RDB文件是一个二进制文件组成部分:REDIS ,db_version,database,EOF,check_num
REDIS:52 45 44 49 53 # Magic String "REDIS" 类似 java calss文件结构 CAFEBABE(魔数)
db_version:30 30 30 33 # RDB Version Number as ASCII string. "0003" = 3
database:数据库中的数据,非空数据库保存 database0 database3 (空的数据库不会保存)
FF ## End of RDB file indicator
8-byte-checksum ## CRC64 checksum of the entire file.
database里的结构是:selectdb,db_number,key_value_pairs
selectdb: 0xFE 数据库选择器,告诉程序要开始选择数据库了
db_number:保存着数据库号码
key_value_pairs:数据库中所有键的信息,键有过期时间,那么过期时间和键值会保存一起
不带过期的键值对:type,key,value
带过期的键值对:EXPIRETIMEMS,ms,type,key,value
EXPIRETIMEMS:告知程序这是个过期key
ms:过期时间Union时间戳
如果Redis使用docker安装,那么查看方式
查看docker的rdb文件命令:docker inspect 容器名,找到MOUNTS 下的Source
AOF(Append Only File)持久化是 Redis 的一种持久化机制,通过记录所有写命令以追加方式保存到一个文件中,以确保数据的持久性。与 RDB 不同,AOF 文件是一个文本文件,可以包含 Redis 服务器在执行写命令时生成的所有操作。
struct redisServer {
sds aof_buf; /* AOF buffer, written before entering the event loop */ redis执行一个命令会将命令写到aof_buf末尾
}
AOF 文件的同步机制
appendonly yes 是否打开 AOF 持久化功能
appendfilename "appendonly.aof" AOF 文件名称
appendfsync everysec 同步频率: always 每个 Redis 命令都要同步写入硬盘,everysec 每秒执行一次同步,显式地将多个写命令同步到硬盘,no 让操作系统来决定应该何时进行同步
AOF文件读取,是创建一个伪客户端(fake client):使用一个没有网络连接的来执行AOF文件命令OF 文件的生成过程具体包括命令追加,文件写入,文件同步三个步骤
AOF文件重写,AOF文件保存每个命令文件会非常大,为了让文件小些那么就让AOF文件重写。原理就是遍历数据库查询每个键值对值,添加到对应命令到AOF文件中。
在AOF重写期间,服务器会接着操作命令为了保证重写后和数据库数据一致操作步骤是:执行客户端发来的命令,将命令追加到AOF缓冲区和AOF重写缓冲区(子进程执行AOF重写期间),AOF重写结束后
将AOF重写缓冲区里的命令写到AOF重写文件,AOF重写文件覆盖旧的AOF文件
Redis服务器是一个事件驱动程序,主要处理以下两类事件:
文件事件(file event):Redis服务器通过套接字与客户端连接,文件事件就是服务器对套接字的抽象。服务器与客户端的通信会产生相应的文件事件,而服务器通过监听并处理这些事件来完成网络通信操作
时间事件(time event):Redis服务器有些定时任务需要执行(serverCron函数)
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器(file event handler)。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器(dispatcher)、事件处理器。
因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型。
文件事件是对套接字的抽象,每当一个套接字准备好执行连接应答(accept)、写入、读取、关闭等操作时,就会产生一个文件事件
I/O多路复用将套接字放在一个队列里,通过队列有序(sequentially)、同步(synchronously)、每次一个套接字的方式向文件分派器传送套接字,所以Redis是单线程
I/O多路复用同时监听:AE_READABLE事件和 AE_WRITABLE事件,如果两个事件同时都可以执行那么优先执行AE_READABLE事件
文件事件
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
aeEventLoop *aeCreateEventLoop(int setsize) :将套接字给定事件加入到I/O多路复用程序监听范围,并对事件和事件处理器关联
void aeDeleteEventLoop(aeEventLoop *eventLoop):I/O多路复用程序取消对套接字事件的监听,并取消事件和事件处理器关联
int aeGetFileEvents(aeEventLoop *eventLoop, int fd):返回套接字正在监听的事件类型
AE_NONE 0 /* No events registered. */
AE_READABLE 1 /* Fire when descriptor is readable. */
AE_WRITABLE 2 /* Fire when descriptor is writable. */
AE_BARRIER 4 /* With WRITABLE, never fire the event if the
READABLE event already fired in the same event
loop iteration. Useful when you want to persist
things to disk before sending replies, and want
to do that in a group fashion. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp):阻塞并等待所有被aeCreateEventLoop监听的套接字直到事件产生或者超时
int aeProcessEvents(aeEventLoop *eventLoop, int flags):文件事件分派器,先调用aeApiPoll()等待事件产生,然后遍历所有已产生的事件,并调用相应的事件处理器处理这些事件
文件事件处理器:
1.连接应答处理器 void clientAcceptHandler(connection *conn)
2.命令请求处理器 void readQueryFromClient(connection *conn) 读入客户端发送的命令请求
3.命令回复处理器 void sendReplyToClient(connection *conn) 将服务器执行命令后得到的数据通过套接字返回给客户端
时间事件
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
时间事件处理:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc):将一个新的时间事件添加到服务器
static int processTimeEvents(aeEventLoop *eventLoop) :时间事件处理器,遍历所有已到达时间事件(when <= now),并调用时间事件处理器
Redis定期对自身资源和状态检查和调整,事件事件一般只执行serverCron()这个函数,调用的函数是server.c/serverCron(),默认每秒运行10次,每次间隔100ms,实现的功能有:
清理过期键值对、尝试进行AOF和RDB操作,关闭清理失效客户端,集群模式进行定期同步,更新服务器的各类统计信息(时间、内存占用、数据库占用)
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
int aeProcessEvents(aeEventLoop *eventLoop, int flags):由这个函数调度文件事件还是时间事件,文件事件和时间事件不会出现抢占情况,所以时间事件执行会比设定的时间晚一些
server.h/client 结构保存了客户端当前的状态信息,以及执行相关功能用到的数据结构
struct client {
int fd;
sds querybuf; //保存客户端发送的命令请求,默认最大为1GB
int argc; //argv数组长度
robj **argv; //argv是一个数组,每一项都是字符串对象
redisDb *db; //客户端正在使用的数据库
int flags; //客户端标志值,客户端角色以及客户端所处的状态 主服务器,从服务器,lua脚本
list *reply;
// ... many other fields ...
char buf[PROTO_REPLY_CHUNK_BYTES]; //字节数组,默认大小为16KB
int bufpos; //buf数组目前已使用的字节数量
list *reply; /* List of reply objects to send to the client. */ 可变大小缓存区,通过链表连接多个字符串对象,当回复的长度超过固定字符长度(16kb)。理论上输出缓冲区不限制大小,不过不能超过硬性限制
robj *name; /* As set by CLIENT SETNAME. */
struct redisCommand *cmd //根据argv[0]='SET'找到对应的redisCommand结构,并将指针指向它
int authenticated; /* Needed when the default user requires auth. */ 0未通过身份验证 1已通过身份验证 (开启身份验证才会生效)
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
}
struct redisServer {
list *clients; /* List of active clients */
}
fd连接套接字的属性:伪客户端(fake client lua脚本和AOF文件命令,fd=-1)和客户端(连接的客户端fd>0)
服务器读取客户端命令请求:
(1)读取套接字中协议格式的命令请求,并将其保存到客户端状态的输入缓存区里(querybuf)
(2)对输入缓存区中的命令请求进行分析,提取出命令请求中包含的命令参数,以及命令个数,将其保存到 **argv和 argc中
(3)调用命令执行器,执行客户端指定的命令, 根据argv[0]='SET'找到对应的redisCommand结构,并将指针指向它
(4)程序需要进行一些检查的预备操作,比如cmd指针知否指向一个方法即输入的方法是否存在,根据arity判断输入的参数个数是否合法,是否通过身份验证等
(5)执行命令就是clint->cmd->proc(client)即setCommand(client),将产生一个OK回复(假设是 set msg hello),这个回复放到输出缓存区里(buf和reply)
(6)为客户端的套接字关联命令处理器,当前客户端套接字变为可写状态时,服务器执行回复处理器,将客户端输出缓存区里的命令回复发给客户端
(7)服务器执行一些后续的工作,如果开启了慢查询日志,检查这个命令是否为慢查询;更新microseconds和calls值;有开启AOF,将这个命令写入到AOF缓存中;其他服务器正在复制当前服务器,将当前命令传播给所有从服务器
struct redisCommand {
/* Declarative data */
const char *declared_name; /* A string representing the command declared_name.
* It is a const char * for native commands and SDS for module commands. */
int arity; /* Number of arguments, it is possible to use -N to say >= N */ 命令参数个数用于检查命令是否正确
redisCommandProc *proc; /* Command implementation */ 命令的实现函数
uint64_t flags; /* Command flags, see CMD_*. */ 命令的属性是写命令还是读命令
long long microseconds, calls, rejected_calls, failed_calls; //calls 服务器总共执行了多少次这个命令 microseconds 执行命令耗费的总时长
};
struct redisServer {
/* time cache */ serverCron()函数默认每隔100ms更新unixtime和mstime,时间并不是很准确。对于打印日志,lru时间,服务器上线时间,时间准备性不高使用;键设置过期时间,会再次调用下系统时间
redisAtomic time_t unixtime; /* Unix time sampled every cron cycle. */
mstime_t mstime; /* 'unixtime' in milliseconds. */
redisAtomic unsigned int lruclock; /* Clock for LRU eviction */ clock 时钟,lock锁。lruclock用于计算键的空转时长(idle)
size_t stat_peak_memory; /* Max used memory record */
/* Shutdown */
int shutdown_timeout; /* Graceful shutdown time limit in seconds. */
int shutdown_on_sigint; /* Shutdown flags configured for SIGINT. */
int shutdown_on_sigterm; /* Shutdown flags configured for SIGTERM. */
}
初始化服务器过程
(1)初始化服务器状态结构,调用server.c/initServerConfig()函数 初始化server变量工作,初始化服务运行id,运行频率(server.hz),服务器架构,运行端口号,初始化lru时钟
(2)载入配置选项,载入用户配置的信息,比如 修改端口号 redis-server --port 10086,替换第一步的默认配置信息参数
(3)初始化服务器数据结构,在initServerConfig只初始化server的结构数据,在当前部初始化server.clients(与客户端相连的状态结构),server.db数组(服务器数据库),
server.slowlog(慢查询日志),之后调用server.c/initServer()开始真正初始化服务器。(initServerConfig初始化一般属性,而initServer初始化数据结构)
initServer初始化数据结构,还进行了:
为服务器设置进程信号处理器; signal(SIGHUP, SIG_IGN);signal(SIGPIPE, SIG_IGN);
创建共享对象(经常使用的"OK"和“ERR”字符串和1-10000整数字符串对象,共享对象避免创建重复对象)
打开服务器的监听窗口,并为监听套接字关联应答事件处理器,等待服务器正式运行时接收客户端的连接
为serverCron函数创建时间事件,等待服务器正式运行时执行serverCron函数
AOF开启了为AOF功能操作做准备
初始化服务器后台的I/O模块,为I/O操作做准备
(4)还原数据库状态,AOF文件和RDB载入
(5)执行事件循环(loop)
Redis的复制功能分为同步(sync)和命令传播(command propagate)两个操作:
同步从服务器同步主服务器的数据状态 slaveof 命令可以将当前服务器转变为指定服务器的从属服务器 (slave server)。sync从服务器执行,主服务器收到指令执行bgsave命令生成rdb文件,
并将rdb文件同步给从服务器,接着将同步期间命令发送给从服务器
命令传播主服务操作的命令同步给从服务器
sync(中间从服务器宕机,需要完全复制一遍,消耗资源) psync:有完全同步(full resynchronization)和部分同步(partial resynchronization)
sync部分同步实现:
主服务器的复制偏移量(replication offset)和从服务器复制偏移量
主服务器的复制积压缓冲区(replication backlog):复制缓冲区记录命令,同时每个命令的偏移量。从服务器和主服务器偏移量不一样,那么从复制缓冲区查找从服务器的偏移量数据复制。缓冲区默认大小1M,就是个队列数据结构
服务器运行id(run id):从服务器保存主服务器的run id,如果从服务器发送了主服务器id,那么重连就知道部分同步即可。否则就完全同步
当这个服务器是从服务器时,保存主服务的地址和端口
struct redisServer {
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
}
struct client {
int slave_listening_port; /* As configured with: REPLCONF listening-port */
}
复制的实现:
(1)从服务器在客户端设置主服务器的地址和端口号,执行 slaveof host port
(2)创建套接字连接,从服务器根据ip和port建立和主服务器的套接字连接,现在从服务器就是主服务的客户端实际
(3)从服务器向主服务器发送ping命令,验证主从服务器能否正常通信
(4)身份验证,主从服务器都没设置身份验证才不会进行身份验证。而且需要主从服务器密码设置一样
(5)从服务器发送端口信息(replconf listening-port <port-number>)给主服务器
(6)同步操作反过来了,主服务器变成从服务器的客户端了,主服务器向从服务器发送命令请求`在这里插入代码片`
(7)主服务器将命令传播给从服务器,进而保证主从服务器数据一致
复制的过程和原理和MySQL的复制有点像。
MySQL复制: 也使用主从模式,主节点负责所有写操作,并将更新同步到从节点。从节点被动地从主节点接收数据。数据同步方式包括基于语句的复制和基于行的复制。
Sentinel(哨兵)是Redis高可用解决方案:由一个或任意多个Sentinel实例组成的Sentinel系统可以监视任意多个主服务器以及这些主服务器的从属服务器,主服务器宕机可以让从服务器变成主服务器。
工作原理:
每个sentinel以每秒钟一次的频率向它所知的master,slave以及其他sentinel实例发送一个 PING 命令
如果一个实例距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 则这个实例会被sentinel标记为主观下线。
如果一个master被标记为主观下线,则正在监视这个master的所有sentinel要以每秒一次的频率确认master的确进入了主观下线状态
当有足够数量的sentinel(大于等于配置文件指定的值 quorum)在指定的时间范围内确认master的确进入了主观下线状态, 则master会被标记为客观下线
在一般情况下, 每个sentinel会以每 10 秒一次的频率向它已知的所有master,slave发送 INFO 命令(就是提取一些信息 master slave等状态)
当master被sentinel标记为客观下线时,sentinel向下线的master的所有slave发送 INFO 命令的频率会从 10 秒一次改为 1 秒一次
当一个主服务器被判断为客观下线后,各个sentinel会协商选取一个领头sentinel(选举领头sentinel使用的raft算法),并由sentinel对下线服务器进行故障转移操作。(状态转移时频率也是变成1秒一次)
领头sentinel在从服务器中选一个称为主服务器
让旧的主服务器下的从服务器都改为复制新的主服务器,同时让旧的服务器称为新的主服务器的从服务器。这个旧主服务器上线后就变成主服务器了
SentinelState结构(Sentinel状态)保存了服务器中所有和Sentinel功能有关的状态
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */ 用于实现故障转移
dict *masters; /* Dictionary of master sentinelRedisInstances. 保存了所有被这个sentinel监视的服务器 即sentinelRedisInstance
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
} sentinel;
sentinelRedisInstance被sentinel监听的实例结构,这个实例可以是主服务器、从服务器、或者另外一个sentinel
typedef struct sentinelRedisInstance {
int flags; /* See SRI_... defines */ 实例的类型和实例的状态
char *name; /* Master name from the point of view of this sentinel. */
char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
uint64_t config_epoch; /* Configuration epoch. */ 用于实现故障转移
sentinelAddr *addr; /* Master host. */
mstime_t down_after_period; /* Consider it down after that period. */ 实例无响应多少毫秒才会被判定为主观下线
/* Master specific. */
dict *sentinels; /* Other sentinels monitoring the same master. */ 指向其他sentinel节点
dict *slaves; /* Slaves for this master instance. */ 指向的是 从服务器的sentinelRedisInstance实例
unsigned int quorum;/* Number of sentinels that need to agree on failure. */ 判断这个实例为客观下线所需的支持投票数量
mstime_t failover_timeout; /* Max time to refresh failover state. */ 刷新故障转移的最大时限
} sentinelRedisInstance;
/* Address object, used to describe an ip:port pair. */ sentinelRedisInstance.addr
typedef struct sentinelAddr {
char *hostname; /* Hostname OR address, as specified */
char *ip; /* Always a resolved address */
int port;
} sentinelAddr;
初始化Sentinel最后一步是创建连向被监视主服务器的网络连接,Sentinel称为主服务器的客户端。sentinel会创建两个连向主服务器的异步网络连接:
(1)命令连接,这个连接专门向主服务器发送命令,并接收主服务器的回复
(2)订阅连接,这个连接订阅主服务器的 sentinel:hello频道 (为了消息不丢失,命令连接有时候客户端掉线也需要发送一些信息)
Sentinel连接主服务器或者从服务器时,会同时创建命令连接和订阅连接;连接其他sentinel时只会创建命令连接,不会创建订阅连接。因为sentinel需要通过接收主服务器和从服务器
发来的频道信息,来发现未知的新sentinel。所以才需要建立订阅连接,而互相已知的sentinel只要使用命令连接来通信即可。
Redis集群(Cluster) 是Redis提供的分布式数据库方案,集群通过分片(sharding)进行数据共享,并提供复制和故障转移功能。 redis集群,主节点负责处理槽,从节点用于复制某个主节点,并在主节点宕机称为主节点实现故障转移。
clusterNode 保存当前节点的状态
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ 40个十六进制数
char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */ 节点标识(表示节点角色,节点状态)
uint64_t configEpoch; /* Last configEpoch observed for this node */ 节点当前的配置纪元,用于故障转移
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ 一个字节二进制可以表示8个槽(如果该位为1表示该节点负责处理槽i,如果该为为0那么)
clusterLink *link; /* TCP/IP link established toward this node */ 保存连接节点所需的信息
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */ 一个数组,每个数组指向一个正在复制主节点的从节点的clusterNode结构
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
list *fail_reports; /* List of nodes signaling this as failing */ 一个链表记录了所有其他节点对该节点的下线报告
} clusterNode;
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
connection *conn; /* Connection to remote node */
list *send_msg_queue; /* List of messages to be sent */
size_t head_msg_send_offset; /* Number of bytes already sent of message at head of queue */
unsigned long long send_msg_queue_mem; /* Memory in bytes used by message queue */
char *rcvbuf; /* Packet reception buffer */ 输入缓冲区
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
client结构和clusterLink都有类似套接字,client用于连接客户端,而clusterLink用于连接其他节点
clusterState记录在当前节点下这个集群状态(集群上线还是下线,节点数)
typedef struct clusterState {
clusterNode *myself; /* This node */ 指向当前的节点
uint64_t currentEpoch; //集群当前的配置纪元,用于故障转移
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ 集群状态
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */ 集群节点名单
dict *shards; /* Hash table of shard_id -> list (of nodes) structures */
clusterNode *migrating_slots_to[CLUSTER_SLOTS]; //当前节点正在将槽i迁移至其他节点即migrating_slots_to指向的节点
clusterNode *importing_slots_from[CLUSTER_SLOTS]; //当前节点正在从节点导入的槽
clusterNode *slots[CLUSTER_SLOTS]; //记录了集群中所有16384个槽的指派信息
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
} clusterState;
slots是一个二进制数组,用来记录当前节点需要处理哪些槽(一个字节可以表示8位,那么数组长度就是16384/8=2048),如果该位为1那么就是当前节点需要处理的槽,类似算法里的用二进制表示标识位
slots的16384槽由哪个节点处理,那么redis集群才会上线
Gossip protocol 也叫 Epidemic Protocol (流行病协议),实际上它还有很多别名,比如:“流言算法”、“疫情传播算法”等。redis使用meet添加节点,
节点建立连接后使用这个协议传播给其他节点,让其他节点都与当前新添加的节点握手。
cluster通过分片的方式保存数据库键值对:集群的整个数据库被分为16384个槽(slot),数据库每个键在当其中一个槽里。这个思想和一致性Hash有点像,在增加和减少节点,可以尽量少的迁移对应节点数据。
一致哈希是一种特殊的哈希技术,当调整哈希表的大小时,平均只需要 n/m 重新映射键,其中 n 是键数和 m 槽数。相比之下,在大多数传统哈希表中,数组槽数量的变化会导致几乎所有键被重新映射,因为键和槽之间的映射是由模块化操作定义的。(reSize时候)
Consistent Hashing主要目标是在节点的动态增减时尽量减小数据迁移的开销。
核心原理在于通过哈希函数将数据和节点映射到一个环形空间,然后通过在环上的顺时针方向找到最近的节点,从而实现负载均衡和节点的动态增减。为了映射更加均匀节点会用多个虚拟节点,hash更加均匀分布。
当客户端向服务段发送与数据库键相关的命令后,接收命令的节点会计算出命令要处理的key属于哪个槽。
如果连接的节点是自己的槽那么直接处理;如果不是那么返回一个moved错误,并指引客户端转向(redirect)正确的节点,客户端进行重试再次发起请求
因为可能会出现每次请求命令都会出现REMOVED,需要再次重定向。客户端可以将键和节点之间的映射信息保存起来, 可以有效地减少可能出现的转向次数(单机客户端会显示moved错误,集群只会打印重定向信息)
集群客户端通常会与集群中的多个节点创建套接字连接,所谓的节点转向实际就是换一个套接字发送命令
当节点需要让客户端仅仅在下一个命令请求中转向至另一个节点时, 节点向客户端返回 ASK 转向,往往发生在数据从节点A迁移到节点B的时候,即进行分片的时候。(这是和返回REMOVED区别,有这个区别是因为让客户端再次请求的时候发送ASKING以区别moved,不然节点又会返回错误的了)
计算key属于哪个槽:CRC16(key) & 16383 (每个 key 通过 CRC16 校验后对 16384 取模来,与操作更快。类似map属于数组哪个index一样) 2^14=16383
CRC16(循环冗余校验)是一种 16 位的校验和算法。它的输出结果是一个 16 位的无符号整数,因此 CRC16 的值的范围是 0 到 65535(2^16 - 1)
查看key在哪个槽: CLUSTER KEYSLOT
- 1
Redis6增加了集群代理。客户端不需要知道集群中的具体节点个数和主从身份,可以直接通过代理访问集群,对于客户端来说通过集群代理访问的集群就和单机的Redis一样,因此也能解决很多集群的使用限制。
redis集群重新分片原理:(增加或移除节点)重新分片由redis集群管理工具redis-trib负责执行
集群节点故障检测,节点向其他节点发送一个ping消息,如果长时间没有收到pong消息,那么发送ping消息的节点会将该节点标记为疑似下线(probable fail,PFAIL).如果超过半数
都认为这个节点下线了,那么就会将这个节点下线(标记为 FAIL),并广播。
故障转移,当一个复制的从节点发现自己复制的主节点进入下线状态了,那么就会开始进行故障转移
(1)宕机的主节点里从当中的从节点选一个处理,称为主节点(选举和sentinel类似都采用Raft算法的领头选举)
(2)被选中的从节点执行 savleof no one命令,称为一个新的主节点
(3)新的主节点会撤销所有对已下线的主节点的槽指派,并将这些槽全部指派给自己
(4)新的节点向集群广播一条pong消息,让集群其他节点知道自己称为主节点了,并且接管了原来负责的槽
(5)新节点开始接收和处理自己负责的槽,故障转移结束
节点之间通信发送的消息都由消息头包装,使用的结构是cluster.h/clusterMsg
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */ 包括消息头和消息正文的长度
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* TCP base port number. */
uint16_t type; /* Message type */ 消息类型
uint16_t count; /* Only used for some kind of messages. */ 消息正文包含的节点信息数量,只在发送MEET,PING,PONG这三种Gossip协议消息时使用
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last 主节点那么是自己的configEpoch,从节点那么是主节点的configEpoch
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */ 发送者名字
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN]; 发送者是从节点,记录是从节点复制的主节点名字;发送者是主节点那么这里记录的是主节点名字
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
uint16_t extensions; /* Number of extensions sent along with this packet. */
char notused1[30]; /* 30 bytes reserved for future usage. */
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data; //消息正文
} clusterMsg;
消息正文结构cluster.h/clusterMsgData
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
/* Extension data that can optionally be sent for ping/meet/pong
* messages. We can't explicitly define them here though, since
* the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
sentinel着眼于高可用,Cluster提高并发量
Redis的发布与订阅由PUBLISH(发布),SUBSCRIBE(订阅),PSUBSCRIBE(模式订阅)
struct redisServer {
//...
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */ 所有频道的订阅关系
dict *pubsub_patterns; /* A dict of pubsub_patterns */ 模式订阅关系
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
//...
}
客户端执行SUBSCRIBE命令,会在pubsub_channels将频道和客户端建立关联pubsub_channels保存所有频道的订阅关系,键是被订阅的频道,键的值是一个链表。记录了所有订阅这个频道的客户端
UNSUBSCRIBE命令是取消订阅,就是pubsub_channels将订阅者删除,如果pubsub_channels为空了,那么这个dict也删除
Redis通过MULTI,EXEC,WATCH等命令实现事务(transaction)功能。事务提供一种将多个命令请求打包,然后一次性顺序执行。不过当中执行报错了,前面执行了不会回滚
struct client {
multiState mstate; /* MULTI/EXEC state */ 事务状态保存在客户端的client.mstate里
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
}
typedef struct multiState {
multiCmd *commands; /* Array of MULTI commands */ 先进先出队列,事务队列
int count; /* Total number of MULTI commands */
int cmd_flags; /* The accumulated command flags OR-ed together.
So if at least a command has a given flag, it
will be set in this field. */
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
is possible to know if all the commands have a
certain flag. */
size_t argv_len_sums; /* mem used by all commands arguments */
int alloc_count; /* total number of multiCmd struct memory reserved. */
} multiState;
/* Client MULTI/EXEC state */
typedef struct multiCmd {
robj **argv; 参数
int argv_len; 参数长度
int argc; 参数数量
struct redisCommand *cmd; 命令指针
} multiCmd;
typedef struct redisDb {
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
} redisDb;
/* The watchedKey struct is included in two lists: the client->watched_keys list,
* and db->watched_keys dict (each value in that dict is a list of watchedKey structs).
* The list in the client struct is a plain list, where each node's value is a pointer to a watchedKey.
* The list in the db db->watched_keys is different, the listnode member that's embedded in this struct
* is the node in the dict. And the value inside that listnode is a pointer to the that list, and we can use
* struct member offset math to get from the listnode to the watchedKey struct.
* This is done to avoid the need for listSearchKey and dictFind when we remove from the list. */
typedef struct watchedKey {
listNode node;
robj *key;
redisDb *db;
client *client;
unsigned expired:1; /* Flag that we're watching an already expired key. */
} watchedKey;
客户端使用WACTH 命令,监听对应键,刚这个事务在开启事务期间被其他客户端执行了,那么客户端不执行这个事务相当于乐观锁(CAS)
所有对数据库修改的命令比如SET,LPUSH,DEL,SADD,都会触发multi.c/touchWatchedKey(redisDb *db, robj *key)方法,查看是否有客户端正在监视刚刚被命令修改过的数据库键,
如果有的话,会将监视被修改键的客户端 CLIENT_DIRTY_CAS 标识打开,客户安全性已被破坏。客户端执行exec会变成失败。
redis事务执行报错不会回滚,作者认为redis主旨是简单高效一般开发环境才会出现执行报错,生产环境不会出现,没有设计执行错误回滚。也就是会出现一个事务里部分失败,部分成功。
Redis事务ACID中原子性(理论上有,不过报错不会滚),一致性(转账一个少了,一个多了),隔离性(本身就是单线程),持久性(服务器在AOF开启,并且appendfsync模式为always事务才具有持久性)
通过服务器中嵌入Lua环境,Redis客户端可以使用Lua脚本,直接在服务端原子地执行多个Redis命令
Redis在服务器内嵌了一个Lua环境,并对环境进行了一些修改,从而确保这个Lua环境可以满足Redis服务器的需要。还创建了用于执行Lua脚本中的Redis命令的伪客户端,还有保存Lua脚本的lua_scripts字典
Redis服务器在Lua环境中创建一个redis全局表格,并设为全局变量。这个redis表格包含以下函数:
执行Redis命令的 redis.call命令和redis.pcall命令;记录Redis日志的redis.log函数;计算SHA1检验和的redis.sha1hex函数;返回错误信息的redis.error_reply和redis.status_reply
redis.call() 和 redis.pcall() 的唯一区别在于它们对错误处理的不同。
redis.call() 在执行命令的过程中发生错误时,脚本会停止执行,并返回一个脚本错误,错误的输出信息会说明错误造成的原因
redis.pcall() 出错时并不引发(raise)错误,而是返回一个带 err 域的 Lua 表(table),用于表示错误
Redis服务器Lua脚本执行过程:
(1)Lua环境将redis.call函数和redis.pcall函数想要执行的命令传给伪客户端
(2)伪客户端将要执行的脚本命令传给命令执行器
(3)命令执行器执行伪客户端传给他的命令,并将结果返回给伪客户端
(4)伪客户端将结果返回给Lua环境
(5)Lua环境将收到的命令结果,将结果返回给redis.call函数或者redis.pcall函数
(6)redis.call函数或者redis.pcall函数将命令结果作为返回值返回给脚本调用者
最新版本lua_scripts保存在eval.h/luaCtx
/* Lua context */
struct luaCtx {
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
client *lua_client; /* The "fake client" to query Redis from Lua */
// 被eval和script load命令执行过的lua脚本都保存到lua_scripts字典,为了实现SCRIPT EXISTS命令和脚本复制功能(脚本复制到其他节点)
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
} lctx;
EVAL命令实现:
(1) 定义脚本函数,函数名就是 f_前缀加上脚本的SHA1校验和组成,函数体就是脚本本身,例如.sha1如果一样那么说明脚本一样,那么就不会保存重复的脚本了
EVAL "return 'hello world'" 0 脚本,服务器将在Lua脚本定义如下函数:
function f_5332031c6b470dc5a0dd9b4bf2030dea6d65de91()
return 'hello world'
end
(2)将脚本保存到lua_scripts中,key=5332031c6b470dc5a0dd9b4bf2030dea6d65de91 val=return 'hello world'
(3)执行脚本函数,将EVAL命令传入的参数保存到Lua环境中,服务器为Lua环境装载超时处理钩子(hook),钩子可以让脚本超时客户端通过 SCRIPT KILL命令停止脚本
执行脚本函数,移除超时钩子,将函数所得结果保存到客户端输出缓冲区中,对Lua环境执行垃圾回收
以上是对Redis总结,大家如果想深入理解Redis。推荐大家查阅《Redis设计与实现 》这本书