• folly::ConcurrentSkipList 详解


    SkipList 原理及构造过程

    SkipList 是受多层链表的启发而设计出来的。实际上,最理想的情况是上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(log n)。但是,这种方法在插入数据的时候有很大的问题。新插入一个节点之后,就会打乱上下相邻两层链表上节点个数严格的 2:1 的对应关系。如果要维持这种对应关系,就必须把新插入的节点后面的所有节点(也包括新插入的节点)重新进行调整,这会让时间复杂度重新退化成 O(n)

    SkipList 为了避免这一问题,它不要求上下相邻两层链表之间的节点个数有严格的对应关系,而是为每个节点随机出一个层数。比如,一个节点随机出的层数是 3,那么就把它链入到第 1 层到第 3 层这三层链表中。为了表达清楚,下图展示了如何通过一步步的插入操作从而形成一个 SkipList 的过程(图片源自网络):

    从上面 SkipList 的创建和插入过程可以看出,每一个节点的层数是随机出来的,而且新插入一个节点不会影响其它节点的层数。因此,插入操作只需要修改插入节点前的指针,而不需要对很多节点都进行调整。这就降低了插入操作的复杂度。

    SkipList 优缺点

    缺点:从上述 SkipList 构造的过程中可以发现,它并不处于一种严格的平衡状态,相比平衡二叉树而言,虽然两者的查询时间复杂度理论上都是 O(log n),但是平衡二叉树的查询效率应该更好。

    优点:平衡树的插入和删除操作可能引发子树的调整,逻辑复杂,而 SkipList 的插入和删除只需要修改相邻节点的指针,操作简单又快速。如果插入一个节点引起了树的不平衡,AVL 都是最多只需要 2 次旋转操作,即时间复杂度为 O(1);但是在删除节点引起树的不平衡时,最坏情况下,AVL 需要维护从被删节点到 root 这条路径上所有节点的平衡性,因此需要旋转的量级 O(logn)。

    SkipList 在做范围查询时比平衡二叉树更有优势,因为二叉树需要通过中序遍历获得结果,而 SkipList 最底层是一个有序链表。

    folly::ConcurrentSkipList 源码详解

    多线程环境下的 SkipList,读操作(count, find, skipper)都是 lock free 的,写操作(remove, add, insert)也只是小范围的加了锁。

    Accessor 的读操作接口:

    1. iterator find(const key_type& value) {
    2. return iterator(sl_->find(value));
    3. }
    4. const_iterator find(const key_type& value) const {
    5. return iterator(sl_->find(value));
    6. }
    7. size_type count(const key_type& data) const { return contains(data); }
    8. bool contains(const key_type& data) const { return sl_->find(data); }

    Accessor 的写操作接口:

    1. size_t erase(const key_type& data) { return remove(data); }
    2. std::pairbool> addOrGetData(const key_type& data) {
    3. auto ret = sl_->addOrGetData(data);
    4. return std::make_pair(&ret.first->data(), ret.second);
    5. }
    6. bool add(const key_type& data) { return sl_->addOrGetData(data).second; }
    7. bool remove(const key_type& data) { return sl_->remove(data); }

    如果存的数据不是基础类型,比如是一个 std::pair,则需要实现一个比较函数。如下所示:

    1. struct Less {
    2. const bool operator()(const T& lhs, const T& rhs) {
    3. return lhs.first < rhs.first;
    4. }
    5. };
    6. using List = folly::ConcurrentSkipList;
    7. List list_;

    简单的使用方法:

    1. constexpr int init_height = 5;
    2. typedef ConcurrentSkipList<int> SkipList;
    3. shared_ptr skiplist(SkipList::createInstance(init_height));
    4. {
    5. // Accessor 提供了访问 skip list 的接口,我们不能直接使用 skip list 对象来访问数据
    6. SkipList::Accessor accessor(skiplist);
    7. accessor.insert(23); // 增加节点
    8. accessor.erase(2); // 删除节点
    9. for (auto &elem : accessor) {
    10. // use elem to access data
    11. }
    12. ... ...
    13. }
    14. 还有一种访问方式是 skipper,主要是用来跳过一部分数据,例如
    15. {
    16. SkipList::Accessor accessor(sl);
    17. SkipList::Skipper skipper(accessor);
    18. skipper.to(30); // 跳到比30大的第一个节点
    19. if (skipper) {
    20. CHECK_LE(30, *skipper);
    21. }
    22. ... ...
    23. // GC may happen when the accessor get sdestructed.
    24. }

    SkipList 查找

    1. typedef detail::SkipListNode NodeType;
    2. typedef detail::csl_iterator iterator;
    3. // 利用 boostiterator_facade 生成的iterator
    4. Accessor 提供的访问接口
    5. iterator find(const key_type& value) { return iterator(sl_->find(value)); }
    6. // Returns the node if found, nullptr otherwise.
    7. NodeType* find(const value_type& data) {
    8. auto ret = findNode(data);
    9. if (ret.second && !ret.first->markedForRemoval()) {
    10. return ret.first;
    11. }
    12. return nullptr;
    13. }
    14. // Find node for access. Returns a paired values:
    15. // pair.first = the first node that no-less than data value
    16. // pair.second = 1 when the data value is founded, or 0 otherwise.
    17. // This is like lower_bound, but not exact: we could have the node marked for
    18. // removal so still need to check that.
    19. std::pairint> findNode(const value_type& data) const {
    20. return findNodeDownRight(data);
    21. }

    调用 SkipList 的 find 方法,会调用 findNode 方法,如果找到节点并且该节点没有被标记删除的话就返回,否则返回 nullptr。 调用 findNodeDownRight(查找时先向下遍历,然后再向右遍历)方法。其实这里还实现了一个 findNodeRightDown(查找时先向右遍历,然后再向下遍历)方法。来看下 findNodeDownRight 方法是怎么实现的:

    1. static bool greater(const value_type& data, const NodeType* node) {
    2. return node && Comp()(node->data(), data);
    3. }
    4. static bool less(const value_type& data, const NodeType* node) {
    5. return (node == nullptr) || Comp()(data, node->data());
    6. }
    7. // Find node by first stepping down then stepping right. Based on benchmark
    8. // results, this is slightly faster than findNodeRightDown for better
    9. // localality on the skipping pointers.
    10. std::pairint> findNodeDownRight(const value_type& data) const {
    11. NodeType* pred = head_.load(std::memory_order_consume);
    12. int ht = pred->height();
    13. NodeType* node = nullptr;
    14. bool found = false;
    15. while (!found) {
    16. // stepping down,直到找到一个节点 pred 的数据比 data 大
    17. for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {
    18. }
    19. if (ht == 0) {
    20. return std::make_pair(node, 0); // not found
    21. }
    22. // node <= data now, but we need to fix up ht
    23. --ht;
    24. // stepping right,继续接近 data
    25. while (greater(data, node)) {
    26. pred = node;
    27. node = node->skip(ht);
    28. }
    29. found = !less(data, node);
    30. }
    31. return std::make_pair(node, found);
    32. }

    SkipList 增加

    1. Accessor 接口
    2. template <
    3. typename U,
    4. typename =
    5. typename std::enable_if::value>::type>
    6. std::pairbool> insert(U&& data) {
    7. auto ret = sl_->addOrGetData(std::forward(data));
    8. return std::make_pair(iterator(ret.first), ret.second);
    9. }
    10. std::pairbool> addOrGetData(const key_type& data) {
    11. auto ret = sl_->addOrGetData(data);
    12. return std::make_pair(&ret.first->data(), ret.second);
    13. }

    上面调用 SkipList 的 addOrGetData 方法如下所示:

    1. // Returns a paired value:
    2. // pair.first always stores the pointer to the node with the same input key.
    3. // It could be either the newly added data, or the existed data in the
    4. // list with the same key.
    5. // pair.second stores whether the data is added successfully:
    6. // 0 means not added, otherwise reutrns the new size.
    7. template <typename U>
    8. std::pairsize_t> addOrGetData(U&& data) {
    9. NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
    10. NodeType* newNode;
    11. size_t newSize;
    12. while (true) {
    13. int max_layer = 0;
    14. // 找到 data 对应的节点,以及它的前继和后继,max_layer 返回当前 skip list 的最大层级
    15. // 返回值 layer 是 data 对应的节点备找到时的 layer
    16. int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
    17. if (layer >= 0) {// 如果找到
    18. NodeType* nodeFound = succs[layer];
    19. DCHECK(nodeFound != nullptr);
    20. if (nodeFound->markedForRemoval()) {
    21. continue; // if it's getting deleted retry finding node.
    22. }
    23. // wait until fully linked. 可能节点被其他线程加入了,暂时还没有 fully linked
    24. // 等待完成后再返回给用户完整的节点
    25. while (UNLIKELY(!nodeFound->fullyLinked())) {
    26. }
    27. return std::make_pair(nodeFound, 0);
    28. }
    29. // need to capped at the original height -- the real height may have grown
    30. // 按概率生成新的节点高度,新节点的高度上限设为 max_layer+1
    31. // 值得注意的是选取概率是 1/e
    32. int nodeHeight =
    33. detail::SkipListRandomHeight::instance()->getHeight(max_layer + 1);
    34. ScopedLocker guards[MAX_HEIGHT];
    35. // 把前继全部加上锁
    36. if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
    37. continue; // give up the locks and retry until all valid
    38. }
    39. // locks acquired and all valid, need to modify the links under the locks.
    40. // 按照生成的高度建立新的节点
    41. newNode = NodeType::create(
    42. recycler_.alloc(), nodeHeight, std::forward(data));
    43. // 把新的节点联入 skip list 中
    44. for (int k = 0; k < nodeHeight; ++k) {
    45. newNode->setSkip(k, succs[k]);
    46. preds[k]->setSkip(k, newNode);
    47. }
    48. // 标记 fully linked
    49. newNode->setFullyLinked();
    50. newSize = incrementSize(1);
    51. break;
    52. }
    53. int hgt = height();
    54. size_t sizeLimit =
    55. detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
    56. // 检查是否需要增加 skip list 节点的高度
    57. if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
    58. growHeight(hgt + 1);
    59. }
    60. CHECK_GT(newSize, 0);
    61. return std::make_pair(newNode, newSize);
    62. }

    这个函数中调用的几个方法:

    findInsertionPointGetMaxLayer:首先它返回 skiplist 的高度,然后按照 right down 的方式查找节点,不同的是在查找过程中会保留前继指针 preds[] 和后继指针 succs[]。

    1. // find node for insertion/deleting
    2. int findInsertionPointGetMaxLayer(
    3. const value_type& data,
    4. NodeType* preds[],
    5. NodeType* succs[],
    6. int* max_layer) const {
    7. *max_layer = maxLayer();
    8. return findInsertionPoint(
    9. head_.load(std::memory_order_consume), *max_layer, data, preds, succs);
    10. }
    11. static int findInsertionPoint(
    12. NodeType* cur,
    13. int cur_layer,
    14. const value_type& data,
    15. NodeType* preds[],
    16. NodeType* succs[]) {
    17. int foundLayer = -1;
    18. NodeType* pred = cur;
    19. NodeType* foundNode = nullptr;
    20. for (int layer = cur_layer; layer >= 0; --layer) {
    21. NodeType* node = pred->skip(layer);
    22. while (greater(data, node)) {
    23. pred = node;
    24. node = node->skip(layer);
    25. }
    26. if (foundLayer == -1 && !less(data, node)) { // the two keys equal
    27. foundLayer = layer;
    28. foundNode = node;
    29. }
    30. preds[layer] = pred;
    31. // if found, succs[0..foundLayer] need to point to the cached foundNode,
    32. // as foundNode might be deleted at the same time thus pred->skip() can
    33. // return nullptr or another node.
    34. succs[layer] = foundNode ? foundNode : node;
    35. }
    36. return foundLayer;
    37. }

    SkipListRandomHeight::getHeight 和 SkipListRandomHeight::getSizeLimit

    在 SkipListRandomHeight 构造的时候会初始化两张表:lookupTable_ 是高度的概率表,sizeLimitTable_ 是 SkipList 的高度对应的最大的 list size。

    getHeight 方法用随机函数生成一个 0~1 之间的 double 值 p,然后在 lookupTable 中找比 p 大的值对应的表索引 i,找到后获得的高度就是 i+1。getSizeLimit 方法也类似,以参数 height 为 sizeLimitTable 的索引,返回对应高度的 sizeLimit。

    lockNodesForChange 的实现:

    1. // lock all the necessary nodes for changing (adding or removing) the list.
    2. // returns true if all the lock acquried successfully and the related nodes
    3. // are all validate (not in certain pending states), false otherwise.
    4. bool lockNodesForChange(
    5. int nodeHeight,
    6. ScopedLocker guards[MAX_HEIGHT],
    7. NodeType* preds[MAX_HEIGHT], // 插入或删除节点的前继
    8. NodeType* succs[MAX_HEIGHT], // 插入或删除节点的后继
    9. bool adding = true) {
    10. // adding 为 true 表明该函数是在 add 里被调用,否则是在 remove 里被调用
    11. NodeType *pred, *succ, *prevPred = nullptr;
    12. bool valid = true;
    13. for (int layer = 0; valid && layer < nodeHeight; ++layer) {
    14. pred = preds[layer];
    15. DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
    16. << " nodeheight=" << nodeHeight;
    17. succ = succs[layer];
    18. if (pred != prevPred) {
    19. // 可能连续多层的前继指针都是一个节点,这里可以避免多次上锁
    20. guards[layer] = pred->acquireGuard();
    21. prevPred = pred;
    22. }
    23. // 对于 remove 来说只要判断前继没有被删除并且前继的后继是后继节点即可
    24. valid = !pred->markedForRemoval() &&
    25. pred->skip(layer) == succ; // check again after locking
    26. // 对于 adding 来说还需要判断后继节点没有被删除
    27. if (adding) { // when adding a node, the succ shouldn't be going away
    28. valid = valid && (succ == nullptr || !succ->markedForRemoval());
    29. }
    30. }
    31. return valid;
    32. }

    growHeight 的实现:

    1. void growHeight(int height) {
    2. NodeType* oldHead = head_.load(std::memory_order_consume);
    3. if (oldHead->height() >= height) { // someone else already did this
    4. return;
    5. }
    6. // 生成新的 head 节点,height 参数就是在原来的 heigth 基础上加1
    7. NodeType* newHead =
    8. NodeType::create(recycler_.alloc(), height, value_type(), true);
    9. { // need to guard the head node in case others are adding/removing
    10. // nodes linked to the head.
    11. ScopedLocker g = oldHead->acquireGuard();
    12. // 从 oldHead 中把数据拷贝过来,类似拷贝构造函数
    13. newHead->copyHead(oldHead);
    14. NodeType* expected = oldHead;
    15. // 原子替换 head_ 指针指向 newHead
    16. if (!head_.compare_exchange_strong(
    17. expected, newHead, std::memory_order_release)) {
    18. // if someone has already done the swap, just return.
    19. NodeType::destroy(recycler_.alloc(), newHead);
    20. return;
    21. }
    22. oldHead->setMarkedForRemoval();
    23. }
    24. recycle(oldHead);
    25. }

    SkipList 删除

    1. Accessor 的接口
    2. bool remove(const key_type& data) { return sl_->remove(data); }
    3. size_t erase(const key_type& data) { return remove(data); }
    4. 具体调用
    5. bool remove(const value_type& data) {
    6. NodeType* nodeToDelete = nullptr;
    7. ScopedLocker nodeGuard;
    8. bool isMarked = false;
    9. int nodeHeight = 0;
    10. NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
    11. while (true) {
    12. int max_layer = 0;
    13. int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
    14. if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
    15. return false;
    16. }
    17. if (!isMarked) {
    18. nodeToDelete = succs[layer];
    19. nodeHeight = nodeToDelete->height();
    20. nodeGuard = nodeToDelete->acquireGuard();
    21. if (nodeToDelete->markedForRemoval()) {
    22. return false;
    23. }
    24. nodeToDelete->setMarkedForRemoval();
    25. isMarked = true;
    26. }
    27. // acquire pred locks from bottom layer up
    28. ScopedLocker guards[MAX_HEIGHT];
    29. if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
    30. continue; // this will unlock all the locks
    31. }
    32. for (int k = nodeHeight - 1; k >= 0; --k) {
    33. preds[k]->setSkip(k, nodeToDelete->skip(k));
    34. }
    35. incrementSize(-1);
    36. break;
    37. }
    38. recycle(nodeToDelete);
    39. return true;
    40. }

    注意:recycler 负责回收被删除的节点,但其实它只是把节点加入一个 vector 中,然后在 recycler 对象析构或者显示调用 release 方法时才会去释放这些节点。

    1. void recycle(NodeType* node) { recycler_.add(node); }
    2. detail::NodeRecycler recycler_;
    3. template <typename NodeType, typename NodeAlloc>
    4. class NodeRecycler<
    5. NodeType,
    6. NodeAlloc,
    7. typename std::enable_if<
    8. !NodeType::template DestroyIsNoOp::value>::type> {
    9. public:
    10. explicit NodeRecycler(const NodeAlloc& alloc)
    11. : refs_(0), dirty_(false), alloc_(alloc) {
    12. lock_.init();
    13. }
    14. explicit NodeRecycler() : refs_(0), dirty_(false) { lock_.init(); }
    15. ~NodeRecycler() {
    16. CHECK_EQ(refs(), 0);
    17. if (nodes_) {
    18. for (auto& node : *nodes_) {
    19. NodeType::destroy(alloc_, node);
    20. }
    21. }
    22. }
    23. void add(NodeType* node) {
    24. std::lock_guard g(lock_);
    25. if (nodes_.get() == nullptr) {
    26. nodes_ = std::make_unique>(1, node);
    27. } else {
    28. nodes_->push_back(node);
    29. }
    30. DCHECK_GT(refs(), 0);
    31. dirty_.store(true, std::memory_order_relaxed);
    32. }
    33. int addRef() { return refs_.fetch_add(1, std::memory_order_relaxed); }
    34. int releaseRef() {
    35. // We don't expect to clean the recycler immediately everytime it is OK
    36. // to do so. Here, it is possible that multiple accessors all release at
    37. // the same time but nobody would clean the recycler here. If this
    38. // happens, the recycler will usually still get cleaned when
    39. // such a race doesn't happen. The worst case is the recycler will
    40. // eventually get deleted along with the skiplist.
    41. if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
    42. return refs_.fetch_add(-1, std::memory_order_relaxed);
    43. }
    44. std::unique_ptr> newNodes;
    45. {
    46. std::lock_guard g(lock_);
    47. if (nodes_.get() == nullptr || refs() > 1) {
    48. return refs_.fetch_add(-1, std::memory_order_relaxed);
    49. }
    50. // once refs_ reaches 1 and there is no other accessor, it is safe to
    51. // remove all the current nodes in the recycler, as we already acquired
    52. // the lock here so no more new nodes can be added, even though new
    53. // accessors may be added after that.
    54. newNodes.swap(nodes_);
    55. dirty_.store(false, std::memory_order_relaxed);
    56. }
    57. // TODO(xliu) should we spawn a thread to do this when there are large
    58. // number of nodes in the recycler?
    59. for (auto& node : *newNodes) {
    60. NodeType::destroy(alloc_, node);
    61. }
    62. // decrease the ref count at the very end, to minimize the
    63. // chance of other threads acquiring lock_ to clear the deleted
    64. // nodes again.
    65. return refs_.fetch_add(-1, std::memory_order_relaxed);
    66. }
    67. NodeAlloc& alloc() { return alloc_; }
    68. private:
    69. int refs() const { return refs_.load(std::memory_order_relaxed); }
    70. std::unique_ptr> nodes_;
    71. std::atomic<int32_t> refs_; // current number of visitors to the list
    72. std::atomic<bool> dirty_; // whether *nodes_ is non-empty
    73. MicroSpinLock lock_; // protects access to *nodes_
    74. NodeAlloc alloc_;
    75. };

  • 相关阅读:
    css 块级元素与内联元素
    postgreSql 和mysql的一些区别
    [羊城杯2020]easyphp .htaccess的利用
    DIV布局个人介绍网页模板代码 家乡海阳个人简介网页制作 简单个人静态HTML网页设计作品 DW个人网站制作成品 web网页制作与实现...
    手动部署 OceanBase 集群
    更健康舒适更科技的照明体验!SUKER书客护眼台灯 L1上手体验
    pip快速安装torch、opencv、scipy库
    OpsWorks
    卡塔尔世界杯倒计时!世界杯直播在哪里观看?美家市场汇总来了!
    RocketMQ的主要组件及其功能
  • 原文地址:https://blog.csdn.net/qq_38289815/article/details/126883968