使用场景
1、下单成功,30分钟未支付。支付超时,自动取消订单
2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评
3、下单成功,商家5分钟未接单,订单取消
4、配送超时,推送短信提醒
......
对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度的方式定时轮询处理。如:xxl-job
今天我们采用一种比较简单、轻量级的方式,使用 Redis 的延迟队列来进行处理。当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案。如:使用消息中间件Kafka、RabbitMQ 的延迟队列
原理:开启一个守护线程对消息进行轮询消费,利用zset的有序性,ZSet 有一个 Score 属性可以用来存储延迟执行的时间,将到达过期时间的消息去除,不能的实现类处理不同的业务。
- import com.yx.constant.IdWorker;
-
- import java.io.Serializable;
-
- /**
- * 延迟消息实体类
- */
- public class DelayMessage implements Serializable {
- /**
- * 队列id 用原子类生产
- */
- private String seqId;
- /**
- * 消息主体
- */
- private Object body;
- /**
- * 消息消费的时间戳
- */
- private long expire;
-
- public DelayMessage(){}
-
- public DelayMessage(Object body, long expire) {
- this.seqId = IdWorker.getNextIdStr();
- this.body = body;
- this.expire = expire;
- }
-
- public DelayMessage(Object body, int delay) {
- this(body, System.currentTimeMillis() + delay);
- }
-
- public String getSeqId() {
- return seqId;
- }
-
- public Object getBody() {
- return body;
- }
-
- public long getExpire() {
- return expire;
- }
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.core.ZSetOperations;
- import org.springframework.stereotype.Component;
-
- import java.lang.reflect.Array;
- import java.util.Set;
-
- /**
- * 延迟队列类
- */
- @Component
- public class DelayQueue {
- @Autowired
- private RedisTemplate redisTemplate;
-
- /**
- * 加入一个消息
- * @param queueName 队列名称
- * @param message 延迟消息类
- */
- public void add(String queueName, DelayMessage message) {
- redisTemplate.opsForZSet().add(queueName, message, message.getExpire());
- synchronized (this) {
- //加入一个消息后唤醒所有睡眠的线程
- this.notifyAll();
- }
- }
-
- /**
- * 取出一个消息
- * @param queueName 队列名称
- * @return
- * @throws InterruptedException
- */
- public DelayMessage take(String queueName) throws InterruptedException {
- while (true) {
- ZSetOperations.TypedTuple<DelayMessage> tuple = getFirst(queueName);
-
- if (tuple == null) {
- synchronized (this) {
- //如果该队列为空则休眠该线程
- this.wait();
- tuple = getFirst(queueName);
- }
- }
- if (tuple != null){
- //消息到期时间戳
- long expire = tuple.getScore().longValue();
- //延迟时间
- long delay = expire - System.currentTimeMillis();
- //未到达消费时间
- if (delay > 0) {
- synchronized (this) {
- //让线程休眠delay毫秒
- this.wait(delay);
- }
- }
- /*
- 这里判断消息消费时间是否大于当前时间的原因:
- 当该线程执行上一步 this.wait(delay);
- 正好有消息消费在休眠准备消费时;
- 有消息加入执行了add 加入消息,此时唤醒了休眠的线程
- 所有这里再做一次判断,防止未到时间消息就消费了
- */
- if (System.currentTimeMillis() >= expire) {
- //取出消息
- DelayMessage delayMessage = tuple.getValue();
- //把消息 id取出 建立一个key
- String lockKey = queueName + "_" + delayMessage.getSeqId() + "_lock";
- //setIfAbsent的作用是:
- //如果该key有值1,则返回false
- //如果该key没有值为1,则返回ture而且再为key加上值为1
- Boolean ok = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
- if (ok) {
- //移除redis之前存的消息
- redisTemplate.opsForZSet().remove(queueName, delayMessage);
- //删除刚刚为了保证原子性的key
- redisTemplate.delete(lockKey);
- //返回消息主体
- return delayMessage;
- }
- }
- }
- }
- }
-
- /**
- * 得到该队列的第一条数据
- * @param queueName 队列名称
- * @return
- */
- private ZSetOperations.TypedTuple getFirst(String queueName) {
- //获取队列名称为 queueName的所有的消息
- Set<ZSetOperations.TypedTuple<DelayMessage>> tuples = redisTemplate.opsForZSet().rangeWithScores(queueName, 0, -1);
- if (tuples.size() == 0) {
- return null;
- }
- ZSetOperations.TypedTuple[] array = (ZSetOperations.TypedTuple[]) Array.newInstance(ZSetOperations.TypedTuple.class, tuples.size());
- return tuples.toArray(array)[0];
- }
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.ApplicationArguments;
- import org.springframework.boot.ApplicationRunner;
-
- /**
- * 延迟消息消费类 开启一个线程进行消息的消费
- */
- public abstract class DelayQueueConsumer implements ApplicationRunner {
- @Autowired
- private DelayQueue queue;
-
- public abstract String getQueueName();
-
- public abstract void consume(DelayMessage message);
-
- public abstract void catchInterruptedException(InterruptedException e);
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- //获取队列名称
- String queueName = getQueueName();
- if (queueName == null) {
- throw new NullPointerException("the return value of getQueueName method cannot be null");
- }
- Thread thread = new Thread(()->{
- while (true) {
- try {
- //获取消息
- DelayMessage message = queue.take(queueName);
- //消费消息
- this.consume(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- thread.setDaemon(true);
- thread.start();
- }
- @Autowired
- private DelayQueue queue;
-
- @GetMapping("add")
- public String addQueue(@RequestParam Map<String, Object> params){
- long millis = System.currentTimeMillis() + 30000;
- DelayMessage delayMessage = new DelayMessage(params, millis);
- queue.add("remind",delayMessage);
- SimpleDateFormat format = new SimpleDateFormat("hh:mm:ss");
- String time = format.format(new Date(millis));
- return time;
- }
- import java.text.SimpleDateFormat;
- import java.util.Map;
-
- /**
- * @ClassName MessageTask
- * @Author yangxi
- * @Date 2019/9/8 10:42
- * @Description desc
- **/
- @Component
- public class MessageTask extends DelayQueueConsumer {
- @Override
- public String getQueueName() {
- return "remind";
- }
-
- @Override
- public void consume(DelayMessage message) {
- Map body = (Map) message.getBody();
- String name = body.get("name").toString();
- String age = body.get("age").toString();
- SimpleDateFormat format = new SimpleDateFormat("hh:mm:ss");
- String newTime = format.format(System.currentTimeMillis());
- System.out.println("当前时间"+ newTime + age + "||| -.- |||" + name);
- }
-
- @Override
- public void catchInterruptedException(InterruptedException e) {
-
- }
- import cn.hutool.core.thread.ThreadUtil;
- import cn.hutool.extra.spring.SpringUtil;
- import com.yx.handler.DelayMessageHandler;
- import com.yx.message.DelayMessage;
- import com.yx.message.DelayMessageType;
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.Redisson;
- import org.redisson.api.RBlockingQueue;
- import org.redisson.api.RDelayedQueue;
- import org.redisson.api.RedissonClient;
- import org.springframework.stereotype.Component;
-
- import java.util.Arrays;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
-
- /**
- * @description 基于redisson实现的延时消息管理器
- */
- @Component
- @Slf4j
- public class RedissonDelayMessageManager implements DelayMessageManager {
-
- private RedissonClient redissonClient = Redisson.create();
-
- private RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque("redisson-delay-message-queue");
-
- private RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
-
- private final Map<DelayMessageType, DelayMessageHandler> handlerMap = new ConcurrentHashMap<>(16);
-
- @Override
- public void add(DelayMessage message) {
- if (rDelayedQueue.contains(message)) {
- return;
- }
- log.info("redisson-delay-message-queue,add message = {}", message);
- rDelayedQueue.offer(message, message.getProperties().getExpire(), message.getProperties().getTimeUnit());
- }
-
- @Override
- public boolean remove(DelayMessage message) {
- return rDelayedQueue.remove(message);
- }
-
- @Override
- public void destroy() {
- rDelayedQueue.destroy();
- }
-
- @Override
- public void afterPropertiesSet() {
- Arrays.stream(DelayMessageType.values()).forEach(delayMessageType -> handlerMap.put(delayMessageType, SpringUtil.getBean(delayMessageType.getHandler())));
- Thread thread = new Thread(() -> {
- while (true) {
- try {
- DelayMessage delayMessage = rBlockingQueue.take();
- log.info("redisson-delay-message-queue,consume message = {}", delayMessage);
- handlerMap.get(delayMessage.getType()).handle(delayMessage);
- } catch (InterruptedException e) {
- e.printStackTrace();
- ThreadUtil.safeSleep(1000);
- afterPropertiesSet();
- }
- }
- });
- thread.setDaemon(true);
- thread.start();
- }
- /**
- * @description 延时消息处理器接口
- */
- public interface DelayMessageHandler {
-
- /**
- * 处理消息
- *
- * @param message 消息
- */
- void handle(DelayMessage message);
- }
- @Resource(type = RedissonDelayMessageManager.class)
- private RedissonDelayMessageManager delayMessageManager;
-
- @GetMapping("add")
- private String add() {
- DelayMessage delayMessage1 = new DelayMessage();
- QrCode qrCode1 = new QrCode();
- qrCode1.setConfigId("123");
- qrCode1.setUrl("http://www.baidu.com");
- delayMessage1.setBody(JSONUtil.toJsonStr(qrCode1));
- delayMessage1.setType(DelayMessageType.DELETE_QR_CODE);
-
- DelayMessage.DelayMessageProperties properties1 = new DelayMessage.DelayMessageProperties();
- properties1.setExpire(1239);
- properties1.setTimeUnit(TimeUnit.MILLISECONDS);
- delayMessage1.setProperties(properties1);
-
- delayMessageManager.add(delayMessage1);
- return "success";
- }
- /**
- * @description 任务消息处理器
- */
- @Component
- @Slf4j
- public class ExecuteTaskDelayMessageHandler implements DelayMessageHandler {
-
- @Override
- public void handle(DelayMessage message) {
- log.info("任务延时消息处理中,message={}", message);
- System.out.println(DateUtil.now() + "" + JSONUtil.toJsonStr(message));
- }
- }