分布式锁其实就是控制分布式系统中不同进程共同访问共享资源的一种锁的实现。在分布式系统中各个微服务都是独立部署在不同的服务器上,如果多个服务同时操作同一个共享资源的话,就不能像单体服务那样通过synchronized或者Lock等同步机制保证一个代码块在同一时间只能由一个线程访问来实现共享资源的安全性。因为分布式系统中的不同服务已经不在是多线程之间的并发访问了,而是属于多进程之间的并发访问,所以就需要一种更加高级的锁机制,来处理这种跨JVM进程之间的线程安全问题。
在 Java 中,实现分布式锁的方案有多种,常见的3中方案如下:
实现方式有以下几种:
(1)setnx + expire:这种方式加锁操作和设置超时时间是分开的。如果在执行完setnx加锁后,正要执行expire设置过期时间时,进程挂掉了,那这个锁就永远不会过期了。
(2)set的扩展命令:通过set(String key, String value, String nxxx, String expx, int time) 加锁的同时设置过期时间,再通过del(key)删除key。这种方式可能导致锁被别的线程误删,假设A获取锁后,由于业务还没执行完就过期释放了,然后立即就被B获取该锁执行业务逻辑,此时A执行完成后就会去释放这个锁,但此时这个锁已经被B占用了,也就是说A此时把B的锁给释放掉了。
(3)set的扩展命令+唯一值校验:通过set(String key, String value, String nxxx, String expx, int time) 加锁的同时设置过期时间,再通过Lua 脚本去根据唯一值删除key。这种方式可以解决误删除别人的锁问题,但是还是存在锁过期释放了,业务还没执行完的问题。
添加redis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.pool.maxTotal}")
private int maxTotal;
@Value("${spring.redis.pool.maxWait}")
private int maxWait;
@Value("${spring.redis.pool.maxIdle}")
private int maxIdle;
@Value("${spring.redis.pool.minIdle}")
private int minIdle;
@Value("${spring.redis.blockWhenExhausted}")
private Boolean blockWhenExhausted;
@Value("${spring.redis.JmxEnabled}")
private Boolean JmxEnabled;
/**
* 创建JedisPool实例
*
* @return JedisPool
*/
@RefreshScope
@Bean
public JedisPool jedisPoolFactory() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMinIdle(minIdle);
jedisPoolConfig.setMaxWaitMillis(maxWait);
// 连接耗尽时是否阻塞, false报异常,true阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted);
// 是否启用pool的jmx管理功能, 默认true
jedisPoolConfig.setJmxEnabled(JmxEnabled);
return new JedisPool(jedisPoolConfig, host, port, timeout, password);
}
/**
* redisTemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类
*
* @return redisTemplate
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
@SuppressWarnings({ "rawtypes", "unchecked" })
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
@Slf4j
@Component
public class RedisUtil {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
private static final String LOCK_SUCCESS = "OK";
/**
* NX: 仅在键不存在时设置键
* XX: 只有在键已存在时才设置
*/
private static final String SET_IF_NOT_EXIST = "NX";
/**
* 过期时间单位
* EX: seconds
* PX: milliseconds
*/
private static final String SET_WITH_EXPIRE_TIME = "EX";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 尝试获取分布式锁
*
* @param lockKey 分布式锁的key,想要获取锁时,判断这个key是否存在于redis中,存在则说明获取分布式锁失败,否则成功获取锁
* @param requestId 每个请求的全局唯一id,用于释放锁时只能释放自己持有的锁
* @param expireTime 超期时间,单位:秒
* @return boolean 是否获取成功
*/
public boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
if (jedis == null) {
return false;
}
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
log.info("========================================获取分布式锁成功, lockKey is:{}, requestId is:{}", lockKey, requestId);
return true;
}
log.info("========================================获取分布式锁失败, lockKey is:{}, requestId is:{}", lockKey, requestId);
return false;
}
/**
* 释放分布式锁
*
* @param jedis Redis客户端
* @param lockKey 分布式锁的key
* @param requestId 每个请求的全局唯一id
* @return 是否释放成功
*/
public boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
if (jedis == null) {
log.info("========================================分布式锁释放失败,Jedis为空, lockKey is:{}, requestId is:{}", lockKey, requestId);
return false;
}
// 通过Lua 脚本保证只释放requestId对应的lockKey
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
log.info("========================================分布式锁释放成功, lockKey is:{}, requestId is:{}", lockKey, requestId);
return true;
}
log.info("========================================分布式锁释放失败, lockKey is:{}, requestId is:{}", lockKey, requestId);
return false;
}
}
/**
* 通过分布式锁来生成全局订单唯一的id
*
* @param type 订单类型
* @return String 订单唯一的id
*/
@Override
public String generate(String type) {
String orderId = null;
log.info("========================================要生成的订单类型为:{}", type);
if (StringUtils.isBlank(type)) {
return null;
}
// 开始获取分布式锁
Jedis jedis = jedisPool.getResource();
String lockKey = redisUtils.getRedisKey(RedisTemplateConstant.DISTRIBUTED_LOCK_KEY_TYPE, type);
String requestId = CommonUtil.getUUID();
try {
if (redisUtils.tryGetDistributedLock(jedis, lockKey, requestId, expireTime)) {
// 生成订单id
orderId = getOrderId(type);
}
} catch (Exception e) {
log.info("========================================组装id出错================================");
} finally {
// 释放分布式锁
redisUtils.releaseDistributedLock(jedis, lockKey, requestId);
if (jedis != null) {
jedis.close();
}
}
return orderId;
}
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.0</version>
</dependency>
@Slf4j
@Component
public class RedissonUtil {
@Resource
private Redisson redisson;
/**
* 加锁
*
* @param key 分布式锁的 key
* @param timeout 超时时间
* @param unit 时间单位
* @return
*/
public boolean tryLock(String key, long timeout, TimeUnit unit) {
RLock lock = redisson.getLock(key);
try {
return lock.tryLock(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 释放分布式锁
*
* @param key 分布式锁的 key
*/
public void unlock(String key) {
RLock lock = redisson.getLock(key);
lock.unlock();
}
}
<!-- 若使用redisTemplate作为分布式锁底层,则需要引入 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redis-template-spring-boot-starter</artifactId>
<version>2.2.4</version>
</dependency>
<!-- 若使用redisson作为分布式锁底层,则需要引入 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redisson-spring-boot-starter</artifactId>
<version>2.2.4</version>
</dependency>
@Slf4j
@RestController
@RequestMapping("/redisLock")
public class RedisLockController {
@Autowired
private LockTemplate lockTemplate;
/**
* 使用 lock4j 注解加锁
*/
@Lock4j(keys = {"#key"}, acquireTimeout = 1000, expire = 10000)
@GetMapping("/testAnnotate")
public R<String> testAnnotate(String key) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return R.ok(key);
}
/**
* 使用LockTemplate模板加锁
*/
@GetMapping("/testLock4jLockTemplate")
public R<String> testLock4jLockTemplate(String key) {
final LockInfo lockInfo = lockTemplate.lock(key, 30000L, 5000L, RedissonLockExecutor.class);
if (null == lockInfo) {
throw new RuntimeException("业务繁忙,请稍后再试!");
}
// 获取锁成功,处理业务
try {
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
//
}
System.out.println("当前线程:" + Thread.currentThread().getName());
} finally {
//释放锁
lockTemplate.releaseLock(lockInfo);
}
return R.ok(key);
}
}
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>latest</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>latest</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>latest</version>
</dependency>
spring:
zookeeper:
connect-string: localhost:8081
namespace: test
@Component
public class ZkLockUtil {
@Autowired
private CuratorFramework curatorFramework;
/**
* 获取分布式锁
*
* @param lockPath 锁路径
* @param waitTime 等待时间
* @param leaseTime 锁持有时间
* @param timeUnit 时间单位
* @return 锁对象
* @throws Exception 获取锁异常
*/
public InterProcessMutex acquire(String lockPath, long waitTime, long leaseTime, TimeUnit timeUnit) throws Exception {
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
if (!lock.acquire(waitTime, timeUnit)) {
throw new RuntimeException("获取分布式锁失败");
}
if (leaseTime > 0) {
lock.acquire(leaseTime, timeUnit);
}
return lock;
}
/**
* 释放分布式锁
*
* @param lock 锁对象
* @throws Exception 释放锁异常
*/
public void release(InterProcessMutex lock) throws Exception {
if (lock != null) {
lock.release();
}
}
}