一. 背景
Disruptor是由英国外汇公司LMAX 于2010年公开的一款用于线程间数据交互的高并发组件,其原型是一个有界的环形队列,通过巧妙的设计使得该队列在多线程环境下无需加锁就能保证消息的安全可靠,为软件系统带来指数级性能提升,可以参考博文 https://blog.csdn.net/21aspnet/article/details/89709221 了解关于disruptor的更多内容。由于Disruptor原版是java开发的,而本人是从事C/C++开发工作的,期望找一个C/C++版本的实现,用到自己的工作中。为了避免令人头疼的第三方依赖问题,优先考虑使用现代C++特性,即C++ 11以上版本的实现(不想引入Boost库<_>)。Disruptor4cpp(GitHub - alexleemanfui/disruptor4cpp: C++ port of LMAX disruptor)正好满足上述需求,虽然该版本只实现到Disruptor 3.3.2版本,并且好像没有继续维护的迹象,但是3.3.2 版本的Disruptor对于大部分系统来说已经够用了,尤其是它对Disruptor一些关键特性的实现,能够很好地帮助我们理解Disruptor的工作原理。所以,最近对Disruptor4cpp展开了一些研究,下面对学习和使用过程中的一些问题做了记录。
二. 生产与消费模式
Disruptor不对生产者和消费者的数量做限制,可以是单生产者->单消费者,单生产者->多消费者,多生产者->单消费者,以及多生产者->多消费者(这里的每个生产者和消费者都指的是一个线程,而不是某个抽象业务的类实例)。
实际上,Disruptor支持两种消费模式:多播模式 和 竞争模式。多播模式下,每个消费者都会处理所有的消息,即一个消息会被多个消费者重复消费;竞争模式下,多个消费者竞争同一批消息,即一个消息仅被消费一次,分别对应了java版本实现中的BatchEventProcessor和WorkProcessor。
在Disruptor4cpp的实现中,只提供了BatchEventProcessor消费模式,给出的示例程序也是多播消费模式,如果想要使用竞争消费模式,则需要自己实现WorkProcessor。以下给出了WorkProcessor模式的实现,是对Disruptor4cpp的一个补充,包括两个文件: work_handler.h和race_work_processor.h。
- // file: work_handler.h
-
- /*
- Copyright (c) 2015, Alex Man-fui Lee
- All rights reserved.
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- * Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- * Neither the name of disruptor4cpp nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
- FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
- OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
- #ifndef DISRUPTOR4CPP_WORK_HANDLER_H_
- #define DISRUPTOR4CPP_WORK_HANDLER_H_
-
- #include
- #include
-
- namespace disruptor4cpp
- {
- template<class TEvent>
- class work_handler
- {
- public:
- virtual ~work_handler(){};
- virtual void on_start() = 0;
- virtual void on_shutdown() = 0;
- virtual void on_event(TEvent& event, int64_t sequence) = 0;
- virtual void on_timeout(int64_t sequence) = 0;
- virtual void on_event_exception(const std::exception& ex, int64_t sequence, TEvent* event) = 0;
- virtual void on_start_exception(const std::exception& ex) = 0;
- virtual void on_shutdown_exception(const std::exception& ex) = 0;
- };
- }
-
- #endif
- //file: race_work_processor.h
- /*
- Copyright (c) 2015, Alex Man-fui Lee
- All rights reserved.
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- * Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- * Neither the name of disruptor4cpp nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
- FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
- OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
- #ifndef DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
- #define DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
-
- #include
- #include
- #include
- #include
-
- #include "work_handler.h"
- #include "exceptions/alert_exception.h"
- #include "exceptions/timeout_exception.h"
- #include "sequence.h"
-
- namespace disruptor4cpp
- {
- template <typename TRingBuffer>
- class race_work_processor
- {
- public:
- race_work_processor(TRingBuffer& ring_buffer,
- typename TRingBuffer::sequence_barrier_type& sequence_barrier,
- work_handler<typename TRingBuffer::event_type>& wkr_handler,
- typename TRingBuffer::sequence_type& worksequence)
- : sequence_(),
- ring_buffer_(ring_buffer),
- sequence_barrier_(sequence_barrier),
- work_handler_(wkr_handler),
- work_sequence_(worksequence),
- running_(false)
- {
- }
-
- race_work_processor(TRingBuffer& ring_buffer,
- std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr,
- work_handler<typename TRingBuffer::event_type>& wkr_handler,
- typename TRingBuffer::sequence_type& worksequence)
- : sequence_(),
- ring_buffer_(ring_buffer),
- sequence_barrier_(*sequence_barrier_ptr),
- work_handler_(wkr_handler),
- sequence_barrier_ptr_(std::move(sequence_barrier_ptr)),
- work_sequence_(worksequence),
- running_(false)
- {
- }
-
- typename TRingBuffer::sequence_type& get_sequence()
- {
- return sequence_;
- }
-
- void halt()
- {
- running_.store(false, std::memory_order_release);
- sequence_barrier_.alert();
- }
-
- bool is_running() const
- {
- return running_.load(std::memory_order_acquire);
- }
-
- void run()
- {
- bool expected_running_state = false;
- if (!running_.compare_exchange_strong(expected_running_state, true))
- throw std::runtime_error("Thread is already running");
-
- sequence_barrier_.clear_alert();
- notify_start();
-
- typename TRingBuffer::event_type* event=nullptr;
- int64_t next_sequence;
- int64_t cached_available_sequence = LLONG_MIN;
- bool processedSequence = true;
- try
- {
- while (true)
- {
- try
- {
- if (processedSequence)
- {
- processedSequence = false;
- do
- {
- next_sequence = work_sequence_.get() + 1;
- sequence_.set(next_sequence-1);
- } while (!work_sequence_.compare_and_set(next_sequence - 1, next_sequence));
- }
- if (cached_available_sequence >= next_sequence)
- {
- event = &ring_buffer_[next_sequence];
- work_handler_.on_event(*event, next_sequence);
- processedSequence = true;
- }
- else
- {
- cached_available_sequence = sequence_barrier_.wait_for(next_sequence);
- }
- }
- catch (timeout_exception& timeout_ex)
- {
- notify_timeout(sequence_.get());
- }
- catch (alert_exception& alert_ex)
- {
- if (!running_.load(std::memory_order_acquire))
- break;
- }
- catch (std::exception& ex)
- {
- work_handler_.on_event_exception(ex, next_sequence, event);
- processedSequence = true;
- }
- }
- }
- catch (...)
- {
- notify_shutdown();
- running_.store(false, std::memory_order_release);
- throw;
- }
- notify_shutdown();
- running_.store(false, std::memory_order_release);
- }
-
- private:
- void notify_timeout(int64_t available_sequence)
- {
- try
- {
- work_handler_.on_timeout(available_sequence);
- }
- catch (std::exception& ex)
- {
- work_handler_.on_event_exception(ex, available_sequence, nullptr);
- }
- }
-
- void notify_start()
- {
- try
- {
- work_handler_.on_start();
- }
- catch (std::exception& ex)
- {
- work_handler_.on_start_exception(ex);
- }
- }
-
- void notify_shutdown()
- {
- try
- {
- work_handler_.on_shutdown();
- }
- catch (std::exception& ex)
- {
- work_handler_.on_shutdown_exception(ex);
- }
- }
-
- typename TRingBuffer::sequence_type sequence_;
- TRingBuffer& ring_buffer_;
- typename TRingBuffer::sequence_barrier_type& sequence_barrier_;//引用到ring_buffer的序列屏障,用于控制消费者不要消费没有写成功的数据
- work_handler<typename TRingBuffer::event_type>& work_handler_;
- typename TRingBuffer::sequence_type& work_sequence_;//引用到一个由多个消费者共同维护的消费序列,避免重复消费
- std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr_;
- std::atomic<bool> running_;
- };
- }
-
- #endif
-
三. 测试
在Discruptor提供的示例代码的基础上,增加竞争消费示例,代码如下。由于线程调度是由操作系统决定的,即便一个消费线程通过竞争拿到了消息,也可能因为线程挂起而推后本条消息的处理,所以每条消息的真正被消费的时机是不确定的,有可能先入队的消息后被消费,因此可能看到打印出来的消息消费顺序与消息入队顺序不一致的情况,这也是这种消费模式的重要特性。
- //main.cpp
- #include
- #include
- #include
- #include
- //将work_handler.h和race_work_processor.h放到disruptor4cpp.h同级目录,并在disruptor4cpp.h中添加 #include
-
- #include
-
- class int_event_handler : public disruptor4cpp::event_handler<int>
- {
- public:
- int_event_handler() = default;
- virtual ~int_event_handler() = default;
- virtual void on_start() { }
- virtual void on_shutdown() { }
- virtual void on_event(int& event, int64_t sequence, bool end_of_batch)
- {
- std::cout << "Received integer: " << event << ", Sequence:" << sequence << std::endl;
- }
- virtual void on_timeout(int64_t sequence) { }
- virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }
- virtual void on_start_exception(const std::exception& ex) { }
- virtual void on_shutdown_exception(const std::exception& ex) { }
- };
-
- class int_work_handler :public disruptor4cpp::work_handler<int>
- {
- public:
- int_work_handler() = default;
- virtual ~int_work_handler() = default;
- void SetWorkerId(int id) { worker_id = id; }
- virtual void on_start() { }
- virtual void on_shutdown() { }
- virtual void on_event(int& event, int64_t sequence)
- {
- std::cout << worker_id << " Received integer: " << event << ", Sequence:"<
- }
- virtual void on_timeout(int64_t sequence) { }
- virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }
- virtual void on_start_exception(const std::exception& ex) { }
- virtual void on_shutdown_exception(const std::exception& ex) { }
- private:
- int worker_id = -1;
- };
-
-
- void TestEventProcessor()
- {
- using namespace disruptor4cpp;
- // 创建一个支持多生产者写入的环形队列ring_buffer
- ring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;
- // 创建用于控制消费者消费进度的序列屏障。
- //如果要创建的消费者有上级依赖,则应该将上级消费者的序列作为参数传入。
- //此处不传参,则基于该序列屏障创建的消费者是第一级消费者
- auto barrier = ring_buffer.new_barrier();
-
- int_event_handler handler;
- //创建一个消费者(由于barrier 没有上级依赖,因此processor是一个一级消费者)
- batch_event_processor<decltype(ring_buffer)> processor(ring_buffer, *barrier.get(), handler);
- //将消费者的消费序列加入到监听列表,以便控制生产者写入时,避免覆盖未消费的数据
- ring_buffer.add_gating_sequences({ &processor.get_sequence(), });
- //启动消费者线程,开始消费
- std::thread processor_thread([&processor] { processor.run(); });
-
- // 循环写入1000条数据(也可以在多个线程中向ring_buffer中写数据)
- for (int i = 0; i < 1000; i++)
- {
- int64_t seq = ring_buffer.next();
- ring_buffer[seq] = i;
- ring_buffer.publish(seq);
- }
-
- // Stop the consumer.
- std::this_thread::sleep_for(std::chrono::seconds(1));
- processor.halt();
- processor_thread.join();
- return;
- }
-
-
- void TestWorkProcessor()
- {
- using namespace disruptor4cpp;
- // 创建一个支持多生产者写入的环形队列ring_buffer
- ring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;
- auto barrier = ring_buffer.new_barrier();
-
- sequence worksequence;
- int_work_handler wkr1;
- wkr1.SetWorkerId(1);
- race_work_processor<decltype(ring_buffer)> work1(ring_buffer, *barrier.get(), wkr1, worksequence);//一级消费者
-
- int_work_handler wkr2;
- wkr2.SetWorkerId(2);
- race_work_processor<decltype(ring_buffer)> work2(ring_buffer, *barrier.get(), wkr2, worksequence);//一级消费者
-
- //将消费者的消费序列加入到监听列表,以便控制生产者写入时,避免覆盖未消费的数据
- ring_buffer.add_gating_sequences({ &work1.get_sequence(), &work2.get_sequence(), &worksequence });
- std::thread wkr1_thread([&work1] { work1.run(); });
- std::thread wkr2_thread([&work2] { work2.run(); });
- // 循环写入1000条数据(也可以在多个线程中,向ring_buffer中写数据)
- for (int i = 0; i < 1000; i++)
- {
- int64_t seq = ring_buffer.next();
- ring_buffer[seq] = i;
- ring_buffer.publish(seq);
- }
-
- // Stop the consumer.
- std::this_thread::sleep_for(std::chrono::seconds(1));
- work1.halt();
- work2.halt();
- wkr1_thread.join();
- wkr2_thread.join();
- return;
- }
-
-
- int main(int argc, char* argv[])
- {
- TestWorkProcessor();
- //TestEventProcessor();
- return 0;
- }