在开发中,有时需要使用延时队列。
比如,订单15分钟内未支付自动取消。
如果使用 jdk自带的延时队列,那么服务器挂了或者重启时,延时队列里的数据就会失效,可用性比较差。
可以使用Redisson的延时队列。
Redisson的配置,详情见:https://blog.csdn.net/sinat_32502451/article/details/133799192
public void addDelayQueue(String orderId) {
RBlockingDeque blockingDeque = redissonClient.getBlockingDeque("orderQueue");
RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
//在延时队列中添加任务,5秒后生效
delayedQueue.offer(orderId, 5, TimeUnit.SECONDS);
log.info("addDelayQueue orderId:" + orderId);
}
取出延时队列中的任务,如果延时队列中没有任务,会阻塞,直到队列中添加了任务。
注意:如果项目中有用到定时任务的中间件,可以使用中间件,设置每隔1-2秒去取出延时队列中的任务。 while (true) 轮询,万一发生错误没有继续执行,有可能会影响业务。
@Component
@Order(1)
@Slf4j
public class DelayQueueRunner implements ApplicationRunner {
@Resource
private RedissonClient redissonClient;
private final ExecutorService executorService = new ThreadPoolExecutor(
5, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy());
@Override
public void run(ApplicationArguments args) {
RBlockingDeque blockingDeque = redissonClient.getBlockingDeque("orderQueue");
while (true) {
try {
String orderId = blockingDeque.take();
//异步执行
executorService.execute(() -> {
log.info("DelayQueue get orderId:{}", orderId);
//其他的业务逻辑
});
} catch (Exception e) {
log.error("take error.", e);
}
}
}
}
不断在延时队列中拉取数据,由于队列中没有数据,所以该方法会先阻塞。
接着调用 addDelayQueue()方法,往队列中添加数据,观察日志,可以发现 5秒后,取到队列中的数据。
[2023-10-12 21:30:54.725] INFO c.c.m.c.controller.DelayQueueController [line: 54] addDelayQueue orderId:12345
[2023-10-12 21:30:59.821] INFO c.c.m.c.controller.DelayQueueRunner [line: 72] DelayQueue get orderId:12345
https://blog.csdn.net/sinat_32502451/article/details/133799192
https://blog.csdn.net/qq_27818157/article/details/107514319
https://blog.csdn.net/u012228523/article/details/132339547