• RocketMQ 消息传递模型



    在这里插入图片描述

    0. 前言

    RocketMQ 支持6种消息传递方式,我们本次来聊三种消息传递模型,分别是可靠的同步传输、可靠的异步传输和单向传输。

    1. 可靠的同步传输(Reliable Synchronous Transmission):这是最常见的模型,生产者发送消息后,会等待消费者响应,确认消息已被消费者接收并处理。这种模式虽然可靠,但是由于需要等待确认,所以传输速度相对较慢。

    2. 可靠的异步传输(Reliable Asynchronous Transmission):在这种模型中,生产者发送消息后,不等待消费者的确认,直接返回,继续发送下一条消息。消费者在接收到消息后,会异步地确认消息。这种模式的传输速度较快,但是可能会存在消息丢失的风险。

    3. 单向传输(One-way Transmission):这种模型更加简单,生产者只负责发送消息,不关心消费者是否接收和处理,也不需要任何确认。这种模式通常用于对可靠性要求不高,但对速度要求高的场景,比如日志收集

    1. RocketMQ的消息传递模型

    Spring boot 集成很简单
    1.配置依赖

    <dependency>
        <groupId>org.apache.rocketmqgroupId>
        <artifactId>rocketmq-spring-boot-starterartifactId>
        <version>${rocketmq.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. application.properties文件中配置RocketMQ的相关属性:
    spring.rocketmq.name-server=127.0.0.1:9876
    spring.rocketmq.producer.group=producerGroup
    
    • 1
    • 2

    1.1. 同步发送

    • 定义:
      RocketMQ同步发送是指生产者发送消息后,会在收到服务器返回确认的应答后才会发送下一条消息。这样发送消息的方式会增加消息发送的耗时,但能够确保消息被服务器成功接收。

    • 适用场景:
      对于一些重要的消息通知、短信通知、短信营销系统等,需要确保消息的准确无误的到达,可以采用RocketMQ的同步发送方式。

    • Springboot 集成使用示例:通过RocketMQTemplate的syncSend方法发送消息。

    // 发送消息
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendMessage() {
        rocketMQTemplate.syncSend("my-topic", "Hello, RocketMQ");
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 同步发送方式会阻塞当前线程,直到服务器返回响应,因此需要考虑到这种方式可能会影响系统的吞吐量。
    2. RocketMQ的同步发送方式能够保证消息的可靠性,但也需要保证RocketMQ服务器的高可用,防止服务器出现问题导致消息丢失。
    3. 在使用RocketMQ的同时,还需要注意消息的顺序性和消费者的消费能力,避免出现消息堆积的情况。

    1.2. 异步发送

    • RocketMQ异步发送是指在发送消息时,不等待消息发送结果的返回,而是通过回调函数来处理消息发送的结果。

    • 适用场景:

      • 需要发送大量消息,并且对消息发送的响应时间要求不高的场景。
      • 需要异步处理消息发送结果的场景,例如发送短信、邮件等通知类消息。

    使用RocketMQ的RocketMQTemplate发送消息

    @Service
    public class MessageProducer {
        
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        
        public void sendMessage(String topic, String message) {
            rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // 处理发送成功的逻辑
                }
    
                @Override
                public void onException(Throwable throwable) {
                    // 处理发送异常的逻辑
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在需要发送消息的地方调用MessageProducersendMessage方法:

    @RestController
    public class MessageController {
        
        @Autowired
        private MessageProducer messageProducer;
        
        @GetMapping("/send")
        public String sendMessage() {
            String topic = "testTopic";
            String message = "Hello RocketMQ!";
            messageProducer.sendMessage(topic, message);
            return "Message sent successfully!";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 异步发送消息需要通过回调函数来处理发送结果,需要考虑回调函数的执行时间和顺序,以确保消息发送的可靠性。
    • 异步发送消息可能会导致消息发送的顺序不确定,需要在接收端进行相关处理,保证消息的处理顺序。
    • 异步发送消息时,需要注意控制并发量,避免发送过多消息导致系统负载过高。

    1.3. 单向发送

    • 定义:单向发送是指消息生产者发送消息后,不等待服务器回执响应,即发送后不关心是否到达broker。这种方式发送消息的过程网络开销最小,速度最快。

    • 适用场景:适用于某些耗时非常短,但是对可靠性要求并不高的场景,比如日志收集。

      • 单向发送方式并不能保证消息一定会被成功消费,因为它并不关心消息是否正确到达broker,所以如果你的业务对消息的可靠性有较高要求,不推荐使用单向发送。
      • 在大流量的情况下,单向发送方式由于其网络开销小,速度快的特点,可以显著提高系统的吞吐量。

    然后通过RocketMQTemplatesendOneWay方法来发送单向消息:

     @Service
     public class MyProducer {
         @Autowired
         private RocketMQTemplate rocketMQTemplate;
     
         public void sendMsg(String topic, String msg) {
             // 这里的topic需要和你在RocketMQ中设置的topic相对应
             rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msg).build());
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2. RocketMQ的批量发送和消费

    2.1 批量发送

    批量发送的优点和使用场景:
    优点:批量发送可以减少网络开销,提高消息传输的吞吐量,特别是在网络带宽充足的情况下。使用场景:适合大量小消息的发送,例如日志收集,统计数据等。

    如何进行批量发送:

    List<Message> msgs = new ArrayList<>();
    msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
    msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
    msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
    try {
        producer.send(msgs);
    } catch (Exception e) {
        e.printStackTrace();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.2 批量消费

    批量消费的优点和使用场景:
    优点:批量消费可以减少消费者与消息队列的通信次数,提高消费效率。使用场景:适合处理大量小消息的场景,例如日志处理,统计数据等。

    如何进行批量消费:
    在RocketMQ中,批量消费主要通过设置consumer的consumeMessageBatchMaxSize属性,一次性从队列中拉取多条消息。

    consumer.setConsumeMessageBatchMaxSize(10);  //一次消费10条消息
    
    • 1

    2.3 Spring Boot集成RocketMQ官方starter 示例

    批量消费的前提是生产者发送的是批量消息。这个由于RocketMQ的设计,目前的版本中并不支持批量消费单条发送的消息。
    这里以Spring Boot集成RocketMQ官方starter为例,首先在pom.xml中添加依赖:

    <dependency>
        <groupId>org.apache.rocketmqgroupId>
        <artifactId>rocketmq-spring-boot-starterartifactId>
        <version>2.0.4version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    批量发送示例:

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendBatchMessages() {
        List<Message> msgs = new ArrayList<>();
        msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
        msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
        msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
        rocketMQTemplate.syncSend("TopicA",msgs);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    批量消费示例:

    @Service
    @RocketMQMessageListener(topic = "TopicA", consumerGroup = "my-consumer_group")
    public class BatchConsumer implements RocketMQListener<List<String>> {
        @Override
        public void onMessage(List<String> messages) {
            for (String msg : messages) {
                System.out.println("Receive message: " + msg);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3. 总结

    同步传输模型(Synchronous)
    在同步传输模型中,消息发送方(Producer)发送消息后会一直等待消息被确认(Acknowledgement)后才继续执行后续操作。消息接收方(Consumer)在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型保证了消息的可靠性,但会造成消息发送方的阻塞。

    异步传输模型(Asynchronous)
    在异步传输模型中,消息发送方发送消息后不会立即等待确认,而是继续执行后续操作。消息接收方在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型可以提高消息发送方的吞吐量,但消息的可靠性需要通过设置重试和回调机制来保证。

    单向传输模型(Oneway)
    在单向传输模型中,消息发送方发送消息后不会等待确认,也不会接收到消息接收方的确认消息。消息发送方无法得知消息是否成功接收,也无法进行重试。这种模型适用于对消息可靠性要求不高,但对发送性能要求较高的场景,如日志记录等。

    4. 参考文档

    1. 官方文档链接:https://rocketmq.apache.org/docs/

    2. GitHub链接:https://github.com/apache/rocketmq-spring

    5. 源码地址

    我的github https://github.com/wangshuai67/icepip-springboot-action-examples

  • 相关阅读:
    USB2.0 UTMI PHY芯片测试
    Spring Cloud项目合规性注册之-(单元集成测试报告)
    Promise初体验
    散列表查找技术(数据结构)
    【雷达原理】雷达信号级建模与仿真
    SpringMvc--综合案例
    java计算机毕业设计基于springboot+vue+elementUI的结婚婚庆婚纱拍摄管理系统(前后端分离)
    Springboot+vue的入校申报审批管理系统(有报告),Javaee项目,springboot vue前后端分离项目。
    BatchNorm
    如何使用 LinkedHashMap 实现 LRU 缓存?
  • 原文地址:https://blog.csdn.net/wangshuai6707/article/details/132863088