• 通过RedisTemplate简单实现延时队列


    需求背景

    近期有个需求,是关于如何处理redis中set列表的数据,目的是实现延时队列,代码简单记录一下。

    代码实现

    1、业务层

    public interface RedisDelayQueueService {
    	/**
    	 * 添加拍卖数据至延时队列
    	 *
    	 * @param auctionId 拍卖id
    	 * @param score     时间戳
    	 * @return 是否添加成功
    	 */
    	boolean addAuctionData(Long auctionId, double score);
    
    	/**
    	 * 添加支付数据至延时队列
    	 *
    	 * @param auctionId 拍卖id
    	 * @param score     时间戳
    	 * @return 添加成功与否
    	 */
    	boolean addPaymentData(Long auctionId, double score);
    
    	/**
    	 * 获取zSet中指定key的score
    	 *
    	 * @param key   键
    	 * @param score 值
    	 * @return 时间戳
    	 */
    	Double opsForZSetScore(String key, Object score);
    
    	/**
    	 * 删除元素
    	 *
    	 * @param key   键
    	 * @param value 值
    	 */
    	void opsForZSetRemove(String key, Object value);
    
    	/**
    	 * @param key         键
    	 * @param startTime   开始时间
    	 * @param currentTime 当前时间
    	 * @return 集合
    	 */
    	Set<Object> opsForZSetRangeByScore(String key, double startTime, double currentTime);
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    2、业务实现层

    
    @Slf4j
    @RequiredArgsConstructor
    @Service
    public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
    
    	private final RedisTemplate redisTemplate;
    
    	@Override
    	public boolean addAuctionData(Long auctionId, double score) {
    		// 定义返回结果
    		boolean result;
    		try {
    			// 保存数据至缓存
    			redisTemplate.opsForZSet().add(CommonConstants.DELAY_TYPE_AUCTION, auctionId, score);
    			result = true;
    		} catch (Exception e) {
    			log.info("addAuctionData failed, auctionId: {}, score: {}, error message: {} ", auctionId, score, e.getMessage());
    			result = false;
    		}
    		return result;
    	}
    
    	@Override
    	public boolean addPaymentData(Long auctionId, double score) {
    		// 定义返回结果
    		boolean result;
    		try {
    			// 保存数据至缓存
    			redisTemplate.opsForZSet().add(CommonConstants.DELAY_TYPE_PAYMENT, auctionId, score);
    			result = true;
    		} catch (Exception e) {
    			log.info("addAuctionData failed, auctionId: {}, value: {}, score: {}, error message: {} ", auctionId, score, e.getMessage());
    			result = false;
    		}
    		return result;
    	}
    
    	@Override
    	public Double opsForZSetScore(String key, Object score) {
    		return redisTemplate.opsForZSet().score(key, score);
    	}
    
    	@Override
    	public void opsForZSetRemove(String key, Object value) {
    		redisTemplate.opsForZSet().remove(key, value);
    	}
    
    	@Override
    	public Set<Object> opsForZSetRangeByScore(String key, double startTime, double currentTime) {
    		return redisTemplate.opsForZSet().rangeByScore(key, startTime, currentTime);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    3、定时任务

    @Slf4j
    @Component
    @RequiredArgsConstructor
    @EnableScheduling
    public class RedisDelayQueueTask {
    
    	private final RedisDelayQueueService redisDelayQueueService;
    	SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    	@Scheduled(cron = "*/1 * * * * * ")
    	public void auctionConsume() {
    		log.info("------------RedisDelayQueueTask--auction--等待消费--------------");
    		// 取出auction集合中的score在0-当前时间戳这个范围的所有值
    		Set<Object> set = redisDelayQueueService.opsForZSetRangeByScore(CommonConstants.DELAY_TYPE_AUCTION, 0, System.currentTimeMillis());
    		Iterator<Object> iterator = set.iterator();
    		while (iterator.hasNext()) {
    			Integer value = (Integer) iterator.next();
    			// 遍历取出每一个score
    			Double score = redisDelayQueueService.opsForZSetScore(CommonConstants.DELAY_TYPE_AUCTION, value);
    			// 达到了时间就进行消费
    			if (System.currentTimeMillis() > score) {
    				log.info("消费了:" + value + "消费时间为:" + simpleDateFormat.format(System.currentTimeMillis()));
    				// do sth
    				redisDelayQueueService.opsForZSetRemove(CommonConstants.DELAY_TYPE_AUCTION, value);
    			}
    		}
    	}
    
    	@Scheduled(cron = "*/1 * * * * * ")
    	public void paymentConsume() {
    		log.info("------------RedisDelayQueueTask--payment--等待消费--------------");
    		// 取出auction集合中的score在0-当前时间戳这个范围的所有值
    		Set<Object> set = redisDelayQueueService.opsForZSetRangeByScore(CommonConstants.DELAY_TYPE_PAYMENT, 0, System.currentTimeMillis());
    		Iterator<Object> iterator = set.iterator();
    		while (iterator.hasNext()) {
    			Integer value = (Integer) iterator.next();
    			// 遍历取出每一个score
    			Double score = redisDelayQueueService.opsForZSetScore(CommonConstants.DELAY_TYPE_PAYMENT, value);
    			// 达到了时间就进行消费
    			if (System.currentTimeMillis() > score) {
    				log.info("消费了:" + value + "消费时间为:" + simpleDateFormat.format(System.currentTimeMillis()));
    				// do sth
    				redisDelayQueueService.opsForZSetRemove(CommonConstants.DELAY_TYPE_PAYMENT, value);
    			}
    		}
    	}
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
  • 相关阅读:
    nginx代理本地服务请求,避免跨域;前端图片压缩并上传
    电商项目之如何扣减库存
    人工智能、机器学习和深度学习
    Jetson系统烧录环境搭建
    Linux 文件夹和文件操作【Linux 常用命令系列一】
    【技术分享】配置手工模式链路聚合(交换机之间直连)
    关于Redis的事件回调解析
    c#——switch case语句
    Java基础之IO流操作
    医学图像处理中的数据读写
  • 原文地址:https://blog.csdn.net/uaucome/article/details/126177554