单体应用下共享资源的操作需要使用本地锁(synchronized、reentrantLock),来控制共享资源访问问题。随着分布式的快速发展,本地锁已经无法解决并发问题,需要一种跨服务、跨JVM的锁机制:分布式锁。
分布式锁实现方式
通常把锁和应用分开部署,把这个锁做成公共组件,多个应用多个节点去访问这个组件,来解决共享资源的访问。
对于一些并发量不是很高的场景,可使用Mysql来实现分布式锁会比较精简且巧妙。
乐观锁实现
一般是通过为数据库表添加"version"版本号字段,实现读取出数据时,取出版本号,之后更新时,对此版本号+1,在更新的时候,会对版本号进行比较,如果版本号一致,则更新数据,否则更新失败。
悲观锁实现
基于排它锁(写锁) 实现,只允许获取锁的事务对数据进行更新或删除操作,其它事务对数据只能进行读操作,不能更新和删除操作。
在Mysql中,若基于排它锁实现行级锁,则需要对表中索引列加锁,否则的话,排它锁就属于表级锁。
关键SQL:select … from XX for update
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
(1)创建表
CREATE TABLE `dcs_lock` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`lock_name` varchar(64) CHARACTER SET utf8 NOT NULL DEFAULT '' COMMENT '锁名',
`desc` varchar(1024) CHARACTER SET utf8 NOT NULL DEFAULT '备注信息' COMMENT '描述信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `uidx_lock_name`(`lock_name`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 COMMENT = '分布式锁';
(2)实现代码
@Configuration
public class MysqlLock {
@Resource
DataSource dataSource;
public void lock(Supplier supplier, String lockName) {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
preparedStatement = connection.prepareStatement("select * from dcs_lock where lock_name = '"+lockName+"' for update");
preparedStatement.execute();
//执行业务操作
String res = supplier.get();
System.out.println(res);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (connection != null) {
try {
//提交事务,释放锁
connection.commit();
//恢复自动提交
connection.setAutoCommit(true);
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
}
}
}
(3)使用示例
MysqlLock mysqlLock = new MysqlLock();
String lockName = "test";
mysqlLock.lock(() -> {
System.out.println("执行业务操作");
return "SUCCESS";
},lockName);
基于Redis实现分布式锁是平时开发中使用率较多的方式,从自己平时的积累和网上众多优质博客中,总结共计有以下几实现方案:
(1)setnx + expire 结合
SETNX 是SET IF NOT EXISTS的简写。日常命令格式是SETNX key value,如果 key不存在,则SETNX成功返回1,如果这个key已经存在了,则返回0。
但是这个方案存在缺陷,setnx 和 expire 两个命令是分开的,不是原子操作。如果执行完 setnx 加锁,正要执行 expire 设置过期时间时,程序宕机崩溃,导致锁没有设置过期时间,那么将会产生死锁。
(2) setnx,value = 过期时间
为解决方案(1)的缺陷,我们可以将过期时间放在value里面,但是过期时间是客户端生成的,所以需要保证所有客户端时间同步,还有一个问题,该锁没有保存持有者的唯一标识,可能被别的客户端释放/解锁。
(3)lua脚本
lua脚本可以保证 setnx 和 expire 两条指令的原子性,lua脚本如下:
if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then
redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end;
(4)set 扩展命令
相对于lua脚本,更好理解的便是,redis 的 set 指令扩展参数,也是有原子性的。
SET key value[EX seconds][PX milliseconds][NX|XX]
这种方案可能还存在问题,当业务还没执行完,锁过期释放了,这种临界问题咋处理呢?难道只是把过期时间设置长一点就可以啦?
(5)redisson 开源框架
Redisson - 是一个高级的分布式协调Redis客服端,能帮助用户在分布式环境中轻松实现一些Java的对象。当然,既然说到这里啦,Redission框架肯定帮我们解决了这些问题。
Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。附一张Redission底层原理图:
(6)集群下 redLock 实现
上面几种方案都是基于单机Redis实现的,没有考虑到集群部署的问题,在集群模式时由于主从复制延迟,或者主节点宕机,造成锁丢失或者解锁延迟的现象。
RedLock的实现步骤:
既然有现成的框架可以直接使用,那么下面简单举例Springboot集成Redisson为例(单机模式下)。
(1)引入依赖
org.redisson
redisson-spring-boot-starter
3.15.6
复制代码
(2)redisson 配置
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.TransportMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://127.0.0.1:6379");
singleServerConfig.setPassword("123456");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
(3)测试客户端
@Resource
private RedissonClient redissonClient;
public void lockTest(){
RLock lock = redissonClient.getLock("LOCK_KEY");
try {
if (!lock.tryLock(10, TimeUnit.SECONDS)) {
System.out.println("锁失败");
}
//模拟
Thread.sleep(1000);
System.out.println("操作业务逻辑");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
Zookeeper是基于树形结构实现分布式锁,它有四种类型节点:
Zookeeper分布式锁恰恰应用了临时顺序节点。
Zookeeper实现分布式锁流程
Curator就是Netflix开源的一套ZooKeeper客户端框架,它提供了zk场景的绝大部分实现,使用Curator就不必关心其内部算法,Curator提供了来实现分布式锁,用方法获取锁,以及用方法释放锁,同其他锁一样,方法需要放在finally代码块中,确保锁能正确释放。
(1)引入依赖
org.apache.curator
curator-framework
4.2.0
org.apache.curator
curator-recipes
4.2.0
(2)配置文件
curator:
connectionTimeoutMs: 5000
elapsedTimeMs: 5000
retryCount: 3
sessionTimeoutMs: 60000
connectString: 127.0.0.1:2181
(3)配置类
import lombok.Data;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "curator")
@Data
public class CuratorConfig {
private int retryCount;
private int elapsedTimeMs;
private String connectString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
@Bean
public CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(elapsedTimeMs, retryCount);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.retryPolicy(retryPolicy)
.build();
return curatorFramework;
}
}
(4)测试类
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class LockClient {
@Autowired
private CuratorFramework curatorFramework;
public void lockTest(){
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/zk-lock");
try {
// 获取锁
lock.acquire();
System.out.println("业务处理逻辑");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
方案 | 理解程度 | 实现程度 | 性能 | 可靠性 |
---|---|---|---|---|
基于数据库 | 容易 | 复杂 | 差 | 不可靠 |
基于redis | 一般 | 一般 | 高 | 可靠 |
基于zookeeper | 难 | 简单 | 一般 | 一般 |