• 无锁队列Disruptor使用笔记


     一. 背景

            Disruptor是由英国外汇公司LMAX 于2010年公开的一款用于线程间数据交互的高并发组件,其原型是一个有界的环形队列,通过巧妙的设计使得该队列在多线程环境下无需加锁就能保证消息的安全可靠,为软件系统带来指数级性能提升,可以参考博文 https://blog.csdn.net/21aspnet/article/details/89709221 了解关于disruptor的更多内容。由于Disruptor原版是java开发的,而本人是从事C/C++开发工作的,期望找一个C/C++版本的实现,用到自己的工作中。为了避免令人头疼的第三方依赖问题,优先考虑使用现代C++特性,即C++ 11以上版本的实现(不想引入Boost库<_>)。Disruptor4cpp(GitHub - alexleemanfui/disruptor4cpp: C++ port of LMAX disruptor)正好满足上述需求,虽然该版本只实现到Disruptor 3.3.2版本,并且好像没有继续维护的迹象,但是3.3.2 版本的Disruptor对于大部分系统来说已经够用了,尤其是它对Disruptor一些关键特性的实现,能够很好地帮助我们理解Disruptor的工作原理。所以,最近对Disruptor4cpp展开了一些研究,下面对学习和使用过程中的一些问题做了记录。

    二.  生产与消费模式

    Disruptor不对生产者和消费者的数量做限制,可以是单生产者->单消费者,单生产者->多消费者,多生产者->单消费者,以及多生产者->多消费者(这里的每个生产者和消费者都指的是一个线程,而不是某个抽象业务的类实例)。

    实际上,Disruptor支持两种消费模式:多播模式 和 竞争模式。多播模式下,每个消费者都会处理所有的消息,即一个消息会被多个消费者重复消费;竞争模式下,多个消费者竞争同一批消息,即一个消息仅被消费一次,分别对应了java版本实现中的BatchEventProcessor和WorkProcessor。

    在Disruptor4cpp的实现中,只提供了BatchEventProcessor消费模式,给出的示例程序也是多播消费模式,如果想要使用竞争消费模式,则需要自己实现WorkProcessor。以下给出了WorkProcessor模式的实现,是对Disruptor4cpp的一个补充,包括两个文件: work_handler.h和race_work_processor.h。

    1. // file: work_handler.h
    2. /*
    3. Copyright (c) 2015, Alex Man-fui Lee
    4. All rights reserved.
    5. Redistribution and use in source and binary forms, with or without
    6. modification, are permitted provided that the following conditions are met:
    7. * Redistributions of source code must retain the above copyright notice, this
    8. list of conditions and the following disclaimer.
    9. * Redistributions in binary form must reproduce the above copyright notice,
    10. this list of conditions and the following disclaimer in the documentation
    11. and/or other materials provided with the distribution.
    12. * Neither the name of disruptor4cpp nor the names of its
    13. contributors may be used to endorse or promote products derived from
    14. this software without specific prior written permission.
    15. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    16. AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    17. IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    18. DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    19. FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    20. DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    21. SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    22. CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    23. OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    24. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    25. */
    26. #ifndef DISRUPTOR4CPP_WORK_HANDLER_H_
    27. #define DISRUPTOR4CPP_WORK_HANDLER_H_
    28. #include
    29. #include
    30. namespace disruptor4cpp
    31. {
    32. template<class TEvent>
    33. class work_handler
    34. {
    35. public:
    36. virtual ~work_handler(){};
    37. virtual void on_start() = 0;
    38. virtual void on_shutdown() = 0;
    39. virtual void on_event(TEvent& event, int64_t sequence) = 0;
    40. virtual void on_timeout(int64_t sequence) = 0;
    41. virtual void on_event_exception(const std::exception& ex, int64_t sequence, TEvent* event) = 0;
    42. virtual void on_start_exception(const std::exception& ex) = 0;
    43. virtual void on_shutdown_exception(const std::exception& ex) = 0;
    44. };
    45. }
    46. #endif
    1. //file: race_work_processor.h
    2. /*
    3. Copyright (c) 2015, Alex Man-fui Lee
    4. All rights reserved.
    5. Redistribution and use in source and binary forms, with or without
    6. modification, are permitted provided that the following conditions are met:
    7. * Redistributions of source code must retain the above copyright notice, this
    8. list of conditions and the following disclaimer.
    9. * Redistributions in binary form must reproduce the above copyright notice,
    10. this list of conditions and the following disclaimer in the documentation
    11. and/or other materials provided with the distribution.
    12. * Neither the name of disruptor4cpp nor the names of its
    13. contributors may be used to endorse or promote products derived from
    14. this software without specific prior written permission.
    15. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    16. AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    17. IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    18. DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    19. FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    20. DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    21. SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    22. CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    23. OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    24. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    25. */
    26. #ifndef DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
    27. #define DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
    28. #include
    29. #include
    30. #include
    31. #include
    32. #include "work_handler.h"
    33. #include "exceptions/alert_exception.h"
    34. #include "exceptions/timeout_exception.h"
    35. #include "sequence.h"
    36. namespace disruptor4cpp
    37. {
    38. template <typename TRingBuffer>
    39. class race_work_processor
    40. {
    41. public:
    42. race_work_processor(TRingBuffer& ring_buffer,
    43. typename TRingBuffer::sequence_barrier_type& sequence_barrier,
    44. work_handler<typename TRingBuffer::event_type>& wkr_handler,
    45. typename TRingBuffer::sequence_type& worksequence)
    46. : sequence_(),
    47. ring_buffer_(ring_buffer),
    48. sequence_barrier_(sequence_barrier),
    49. work_handler_(wkr_handler),
    50. work_sequence_(worksequence),
    51. running_(false)
    52. {
    53. }
    54. race_work_processor(TRingBuffer& ring_buffer,
    55. std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr,
    56. work_handler<typename TRingBuffer::event_type>& wkr_handler,
    57. typename TRingBuffer::sequence_type& worksequence)
    58. : sequence_(),
    59. ring_buffer_(ring_buffer),
    60. sequence_barrier_(*sequence_barrier_ptr),
    61. work_handler_(wkr_handler),
    62. sequence_barrier_ptr_(std::move(sequence_barrier_ptr)),
    63. work_sequence_(worksequence),
    64. running_(false)
    65. {
    66. }
    67. typename TRingBuffer::sequence_type& get_sequence()
    68. {
    69. return sequence_;
    70. }
    71. void halt()
    72. {
    73. running_.store(false, std::memory_order_release);
    74. sequence_barrier_.alert();
    75. }
    76. bool is_running() const
    77. {
    78. return running_.load(std::memory_order_acquire);
    79. }
    80. void run()
    81. {
    82. bool expected_running_state = false;
    83. if (!running_.compare_exchange_strong(expected_running_state, true))
    84. throw std::runtime_error("Thread is already running");
    85. sequence_barrier_.clear_alert();
    86. notify_start();
    87. typename TRingBuffer::event_type* event=nullptr;
    88. int64_t next_sequence;
    89. int64_t cached_available_sequence = LLONG_MIN;
    90. bool processedSequence = true;
    91. try
    92. {
    93. while (true)
    94. {
    95. try
    96. {
    97. if (processedSequence)
    98. {
    99. processedSequence = false;
    100. do
    101. {
    102. next_sequence = work_sequence_.get() + 1;
    103. sequence_.set(next_sequence-1);
    104. } while (!work_sequence_.compare_and_set(next_sequence - 1, next_sequence));
    105. }
    106. if (cached_available_sequence >= next_sequence)
    107. {
    108. event = &ring_buffer_[next_sequence];
    109. work_handler_.on_event(*event, next_sequence);
    110. processedSequence = true;
    111. }
    112. else
    113. {
    114. cached_available_sequence = sequence_barrier_.wait_for(next_sequence);
    115. }
    116. }
    117. catch (timeout_exception& timeout_ex)
    118. {
    119. notify_timeout(sequence_.get());
    120. }
    121. catch (alert_exception& alert_ex)
    122. {
    123. if (!running_.load(std::memory_order_acquire))
    124. break;
    125. }
    126. catch (std::exception& ex)
    127. {
    128. work_handler_.on_event_exception(ex, next_sequence, event);
    129. processedSequence = true;
    130. }
    131. }
    132. }
    133. catch (...)
    134. {
    135. notify_shutdown();
    136. running_.store(false, std::memory_order_release);
    137. throw;
    138. }
    139. notify_shutdown();
    140. running_.store(false, std::memory_order_release);
    141. }
    142. private:
    143. void notify_timeout(int64_t available_sequence)
    144. {
    145. try
    146. {
    147. work_handler_.on_timeout(available_sequence);
    148. }
    149. catch (std::exception& ex)
    150. {
    151. work_handler_.on_event_exception(ex, available_sequence, nullptr);
    152. }
    153. }
    154. void notify_start()
    155. {
    156. try
    157. {
    158. work_handler_.on_start();
    159. }
    160. catch (std::exception& ex)
    161. {
    162. work_handler_.on_start_exception(ex);
    163. }
    164. }
    165. void notify_shutdown()
    166. {
    167. try
    168. {
    169. work_handler_.on_shutdown();
    170. }
    171. catch (std::exception& ex)
    172. {
    173. work_handler_.on_shutdown_exception(ex);
    174. }
    175. }
    176. typename TRingBuffer::sequence_type sequence_;
    177. TRingBuffer& ring_buffer_;
    178. typename TRingBuffer::sequence_barrier_type& sequence_barrier_;//引用到ring_buffer的序列屏障,用于控制消费者不要消费没有写成功的数据
    179. work_handler<typename TRingBuffer::event_type>& work_handler_;
    180. typename TRingBuffer::sequence_type& work_sequence_;//引用到一个由多个消费者共同维护的消费序列,避免重复消费
    181. std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr_;
    182. std::atomic<bool> running_;
    183. };
    184. }
    185. #endif

     三. 测试

    在Discruptor提供的示例代码的基础上,增加竞争消费示例,代码如下。由于线程调度是由操作系统决定的,即便一个消费线程通过竞争拿到了消息,也可能因为线程挂起而推后本条消息的处理,所以每条消息的真正被消费的时机是不确定的,有可能先入队的消息后被消费,因此可能看到打印出来的消息消费顺序与消息入队顺序不一致的情况,这也是这种消费模式的重要特性。

    1. //main.cpp
    2. #include
    3. #include
    4. #include
    5. #include
    6. //将work_handler.h和race_work_processor.h放到disruptor4cpp.h同级目录,并在disruptor4cpp.h中添加 #include
    7. #include
    8. class int_event_handler : public disruptor4cpp::event_handler<int>
    9. {
    10. public:
    11. int_event_handler() = default;
    12. virtual ~int_event_handler() = default;
    13. virtual void on_start() { }
    14. virtual void on_shutdown() { }
    15. virtual void on_event(int& event, int64_t sequence, bool end_of_batch)
    16. {
    17. std::cout << "Received integer: " << event << ", Sequence:" << sequence << std::endl;
    18. }
    19. virtual void on_timeout(int64_t sequence) { }
    20. virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }
    21. virtual void on_start_exception(const std::exception& ex) { }
    22. virtual void on_shutdown_exception(const std::exception& ex) { }
    23. };
    24. class int_work_handler :public disruptor4cpp::work_handler<int>
    25. {
    26. public:
    27. int_work_handler() = default;
    28. virtual ~int_work_handler() = default;
    29. void SetWorkerId(int id) { worker_id = id; }
    30. virtual void on_start() { }
    31. virtual void on_shutdown() { }
    32. virtual void on_event(int& event, int64_t sequence)
    33. {
    34. std::cout << worker_id << " Received integer: " << event << ", Sequence:"<
    35. }
    36. virtual void on_timeout(int64_t sequence) { }
    37. virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }
    38. virtual void on_start_exception(const std::exception& ex) { }
    39. virtual void on_shutdown_exception(const std::exception& ex) { }
    40. private:
    41. int worker_id = -1;
    42. };
    43. void TestEventProcessor()
    44. {
    45. using namespace disruptor4cpp;
    46. // 创建一个支持多生产者写入的环形队列ring_buffer
    47. ring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;
    48. // 创建用于控制消费者消费进度的序列屏障。
    49. //如果要创建的消费者有上级依赖,则应该将上级消费者的序列作为参数传入。
    50. //此处不传参,则基于该序列屏障创建的消费者是第一级消费者
    51. auto barrier = ring_buffer.new_barrier();
    52. int_event_handler handler;
    53. //创建一个消费者(由于barrier 没有上级依赖,因此processor是一个一级消费者)
    54. batch_event_processor<decltype(ring_buffer)> processor(ring_buffer, *barrier.get(), handler);
    55. //将消费者的消费序列加入到监听列表,以便控制生产者写入时,避免覆盖未消费的数据
    56. ring_buffer.add_gating_sequences({ &processor.get_sequence(), });
    57. //启动消费者线程,开始消费
    58. std::thread processor_thread([&processor] { processor.run(); });
    59. // 循环写入1000条数据(也可以在多个线程中向ring_buffer中写数据)
    60. for (int i = 0; i < 1000; i++)
    61. {
    62. int64_t seq = ring_buffer.next();
    63. ring_buffer[seq] = i;
    64. ring_buffer.publish(seq);
    65. }
    66. // Stop the consumer.
    67. std::this_thread::sleep_for(std::chrono::seconds(1));
    68. processor.halt();
    69. processor_thread.join();
    70. return;
    71. }
    72. void TestWorkProcessor()
    73. {
    74. using namespace disruptor4cpp;
    75. // 创建一个支持多生产者写入的环形队列ring_buffer
    76. ring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;
    77. auto barrier = ring_buffer.new_barrier();
    78. sequence worksequence;
    79. int_work_handler wkr1;
    80. wkr1.SetWorkerId(1);
    81. race_work_processor<decltype(ring_buffer)> work1(ring_buffer, *barrier.get(), wkr1, worksequence);//一级消费者
    82. int_work_handler wkr2;
    83. wkr2.SetWorkerId(2);
    84. race_work_processor<decltype(ring_buffer)> work2(ring_buffer, *barrier.get(), wkr2, worksequence);//一级消费者
    85. //将消费者的消费序列加入到监听列表,以便控制生产者写入时,避免覆盖未消费的数据
    86. ring_buffer.add_gating_sequences({ &work1.get_sequence(), &work2.get_sequence(), &worksequence });
    87. std::thread wkr1_thread([&work1] { work1.run(); });
    88. std::thread wkr2_thread([&work2] { work2.run(); });
    89. // 循环写入1000条数据(也可以在多个线程中,向ring_buffer中写数据)
    90. for (int i = 0; i < 1000; i++)
    91. {
    92. int64_t seq = ring_buffer.next();
    93. ring_buffer[seq] = i;
    94. ring_buffer.publish(seq);
    95. }
    96. // Stop the consumer.
    97. std::this_thread::sleep_for(std::chrono::seconds(1));
    98. work1.halt();
    99. work2.halt();
    100. wkr1_thread.join();
    101. wkr2_thread.join();
    102. return;
    103. }
    104. int main(int argc, char* argv[])
    105. {
    106. TestWorkProcessor();
    107. //TestEventProcessor();
    108. return 0;
    109. }

     

     

     

  • 相关阅读:
    Linux 文件操作(一) —— 遍历指定目录下的所有文件
    MFC真的过时了吗?C++是否真的适合做GUI界面?
    监控易:支持多种协议和设备,适应复杂多变的IT环境
    【滤波器】归一化LMS自适应滤波器
    Ajax--Ajax加强--XMLHttpRequest的基本使用
    普通人修谱必须读的三本书,最后一本市场买不到
    VB.net TCP服务端监听端口接收客户端RFID网络读卡器上传的读卡数据
    Ansible任务控制loop循环、when和block条件判断介绍演示
    如何使用 API 接口获取商品数据,从申请 API 接口、使用 API 接口到实际应用,一一讲解
    记录代码用
  • 原文地址:https://blog.csdn.net/yuanshenqiang/article/details/133913833