• 高性能队列Disruptor使用教程


    一、简介

    Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。由于其高性能,获得了很多大奖。
    在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。
    从数据结构上来看,Disruptor 是一个支持 生产者 -> 消费者 模式的 环形队列。能够在 无锁 的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后消费次序。

    二、代码

    2.1 依赖

      <dependency>
          <groupId>com.lmaxgroupId>
          <artifactId>disruptorartifactId>
          <version>3.4.2version>
      dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.2 角色介绍

    Event :事件类:生产者和消费者之间进行交换的数据被称为事件(Event)。
    Producer: 生产者,用于发布事件。
    Consumer :消费者(实现EventHandler接口):用于处理事件。

    Disruptor通过事件工厂EventFactory在RingBuffer中预创建事件Event的实例。
    一个事件实例Event类似于一个数据槽。
    生产者Producer发布Publish之前,先从Ringbuffer中获取一个事件Event实例。
    然后生产者Producer向事件Event实例中填充数据,然后再发布到RingBuffer中。
    最后由消费者Consumer获取事件Event实例并读取实例中的数据。

    2.3 事件类

    /**
     * @Author: LiuShihao
     * @Date: 2022/11/23 14:39
     * @Desc: 定义事件类:生产者和消费者之间进行交换的数据
     */
    public class LogEvent {
    
        //事件类工厂:引用new方法
        public static final EventFactory<LogEvent> FACTORY = LogEvent::new;
        private String data;
    
        private Instant timestamp;
    
        public String getData() {
            return data;
        }
    
        public void setData(String data) {
            this.data = data;
        }
    
        public Instant getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(Instant timestamp) {
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "LogEvent{" +
                    "data='" + data + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2.4 生产者

    /**
     * @Author: LiuShihao
     * @Date: 2022/11/23 14:39
     * @Desc: 生产者类:用于发布事件。
     */
    public class MyProducer {
    
        //RingBuffer
        private final RingBuffer<LogEvent> ringBuffer;
    
        //有参构造
        public MyProducer(RingBuffer<LogEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        /**
         * 发布事件
         * @param eventObject
         */
        public void publish(LogEvent eventObject) {
            boolean isPublished = ringBuffer.tryPublishEvent((event, sequence) -> {
                event.setTimestamp(Instant.now());
                event.setData(eventObject.getData());
            });
    
            if (!isPublished) {
                System.err.println(Thread.currentThread().getName()+" - "+Thread.currentThread().getId() + " producer Failed to publish!");
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2.5 消费者

    /**
     * @Author: LiuShihao
     * @Date: 2022/11/23 14:39
     * @Desc: 消费者类:接收事件,实现EventHandler接口
     */
    public class MyConsumer implements EventHandler<LogEvent> {
        @Override
        public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("consumer:"+event);
            Thread.sleep(3000);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.6 启动Disruptor

    Disruptor的构造参数有5个:
    在这里插入图片描述

    1. EventFactory:事件工厂类,用于生产事件。
    2. ringBufferSize:环形缓冲区的大小,必须是2的次幂。
    3. threadFactory:线程工厂,用于创建线程。
    4. ProducerType:事件生产者策略(单线程和多线程)。
    5. WaitStrategy:等待策略。

    在这里插入图片描述

    通过disruptor.handleEventsWith();方法设置消费者,方法内可以传入一个或者多个消费者。

     Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(
             LogEvent.FACTORY,
             2,
             Executors.defaultThreadFactory(),
             ProducerType.MULTI,
             new BlockingWaitStrategy()
     );
     disruptor.handleEventsWith(myConsumer);
     disruptor.start();
    
     MyProducer myProducer = new MyProducer(disruptor.getRingBuffer());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.7 测试

    /**
     * @Author: LiuShihao
     * @Date: 2022/11/23 14:38
     * @Desc:
     */
    public class Main {
        public static void main(String[] args) {
            MyConsumer myConsumer = new MyConsumer();
    
            Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(
                    LogEvent.FACTORY,
                    2,
                    Executors.defaultThreadFactory(),
                    ProducerType.MULTI,
                    new BlockingWaitStrategy()
            );
            disruptor.handleEventsWith(myConsumer);
            disruptor.start();
    
            MyProducer myProducer = new MyProducer(disruptor.getRingBuffer());
    
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    LogEvent logEvent = LogEvent.FACTORY.newInstance();
                    logEvent.setData(Thread.currentThread().getName());
                    myProducer.publish(logEvent);
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    在这里插入图片描述

    源码

    https://github.com/Liu-Shihao/disruptor-demo.git

    参考文章:
    https://juejin.cn/post/6844904020973191181
    https://juejin.cn/post/6844903976924610574
    https://blog.51cto.com/u_15185289/3313032

  • 相关阅读:
    Vue3新的状态管理库-Pinia(保姆级别教程)
    Flink基础概念入门
    Blazor OIDC 单点登录授权实例7 - Blazor hybird app 端授权
    linux 应急响应工具整理列表
    Transformer - model architecture
    C语言-多层for循环详解
    Linux安装Oracle19c(极简版)
    栈和队列的概念及实现
    【iOS ARKit】RealityKit 中的物理组件
    浅谈云原生
  • 原文地址:https://blog.csdn.net/DreamsArchitects/article/details/127983158