redis延迟队列参考博客:https://blog.csdn.net/m0_37975854/article/details/119836978
JDK延迟队列参考博客:
https://www.jianshu.com/p/a1b3aa87f78b

zset参考博客:
https://blog.csdn.net/cm15835106905/article/details/126323705
https://blog.csdn.net/m0_37975854/article/details/119836978
Class<? extends RedisDelayedQueueListener> clazz:其中 ? extends RedisDelayedQueueListener 代表某个实现了 RedisDelayedQueueListener 接口的类型,即参数 clazz 是该接口某个实现类的 Class 对象

接口:
package com.yj.redis;
/**
 * Listener contract for delayed-queue events; implementations provide the handling
 * logic in {@link #invoke(Object)}.
 *
 * @param <T> type of the payload passed to the listener
 */
@FunctionalInterface
public interface RedisDelayedQueueListener<T> {

    /**
     * Handles one payload.
     *
     * @param t the payload to process
     */
    void invoke(T t);
}
实现类:
package com.yj.redis;
import com.yj.vo.customer.CustomerManagerVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * Delayed-queue listener for {@code CustomerManagerVO} payloads; handles an entry
 * once it has expired from the Redis delayed queue.
 */
@Component
@Slf4j
public class PayQCordListener implements RedisDelayedQueueListener<CustomerManagerVO> {

    /**
     * Logs the current time and the received payload, then logs the payload again
     * together with a fixed placeholder result.
     *
     * @param payStateReqVO the payload delivered to this listener
     */
    @Override
    public void invoke(CustomerManagerVO payStateReqVO) {
        Date firedAt = new Date();
        log.info("延迟时效时间:{}", firedAt);
        log.info("延迟失效,内容:{}", payStateReqVO);

        // Business handling belongs here; currently only a fixed test result is reported.
        String outcome = "测试处理";
        log.info("延迟失效,内容:{},处理结果:{}", payStateReqVO, outcome);
    }
}
创建redis中的zset和score(相当于创建延迟队列):
package com.yj.redis;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Producer side of the delayed queue: offers payloads to a Redisson delayed queue
 * whose name is the fully qualified class name of the target listener.
 */
@Component
@Slf4j
public class RedisDelayedQueue {

    @Resource(name = "getRedisson")
    private RedissonClient getRedisson;

    /**
     * Enqueues a payload with a delay expressed in seconds.
     *
     * <p>The {@code <T>} written before the return type declares a method-level type
     * parameter, so the payload may be of any type.
     *
     * @param t     payload to enqueue
     * @param delay amount of delay, in seconds
     * @param clazz listener class; its {@code getName()} is used as the queue name
     * @param <T>   payload type
     */
    public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        String queueName = clazz.getName();
        addQueue(t, delay, TimeUnit.SECONDS, queueName);
    }

    /**
     * Enqueues a payload with a delay expressed in minutes.
     *
     * @param t     payload to enqueue
     * @param delay amount of delay, in minutes
     * @param clazz listener class; its {@code getName()} is used as the queue name
     * @param <T>   payload type
     */
    public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        String queueName = clazz.getName();
        addQueue(t, delay, TimeUnit.MINUTES, queueName);
    }

    /**
     * Enqueues a payload with a delay expressed in hours.
     *
     * @param t     payload to enqueue
     * @param delay amount of delay, in hours
     * @param clazz listener class; its {@code getName()} is used as the queue name
     * @param <T>   payload type
     */
    public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        String queueName = clazz.getName();
        addQueue(t, delay, TimeUnit.HOURS, queueName);
    }

    /**
     * Enqueues a payload with a delay expressed in days.
     *
     * @param t     payload to enqueue
     * @param delay amount of delay, in days
     * @param clazz listener class; its {@code getName()} is used as the queue name
     * @param <T>   payload type
     */
    public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        String queueName = clazz.getName();
        addQueue(t, delay, TimeUnit.DAYS, queueName);
    }

    /**
     * Shared implementation: logs the request, resolves the blocking queue for
     * {@code queueName}, and offers the payload to the delayed queue built on top of it.
     *
     * @param t         payload to enqueue
     * @param delay     amount of delay
     * @param timeUnit  unit of {@code delay}
     * @param queueName name of the destination blocking queue
     * @param <T>       payload type
     */
    private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
        log.info("开始延迟队列时间:{}", new Date());
        log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}", queueName, delay, timeUnit, t);

        RBlockingQueue<T> destination = getRedisson.getBlockingQueue(queueName);
        RDelayedQueue<T> delayed = getRedisson.getDelayedQueue(destination);
        delayed.offer(t, delay, timeUnit);
    }
}
监听redis中数据是否过期,如果过期则进行消费
package com.yj.redis;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
/**
* 初始化队列监听
*/
@Component
@Slf4j
public class RedisDelayedQueueInit implements ApplicationContextAware {
@Resource(name = "getRedisson")
private RedissonClient getRedisson;
/**
* 获取应用上下文并获取相应的接口实现类
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
String listenerName = taskEventListenerEntry.getValue().getClass().getName();
startThread(listenerName, taskEventListenerEntry.getValue());
}
}
/**
* 启动线程获取队列*
*
* @param queueName queueName
* @param redisDelayedQueueListener 任务回调监听
* @param 泛型
* @return
*/
private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
RBlockingQueue<T> blockingFairQueue = getRedisson.getBlockingQueue(queueName);
//服务重启后,无offer,take不到信息。
getRedisson.getDelayedQueue(blockingFairQueue);
//由于此线程需要常驻,可以新建线程,不用交给线程池管理
Thread thread = new Thread(() -> {
log.info("启动监听队列线程" + queueName);
while (true) {
try {
T t = blockingFairQueue.take();
log.info("监听队列线程,监听名称:{},内容:{}", queueName, t);
redisDelayedQueueListener.invoke(t);
} catch (Exception e) {
log.info("监听队列线程错误,", e);
}
}
});
thread.setName(queueName);
thread.start();
}
}