对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。
延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉(取消订单释放库存),这样就不需要使用定时任务的方式去处理了
。
RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level(共18级),例如定时 5s, 10s, 1m 等
。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。
支持的level如下:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
<!--rocketMq依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
# 端口
server:
port: 8083
# 配置 rocketmq
rocketmq:
name-server: 127.0.0.1:9876
#生产者
producer:
#生产者组名,规定在一个应用里面必须唯一
group: group1
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 3
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 3
package com.example.springbootrocketdemo.controller;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
* 延时消息
* @author qzz
*/
@RestController
public class RocketMQDelayCOntroller {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送延时消息
*/
@RequestMapping("/testDelaySend")
public void testDelaySend(){
Map<String,Object> orderMap = new HashMap<>();
orderMap.put("orderNumber","1357890");
orderMap.put("createTime", LocalDateTime.now());
//参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
//参数二:Message<?>
//参数三:消息发送超时时间
//参数四:delayLevel 延时level messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
rocketMQTemplate.syncSend("test-topic-delay", MessageBuilder.withPayload(JSONObject.toJSONString(orderMap)).build(),3000,3);
}
}
package com.example.springbootrocketdemo.config;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* 消费延时消息
* 配置RocketMQ监听
* @author qzz
*/
@Service
@RocketMQMessageListener(consumerGroup = "test-delay",topic = "test-topic-delay")
public class RocketMQDelayConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
Map<String,Object> orderMap = JSONObject.parseObject(s,Map.class);
String orderNumber = String.valueOf(orderMap.get("orderNumber"));
String createTime = String.valueOf(orderMap.get("createTime"));
//根据orderNumber 查询订单状态,若为未支付,则消息订单并修改库存
//....
System.out.println("consumer 延时消息消费 orderNumber:"+orderNumber+",createTime:"+createTime);
}
}
消费者类要实现RocketMQListener
接口,以及动态指定消息类型
String。
类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-delay,以及消费者组test-delay
延时消息发送与接收搭建完毕!
测试OK,成功消费!