• 分布式锁-简单入门


    状态不是很好,记一下以前学过的分布式锁吧。

    样例简介

    不谈大概念,就是简单入门以及使用。

    为什么要用分布式锁呢?

    假设我需要一个定时操作,每天在某个点,我要处理一批数据,要先从数据库中查询出来(IO操作),然后再逐步处理数据(CPU操作)。这些数据之间没有关联关系,不需要考虑数据执行的先后顺序。

    单机思路

    在单机情况下,做法是什么。开启一个定时任务,该任务中,查询数据库得到所有数据并加载到内存中。在内存中,我可以通过JUC的一些工具类,LinkedBlockingQueue保证线程安全的情况下移除元素,一个个的来处理。这是一种多线程的方案。

    那么多进程呢,集群情况下,多台主机,该怎么处理。

    多进程思路

    分布式锁结合MQ实现解决方案。我说一下个人想法的一种解决方法,具体操作:

    1.要定时处理数据,肯定是需要定时任务的。那么,三台主机都配置了定时任务。但是,从数据库中查询出数据的这一步我却只想执行一次,而不是三个主机各执行一次。在这里,就用到分布式锁了,我们可以使用tryLock方式,对其进行试图获取锁,同一个时刻,肯定只有一个任务获取到锁。剩下的两个获取失败就不执行了。我们就获取到这个数据。

    2.在获取到这个数据后,可以选择将查询到数据推送至MQ,然后三台主机作为消费者,来有效提升系统的执行速率。

    这是我内心中比较倾向的一种解决方式。性能相对高一点。

    想想不高的,如同单机的这种方式,我们该怎么实现(不涉及中间件这种)。说一个相对耗性能的方案。也利用了分布式锁。

    我要处理的数据,这个数据表中我加一个状态,0标识未处理,1标识处理。

    1.加分布式锁。

    2.查询出数据表中状态为0的单条数据。

    3.做涉及到这条数据的一系列业务操作。

    4.修改当前数据的状态为1。

    5.解分布式锁。

    有多少条数据就得加锁解锁多少次,以及查询多次数据库。性能的低下可想而之。

    代码

    例子说完了,用代码来体现一下。分布式锁我采用的是reddision的方式。这里就为了代码简单,模拟一个火车票数量减少的操作。(数据虽然放到redis了,而且redis执行是单线程的,但可以java代码将减少这个操作的过程设置为线程不安全的操作,这个样例中我是这么做的)。

    POM文件

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0modelVersion>
    5. <groupId>com.bogroupId>
    6. <artifactId>distributed-lock-threeartifactId>
    7. <version>0.0.1-SNAPSHOTversion>
    8. <name>distributed-lock-threename>
    9. <description>distributed-lock-threedescription>
    10. <properties>
    11. <java.version>1.8java.version>
    12. <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
    13. <project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding>
    14. <spring-boot.version>2.3.7.RELEASEspring-boot.version>
    15. properties>
    16. <dependencies>
    17. <dependency>
    18. <groupId>com.alibabagroupId>
    19. <artifactId>fastjsonartifactId>
    20. <version>2.0.17version>
    21. dependency>
    22. <dependency>
    23. <groupId>org.springframework.bootgroupId>
    24. <artifactId>spring-boot-devtoolsartifactId>
    25. <version>${spring-boot.version}version>
    26. <scope>runtimescope>
    27. <optional>trueoptional>
    28. dependency>
    29. <dependency>
    30. <groupId>org.projectlombokgroupId>
    31. <artifactId>lombokartifactId>
    32. <version>1.18.24version>
    33. <optional>trueoptional>
    34. dependency>
    35. <dependency>
    36. <groupId>org.springframework.bootgroupId>
    37. <artifactId>spring-boot-starter-testartifactId>
    38. <version>${spring-boot.version}version>
    39. <scope>testscope>
    40. <exclusions>
    41. <exclusion>
    42. <groupId>org.junit.vintagegroupId>
    43. <artifactId>junit-vintage-engineartifactId>
    44. exclusion>
    45. exclusions>
    46. dependency>
    47. <dependency>
    48. <groupId>org.springframework.bootgroupId>
    49. <artifactId>spring-boot-autoconfigureartifactId>
    50. <version>${spring-boot.version}version>
    51. dependency>
    52. <dependency>
    53. <groupId>org.springframework.bootgroupId>
    54. <artifactId>spring-boot-starter-webartifactId>
    55. dependency>
    56. <dependency>
    57. <groupId>org.springframeworkgroupId>
    58. <artifactId>spring-contextartifactId>
    59. <version>5.2.12.RELEASEversion>
    60. dependency>
    61. <dependency>
    62. <groupId>org.redissongroupId>
    63. <artifactId>redissonartifactId>
    64. <version>3.17.7version>
    65. dependency>
    66. dependencies>
    67. <dependencyManagement>
    68. <dependencies>
    69. <dependency>
    70. <groupId>org.springframework.bootgroupId>
    71. <artifactId>spring-boot-dependenciesartifactId>
    72. <version>${spring-boot.version}version>
    73. <type>pomtype>
    74. <scope>importscope>
    75. dependency>
    76. dependencies>
    77. dependencyManagement>
    78. <build>
    79. <plugins>
    80. <plugin>
    81. <groupId>org.apache.maven.pluginsgroupId>
    82. <artifactId>maven-compiler-pluginartifactId>
    83. <version>3.8.1version>
    84. <configuration>
    85. <source>1.8source>
    86. <target>1.8target>
    87. <encoding>UTF-8encoding>
    88. configuration>
    89. plugin>
    90. <plugin>
    91. <groupId>org.springframework.bootgroupId>
    92. <artifactId>spring-boot-maven-pluginartifactId>
    93. <version>2.3.7.RELEASEversion>
    94. <configuration>
    95. <mainClass>com.bo.distributedlockthree.DistributedLockThreeApplicationmainClass>
    96. configuration>
    97. <executions>
    98. <execution>
    99. <id>repackageid>
    100. <goals>
    101. <goal>repackagegoal>
    102. goals>
    103. execution>
    104. executions>
    105. plugin>
    106. plugins>
    107. build>
    108. project>

    Redisson注册的代码

    1. package com.bo.distributedlockthree.test.config;
    2. import org.redisson.Redisson;
    3. import org.redisson.api.RedissonClient;
    4. import org.redisson.config.Config;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.stereotype.Component;
    7. /**
    8. * @Auther: zeroB
    9. * @Date: 2022/11/14 09:53
    10. * @Description:
    11. */
    12. @Component
    13. public class TestDistributedLockConfig {
    14. @Bean
    15. public RedissonClient redissonClient(){
    16. Config config = new Config();
    17. config.useSingleServer().setAddress("redis://192.168.140.130:6379").setPassword("2wsx@WSX");
    18. RedissonClient redissonClient = Redisson.create(config);
    19. return redissonClient;
    20. }
    21. }

    返回结果类

    1. package com.bo.distributedlockthree.test.pojo;
    2. import java.io.Serializable;
    3. /**
    4. * @Auther: zeroB
    5. * @Date: 2022/11/14 11:10
    6. * @Description:
    7. */
    8. public class ResultVO implements Serializable {
    9. private boolean isSuccess;
    10. private String errorMsg;
    11. private T data;
    12. public boolean isSuccess() {
    13. return isSuccess;
    14. }
    15. public void setSuccess(boolean success) {
    16. isSuccess = success;
    17. }
    18. public String getErrorMsg() {
    19. return errorMsg;
    20. }
    21. public void setErrorMsg(String errorMsg) {
    22. this.errorMsg = errorMsg;
    23. }
    24. public T getData() {
    25. return data;
    26. }
    27. public void setData(T data) {
    28. this.data = data;
    29. }
    30. }

    工具代码

    1. package com.bo.distributedlockthree.test.action;
    2. import com.bo.distributedlockthree.test.config.properties.ApplicationUtils;
    3. import com.bo.distributedlockthree.test.pojo.ResultVO;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.redisson.api.RAtomicLong;
    6. import org.redisson.api.RLock;
    7. import org.redisson.api.RedissonClient;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.stereotype.Controller;
    10. import org.springframework.util.StringUtils;
    11. import org.springframework.web.bind.annotation.RequestMapping;
    12. import org.springframework.web.bind.annotation.ResponseBody;
    13. import java.util.ArrayList;
    14. import java.util.List;
    15. /**
    16. * @Auther: zeroB
    17. * @Date: 2022/11/14 09:48
    18. * @Description:
    19. */
    20. @Controller
    21. @Slf4j
    22. public class TestAction {
    23. @Autowired
    24. private RedissonClient redissonClient;
    25. @Autowired
    26. private ApplicationUtils applicationUtils;
    27. /**
    28. * 在redis中添加共享数据
    29. */
    30. @RequestMapping("testAddData")
    31. @ResponseBody
    32. public void testAddData(){
    33. RAtomicLong atomicLong = redissonClient.getAtomicLong("ticketCount");
    34. long l = atomicLong.addAndGet(1000L);
    35. log.info("存放至redis中的数据为{}",l);
    36. }
    37. /**
    38. * 测试下分布式锁,多进程共享数据存放至redis中
    39. */
    40. @RequestMapping("/testLock")
    41. @ResponseBody
    42. public String testLock(){
    43. RLock testLock = redissonClient.getLock("testLock");
    44. long l = 0L;
    45. try{
    46. testLock.lock();
    47. RAtomicLong ticketCount = redissonClient.getAtomicLong("ticketCount");
    48. long l1 = ticketCount.get();
    49. //如果这里不加判定,是不会存在异常的,但加上的话,就有可能出现减为0的情况了
    50. if(l1 <= 0){
    51. log.info("销售完成,销售数量为{}",l1);
    52. throw new UnsupportedOperationException("销售完成,已不支持");
    53. }
    54. //这里感觉用过CAS了,本质上已经同步处理过了
    55. Thread.sleep(500);
    56. l = ticketCount.decrementAndGet();
    57. log.info("减去后的值为"+l);
    58. } catch (InterruptedException e) {
    59. e.printStackTrace();
    60. } finally {
    61. testLock.unlock();
    62. return String.valueOf(l);
    63. }
    64. }
    65. /**
    66. * 得到容器内存放所有元素
    67. * @param name 容器内元素名称
    68. * @return
    69. * TODO 前端序列化传输展示存在问题,先不考虑这个问题
    70. */
    71. @RequestMapping("/listContainerData")
    72. @ResponseBody
    73. public ResultVO> listContainerData(String name){
    74. ResultVO> listResultVO = new ResultVO<>();
    75. if(StringUtils.isEmpty(name)){
    76. log.info("查询出全部的容器内容");
    77. listResultVO.setData(applicationUtils.listBean());
    78. return listResultVO;
    79. }
    80. List objects = new ArrayList<>();
    81. //需要序列化
    82. objects.add(applicationUtils.getBean(name));
    83. listResultVO.setData(objects);
    84. return listResultVO;
    85. }
    86. /**
    87. * 分布式锁测试tryLock方案的问题
    88. * @return
    89. */
    90. @RequestMapping("tryLockTest")
    91. @ResponseBody
    92. public ResultVO tryLockTest(){
    93. RLock testLock = redissonClient.getLock("testLock");
    94. ResultVO resultVO = new ResultVO<>();
    95. long l1 = 0L;
    96. //相当于结合CAS的dowhile循环,只有获取到锁的情况下,执行了我要执行的任务,结束了,所有的请求我都要得到并处理
    97. //但这样,一直获取不到锁,进程饥渴的情况下,会一直占用CPU,阻塞的,这个得想想方案
    98. while(true){
    99. //这块应该得加锁,因为这个值结果和判断条件是相关的,假如我不放到锁中的话
    100. /**
    101. * 1.我得到count是0了
    102. * 2.此时其它进程得到系统的值然后操作
    103. * 3.此时实际是小于0了,但依旧正常操作
    104. */
    105. if(!testLock.tryLock()){
    106. continue;
    107. }
    108. try{
    109. RAtomicLong ticketCount = redissonClient.getAtomicLong("ticketCount");
    110. long l = ticketCount.get();
    111. if(l <= 0){
    112. log.info("售票已经执行完成了,售票结果为{}",l);
    113. throw new UnsupportedOperationException("售票已经执行完成了,售票结果为"+l);
    114. }
    115. Thread.sleep(100);
    116. l1 = ticketCount.decrementAndGet();
    117. log.info("销售完成的结果为{}",l1);
    118. } catch (InterruptedException e) {
    119. e.printStackTrace();
    120. } finally {
    121. testLock.unlock();
    122. resultVO.setData(String.valueOf(l1));
    123. return resultVO;
    124. }
    125. }
    126. }
    127. }
    128. 总结

      上述的测试可以通过,只是一个简单样例,来理解一下它的思想。其实真正使用的时候吧,配置之类的肯定是不简单的。我也没有实际生产用过,只是先提供给大家一个思路。

      附上Redisson官方文档地址,有中文实在太贴心了:

      目录 · redisson/redisson Wiki · GitHubicon-default.png?t=M85Bhttps://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

    129. 相关阅读:
      wait/notify
      【MySQL】库和表的操作
      UE4实现光束和体积雾
      C【文件操作】
      经纬恒润推出全新一代智能电动座椅模块
      springboot中aop的代理模式
      【PID优化】基于头脑风暴算法PID控制器优化设计含Matlab源码
      Java 异常处理
      SpringBoot:(五)自动配置原理解析
      全球化浪潮下的数据库革新:嘉里物流 TiDB 实践价值的设想
    130. 原文地址:https://blog.csdn.net/key_wu/article/details/127986347