• KeyDB Source Code Analysis, Part 3: Multi-Version Concurrency Control


    Redis officially recommends keeping a single node's memory under 16 GB: RDB persistence relies on the fork system call, and forking a process with a large memory footprint incurs significant latency (roughly 10 ms to 300 ms per GB according to the official tests), which makes the system stutter. KeyDB uses multiple threads to exploit a node's many cores, and for such large nodes it also needs to lift the memory limit, so it replaces the fork system call with multi-version concurrency control (MVCC): https://docs.keydb.dev/docs/mvcc

    MVCC

    KeyDB keeps multiple versions of the database in memory: modifications land in the newest version, and every other version is read-only. Let's look at how MVCC replaces fork.
    The multi-version state is defined in redisDbPersistentData (redisDbPersistentDataSnapshot is built on top of it):

    class redisDbPersistentData {
        // Keyspace
        dict *m_pdict = nullptr;          /* The keyspace for this DB */
        // tracks keys deleted (tombstoned) while a snapshot exists
        dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
        // snapshots form a chain; non-null means this db currently has a snapshot
        const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
        std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
    };
    
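    To make these fields concrete, below is a minimal, self-contained sketch of the version chain they form (all names here are illustrative, not KeyDB code). Note that taking a snapshot copies no keys: the whole current dict is handed to the snapshot and an empty dict takes its place.

    #include <cstdio>
    #include <memory>
    #include <string>
    #include <unordered_map>

    struct MiniDb {
        std::unordered_map<std::string, std::string> dict;   // ~ m_pdict
        std::unordered_map<std::string, bool> tombstones;    // ~ m_pdictTombstone
        std::unique_ptr<MiniDb> snapshot;                    // ~ m_spdbSnapshotHOLDER

        // Freeze the current contents as a read-only snapshot and start a
        // fresh, empty dict for subsequent writes; an O(1) pointer handoff.
        void createSnapshot() {
            auto spdb = std::make_unique<MiniDb>();
            spdb->dict.swap(dict);                 // snapshot takes the whole dict
            spdb->tombstones.swap(tombstones);
            spdb->snapshot = std::move(snapshot);  // chain any older snapshot below
            snapshot = std::move(spdb);
        }
    };

    int main() {
        MiniDb db;
        db.dict["k1"] = "v1";
        db.createSnapshot();    // k1 now lives, frozen, one level down
        db.dict["k2"] = "v2";   // new writes land in the fresh top-level dict
        std::printf("top=%zu snapshot=%zu\n", db.dict.size(), db.snapshot->dict.size());
    }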

    When BGSAVE executes (rdbSaveBackground -> launchRdbSaveThread), a new snapshot is created first; once the RDB file has been written, the snapshot is released:

    int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi){
    	...
    	// before the rdb save, create a snapshot of every db
    	for (int idb = 0; idb < cserver.dbnum; ++idb)
                args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /* fOptional */);
        ...
        if (pthread_create(&child, &tattr, rdbSaveThread, args)) {
           pthread_attr_destroy(&tattr);
            // thread creation failed: release the snapshots before bailing out
            // (on success, the save thread releases them once the RDB is written)
            for (int idb = 0; idb < cserver.dbnum; ++idb)
                g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
            args->~rdbSaveThreadArgs();
            zfree(args);
            return C_ERR;
        }
    }
    
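    Note that the endSnapshot loop above is the error path: it runs only when pthread_create fails. One hedged way to view the create/save/end lifecycle is as a scope guard; the sketch below is illustrative only (SnapshotGuard is not a KeyDB type) and releases the snapshots unless ownership was handed to the save thread.

    #include <cstdio>
    #include <functional>

    // Sketch only: mirrors the manual cleanup in launchRdbSaveThread's error path.
    struct SnapshotGuard {
        std::function<void()> release;   // stand-in for endSnapshot() on every db
        bool armed = true;
        explicit SnapshotGuard(std::function<void()> fn) : release(std::move(fn)) {}
        ~SnapshotGuard() { if (armed && release) release(); }
        void dismiss() { armed = false; }  // the save thread now owns the snapshots
    };

    int main() {
        SnapshotGuard guard([] { std::printf("endSnapshot for each db\n"); });
        bool threadCreated = false;        // pretend pthread_create failed
        if (threadCreated)
            guard.dismiss();               // success: the save thread ends them later
        // on failure the guard runs, just like the manual loop in the error path
    }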

    createSnapshot turns the current contents of the db into the snapshot and pushes it onto the snapshot chain. From then on that data is read-only; a newly created dict becomes the current db and takes all subsequent updates:

    const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
    	...
    	if (m_spdbSnapshotHOLDER != nullptr)
        {
            // If possible reuse an existing snapshot (we want to minimize nesting)
            // check whether the existing snapshot is recent enough to reuse
            if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
            {
                if (!m_spdbSnapshotHOLDER->FStale())
                {
                    m_spdbSnapshotHOLDER->m_refCount++;
                    return m_spdbSnapshotHOLDER.get();
                }
                serverLog(LL_VERBOSE, "Existing snapshot too old, creating a new one");
            }
        }
    
        // See if we have too many levels and can bail out of this to reduce load
        // cap the nesting depth so snapshot levels don't balloon memory
        if (fOptional && (levels >= 6))
        {
            serverLog(LL_DEBUG, "Snapshot nesting too deep, abondoning");
            return nullptr;
        }
        ...
        // the snapshot's dicts become read-only, so stop any async rehash
        discontinueAsyncRehash(m_pdict);
        discontinueAsyncRehash(m_pdictTombstone);
    
        spdb->m_fAllChanged = false;
        spdb->m_fTrackingChanges = 0;
        // hand the current db's dicts to the snapshot directly (no copying)
        spdb->m_pdict = m_pdict;
        spdb->m_pdictTombstone = m_pdictTombstone;
        // Add a fake iterator so the dicts don't rehash (they need to be read only)
        dictPauseRehashing(spdb->m_pdict);
        dictForceRehash(spdb->m_pdictTombstone);    // prevent rehashing by finishing the rehash now
        spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
        if (m_spstorage != nullptr)
            spdb->m_spstorage = std::shared_ptr<StorageCache>(const_cast<StorageCache*>(m_spstorage->clone()));
        spdb->m_pdbSnapshot = m_pdbSnapshot;
        spdb->m_refCount = 1;
        spdb->m_mvccCheckpoint = getMvccTstamp();
        ...
        // create fresh dicts to receive subsequent writes
        m_pdict = dictCreate(&dbDictType,this);
        dictExpand(m_pdict, 1024);   // minimize rehash overhead
        m_pdictTombstone = dictCreate(&dbTombstoneDictType, this);
    
        serverAssert(spdb->m_pdict->pauserehash == 1);
    
        // push the newly created snapshot onto the snapshot chain
        m_spdbSnapshotHOLDER = std::move(spdb);
        m_pdbSnapshot = m_spdbSnapshotHOLDER.get();
    
        // Finally we need to take a ref on all our children snapshots.  This ensures they aren't free'd before we are
        redisDbPersistentData *pdbSnapshotNext = m_pdbSnapshot->m_spdbSnapshotHOLDER.get();
        while (pdbSnapshotNext != nullptr)
        {
            pdbSnapshotNext->m_refCount++;
            pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get();
        }
    
        if (m_pdbSnapshotASYNC != nullptr)
        {
            // free the async snapshot, it's done its job
            endSnapshot(m_pdbSnapshotASYNC);    // should be just a dec ref (FAST)
            m_pdbSnapshotASYNC = nullptr;
        }
        ...
    }
    
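    The final while loop deserves a remark: each new snapshot takes a reference on every snapshot beneath it, so a reader holding the newest snapshot can walk the whole chain safely. A minimal sketch of that pinning step (illustrative names, not KeyDB code):

    #include <cstdio>
    #include <memory>

    struct Snap {
        int refCount = 0;
        std::unique_ptr<Snap> child;   // ~ the snapshot's own m_spdbSnapshotHOLDER
    };

    // Mirror of the loop at the end of createSnapshot: after pushing a new
    // snapshot on top, bump every descendant once so none is freed before it.
    void pinChildren(Snap *top) {
        for (Snap *s = top->child.get(); s != nullptr; s = s->child.get())
            s->refCount++;
    }

    int main() {
        auto older = std::make_unique<Snap>();
        older->refCount = 1;               // ref held by whoever created it
        auto newer = std::make_unique<Snap>();
        newer->refCount = 1;
        newer->child = std::move(older);   // the newer snapshot sits on top
        pinChildren(newer.get());
        std::printf("older refCount=%d\n", newer->child->refCount);   // 2
    }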

    Once the snapshot exists, rdbSave can serialize it into an RDB file. The core routine is iterate_threadsafe, which first walks the snapshot's own dict and then the remaining snapshots in the chain:

    bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
    {
        return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true);
    }
    bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const {
    	...
        // walk this snapshot's own dict first
    	dictIterator *di = dictGetSafeIterator(m_pdict);
        while(fResult && ((de = dictNext(di)) != nullptr))
        {
            --celem;
            robj *o = (robj*)dictGetVal(de);
            if (!fn((const char*)dictGetKey(de), o))
                fResult = false;
        }
        dictReleaseIterator(di);
        ...
        // then recurse into the remaining snapshots in the chain
        const redisDbPersistentDataSnapshot *psnapshot;
        __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
        if (fResult && psnapshot != nullptr)
        {
            std::function<bool(const char*, robj_roptr o)> fnNew = [&fn, &celem, dictTombstone](const char *key, robj_roptr o) {
            // skip keys this level has already tombstoned
                dictEntry *deTombstone = dictFind(dictTombstone, key);
                if (deTombstone != nullptr)
                    return true;
    
                // Alright it's a key in the use keyspace, lets ensure it and then pass it off
                --celem;
                return fn(key, o);
            };
            fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false);
        }
    }
    
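    The wrapped callback fnNew is what keeps the merged view consistent: before recursing into an older level, the current level's tombstones filter out keys that were deleted or overwritten after that older snapshot was taken, and the wrappers compose level by level. A self-contained sketch of the same composition (illustrative, not KeyDB code):

    #include <cstdio>
    #include <functional>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    struct Level {
        std::unordered_map<std::string, std::string> dict;
        std::unordered_set<std::string> tombstones;
        const Level *older = nullptr;   // ~ m_pdbSnapshot
    };

    using Visit = std::function<void(const std::string&, const std::string&)>;

    // Visit the newest level, then recurse into older ones; each level wraps the
    // callback so keys tombstoned at any newer level are skipped, which is
    // exactly the trick the fnNew lambda plays above.
    void iterate(const Level *lv, Visit fn) {
        for (const auto &kv : lv->dict)
            fn(kv.first, kv.second);
        if (lv->older) {
            Visit fnNew = [&](const std::string &k, const std::string &v) {
                if (lv->tombstones.count(k)) return;   // deleted/overwritten later
                fn(k, v);
            };
            iterate(lv->older, fnNew);
        }
    }

    int main() {
        Level base{{{"a", "1"}, {"b", "2"}}, {}, nullptr};
        Level top{{{"b", "3"}}, {"b"}, &base};   // b was overwritten after the snapshot
        iterate(&top, [](const std::string &k, const std::string &v) {
            std::printf("%s=%s\n", k.c_str(), v.c_str());   // prints b=3, then a=1
        });
    }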

    CRUD with snapshots

    Once a snapshot exists, every create/read/update/delete on the db has to account for it.
    Inserting a key-value pair:

    bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_iter *piterExisting)
    {
        if (!fAssumeNew && (g_pserver->m_pstorageFactory != nullptr || m_pdbSnapshot != nullptr))
            ensure(key);
        dictEntry *de;
        // add straight into the current db's dict
        int res = dictAdd(m_pdict, key, o, &de);
        serverAssert(FImplies(fAssumeNew, res == DICT_OK));
        if (res == DICT_OK)
        {
    #ifdef CHECKED_BUILD
            if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr)
            {
                serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
            }
    #endif
            trackkey(key, false /* fUpdate */);
        }
        else
        {
            if (piterExisting)
                *piterExisting = dict_iter(m_pdict, de);
        }
        return (res == DICT_OK);
    }
    

    Looking up a key-value pair:

      dict_iter find(const char *key) {
        // look in this db's own m_pdict first
        dictEntry *de = dictFind(m_pdict, key);
        // then fall back to searching each snapshot in the chain recursively
        ensure(key, &de);
        return dict_iter(m_pdict, de);
      }
    

    ensure recursively looks for the key in each snapshot. At every level it first consults that snapshot's m_pdictTombstone: if the key has been tombstoned, the search can stop there.
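
    In simplified form the read path looks like this (a sketch with illustrative names, not KeyDB code): the newest level wins, a tombstone stops the search, and otherwise the lookup falls through to the next older snapshot.

    #include <cstdio>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    struct Level {
        std::unordered_map<std::string, std::string> dict;
        std::unordered_set<std::string> tombstones;   // deletions vs. older levels
        const Level *older = nullptr;                 // ~ m_pdbSnapshot
    };

    const std::string *find(const Level *lv, const std::string &key) {
        for (; lv != nullptr; lv = lv->older) {
            auto it = lv->dict.find(key);
            if (it != lv->dict.end()) return &it->second;   // newest version wins
            if (lv->tombstones.count(key)) return nullptr;  // deleted: stop looking
        }
        return nullptr;
    }

    int main() {
        Level base{{{"a", "1"}, {"b", "2"}}, {}, nullptr};
        Level top{{}, {"b"}, &base};                  // b deleted after the snapshot
        std::printf("a=%s\n", find(&top, "a")->c_str());          // from the snapshot
        std::printf("b found=%d\n", find(&top, "b") != nullptr);  // 0, tombstoned
    }
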
    Updating a key-value pair:

    void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
        // when about to overwrite a key the snapshot still holds, tombstone it first
        db->prepOverwriteForSnapshot(szFromObj(key));
        dict_iter iter;
        if (!dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */, false /*fAssumeNew*/, &iter)) {
            dbOverwrite(db, key, val, !keepttl, &iter);
        }
        incrRefCount(val);
        if (signal) signalModifiedKey(c,db,key);
    }
    void redisDbPersistentData::prepOverwriteForSnapshot(char *key)
    {
        if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)
            return;
    
        if (m_pdbSnapshot != nullptr)
        {
            // if the snapshot holds the key, add it to m_pdictTombstone to mark it deleted there
            auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
            if (itr.key() != nullptr)
            {
                if (itr.val()->FExpires()) {
                    // Note: I'm sure we could handle this, but its too risky at the moment.
                    //  There are known bugs doing this with expires
                    return;
                }
                sds keyNew = sdsdupshared(itr.key());
                if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK)
                    sdsfree(keyNew);
            }
        }
    }
    

    Deleting a key-value pair:

    bool redisDbPersistentData::syncDelete(robj *key)
    {
        ...
        if (m_spstorage != nullptr)
            fDeleted = m_spstorage->erase(szFromObj(key));
        // delete from the current m_pdict first
        fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
    
        if (fDeleted) {
            ...
            if (m_pdbSnapshot != nullptr)
            {
                // with a snapshot present, record a tombstone for the key in m_pdictTombstone
                auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
                if (itr != nullptr)
                {
                    sds keyTombstone = sdsdupshared(itr.key());
                    uint64_t hash = dictGetHash(m_pdict, keyTombstone);
                    if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK)
                        sdsfree(keyTombstone);
                }
            }
            if (g_pserver->cluster_enabled) slotToKeyDel(szFromObj(key));
            return 1;
        } else {
            return 0;
        }
    }
    
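    Two details are worth noting. First, a delete never touches the frozen snapshot; the tombstone merely masks the copy the snapshot still holds, as in this sketch (illustrative names, not KeyDB code). Second, the real code stores the key's precomputed hash as the tombstone's value, which lets endSnapshot later locate the entry via dictFindWithPrev without hashing the key again.

    #include <cstdio>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    struct Level {
        std::unordered_map<std::string, std::string> dict;
        std::unordered_set<std::string> tombstones;
        const Level *older = nullptr;
    };

    // Delete only mutates the newest level: erase the live entry, and if an
    // older snapshot still holds the key, mask it there with a tombstone.
    bool syncDelete(Level &db, const std::string &key) {
        bool deleted = db.dict.erase(key) > 0;
        if (deleted && db.older && db.older->dict.count(key))
            db.tombstones.insert(key);
        return deleted;
    }

    int main() {
        Level base{{{"k", "old"}}, {}, nullptr};
        Level top{{{"k", "new"}}, {}, &base};
        syncDelete(top, "k");
        std::printf("snapshot still has k: %zu\n", base.dict.count("k"));    // 1
        std::printf("tombstoned at top:    %zu\n", top.tombstones.count("k")); // 1
    }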

    Snapshot GC

    When a snapshot is no longer needed, its dict is merged back into the newer version; before the merge, keys that carry a tombstone are purged from the snapshot's dict:

    void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
    {
        // the top-level snapshot has no outside refs left and can be freed,
        // so first recursively release the snapshots chained beneath it
        if (m_spdbSnapshotHOLDER->m_refCount == 1)
            recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
          // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
        dictIterator *di = dictGetIterator(m_pdictTombstone);
        dictEntry *de;
        dictPauseRehashing(m_spdbSnapshotHOLDER->m_pdict);
        auto splazy = std::make_unique<LazyFree>();
        while ((de = dictNext(di)) != NULL)
        {
            dictEntry **dePrev;
            dictht *ht;
            // BUG BUG Why not a shallow search?
            dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/);
            if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot)
            {
                // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt)
    #ifdef CHECKED_BUILD
                serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
    #endif
            // propagate this level's tombstone down to the next snapshot level
                dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
                continue;
            }
            else if (deSnapshot == nullptr)
            {
                serverAssert(m_spdbSnapshotHOLDER->m_spstorage != nullptr); // the only case where we can have a tombstone without a snapshot child is if a storage engine is set
                continue;
            }
            
            // Delete the object from the source dict, we don't use dictDelete to avoid a second search
            *dePrev = deSnapshot->next; // Unlink it first
            if (deSnapshot != nullptr) {
                if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) {
                    dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
                } else {
                    splazy->vecde.push_back(deSnapshot);
                }
            }
            ht->used--;
        }
        // Stage 2 Move all new keys to the snapshot DB
        // merge snapshot
        dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);
        
        // Stage 3 swap the databases with the snapshot
        // retire the snapshot: its merged dict becomes the live one
        std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
        if (m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
            std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
         ...
    }
    
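    The three stages reduce to a small amount of map algebra. A self-contained sketch of the merge (illustrative, not KeyDB code): apply the tombstones to the snapshot's dict, fold the newer dict in with newer values winning, then promote the result to be the live keyspace.

    #include <cstdio>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    int main() {
        std::unordered_map<std::string, std::string> snapDict{{"a","1"}, {"b","2"}};
        std::unordered_map<std::string, std::string> curDict{{"b","3"}, {"c","4"}};
        std::unordered_set<std::string> tombstones{"a", "b"};  // a deleted, b overwritten

        // Stage 1: drop every tombstoned key from the snapshot dict
        for (const auto &k : tombstones) snapDict.erase(k);

        // Stage 2: move all newer keys into the snapshot dict (like dictMerge)
        for (auto &kv : curDict) snapDict[kv.first] = kv.second;

        // Stage 3: the merged dict becomes the live keyspace (std::swap in the code)
        for (auto &kv : snapDict)
            std::printf("%s=%s\n", kv.first.c_str(), kv.second.c_str());
        // prints b=3 and c=4 (in unspecified order)
    }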

    Other uses of MVCC

    Besides replacing fork for RDB generation, MVCC also backs the keys and scan commands, both of which need a point-in-time snapshot of the current DB.
    keys:

    void keysCommand(client *c) {
        sds pattern = szFromObj(c->argv[1]);
    
        const redisDbPersistentDataSnapshot *snapshot = nullptr;
        // try to create a snapshot (fOptional, so this may return nullptr)
        if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
            snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
        if (snapshot != nullptr)
        {
            sds patternCopy = sdsdup(pattern);
            aeEventLoop *el = serverTL->el;
            blockClient(c, BLOCKED_ASYNC);
            redisDb *db = c->db;
        // walk the snapshot on a background worker thread
            g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{
                keysCommandCore(c, snapshot, patternCopy);
                sdsfree(patternCopy);
                aePostFunction(el, [c, db, snapshot]{
                    aeReleaseLock();    // we need to lock with coordination of the client
    
                    std::unique_lock<decltype(c->lock)> lock(c->lock);
                    AeLocker locker;
                    locker.arm(c);
    
                    unblockClient(c);
    
                    locker.disarm();
                    lock.unlock();
                    db->endSnapshotAsync(snapshot);
                    aeAcquireLock();
                });
            });
        }
        else
        {
            keysCommandCore(c, c->db, pattern);
        }
    }
    
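    The pattern is easy to isolate: because the snapshot is frozen, a worker thread can scan it without taking the database lock while the main thread keeps serving writes. A hedged, self-contained sketch (the prefix matching and names are stand-ins, not KeyDB code):

    #include <cstdio>
    #include <string>
    #include <thread>
    #include <unordered_map>

    int main() {
        // the "snapshot": frozen, so the worker may read it without locks
        const std::unordered_map<std::string, std::string> snapshot{
            {"user:1", "alice"}, {"user:2", "bob"}, {"cfg:x", "1"}};

        std::thread worker([&snapshot] {
            for (const auto &kv : snapshot)            // ~ keysCommandCore
                if (kv.first.rfind("user:", 0) == 0)   // stand-in for glob matching
                    std::printf("%s\n", kv.first.c_str());
        });

        // ... meanwhile the main thread would keep mutating the live dict ...
        worker.join();   // the real code unblocks the client and ends the snapshot
    }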

    The scan command is implemented on top of client::asyncCommand:

    bool client::asyncCommand(
        std::function<void(const redisDbPersistentDataSnapshot *,
                           const std::vector<robj_sharedptr> &)> &&mainFn,
        std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn) {
      serverAssert(FCorrectThread(this));
      const redisDbPersistentDataSnapshot *snapshot = nullptr;
      if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
        snapshot =
            this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */);
      if (snapshot == nullptr) {
        return false;
      }
      aeEventLoop *el = serverTL->el;
      blockClient(this, BLOCKED_ASYNC);
      g_pserver->asyncworkqueue->AddWorkFunction(
          [el, this, mainFn, postFn, snapshot] {
            std::vector<robj_sharedptr> args = clientArgs(this);
            aePostFunction(el, [this, mainFn, postFn, snapshot, args] {
              aeReleaseLock();
              std::unique_lock<decltype(this->lock)> lock(this->lock);
              AeLocker locker;
              locker.arm(this);
              unblockClient(this);
              mainFn(snapshot, args);
              locker.disarm();
              lock.unlock();
              if (postFn)
                postFn(snapshot);
              this->db->endSnapshotAsync(snapshot);
              aeAcquireLock();
            });
          });
      return true;
    }
    
  • Original article: https://blog.csdn.net/yh88623131/article/details/127990758