随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
基于数据库实现分布式锁
基于缓存(Redis等)
基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
性能:redis最高
可靠性:zookeeper最高
这里,我们就基于redis实现分布式锁。
yum install httpd-tools
(提示: 保证当前 linux 是可以联网的)ab并发工具的使用
ab -n 1000 -c 100 -p ~/postfile -T application/x-www-form-urlencoded
http://192.168.79.1:8080/seckill/secKillServlet
指令说明
(1) ab 是并发工具程序
(2) -n 1000 表示一共发出 1000 次 http 请求
(3) -c 100 表示并发时 100 次, 你可以理解 1000 次请求, 会在 10 次发送完毕
(4) -p ~/postfile 表示发送请求时, 携带的参数从当前目录的 postfile 文件读取 (这个你
事先要准备好)
(5) -T application/x-www-form-urlencoded 就是发送数据的编码是 基于表单的 url 编
码
(6) ~的含义: https://blog.csdn.net/m0_67401134/article/details/123973115
setnx:只有建不存在时,才对建进行设置操作。设置成功返回true设置失败返回false
1.当多个客户端同时请求时,都会先去获取锁 setnx
2.获取成功,则执行业务逻辑,执行完成之后删除锁(del(“lock”))已完成锁的释放。
3.其他客户端等待重试,比如线程等待几秒后再次尝试获取锁
1.创建SpringBoot工程,引入相关依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<groupId>com.llpgroupId>
<artifactId>Springboot-redisartifactId>
<version>1.0-SNAPSHOTversion>
<parent>
<artifactId>spring-boot-starter-parentartifactId>
<groupId>org.springframework.bootgroupId>
<version>2.5.3version>
parent>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
<dependency>
<groupId>org.apache.commonsgroupId>
<artifactId>commons-pool2artifactId>
dependency>
dependencies>
project>
2.application.yaml
spring:
application:
name: spring-boot-redis
redis:
#redis集群配置,这里是在一台主机上模拟
cluster:
nodes:
- 192.168.79.201:6379
- 192.168.79.201:6380
- 192.168.79.201:6381
- 192.168.79.201:6390
- 192.168.79.201:6391
- 192.168.79.201:6389
connect-timeout: 6000
spring:
redis:
host: 192.168.79.201
port: 6379
#Redis 数据库索引(默认为 0)
database: 0
#连接超时时间(毫秒)
timeout: 1800000
lettuce:
pool:
#连接池最大连接数(使用负值表示没有限制)
max-active: 20
#最大阻塞等待时间(负数表示没限制)
max-wait: -1
#连接池中的最大空闲连接
min-idle: 0
#密码
#password: foobared
3.redis序列化配置
@EnableCaching
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
4.创建测试类,实现分布式锁
@RestController
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/testLock")
public void testLock(){
//1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok");
//2获取锁成功、查询num的值
if(lock){
/**********业务逻辑start**********/
//从缓存中获取num值
Object value = redisTemplate.opsForValue().get("num");
//如果值为空则直接返回
if(ObjectUtils.isEmpty(value)){
return;
}
//如果有值则转成int
int num = Integer.parseInt(value+"");
//把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
/**********业务逻辑end**********/
//释放锁
redisTemplate.delete("lock");
}else{
try {
//如果获取锁失败,则等待三秒再次尝试获取锁
Thread.sleep(3000);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用ab工具进行测试
ab -n 1000 -c 100 http://192.168.79.1:8080/redisTest/testLock
问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放。
以上面的代码为例,当redis缓存中不存在num时,value为空那么程序就不会取调用 redisTemplate.delete("lock");
删除锁,进而导致锁一直得不到释放。当别的请求打进来 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok");
始终返回false,无法正常执行业务逻辑。
/**********业务逻辑start**********/
//从缓存中获取num值
Object value = redisTemplate.opsForValue().get("num");
//如果值为空则直接返回
if(ObjectUtils.isEmpty(value)){
return;
}
//如果有值则转成int
int num = Integer.parseInt(value+"");
//把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
/**********业务逻辑end**********/
解决:设置过期时间,自动释放锁。
设置过期时间有两种方式:
首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
在setnx时指定过期时间(推荐)具备原子性
设置过期时间:
压力测试肯定也没有问题。自行测试
**问题:**可能会释放其他服务器的锁。
**场景:**如果业务逻辑的执行时间是7s。执行流程如下
index1业务逻辑没执行完,3秒后锁被自动释放。
index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
index3获取到锁,执行业务逻辑
index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。
最终等于没锁的情况。
**解决:**setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
@RestController
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/testLock")
public void testLock(){
//1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
//设置过期时间的长短根据业务执行的时间而定
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok",3, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
/**********业务逻辑start**********/
//从缓存中获取num值
Object value = redisTemplate.opsForValue().get("num");
//如果值为空则直接返回
if(ObjectUtils.isEmpty(value)){
return;
}
//如果有值则转成int
int num = Integer.parseInt(value+"");
//把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
/**********业务逻辑end**********/
//释放锁
redisTemplate.delete("lock");
}else{
try {
//如果获取锁失败,则等待三秒再次尝试获取锁
Thread.sleep(3000);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
存在的问题
@RestController
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/testLock")
public void testLock(){
String uuid = UUID.randomUUID().toString();
//1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
//设置过期时间的长短根据业务执行的时间而定
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
/**********业务逻辑start**********/
//从缓存中获取num值
Object value = redisTemplate.opsForValue().get("num");
//如果值为空则直接返回
if(ObjectUtils.isEmpty(value)){
return;
}
//如果有值则转成int
int num = Integer.parseInt(value+"");
//把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
/**********业务逻辑end**********/
//释放各自的锁
if(uuid.equals((String) redisTemplate.opsForValue().get("lock"))){
this.redisTemplate.delete("lock")
}
}else{
try {
//如果获取锁失败,则等待三秒再次尝试获取锁
Thread.sleep(3000);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
问题:删除操作缺乏原子性。
//编写方法,使用Redis分布式锁,完成对 key为num的+1操作
@GetMapping("/testLock")
public void testLock() {
//得到一个uuid值,作为锁的值
String uuid = UUID.randomUUID().toString();
//1. 获取锁/设置锁 key->lock : setnx
Boolean lock =
redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
if (lock) {//true, 说明获取锁/设置锁成功
//这个key为num的数据,事先要在Redis初始化
Object value = redisTemplate.opsForValue().get("num");
//1.判断返回的value是否有值
if (value == null || !StringUtils.hasText(value.toString())) {
return;
}
//2.有值,就将其转成int
int num = Integer.parseInt(value.toString());
//3.将num+1,再重新设置回去
redisTemplate.opsForValue().set("num", ++num);
//释放锁-lock
//为了防止误删除其它用户的锁,先判断当前的锁是不是前面获取到的锁,如果相同,再释放
//=====使用lua脚本, 控制删除原子性========
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值
// Arrays.asList("lock") 会传递给 script 的 KEYS[1] , uuid 会传递给ARGV[1]
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
//if (uuid.equals((String) redisTemplate.opsForValue().get("lock"))) {
// //...
// redisTemplate.delete("lock");
//}
//redisTemplate.delete("lock");
} else { //获取锁失败,休眠100毫秒,再重新获取锁/设置锁
try {
Thread.sleep(100);
testLock();//重新执行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1、定义锁的 key, key 可以根据业务, 分别设置,比如操作某商品, key 应该是为每个 sku 定
义的,也就是每个 sku 有一把锁
2、为了确保分布式锁可用,要确保锁的实现同时满足以下四个条件: