• Spring集成高性能队列Disruptor


    Disruptor简介

    Disruptor(中文翻译为“破坏者”或“颠覆者”)是一种高性能、低延迟的并发编程框架,最初由LMAX Exchange开发。它的主要目标是解决在金融交易系统等需要高吞吐量和低延迟的应用中的并发问题。

    Disruptor特点

    1. 无锁并发:Disruptor基于无锁的数据结构,它允许多个线程并发地读取和写入数据,而无需使用传统锁定机制,从而避免锁定的竞争和性能瓶颈。

    2. 环形缓冲区:Disruptor使用一个环形缓冲区,将数据项存储在其中。这个环形缓冲区可以通过预分配的方式来减少内存分配和垃圾回收的开销。

    3. 事件驱动:Disruptor的核心思想是将数据项(事件)从生产者传递到消费者,通过一种发布-订阅的模型来实现。生产者将事件写入缓冲区,而消费者从缓冲区中读取事件进行处理。

    4. 高性能:Disruptor专注于提供极高的吞吐量和低延迟,适用于需要快速处理大量数据的应用,如金融交易系统、网络数据传输等。

    5. 易于使用:尽管Disruptor的内部实现复杂,但它提供了简单而清晰的API,使开发人员能够相对容易地集成和使用它。

    6. 并发编程的辅助工具:Disruptor不仅仅是一个并发编程框架,还提供了一些辅助工具,如事件处理器、工作池等,帮助开发者更好地处理并发任务。

     Disruptor应用

    Disruptor可以理解为一个可以集成在项目里的MQ,它主要也分为了生产者,消息队列和消费者这么几部分,接下来用一个例子演示

    引进依赖

    1. <dependency>
    2. <groupId>com.lmaxgroupId>
    3. <artifactId>disruptorartifactId>
    4. <version>3.4.2version>
    5. dependency>

    定义监听实体

    1. @Data
    2. public class DisruptorEvent {
    3. /**
    4. * 定义加入队列的时间毫秒值
    5. */
    6. private long creatTime;
    7. /**
    8. * 携带的其他信息
    9. */
    10. private String data;
    11. }

    定义消费者

    实现EventHandler接口,重写onEvent方法

    1. @Slf4j
    2. public class DisruptorConsumer implements EventHandler {
    3. @Override
    4. public void onEvent(DisruptorEvent disruptorEvent, long l, boolean b) throws Exception {
    5. log.debug("消费者开始消费数据:[{}]", disruptorEvent);
    6. //模拟复杂环境下系统延迟
    7. Thread.sleep(1100);
    8. }
    9. }

    初始化Disruptor

    由于我们后续还要用到这个对象,所以要把他交给IOC容器来管理,定义bean名称,避免后续有多个Disruptor对象

    1. @Configuration
    2. public class DisruptorConfig {
    3. /**
    4. * 队列长度,注意必须是2的n次幂
    5. */
    6. private static final int RING_BUFFER_SIZE = 1024;
    7. @Bean(name = "cardDisruptor")
    8. public Disruptor disruptorStart() {
    9. DisruptorConsumer consumer = new DisruptorConsumer();
    10. Disruptor disruptor = new Disruptor<>(
    11. DisruptorEvent::new,
    12. RING_BUFFER_SIZE,
    13. Executors.defaultThreadFactory(),
    14. ProducerType.MULTI,
    15. new BlockingWaitStrategy());
    16. disruptor.handleEventsWith(consumer);
    17. disruptor.start();
    18. return disruptor;
    19. }
    20. }

    定义生产者

    1. @Slf4j
    2. public class DisruptorProducer {
    3. private static Disruptor disruptor;
    4. public DisruptorProducer(Disruptor disruptor) {
    5. DisruptorProducer.disruptor = disruptor;
    6. }
    7. public static void push(DisruptorEvent disruptorEvent) {
    8. //获取队列
    9. RingBuffer ringBuffer = disruptor.getRingBuffer();
    10. boolean flag = ringBuffer.tryPublishEvent(new EventTranslator() {
    11. @Override
    12. public void translateTo(DisruptorEvent event, long l) {
    13. //这里一定要写成这种set的形式(也就是说不要改变这个event的内存指向地址),不然消费者会拿不到值
    14. event.setData(disruptorEvent.getData());
    15. event.setCreatTime(disruptorEvent.getCreatTime());
    16. }
    17. });
    18. if (!flag) {
    19. throw new RuntimeException("发送消息失败!");
    20. }
    21. }
    22. }

    测试

    使用postman模拟并发10次

    1. public void test() {
    2. DisruptorEvent event = new DisruptorEvent();
    3. event.setData("哈哈哈");
    4. event.setCreatTime(System.currentTimeMillis());
    5. DisruptorProducer.push(event);
    6. log.info("请求成功");
    7. }

  • 相关阅读:
    MyBatis框架一二级缓存含代码演示
    Java之转换流的详细解析
    JavaScrip-T5(2022年11月21日移动2112班)
    01.智慧商城——项目介绍与初始化
    hive创建分区表
    AcWing第 68 场周赛题解
    java并发编程-StampedLock高性能读写锁
    【工具类】非 sudo 运行 docker
    M2 MacbookPro配置Spark源码运行环境
    ERROR: [Place 30-675]
  • 原文地址:https://blog.csdn.net/winerpro/article/details/134203492