• 玩转Mysql系列 - 第26篇:聊聊mysql如何实现分布式锁?


    这是Mysql系列第26篇。

    本篇我们使用mysql实现一个分布式锁

    分布式锁的功能

    1. 分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作

    2. 锁具有重入的功能:即一个使用者可以多次获取某个锁

    3. 获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败

    4. 能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁

    预备技能:乐观锁

    通常我们修改表中一条数据过程如下:

    1. t1select获取记录R1
    2. t2:对R1进行编辑
    3. t3:update R1

    我们来看一下上面的过程存在的问题:

    如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。

    我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:

    1. t1:打开事务start transaction
    2. t2select获取记录R1,声明变量v=R1.version
    3. t3:对R1进行编辑
    4. t4:执行更新操作
    5.     update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
    6. t5:t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚
    7.     if(count==1){
    8.         //提交事务
    9.         commit;
    10.     }else{
    11.         //回滚事务
    12.         rollback;
    13.     }

    上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。

    上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。

    使用mysql实现分布式锁

    建表

    我们创建一个分布式锁表,如下

    1. DROP DATABASE IF EXISTS javacode2018;
    2. CREATE DATABASE javacode2018;
    3. USE javacode2018;
    4. DROP TABLE IF EXISTS t_lock;
    5. create table t_lock(
    6.   lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
    7.   request_id varchar(64NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
    8.   lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
    9.   timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
    10.   version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
    11. )COMMENT '锁信息表';
    分布式锁工具类:
    1. package com.itsoku.sql;
    2. import lombok.Builder;
    3. import lombok.Getter;
    4. import lombok.Setter;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.junit.Test;
    7. import java.sql.*;
    8. import java.util.Objects;
    9. import java.util.UUID;
    10. import java.util.concurrent.TimeUnit;
    11. /**
    12.  * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
    13.  * 喜欢的请关注公众号:路人甲Java
    14.  */
    15. @Slf4j
    16. public class LockUtils {
    17.     //将requestid保存在该变量中
    18.     static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
    19.     /**
    20.      * 获取当前线程requestid
    21.      *
    22.      * @return
    23.      */
    24.     public static String getRequestId() {
    25.         String requestId = requestIdTL.get();
    26.         if (requestId == null || "".equals(requestId)) {
    27.             requestId = UUID.randomUUID().toString();
    28.             requestIdTL.set(requestId);
    29.         }
    30.         log.info("requestId:{}", requestId);
    31.         return requestId;
    32.     }
    33.     /**
    34.      * 获取锁
    35.      *
    36.      * @param lock_key        锁key
    37.      * @param locktimeout(毫秒) 持有锁的有效时间,防止死锁
    38.      * @param gettimeout(毫秒)  获取锁的超时时间,这个时间内获取不到将重试
    39.      * @return
    40.      */
    41.     public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {
    42.         log.info("start");
    43.         boolean lockResult = false;
    44.         String request_id = getRequestId();
    45.         long starttime = System.currentTimeMillis();
    46.         while (true) {
    47.             LockModel lockModel = LockUtils.get(lock_key);
    48.             if (Objects.isNull(lockModel)) {
    49.                 //插入一条记录,重新尝试获取锁
    50.                 LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());
    51.             } else {
    52.                 String reqid = lockModel.getRequest_id();
    53.                 //如果reqid为空字符,表示锁未被占用
    54.                 if ("".equals(reqid)) {
    55.                     lockModel.setRequest_id(request_id);
    56.                     lockModel.setLock_count(1);
    57.                     lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
    58.                     if (LockUtils.update(lockModel) == 1) {
    59.                         lockResult = true;
    60.                         break;
    61.                     }
    62.                 } else if (request_id.equals(reqid)) {
    63.                     //如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
    64.                     lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
    65.                     lockModel.setLock_count(lockModel.getLock_count() + 1);
    66.                     if (LockUtils.update(lockModel) == 1) {
    67.                         lockResult = true;
    68.                         break;
    69.                     }
    70.                 } else {
    71.                     //锁不是自己的,并且已经超时了,则重置锁,继续重试
    72.                     if (lockModel.getTimeout() < System.currentTimeMillis()) {
    73.                         LockUtils.resetLock(lockModel);
    74.                     } else {
    75.                         //如果未超时,休眠100毫秒,继续重试
    76.                         if (starttime + gettimeout > System.currentTimeMillis()) {
    77.                             TimeUnit.MILLISECONDS.sleep(100);
    78.                         } else {
    79.                             break;
    80.                         }
    81.                     }
    82.                 }
    83.             }
    84.         }
    85.         log.info("end");
    86.         return lockResult;
    87.     }
    88.     /**
    89.      * 释放锁
    90.      *
    91.      * @param lock_key
    92.      * @throws Exception
    93.      */
    94.     public static void unlock(String lock_key) throws Exception {
    95.         //获取当前线程requestId
    96.         String requestId = getRequestId();
    97.         LockModel lockModel = LockUtils.get(lock_key);
    98.         //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
    99.         if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
    100.             if (lockModel.getLock_count() == 1) {
    101.                 //重置锁
    102.                 resetLock(lockModel);
    103.             } else {
    104.                 lockModel.setLock_count(lockModel.getLock_count() - 1);
    105.                 LockUtils.update(lockModel);
    106.             }
    107.         }
    108.     }
    109.     /**
    110.      * 重置锁
    111.      *
    112.      * @param lockModel
    113.      * @return
    114.      * @throws Exception
    115.      */
    116.     public static int resetLock(LockModel lockModel) throws Exception {
    117.         lockModel.setRequest_id("");
    118.         lockModel.setLock_count(0);
    119.         lockModel.setTimeout(0L);
    120.         return LockUtils.update(lockModel);
    121.     }
    122.     /**
    123.      * 更新lockModel信息,内部采用乐观锁来更新
    124.      *
    125.      * @param lockModel
    126.      * @return
    127.      * @throws Exception
    128.      */
    129.     public static int update(LockModel lockModel) throws Exception {
    130.         return exec(conn -> {
    131.             String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";
    132.             PreparedStatement ps = conn.prepareStatement(sql);
    133.             int colIndex = 1;
    134.             ps.setString(colIndex++, lockModel.getRequest_id());
    135.             ps.setInt(colIndex++, lockModel.getLock_count());
    136.             ps.setLong(colIndex++, lockModel.getTimeout());
    137.             ps.setString(colIndex++, lockModel.getLock_key());
    138.             ps.setInt(colIndex++, lockModel.getVersion());
    139.             return ps.executeUpdate();
    140.         });
    141.     }
    142.     public static LockModel get(String lock_key) throws Exception {
    143.         return exec(conn -> {
    144.             String sql = "select * from t_lock t WHERE t.lock_key=?";
    145.             PreparedStatement ps = conn.prepareStatement(sql);
    146.             int colIndex = 1;
    147.             ps.setString(colIndex++lock_key);
    148.             ResultSet rs = ps.executeQuery();
    149.             if (rs.next()) {
    150.                 return LockModel.builder().
    151.                         lock_key(lock_key).
    152.                         request_id(rs.getString("request_id")).
    153.                         lock_count(rs.getInt("lock_count")).
    154.                         timeout(rs.getLong("timeout")).
    155.                         version(rs.getInt("version")).build();
    156.             }
    157.             return null;
    158.         });
    159.     }
    160.     public static int insert(LockModel lockModel) throws Exception {
    161.         return exec(conn -> {
    162.             String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
    163.             PreparedStatement ps = conn.prepareStatement(sql);
    164.             int colIndex = 1;
    165.             ps.setString(colIndex++, lockModel.getLock_key());
    166.             ps.setString(colIndex++, lockModel.getRequest_id());
    167.             ps.setInt(colIndex++, lockModel.getLock_count());
    168.             ps.setLong(colIndex++, lockModel.getTimeout());
    169.             ps.setInt(colIndex++, lockModel.getVersion());
    170.             return ps.executeUpdate();
    171.         });
    172.     }
    173.     public static <T> T exec(SqlExec<T> sqlExec) throws Exception {
    174.         Connection conn = getConn();
    175.         try {
    176.             return sqlExec.exec(conn);
    177.         } finally {
    178.             closeConn(conn);
    179.         }
    180.     }
    181.     @FunctionalInterface
    182.     public interface SqlExec<T> {
    183.         T exec(Connection conn) throws Exception;
    184.     }
    185.     @Getter
    186.     @Setter
    187.     @Builder
    188.     public static class LockModel {
    189.         private String lock_key;
    190.         private String request_id;
    191.         private Integer lock_count;
    192.         private Long timeout;
    193.         private Integer version;
    194.     }
    195.     private static final String url = "jdbc:mysql://localhost:3306/javacode2018?useSSL=false";        //数据库地址
    196.     private static final String username = "root";        //数据库用户名
    197.     private static final String password = "root123";        //数据库密码
    198.     private static final String driver = "com.mysql.jdbc.Driver";        //mysql驱动
    199.     /**
    200.      * 连接数据库
    201.      *
    202.      * @return
    203.      */
    204.     public static Connection getConn() {
    205.         Connection conn = null;
    206.         try {
    207.             Class.forName(driver);  //加载数据库驱动
    208.             try {
    209.                 conn = DriverManager.getConnection(url, username, password);  //连接数据库
    210.             } catch (SQLException e) {
    211.                 e.printStackTrace();
    212.             }
    213.         } catch (ClassNotFoundException e) {
    214.             e.printStackTrace();
    215.         }
    216.         return conn;
    217.     }
    218.     /**
    219.      * 关闭数据库链接
    220.      *
    221.      * @return
    222.      */
    223.     public static void closeConn(Connection conn) {
    224.         if (conn != null) {
    225.             try {
    226.                 conn.close();  //关闭数据库链接
    227.             } catch (SQLException e) {
    228.                 e.printStackTrace();
    229.             }
    230.         }
    231.     }
    232. }

    上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock

    测试用例
    1. package com.itsoku.sql;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.junit.Test;
    4. import static com.itsoku.sql.LockUtils.lock;
    5. import static com.itsoku.sql.LockUtils.unlock;
    6. /**
    7.  * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
    8.  * 喜欢的请关注公众号:路人甲Java
    9.  */
    10. @Slf4j
    11. public class LockUtilsTest {
    12.     //测试重复获取和重复释放
    13.     @Test
    14.     public void test1() throws Exception {
    15.         String lock_key = "key1";
    16.         for (int i = 0; i < 10; i++) {
    17.             lock(lock_key10000L, 1000);
    18.         }
    19.         for (int i = 0; i < 9; i++) {
    20.             unlock(lock_key);
    21.         }
    22.     }
    23.     //获取之后不释放,超时之后被thread1获取
    24.     @Test
    25.     public void test2() throws Exception {
    26.         String lock_key = "key2";
    27.         lock(lock_key5000L, 1000);
    28.         Thread thread1 = new Thread(() -> {
    29.             try {
    30.                 try {
    31.                     lock(lock_key5000L, 7000);
    32.                 } finally {
    33.                     unlock(lock_key);
    34.                 }
    35.             } catch (Exception e) {
    36.                 e.printStackTrace();
    37.             }
    38.         });
    39.         thread1.setName("thread1");
    40.         thread1.start();
    41.         thread1.join();
    42.     }
    43. }

    test1方法测试了重入锁的效果。

    test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了。

    留给大家一个问题

    上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。

  • 相关阅读:
    Python数值求解微分方程(欧拉法,隐式欧拉)
    源码分析Mybatis拦截器(Interceptor)拦截saveBatch()获取不到实体id的原因
    卷妹带你回顾Java基础(一)每日更新Day4
    手把手教程6-2: F460把debug printf功能改到UART1
    Springboot毕设项目基于javaweb电费管理系统v39ge(java+VUE+Mybatis+Maven+Mysql)
    做自媒体一个重要的问题就是:自己的定位是什么?
    Nature Microbiology | 可感染阿斯加德古菌的六种深海沉积物中的病毒基因组
    Spring事件机制之ApplicationEvent
    MySQL数据库-数据表(下)
    [附源码]JAVA毕业设计计算机类课程实验平台(系统+LW)
  • 原文地址:https://blog.csdn.net/weixin_46228112/article/details/133378204