• Mysql分布式锁设计


            最近开发电商库存相关项目,其中最为重要的一个功能之一是分布式锁的实现。本文就项目组中用到的基于MySQL实现的分布式锁,做一些思考和总结。

    1、常见的分布式锁实现方案

    • 基于数据库实现的分布式锁
    • 基于Redis实现的分布式锁
    • 基于Zookeeper实现的分布式锁

            在讨论使用分布式锁的时候往往首先排除掉基于数据库的方案,本能的会觉得这个方案不够“高级”。从性能的角度考虑,基于数据库的方案性能确实不够优异,但就目前笔者所在项目组来说,几乎所有项目的项目都是基于MySQL实现的分布式锁,所以采用哪种方案是要基于使用场景来看的,选择哪种方案,合适最重要,本文也仅就MySQL实现分布式锁展开讨论。

    2、一把极为简单的MySQL分布式锁

            最容易想到的基于MySQL的分布式锁就是通过数据库的唯一键约束,来达到抢占锁资源的目的,本文也从这把最为简单的分布式锁讲起。在MySQL中创建一张表如下,为资源ID设置唯一键约束。

    1. CREATE TABLE `distribute_lock` (
    2. `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    3. `resource_id` varchar(64) NOT NULL DEFAULT '' COMMENT '资源id',
    4. `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    5. PRIMARY KEY (`id`),
    6. UNIQUE KEY `uniq_resourceId` (`resource_id`)
    7. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='分布式锁';

            当需要获取锁时,往数据库中一条记录,因为有唯一键约束,插入成功则代表获取到了锁。 

    INSERT INTO distribute_lock(`resource_id`) VALUES ( 'resource_01');

            释放锁时删除这条记录即可。

    DELETE FROM distribute_lock WHERE resource_id = 'resource_01';

    显然,这把锁因为太过简单,所以存在很多问题。

    • 没有锁失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
    • 不可重入,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
    • 非阻塞,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。

            当然还有比如锁可靠性完全依赖于数据库,这是基于MySQL分布式锁的必然缺陷,这里不做讨论。下面将一步步优化这把最为简单的锁。

    3、锻造一把好锁

    3.1、锁超时失效

            通常想到的方法是做一个定时清理过期资源的程序,每隔一定时间把数据库中的超时数据清理一遍。这种做法最为简单直接,但也有一些相应的弊端,比如增加了程序复杂性(需要专门实现并配置定时任务),锁的超时时间也不方便灵活配置。本文试图说明一种笔者认为更为合理的方式。

            前文所述的拿锁方式非常简单粗暴,数据插入不成功则拿锁失败,但此时数据库中的锁记录可能已经超时,所以需要在插数失败之后做进一步动作,以判断之前拿锁的线程是否已经超时。只需取出当前锁记录,比较锁记录时间与当前时间差值是否已超出锁等待时间,如未超出,获取锁失败,如超出,修改锁记录时间为当前时间,拿锁成功。代码如下:

    1. boolean tryLock(String resourceId) throws SQLException {
    2. //插入数据,插入成功则成功获取锁
    3. if (ensureLockExist(null, resourceId)) {
    4. return true;
    5. }
    6. //数据插入失败,判断锁是否超时,超时则修改锁记录时间
    7. return tryUpdateLockTime(resourceId);
    8. }
    9. protected boolean ensureLockExist(Connection connection, String resourceId) throws SQLException {
    10. try {
    11. createLock(connection, resourceId); // 插入锁记录
    12. } catch (MySQLIntegrityConstraintViolationException ex) {
    13. return false;
    14. }
    15. return true;
    16. }
    17. private boolean tryUpdateLockTime(String resourceId) throws SQLException {
    18. LockModel entity = getLock(null, resourceId);//查询已有记录
    19. if (entity != null && isLeaseExpired(entity)) {//记录存在并且已过期
    20. //基于乐观锁CAS修改锁记录时间,修改成功就拿到锁
    21. return trySetLockTime(null, resourceId, entity.createTime) > 0;
    22. }
    23. return false;
    24. }

            这里需要强调,修改锁记录时间必须通过CAS操作,因为可能存在多个线程同时争抢一把已经过期的锁,如果不通过CAS操作,可能多个线程同时获取到锁。上述只是代码片段,程序中可以提供方法给调用者灵活修改锁的超时等待时间,同时也不再需要专门配置定时清理过期记录的任务。

    3.2、锁可重入

            有了前面解决超时失效问题的思路,很容易想到的方案是在表中加个字段记录当前获得锁的机器和线程信息,当线程再次获取锁的时候先查询数据库,如果当前机器和线程信息在数据库可以查到的话,直接把锁分配给该线程即可。这种方式多了一步查询操作,对锁性能有一定影响,是否可以把成功获取锁的线程和其获取到的锁放到一个容器里呢?当某个线程需要拿锁时,先在容器中找下自己是否已经拿到过锁,拿到了那就不必和数据库打交道了。那就这么干,代码如下:

    1. class State {
    2. private Thread exclusiveOwner;//拿到锁的线程
    3. private volatile int state; //重入次数
    4. }
    5. //为保证线程安全,采用ConcurrentHashMap盛放成功获取锁的线程
    6. ConcurrentHashMap/* resourceId*/, State> states = new ConcurrentHashMap<>();
    7. public boolean tryLock(String resourceId) {
    8. Thread current = Thread.currentThread();
    9. State state = states.get(resourceId);//从容器中拿出锁记录
    10. if (Objects.nonNull(state)) {
    11. //锁记录不为空,且为当前线程,则重入获取锁成功
    12. if (state.exclusiveOwner == current) {
    13. state.state++;//重入次数+1
    14. return true;
    15. }
    16. return false;
    17. } else {
    18. try {
    19. State newState = new State(current, 1);
    20. if (states.putIfAbsent(resourceId, newState) == null) {//第一个往容器中放入锁记录成功
    21. if (node.tryLock(resourceId)) {
    22. return true;
    23. } else {
    24. states.remove(resourceId);
    25. }
    26. }
    27. return false;
    28. } catch (Exception ex) {
    29. throw new DLockException(true, resourceId, ex);
    30. }
    31. }
    32. }

            同样需要强调的是放入锁记录的容器必须是线程安全的,同时只有第一个往容器中成功添加所记录的线程,才能往数据库插入锁记录,很大程度上降低了争抢锁记录的线程与MySQL打交道的频率,能有效提升性能。

    3.3、阻塞式获取锁

            阻塞?搞一个while循环,直到tryLock成功?其实这也不失为一种解决方式,emm……总觉得不够优雅。而且通过轮训的方式,会占用较多的CPU资源。

            能否借助MySQL的悲观锁实现呢?借助 for update 关键字来给被查询的记录添加行锁中悲观锁,这样别的线程就没有办法对这条记录进行任何操作,从而达到保护共享资源的目的。

    select * from distribute_lock where resource_id = ? for update

    采用这种方式需要注意:

    • MySQL默认是会自动提交事务的,应该手动禁止一下: SET AUTOCOMMIT = 0;
    • 行锁是建立在索引的基础上的,如果查询时候不是走的索引的话,行锁会升级为表锁进行全表扫描;
    • 申请锁操作:先往数据库中插入一条锁记录,然后select * from distribute_lock where resource_id = ? for update; 只要可以查的出来就是申请成功的,没有获取到的会被阻塞,阻塞的超时时可以通过设置 MySQL 的 innodb_lock_wait_timeout 来进行设置。
    • 释放锁操作:COMMIT; 事务提交之后别的线程就可以查询到这条记录。
    1. boolean lock(String resourceId) throws SQLException {
    2. ensureLockExist(null, resourceId);//插入一条锁记录
    3. connection = dataSource.getConnection();
    4. autoCommit = connection.getAutoCommit();
    5. lockResource(connection, resourceId);
    6. return true;
    7. }
    8. boolean unlock(String resourceId) throws SQLException {
    9. if (connection != null) {
    10. connection.commit();//提交事务达到释放锁的目的
    11. connection.setAutoCommit(autoCommit);
    12. connection.close();
    13. connection = null;
    14. return true;
    15. }
    16. return false;
    17. }
    18. protected void lockResource(Connection connection, String resourceId) throws SQLException {
    19. PreparedStatementCreator creator = (conn) -> {
    20. conn.setAutoCommit(false);//关闭事务自动提交
    21. PreparedStatement ps = conn.prepareStatement("select * from distribute_lock where resource_id = ? for update");
    22. ps.setString(1, resourceId);
    23. return ps;
    24. };
    25. PreparedStatementCallback action = ps -> {
    26. ResultSet rs = ps.executeQuery();
    27. if (rs != null) {
    28. rs.close();
    29. }
    30. return null;
    31. };
    32. execute(connection, creator, action);
    33. }

            虽然在此提出了这种阻塞式获取锁的方式,但笔者并不推荐,这种方式实际应用中造成过生产事故,因为MySQL数据库会做查询优化,即便使用了索引,优化时发现全表扫效率更高,则可能会将行锁升级为表锁,此时会造成其他资源锁也无法获取。

            难道真的只能通过while循环的方式以达到阻塞式获取锁吗?笔者在库存系统开发过程中,接触到了一种更为巧妙的MySQL阻塞式获取锁的方式。

    4、基于MySQL阻塞式分布式锁

            通过在事务中插入或更新一条带唯一索引的记录,在事务未提交之前,其他线程事务会处于阻塞等待的状态,以达到阻塞式获取锁的目的。

    1. @Transactional(rollbackFor = Exception.class)
    2. public void executeWithLock(String resourceId) throws OspException {
    3. //获取锁
    4. tryLock(resourceId);
    5. // do something
    6. }
    7. "tryLock">
    8. INSERT INTO distribute_lock
    9. (resource_id)
    10. VALUES
    11. (#{resourceId})
    12. ON DUPLICATE key UPDATE update_time = NOW()

            其原理笔者查阅了下MySQL(RR事务级别)插入一条记录的执行流程如下:

            首先对插入的间隙加插入意向锁(Insert Intension Locks)如果该间隙已被加上了间隙锁或 Next-Key 锁,则加锁失败进入等待;如果没有,则加锁成功,表示可以插入;

            然后判断插入记录是否有唯一键,如果有,则进行唯一性约束检查如果不存在相同键值,则完成插入。如果存在相同键值,则判断该键值是否加锁,如果没有锁, 判断该记录是否被标记为删除,如果标记为删除,说明事务已经提交,还没来得及 purge,这时加 S 锁等待;如果没有标记删除,则报 duplicate key 错误;如果有锁,说明该记录正在处理(新增、删除或更新),且事务还未提交,加 S 锁等待;

            这种方式是否会造成死锁呢?欢迎留言,避免笔者在项目中踩坑。

    作者:西门吹雪

  • 相关阅读:
    JVM | 第1部分:自动内存管理与性能调优《深入理解 Java 虚拟机》
    基于YOLOv8的车辆跟踪与车速计算应用
    java健身房管理系统设计计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    C++ STL & 标准库
    Pytorch Pytorch+深度学习神经网络相关学习收获
    ES6中的WeakMap和WeakSet:特性和用途
    【VulnHub靶场】——BOREDHACKERBLOG: SOCIAL NETWORK
    【Kafka】Docker安装kafka、搭建kafka集群
    JavaScript进阶(二十九): 走近 es6 之 new.target
    MySQL复合查询
  • 原文地址:https://blog.csdn.net/vipshop_fin_dev/article/details/126691317