近期有个需求,是关于如何处理redis中zset(有序集合)的数据,目的是实现延时队列,代码简单记录一下。
/**
* Operations used to run delay queues on top of Redis zSets: two typed "add"
* entry points (auction / payment) plus thin zSet accessors that take the key explicitly.
*/
public interface RedisDelayQueueService {
/**
* Adds auction data to the delay queue.
*
* @param auctionId auction id, stored as the queue entry
* @param score timestamp used as the entry's score
* @return whether the add succeeded
*/
boolean addAuctionData(Long auctionId, double score);
/**
* Adds payment data to the delay queue.
*
* @param auctionId auction id, stored as the queue entry
* @param score timestamp used as the entry's score
* @return whether the add succeeded
*/
boolean addPaymentData(Long auctionId, double score);
/**
* Gets the score stored in the zSet at {@code key} for one entry.
*
* @param key zSet key
* @param score the entry whose score is looked up. NOTE(review): despite its name this is
*              the zSet member, not a score value; the implementation in this file forwards
*              it as the member argument and the scheduled task passes queue entries here.
* @return the entry's score (a timestamp for the delay queues)
*/
Double opsForZSetScore(String key, Object score);
/**
* Removes an entry from the zSet at {@code key}.
*
* @param key zSet key
* @param value entry to remove
*/
void opsForZSetRemove(String key, Object value);
/**
* Returns the entries of the zSet at {@code key} whose score lies between
* {@code startTime} and {@code currentTime}.
*
* @param key zSet key
* @param startTime lower score bound (start time)
* @param currentTime upper score bound (current time)
* @return the matching entries
*/
Set<Object> opsForZSetRangeByScore(String key, double startTime, double currentTime);
}
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
private final RedisTemplate redisTemplate;
@Override
public boolean addAuctionData(Long auctionId, double score) {
// 定义返回结果
boolean result;
try {
// 保存数据至缓存
redisTemplate.opsForZSet().add(CommonConstants.DELAY_TYPE_AUCTION, auctionId, score);
result = true;
} catch (Exception e) {
log.info("addAuctionData failed, auctionId: {}, score: {}, error message: {} ", auctionId, score, e.getMessage());
result = false;
}
return result;
}
@Override
public boolean addPaymentData(Long auctionId, double score) {
// 定义返回结果
boolean result;
try {
// 保存数据至缓存
redisTemplate.opsForZSet().add(CommonConstants.DELAY_TYPE_PAYMENT, auctionId, score);
result = true;
} catch (Exception e) {
log.info("addAuctionData failed, auctionId: {}, value: {}, score: {}, error message: {} ", auctionId, score, e.getMessage());
result = false;
}
return result;
}
@Override
public Double opsForZSetScore(String key, Object score) {
return redisTemplate.opsForZSet().score(key, score);
}
@Override
public void opsForZSetRemove(String key, Object value) {
redisTemplate.opsForZSet().remove(key, value);
}
@Override
public Set<Object> opsForZSetRangeByScore(String key, double startTime, double currentTime) {
return redisTemplate.opsForZSet().rangeByScore(key, startTime, currentTime);
}
}
/**
 * Scheduled consumer for the auction and payment delay queues.
 *
 * <p>Each job polls its zSet for members whose score (a millisecond timestamp) is not later
 * than the current time, handles them, and removes them from the queue.
 *
 * <p>NOTE(review): reading and removing a member are separate calls, so if several instances
 * of this task run against the same Redis, one member may be handled more than once.
 */
@Slf4j
@Component
@RequiredArgsConstructor
@EnableScheduling
public class RedisDelayQueueTask {
    /**
     * Formatter for the "consumed at" log field. Immutable and safe to share between the
     * scheduled methods, unlike {@code SimpleDateFormat}.
     */
    private static final java.time.format.DateTimeFormatter CONSUME_TIME_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                    .withZone(java.time.ZoneId.systemDefault());

    private final RedisDelayQueueService redisDelayQueueService;

    /** Polls the auction delay queue and consumes every entry that is due. */
    @Scheduled(cron = "*/1 * * * * * ")
    public void auctionConsume() {
        log.info("------------RedisDelayQueueTask--auction--等待消费--------------");
        consumeDue(CommonConstants.DELAY_TYPE_AUCTION);
    }

    /** Polls the payment delay queue and consumes every entry that is due. */
    @Scheduled(cron = "*/1 * * * * * ")
    public void paymentConsume() {
        log.info("------------RedisDelayQueueTask--payment--等待消费--------------");
        consumeDue(CommonConstants.DELAY_TYPE_PAYMENT);
    }

    /**
     * Consumes all due members of one queue.
     *
     * @param queueKey zSet key of the queue to drain
     */
    private void consumeDue(String queueKey) {
        // Fetch every member of this queue whose score is within [0, now].
        Set<Object> dueMembers =
                redisDelayQueueService.opsForZSetRangeByScore(queueKey, 0, System.currentTimeMillis());
        if (dueMembers == null || dueMembers.isEmpty()) {
            return;
        }
        for (Object member : dueMembers) {
            // The member is kept exactly as the template returned it (no cast): depending on the
            // configured serializer it may come back as Integer or Long, and the same object is
            // handed back to Redis for the score lookup and the removal.
            Double score = redisDelayQueueService.opsForZSetScore(queueKey, member);
            if (score == null) {
                // No score available any more (e.g. the member was removed in the meantime).
                continue;
            }
            long now = System.currentTimeMillis();
            // Re-check against the current score so a member rescheduled after the range query
            // is not consumed early.
            if (now > score) {
                log.info("消费了:{}消费时间为:{}", member,
                        CONSUME_TIME_FORMAT.format(java.time.Instant.ofEpochMilli(now)));
                // do sth
                redisDelayQueueService.opsForZSetRemove(queueKey, member);
            }
        }
    }
}