• Redis -- 秒杀


    秒杀

    优化前

     优化后

     将库存量和一人一单的判断放在Redis中进行判断,如果满足下单条件异步完成下单操作,减少整体业务时间,和数据库交互次数。

     判断库存是否充足,用户是否已经下过单,扣除库存,必须需要具有原子性,则使用Lua脚本完成

     

    1.新增秒杀优惠券同时,保存在Redis中

    1. private static final String SECKILL_STOCK_KEY = "seckill:stock:";
    2. @Autowired
    3. private StringRedisTemplate stringRedisTemplate;
    4. public void addSeckillVoucher(Voucher voucher) {
    5. //新增秒杀卷 -- 保存数据库
    6. //saveSeckillVoucher(voucher);
    7. //将秒杀卷库存数量保存到Redis中
    8. stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    9. }

    2.利用Lua脚本,判断库存,一人一单,是否有抢购资格

    1. --1删除列表
    2. --1.1优惠券ID
    3. local voucherId = ARGV[1]
    4. --1.2 用户ID
    5. local userId = ARGV[2]
    6. --2.数据key
    7. --2.1库存key
    8. local stockKey = 'seckill:stock:' .. voucherId
    9. --2.2订单key
    10. local orderKey = 'seckill:order:' .. voucherId
    11. --3.脚本业务
    12. --3.1判断库存是否充足
    13. --字符串不能跟数字进行比对,需要使用tonumber转为数字
    14. if (tonumber(redis.call('get', stockKey)) <= 0) then
    15. --3.2库存不足返回1
    16. return 1
    17. end
    18. --3.3判断用户是否下单
    19. if (redis.call('sismember', orderKey, userId) == 1) then
    20. --3.4存在,说明用户重复下单,返回2
    21. return 2
    22. end
    23. --3.5扣库存
    24. redis.call('incrby', stockKey, -1)
    25. --3.6下单保存用户
    26. redis.call('sadd', orderKey, userId)
    27. return 0

    3.抢购成功加入阻塞队列

    1. private static final DefaultRedisScript SECKILL_SCRIPT;
    2. static {
    3. SECKILL_SCRIPT = new DefaultRedisScript<>();
    4. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    5. SECKILL_SCRIPT.setResultType(Long.class);
    6. }
    7. @Autowired
    8. private RedisIDGenerator redisIDGenerator;
    9. public void seckillVoucher(Long voucherId, Long userId) {
    10. //1.执行lua脚本
    11. Long result = stringRedisTemplate.execute(
    12. SECKILL_SCRIPT,
    13. Collections.emptyList(),
    14. voucherId.toString(), userId.toString());
    15. //2.判断是否为0
    16. int r = result.intValue();
    17. if (r != 0) {
    18. //操作失败 1库存不足 2重复下单
    19. log.info("下单失败, {}", (result == 1 ? "库存不足" : "重复下单"));
    20. return;
    21. }
    22. //3 为0,将下单信息保存到阻塞队列中
    23. long orderId = redisIDGenerator.nextId("order");
    24. VourcherOrder vourcherOrder = new VourcherOrder();
    25. vourcherOrder.setOrderId(orderId);
    26. vourcherOrder.setUserId(userId);
    27. vourcherOrder.setVoucherId(voucherId);
    28. blockingQueue.add(vourcherOrder);
    29. log.info("下单成功");
    30. }

    4.开启线程从阻塞队列中获取task,完成下单

    1. private BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1024 * 1024);
    2. private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    3. private VoucherService proxy;
    4. //类启动时就执行handler处理阻塞队列内容
    5. @PostConstruct
    6. private void init() {
    7. SECKILL_ORDER_EXECUTOR.execute(new VoucherOrderHandler());
    8. proxy = (VoucherService) AopContext.currentProxy();
    9. }
    10. private class VoucherOrderHandler implements Runnable {
    11. @Override
    12. public void run() {
    13. while (true) {
    14. try {
    15. //获取队列中的订单信息
    16. VourcherOrder vourcherOrder = blockingQueue.take();
    17. //创建订单
    18. handlerVoucherOrder(vourcherOrder);
    19. } catch (Exception e) {
    20. log.error("处理订单异常{}", e);
    21. }
    22. }
    23. }
    24. }
    25. private void handlerVoucherOrder(VourcherOrder vourcherOrder) {
    26. //1获取用户
    27. Long userId = vourcherOrder.getUserId();
    28. //2获取锁对象
    29. RLock lock = redissonClient.getLock("lock:order:" + userId);
    30. //3判断锁是否获取成功
    31. boolean isLock = lock.isLocked();
    32. if (!isLock) {
    33. log.error("handlerVoucherOrder -- 不能重复下单");
    34. return;
    35. }
    36. //4获取成功,则保存数据库,因为调用的方法存在事务则需要代理对象
    37. // 但是调用方式一个子线程,无法获取到代理对象,则需要在主线程中提前获得proxy
    38. try {
    39. proxy.createOrder(vourcherOrder);
    40. } finally {
    41. lock.unlock();
    42. }
    43. }
    44. @Transactional
    45. public String createOrder(VourcherOrder vourcherOrder) {
    46. //查询数据库order中userId是否存在记录,存在则不让购买
    47. Long userId = vourcherOrder.getUserId();
    48. int count = queryOrderCount(userId);
    49. if (count > 0) {
    50. //已买过购买失败
    51. return "fail";
    52. }
    53. boolean success = order_count(userId);
    54. if (!success) {
    55. return "库存不足";
    56. }
    57. saveOrder(userId);
    58. return "成功";
    59. // }
    60. }
    61. private void saveOrder(Long userId) {
    62. //模拟提交订单插入数据库
    63. //insert into order values(userId.....);
    64. }
    65. private boolean order_count(Long userId) {
    66. //修改库存
    67. //update order_count set count = count -1 where userId =? and count >0;
    68. return true;
    69. }
    70. private int queryOrderCount(Long userId) {
    71. //模拟查询数据库
    72. //select count(*) from order where userId = ?
    73. return 0;
    74. }

    存在的问题: 但是自实现的阻塞队列,存在安全问题,例如:任务过多内存溢出,消费后失败不能重试,宕机导致数据丢失等....

    解决方案:使用第三方的MQ服务,来存储任务,第三方的MQ服务独立在JVM外,则可以解决内存问题,并且服务本身有很好的安全属性,来解决宕机等问题。第三方的MQ服务--RabbitMQ,ActiveMQ,Kafka等

    这里使用Redis所带功能实现消息队列:

     Redis实现的消息队列

    基于List结构

     

     基于PubSub

     

     基于Stream

     

     

     

     解决消息丢失:

     

     

     

     

    代码实现:

     

     

    1.创建Stream类型的消息队列

     

     2,lua脚本

    1. --1删除列表
    2. --1.1优惠券ID
    3. local voucherId = ARGV[1]
    4. --1.2 用户ID
    5. local userId = ARGV[2]
    6. --1.3 订单ID
    7. local orderId = ARGV[2]
    8. --2.数据key
    9. --2.1库存key
    10. local stockKey = 'seckill:stock:' .. voucherId
    11. --2.2订单key
    12. local orderKey = 'seckill:order:' .. voucherId
    13. --3.脚本业务
    14. --3.1判断库存是否充足
    15. --字符串不能跟数字进行比对,需要使用tonumber转为数字
    16. if (tonumber(redis.call('get', stockKey)) <= 0) then
    17. --3.2库存不足返回1
    18. return 1
    19. end
    20. --3.3判断用户是否下单
    21. if (redis.call('sismember', orderKey, userId) == 1) then
    22. --3.4存在,说明用户重复下单,返回2
    23. return 2
    24. end
    25. --3.5扣库存
    26. redis.call('incrby', stockKey, -1)
    27. --3.6下单保存用户
    28. redis.call('sadd', orderKey, userId)
    29. --3.7 发送消息到队列中 XADD stream.order * k1 v1 k2 v2
    30. redis.call('xadd', 'stream.order', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
    31. return 0

     3.启动线程完成下单

    1. private class VoucherOrderHandler implements Runnable {
    2. String queueName = "stream.order";
    3. @Override
    4. public void run() {
    5. while (true) {
    6. try {
    7. //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
    8. List> list = stringRedisTemplate.opsForStream().read(
    9. Consumer.from("g1", "c1"),
    10. StreamReadOptions.empty().count(1L).block(Duration.ofSeconds(2)),
    11. StreamOffset.create(queueName, ReadOffset.lastConsumed()));
    12. //2.判断消息获取是否成功
    13. if (CollectionUtil.isEmpty(list)) {
    14. //2.1如果获取失败,说明没有消息,继续下一次循环
    15. continue;
    16. }
    17. //3.解析消息订单信息
    18. MapRecord record = list.get(0);
    19. Map value = record.getValue();
    20. VourcherOrder vourcherOrder = BeanUtil.fillBeanWithMap(value, new VourcherOrder(), true);
    21. //4.如果获取成功可以下单
    22. handlerVoucherOrder(vourcherOrder);
    23. //5.ACK确认
    24. stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
    25. } catch (Exception e) {
    26. log.error("处理订单异常{}", e);
    27. handlePendingList();
    28. }
    29. }
    30. }
    31. private void handlePendingList() {
    32. while (true) {
    33. try {
    34. //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0
    35. List> list = stringRedisTemplate.opsForStream().read(
    36. Consumer.from("g1", "c1"),
    37. StreamReadOptions.empty().count(1L),
    38. StreamOffset.create(queueName, ReadOffset.from("0")));
    39. //2.判断消息获取是否成功
    40. if (CollectionUtil.isEmpty(list)) {
    41. //2.1如果获取失败,说明没有消息,跳出循环
    42. break;
    43. }
    44. //3.解析消息订单信息
    45. MapRecord record = list.get(0);
    46. Map value = record.getValue();
    47. VourcherOrder vourcherOrder = BeanUtil.fillBeanWithMap(value, new VourcherOrder(), true);
    48. //4.如果获取成功可以下单
    49. handlerVoucherOrder(vourcherOrder);
    50. //5.ACK确认
    51. stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
    52. } catch (Exception e) {
    53. log.error("处理订单异常{}", e);
    54. }
    55. }
    56. }
    57. }
    58. private void handlerVoucherOrder(VourcherOrder vourcherOrder) {
    59. //1获取用户
    60. Long userId = vourcherOrder.getUserId();
    61. //2获取锁对象
    62. RLock lock = redissonClient.getLock("lock:order:" + userId);
    63. //3判断锁是否获取成功
    64. boolean isLock = lock.isLocked();
    65. if (!isLock) {
    66. log.error("handlerVoucherOrder -- 不能重复下单");
    67. return;
    68. }
    69. //4获取成功,则保存数据库,因为调用的方法存在事务则需要代理对象
    70. // 但是调用方式一个子线程,无法获取到代理对象,则需要在主线程中提前获得proxy
    71. try {
    72. proxy.createOrder(vourcherOrder);
    73. } finally {
    74. lock.unlock();
    75. }
    76. }

  • 相关阅读:
    前端工程化精讲第十课 流程分解:Webpack 的完整构建流程
    Android平台 Target API level 升级到 31,在Android 12上启动黑屏卡死
    list转map(根据某个或多个属性分组)
    重温以太坊的升级之路
    ubuntu20.4 更新中科大软件源
    综述 | 关于点云配准的全面综述(二)
    栈和队列(顺序表、单链表形式)
    嵌入式开发:估算电池寿命的7个技巧
    Nginx(openresty) 开启目录浏览 以及进行美化配置
    链式前向星核心代码解析 ← 数组模拟邻接表
  • 原文地址:https://blog.csdn.net/qq_33753147/article/details/126444366