状态不是很好,记一下以前学过的分布式锁吧。
不谈大概念,就是简单入门以及使用。
为什么要用分布式锁呢?
假设我需要一个定时操作,每天在某个点,我要处理一批数据,要先从数据库中查询出来(IO操作),然后再逐步处理数据(CPU操作)。这些数据之间没有关联关系,不需要考虑数据执行的先后顺序。
在单机情况下,做法是什么。开启一个定时任务,该任务中,查询数据库得到所有数据并加载到内存中。在内存中,我可以通过JUC的一些工具类,LinkedBlockingQueue保证线程安全的情况下移除元素,一个个的来处理。这是一种多线程的方案。
那么多进程呢,集群情况下,多台主机,该怎么处理。
分布式锁结合MQ实现解决方案。我说一下个人想法的一种解决方法,具体操作:
1.要定时处理数据,肯定是需要定时任务的。那么,三台主机都配置了定时任务。但是,从数据库中查询出数据的这一步我却只想执行一次,而不是三个主机各执行一次。在这里,就用到分布式锁了,我们可以使用tryLock方式,对其进行试图获取锁,同一个时刻,肯定只有一个任务获取到锁。剩下的两个获取失败就不执行了。我们就获取到这个数据。
2.在获取到这个数据后,可以选择将查询到数据推送至MQ,然后三台主机作为消费者,来有效提升系统的执行速率。
这是我内心中比较倾向的一种解决方式。性能相对高一点。
想想不高的,如同单机的这种方式,我们该怎么实现(不涉及中间件这种)。说一个相对耗性能的方案。也利用了分布式锁。
我要处理的数据,这个数据表中我加一个状态,0标识未处理,1标识处理。
1.加分布式锁。
2.查询出数据表中状态为0的单条数据。
3.做涉及到这条数据的一系列业务操作。
4.修改当前数据的状态为1。
5.解分布式锁。
有多少条数据就得加锁解锁多少次,以及查询多次数据库。性能的低下可想而之。
例子说完了,用代码来体现一下。分布式锁我采用的是reddision的方式。这里就为了代码简单,模拟一个火车票数量减少的操作。(数据虽然放到redis了,而且redis执行是单线程的,但可以java代码将减少这个操作的过程设置为线程不安全的操作,这个样例中我是这么做的)。
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
- <groupId>com.bogroupId>
- <artifactId>distributed-lock-threeartifactId>
- <version>0.0.1-SNAPSHOTversion>
- <name>distributed-lock-threename>
- <description>distributed-lock-threedescription>
-
- <properties>
- <java.version>1.8java.version>
- <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding>
- <spring-boot.version>2.3.7.RELEASEspring-boot.version>
- properties>
-
- <dependencies>
- <dependency>
- <groupId>com.alibabagroupId>
- <artifactId>fastjsonartifactId>
- <version>2.0.17version>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-devtoolsartifactId>
- <version>${spring-boot.version}version>
- <scope>runtimescope>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <version>1.18.24version>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <version>${spring-boot.version}version>
- <scope>testscope>
- <exclusions>
- <exclusion>
- <groupId>org.junit.vintagegroupId>
- <artifactId>junit-vintage-engineartifactId>
- exclusion>
- exclusions>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-autoconfigureartifactId>
- <version>${spring-boot.version}version>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframeworkgroupId>
- <artifactId>spring-contextartifactId>
- <version>5.2.12.RELEASEversion>
- dependency>
-
- <dependency>
- <groupId>org.redissongroupId>
- <artifactId>redissonartifactId>
- <version>3.17.7version>
- dependency>
- dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-dependenciesartifactId>
- <version>${spring-boot.version}version>
- <type>pomtype>
- <scope>importscope>
- dependency>
- dependencies>
- dependencyManagement>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.pluginsgroupId>
- <artifactId>maven-compiler-pluginartifactId>
- <version>3.8.1version>
- <configuration>
- <source>1.8source>
- <target>1.8target>
- <encoding>UTF-8encoding>
- configuration>
- plugin>
- <plugin>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-maven-pluginartifactId>
- <version>2.3.7.RELEASEversion>
- <configuration>
- <mainClass>com.bo.distributedlockthree.DistributedLockThreeApplicationmainClass>
- configuration>
- <executions>
- <execution>
- <id>repackageid>
- <goals>
- <goal>repackagegoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
-
- project>
- package com.bo.distributedlockthree.test.config;
-
-
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- /**
- * @Auther: zeroB
- * @Date: 2022/11/14 09:53
- * @Description:
- */
- @Component
- public class TestDistributedLockConfig {
- @Bean
- public RedissonClient redissonClient(){
- Config config = new Config();
- config.useSingleServer().setAddress("redis://192.168.140.130:6379").setPassword("2wsx@WSX");
- RedissonClient redissonClient = Redisson.create(config);
- return redissonClient;
- }
-
-
- }
- package com.bo.distributedlockthree.test.pojo;
-
- import java.io.Serializable;
-
- /**
- * @Auther: zeroB
- * @Date: 2022/11/14 11:10
- * @Description:
- */
- public class ResultVO
implements Serializable { - private boolean isSuccess;
-
- private String errorMsg;
-
- private T data;
-
- public boolean isSuccess() {
- return isSuccess;
- }
-
- public void setSuccess(boolean success) {
- isSuccess = success;
- }
-
- public String getErrorMsg() {
- return errorMsg;
- }
-
- public void setErrorMsg(String errorMsg) {
- this.errorMsg = errorMsg;
- }
-
- public T getData() {
- return data;
- }
-
- public void setData(T data) {
- this.data = data;
- }
- }
- package com.bo.distributedlockthree.test.action;
-
- import com.bo.distributedlockthree.test.config.properties.ApplicationUtils;
- import com.bo.distributedlockthree.test.pojo.ResultVO;
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.api.RAtomicLong;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Controller;
- import org.springframework.util.StringUtils;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.ResponseBody;
-
- import java.util.ArrayList;
- import java.util.List;
-
- /**
- * @Auther: zeroB
- * @Date: 2022/11/14 09:48
- * @Description:
- */
- @Controller
- @Slf4j
- public class TestAction {
- @Autowired
- private RedissonClient redissonClient;
-
- @Autowired
- private ApplicationUtils applicationUtils;
-
- /**
- * 在redis中添加共享数据
- */
- @RequestMapping("testAddData")
- @ResponseBody
- public void testAddData(){
- RAtomicLong atomicLong = redissonClient.getAtomicLong("ticketCount");
- long l = atomicLong.addAndGet(1000L);
- log.info("存放至redis中的数据为{}",l);
- }
-
-
- /**
- * 测试下分布式锁,多进程共享数据存放至redis中
- */
- @RequestMapping("/testLock")
- @ResponseBody
- public String testLock(){
-
- RLock testLock = redissonClient.getLock("testLock");
- long l = 0L;
- try{
- testLock.lock();
- RAtomicLong ticketCount = redissonClient.getAtomicLong("ticketCount");
- long l1 = ticketCount.get();
- //如果这里不加判定,是不会存在异常的,但加上的话,就有可能出现减为0的情况了
- if(l1 <= 0){
- log.info("销售完成,销售数量为{}",l1);
- throw new UnsupportedOperationException("销售完成,已不支持");
- }
- //这里感觉用过CAS了,本质上已经同步处理过了
- Thread.sleep(500);
- l = ticketCount.decrementAndGet();
- log.info("减去后的值为"+l);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- testLock.unlock();
- return String.valueOf(l);
- }
- }
-
- /**
- * 得到容器内存放所有元素
- * @param name 容器内元素名称
- * @return
- * TODO 前端序列化传输展示存在问题,先不考虑这个问题
- */
- @RequestMapping("/listContainerData")
- @ResponseBody
- public ResultVO
> listContainerData(String name){
- ResultVO
> listResultVO = new ResultVO<>();
- if(StringUtils.isEmpty(name)){
- log.info("查询出全部的容器内容");
- listResultVO.setData(applicationUtils.listBean());
- return listResultVO;
- }
- List
-
- //需要序列化
- objects.add(applicationUtils.getBean(name));
-
- listResultVO.setData(objects);
- return listResultVO;
- }
-
- /**
- * 分布式锁测试tryLock方案的问题
- * @return
- */
- @RequestMapping("tryLockTest")
- @ResponseBody
- public ResultVO
tryLockTest(){ - RLock testLock = redissonClient.getLock("testLock");
- ResultVO
resultVO = new ResultVO<>(); -
- long l1 = 0L;
- //相当于结合CAS的dowhile循环,只有获取到锁的情况下,执行了我要执行的任务,结束了,所有的请求我都要得到并处理
- //但这样,一直获取不到锁,进程饥渴的情况下,会一直占用CPU,阻塞的,这个得想想方案
- while(true){
- //这块应该得加锁,因为这个值结果和判断条件是相关的,假如我不放到锁中的话
- /**
- * 1.我得到count是0了
- * 2.此时其它进程得到系统的值然后操作
- * 3.此时实际是小于0了,但依旧正常操作
- */
- if(!testLock.tryLock()){
- continue;
- }
- try{
- RAtomicLong ticketCount = redissonClient.getAtomicLong("ticketCount");
- long l = ticketCount.get();
- if(l <= 0){
- log.info("售票已经执行完成了,售票结果为{}",l);
- throw new UnsupportedOperationException("售票已经执行完成了,售票结果为"+l);
- }
-
- Thread.sleep(100);
- l1 = ticketCount.decrementAndGet();
- log.info("销售完成的结果为{}",l1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- testLock.unlock();
- resultVO.setData(String.valueOf(l1));
- return resultVO;
- }
- }
-
- }
-
-
- }
上述的测试可以通过,只是一个简单样例,来理解一下它的思想。其实真正使用的时候吧,配置之类的肯定是不简单的。我也没有实际生产用过,只是先提供给大家一个思路。
附上Redisson官方文档地址,有中文实在太贴心了:
目录 · redisson/redisson Wiki · GitHubhttps://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95