RocketMQ 支持6种消息传递方式,我们本次来聊三种消息传递模型,分别是可靠的同步传输、可靠的异步传输和单向传输。
可靠的同步传输(Reliable Synchronous Transmission):这是最常见的模型,生产者发送消息后,会等待消费者响应,确认消息已被消费者接收并处理。这种模式虽然可靠,但是由于需要等待确认,所以传输速度相对较慢。
可靠的异步传输(Reliable Asynchronous Transmission):在这种模型中,生产者发送消息后,不等待消费者的确认,直接返回,继续发送下一条消息。消费者在接收到消息后,会异步地确认消息。这种模式的传输速度较快,但是可能会存在消息丢失的风险。
单向传输(One-way Transmission):这种模型更加简单,生产者只负责发送消息,不关心消费者是否接收和处理,也不需要任何确认。这种模式通常用于对可靠性要求不高,但对速度要求高的场景,比如日志收集。
Spring boot 集成很简单
1.配置依赖
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>${rocketmq.version}version>
dependency>
application.properties
文件中配置RocketMQ的相关属性:spring.rocketmq.name-server=127.0.0.1:9876
spring.rocketmq.producer.group=producerGroup
定义:
RocketMQ同步发送是指生产者发送消息后,会在收到服务器返回确认的应答后才会发送下一条消息。这样发送消息的方式会增加消息发送的耗时,但能够确保消息被服务器成功接收。
适用场景:
对于一些重要的消息通知、短信通知、短信营销系统等,需要确保消息的准确无误的到达,可以采用RocketMQ的同步发送方式。
Springboot 集成使用示例:通过RocketMQTemplate的syncSend方法发送消息。
// 发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage() {
rocketMQTemplate.syncSend("my-topic", "Hello, RocketMQ");
}
- 同步发送方式会阻塞当前线程,直到服务器返回响应,因此需要考虑到这种方式可能会影响系统的吞吐量。
- RocketMQ的同步发送方式能够保证消息的可靠性,但也需要保证RocketMQ服务器的高可用,防止服务器出现问题导致消息丢失。
- 在使用RocketMQ的同时,还需要注意消息的顺序性和消费者的消费能力,避免出现消息堆积的情况。
RocketMQ异步发送是指在发送消息时,不等待消息发送结果的返回,而是通过回调函数来处理消息发送的结果。
适用场景:
使用RocketMQ的RocketMQTemplate
发送消息
@Service
public class MessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理发送成功的逻辑
}
@Override
public void onException(Throwable throwable) {
// 处理发送异常的逻辑
}
});
}
}
在需要发送消息的地方调用MessageProducer
的sendMessage
方法:
@RestController
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/send")
public String sendMessage() {
String topic = "testTopic";
String message = "Hello RocketMQ!";
messageProducer.sendMessage(topic, message);
return "Message sent successfully!";
}
}
- 异步发送消息需要通过回调函数来处理发送结果,需要考虑回调函数的执行时间和顺序,以确保消息发送的可靠性。
- 异步发送消息可能会导致消息发送的顺序不确定,需要在接收端进行相关处理,保证消息的处理顺序。
- 异步发送消息时,需要注意控制并发量,避免发送过多消息导致系统负载过高。
定义:单向发送是指消息生产者发送消息后,不等待服务器回执响应,即发送后不关心是否到达broker。这种方式发送消息的过程网络开销最小,速度最快。
适用场景:适用于某些耗时非常短,但是对可靠性要求并不高的场景,比如日志收集。
然后通过RocketMQTemplate
的 sendOneWay
方法来发送单向消息:
@Service
public class MyProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMsg(String topic, String msg) {
// 这里的topic需要和你在RocketMQ中设置的topic相对应
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msg).build());
}
}
批量发送的优点和使用场景:
优点:批量发送可以减少网络开销,提高消息传输的吞吐量,特别是在网络带宽充足的情况下。使用场景:适合大量小消息的发送,例如日志收集,统计数据等。
如何进行批量发送:
List<Message> msgs = new ArrayList<>();
msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(msgs);
} catch (Exception e) {
e.printStackTrace();
}
批量消费的优点和使用场景:
优点:批量消费可以减少消费者与消息队列的通信次数,提高消费效率。使用场景:适合处理大量小消息的场景,例如日志处理,统计数据等。
如何进行批量消费:
在RocketMQ中,批量消费主要通过设置consumer的consumeMessageBatchMaxSize属性,一次性从队列中拉取多条消息。
consumer.setConsumeMessageBatchMaxSize(10); //一次消费10条消息
批量消费的前提是生产者发送的是批量消息。这个由于RocketMQ的设计,目前的版本中并不支持批量消费单条发送的消息。
这里以Spring Boot集成RocketMQ官方starter为例,首先在pom.xml中添加依赖:
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.0.4version>
dependency>
批量发送示例:
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendBatchMessages() {
List<Message> msgs = new ArrayList<>();
msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
rocketMQTemplate.syncSend("TopicA",msgs);
}
批量消费示例:
@Service
@RocketMQMessageListener(topic = "TopicA", consumerGroup = "my-consumer_group")
public class BatchConsumer implements RocketMQListener<List<String>> {
@Override
public void onMessage(List<String> messages) {
for (String msg : messages) {
System.out.println("Receive message: " + msg);
}
}
}
同步传输模型(Synchronous)
在同步传输模型中,消息发送方(Producer)发送消息后会一直等待消息被确认(Acknowledgement)后才继续执行后续操作。消息接收方(Consumer)在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型保证了消息的可靠性,但会造成消息发送方的阻塞。
异步传输模型(Asynchronous)
在异步传输模型中,消息发送方发送消息后不会立即等待确认,而是继续执行后续操作。消息接收方在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型可以提高消息发送方的吞吐量,但消息的可靠性需要通过设置重试和回调机制来保证。
单向传输模型(Oneway)
在单向传输模型中,消息发送方发送消息后不会等待确认,也不会接收到消息接收方的确认消息。消息发送方无法得知消息是否成功接收,也无法进行重试。这种模型适用于对消息可靠性要求不高,但对发送性能要求较高的场景,如日志记录等。
我的github https://github.com/wangshuai67/icepip-springboot-action-examples