• C++ 多线程 线程安全队列设计


    这是看《C++并发编程实战》这本书学的,这里我要为这本书辟谣一下,虽然是这本书前面翻译得很烂,但是从第6章开始,应该是换了个人翻译,虽然还是能难懂,但是难懂的是代码逻辑,而不是语言逻辑;

    实现,我先说明一下我自己的一个感悟,即对大多数线程错误的感悟:

    1:我设定一个“信息有效期”这个概念,这个概念是值我从一个信息源获取一个信息,这个信息相对这个信息源是正确的时间段,比如一个线程获取一个队列是否为空,如果得到的结果是true,那么从获取这个信息到队列中添加一个元素这段时间就是获取到的信息的有效期;

    2:大多数线程错误都是来源于信息有效期已经过去,我们知道,算法他的每一个步骤都是依赖之前的步骤的,如果前一个步骤的结果到实现依赖这个步骤的步骤时是错误的,那么不能说绝对,只能说是很可能这个步骤执行完后的结果也是错误的,那么这个“错误链”就会传递下去,如果没有正确的异常处理,那么这个程序很可能就会崩溃;

    3:怎么避免信息有效期过去?

            -:获得的信息有效期够长

            -:利用锁来将信息有效期延长;

            -:获得的信息正确与否不重要,我们只关系信息本身【这个也可能是无锁并发的实现思想之一】

    4:细粒度,如果我们要一个多线程安全的数据结构效率高的话,我们一般是需要控制好锁保护的数据范围,一般来说,一个锁保护的数据越多,那么这个数据结构并发访问的效率就越低,相反则反,另外一个方面,当一个操作需要的锁越多,那么并发访问的效率就越低,相反则反;

    粗粒度队列的设计

    一般我们是怎么设计一个队列的呢,利用锁将单线程安全队列变为多线程安全队列?

    直接用一个单链表就行了;

     我们只需要用两个互斥量来保护head和tail的是串行的即可;

    1. template<typename T>
    2. class queue
    3. {
    4. private:
    5. struct node
    6. {
    7. T data;
    8. std::unique_ptr next;
    9. node(T data_) :
    10. data(std::move(data_))
    11. {}
    12. };
    13. std::unique_ptr head; // 1
    14. node* tail; // 2
    15. mutex head_mutex, tail_mutex;
    16. public:
    17. queue()
    18. {}
    19. queue(const queue& other) = delete;
    20. queue& operator=(const queue& other) = delete;
    21. std::shared_ptr try_pop()
    22. {
    23. lock_guard lock_g(head_mutex);
    24. if (!head){
    25. return std::shared_ptr();
    26. }
    27. std::shared_ptr const res(
    28. std::make_shared(std::move(head->data)));
    29. std::unique_ptr const old_head = std::move(head);
    30. head = std::move(old_head->next); // 3
    31. return res;
    32. }
    33. void push(T new_value)
    34. {
    35. lock_guard lock_g(tail_mutex);
    36. lock_guard lock_g(head_mutex);
    37. std::unique_ptr p(new node(std::move(new_value)));
    38. node* const new_tail = p.get();
    39. if (tail){
    40. tail->next = std::move(p); // 4
    41. }
    42. else{
    43. head = std::move(p); // 5
    44. }
    45. tail = new_tail; // 6
    46. }
    47. };

    这里我们可以看到,每一个锁在保护数据的数量上,已经是非常少了【每一个锁只保护了一个节点,这已经是非常理想的状态了】,那么我们可以看到在try_pop操作当中,我们只依赖与tail节点,而在push操作当中,我们依赖head和tail两个节点,那么我们能不能进行修改,让push只依赖一个节点呢?答案是:能;

    细粒度队列的设计

    我们只需要一个虚拟节点即可,这个节点我们保证一直在队列的最后,且队列稳定的时候,tail恒定指向这个虚拟节点,这样的话,当我们push的时候,我们就可以不用依赖与head节点了,因为无论是空队列还是非空队列,在执行push操作的前后,head的指向都不变,这意味着我们可以不访问head节点;

    我们在构造队列的时候,我们创建一个虚拟节点,然后让head和tail都指向这个节点,要稍微注意的是,判空条件不再是head==nullptr了,而是head.get()==tail;

    看具体代码:

    1. class Queue {
    2. public:
    3. class node {
    4. public:
    5. shared_ptr<int> data;
    6. unique_ptr next;
    7. };
    8. private:
    9. unique_ptr head;
    10. node* tail;
    11. mutex tail_mtx, head_mtx;
    12. unique_ptr pop_head() {
    13. std::lock_guard lock_g(head_mtx);
    14. if (head.get() == get_tail()) {
    15. return nullptr;
    16. }
    17. unique_ptr old_head = std::move(head);
    18. head = std::move(old_head->next);
    19. return old_head;
    20. }
    21. node* get_tail() {
    22. std::lock_guard lock_g(tail_mtx);
    23. return tail;
    24. }
    25. public:
    26. Queue()
    27. :head(std::make_unique()), tail(head.get()){}
    28. void push(int new_date) {
    29. shared_ptr<int> date = std::make_shared<int>(new_date);
    30. unique_ptr new_tail = std::make_unique();
    31. node* temp = new_tail.get();
    32. std::lock_guard lock_g(tail_mtx);
    33. tail->data = date;
    34. tail->next = std::move(new_tail);
    35. tail = temp;
    36. }
    37. std::shared_ptr<int> try_pop() {
    38. unique_ptr ptr = pop_head();
    39. return ptr == nullptr ? nullptr : ptr->data;
    40. }
    41. };

    那么好,这个时候这个队列的并发访问性已经是非常高了【主要是队列能够操作的自由度是真不高】

    那么好,我们现在再提出一个需求,有的时候,我们一个线程必须要从一个队列中获取一个元素,才能进行下面的操作【比如线程池】,而我们不希望用一个while循环来进行检查队列中是否有元素以供获取,这个时候我们可以通过环境变量来进行,但是我们希望这个wait的过程与实际的业务代码是低耦合的,所以我们要把,这个逻辑封装到队列中;

    这里我们可以通过两种方式来获取队列中的值,一个是用引用hook,一个是直接通过返回值来获取;

    当然,我们用引用hook时,就不免会用到客户端程序员的带代码,移动拷贝结果的时候可能会报错,这个时候需要对具体的异常进行捕获,为了避免这个异常,我们设计节点用share_ptr来存储结果,因为share_ptr内部在创建对象的时候有异常处理,但是这样做的坏处是我们分配的结果必须存储到堆中,这为push时创建结果的过程增加了不少的时间开销;

    下面是具体代码:

    1. template<typename T>
    2. class queue_plus {
    3. private:
    4. class node {
    5. public:
    6. std::shared_ptr date;
    7. std::unique_ptr next;
    8. };
    9. mutex head_mtx, tail_mtx, size_mtx;
    10. unique_ptr head;
    11. node* tail;
    12. condition_variable cond;
    13. node* get_tail() {
    14. std::lock_guard lock_g(tail_mtx);
    15. return tail;
    16. }
    17. shared_ptr pop_head() {
    18. if (head->get() == get_tail()) {
    19. return nullptr;
    20. }
    21. shared_ptr date=std::move(head->date);
    22. head = std::move(head->next);
    23. return date;
    24. }
    25. unique_lock wait_for_notify() {
    26. std::unique_lock head_lock(head_mtx);
    27. cond.wait(head_lock, [&] {head.get() != get_tail(); });
    28. return head_lock;
    29. }
    30. void wait_for_date(T& value) {
    31. std::unique_lock head_lock(wait_for_notify());
    32. shared_ptr date = pop_head();
    33. if (date != nullptr) {
    34. value = std::move(*date);
    35. }
    36. }
    37. shared_ptr wait_for_date() {
    38. std::unique_lock head_lock(wait_for_notify());
    39. return pop_head();
    40. }
    41. public:
    42. void wait_and_pop(T& vlaue) {
    43. wait_for_date(value);
    44. }
    45. shared_ptr wait_and_pop() {
    46. return wait_for_date();
    47. }
    48. };

  • 相关阅读:
    flink版本升级之 checkpoint和savepoint 代码和SQL
    IP代理|一文看懂IPv4与IPv6
    RabbitMQ快速使用代码手册
    Android 开发一个动画
    公司电脑禁用U盘的方法
    linux系统延迟任务
    第六章---C++中的函数
    Docker 入门看这一篇就够了,万字详解!
    【linux外设挂载】linux系统找到U盘解决方案
    HTML+CSS简单漫画网页设计成品 蜡笔小新3页 大学生个人HTML网页制作作品
  • 原文地址:https://blog.csdn.net/weixin_62953519/article/details/127934260