RocketMQ发送不同的普通消息和顺序消息。
RocketMQ提供了三种方式发送普通消息:可靠同步发送、可靠异步发送和单向发送。
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
dependency>
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.2version>
dependency>
rocketmq:
name-server: localhost:9876
producer:
group: group_rocketmq #需要指定默认的组名
对于同步消息,我们可以获取到发送结果。
@RunWith(SpringRunner.class)
@SpringBootTest
public class MessageTypeTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void testSyncSend(){
SendResult result = rocketMQTemplate.syncSend("topic-1", "同步测试消息", 10000);
System.out.println(result);
}
}
运行测试用例,查看RocketMq-console控制台。
添加tag标签
SendResult result = rocketMQTemplate.syncSend("topic-1:mytag", "同步测试消息", 10000);
发送异步消息,通过回调方式获取发送结果。
@Test
public void testAsyncSend() throws Exception {
rocketMQTemplate.asyncSend("topic-1:mytag", "异步测试消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("成功回调消息");
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("回调发生异常了");
System.out.println(throwable);
}
});
System.out.println("-----------------");
TimeUnit.SECONDS.sleep(300);//避免主线程死掉无法获取回调结果
}
运行测试用例
直接发送消息,不等待结果,不保证消息一定送达,不回调。
@Test
public void testSendOneWay(){
//不回调,不等待
rocketMQTemplate.sendOneWay("topic-1:mytag","异步单向消息");
}
运行测试用例
我们查看前面已经发送的普通消息(主题topic-1),可以看到它有4个队列,消费者在消费时就没办法保证消息的顺序性。
通过sendOneWayOrderly方法,将消息发送到同一个队列,即可实现消息的顺序性。
@Test
public void testSendOneWayOrderly(){
for (int i=0;i<10;i++){
//第三个参数的作用是用来决定这些消息发送到哪个队列
rocketMQTemplate.sendOneWayOrderly("topic-1","异步单向顺序消息","xx");
}
}
运行发送消息后,查看RocketMq-console控制台,可以看到消息进入了同一个队列。
同理,同步和异步的消息发送也提供了顺序发送的API,使用方式完全一样。