• (三)库存超卖案例实战——使用redis分布式锁解决“超卖”问题


    前言

    在上一节内容中我们介绍了如何使用mysql数据库的传统锁(行锁、乐观锁、悲观锁)来解决并发访问导致的“超卖问题”。虽然mysql的传统锁能够很好的解决并发访问的问题,但是从性能上来讲,mysql的表现似乎并不那么优秀,而且会受制于单点故障。本节内容我们介绍一种性能更加优良的解决方案,使用内存数据库redis实现分布式锁从而控制并发访问导致的“超卖”问题。关于redis环境的搭建这里不做介绍,可查看作者往期博客内容。

    正文

    • 在项目中添加redis的依赖和配置信息

    - pom依赖配置

    1. <!-- 数据库连接池工具包-->
    2. <dependency>
    3. <groupId>org.apache.commons</groupId>
    4. <artifactId>commons-pool2</artifactId>
    5. </dependency>
    6. <!--redis启动器-->
    7. <dependency>
    8. <groupId>org.springframework.boot</groupId>
    9. <artifactId>spring-boot-starter-data-redis</artifactId>
    10. </dependency>

    - application.yml配置

    1. spring:
    2. application:
    3. name: ht-atp-plat
    4. datasource:
    5. driver-class-name: com.mysql.cj.jdbc.Driver
    6. url: jdbc:mysql://192.168.110.88:3306/ht-atp?characterEncoding=utf-8&serverTimezone=GMT%2B8&useAffectedRows=true&nullCatalogMeansCurrent=true
    7. username: root
    8. password: root
    9. profiles:
    10. active: dev
    11. # redis配置
    12. redis:
    13. host: 192.168.110.88
    14. lettuce:
    15. pool:
    16. # 连接池最大连接数(使用负值表示没有限制) 默认为8
    17. max-active: 8
    18. # 连接池中的最小空闲连接 默认为 0
    19. min-idle: 1
    20. # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1
    21. max-wait: 1000
    22. # 连接池中的最大空闲连接 默认为8
    23. max-idle: 8

    - redis序列化配置

    1. package com.ht.atp.plat.config;
    2. import com.fasterxml.jackson.annotation.JsonAutoDetect;
    3. import com.fasterxml.jackson.annotation.JsonTypeInfo;
    4. import com.fasterxml.jackson.annotation.PropertyAccessor;
    5. import com.fasterxml.jackson.databind.ObjectMapper;
    6. import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. import org.springframework.data.redis.connection.RedisConnectionFactory;
    10. import org.springframework.data.redis.core.RedisTemplate;
    11. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    12. import org.springframework.data.redis.serializer.StringRedisSerializer;
    13. @Configuration
    14. public class RedisConfig {
    15. /**
    16. * @param factory
    17. * @return
    18. */
    19. @Bean
    20. public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    21. // 缓存序列化配置,避免存储乱码
    22. RedisTemplate<String, Object> template = new RedisTemplate<>();
    23. template.setConnectionFactory(factory);
    24. Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
    25. ObjectMapper objectMapper = new ObjectMapper();
    26. objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    27. objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
    28. ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
    29. jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    30. StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
    31. // key采用String的序列化方式
    32. template.setKeySerializer(stringRedisSerializer);
    33. // hash的key也采用String的序列化方式
    34. template.setHashKeySerializer(stringRedisSerializer);
    35. // value序列化方式采用jackson
    36. template.setValueSerializer(jackson2JsonRedisSerializer);
    37. // hash的value序列化方式采用jackson
    38. template.setHashValueSerializer(jackson2JsonRedisSerializer);
    39. template.afterPropertiesSet();
    40. return template;
    41. }
    42. }

    •  在redis中增加商品P0001的库存数量为10000

    • 使用redis不加锁的业务测试

    - 业务测试代码

    1. /**
    2. * 使用redis不加锁
    3. */
    4. @Override
    5. public void checkAndReduceStock() {
    6. // 1. 查询库存数量
    7. String stockQuantity = redisTemplate.opsForValue().get("P0001").toString();
    8. // 2. 判断库存是否充足
    9. if (stockQuantity != null && stockQuantity.length() != 0) {
    10. Integer quantity = Integer.valueOf(stockQuantity);
    11. if (quantity > 0) {
    12. // 3.扣减库存
    13. redisTemplate.opsForValue().set("P0001", String.valueOf(--quantity));
    14. }
    15. }
    16. }

    - 使用jmeter压测,查看测试结果:库存并没有减少为0,说明存在“超卖”问题

    • 使用redis的setnx指令加锁,开启三个相同服务,使用jmeter压测

    - redis加锁测试代码

    1. /**
    2. * 使用redis加锁
    3. *
    4. */
    5. @Override
    6. public void checkAndReduceStock() {
    7. // 1.使用setnx加锁
    8. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000");
    9. // 2.重试:递归调用,如果获取不到锁
    10. if (!lock) {
    11. try {
    12. //暂停50ms
    13. Thread.sleep(50);
    14. this.checkAndReduceStock();
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. } else {
    19. try {
    20. // 3. 查询库存数量
    21. String stockQuantity = (String) redisTemplate.opsForValue().get("P0001");
    22. // 4. 判断库存是否充足
    23. if (stockQuantity != null && stockQuantity.length() != 0) {
    24. Integer quantity = Integer.valueOf(stockQuantity);
    25. if (quantity > 0) {
    26. // 5.扣减库存
    27. redisTemplate.opsForValue().set("P0001", String.valueOf(--quantity));
    28. }
    29. } else {
    30. System.out.println("该库存不存在!");
    31. }
    32. } finally {
    33. // 5.解锁
    34. redisTemplate.delete("lock-stock");
    35. }
    36. }
    37. }

    - 开启服务7000、7001、7002

     - jmeter压测结果:平均访问时间364ms,接口吞吐量为每秒249

    - redis数据库库存结果为:0,并发“超卖”问题解决

    • 以上普通加锁方式存在死锁问题及死锁问题的解决方案

    - 死锁产生的原因:在上述redis加锁的正常情况下,是可以解决并发访问的问题,但是也存在死锁的问题,例如7000的服务获取到锁之后,由于服务异常导致锁没有释放,那么7001和7002服务将永远不可能获取到锁。

    - 解决方案:给锁设置过期时间,自动释放锁

    ①使用expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

    ②使用setex指令设置过期时间:set key value ex 3 nx(保证原子性操作既达到setnx的效果,又设置了过期时间)

    - 代码实现

    1. public void checkAndReduceStock() {
    2. // 1.使用setex加锁,保证加锁的原子性,以及锁可以自动释放
    3. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",3, TimeUnit.SECONDS);
    4. // 2.重试:递归调用,如果获取不到锁
    5. if (!lock) {
    6. try {
    7. //暂停50ms
    8. Thread.sleep(50);
    9. this.checkAndReduceStock();
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. } else {
    14. try {
    15. // 3. 查询库存数量
    16. String stockQuantity = (String) redisTemplate.opsForValue().get("P0001");
    17. // 4. 判断库存是否充足
    18. if (stockQuantity != null && stockQuantity.length() != 0) {
    19. Integer quantity = Integer.valueOf(stockQuantity);
    20. if (quantity > 0) {
    21. // 5.扣减库存
    22. redisTemplate.opsForValue().set("P0001", String.valueOf(--quantity));
    23. }
    24. } else {
    25. System.out.println("该库存不存在!");
    26. }
    27. } finally {
    28. // 5.解锁
    29. redisTemplate.delete("lock-stock");
    30. }
    31. }
    32. }

    - 测试结果:库存扣减为0,锁也释放

    •  防止误删,在以上普通加锁的方式下,存在锁被误删除的情况

    - 锁误删除的原因:在上面的加锁场景中,会出现以下的情况,A请求方法获取到锁之后,在业务还没有执行完成,锁就被自动释放,这个时候B请求方法也会获取到锁,在B业务还未执行完成之前,A执行完成并执行手动删除锁操作,这个时候会把B业务的锁释放掉,导致B刚刚获取到锁就被释放,从而产生后续的并发访问问题。

    - 模拟锁误删除产生的并发问题

    - 库存扣减结果:没有扣减为0,产生并发问题

    - 解决方案,每个请求使用全局唯一UUID为value值,删除锁之前,先判断value值是否相同,相同再删除锁

    1. public void checkAndReduceStock() {
    2. // 1.使用setex加锁,保证加锁的原子性,以及锁可以自动释放
    3. String uuid = UUID.randomUUID().toString();
    4. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", uuid, 1, TimeUnit.SECONDS);
    5. // 2.重试:递归调用,如果获取不到锁
    6. if (!lock) {
    7. try {
    8. //暂停50ms
    9. Thread.sleep(10);
    10. this.checkAndReduceStock();
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. } else {
    15. try {
    16. // 3. 查询库存数量
    17. String stockQuantity = (String) redisTemplate.opsForValue().get("P0001");
    18. // 4. 判断库存是否充足
    19. if (stockQuantity != null && stockQuantity.length() != 0) {
    20. Integer quantity = Integer.valueOf(stockQuantity);
    21. if (quantity > 0) {
    22. // 5.扣减库存
    23. redisTemplate.opsForValue().set("P0001", String.valueOf(--quantity));
    24. }
    25. } else {
    26. System.out.println("该库存不存在!");
    27. }
    28. } finally {
    29. // 5.先判断是否是自己的锁,然后再解锁
    30. String redisUuid = (String) redisTemplate.opsForValue().get("lock-stock");
    31. if (StringUtils.equals(uuid, redisUuid)) {
    32. redisTemplate.delete("lock-stock");
    33. }
    34. }
    35. }
    36. }

    - 存在的问题:由于判断锁和解锁的操作不具有原子性,仍然会存在误删除的操作,如A请求在完成判断之后准备删除锁的时候,此时A的锁自动释放,B请求获取到锁,这个时候A请求会手动将B请求的锁删除掉,依然存在并发访问的问题。该概率很小。

    •  使用lua脚本解决锁手动释放删除的操作是原子性操作

    - lua代码解决误删操作

    1. public void checkAndReduceStock() {
    2. // 1.使用setex加锁,保证加锁的原子性,以及锁可以自动释放
    3. String uuid = UUID.randomUUID().toString();
    4. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", uuid, 1, TimeUnit.SECONDS);
    5. // 2.重试:递归调用,如果获取不到锁
    6. if (!lock) {
    7. try {
    8. //暂停50ms
    9. Thread.sleep(10);
    10. this.checkAndReduceStock();
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. } else {
    15. try {
    16. // 3. 查询库存数量
    17. String stockQuantity = (String) redisTemplate.opsForValue().get("P0001");
    18. // 4. 判断库存是否充足
    19. if (stockQuantity != null && stockQuantity.length() != 0) {
    20. Integer quantity = Integer.valueOf(stockQuantity);
    21. if (quantity > 0) {
    22. // 5.扣减库存
    23. redisTemplate.opsForValue().set("P0001", String.valueOf(--quantity));
    24. }
    25. } else {
    26. System.out.println("该库存不存在!");
    27. }
    28. } finally {
    29. // 5.先判断是否是自己的锁,然后再解锁
    30. String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
    31. "then " +
    32. " return redis.call('del', KEYS[1]) " +
    33. "else " +
    34. " return 0 " +
    35. "end";
    36. redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock-stock"), uuid);
    37. }
    38. }
    39. }

    结语

    关于使用redis分布式锁解决“超卖”问题的内容到这里就结束了,我们下期见。。。。。。

  • 相关阅读:
    Echart前端的修饰器,你不来看看吗?
    Linux 进程切换与命令行参数
    JTS: 17 DiscreteHausdorffDistance 豪斯多夫距离计算
    Edge浏览器没有让我失望! 今天终于可以在win10中模拟IE内核进行前端测试了!
    猿创征文|Vue、React可视化分析
    设计模式-装饰器模式
    Element——组件
    [附源码]计算机毕业设计springboot疫情物资管理系统
    使用 PyTorch 实现 Word2Vec 中Skip-gram 模型
    keychron矮轴无线机械键盘简直就是yyds
  • 原文地址:https://blog.csdn.net/yprufeng/article/details/134027090