• 【Redis笔记】基于Redis的Stream结构作为消息队列,实现异步任务


    使用redis命令创建消息队列

    redis-cli中执行如下指令

    XGROUP CREATE key groupName ID [MKSTREAM]
    
    • 1

    key:队列名称
    groupName:消费者组名称
    ID:起始ID标示,$代表队列中最后一个消息,0代表队列中第一个消息
    MKSTREAM:队列不存在时自动创建队列

    示例:

    XGROUP CREATE streams.orders g1 0 MKSTREAM
    
    • 1

    编写Lua脚本,向redis消息队列中发送消息

    -- lua脚本中其他事项处理部分
    
    -- 获取调用的参数列表
    -- 优惠卷id
    local voucherId = ARGV[1]
    -- 用户id
    local userId = ARGV[2]
    -- 订单id
    local orderId = ARGV[3]
    
    -- key
    -- 库存key
    local stockKey = 'seckill:stock:' .. voucherId
    -- 订单key
    local orderKey = 'seckill:order:' .. voucherId
    
    -- 业务
    -- 判断库存是否充足
    if(tonumber(redis.call('get', stockKey)) <= 0) then
        -- 库存不足
        return 1
    end
    -- 判断用户是否已经下单 SISMEMBER orderKey userId
    if(redis.call('sismember', orderKey, userId) == 1) then
        -- 存在说明重复下单
        return 2
    end
    -- 扣库存,下单
    redis.call('incrby', stockKey, -1)
    redis.call('sadd', orderKey, userId)
    -- 发消息到队列, XADD stream.orders * k1 v1 k2 v2 ...
    redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
    
    return 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    业务代码——执行Lua脚本

    	private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
        static {
            SECKILL_SCRIPT = new DefaultRedisScript<>();
            // 从resources目录下加载脚本
            SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
            // lua脚本执行返回值
            SECKILL_SCRIPT.setResultType(Long.class);
        }
        @Override
        public Result seckillVoucher(Long voucherId) {
            // 获取用户
            Long userId = UserHolder.getUser().getId();
            // 订单Id
            long orderId = redisIdWorker.nextId("order");
            // 执行lua脚本
            int result = stringRedisTemplate.execute(
                    SECKILL_SCRIPT,
                    Collections.emptyList(),
                    voucherId.toString(),
                    userId.toString(),
                    String.valueOf(orderId)
            ).intValue();
            // 判断结果为0
            if (result != 0) {
                // 不为0,没有购买资格
                return Result.fail(result == 1 ? "库存不足" : "不能重复下单");
            }
            // 获取代理对象(事务)
            proxy = (IVoucherOrderService) AopContext.currentProxy();
            // 返回订单信息
            return Result.ok(orderId);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    业务代码——从消息队列获取消息并处理

    	// 线程池
        private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
        // 注解含义,在Bean被创建完毕后执行
        @PostConstruct
        private void init() {
            // 
            SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
        }
        // 从消息队列中获取消息,异步下单
        private class VoucherOrderHandler implements Runnable {
            String queueName = "stream.orders";
            @Override
            public void run() {
                while (true) {
                    try {
                        // 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 count 1 block 2000 STREAMS stream.order >
                        List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                                Consumer.from("g1", "c1"),
                                StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                                StreamOffset.create(queueName, ReadOffset.lastConsumed())
                        );
                        // 判断消息是否获取成功
                        if (list == null || list.isEmpty()) {
                            // 如果获取失败,说明没有消息,继续下一次循环
                            continue;
                        }
                        // 解析消息中的订单消息
                        MapRecord<String, Object, Object> record = list.get(0);
                        Map<Object, Object> values = record.getValue();
                        VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                        // 如果获取成功,执行下单
                        handleVoucherOrder(voucherOrder);
                        // ACK确认,SACK stream.orders g1 id
                        stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                    } catch (Exception e) {
                        log.error("订单处理异常", e);
                        // 发生异常后去pending-list中处理消息
                        handlePendingList();
                    }
                }
            }
    
            private void handlePendingList() {
                while (true) {
                    try {
                        // 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 count 1 STREAMS stream.order 0
                        List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                                Consumer.from("g1", "c1"),
                                StreamReadOptions.empty().count(1),
                                StreamOffset.create(queueName, ReadOffset.from("0"))
                        );
                        // 判断消息是否获取成功
                        if (list == null || list.isEmpty()) {
                            // 如果获取失败,说明pending-list没有异常消息,结束循环
                            break;
                        }
                        // 解析消息中的订单消息
                        MapRecord<String, Object, Object> record = list.get(0);
                        Map<Object, Object> values = record.getValue();
                        VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                        // 如果获取成功,执行下单
                        handleVoucherOrder(voucherOrder);
                        // ACK确认,SACK stream.orders g1 id
                        stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                    } catch (Exception e) {
                        log.error("处理pending-list订单处理异常", e);
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
  • 相关阅读:
    怎样选择一套适合自己的跨境商城源码?
    全链路性能测试:Nginx 负载均衡的性能分析和调优
    [附源码]java毕业设计望湘人电子商城
    设计模式——状态模式
    20分钟快速入门SQL
    高架学习笔记之软件架构风格
    数据采集的基本方法?
    Leetcode—13.罗马数字转整数【简单】
    generate by chatgpt:应用上线前的checkList(部分是我自己的回答)
    手动实现SpringMVC底层机制
  • 原文地址:https://blog.csdn.net/weixin_44581175/article/details/136566344