• KeyDB源码解析二——线程安全


    通过多线程处理用户请求之后,首要解决的问题是数据间的线程安全。处理方法就是加锁。因为数据都存在内存中,大部分请求的处理速度都很快,用 Linux 原生的锁带来的线程间上下文切换代价过大,所以 KeyDB 设计了一个 fastlock。

    fastlock

    fastlock的设计思想是优先尝试在用户态加锁,如果用户态获取不到锁,再通过系统调用进入内核态通过futex阻塞加锁,这样尽可能的避免线程间的上下文切换。

    // A 32-bit ticket-lock word.  m_active is the ticket number currently
    // holding the lock; m_avail is the next ticket to hand out.  The anonymous
    // union lets both 16-bit halves be read in one atomic 32-bit load via `u`.
    struct ticket
    {
        union
        {
            struct
            {
                uint16_t m_active;  // ticket currently allowed to run
                uint16_t m_avail;   // next ticket number to be handed out
            };
            unsigned u;             // both halves as a single 32-bit word
        };
    };
    
    struct fastlock
    {
        volatile int m_pidOwner;    // tid of the owning thread (-1 when free)
        volatile int m_depth;       // recursion depth — the lock is reentrant
        char szName[56];            // lock name; also pads the cold data to 64 bytes
        /* Volatile data on separate cache line */
        volatile struct ticket m_ticket;
        unsigned futex;             // bitmask of tickets currently sleeping in futex()
        // Padding so the hot m_ticket/futex words own a full 64-byte cache
        // line, preventing false sharing with adjacent data.
        char padding[56];
    };  // NOTE: fixed — the original snippet was missing this semicolon
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    这里fastlock用了两个小技巧:1、char padding[56],故意填充56 byte,和前面的m_ticket、futex组成完整的64 byte(一个cache line),防止线程间伪共享(false sharing);同时前面的m_pidOwner、m_depth、szName也恰好凑满64 byte,使得只读的szName和频繁修改的m_ticket落在不同的cache line上;2、通过volatile阻止编译器把变量缓存在寄存器中,强制CPU每次都从内存(缓存)中重新读取该变量。
    下面看看加解锁过程:

    // Acquire the (reentrant) ticket spinlock.  First spins in user space; only
    // after spinning loopLimit iterations does it fall back to a kernel futex
    // sleep via fastlock_sleep().  `worker` is an optional callback used to do
    // useful work while waiting; when it returns false we drop into the plain
    // spin loop.
    extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
    {
        // Fast path: if this thread already owns the lock, just bump the depth.
        int pidOwner;
        __atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE);
        if (pidOwner == gettid())
        {
            ++lock->m_depth;
            return;
        }
    
        int tid = gettid();
        // Take a ticket: atomically fetch the current m_avail and advance it.
        unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
        unsigned cloops = 0;
        ticket ticketT;
        int fHighPressure;
        __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
        // Under high CPU pressure give up the user-space spin much sooner.
        unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;
    
        if (worker != nullptr) {
            for (;;) {
                __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
                // Low 16 bits of u hold m_active (little-endian layout); when it
                // equals our ticket it is our turn.
                if ((ticketT.u & 0xffff) == myticket)
                    break;
                if (!worker())
                    goto LNormalLoop;
            }
        } else {
    LNormalLoop:
            for (;;)
            {
                __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
                // If m_active equals our ticket there is no one ahead of us:
                // we take the lock without any kernel involvement.
                if ((ticketT.u & 0xffff) == myticket)
                    break;
    
    #if defined(__i386__) || defined(__amd64__)
                __asm__ __volatile__ ("pause");
    #elif defined(__aarch64__)
                __asm__ __volatile__ ("yield");
    #endif
    
                if ((++cloops % loopLimit) == 0)
                {
                	// Spun too long without acquiring: sleep in the kernel on a futex.
                    fastlock_sleep(lock, tid, ticketT.u, myticket);
                }
            }
        }
    
        lock->m_depth = 1;
        __atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE);
        ANNOTATE_RWLOCK_ACQUIRED(lock, true);
        std::atomic_thread_fence(std::memory_order_acquire);
    }
    // Block the calling thread in the kernel until its ticket may be up.
    // `wake` is the ticket word value the caller last observed; `myticket` is
    // the ticket we are waiting for.  Each waiter advertises itself by setting
    // bit (myticket % 32) in lock->futex so the unlocker can wake only the
    // relevant sleepers with a futex bitset wake.
    extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned myticket)
    {
        UNUSED(lock);
        UNUSED(pid);
        UNUSED(wake);
        UNUSED(myticket);
    #ifdef __linux__
        g_dlock.registerwait(lock, pid);
        unsigned mask = (1U << (myticket % 32));
        __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
    
        // Double-check the lock wasn't released between the last check and us setting the futex mask
        uint32_t u;
        __atomic_load(&lock->m_ticket.u, &u, __ATOMIC_ACQUIRE);
        if ((u & 0xffff) != myticket)
        {
            // Sleep only while the ticket word still equals `wake`; we are
            // woken by the unlocker with a matching FUTEX_WAKE_BITSET mask.
            futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask);
        }
        
        __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
        g_dlock.clearwait(lock, pid);
    #endif
        __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    加锁时,首先在用户态通过原子变量的比较尝试加锁,尝试一定次数之后,如果失败,通过fastlock_sleep,futex阻塞等待m_ticket加锁,对于Redis这样的内存应用,大部分操作都很快,很有可能在用户态就获取到锁。
    解锁:

    // Release one level of the reentrant lock.  Only when the recursion depth
    // reaches zero is ownership cleared and the next ticket holder admitted.
    extern "C" void fastlock_unlock(struct fastlock *lock)
    {
        --lock->m_depth;
        if (lock->m_depth == 0)
        {
            int pidT;
            __atomic_load(&lock->m_pidOwner, &pidT, __ATOMIC_RELAXED);
            serverAssert(pidT >= 0);  // unlock after free
            int t = -1;
            __atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE);
            std::atomic_thread_fence(std::memory_order_acq_rel);
            ANNOTATE_RWLOCK_RELEASED(lock, true);
            // Admit the next waiter by advancing m_active — this is the value
            // the user-space spin in fastlock_lock() is watching.
            uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE);  // on x86 the atomic is not required here, but ASM handles that case
    #ifdef __linux__
            // Wake any waiter sleeping in the kernel whose futex bitset matches.
            unlock_futex(lock, activeNew);
    #else
    		UNUSED(activeNew);
    #endif
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    对m_active原子加1,使得加锁的用户态条件成立。

    线程安全

    保证线程安全有两把锁要加,一个是每个client自身的锁,因为有些命令会操作非自己worker线程绑定的client,第二个是保护数据的全局锁。
    从读取client数据开始:

    // Readable-socket event handler (article excerpt; `c` is derived from
    // `conn` in code elided here).  Takes the per-client lock non-blockingly,
    // parses the query buffer, then either queues the parsed commands for
    // batched execution in beforeSleep(), serves them lock-free via an MVCC
    // snapshot, or (single-threaded case) executes them immediately under the
    // global lock.
    void readQueryFromClient(connection *conn) {
    	std::unique_lock<decltype(c->lock)> lock(c->lock, std::defer_lock);
        // First try to acquire the client's own lock
        if (!lock.try_lock())   //  Couldn't get it: the client is busy with something else; retry later
            return; // Process something else while we wait
        // Next, read and parse the data
        // Finally, process the request
        if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) {
            parseClientCommandBuffer(c);
            if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
                // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
                //  so we exclude them unless the snapshot we need already exists.
                // Note: In test mode we want to create snapshots as often as possible to exercise them - we don't care about perf
                bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
                bool fWriteTooRecent = !g_fTestMode && (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2);
    
                // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
                // A snapshot exists or writes are infrequent: serve via MVCC — note no global lock is taken here
                if (!fWriteTooRecent || fSnapshotExists) {
                    processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
                }
            }
            // Otherwise queue the client for asynchronous processing in beforeSleep
            if (!c->vecqueuedcmd.empty())
                serverTL->vecclientsProcess.push_back(c);
        } else {
            // If we're single threaded its actually better to just process the command here while the query is hot in the cache
            //  multithreaded lock contention dominates and batching is better
            // Acquire the global lock before processing the command
            AeLocker locker;
            locker.arm(c);
            runAndPropogateToReplicas(processInputBuffer, c, true /*fParse*/, CMD_CALL_FULL);
        }
    }
    
    // Runs once per event-loop iteration before the loop sleeps (body elided by
    // the article).  Acquires the global lock a single time and then drains the
    // commands queued by readQueryFromClient, amortizing the lock cost.
    void beforeSleep(struct aeEventLoop *eventLoop) {
      AeLocker locker;
      int iel = ielFromEventLoop(eventLoop);
    
       // Acquire the global lock
      locker.arm();
      ...
      // Process the queued user commands
      runAndPropogateToReplicas(processClients);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    因为beforeSleep的诸多操作中都需要加全局锁,所以这里把用户的命令解析出来之后,不会立即处理,而是放入队列中异步处理,这样做可以节省一次加全局锁的操作。
    然后根据processClients函数从队列中取出命令并逐个执行:

    void processClients()
    {
        serverAssert(GlobalLocksAcquired());
    
        // Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer
        while (!serverTL->vecclientsProcess.empty()) {
            client *c = serverTL->vecclientsProcess.front();
            serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin());
    
            /* There is more data in the client input buffer, continue parsing it
            * in case to check if there is a full command to execute. */
            std::unique_lock<fastlock> ul(c->lock);
            processInputBuffer(c, false /*fParse*/, CMD_CALL_FULL);
        }
    
        if (listLength(serverTL->clients_pending_asyncwrite))
        {
            ProcessPendingAsyncWrites();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    WebRTC[52] - WebRTC 带宽分配逻辑详解
    城市内涝解决方案:实时监测,提前预警,让城市更安全
    MySQL主从复制最全教程(CentOS7 yum)
    GPS定位与IP地址有什么区别?
    操作系统——死锁及其解决方案(p38-p41王道视频、课本ch6)
    R语言:卡方检验
    linux日志文件删除
    Node.js躬行记(27)——接口管理
    知识图谱(Knowledge Graph)- Neo4j 5.10.0 使用 - CQL - 太极拳传承谱系表
    tooltip里面画echarts图
  • 原文地址:https://blog.csdn.net/yh88623131/article/details/127912697