• Redis事务相关源码探究


    Redis事务源码解读

    源码地址:https://github.com/redis/redis/tree/7.0/src

    从源码来简单分析下 Redis 中事务的实现过程

    1、MULTI 声明事务

    Redis 中使用 MULTI 命令来声明和开启一个事务

    // https://github.com/redis/redis/blob/7.0/src/multi.c#L104
    void multiCommand(client *c) {
    	// 判断是否已经开启了事务
    	// 不持之事务的嵌套
        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"MULTI calls can not be nested");
            return;
        }
    	// 设置事务标识
        c->flags |= CLIENT_MULTI;
    
        addReply(c,shared.ok);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    1、首先会判断当前客户端是是否已经开启了事务,Redis 中的事务不支持嵌套;

    2、给 flags 设置事务标识 CLIENT_MULTI

    2、命令入队

    开始事务之后,后面所有的命令都会被添加到事务队列中

    // https://github.com/redis/redis/blob/7.0/src/multi.c#L59
    /* Add a new command into the MULTI commands queue */
    void queueMultiCommand(client *c) {
        multiCmd *mc;
    
        // 这里有两种情况的判断  
        // 1、如果命令在入队是有问题就不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误
        // 2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
            return;
            
        // 在原commands后面配置空间以存放新命令
        c->mstate.commands = zrealloc(c->mstate.commands,
                sizeof(multiCmd)*(c->mstate.count+1));
        // 微信新配置的空间设置执行的命令和参数
        mc = c->mstate.commands+c->mstate.count;
        mc->cmd = c->cmd;
        mc->argc = c->argc;
        mc->argv = c->argv;
        mc->argv_len = c->argv_len;
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    入队的时候会做个判断:

    1、如果命令在入队时有语法错误不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误;

    2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动;

    3、client watch 的 key 有更新,当前客户端的 flags 就会被标记成 CLIENT_DIRTY_CASCLIENT_DIRTY_CAS 是在何时被标记,可继续看下文。

    3、EXEC 执行事务

    命令入队之后,再来看下事务的提交

    // https://github.com/redis/redis/blob/7.0/src/multi.c#L140
    void execCommand(client *c) {
        ...
        // 判断下是否开启了事务
        if (!(c->flags & CLIENT_MULTI)) {
            addReplyError(c,"EXEC without MULTI");
            return;
        }
    
        // 事务中不能 watch 有过期时间的键值
        if (isWatchedKeyExpired(c)) {
            c->flags |= (CLIENT_DIRTY_CAS);
        }
    
         // 检查是否需要中退出事务,有下面两种情况  
         // 1、 watch 的 key 有变化了
         // 2、命令入队的时候,有语法错误  
        if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
            if (c->flags & CLIENT_DIRTY_EXEC) {
                addReplyErrorObject(c, shared.execaborterr);
            } else {
                addReply(c, shared.nullarray[c->resp]);
            }
            // 取消事务
            discardTransaction(c);
            return;
        }
    
        uint64_t old_flags = c->flags;
    
        /* we do not want to allow blocking commands inside multi */
        // 事务中不允许出现阻塞命令
        c->flags |= CLIENT_DENY_BLOCKING;
    
        /* Exec all the queued commands */
        unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    
        server.in_exec = 1;
    
        orig_argv = c->argv;
        orig_argv_len = c->argv_len;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyArrayLen(c,c->mstate.count);
        // 循环处理执行事务队列中的命令
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->argv_len = c->mstate.commands[j].argv_len;
            c->cmd = c->realcmd = c->mstate.commands[j].cmd;
    
            
            // 权限检查
            int acl_errpos;
            int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
            if (acl_retval != ACL_OK) {
              ...
            } else {
                // 执行命令
                if (c->id == CLIENT_ID_AOF)
                    call(c,CMD_CALL_NONE);
                else
                    call(c,CMD_CALL_FULL);
    
                serverAssert((c->flags & CLIENT_BLOCKED) == 0);
            }
    
            // 命令执行后可能会被修改,需要更新操作
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
    
        // restore old DENY_BLOCKING value
        if (!(old_flags & CLIENT_DENY_BLOCKING))
            c->flags &= ~CLIENT_DENY_BLOCKING;
            
        // 恢复原命令
        c->argv = orig_argv;
        c->argv_len = orig_argv_len;
        c->argc = orig_argc;
        c->cmd = c->realcmd = orig_cmd;
        // 清除事务
        discardTransaction(c);
    
        server.in_exec = 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    事务提交的时候,命令的执行逻辑还是比较简单的

    1、首先会进行一些检查;

    • 检查事务有没有嵌套;
    • watch 监听的键值是否有变动;
    • 事务中命令入队列的时候,是否有语法错误;

    2、循环执行,事务队列中的命令。

    通过源码可以看到语法错误的时候事务才会结束执行,如果命令操作的类型不对,事务是不会停止的,还是会把正确的命令执行

    4、WATCH 监听变量

    WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。

    看下 watch 的键值对是如何和客户端进行映射的

    // https://github.com/redis/redis/blob/7.0/src/server.h#L918
    typedef struct redisDb {
        ...
        dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
        ...
    } redisDb;
    
    // https://github.com/redis/redis/blob/7.0/src/server.h#L1083
    typedef struct client {
        ...
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
        ...
    } client;
    
    // https://github.com/redis/redis/blob/7.0/src/multi.c#L262
    // 服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端。   
    //
    // 每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key .
    typedef struct watchedKey {
        // 键值
        robj *key;
        // 键值所在的db
        redisDb *db;
        // 客户端
        client *client;
        // 正在监听过期key 的标识
        unsigned expired:1; /* Flag that we're watching an already expired key. */
    } watchedKey;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    变量映射关系吐下所示

    redis

    分析完数据结构,看下 watch 的代码实现

    // https://github.com/redis/redis/blob/7.0/src/multi.c#L441
    void watchCommand(client *c) {
        int j;
    
        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"WATCH inside MULTI is not allowed");
            return;
        }
        /* No point in watching if the client is already dirty. */
        if (c->flags & CLIENT_DIRTY_CAS) {
            addReply(c,shared.ok);
            return;
        }
        for (j = 1; j < c->argc; j++)
            watchForKey(c,c->argv[j]);
        addReply(c,shared.ok);
    }
    
    // https://github.com/redis/redis/blob/7.0/src/multi.c#L270
    /* Watch for the specified key */
    void watchForKey(client *c, robj *key) {
        list *clients = NULL;
        listIter li;
        listNode *ln;
        watchedKey *wk;
    
        // 检查是否正在 watch 传入的 key 
        listRewind(c->watched_keys,&li);
        while((ln = listNext(&li))) {
            wk = listNodeValue(ln);
            if (wk->db == c->db && equalStringObjects(key,wk->key))
                return; /* Key already watched */
        }
        // 没有监听,添加监听的 key 到 db 中的 watched_keys 中
        clients = dictFetchValue(c->db->**watched_keys**,key);
        if (!clients) {
            clients = listCreate();
            dictAdd(c->db->watched_keys,key,clients);
            incrRefCount(key);
        }
        // 添加 key 到 client 中的  watched_keys 中
        wk = zmalloc(sizeof(*wk));
        wk->key = key;
        wk->client = c;
        wk->db = c->db;
        wk->expired = keyIsExpired(c->db, key);
        incrRefCount(key);
        listAddNodeTail(c->watched_keys,wk);
        listAddNodeTail(clients,wk);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    1、服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端;

    2、每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key ;

    3、当用 watch 命令的时候,过期键会被分别添加到 redisDb 中的 watched_keys 中,和 client 中的 watched_keys 中。

    上面事务的执行的时候,客户端有一个 flags, CLIENT_DIRTY_CAS 标识当前客户端 watch 的键值对有更新,那么 CLIENT_DIRTY_CAS 是在何时被标记的呢?

    // https://github.com/redis/redis/blob/7.0/src/db.c#L535
    /*-----------------------------------------------------------------------------
     * Hooks for key space changes.
     *
     * Every time a key in the database is modified the function
     * signalModifiedKey() is called.
     *
     * Every time a DB is flushed the function signalFlushDb() is called.
     *----------------------------------------------------------------------------*/
    
    // 每次修改数据库中的一个键时,都会调用函数signalModifiedKey()。
    // 每次DB被刷新时,函数signalFlushDb()被调用。
    /* Note that the 'c' argument may be NULL if the key was modified out of
     * a context of a client. */
    // 当 键值对有变动的时候,会调用 touchWatchedKey 标识对应的客户端状态为 CLIENT_DIRTY_CAS
    void signalModifiedKey(client *c, redisDb *db, robj *key) {
        touchWatchedKey(db,key);
        trackingInvalidateKey(c,key,1);
    }
    
    // https://github.com/redis/redis/blob/7.0/src/multi.c#L348
    /* "Touch" a key, so that if this key is being WATCHed by some client the
     * next EXEC will fail. */
    // 修改 key 对应的客户端状态为 CLIENT_DIRTY_CAS,当前客户端 watch 的 key 已经发生了更新
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;
    
        // 如果 redisDb 中的 watched_keys 为空,直接返回
        if (dictSize(db->watched_keys) == 0) return;
        // 通过传入的 key 在 redisDb 的 watched_keys 中找到监听该 key 的客户端信息
        clients = dictFetchValue(db->watched_keys, key);
        if (!clients) return;
    
        /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
        /* Check if we are already watching for this key */
        // 将监听该 key 的所有客户端信息标识成 CLIENT_DIRTY_CAS 状态  
        listRewind(clients,&li);
        while((ln = listNext(&li))) {
            watchedKey *wk = listNodeValue(ln);
            client *c = wk->client;
    
            if (wk->expired) {
                /* The key was already expired when WATCH was called. */
                if (db == wk->db &&
                    equalStringObjects(key, wk->key) &&
                    dictFind(db->dict, key->ptr) == NULL)
                {
                    /* Already expired key is deleted, so logically no change. Clear
                     * the flag. Deleted keys are not flagged as expired. */
                    wk->expired = 0;
                    goto skip_client;
                }
                break;
            }
    
            c->flags |= CLIENT_DIRTY_CAS;
            /* As the client is marked as dirty, there is no point in getting here
             * again in case that key (or others) are modified again (or keep the
             * memory overhead till EXEC). */
             // 这个客户端应该被表示成 dirty,这个客户端就不需要在判断监听了,取消这个客户端监听的 key
            unwatchAllKeys(c);
    
        skip_client:
            continue;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    Redis 中 redisClient 的 flags 设置被设置成 REDIS_DIRTY_CAS 位,有下面两种情况:

    1、每次修改数据库中的一个键值时;

    2、每次DB被 flush 时,整个 Redis 的键值被清空;

    上面的这两种情况发生,redis 就会修改 watch 对应的 key 的客户端 flags 为 CLIENT_DIRTY_CAS 表示该客户端 watch 有更新,事务处理就能通过这个状态来进行判断。

    几乎所有对 key 进行操作的函数都会调用 signalModifiedKey 函数,比如 setKey、delCommand、hsetCommand 等。也就所有修改 key 的值的函数,都会去调用 signalModifiedKey 来检查是否修改了被 watch 的 key,只要是修改了被 watch 的 key,就会对 redisClient 的 flags 设置 REDIS_DIRTY_CAS 位。

  • 相关阅读:
    security登录实战
    高效掌握JDBC技术(二)| 掌握ORM思想 | 定义连接数据库的工具类
    可降阶的高阶方程与高阶线性微分方程
    【300+精选大厂面试题持续分享】大数据运维尖刀面试题专栏(九)
    华清远见上海中心22071班
    基于 Three.js 的图形操纵控件
    move_base代码阅读
    路由器bgp协议对接生产K8S集群网络(Calico)
    动态内存分配——malloc,calloc,realloc,free
    Kafka之Producer网络传输
  • 原文地址:https://blog.csdn.net/weixin_45525272/article/details/127879947