参考之前的文章优惠券秒杀功能,我们完成了基于单体或者集群项目的秒杀业务。
黑马点评项目Redis实现分布式锁_兜兜转转m的博客-CSDN博客
但分析时其吞吐量并不是很高,延迟也有点高。
我们来回顾一下下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤
【结构图】

其中扣减库存和创建订单两个业务是比较耗费时间的,我们之前是在主线程中进行操作的,因此系统的延迟会很高,可以通过消息队列的方式进行异步处理来提高系统的并发能力。
其次,查询优惠券,判断秒杀库存是否足够和校验一人一单的业务也是查询数据库来进行实现的,我们能否将这些业务在Redis中进行实现?
基于上述分析,
【结构图】

【业务逻辑图】

VoucherServiceImpl:
在最后增加【新增秒杀优惠券的同时,将优惠券信息保存到Redis中业务】。
- @Override
- @Transactional
- public void addSeckillVoucher(Voucher voucher) {
- // 保存优惠券
- save(voucher);
- // 保存秒杀信息
- SeckillVoucher seckillVoucher = new SeckillVoucher();
- seckillVoucher.setVoucherId(voucher.getId());
- seckillVoucher.setStock(voucher.getStock());
- seckillVoucher.setBeginTime(voucher.getBeginTime());
- seckillVoucher.setEndTime(voucher.getEndTime());
- seckillVoucherService.save(seckillVoucher);
- // 保存秒杀库存到Redis中
- //SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
- //private static final String SECKILL_STOCK_KEY ="seckill:stock:"
- stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
- }
- -- 1.参数列表
- -- 1.1.优惠券id
- local voucherId = ARGV[1]
- -- 1.2.用户id
- local userId = ARGV[2]
- -- 1.3.订单id
- local orderId = ARGV[3]
-
- -- 2.数据key
- -- 2.1.库存key
- local stockKey = 'seckill:stock:' .. voucherId
- -- 2.2.订单key
- local orderKey = 'seckill:order:' .. voucherId
-
- -- 3.脚本业务
- -- 3.1.判断库存是否充足 get stockKey
- if(tonumber(redis.call('get', stockKey)) <= 0) then
- -- 3.2.库存不足,返回1
- return 1
- end
- -- 3.2.判断用户是否下单 SISMEMBER orderKey userId
- if(redis.call('sismember', orderKey, userId) == 1) then
- -- 3.3.存在,说明是重复下单,返回2
- return 2
- end
- -- 3.4.扣库存 incrby stockKey -1
- redis.call('incrby', stockKey, -1)
- -- 3.5.下单(保存用户)sadd orderKey userId
- redis.call('sadd', orderKey, userId)
- -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
- redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
- return 0
当以上lua表达式执行完毕后,剩下的就是根据步骤3,4来执行我们接下来的任务了
VoucherOrderServiceImpl
- @Override
- public Result seckillVoucher(Long voucherId) {
- //获取用户
- Long userId = UserHolder.getUser().getId();
- long orderId = redisIdWorker.nextId("order");
- // 1.执行lua脚本
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString(), String.valueOf(orderId)
- );
- int r = result.intValue();
- // 2.判断结果是否为0
- if (r != 0) {
- // 2.1.不为0 ,代表没有购买资格
- return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
- }
- // 3.返回订单id
- return Result.ok(orderId);
- }
采用Redis中Stream来作为消息队列。
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组:
![]()
key:队列名称 groupName:消费者组名称 ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息 MKSTREAM:队列不存在时自动创建队列 其它常见命令:
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
消费者监听消息的基本思路:

STREAM类型消息队列的XREADGROUP命令特点:
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
需求:
修改lua表达式,新增3.6

Java实现如下:
VoucherOrderServiceImpl
- private class VoucherOrderHandler implements Runnable {
-
- @Override
- public void run() {
- while (true) {
- try {
- // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
- List
> list = stringRedisTemplate.opsForStream().read( - Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
- StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
- );
- // 2.判断订单信息是否为空
- if (list == null || list.isEmpty()) {
- // 如果为null,说明没有消息,继续下一次循环
- continue;
- }
- // 解析数据
- MapRecord
record = list.get(0); - Map
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
- // 3.创建订单
- createVoucherOrder(voucherOrder);
- // 4.确认消息 XACK
- stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
- } catch (Exception e) {
- log.error("处理订单异常", e);
- //处理异常消息
- handlePendingList();
- }
- }
- }
-
- private void handlePendingList() {
- while (true) {
- try {
- // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
- List
> list = stringRedisTemplate.opsForStream().read( - Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1),
- StreamOffset.create("stream.orders", ReadOffset.from("0"))
- );
- // 2.判断订单信息是否为空
- if (list == null || list.isEmpty()) {
- // 如果为null,说明没有异常消息,结束循环
- break;
- }
- // 解析数据
- MapRecord
record = list.get(0); - Map
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
- // 3.创建订单
- createVoucherOrder(voucherOrder);
- // 4.确认消息 XACK
- stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
- } catch (Exception e) {
- log.error("处理pendding订单异常", e);
- try{
- Thread.sleep(20);
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
- }
- }