• 天池比赛记录


    赛题简单介绍

    比赛地址:第四届全球数据库大赛赛道1:云原生共享内存数据库性能优化

    赛题大致内容:
    本地读写速度快,但空间小,远端读写速度慢,但空间大(通过eRDMA读写远端数据)
    初赛时实现一个简化、高效的KV存储引擎,支持Write、Read接口,此时key-value皆为定值
    复赛额外实现一个Delete接口和重建(rebuild)功能,此时value为变长值。
    评测程序分为2个阶段:
    1)程序正确性验证
    验证KV操作的正确性(包括加密/解密过程),这部分的耗时不计入运行时间的统计。如果正确性测试不通过,则终止,评测失败。
    2)性能评测
    引擎使用的本地内存和远端内存限制在 8 GB 和 32 GB。 阶段1. 每个线程分别写入约 12 M个Key大小为 16 Bytes,Value大小为 80-1024 Bytes 的 KV对象,并选择性读取验证;阶段2. 每个线程会进行并发删除,每个线程删除 10 M个Key,删除操作耗时将计入运行时间;阶段3. 每个线程分别再次写入约 10 M个Key大小为 16 Bytes,Value大小为 80-256 Bytes 的 KV对象;接着会进行读写混合测试,开启16个线程以75%:25%的读写比例调用64M次。其中75%的读访问具有热点的特征,大部分的读访问集中在少量的Key上面。最后的分数为以上操作耗时的总和。

    数据安排如下:本阶段保证任意时刻数据的value部分长度和不超过30G。纯写入的12M次操作中大约70%的操作Value长度在80-128Bytes之间;大约20%的操作Value长度在129-256Bytes之间;大约10%的操作Value长度在257-1024Bytes之间。读写混合的64M操作中,所有Set操作的Value长度均不超过128Bytes。
    在这里插入图片描述
    评测程序输出大致如下:

    Start local encryption evaluation...Start evaluation.
    Generating the ZipFian PDF......
    Generate PDF Done.
    Do new LocalEngine.
    Start LocalEngine using start interface.
    LocalEngine::start finsh
    
    Starting Write-Read Testing.
    ##################### Start Write from index: 0
    ...
    ##################### End The Write-Read Test ##############################
    Time for Write-Read Test 32.000000 seconds
    LocalEngine::stop finsh
    ##################### Evaluation Success ##############################
    Start local perf evaluation...Start evaluation.
    Generating the ZipFian PDF......
    Generate PDF Done.
    Do new LocalEngine.
    Start LocalEngine using start interface.
    LocalEngine::start finsh
    
    Starting Write-Read Testing.
    ##################### Start Write from index: 0
    ...
    ##################### End The Write-Read Test ##############################
    Time for Write-Read Test 158.000000 seconds
    
    Starting Deleting Testing.
    ##################### Start Delete from index: 0
    ...
    ##################### End The Delete Test ##############################
    Time for Delete Test 47.000000 seconds
    
    Starting HOT Data Testing.
    ##################### Start test from index: 0
    ...
    ##################### End The Hot Data Test ##############################
    Time for Hot Data Test 46.000000 seconds
    Hot:46
    LocalEngine::stop finsh
    Total:253
    ##################### Evaluation Success ##############################
    Success evaluation, update score...37.2979146325685141834147
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    复赛排名第20名,正好是极客奖最后一名,嘻嘻。

    比赛经历

    在初赛时官方提供了一个简单的demo,将key和远端地址存于本地,value全部存于远端,初赛结束时我们的代码大致架构为:

    key-value数据以页面的方式存储起来,本地存储key的元数据(key,(页号,索引))(5G),缓存少量页(2G),远端页的远端地址,远端存储大部分页数据(30G)。
    写入时:
    	插入:将数据插入新申请的页中,写入元数据(key,(页号,页索引))
    	更新:若对应页面当前存于远端,则视作插入操作处理,并更新key元数据,
    		 若对应页面存于本地,则直接更新value
    	更新LRU列表
    读取时:
    	若页面存于远端,则读取远端数据,将该页加入本地缓存
    	若页面存于本地,则直接读取数据
    	更新LRU列表
    淘汰:
    	开启一个后台线程,当本地缓存页大于阈值时,将最久未被访问的页写入远端,记录远端地址
    
    哈希:
    	为减小锁争用,我们构建了许多个执行请求的实体,并通过对key进行哈希将请求分发至某一实体
    	(LocalEngine->LocalEngineEntity),而后对key进行第二次哈希,写入/读取key的元数据。
    	由于STL的map占用空间较大,官方提供了哈希表的简单实现,直接刚开始就申请足够的空间,然后使
    	用拉链法连接哈希值相同的数据,不需要动态扩容。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述
    初赛结束时,我们只得了9分,最大的原因在于第一次哈希与第二次哈希使用同样的哈希函数(std::hash),导致LocalEngineEntity里的自定义哈希表中很大的一部分空间永远不会被访问(哈希值皆为LocalEngineEntity下标的整数),增大了哈希冲突的概率。
    在初赛的基础上编写复赛代码,主要实现三个功能:value的加密,删除操作,重构操作(删除被标记为无效的数据,整理有效数据使其排列更紧凑)。
    加密:value的加密根据IPP-Crypto的接口简单实现一个加密算法即可,没有几行代码。

    bool LocalEngine::set_aes() {
      // Current algorithm is not supported, just for demonstration.
      m_aes_.algo = CBC;
      m_aes_.key_len = 16;
      m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
      if (m_aes_.key == nullptr) return false;
      m_aes_.blk_size = 16;
      m_aes_.piv_len = 16;
      m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
      if (m_aes_.piv == nullptr) return false;
    
      int ctxSize;               // AES context size
      ippsAESGetSize(&ctxSize);  // evaluating AES context size
      // allocatting memory for AES context
      m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
      // AES context initialization
      ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
      return true;
    }
    // 参考pdf实现简单加密算法
    bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
      Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
      // encrypting plaintext
      ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
      std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
      encrypt_value = std::move(tmp);
      return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    删除: 代码的数据通路为:key——(page, index) ——本地缓存m_data_map——远端地址m_addr_map。故实现删除操作首先需要将key—>(page, index)的映射删除,这个只需要增加自定义哈希表的删除功能,注意将删除后的slot插入另一个链表中,以便复用该slot

    data_info_t hash_map_t::remove(const std::string &key, int index) {
      hash_map_slot *cur = m_bucket_[index];
      hash_map_slot *parent = nullptr;
      if (cur == nullptr) {
        return kNullInfo;
      }
      while (cur) {
        if (memcmp(cur->key, key.c_str(), 16) == 0) {
          // 在bucket中删除该slot
          if (parent == nullptr) {
            m_bucket_[index] = cur->next;
          } else {
            parent->next = cur->next;
          }
          // 加入后备链表
          cur->next = m_slot_head_->next;
          m_slot_head_->next = cur;
          return cur->info;
        }
        parent = cur;
        cur = cur->next;
      }
      return kNullInfo;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    将该映射删除后,无法通过key访问相应的value,但value仍然占据存储空间,故需要标记该位置,表示该value已经被删除,在重构操作时不需要迁移该位置的数据。

    std::bitset<kMaxIndex> m_bitmap_[kBitmapSize];  // 删除为1,正常为0
    
    • 1

    每一页增加一个位图,标记页中记录是否有效。其中kMaxIndex表示页中最大记录数,kBitmapSize表示运行过程中的最大页号。
    与位图相关的另一个操作是更新操作,如果更新操作对应的数据当前在远端,若此时读取远端数据再进行本地更新效率太低;故将这个操作拆分为删除远端数据+插入新数据;这时也需要将远端数据标记为无效。

    bool LocalEngineEntity::deleteK(const std::string &key) {
      m_delete_envent_ = true;
      int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
      m_mutex_.lock();
      data_info_t info = m_page_map_.remove(key, hash_index);  // 删除对应key的元数据
      m_mutex_.unlock();
      m_bitmap_[info.page_id].set(info.index, true);  // 将对应记录标记为删除
      return true;
    }
    // 数据在远端的更新操作
    m_bitmap_[slot->info.page_id].set(slot->info.index, true);  // 将之前数据标记为删除
    slot->info = info;                                          // 更新元数据信息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    重构:
    本地缓存与远端数据交互的基本单位是页,程序运行过程中,无效记录会越来越多,故需定时读取所有页,将有效记录写入到新页中,删除旧页,类似于一种垃圾回收。
    在读取远端页时,先读取其头部元数据,再依次读取有效记录,而不是读取整个页数据,这是因为rebuild时远端页有效记录占比较小,这样的读取方式可以减小读取量。

    void LocalEngineEntity::rebuild_index() {
      std::lock_guard<std::mutex> lk(m_mutex_);
      std::string key;
      std::string value;
      data_info_t data_info;
      std::shared_ptr<Page> page;
      std::unordered_map<page_id_t, remote_info_t> tmp_addr_map;  // 暂时存储页号与远端地址映射
      uint32_t new_cache_size = 1;
      std::vector<page_id_t> local_id = m_lru_list_.clear();
      auto new_page = m_cur_page_;
      page_id_t new_page_id = m_cur_page_id_;
      // 处理本地缓存页
      for (auto &page_id : local_id) {
        auto &bitmap = m_bitmap_[page_id];
        if (bitmap.none()) {  // 不存在删除的记录,不进行操作
          m_lru_list_.insert(page_id);
          new_cache_size++;
        } else if (!bitmap.all()) {  //存在有效记录
          auto page = m_data_map_[page_id];
          int record_num = page->record_number();
          for (int i = 0; i < record_num; i++) {
            if (!bitmap.test(i)) {        // 该记录未被删除
              if (new_page->is_full()) {  // 页满,写入本地
                m_data_map_[new_page_id] = new_page;
                new_cache_size++;
                m_lru_list_.insert(new_page_id);
                new_page = std::make_shared<Page>();
                new_page_id++;
              }
              // 读出数据插入新页并更新元数据映射
              key = page->read_key(i);
              data_info.index = new_page->insert(key, page->read_value(i));
              data_info.page_id = new_page_id;
              m_page_map_.update(key, data_info);
            }
          }
          m_data_map_[page_id] = nullptr;  // 删除原先页
        } else {
          m_data_map_[page_id] = nullptr;  // 删除原先页
        }
      }
      page_id_t page_id;
      remote_info_t info;
      char head_data[kMaxIndex * 10];
      char kv_data[2 * kMaxValueSize];
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
      uint16_t head_length;
      uint16_t offset, length;
    
      // 处理远端内存页
      for (auto &kv : m_addr_map_) {
        page_id = kv.first;
        info = kv.second;
        auto &bitmap = m_bitmap_[page_id];
        if (bitmap.none()) {  // 不存在删除的记录,不进行操作
          tmp_addr_map.insert({page_id, info});
        } else if (bitmap.all()) {  // 不存在有效记录,将后端地址加入地址列表
          m_addr_list_.emplace(info);
        } else {
          bool avai_info = true;  // 是否将该远端地址加入地址列表
          head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
          // 读取页头部数据
          m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
          int record_num = m_max_index_[page_id];
          for (int i = 0; i < record_num; i++) {
            if (!bitmap.test(i)) {
              if (new_page->is_full()) {
                if (new_cache_size > kPageThreshold) {  // 本地页满,写入远程
                  std::string &&page_data = new_page->to_string();
                  uint32_t len = page_data.length();
                  m_rdma_conn_->remote_write((void *)page_data.c_str(), len, info.remote_addr, info.rkey);
                  tmp_addr_map.insert({new_page_id, info});  // 暂时记录页号与远程地址映射
                  avai_info = false;
                } else {  // 写入本地缓存
                  m_data_map_[new_page_id] = new_page;
                  new_cache_size++;
                  m_lru_list_.insert(new_page_id);
                }
                new_page = std::make_shared<Page>();
                new_page_id++;
              }
              // 读出数据插入新页并更新元数据映射
              offset = u16_pointer[i + 2];
              length = u16_pointer[i + 3] - u16_pointer[i + 2];
              m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
              key = std::string(kv_data, kv_data + 16);
              value = std::string(kv_data + 16, kv_data + length);
              data_info.index = new_page->insert(key, std::move(value));
              data_info.page_id = new_page_id;
              m_page_map_.update(key, data_info);
            }
          }
          if (avai_info) {
            m_addr_list_.emplace(info);
          }
        }
      }
      m_addr_map_ = std::move(tmp_addr_map);
      m_cur_page_id_ = new_page_id;
      m_cur_page_ = new_page;
      m_vicitm_id_ = kNullPage;
      m_vicitm_page_ = nullptr;
      m_cache_size_ = new_cache_size;
      m_last_update_id_ = kNullPage;
      m_data_map_[new_page_id] = new_page;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107

    一些细节

    变长字符编码方式演化

    auto &&vicitm_page_data = victim_page->to_string();  // 记录淘汰页数据,开始写入远端内存
    m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), len, remote_info.remote_addr,
                                               remote_info.rkey);  
    
    • 1
    • 2
    • 3

    kv数据在本地缓存是以string数组(Page类)的形式存储的,当本地缓存达到阈值时,需将很久未访问的页写入到远端,此时是写一个大字符串;故需要将string数组转换为一个大字符串。由于之后有可能再访问该页,需要把各记录的大小也编码进字符串中。刚开始我将每个记录编码成记录大小 +‘\0’ +记录内容的形式。

    /*
    页数据字符串形式的排列格式为:
    页索引数(记录数)'\0'
    页容量(字节数)'\0'
    记录大小 '\0' 记录内容
    记录大小 '\0' 记录内容
    ...
    '\0'(页结束标志)
    */
    class Page {
     public:
      Page(std::string &data);  // 以字符串填充页
      std::string to_string();  // 页数据转换成字符串
     private:
      int get_size(const std::string &data, int &start);  // 字符串转数字
      std::string get_string(int size);                   // 数字转字符串
    
      std::vector<std::string> m_value_;  // 以vector存储记录
      std::vector<std::string> m_key_;    // 以vector存储记录
      uint32_t m_cur_size_;               // 当前页数据大小
      uint32_t m_cur_index_;              // 当前索引
    };
    
    // 将start为起点的字符串转换成数字
    int Page::get_size(const std::string &data, int &start) {
      std::string size_str = "";
      int value_size = 0;
      while (true) {
        if (data[start] != '\0') {  // 未遇到结束标志,加入字符串
          size_str.push_back(data[start]);
        } else {
          if (!size_str.empty()) {  // 字符串不为空,转换成数字
            value_size = std::stoi(size_str);
          }
          break;
        }
        start++;
      }
      start++;            // 跳过当前的结束字符
      return value_size;  // 页末尾结束字符返回0
    }
    
    // 将数字转换成字符串并在其后填充结束字符
    std::string Page::get_string(int size) {
      auto res = std::to_string(size);
      res.push_back('\0');
      return res;
    }
    
    Page::Page(std::string &data) {
      int start = 0;
      int value_num = get_size(data, start);  // 读取记录数
      int page_size = get_size(data, start);  // 读取页大小
      m_key_.reserve(value_num);
      m_value_.reserve(value_num);
    
      m_cur_index_ = value_num;
      m_cur_size_ = page_size;
      std::string kv;
      int value_size;
      while (true) {  // 循环读取记录
        value_size = get_size(data, start);
        if (value_size <= 0) {  // 遇到页末尾结束字符则返回
          break;
        }
        kv = data.substr(start, value_size);
        start += value_size;
        m_key_.emplace_back(kv.substr(0, 16));
        m_value_.emplace_back(kv.substr(16));
      }
    }
    
    std::string Page::to_string() {
      assert(this != nullptr);
      std::string data;
      data.reserve(m_cur_size_ + m_cur_index_ * 6);  // 提前预订字符串空间
      data.append(get_string(m_cur_index_));         // 加入记录数
      data.append(get_string(m_cur_size_));          // 加入页大小
      assert(m_cur_index_ == m_key_.size());
      assert(m_key_.size() == m_value_.size());
      for (uint32_t i = 0; i < m_cur_index_; i++) {
        data.append(get_string(16 + m_value_[i].length()));  // 加入记录大小
        data.append(m_key_[i]);                              // 加入记录数据
        data.append(m_value_[i]);
      }
      data.push_back('\0');  // 页末尾填充结束字符
      return data;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    而后稍微改变一下编码方式,将记录大小全部放在头部,记录数据放在尾部。

    /*
    页数据字符串形式的排列格式为:
    页索引数(记录数)'\0'
    页容量(字节数)'\0'
    记录大小'\0'
    记录大小'\0'
    记录内容(key value)
    记录内容(key value)
    
    保存key值以便在rebuild时能够更改元数据信息
    ...
    */
    class Page {
     public:
      Page();
      Page(std::string &data);            // 以字符串填充页
      std::string to_string();            // 页数据转换成字符串
     private:
      std::string get_string(int size);  // 数字转字符串
    
      std::string m_key_[kMaxIndex];
      std::string m_value_[kMaxIndex];
      uint32_t m_cur_size_;   // 当前页数据大小
      uint32_t m_cur_index_;  // 当前索引
    
      // std::vector m_value_;  // 以vector存储key值,方便rebuild时更新元数据
      // std::vector m_key_;    // 以vector存储记录
    };
    
    std::string Page::to_string() {
      std::string data;
      data.reserve(m_cur_size_ + m_cur_index_ * 4);  // 提前预订字符串空间
      data.append(get_string(m_cur_index_));         // 加入记录数
      data.append(get_string(m_cur_size_));          // 加入页大小
      for (uint32_t i = 0; i < m_cur_index_; i++) {  // 加入各个记录大小
        data.append(get_string(16 + m_value_[i].size()));
      }
      for (uint32_t i = 0; i < m_cur_index_; i++) {
        data.append(m_key_[i]);  // 加入记录数据
        data.append(m_value_[i]);
      }
      return data;
    }
    Page::Page(std::string &str) {
      std::string::size_type pos;
      std::string::size_type size = str.size();
      // 获取页大小与最大记录数
      int start = 0;
      pos = str.find('\0', start);
      m_cur_index_ = std::stoi(str.substr(start, pos - start));
      start = pos + 1;
      pos = str.find('\0', start);
      m_cur_size_ = std::stoi(str.substr(start, pos - start));
      start = pos + 1;
      // 获取各个记录长度
      std::vector<int> record_lengths(m_cur_index_);
      for (uint32_t i = 0; i < m_cur_index_; i++) {
        pos = str.find('\0', start);
        record_lengths[i] = std::stoi(str.substr(start, pos - start));
        start = pos + 1;
      }
      // 获取各个记录内容
      int record_num = record_lengths.size();
      for (int i = 0; i < record_num; i++) {
        m_key_[i] = str.substr(start, 16);
        m_value_[i] = str.substr(start + 16, record_lengths[i] - 16);
        start += record_lengths[i];
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    这时候编码解码比之前稍微简单一点(去掉了get_size函数),但将长度编码进字符串的方式还是觉得有点低效。后面一想,为什么不直接把大字符串视作一个整数数组,对数组元素赋值就可以了,也就是说 123没必要转换成’1’ ‘2’ '3’后存入数组(3个字节),而是直接对一个short类型(2字节)的数赋值。 这样就不再需要编码额外的结束字符作为标记了。

    字符数组的头部存储各种元数据,尾部存储实际key-value数据。

    /*
    u16存储
    记录数
    页大小
    各记录大小
    char存储
    各个记录(kv)
    */
    std::string Page::to_string() {
      std::string data;
      data.resize(kAllocSize);
      uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t);         // 从该偏移开始存储实际的数据
      char *char_pointer = const_cast<char *>(data.c_str());               // 移除常量性
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // 解释为u32指针
    
      u16_pointer[0] = m_cur_index_;
      u16_pointer[1] = m_cur_size_;
      uint16_t offset = data_start;
      for (uint16_t i = 0; i < m_cur_index_; i++) {  // 加入各个记录起始地址
        u16_pointer[i + 2] = offset;
        offset += 16 + m_value_[i].size();
      }
      u16_pointer[m_cur_index_ + 2] = offset;  // 实际数据最终偏移
      char *p = char_pointer + data_start;
      for (uint16_t i = 0; i < m_cur_index_; i++) {
        memcpy(p, m_key_[i].c_str(), 16);
        memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
        p += 16 + m_value_[i].size();
      }
      return data;
    }
    Page::Page(std::string &data) {
      char *char_pointer = const_cast<char *>(data.c_str());               // 移除常量性
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // 解释为u32指针
      m_cur_index_ = u16_pointer[0];
      m_cur_size_ = u16_pointer[1];
      uint16_t offset, length;
      for (uint16_t i = 0; i < m_cur_index_; i++) {
        offset = u16_pointer[i + 2];
        length = u16_pointer[i + 3] - u16_pointer[i + 2];
        m_key_[i] = data.substr(offset, 16);
        m_value_[i] = data.substr(offset + 16, length - 16);
      }
      m_is_dirty_ = false;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    后面发现leveldb中用了以下这种编码形式
    详解varint编码原理

    #pragma pack使用错误

    本地缓存总共8G空间,有5G是存储key的元数据。其结构体定义如下:

    using page_id_t = uint16_t;
    using index_t = uint16_t;
    
    struct data_info_t {  // 数据信息
      page_id_t page_id;  // 页号
      index_t index;      // 索引
    };
    
    /* One slot stores the key and the meta info of the value which
       describles the remote addr, size, remote-key on remote end. */
    struct hash_map_slot {
      char key[16];
      data_info_t info;
      hash_map_slot *next;
    };
    
    #pragma pack(4)
    struct hash_map_slot_test {
      char key[16];
      data_info_t info;
      hash_map_slot *next;
    };
    // 没有#pragma pack()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    可以看到struct data_info_t占4字节,但struct hash_map_slot因为对齐的原因占32字节。自然而然的,为了节省空间,可以强制结构体4字节对齐,这样就能节省4字节的空间,也就节省了1/8的空间。但由于不熟悉#pragma pack,#pragma pack(4)并没有以#pragma pack()结束。然后一运行程序就段错误,gdb调试时bt显示调用栈,f3时传递参数为32位,f2突然截断,参数变成了16位。我觉得这个错误过于诡异,因为就修改了字节对齐,故到比赛结束我都没再用#pragma pack,一般也不建议使用#pragma pack。
    示例程序:

    using page_id_t = uint16_t;
    using index_t = uint16_t;
    
    struct data_info_t {  // 数据信息
      page_id_t page_id;  // 页号
      index_t index;      // 索引
    };
    
    /* One slot stores the key and the meta info of the value which
       describles the remote addr, size, remote-key on remote end. */
    struct hash_map_slot {
      char key[16];
      data_info_t info;
      hash_map_slot *next;
    };
    
    #pragma pack(4)
    struct hash_map_slot_test {
      char key[16];
      data_info_t info;
      hash_map_slot_test *next;
    };
    #pragma pack()
    
    int main() {
      cout << sizeof(hash_map_slot) << endl;
      cout << sizeof(hash_map_slot_test) << endl;
      hash_map_slot slot1;
      hash_map_slot_test slot2;
    
      printf("hash_map_slot layout:\n%p  \n%p  \n%p\n", &(slot1.key), &(slot1.info), &(slot1.next));
      printf("hash_map_slot test layout:\n%p  \n%p  \n%p\n", &(slot2.key), &(slot2.info), &(slot2.next));
    }
    
    /*
    输出:
    32
    28
    hash_map_slot layout:
    0x7ffef416d1f0  
    0x7ffef416d200  
    0x7ffef416d208
    hash_map_slot test layout:
    0x7ffef416d1d0  
    0x7ffef416d1e0  
    0x7ffef416d1e4
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    相关博客:
    C/C++中结构体内存对齐(边界对齐),#pragma pack设置
    关于#pragma pack(n)引发的一系列问题

    右值引用本身是左值

    1: void func(Test&& t);
    2: void func(Test& t);
    
    Test t1
    Test&& t2 = std::move(t1);
    func(t2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    对于右值引用我一直有一个误区,认为右值引用是右值,例如以上代码片段,实参类型为Test&&,很容易认为此时调用的函数为第二个,但实际上此时调用的却是第一个。这是因为右值引用本身是左值,更为具体来说,右值引用类型既可以被当作左值也可以被当作右值,判断的标准是,如果它有名字,那就是左值,否则就是右值

    示例程序:

    #include 
    using namespace std;
    
    class Test {
     public:
      Test() = default;
      Test(const Test &test) : str(test.str) { printf("enter copy construction\n"); }
      Test(Test &&test) : str(std::move(test.str)) { printf("enter move construction\n"); }
    
      Test &operator=(const Test &test) {
        str = test.str;
        printf("enter copy assign\n");
        return *this;
      }
      Test &operator=(Test &&test) {
        str = std::move(test.str);
        printf("enter move assign\n");
        return *this;
      }
    
     private:
      string str;
    };
    
    int main() {
      Test p1, p2, p3;
      Test &&p4 = std::move(p3);
    
      Test p5(p1);             // 调用复制构造
      Test p6(std::move(p2));  // 调用移动构造
      Test p7(p4);             // 调用复制构造
    
      Test t1, t2, t3, t4;
      Test &&t5 = std::move(t4);
      t1 = t2;             // 调用复制赋值
      t1 = std::move(t3);  // 调用移动赋值
      t1 = t5;             // 调用复制赋值(误区)
    }
    /*
    输出:
    enter copy construction
    enter move construction
    enter copy construction
    enter copy assign
    enter move assign
    enter copy assign
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    实际上使用vscode将鼠标点到函数调用的地方就能看到所调用的函数
    在这里插入图片描述
    在这里插入图片描述
    注意move后不应该再使用变量值
    我有一次错误就在于调用move函数后仍然使用对象的size方法,导致未定义的行为。

    template<class T>
    void swap(T& a, T& b)
    {
      T tmp(std::move(a));
      a = std::move(b);
      b = std::move(tmp);
    }
    
    X a, b;
    swap(a, b);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    参考博客:
    详解C++右值引用

    其余优化

    读者互斥锁
    当读请求未命中时,程序需读取远端的字符串数据,并将其解码成string数组(Page类),插入本地缓存后再读取value;该操作耗时较长,且没必要持有锁;但如果多个请求同时请求该远端页,如果让每一个请求都读取远端页,既浪费IO资源,也没啥实际用处;故增加一个读者互斥锁,保证每一页只有一个读者正在请求远端页,其余请求同一远端页的读者会阻塞互斥锁前;待读取远端页的读者读取页数据完成并将该页插入本地缓存中,其余读者发现本地缓存已存在该页,就不会重复读取远端页了。
    相关代码:

      if (m_data_map_[info.page_id] == nullptr) {  // 数据在远端内存
        m_mutex_.unlock();
        m_same_reader_mutex_[info.page_id].lock();   // 读者互斥
        if (m_data_map_[info.page_id] == nullptr) {  // 双重检查
          auto remote_info = m_addr_map_[info.page_id];
          std::string &&page_data = std::string(remote_info.size, '0');
          m_rdma_conn_->remote_read((void *)page_data.c_str(), remote_info.size, remote_info.remote_addr, remote_info.rkey);
          auto new_page = std::make_shared<Page>(page_data);  // 构建缓存页
          m_mutex_.lock();
          m_data_map_[info.page_id] = new_page;
          m_cache_size_++;
          m_lru_list_.insert(info.page_id);
          need_update_lru = false;
          m_addr_map_.erase(info.page_id);    // 在远端地址映射中删除该项
          m_addr_list_.emplace(remote_info);  // 将该页加入地址列表
          m_mutex_.unlock();
          // 尝试唤醒后台进程
          if (m_cache_size_ > kPageThreshold) {
            m_cv_.notify_one();
          }
        }
        m_same_reader_mutex_[info.page_id].unlock();
        m_mutex_.lock();
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    时不时用sizeof看看占用空间,增加一个读者互斥锁相当于每页多了40字节,占比不大。

    int main() {
      cout << "mutex:" << sizeof(std::mutex) << endl;
      cout << "shared_ptr:" << sizeof(std::shared_ptr<int>) << endl;
    }
    /*
    输出:
    mutex:40
    shared_ptr:16
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    LRU优化
    不论是读操作还是写操作,末尾都需要更新LRU列表;为了减小冲突,LRU采用独立的锁,不在m_mutex_锁内更新;当前页(新申请的页)并不加入LRU列表,也不会被淘汰,待当前页写满后再插入到LRU列表中;上次LRU更新的页本次也不再更新(已在队首)。

      if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
        m_lru_list_.update(info.page_id);
        m_last_update_id_ = info.page_id;
      }
    
    • 1
    • 2
    • 3
    • 4

    map改成数组 vector改数组

    由于最大页号确定,map<page_id,page>改成了page[max_page_id];
    由于页最大记录数确定,std::vector<std::string> m_value_变成了std::string m_value_[kMaxIndex]; 
    通过使用固定的数据结构,减少了扩容带来的开销。
    
    • 1
    • 2
    • 3

    程序说明

    程序正确性预设
    1:运行中页号不得超过kBitmapSize,页中记录数不得超过kMaxIndex,页导出字符串长度不得超过kAllocSize
    2:程序经历连续的删除操作后才进入读写操作,使得rebuild最大化。在删除之前远端内存足容纳所有数据
    3:即使每次插入时会检查当前页是否已满,但仍然无法阻止程序通过更新value值来增大页大小,所以页数据大小大于kAllocSize便会出现问题
    4:kAllocSize小于65536,页成员就可以使用uint16存储,大于则需要使用uint32存储
    5:mutex之外的操作耗时极短,可以在rebuild处理前完成

    1: debug时系统高效充分打印所需的信息
    2:架构越简洁。越不容易出错
    3: 可编写简单的测试用例检验模块的正确性
    4: 利用sizeof和system_clock简单估计内存占用与执行速度
    5  右值引用变量在用于表达式时是左值,move后不应该再使用变量值
    int &&x = 1;
    f(x);             // 调用 f(int& x)
    f(std::move(x));  // 调用 f(int&& x)
    6 编写代码时明确锁所维持的不变量是什么,而不是仅仅确保操作的原子性
    m_mutex_:确保保护成员操作的原子性,并保证页数据仅存在本地(m_data_map_)与远端(m_addr_map_)中的一个位置
    m_remote_read_lock_:淘汰时在数据真正写入前阻塞对应页的请求
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    相关代码

    Engine

    // kv_engine.h
    namespace kv {
    
    /* Encryption algorithm competitor can choose. */
    enum aes_algorithm { CTR = 0, CBC, CBC_CS1, CBC_CS2, CBC_CS3, CFB, OFB };
    
    /* Algorithm relate message. */
    typedef struct crypto_message_t {
      aes_algorithm algo;
      Ipp8u *key;
      Ipp32u key_len;
      Ipp8u *counter;
      Ipp32u counter_len;
      Ipp8u *piv;
      Ipp32u piv_len;
      Ipp32u blk_size;
      Ipp32u counter_bit;
    
      IppsAESSpec *ctx;
    } crypto_message_t;
    
    /* Abstract base engine */
    class Engine {
     public:
      virtual ~Engine(){};
    
      virtual bool start(const std::string addr, const std::string port) = 0;
    
      virtual void stop() = 0;
    
      virtual bool alive() = 0;
    };
    
    class LocalEngineEntity;  // 前置声明
    /* Local-side engine */
    class LocalEngine : public Engine {
     public:
      ~LocalEngine();
    
      bool start(const std::string addr, const std::string port) override;
    
      void stop() override;
    
      bool alive() override;
    
      /* Init aes context message. */
      bool set_aes();
    
      bool encrypted(const std::string &value, std::string &encrypt_value);
    
      /* Evaluation problem will call this function. */
      crypto_message_t *get_aes() { return &m_aes_; }
    
      bool write(const std::string &key, const std::string &value, bool use_aes = false);
    
      bool read(const std::string &key, std::string &value);
      /** The delete interface */
      bool deleteK(const std::string &key);
    
      /** Rebuild the hash index to recycle the unsed memory */
      void rebuild_index();
    
     private:
      // static inline int get_index(const std::string &key) { return std::hash()(key) & (kSharedNumber - 1); }
      static inline int get_index(const std::string &key) { return CityHash16(key.c_str()) & (kSharedNumber - 1); }
      kv::ConnectionManager *m_rdma_conn_;
      // /* NOTE: should use some concurrent data structure, and also should take the
      //  * extra memory overhead into consideration */
      // RDMAMemPool *m_rdma_mem_pool_;
    
      crypto_message_t m_aes_;
    
      LocalEngineEntity *m_entity_[kSharedNumber];  // 执行请求的实体
    };
    
    /* Remote-side engine */
    class RemoteEngine : public Engine {
     public:
      struct WorkerInfo {
        CmdMsgBlock *cmd_msg;
        CmdMsgRespBlock *cmd_resp_msg;
        struct ibv_mr *msg_mr;
        struct ibv_mr *resp_mr;
        rdma_cm_id *cm_id;
        struct ibv_cq *cq;
      };
    
      ~RemoteEngine(){};
    
      bool start(const std::string addr, const std::string port) override;
      void stop() override;
      bool alive() override;
    
     private:
      void handle_connection();
    
      int create_connection(struct rdma_cm_id *cm_id);
    
      struct ibv_mr *rdma_register_memory(void *ptr, uint64_t size);
    
      int remote_write(WorkerInfo *work_info, uint64_t local_addr, uint32_t lkey, uint32_t length, uint64_t remote_addr, uint32_t rkey);
    
      int allocate_and_register_memory(uint64_t &addr, uint32_t &rkey, uint64_t size);
    
      void worker(WorkerInfo *work_info, uint32_t num);
    
      struct rdma_event_channel *m_cm_channel_;
      struct rdma_cm_id *m_listen_id_;
      struct ibv_pd *m_pd_;
      struct ibv_context *m_context_;
      bool m_stop_;
      std::thread *m_conn_handler_;
      WorkerInfo **m_worker_info_;
      uint32_t m_worker_num_;
      std::thread **m_worker_threads_;
    };
    
    }  // namespace kv
    
    
    // local_engine.cc
    #include "assert.h"
    #include "atomic"
    #include "kv_engine.h"
    #include "local_engine_entity.h"
    #include 
    
    namespace kv {
    
    /**
     * @description: start local engine service
     * @param {string} addr    the address string of RemoteEngine to connect
     * @param {string} port   the port of RemoteEngine to connect
     * @return {bool} true for success
     */
    bool LocalEngine::start(const std::string addr, const std::string port) {
      m_rdma_conn_ = new ConnectionManager();
      if (m_rdma_conn_ == nullptr) return false;
      if (m_rdma_conn_->init(addr, port, 4, 72)) return false;
      for (int i = 0; i < kSharedNumber; i++) {
        m_entity_[i] = new LocalEngineEntity(this, m_rdma_conn_);
      }
      printf("LocalEngine::start finsh\n");
      auto watcher = std::thread([]() {
        sleep(60 * 9);
        printf("timeout\n");
        fflush(stdout);
        abort();
      });
      watcher.detach();
      return true;
    }
    
    /**
     * @description: stop local engine service
     * @return {void}
     */
    void LocalEngine::stop() {
      for (int i = 0; i < kSharedNumber; i++) {
        delete m_entity_[i];
      }
      delete m_rdma_conn_;
      printf("LocalEngine::stop finsh\n");
    };
    
    /**
     * @description: get engine alive state
     * @return {bool}  true for alive
     */
    bool LocalEngine::alive() { return true; }
    
    /**
     * @description: provide message about the aes_ecb mode
     * @return {bool}  true for success
     */
    bool LocalEngine::set_aes() {
      // Current algorithm is not supported, just for demonstration.
      m_aes_.algo = CBC;
      m_aes_.key_len = 16;
      m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
      if (m_aes_.key == nullptr) return false;
      m_aes_.blk_size = 16;
      m_aes_.piv_len = 16;
      m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
      if (m_aes_.piv == nullptr) return false;
    
      int ctxSize;               // AES context size
      ippsAESGetSize(&ctxSize);  // evaluating AES context size
      // allocatting memory for AES context
      m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
      // AES context initialization
      ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
      return true;
    }
    // 参考pdf实现简单加密算法
    bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
      Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
      // encrypting plaintext
      ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
      std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
      encrypt_value = std::move(tmp);
      return true;
    }
    
    bool LocalEngine::write(const std::string &key, const std::string &value, bool use_aes) {
      int index = get_index(key);
      return m_entity_[index]->write(key, value, use_aes);
    }
    /**
     * @description: read value from engine via key
     * @param {string} key
     * @param {string} &value
     * @return {bool}  true for success
     */
    
    bool LocalEngine::read(const std::string &key, std::string &value) {
      int index = get_index(key);
      return m_entity_[index]->read(key, value);
    }
    
    bool LocalEngine::deleteK(const std::string &key) {
      int index = get_index(key);
      return m_entity_[index]->deleteK(key);
    }
    /* When we delete a number of KV pairs, we should rebuild the index to
     * reallocate remote addr for each KV to recycle fragments. This will block
     * front request processing. This solution should be optimized. */
    void LocalEngine::rebuild_index() {
      /** rebuild all the index to recycle the fragment */
      /** step-1: block the database and not allowe any writes
       *  step-2: transverse the index to read each value
       *  step-3: read the value from the remote and write it to a new remote addr
       *  step-4: free all old addr
       */
      printf("*********rebuild_index*********\n");
      for (int i = 0; i < kSharedNumber; i++) {
        m_entity_[i]->rebuild_index();
      }
    }
    
    LocalEngine::~LocalEngine() {
      if (nullptr != m_aes_.key) delete[] m_aes_.key;
      if (nullptr != m_aes_.counter) delete[] m_aes_.counter;
      if (nullptr != m_aes_.piv) delete[] m_aes_.piv;
      if (nullptr != m_aes_.ctx) delete (Ipp8u *)m_aes_.ctx;
      m_aes_.key = nullptr;
      m_aes_.counter = nullptr;
      m_aes_.piv = nullptr;
      m_aes_.ctx = nullptr;
    }
    }  // namespace kv
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252

    type

    // type.h
    namespace kv {
    using page_id_t = uint16_t;
    using index_t = uint16_t;
    struct remote_info_t {  // 远端内存信息
      uint64_t remote_addr;
      uint32_t rkey;
      uint32_t size;
    };
    
    struct data_info_t {  // 数据信息
      page_id_t page_id;  // 页号
      index_t index;      // 索引
    };
    
    /* One slot stores the key and the meta info of the value which
       describles the remote addr, size, remote-key on remote end. */
    struct hash_map_slot {
      char key[16];
      data_info_t info;
      hash_map_slot *next;
    };
    const page_id_t kNullPage = 0;
    const data_info_t kNullInfo = {0xffff, 0xffff};  // 无效的信息
    
    const int kSharedNumber = 1 << 6;                                             // 缓存实体数目
    const uint32_t kBucketNum = 1 << 21;                                          // hash中bucket数
    const uint32_t kSlotSize = (1 << 22) * 1.2 * 32 / kSharedNumber;              // hash中slot数
    const uint64_t kMaxDataSize = (uint64_t)1 << 36;                              // 测试数据大小
    const uint32_t kPageSize = 60 * 1 << 10;                                      // 页容量
    const uint32_t kMaxValueSize = 1024;                                          // value最大值
    const uint32_t kMinValueSize = 80;                                            // value最小值
    const uint32_t kMaxIndex = kPageSize / 96;                                    // 页中最大记录数,96=80+16
    const uint64_t kBitmapSize = 1.4 * kMaxDataSize / kSharedNumber / kPageSize;  // 运行过程中最大页号
    const uint32_t kAllocSize = 1 << 16;                                          // 分配的远端内存大小
    const uint32_t kPageThreshold = 1 << 8;                                       // 本地存储页的阈值
    const uint32_t kEvictNumber = kPageThreshold * 0.05;                          // 一次淘汰的页数
    const uint32_t kPrintFreq = 1 << 24;                                          // 输出信息频率
    const uint64_t kRebuildThreshold = (uint64_t)(1 << 30) * 36 / kSharedNumber;  // 重建阈值
    const uint32_t kEralyRegisterNumber = (uint64_t)(1 << 30) * 30 / kSharedNumber / kAllocSize;
    
    class hash_map_t {
     public:
      /* Initialize all the pointers to nullptr. */
      hash_map_t();
      // index为key对应hash值
      /* Find the corresponding key. */
      hash_map_slot *find(const std::string &key, int index);
    
      /* Insert into the head of the list. */
      void insert(const std::string &key, const data_info_t &info, int index);
    
      data_info_t remove(const std::string &key, int index);
      // 只在rebuild时用到
      void update(const std::string &key, const data_info_t &info);
    
     private:
      hash_map_slot *m_bucket_[kBucketNum];
      hash_map_slot *m_slot_head_;                  // 可用的slot链表头部,用于连接被删除元素的slot
      hash_map_slot m_hash_slot_array_[kSlotSize];  // 哈希数组
      uint32_t m_slot_cnt_;
    };
    
    class Page {
     public:
      Page(std::string &data);  // 以字符串填充页
    
      std::string to_string();  // 页数据转换成字符串
      
      void to_string(char *ptr);
      Page() : m_cur_size_{0}, m_cur_index_{0}, m_is_dirty_{true} {};
    
      bool is_full() { return m_cur_size_ > kPageSize; }  // 页是否满
    
      uint16_t page_size() { return m_cur_size_; }  // 返回当前页大小
    
      index_t record_number() { return m_cur_index_; }  // 返回记录数
    
      index_t insert(const std::string &key, const std::string &value) {
        m_cur_size_ += 16 + value.size();  // 加上key的长度
        m_key_[m_cur_index_] = key;
        m_value_[m_cur_index_] = value;
        return m_cur_index_++;  // 返回当前索引并加一
      }
    
      index_t insert(const std::string &key, std::string &&value) {
        m_cur_size_ += 16 + value.size();  // 加上key的长度
        m_key_[m_cur_index_] = key;
        m_value_[m_cur_index_] = std::move(value);
        return m_cur_index_++;  // 返回当前索引并加一
      }
    
      void update(index_t index, const std::string &value) {
        m_is_dirty_ = true;
        m_cur_size_ -= m_value_[index].size();
        m_cur_size_ += value.size();  // 更新页当前大小
        m_value_[index] = value;
      }
    
      void update(index_t index, std::string &&value) {
        m_is_dirty_ = true;
        m_cur_size_ -= m_value_[index].size();
        m_cur_size_ += value.size();  // 更新页当前大小
        m_value_[index] = std::move(value);
      }
    
      std::string read_value(index_t index) { return m_value_[index]; }
    
      std::string read_key(index_t index) { return m_key_[index]; }
    
      bool is_dirty() { return m_is_dirty_; }
    
     private:
      std::string m_key_[kMaxIndex];
      std::string m_value_[kMaxIndex];
      uint16_t m_cur_size_;  // 当前页数据大小
      index_t m_cur_index_;  // 当前索引
      bool m_is_dirty_;
    
      // std::vector m_value_;  // 以vector存储key值,方便rebuild时更新元数据
      // std::vector m_key_;    // 以vector存储记录
    };
    
    class LRUList {
     public:
      LRUList() = default;
      void insert(page_id_t hit_id) {
        m_mutex_.lock();
        m_list_.emplace_front(hit_id);
        m_speed_map_.insert({hit_id, m_list_.begin()});
        m_mutex_.unlock();
      }
      void update(page_id_t hit_id) {
        m_mutex_.lock();
        auto iter = m_speed_map_[hit_id];
        if (iter != m_list_.begin()) {  // 将该页移至队首
          m_list_.erase(iter);
          m_list_.emplace_front(hit_id);
          m_speed_map_[hit_id] = m_list_.begin();
        }
        m_mutex_.unlock();
      }
      page_id_t evict() {
        m_mutex_.lock();
        page_id_t vicitm_page_id = m_list_.back();
        m_list_.pop_back();
        m_speed_map_.erase(vicitm_page_id);
        m_mutex_.unlock();
        return vicitm_page_id;
      }
      std::vector<page_id_t> clear() {  // 清空LRU列表所有元素
        m_mutex_.lock();
        std::vector<page_id_t> res(m_list_.begin(), m_list_.end());
        m_list_.clear();
        m_speed_map_.clear();
        m_mutex_.unlock();
        return res;
      }
      int size() { return m_list_.size(); }
    
     private:
      std::mutex m_mutex_;
      std::list<page_id_t> m_list_;                                                // LRU列表
      std::unordered_map<page_id_t, std::list<page_id_t>::iterator> m_speed_map_;  // 加速LRU列表访问
    };
    
    }  // namespace kv
    
    // type.c
    #include 
    #include "type.h"
    namespace kv {
    hash_map_t::hash_map_t() {
      memset(m_bucket_, 0, sizeof(m_bucket_));
      m_slot_head_ = &m_hash_slot_array_[0];
      m_slot_head_->next = nullptr;
      m_slot_cnt_ = 1;
    }
    
    /* Find the corresponding key. */
    hash_map_slot *hash_map_t::find(const std::string &key, int index) {
      hash_map_slot *cur = m_bucket_[index];
      if (cur == nullptr) {
        return nullptr;
      }
      while (cur) {
        if (memcmp(cur->key, key.c_str(), 16) == 0) {
          return cur;
        }
        cur = cur->next;
      }
      return nullptr;
    }
    
    /* Insert into the head of the list. */
    void hash_map_t::insert(const std::string &key, const data_info_t &info, int index) {
      hash_map_slot *new_slot;
      // 优先使用被删除数据的slot
      if (m_slot_head_->next == nullptr) {
        new_slot = &m_hash_slot_array_[m_slot_cnt_++];
        assert(m_slot_cnt_ < kSlotSize);
      } else {
        new_slot = m_slot_head_->next;
        m_slot_head_->next = new_slot->next;
      }
      new_slot->next = nullptr;
    
      memcpy(new_slot->key, key.c_str(), 16);
      new_slot->info = info;
      if (!m_bucket_[index]) {
        m_bucket_[index] = new_slot;
      } else {
        /* Insert into the head. */
        hash_map_slot *tmp = m_bucket_[index];
        m_bucket_[index] = new_slot;
        new_slot->next = tmp;
      }
    }
    
    // 只在rebuild时调用
    void hash_map_t::update(const std::string &key, const data_info_t &info) {
      int index = std::hash<std::string>()(key) & (kBucketNum - 1);
      hash_map_slot *cur = m_bucket_[index];
      while (cur) {
        if (memcmp(cur->key, key.c_str(), 16) == 0) {
          cur->info = info;
          return;
        }
        cur = cur->next;
      }
    }
    
    data_info_t hash_map_t::remove(const std::string &key, int index) {
      hash_map_slot *cur = m_bucket_[index];
      hash_map_slot *parent = nullptr;
      if (cur == nullptr) {
        return kNullInfo;
      }
      while (cur) {
        if (memcmp(cur->key, key.c_str(), 16) == 0) {
          // 在bucket中删除该slot
          if (parent == nullptr) {
            m_bucket_[index] = cur->next;
          } else {
            parent->next = cur->next;
          }
          // 加入后备链表
          cur->next = m_slot_head_->next;
          m_slot_head_->next = cur;
          return cur->info;
        }
        parent = cur;
        cur = cur->next;
      }
      return kNullInfo;
    }
    /*
    u16存储
    记录数
    页大小
    各记录大小
    char存储
    各个记录(kv)
    */
    std::string Page::to_string() {
      std::string data;
      data.resize(kAllocSize);
      uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t);         // 从该偏移开始存储实际的数据
      char *char_pointer = const_cast<char *>(data.c_str());               // 移除常量性
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // 解释为u32指针
    
      u16_pointer[0] = m_cur_index_;
      u16_pointer[1] = m_cur_size_;
      uint16_t offset = data_start;
      for (uint16_t i = 0; i < m_cur_index_; i++) {  // 加入各个记录起始地址
        u16_pointer[i + 2] = offset;
        offset += 16 + m_value_[i].size();
      }
      u16_pointer[m_cur_index_ + 2] = offset;  // 实际数据最终偏移
      char *p = char_pointer + data_start;
      for (uint16_t i = 0; i < m_cur_index_; i++) {
        memcpy(p, m_key_[i].c_str(), 16);
        memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
        p += 16 + m_value_[i].size();
      }
      return data;
    }
    void Page::to_string(char *ptr) {
      uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t);  // 从该偏移开始存储实际的数据
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(ptr);    // 解释为u32指针
    
      u16_pointer[0] = m_cur_index_;
      u16_pointer[1] = m_cur_size_;
      uint16_t offset = data_start;
      for (uint16_t i = 0; i < m_cur_index_; i++) {  // 加入各个记录起始地址
        u16_pointer[i + 2] = offset;
        offset += 16 + m_value_[i].size();
      }
      u16_pointer[m_cur_index_ + 2] = offset;  // 实际数据最终偏移
      char *p = ptr + data_start;
      for (uint16_t i = 0; i < m_cur_index_; i++) {
        memcpy(p, m_key_[i].c_str(), 16);
        memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
        p += 16 + m_value_[i].size();
      }
    }
    Page::Page(std::string &data) {
      char *char_pointer = const_cast<char *>(data.c_str());               // 移除常量性
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // 解释为u32指针
      m_cur_index_ = u16_pointer[0];
      m_cur_size_ = u16_pointer[1];
      uint16_t offset, length;
      for (uint16_t i = 0; i < m_cur_index_; i++) {
        offset = u16_pointer[i + 2];
        length = u16_pointer[i + 3] - u16_pointer[i + 2];
        m_key_[i] = data.substr(offset, 16);
        m_value_[i] = data.substr(offset + 16, length - 16);
      }
      m_is_dirty_ = false;
    }
    
    }  // namespace kv
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322

    local_engine_entity

    // local_engine_entity.h
    #pragma once
    #include 
    #include 
    #include "type.h"
    #include "rdma_conn_manager.h"
    #include "rdma_mem_pool.h"
    namespace kv {
    
    class LocalEngine;  // 前置声明
    class LocalEngineEntity {
     public:
      LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn);
      ~LocalEngineEntity();
      bool write(const std::string &key, const std::string &value, bool use_aes = false);
      bool read(const std::string &key, std::string &value);
      bool deleteK(const std::string &key);
      void rebuild_index();
      std::vector<uint64_t> print_memory();  // 输出内存占用信息
    
     private:
      void start_thread();                  // 启动后台线程
      remote_info_t request_signle_info();  // 请求单个远端地址
      void register_remote_memory();        // 提前注册远端内存
    
      hash_map_t m_page_map_;                                    // 数据与元数据映射
      std::shared_ptr<Page> m_data_map_[kBitmapSize];            // 页号与页映射(数据存于本地)
      uint32_t m_cache_size_;                                    // 本地缓存大小
      LRUList m_lru_list_;                                       // LRU列表
      std::unordered_map<page_id_t, remote_info_t> m_addr_map_;  // 页号与远端信息映射(数据存于远端)
      uint16_t m_max_index_[kBitmapSize];                        // 各个远端页最大索引
      std::queue<remote_info_t> m_addr_list_;                    // 未使用的远端内存
      std::mutex m_mutex_;                                       // 保护以上成员
    
      // 注意最大页号不要大于kBitmapSize!
      std::bitset<kMaxIndex> m_bitmap_[kBitmapSize];  // 删除为1,正常为0
      std::mutex m_same_reader_mutex_[kBitmapSize];   // 提供同一页远程读互斥访问
    
      page_id_t m_cur_page_id_;           // 当前使用页号,不存在于LRU列表
      std::shared_ptr<Page> m_cur_page_;  // 当前使用页
    
      page_id_t m_vicitm_id_;                // 淘汰页号
      std::shared_ptr<Page> m_vicitm_page_;  // 淘汰页号
      page_id_t m_last_update_id_;           // 上次lru更新的页号
    
      bool m_alive_;                  // 控制后台进程生命周期
      std::condition_variable m_cv_;  // 用于唤醒后台进程
      std::thread *m_backup_thread_;  // 后台进程
      std::mutex m_useless_mutex_;    // 只是方便调用API,没啥实际意义
    
      LocalEngine *m_engine_;
      ConnectionManager *m_rdma_conn_;
      RDMAMemPool *m_rdma_mem_pool_;
    
      // 统计数据
      uint64_t m_alloc_memory_{0};  // 申请远端内存大小
    
      bool m_rebuild_allow_{true};
      bool m_delete_envent_{false};
      bool m_rw_envent_after_delete_{false};
    };
    }  // namespace kv
    
    // local_engine_entity.cc
    #include 
    #include 
    #include 
    #include "local_engine_entity.h"
    #include "kv_engine.h"
    
    namespace kv {
    LocalEngineEntity::LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn) : m_engine_(engine), m_rdma_conn_(rdma_conn) {
      m_rdma_mem_pool_ = new RDMAMemPool(m_rdma_conn_);
      if (m_rdma_mem_pool_ == nullptr) {
        printf("alloc rdma_mem_pool failed");
      }
      auto page = std::make_shared<Page>();  // 申请第一页
      m_cur_page_id_ = 1;
      m_cur_page_ = page;
    
      m_vicitm_id_ = kNullPage;
      m_vicitm_page_ = nullptr;
      m_last_update_id_ = kNullPage;
    
      m_data_map_[m_cur_page_id_] = page;
      m_cache_size_ = 1;
    
      m_alive_ = true;
      // 提前申请远端内存
      auto requester = std::thread([&]() { register_remote_memory(); });
      requester.detach();
      // 启动后台线程
      start_thread();
    }
    // 申请kEralyRegisterNumber个kAllocSize大小的远端内存
    void LocalEngineEntity::register_remote_memory() {
      std::queue<remote_info_t> list;
      remote_info_t remote_info;
      remote_info.size = kAllocSize;
      for (uint32_t i = 0; i < kEralyRegisterNumber; i++) {
        m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
        list.emplace(remote_info);
      }
      m_alloc_memory_ += kAllocSize * kEralyRegisterNumber;
    
      m_mutex_.lock();
      while (!m_addr_list_.empty()) {
        list.emplace(m_addr_list_.front());
        m_addr_list_.pop();
      }
      m_addr_list_ = std::move(list);
      m_mutex_.unlock();
    }
    
    remote_info_t LocalEngineEntity::request_signle_info() {
      remote_info_t remote_info;
      // 先从后备地址列表选择远端地址
      if (!m_addr_list_.empty()) {
        remote_info = m_addr_list_.front();
        m_addr_list_.pop();
        return remote_info;
      }
      // 申请远端内存
      remote_info.size = kAllocSize;
      m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
      m_alloc_memory_ += remote_info.size;
      return remote_info;
    }
    
    void LocalEngineEntity::start_thread() {
      auto backup_func = [&]() {
        uint64_t rebuild_threshold = kRebuildThreshold;
        std::unique_lock<std::mutex> useless_lock(m_useless_mutex_);  // 持有无用锁
        while (m_alive_) {
          while (m_cache_size_ < kPageThreshold && !(m_rebuild_allow_ && m_delete_envent_ && m_rw_envent_after_delete_)) {  // 若当前大小小于阈值并且不需要重构则休眠
            m_cv_.wait(useless_lock);
            if (!m_alive_) {  // 进程退出,该后台线程也应该退出
              return;
            }
          }
          if (m_rebuild_allow_ && m_delete_envent_ && m_rw_envent_after_delete_) {
            rebuild_index();
            m_rebuild_allow_ = false;
          }
          if (m_cache_size_ > kPageThreshold) {
            for (uint32_t i = 0; i < kEvictNumber; i++) {
              m_mutex_.lock();
              page_id_t victim_id = m_lru_list_.evict();
              std::shared_ptr<Page> victim_page = m_data_map_[victim_id];
              remote_info_t info = request_signle_info();
    
              // 设置淘汰页信息
              m_data_map_[victim_id] = nullptr;
              m_cache_size_--;
              m_addr_map_[victim_id] = info;
              m_max_index_[victim_id] = victim_page->record_number();
              m_mutex_.unlock();
    
              std::string &&vicitm_page_data = victim_page->to_string();  // 记录淘汰页数据,开始写入远端内存
              // 将本地数据写入远端内存
              m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), kAllocSize, info.remote_addr, info.rkey);
            }
          }
        }
      };
      m_backup_thread_ = new std::thread(backup_func);
    }
    LocalEngineEntity::~LocalEngineEntity() {
      m_alive_ = false;
      m_cv_.notify_one();
      m_backup_thread_->join();
      delete m_backup_thread_;
      delete m_rdma_mem_pool_;
    }
    
    bool LocalEngineEntity::write(const std::string &key, const std::string &value, bool use_aes) {
      if (m_rebuild_allow_ && m_delete_envent_) {
        m_rw_envent_after_delete_ = true;
        m_cv_.notify_one();
      }
    
      // 区分加密与非加密
      std::string encrypt_value;
      if (use_aes) {
        m_engine_->encrypted(value, encrypt_value);
      }
      int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
    
      m_mutex_.lock();
      auto slot = m_page_map_.find(key, hash_index);
    
      // 第一次写入或该页正在写入远端或该页正在远端
      if (slot == nullptr || m_data_map_[slot->info.page_id] == nullptr) {
        if (m_cur_page_->is_full()) {          // 该页已满不可用
          m_lru_list_.insert(m_cur_page_id_);  // 直到页满才插入LRU列表
          m_cur_page_id_++;
          m_data_map_[m_cur_page_id_] = std::make_shared<Page>();
          m_cur_page_ = m_data_map_[m_cur_page_id_];
          m_cache_size_++;
          // 尝试唤醒后台进程
          if (m_cache_size_ > kPageThreshold) {
            m_cv_.notify_one();
          }
        }
    
        index_t index;
        if (use_aes) {
          index = m_cur_page_->insert(key, std::move(encrypt_value));
        } else {
          index = m_cur_page_->insert(key, value);
        }
    
        data_info_t info = {m_cur_page_id_, index};
        if (slot == nullptr) {                        // 第一次写入,插入元数据
          m_page_map_.insert(key, info, hash_index);  // 插入key与元数据映射
        } else {
          m_bitmap_[slot->info.page_id].set(slot->info.index, true);  // 将之前数据标记为删除
          slot->info = info;                                          // 更新元数据信息
        }
        m_mutex_.unlock();
      } else {  // 该页在本地且未被淘汰,更新页数据
        data_info_t info = slot->info;
        if (use_aes) {
          m_data_map_[info.page_id]->update(info.index, std::move(encrypt_value));
        } else {
          m_data_map_[info.page_id]->update(info.index, value);
        }
        m_mutex_.unlock();
        // 更新LRU列表
        if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_) {
          m_lru_list_.update(info.page_id);
          m_last_update_id_ = info.page_id;
        }
      }
      return true;
    }
    bool LocalEngineEntity::read(const std::string &key, std::string &value) {
      if (m_rebuild_allow_ && m_delete_envent_) {
        m_rw_envent_after_delete_ = true;
        m_cv_.notify_one();
      }
      int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
      bool need_update_lru = true;
      m_mutex_.lock();
    
      auto slot = m_page_map_.find(key, hash_index);
      if (slot == nullptr) {  // 元数据不存在
        m_mutex_.unlock();
        return false;
      }
      data_info_t info = slot->info;
      if (info.page_id == m_vicitm_id_) {
        value = m_vicitm_page_->read_value(info.index);
        m_mutex_.unlock();
        return true;
      }
      if (m_data_map_[info.page_id] == nullptr) {  // 数据在远端内存
        m_mutex_.unlock();
        m_same_reader_mutex_[info.page_id].lock();   // 读者互斥
        if (m_data_map_[info.page_id] == nullptr) {  // 双重检查
          auto remote_info = m_addr_map_[info.page_id];
          std::string &&page_data = std::string(remote_info.size, '0');
          m_rdma_conn_->remote_read((void *)page_data.c_str(), remote_info.size, remote_info.remote_addr, remote_info.rkey);
          auto new_page = std::make_shared<Page>(page_data);  // 构建缓存页
          m_mutex_.lock();
          m_data_map_[info.page_id] = new_page;
          m_cache_size_++;
          m_lru_list_.insert(info.page_id);
          need_update_lru = false;
          m_addr_map_.erase(info.page_id);    // 在远端地址映射中删除该项
          m_addr_list_.emplace(remote_info);  // 将该页加入地址列表
          m_mutex_.unlock();
          // 尝试唤醒后台进程
          if (m_cache_size_ > kPageThreshold) {
            m_cv_.notify_one();
          }
        }
        m_same_reader_mutex_[info.page_id].unlock();
        m_mutex_.lock();
      }
      value = m_data_map_[info.page_id]->read_value(info.index);
      m_mutex_.unlock();
      if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
        m_lru_list_.update(info.page_id);
        m_last_update_id_ = info.page_id;
      }
      return true;
    }
    
    bool LocalEngineEntity::deleteK(const std::string &key) {
      m_delete_envent_ = true;
      int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
      m_mutex_.lock();
      data_info_t info = m_page_map_.remove(key, hash_index);  // 删除对应key的元数据
      m_mutex_.unlock();
      m_bitmap_[info.page_id].set(info.index, true);  // 将对应记录标记为删除
      return true;
    }
    
    void LocalEngineEntity::rebuild_index() {
      std::lock_guard<std::mutex> lk(m_mutex_);
      std::string key;
      std::string value;
      data_info_t data_info;
      std::shared_ptr<Page> page;
      std::unordered_map<page_id_t, remote_info_t> tmp_addr_map;  // 暂时存储页号与远端地址映射
      uint32_t new_cache_size = 1;
      std::vector<page_id_t> local_id = m_lru_list_.clear();
      auto new_page = m_cur_page_;
      page_id_t new_page_id = m_cur_page_id_;
      // 处理本地缓存页
      for (auto &page_id : local_id) {
        auto &bitmap = m_bitmap_[page_id];
        if (bitmap.none()) {  // 不存在删除的记录,不进行操作
          m_lru_list_.insert(page_id);
          new_cache_size++;
        } else if (!bitmap.all()) {  //存在有效记录
          auto page = m_data_map_[page_id];
          int record_num = page->record_number();
          for (int i = 0; i < record_num; i++) {
            if (!bitmap.test(i)) {        // 该记录未被删除
              if (new_page->is_full()) {  // 页满,写入本地
                m_data_map_[new_page_id] = new_page;
                new_cache_size++;
                m_lru_list_.insert(new_page_id);
                new_page = std::make_shared<Page>();
                new_page_id++;
              }
              // 读出数据插入新页并更新元数据映射
              key = page->read_key(i);
              data_info.index = new_page->insert(key, page->read_value(i));
              data_info.page_id = new_page_id;
              m_page_map_.update(key, data_info);
            }
          }
          m_data_map_[page_id] = nullptr;  // 删除原先页
        } else {
          m_data_map_[page_id] = nullptr;  // 删除原先页
        }
      }
      page_id_t page_id;
      remote_info_t info;
      char head_data[kMaxIndex * 10];
      char kv_data[2 * kMaxValueSize];
      uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
      uint16_t head_length;
      uint16_t offset, length;
    
      // 处理远端内存页
      for (auto &kv : m_addr_map_) {
        page_id = kv.first;
        info = kv.second;
        auto &bitmap = m_bitmap_[page_id];
        if (bitmap.none()) {  // 不存在删除的记录,不进行操作
          tmp_addr_map.insert({page_id, info});
        } else if (bitmap.all()) {  // 不存在有效记录,将后端地址加入地址列表
          m_addr_list_.emplace(info);
        } else {
          bool avai_info = true;  // 是否将该远端地址加入地址列表
          head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
          // 读取页头部数据
          m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
          int record_num = m_max_index_[page_id];
          for (int i = 0; i < record_num; i++) {
            if (!bitmap.test(i)) {
              if (new_page->is_full()) {
                if (new_cache_size > kPageThreshold) {  // 本地页满,写入远程
                  std::string &&page_data = new_page->to_string();
                  uint32_t len = page_data.length();
                  m_rdma_conn_->remote_write((void *)page_data.c_str(), len, info.remote_addr, info.rkey);
                  tmp_addr_map.insert({new_page_id, info});  // 暂时记录页号与远程地址映射
                  avai_info = false;
                } else {  // 写入本地缓存
                  m_data_map_[new_page_id] = new_page;
                  new_cache_size++;
                  m_lru_list_.insert(new_page_id);
                }
                new_page = std::make_shared<Page>();
                new_page_id++;
              }
              // 读出数据插入新页并更新元数据映射
              offset = u16_pointer[i + 2];
              length = u16_pointer[i + 3] - u16_pointer[i + 2];
              m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
              key = std::string(kv_data, kv_data + 16);
              value = std::string(kv_data + 16, kv_data + length);
              data_info.index = new_page->insert(key, std::move(value));
              data_info.page_id = new_page_id;
              m_page_map_.update(key, data_info);
            }
          }
          if (avai_info) {
            m_addr_list_.emplace(info);
          }
        }
      }
      m_addr_map_ = std::move(tmp_addr_map);
      m_cur_page_id_ = new_page_id;
      m_cur_page_ = new_page;
      m_vicitm_id_ = kNullPage;
      m_vicitm_page_ = nullptr;
      m_cache_size_ = new_cache_size;
      m_last_update_id_ = kNullPage;
      m_data_map_[new_page_id] = new_page;
    }
    
    std::vector<uint64_t> LocalEngineEntity::print_memory() {
      // 输出一系列统计数据
      uint64_t key_metadata = kSlotSize * 32 + kBucketNum * 8;
      uint64_t page_id_size = m_cur_page_id_;
      uint64_t page_metadata = page_id_size * 48 + kBitmapSize * 1.2 * kMaxIndex;
      uint64_t page_size = m_cache_size_ * (kPageSize);
      return {key_metadata, page_id_size, page_metadata, page_size, m_alloc_memory_};
    }
    }  // namespace kv
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379
    • 380
    • 381
    • 382
    • 383
    • 384
    • 385
    • 386
    • 387
    • 388
    • 389
    • 390
    • 391
    • 392
    • 393
    • 394
    • 395
    • 396
    • 397
    • 398
    • 399
    • 400
    • 401
    • 402
    • 403
    • 404
    • 405
    • 406
    • 407
    • 408
    • 409
    • 410
    • 411
    • 412
    • 413
    • 414
    • 415
    • 416
  • 相关阅读:
    第3章 docker容器管理
    CleanMyPC中文版切换教程(专注于电脑缓存文件清理的工具)
    虹科分享 | 近距离接触最新的3个勒索软件
    selenium 自动化测试——环境搭建
    执行shell脚本插入oracle数据库中文数据乱码
    Review of AI (HITSZ)
    实例分割Yolact边缘端部署 (二) 训练自己的模型-> onnx
    【Linux】zabbix告警执行远程脚本报错Unsupported item key.问题汇总及解决方式
    SpringBoot实现动态数据源配置
    安装Canal1.1.5 并监控mysql8的binlog
  • 原文地址:https://blog.csdn.net/freedom1523646952/article/details/127762537