• Redis篇(5)——持久化


    持久化

    RDB

    1、save会阻塞所有命令。而bgsave则不能与其他持久化命令同时执行
    2、自动rdb的发起:servercorn默认每100ms执行一次,根据redisserver里面记录的dirty(上次save后修改的次数)和lastsave(上次save的时间)变量以及配置的saveparams(保存条件)来判断是否符合保存条件,若符合则执行save
    3、默认的自动保存条件
    save 900 1 (900秒内1次修改)
    save 300 10
    save 60 10000

    数据结构

      struct redisServer {
      		// ...
      		long long dirty;    //距离上次保存后修改的键数
            time_t lastsave;    //上次保存时间
            struct saveparam *saveparams;   //保存条件
      		// ...
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    源码

    serverCron里根据根据redisserver里面记录的dirty和lastsave变量以及配置的*saveparams参数来判断当前是否需要触发bgsave
    在这里插入图片描述
    在这里插入图片描述

    save和bgSave伪代码
    在这里插入图片描述

    AOF

    数据结构

      struct redisServer {
      		// ...
      		sds 	aof_buf; //aof命令缓存区
      		// ...
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    源码

    在每次命令的call函数最终会执行到feedAppendOnlyFile方法,这个方法会将写命令存入aof_buf缓存区
    在这里插入图片描述
    而在后续serverCron都会执行flushAppendOnlyFile考虑是否将aof缓存区的内容写入同步到aof文件,以下是伪代码
    在这里插入图片描述
    不同的appendfsync决定了不同的刷盘策略
    在这里插入图片描述
    flushAppendOnlyFile源码

    void flushAppendOnlyFile(int force) {
        ssize_t nwritten;
        int sync_in_progress = 0;
        mstime_t latency;
    
        if (sdslen(server.aof_buf) == 0) {
            //如果是AOF_FSYNC_EVERYSEC且超过一秒且当前无刷盘任务则刷盘
            if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.aof_fsync_offset != server.aof_current_size &&
                server.unixtime > server.aof_last_fsync &&
                !(sync_in_progress = aofFsyncInProgress())) {
                goto try_fsync;
            } else {
                return;
            }
        }
    
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            sync_in_progress = aofFsyncInProgress();
    
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
            //如果当前有刷盘任务且本次又不强制刷盘则判断是否需要推迟等待
            if (sync_in_progress) {
                if (server.aof_flush_postponed_start == 0) {
                    //如果是第一次则推迟并记录推迟开始时间
                    server.aof_flush_postponed_start = server.unixtime;
                    return;
                } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                    //最多推迟等待两秒
                    return;
                }
                //记录下延迟个数
                server.aof_delayed_fsync++;
                serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
            }
        }
    
        latencyStartMonitor(latency);
        nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); //写入系统缓存
        latencyEndMonitor(latency);
    
        if (sync_in_progress) {
            latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
        } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
            latencyAddSampleIfNeeded("aof-write-active-child",latency);
        } else {
            latencyAddSampleIfNeeded("aof-write-alone",latency);
        }
        latencyAddSampleIfNeeded("aof-write",latency);
    
        //已经写入系统缓存,重置延迟刷盘时间
        server.aof_flush_postponed_start = 0;
    
        //当实际写入与buf长度不一致时处理错误及打印些日志
        if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
            static time_t last_write_error_log = 0;
            int can_log = 0;
    
            /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
            if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
                can_log = 1;
                last_write_error_log = server.unixtime;
            }
    
            /* Log the AOF write error and record the error code. */
            if (nwritten == -1) {
                if (can_log) {
                    serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                        strerror(errno));
                    server.aof_last_write_errno = errno;
                }
            } else {
                if (can_log) {
                    serverLog(LL_WARNING,"Short write while writing to "
                                           "the AOF file: (nwritten=%lld, "
                                           "expected=%lld)",
                                           (long long)nwritten,
                                           (long long)sdslen(server.aof_buf));
                }
    
                if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                    if (can_log) {
                        serverLog(LL_WARNING, "Could not remove short write "
                                 "from the append-only file.  Redis may refuse "
                                 "to load the AOF the next time it starts.  "
                                 "ftruncate: %s", strerror(errno));
                    }
                } else {
                    /* If the ftruncate() succeeded we can set nwritten to
                     * -1 since there is no longer partial data into the AOF. */
                    nwritten = -1;
                }
                server.aof_last_write_errno = ENOSPC;
            }
    
            /* Handle the AOF write error. */
            if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
                /* We can't recover when the fsync policy is ALWAYS since the
                 * reply for the client is already in the output buffers, and we
                 * have the contract with the user that on acknowledged write data
                 * is synced on disk. */
                serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
                exit(1);
            } else {
                /* Recover from failed write leaving data into the buffer. However
                 * set an error to stop accepting writes as long as the error
                 * condition is not cleared. */
                server.aof_last_write_status = C_ERR;
    
                /* Trim the sds buffer if there was a partial write, and there
                 * was no way to undo it with ftruncate(2). */
                if (nwritten > 0) {
                    server.aof_current_size += nwritten;
                    sdsrange(server.aof_buf,nwritten,-1);
                }
                return; /* We'll try again on the next call... */
            }
        } else {
            if (server.aof_last_write_status == C_ERR) {
                serverLog(LL_WARNING,
                    "AOF write error looks solved, Redis can write again.");
                server.aof_last_write_status = C_OK;
            }
        }
        server.aof_current_size += nwritten;
    
       //aof较小时则清空重用,比较大则释放内存并重新申请
        if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
            sdsclear(server.aof_buf);
        } else {
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }
    
    //实际刷盘
    try_fsync:
        //如果配置了aof rewrite时不要刷盘,且当前有子进程在执行aof重写或rdb保存,就先不要fsync
        if (server.aof_no_fsync_on_rewrite &&
            (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
                return;
    
        //AOF_FSYNC_ALWAYS 同步刷盘
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            latencyStartMonitor(latency);
            redis_fsync(server.aof_fd);
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("aof-fsync-always",latency);
            server.aof_fsync_offset = server.aof_current_size;
            server.aof_last_fsync = server.unixtime;
        }
        //AOF_FSYNC_EVERYSEC 提交job异步刷盘 
        else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                    server.unixtime > server.aof_last_fsync)) {
            if (!sync_in_progress) {
                aof_background_fsync(server.aof_fd);
                server.aof_fsync_offset = server.aof_current_size;
            }
            server.aof_last_fsync = server.unixtime;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160

    其实不止serverCron会执行flushAppendOnlyFile,像beforesleepprepareForShutDownstopAppendOnly时也会执行
    在这里插入图片描述
    总结:aof核心方法是 feedAppendOnlyFile、flushAppendOnlyFile

    混合持久化(aof重写)

    混合持久化本质是通过 AOF 后台重写(bgrewriteaof 命令)完成的,不同的是当开启混合持久化时,fork 出的子进程先将当前全量数据以 RDB 方式写入新的 AOF 文件,然后再将 AOF 重写缓冲区(aof_rewrite_buf_blocks)的增量命令以 AOF 方式写入到文件,写入完成后通知主进程将新的含有 RDB 格式和 AOF 格式的 AOF 文件替换旧的的 AOF 文件。

    数据结构

      struct redisServer {
      		// ...
      		int aof_rewrite_perc;         //重写条件之增长百分比
       		off_t aof_rewrite_min_size;     //重写条件之aof文件大小
      		list *aof_rewrite_buf_blocks; //aof 重写缓存区
      		int aof_pipe_write_data_to_child; //管道传递发送端
        	int aof_pipe_read_data_from_parent; //管道传递接收端
      		// ...
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    源码

    除主动调用 rewriteaof/bgrewriteaof 外,在serverCron会根据配置的参数判断是否需要重新。具体判断逻辑如下
    在这里插入图片描述
    serverCron里的判断是否需要aof重写的源码:
    在这里插入图片描述
    rewriteAppendOnlyFileBackground 重写aof
    在这里插入图片描述

    思考

    aof重写会阻塞主线程吗?如果不会,那如何保证重写过程中的数据一致性?
    不会阻塞,也是fork一个子线程。并且会有一个aof_rewrite_buf用于存储重写过程中的命令。所以当正在进行重写时一条命令的执行会同时写入aof_buf和aof_rewrite_buf_blocks缓存区,再通过aof_pipe_write_data_to_child管道传给重写子进程,子进程重写的过程中会尽可能的从管道中获取数据(因为aof重写缓存区写到aof文件是主线程执行了,为了防止长时间阻塞,每次等待可读取事件1ms,如果一直能读取到数据,则这个过程最多执行1000次,也就是1秒。如果连续20次没有读取到数据,则结束这个过程。),当重写进程结束了,会在主进程(阻塞)将剩余的新命令写入aof文件。

    在这里插入图片描述
    其实难点就在于aof重写是子进程异步的,主进程新加入的写命令如何传递给重写子进程。具体原理用到了管道传递(共有3个管道:2个控制管道,1个数据管道),具体实现后面有机会再详细独立一篇吧,这里只大概说一下数据发送端和接收端。

    数据发送端:
    写命令写入aof_buf的同时会判断是否正在aof重写,如果是则同时写入aof重写缓存区并创建管道读事件
    在这里插入图片描述
    数据接收端:
    rewriteAppendOnlyFileBackground方法fork的子进程中rewriteAppendOnlyFile方法里的第1405行-1414行
    在这里插入图片描述

    过期键对rdb和aof的影响

    rdb:过期键不会写入rdb文件,载入rdb文件时,主服务器也不会载入,只有从服务器会载入过期键。不过因为主服务器会同步至从,所以最终从服务器也没有过期键。(在复制模式下从服务器不会删除过期键,只能以主服务器同步del,在主服务器统一的删除,以确保数据一致性)
    aof:在真正删除时才会追加删除命令到aof,同时重写aof文件时也会去掉过期键。

    过期删除策略
    1、定时:aeMain(时间事件)
    2、定期:servercron
    3、惰性:expireIfNeeded

    持久化文件的加载

    在redis启动的main函数里第4373行的loadDataFromDisk加载持久化文件,而aeMain的监听在main函数的最后。所以先加载完持久化文件后才会提供redis服务。
    在这里插入图片描述

    参考文献:《Redis设计与实现》黄健宏著、redis-5.0.14源码

  • 相关阅读:
    你真的会开发测试框架?
    [CC2642r1] ble5 stacks 蓝牙协议栈 介绍和理解
    Leetcode.1123 最深叶节点的最近公共祖先
    redis中常见的问题(缓存穿透,缓存雪崩,缓存击穿,redis淘汰策略)
    音视频学习 - 创建 WinUI3 + ffmpeg 的桌面程序
    【MySQL】 MySQL 更新数据机制
    MySQL故障排查与生产环境优化
    初探动态规划
    Yii2 创建定时任务
    antd Carousel 重写dot样式
  • 原文地址:https://blog.csdn.net/qq_43196360/article/details/127448489