为了保证一个方法在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
1、处理效率提升:应用分布式锁,可以减少重复任务的执行,避免资源处理效率的浪费;
2、数据准确性保障:使用分布式锁可以放在数据资源的并发访问,避免数据不一致情况,甚至数据损失等。
例如:
思路:Redis实现分布式锁基于SetNx命令,因为在Redis中key是保证是唯一的。所以当多个线程同时的创建setNx时,只要谁能够创建成功谁就能够获取到锁。
Set 命令: 每次 set 时,可以修改原来旧值;
SetNx命令:每次SetNx检查该 key是否已经存在,如果已经存在的话不会执行任何操作。返回为0 如果已经不存在的话直接新增该key。
1:新增key成功, 0:失败
获取锁的时候:当多个线程同时创建SetNx k,只要谁能够创建成功谁就能够获取到锁。
释放锁:可以对该key设置一个有效期可以避免死锁的现象。
1、增加maven依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
2、编写Jedis连接Redis工具类
public class RedisClientUtil {
//protected static Logger logger = Logger.getLogger(RedisUtil.class);
private static String IP = "www.kaicostudy.com";
//Redis的端口号
private static int PORT = 6379;
//可用连接实例的最大数目,默认值为8;
//如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
private static int MAX_ACTIVE = 100;
//控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
private static int MAX_IDLE = 20;
//等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int MAX_WAIT = 3000;
private static int TIMEOUT = 3000;
//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
private static boolean TEST_ON_BORROW = true;
//在return给pool时,是否提前进行validate操作;
private static boolean TEST_ON_RETURN = true;
private static JedisPool jedisPool = null;
/**
* redis过期时间,以秒为单位
*/
public final static int EXRP_HOUR = 60 * 60; //一小时
public final static int EXRP_DAY = 60 * 60 * 24; //一天
public final static int EXRP_MONTH = 60 * 60 * 24 * 30; //一个月
/**
* 初始化Redis连接池
*/
private static void initialPool() {
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWaitMillis(MAX_WAIT);
config.setTestOnBorrow(TEST_ON_BORROW);
jedisPool = new JedisPool(config, IP, PORT, TIMEOUT, "123456");
} catch (Exception e) {
//logger.error("First create JedisPool error : "+e);
e.getMessage();
}
}
/**
* 在多线程环境同步初始化
*/
private static synchronized void poolInit() {
if (jedisPool == null) {
initialPool();
}
}
/**
* 同步获取Jedis实例
*
* @return Jedis
*/
public synchronized static Jedis getJedis() {
if (jedisPool == null) {
poolInit();
}
Jedis jedis = null;
try {
if (jedisPool != null) {
jedis = jedisPool.getResource();
}
} catch (Exception e) {
e.getMessage();
// logger.error("Get jedis error : "+e);
}
return jedis;
}
/**
* 释放jedis资源
*
* @param jedis
*/
public static void returnResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}
public static Long sadd(String key, String... members) {
Jedis jedis = null;
Long res = null;
try {
jedis = getJedis();
res = jedis.sadd(key, members);
} catch (Exception e) {
//logger.error("sadd error : "+e);
e.getMessage();
}
return res;
}
}
3、编写Redis锁的工具类
public class RedisLock {
private static final int setnxSuccss = 1;
/**
* 获取锁
*
* @param lockKey 定义锁的key
* @param notLockTimeOut 没有获取锁的超时时间
* @param lockTimeOut 使用锁的超时时间
* @return
*/
public String getLock(String lockKey, int notLockTimeOut, int lockTimeOut) {
// 获取Redis连接
Jedis jedis = RedisClientUtil.getJedis();
// 定义没有获取锁的超时时间
Long endTimeOut = System.currentTimeMillis() + notLockTimeOut;
while (System.currentTimeMillis() < endTimeOut) {
String lockValue = UUID.randomUUID().toString();
// 如果在多线程情况下谁能够setnx 成功返回0 谁就获取到锁
if (jedis.setnx(lockKey, lockValue) == setnxSuccss) {
jedis.expire(lockKey, lockTimeOut / 1000);
return lockValue;
}
// 否则情况下 在超时时间内继续循环
}
try {
if (jedis != null) {
jedis.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 释放锁 其实就是将该key删除
*
* @return
*/
public Boolean unLock(String lockKey, String lockValue) {
Jedis jedis = RedisClientUtil.getJedis();
// 确定是对应的锁 ,才删除
if (lockValue.equals(jedis.get(lockKey))) {
return jedis.del(lockKey) > 0 ? true : false;
}
return false;
}
}
4、测试方法
private RedisLock redisLock = new RedisLock();
private String lockKey = "kaico_lock";
/**
* 测试Jedis实现分布式锁
* @return
*/
@GetMapping("/restLock1")
public String restLock1(){
// 1.获取锁
String lockValue = redisLock.getLock(lockKey, 5000, 5000);
if (StringUtils.isEmpty(lockValue)) {
System.out.println(Thread.currentThread().getName() + ",获取锁失败!");
return "获取锁失败";
}
// 2.获取锁成功执行业务逻辑
System.out.println(Thread.currentThread().getName() + ",获取成功,lockValue:" + lockValue);
// 3.释放lock锁
redisLock.unLock(lockKey, lockValue);
return "";
}
依赖于之前的项目
1、编写锁的工具类方法
@Component
public class SpringbootRedisLockUtil {
@Autowired
public RedisTemplate redisTemplate;
// 解锁原子性操作脚本
public static final String unlockScript="if redis.call(\"get\",KEYS[1]) == ARGV[1]\n"
+ "then\n"
+ " return redis.call(\"del\",KEYS[1])\n"
+ "else\n"
+ " return 0\n"
+ "end";
/**
* 加锁,有阻塞
* @param name
* @param expire
* @param timeout
* @return
*/
public String lock(String name, long expire, long timeout) throws UnsupportedEncodingException {
long startTime=System.currentTimeMillis();
String token;
do{
token=tryLock(name,expire);
if(token==null){
//设置等待时间,若等待时间过长则获取锁失败
if((System.currentTimeMillis()-startTime)>(timeout-50)){
break;
}
try {
Thread.sleep(50);//try it again per 50
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}while (token==null);
return token;
}
/**
* 解锁
* @param name
* @param token
* @return
*/
public Boolean unlock(String name, String token) throws UnsupportedEncodingException {
byte[][] keyArgs=new byte[2][];
keyArgs[0]= name.getBytes(Charset.forName("UTF-8"));
keyArgs[1]= token.getBytes(Charset.forName("UTF-8"));
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
RedisConnection connection = connectionFactory.getConnection();
try{
Long result = connection.scriptingCommands().eval(unlockScript.getBytes(Charset.forName("UTF-8")), ReturnType.INTEGER, 1, keyArgs);
if(result!=null&&result>0){
return true;
}
}finally {
RedisConnectionUtils.releaseConnection(connection,connectionFactory);
}
return false;
}
/**
* 加锁,无阻塞
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) throws UnsupportedEncodingException {
String token= UUID.randomUUID().toString();
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
RedisConnection connection = connectionFactory.getConnection();
try{
Boolean result = connection.set(name.getBytes(Charset.forName("UTF-8")), token.getBytes(Charset.forName("UTF-8")),
Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT);
if(result!=null&&result){
return token;
}
}
finally {
RedisConnectionUtils.releaseConnection(connection,connectionFactory);
}
return null;
}
}
2、测试类
@Autowired
private SpringbootRedisLockUtil springbootRedisLockUtil;
@PostMapping("/restLock1")
public void restLock2() throws UnsupportedEncodingException {
String token;
token=springbootRedisLockUtil.lock(Thread.currentThread().getName(),1000,11000);
if(token!=null){
System.out.println("我拿到锁了哦!");
}
else{
System.out.println("我没有拿到锁!");
}
springbootRedisLockUtil.unlock(Thread.currentThread().getName(),token);
}
等搭建完集群版本再实现