• Redis 源码简洁剖析 11 - 主 IO 线程及 Redis 6.0 多 IO 线程


    Redis 到底是不是单线程的程序?#

    Redis 只有在处理「客户端请求」时,是单线程的;整个 Redis server 不是单线程的,还有后台线程在辅助处理任务。

    Redis 选择单线程处理请求,是因为 Redis 操作的是「内存」,加上设计了「高效」的数据结构,所以操作速度极快,利用 IO 多路复用机制,单线程依旧可以有非常高的性能。

    Redis 不让主线程执行一些耗时操作,比如同步写、删除等,而是交给后台线程异步完成,从而避免了对主线程的阻塞。

    在 2020 年 5 月推出的 Redis 6.0 版本中,还会使用多线程来处理 IO 任务,能够充分利用服务器的多核特性,使用多核运行多线程,让多线程帮助加速数据读取命令解析数据写回的速度,提升 Redis 的整体性能。

    多 IO 线程的初始化#

    在 main 函数中,会调用 InitServerLast 函数,Redis 6.0 源码:

    void InitServerLast() {
        bioInit();
        // 初始化 IO 线程
        initThreadedIO();
        set_jemalloc_bg_thread(server.jemalloc_bg_thread);
        server.initial_memory_usage = zmalloc_used_memory();
    }
    

    在调用了 bioInit 函数后,又调用了 initThreadedIO 函数初始化多 IO 线程。initThreadedIO 函数在 networking.c 文件中。

    void initThreadedIO(void) {
        // IO 线程激活标志:设置为「未激活」
        server.io_threads_active = 0;
    
        // 只有 1 个 io 线程,直接返回,直接在主线程处理 IO
        if (server.io_threads_num == 1) return;
    
        if (server.io_threads_num > IO_THREADS_MAX_NUM) {
            serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                                 "The maximum number is %d.", IO_THREADS_MAX_NUM);
            exit(1);
        }
    
        /* Spawn and initialize the I/O threads. */
        for (int i = 0; i < server.io_threads_num; i++) {
            io_threads_list[i] = listCreate();
            // Thread 0 是主线程
            if (i == 0) continue;
    
            /* Things we do only for the additional threads. */
            pthread_t tid;
            // 初始化 io_threads_mutex
            pthread_mutex_init(&io_threads_mutex[i],NULL);
            setIOPendingCount(i, 0);
            pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
            // pthread_create 创建 IO 线程,线程运行函数是 IOThreadMain
            if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
                exit(1);
            }
            // 初始化 io_threads 数组,设置值为线程标识
            io_threads[i] = tid;
        }
    }
    

    代码中首先判断 io_threads_num:

    • io_threads_num = 1,表示直接在主线程处理,直接返回
    • io_threads_num > IO_THREADS_MAX_NUM,表示 IO 线程数量>宏定义的值(默认值 128),直接退出程序

    initThreadedIO 函数就会给以下四个数组进行初始化操作:

    • io_threads_list 数组:保存了每个 IO 线程要处理的客户端,将数组每个元素初始化为一个 List 类型的列表
    • io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数
    • io_threads_mutex 数组:保存线程互斥锁
    • io_threads 数组:保存每个 IO 线程的描述符

    这四个数组的定义都在 networking.c 文件中:

    
    pthread_t io_threads[IO_THREADS_MAX_NUM];   //记录线程描述符的数组
    pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];  //记录线程互斥锁的数组
    _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];  //记录线程待处理的客户端个数
    list *io_threads_list[IO_THREADS_MAX_NUM];  //记录线程对应处理的客户端
    

    initThreadedIO 函数在 for 循环中,调用 pthread_create 函数创建线程。pthread_create 详细语法见:pthread_create(3) — Linux manual page

    创建的线程要运行的函数是 IOThreadMain,*arg 参数就是当前创建线程的编号(从 1 开始,0 是主 IO 线程)。

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();
        // Thread 0 是主线程
        if (i == 0) continue;
    
        /* Things we do only for the additional threads. */
        pthread_t tid;
        // 初始化 io_threads_mutex
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]);
        // pthread_create 创建 IO 线程,线程运行函数是 IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        // 初始化 io_threads 数组,设置值为线程标识
        io_threads[i] = tid;
    }
    

    IO 线程运行函数 IOThreadMain#

    主要逻辑是一个 while(1) 的循环,会把 io_threads_list 在这个线程对应的元素取出来,判断并处理。

    void *IOThreadMain(void *myid) {
        ……
    
        while(1) {
            /* Wait for start */
            for (int j = 0; j < 1000000; j++) {
                if (getIOPendingCount(id) != 0) break;
            }
    
            ……
            // 获取 IO 线程要处理的客户端列表
            listRewind(io_threads_list[id],&li);
            while((ln = listNext(&li))) {
                // 从客户端列表中获取一个客户端
                client *c = listNodeValue(ln);
                // 线程是「写操作」,调用 writeToClient 将数据写回客户端
                if (io_threads_op == IO_THREADS_OP_WRITE) {
                    writeToClient(c,0);
                // 如果是『读操作』,调用 readQueryFromClient 从客户端读数据
                } else if (io_threads_op == IO_THREADS_OP_READ) {
                    readQueryFromClient(c->conn);
                } 
                ……
            }
            // 处理完所有客户端,清空该线程的客户端列表
            listEmpty(io_threads_list[id]);
            // 将该线程的待处理任务数量设为 0
            setIOPendingCount(id, 0);
        }
    }
    

    注:上面代码中 io_threads_op 变量是在 handleClientsWithPendingWritesUsingThreads 函数和 handleClientsWithPendingReadsUsingThreads 函数中设置的。

    问题:IO 线程要处理的客户端是如何添加到 io_threads_list 数组中的呢?

    是在 redisServer 全局变量里,有两个 List 类型的成员变量:

    • clients_pending_write:待写回数据的客户端
    • clients_pending_read:待读取数据的客户端
    
    struct redisServer {
        ...
        // 待写回数据的客户端
        list *clients_pending_write;  
        // 待读取数据的客户端
        list *clients_pending_read;  
        ...
    }
    

    Redis server 在接收到客户端请求、返回给客户端数据的过程中,会根据一定条件,推迟客户端的读写操作,并分别把待读写的客户端保存到这两个列表中。之后 Redis server 每次进入事件循环前,都会把列表中的客户端添加到 io_threads_list 数组中,交给 IO 线程处理。

    如何推迟客户端「读」操作?#

    处理可读事件的回调函数是 readQueryFromClient。

    void readQueryFromClient(connection *conn) {
        // 从 connection 结构中获取客户端
        client *c = connGetPrivateData(conn);
        ……
    
        // 是否推迟从客户端读取数据(使用多线程 IO 时)
        if (postponeClientRead(c)) return;
    
        ……
    }
    

    主要看下 postponeClientRead 函数。

    int postponeClientRead(client *c) {
        if (server.io_threads_active &&
            server.io_threads_do_reads &&
            !ProcessingEventsWhileBlocked &&
            !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
        {
            // 客户端 flag 添加 CLIENT_PENDING_READ 标记,推迟客户端的读操作
            c->flags |= CLIENT_PENDING_READ;
            // 将客户端添加到 server 的 clients_pending_read 列表中
            listAddNodeHead(server.clients_pending_read,c);
            return 1;
        } else {
            return 0;
        }
    }
    

    if 的判断条件:是否可以推迟当前客户端的读操作;if 块里的执行逻辑:将客户端添加到 clients_pending_read 列表中。下面主要看下判断条件:

    1. server.io_threads_active = 1:多 IO 线程已激活。
    2. server.io_threads_do_reads = 1:多 IO 线程可用于处理延迟执行的客户端读操作,是在 Redis 配置文件 redis.conf 中,通过配置项 。io-threads-do-reads 设置的,默认值为 no。
    3. ProcessingEventsWhileBlocked = 0:ProcessingEventsWhileBlocked 函数没有在执行,当 Redis 在读取 RDB 文件或 AOF 文件时,会调用这个函数,用来处理事件驱动框架捕获到的事件,避免因读取 RDB 或 AOF 文件造成 Redis 阻塞。
    4. 客户端现有标识不能有 CLIENT_MASTERCLIENT_SLAVECLIENT_PENDING_READ
      • CLIENT_MASTER:客户端用于主从复制
      • CLIENT_SLAVE:客户端用于主从复制
      • CLIENT_PENDING_READ:客户端本来就被设置为推迟读操作

    如何推迟客户端「写」操作?#

    Redis 在执行了客户端命令,要给客户端返回结果时,会调用 addReply 函数将待返回的结果写入输出缓冲区。addReply 函数开始就会调用 prepareClientToWrite 函数。

    /* -----------------------------------------------------------------------------
     * Higher level functions to queue data on the client output buffer.
     * The following functions are the ones that commands implementations will call.
     * -------------------------------------------------------------------------- */
    
    /* Add the object 'obj' string representation to the client output buffer. */
    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        ……
    }
    

    prepareClientToWrite 函数的注释如下:

    /* This function is called every time we are going to transmit new data
     * to the client. The behavior is the following:
     *
     * If the client should receive new data (normal clients will) the function
     * returns C_OK, and make sure to install the write handler in our event
     * loop so that when the socket is writable new data gets written.
     *
     * If the client should not receive new data, because it is a fake client
     * (used to load AOF in memory), a master or because the setup of the write
     * handler failed, the function returns C_ERR.
     *
     * The function may return C_OK without actually installing the write
     * event handler in the following cases:
     *
     * 1) The event handler should already be installed since the output buffer
     *    already contains something.
     * 2) The client is a slave but not yet online, so we want to just accumulate
     *    writes in the buffer but not actually sending them yet.
     *
     * Typically gets called every time a reply is built, before adding more
     * data to the clients output buffers. If the function returns C_ERR no
     * data should be appended to the output buffers. */
    
    int prepareClientToWrite(client *c) {
        ……
        // 当前客户端没有待写回数据 && flag 不包含 CLIENT_PENDING_READ
        if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
                clientInstallWriteHandler(c);
    
        return C_OK;
    }
    

    clientInstallWriteHandler 如下,if 判断条件就不赘述了。

    void clientInstallWriteHandler(client *c) {
    
        if (!(c->flags & CLIENT_PENDING_WRITE) &&
            (c->replstate == REPL_STATE_NONE ||
             (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
        {
            // 将客户端的标识设置为 CLIENT_PENDING_WRITE(待写回)
            c->flags |= CLIENT_PENDING_WRITE;
            // 将 client 加入 server 的 clients_pending_write 列表
            listAddNodeHead(server.clients_pending_write,c);
        }
    }
    

    上面介绍如如何推迟客户端的读操作、写操作,那 Redis 是如何将推迟读写操作的客户端,分配给多 IO 线程执行的呢?是通过:

    • handleClientsWithPendingReadsUsingThreads 函数:将 clients_pending_read 列表中的客户端分配给 IO 线程
    • handleClientsWithPendingWritesUsingThreads 函数:将 clients_pending_write 列表中的客户端分配给 IO 线程

    如何把待「读」客户端分配给 IO 线程执行?#

    beforeSleep 函数中调用了 handleClientsWithPendingReadsUsingThreads 函数:

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    

    handleClientsWithPendingReadsUsingThreads 函数如下,逻辑都在注释中:

    /* When threaded I/O is also enabled for the reading + parsing side, the
     * readable handler will just put normal clients into a queue of clients to
     * process (instead of serving them synchronously). This function runs
     * the queue using the I/O threads, and process them in order to accumulate
     * the reads in the buffers, and also parse the first command available
     * rendering it in the client structures. */
    int handleClientsWithPendingReadsUsingThreads(void) {
        // 判断 io_threads_active 是否被激活,io_threads_do_reads 是否可以用 IO 线程处理待读客户端
        if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    
        // 判断 clients_pending_read 长度
        int processed = listLength(server.clients_pending_read);
        if (processed == 0) return 0;
    
        /* Distribute the clients across N different lists. */
        listIter li;
        listNode *ln;
        // 获取 clients_pending_read 的客户端列表
        listRewind(server.clients_pending_read,&li);
        // 轮询方式,将客户端分配给 IO 线程
        int item_id = 0;
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            int target_id = item_id % server.io_threads_num;
            listAddNodeTail(io_threads_list[target_id],c);
            item_id++;
        }
    
        // 将 IO 线程的操作标识设置为「读操作」
        io_threads_op = IO_THREADS_OP_READ;
        for (int j = 1; j < server.io_threads_num; j++) {
            // 每个线程等待处理的客户端数量 → io_threads_pending 数组
            int count = listLength(io_threads_list[j]);
            setIOPendingCount(j, count);
        }
    
        // 处理 0 号线程(主线程)的待读客户端
        listRewind(io_threads_list[0],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            readQueryFromClient(c->conn);
        }
        // 清空 0 号列表
        listEmpty(io_threads_list[0]);
    
        // 循环,等待其他所有 IO 线程的待读客户端都处理完
        while(1) {
            unsigned long pending = 0;
            for (int j = 1; j < server.io_threads_num; j++)
                pending += getIOPendingCount(j);
            if (pending == 0) break;
        }
    
        /* Run the list of clients again to process the new buffers. */
        // 取出 clients_pending_read 列表
        while(listLength(server.clients_pending_read)) {
            ln = listFirst(server.clients_pending_read);
            client *c = listNodeValue(ln);
            // 判断客户端标识符是否有 CLIENT_PENDING_READ,有则表示被 IO 线程解析过
            c->flags &= ~CLIENT_PENDING_READ;
            // 将客户端从 clients_pending_read 列表中删掉
            listDelNode(server.clients_pending_read,ln);
    
            serverAssert(!(c->flags & CLIENT_BLOCKED));
            if (processPendingCommandsAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
    
            // 解析并执行客户端的所有命令
            processInputBuffer(c);
    
            /* We may have pending replies if a thread readQueryFromClient() produced
             * replies and did not install a write handler (it can't).
             */
            if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
                clientInstallWriteHandler(c);
        }
    
        /* Update processed count on server */
        server.stat_io_reads_processed += processed;
    
        return processed;
    }
    

    如何把待「写」客户端分配给 IO 线程执行?#

    待写客户端的分配处理是由 handleClientsWithPendingWritesUsingThreads 函数完成的,该函数也是在 beforeSleep 函数中调用的。逻辑和 handleClientsWithPendingReadsUsingThreads 函数很像。

    int handleClientsWithPendingWritesUsingThreads(void) {
    
        // 判断 clients_pending_write 列表的数量
        int processed = listLength(server.clients_pending_write);
        if (processed == 0) return 0;
    
        // 只有主 IO 线程 || 不使用 IO 线程
        if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
            return handleClientsWithPendingWrites();
        }
    
        /* Start threads if needed. */
        if (!server.io_threads_active) startThreadedIO();
    
        /* Distribute the clients across N different lists. */
        listIter li;
        listNode *ln;
        listRewind(server.clients_pending_write,&li);
        int item_id = 0;
        // 把待写客户端,按照轮询方式分配给 IO 线程
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
    
            if (c->flags & CLIENT_CLOSE_ASAP) {
                listDelNode(server.clients_pending_write, ln);
                continue;
            }
    
            int target_id = item_id % server.io_threads_num;
            listAddNodeTail(io_threads_list[target_id],c);
            item_id++;
        }
    
        // 将 IO 线程的操作标识设置为「写操作」
        io_threads_op = IO_THREADS_OP_WRITE;
        for (int j = 1; j < server.io_threads_num; j++) {
            // 每个线程等待处理的客户端数量 → io_threads_pending 数组
            int count = listLength(io_threads_list[j]);
            setIOPendingCount(j, count);
        }
    
        /* Also use the main thread to process a slice of clients. */
        listRewind(io_threads_list[0],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            writeToClient(c,0);
        }
        listEmpty(io_threads_list[0]);
    
        // 循环,等待其他所有 IO 线程的待写客户端都处理完
        while(1) {
            unsigned long pending = 0;
            for (int j = 1; j < server.io_threads_num; j++)
                pending += getIOPendingCount(j);
            if (pending == 0) break;
        }
    
        /* Run the list of clients again to install the write handler where
         * needed. */
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
    
            // 再次检查是否有待写客户端
            if (clientHasPendingReplies(c) &&
                    connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
            {
                freeClientAsync(c);
            }
        }
        listEmpty(server.clients_pending_write);
    
        /* Update processed count on server */
        server.stat_io_writes_processed += processed;
    
        return processed;
    }
    

    需要注意的是,stopThreadedIOIfNeeded 函数中会判断待写入的客户端数量如果 < IO 线程数 * 2,则也会直接返回,直接使用主 IO 线程处理待写客户端。这是因为待写客户端不多时,使用多线程效率反而会下降。

    int stopThreadedIOIfNeeded(void) {
        int pending = listLength(server.clients_pending_write);
    
        /* Return ASAP if IO threads are disabled (single threaded mode). */
        if (server.io_threads_num == 1) return 1;
    
        if (pending < (server.io_threads_num*2)) {
            if (server.io_threads_active) stopThreadedIO();
            return 1;
        } else {
            return 0;
        }
    }
    

    总结#

    Redis 6.0 实现的多 IO 线程机制,主要是使用多个 IO 线程,并发处理客户端读取数据解析命令写回数据,充分利用服务器的多核特性,提高 IO 效率。

    Redis server 会根据 readQueryFromClient 函数调用 postponeClientRead 函数决定是否要推迟客户端操作;会根据 addReply 函数中的 prepareClientToWrite 函数,决定是否推迟客户端的写操作。待读客户端加入到 clients_pending_read 列表,待写客户端加入 clients_pending_write 列表。

    IO 线程创建之后,会一直检测 io_threads_list 列表,如果有待读写的客户端,IO 线程就会调用 readQueryFromClient 或 writeToClient 函数进行处理。

    但是多 IO 线程并不会执行命令,执行命令仍然在主 IO 线程

    参考链接#

    Redis 源码简洁剖析系列#

    最简洁的 Redis 源码剖析系列文章

    Java 编程思想-最全思维导图-GitHub 下载链接,需要的小伙伴可以自取~

    原创不易,希望大家转载时请先联系我,并标注原文链接。

  • 相关阅读:
    设计和实施
    工厂方法与抽象工厂模式
    vue3 封装使用 pinia (可直接使用,包含数据持久化)
    设计师必备!5款一键抠图神器,让你轻松实现抠图需求!
    Linux程序设计shell程序学习
    【深度学习 论文篇 02-1 】YOLOv1论文精读
    打印水仙花数---c语言刷题
    英国小黄车玩法,国际版抖音tiktok小店
    [附源码]Python计算机毕业设计Django南通大学福利发放管理系统
    USACO Guide银组3. 自定义比较器和坐标压缩
  • 原文地址:https://www.cnblogs.com/510602159-Yano/p/15895377.html