• 服了,头条4面:因为一个问题问题砍了我10万薪水


    今天我们来探讨一下分布式锁的4种实现:

    1、通过MySQL实现分布式锁

    2、通过redis实现分布式锁

    3、通过zookeeper实现分布式锁

    4、通过etcd实现分布式锁

    1、什么是分布式锁?

    如何确保共享资源在同一时刻只能被一个线程访问?

    大家可能觉得这个很简单吧,在一个jvm中,通过synchronized或者ReentrantLock是很容易实现的。

    确实,单个jvm中确实没有问题。

    但是,通常我们的系统会采用集群的方式部署,此时集群中的每个节点都是一个jvm环境,那么通过synchronized或者ReentrantLock是无法解决共享资源访问的问题了。

    此时就要用到分布式锁了:分布式锁就是解决分布式环境中共享资源顺序访问的问题,同一时刻,集群中所有节点中,只允许有一个线程可以访问共享资源。

    2、分布式锁的功能

    1. 分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作
    2. 同一时刻所有机器中只有一个使用者可以获取到分布式锁
    3. 锁具有重要的功能:即一个使用者可以多次获取某个分布式锁
    4. 获取锁的过程允许指定定时功能:在指定的时间内尝试获取锁,过了超时时间,若还未获取到锁,则获取失败
    5. 防止死锁:如:A机器获取锁之后,在释放锁之前,A机器挂了,导致锁未释放,结果锁一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决;解决方式:持有锁的时候可以加个持有超时时间,超过了这个时间锁将自动释放,此时其他机器将有机会获取锁

    下面我们来看一下分布式锁的4种实现。

    3、方式1:数据库的方式

    3.1、原理

    锁的获取过程

    假如:一个集群环境中有n个系统,每个系统中有一个jvm,每个jvm中有m个线程去获取分布式锁,那么同时可能就有n*m个线程去获取分布式锁,此时分布式锁的压力是比较大的,每个jvm中多个线程同时去获取锁其实是没有意义的,可以在每个jvm中先加一把本地的锁,获取分布式锁之前需要先获取jvm本地的锁,本地锁获取成功之后,才可以尝试获取分布式锁,此时n个系统中最多有n个线程尝试获取分布式锁,获取锁的步骤主要2步:

    1. 1、先尝试获取jvm本地锁
    2. 2jvm本地锁获取成功之后尝试获取分布式锁

    超时时间

    获取锁的时候可以传递获取锁最大等待时间,在指定的时间内多次尝试获取锁,获取失败之后,休眠一会,再继续尝试获取,直到时间耗尽。

    锁有效期

    获取锁的时候需要指定有效期,有效期就是获取锁之后,使用者希望使用多长时间,为什么需要有效期?

    如果没有有效期,当使用者获取成功之后,系统突然down机了,那么这个锁就无法释放,其他线程就再也无法获取到这个锁了。

    所以需要有有效期,超过了有效期,锁将失效,其他线程将可以尝试获取锁。

    锁续命

    什么是锁续命?

    比如:使用者获取锁的时候,指定有效期是5分钟,但是5分钟之后,使用者事情还未干完,还想继续使用一会,那么可以使用续命功能,延迟锁的有效期。

    可以启动一个子线程,自动完成续命的操作,比如:原本有效期是5分钟,当使用4分钟的时候,续命2分钟,那么有效期是7分钟,这个比较简单,大家可以随意发挥。

    3.2、准备sql

    1. create table t_lock(
    2.   lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
    3.   request_id varchar(64NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
    4.   lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
    5.   timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
    6.   version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
    7. )COMMENT '锁信息表';

    注意:表中有个版本号字段,版本号主要用于乐观锁的方式更新数据,确保并发情况下更新数据的正确性。

    3.3、锁工具类代码

    代码比较简单,大家主要看获取锁的lock方法和释放锁的unlock方法,注释比较详细,大家看看就懂了。

    代码中的重点是更新数据的时候,通过比对版本号,采用cas的方式,确保并发情况下更新数据的正确性。

    本代码实现了获取锁和释放锁的操作,续命操作未实现,大家可以尝试实现一下。

    1. package lock;
    2. import lombok.Builder;
    3. import lombok.Getter;
    4. import lombok.Setter;
    5. import lombok.extern.slf4j.Slf4j;
    6. import java.sql.*;
    7. import java.util.Map;
    8. import java.util.Objects;
    9. import java.util.UUID;
    10. import java.util.concurrent.ConcurrentHashMap;
    11. import java.util.concurrent.TimeUnit;
    12. import java.util.concurrent.locks.ReentrantLock;
    13. @Slf4j
    14. public class DbLockUtil {
    15.     //将requestid保存在该变量中
    16.     static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
    17.     //jvm锁:当多个线程并发获取分布式锁时,需要先获取jvm锁,jvm锁获取成功,则尝试获取分布式锁
    18.     static Map<String, ReentrantLock> jvmLockMap = new ConcurrentHashMap<>();
    19.     /**
    20.      * 获取当前线程requestid
    21.      *
    22.      * @return
    23.      */
    24.     public static String getRequestId() {
    25.         String requestId = requestIdTL.get();
    26.         if (requestId == null || "".equals(requestId)) {
    27.             requestId = UUID.randomUUID().toString();
    28.             requestIdTL.set(requestId);
    29.         }
    30.         log.info("requestId:{}", requestId);
    31.         return requestId;
    32.     }
    33.     /**
    34.      * 获取锁
    35.      *
    36.      * @param lockKey         锁key
    37.      * @param lockTimeOut(毫秒) 持有锁的有效时间,防止死锁
    38.      * @param getTimeOut(毫秒)  获取锁的超时时间,这个时间内获取不到将重试
    39.      * @return
    40.      */
    41.     public static boolean lock(String lockKey, long lockTimeOut, int getTimeOut) throws Exception {
    42.         log.info("start");
    43.         boolean lockResult = false;
    44.         /**
    45.          * 单个jvm中可能有多个线程并发获取一个锁
    46.          * 此时我们只允许一个线程去获取分布式锁
    47.          * 所以如果同一个jvm中有多个线程尝试获取分布式锁,需要先获取jvm中的锁
    48.          */
    49.         ReentrantLock jvmLock = new ReentrantLock();
    50.         ReentrantLock oldJvmLock = jvmLockMap.putIfAbsent(lockKey, jvmLock);
    51.         oldJvmLock = oldJvmLock != null ? oldJvmLock : jvmLock;
    52.         boolean jvmLockSuccess = oldJvmLock.tryLock(getTimeOut, TimeUnit.MILLISECONDS);
    53.         //jvm锁获取失败,则直接失败
    54.         if (!jvmLockSuccess) {
    55.             return lockResult;
    56.         } else {
    57.             //jvm锁获取成功,则继续尝试获取分布式锁
    58.             try {
    59.                 String request_id = getRequestId();
    60.                 long startTime = System.currentTimeMillis();
    61.                 //循环尝试获取锁
    62.                 while (true) {
    63.                     //通过lockKey获取db中的记录
    64.                     LockModel lockModel = DbLockUtil.get(lockKey);
    65.                     if (Objects.isNull(lockModel)) {
    66.                         //记录不存在,则先插入一条
    67.                         DbLockUtil.insert(LockModel.builder().lock_key(lockKey).request_id("").lock_count(0).timeout(0L).version(0).build());
    68.                     } else {
    69.                         //获取请求id,稍后请求id会放入ThreadLocal中
    70.                         String requestId = lockModel.getRequest_id();
    71.                         //如果requestId为空字符,表示锁未被占用
    72.                         if ("".equals(requestId)) {
    73.                             lockModel.setRequest_id(request_id);
    74.                             lockModel.setLock_count(1);
    75.                             lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut);
    76.                             //并发情况下,采用cas方式更新记录
    77.                             if (DbLockUtil.update(lockModel) == 1) {
    78.                                 lockResult = true;
    79.                                 break;
    80.                             }
    81.                         } else if (request_id.equals(requestId)) {
    82.                             //如果requestId和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
    83.                             lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut);
    84.                             lockModel.setLock_count(lockModel.getLock_count() + 1);
    85.                             if (DbLockUtil.update(lockModel) == 1) {
    86.                                 lockResult = true;
    87.                                 break;
    88.                             }
    89.                         } else {
    90.                             //锁不是自己的,并且已经超时了,则重置锁,继续重试
    91.                             if (lockModel.getTimeout() < System.currentTimeMillis()) {
    92.                                 DbLockUtil.resetLock(lockModel);
    93.                             } else {
    94.                                 //如果未超时,休眠100毫秒,继续重试
    95.                                 if (startTime + getTimeOut > System.currentTimeMillis()) {
    96.                                     TimeUnit.MILLISECONDS.sleep(100);
    97.                                 } else {
    98.                                     break;
    99.                                 }
    100.                             }
    101.                         }
    102.                     }
    103.                 }
    104.             } finally {
    105.                 //释放jvm锁,将其从map中异常
    106.                 jvmLock.unlock();
    107.                 jvmLockMap.remove(lockKey);
    108.             }
    109.         }
    110.         log.info("end");
    111.         return lockResult;
    112.     }
    113.     /**
    114.      * 释放锁
    115.      *
    116.      * @param lock_key
    117.      * @throws Exception
    118.      */
    119.     private static void unlock(String lock_key) throws Exception {
    120.         //获取当前线程requestId
    121.         String requestId = getRequestId();
    122.         LockModel lockModel = DbLockUtil.get(lock_key);
    123.         //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
    124.         if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
    125.             if (lockModel.getLock_count() == 1) {
    126.                 //重置锁
    127.                 resetLock(lockModel);
    128.             } else {
    129.                 lockModel.setLock_count(lockModel.getLock_count() - 1);
    130.                 DbLockUtil.update(lockModel);
    131.             }
    132.         }
    133.     }
    134.     /**
    135.      * 重置锁
    136.      *
    137.      * @param lockModel
    138.      * @return
    139.      * @throws Exception
    140.      */
    141.     private static int resetLock(LockModel lockModel) throws Exception {
    142.         lockModel.setRequest_id("");
    143.         lockModel.setLock_count(0);
    144.         lockModel.setTimeout(0L);
    145.         return DbLockUtil.update(lockModel);
    146.     }
    147.     /**
    148.      * 更新lockModel信息,内部采用乐观锁来更新
    149.      *
    150.      * @param lockModel
    151.      * @return
    152.      * @throws Exception
    153.      */
    154.     private static int update(LockModel lockModel) throws Exception {
    155.         return exec(conn -> {
    156.             String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";
    157.             PreparedStatement ps = conn.prepareStatement(sql);
    158.             int colIndex = 1;
    159.             ps.setString(colIndex++, lockModel.getRequest_id());
    160.             ps.setInt(colIndex++, lockModel.getLock_count());
    161.             ps.setLong(colIndex++, lockModel.getTimeout());
    162.             ps.setString(colIndex++, lockModel.getLock_key());
    163.             ps.setInt(colIndex++, lockModel.getVersion());
    164.             return ps.executeUpdate();
    165.         });
    166.     }
    167.     private static LockModel get(String lock_key) throws Exception {
    168.         return exec(conn -> {
    169.             String sql = "select * from t_lock t WHERE t.lock_key=?";
    170.             PreparedStatement ps = conn.prepareStatement(sql);
    171.             int colIndex = 1;
    172.             ps.setString(colIndex++lock_key);
    173.             ResultSet rs = ps.executeQuery();
    174.             if (rs.next()) {
    175.                 return LockModel.builder().
    176.                         lock_key(lock_key).
    177.                         request_id(rs.getString("request_id")).
    178.                         lock_count(rs.getInt("lock_count")).
    179.                         timeout(rs.getLong("timeout")).
    180.                         version(rs.getInt("version")).build();
    181.             }
    182.             return null;
    183.         });
    184.     }
    185.     private static int insert(LockModel lockModel) throws Exception {
    186.         return exec(conn -> {
    187.             String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
    188.             PreparedStatement ps = conn.prepareStatement(sql);
    189.             int colIndex = 1;
    190.             ps.setString(colIndex++, lockModel.getLock_key());
    191.             ps.setString(colIndex++, lockModel.getRequest_id());
    192.             ps.setInt(colIndex++, lockModel.getLock_count());
    193.             ps.setLong(colIndex++, lockModel.getTimeout());
    194.             ps.setInt(colIndex++, lockModel.getVersion());
    195.             return ps.executeUpdate();
    196.         });
    197.     }
    198.     private static <T> T exec(SqlExec<T> sqlExec) throws Exception {
    199.         Connection conn = getConn();
    200.         try {
    201.             return sqlExec.exec(conn);
    202.         } finally {
    203.             closeConn(conn);
    204.         }
    205.     }
    206.     @FunctionalInterface
    207.     public interface SqlExec<T> {
    208.         T exec(Connection conn) throws Exception;
    209.     }
    210.     @Getter
    211.     @Setter
    212.     @Builder
    213.     public static class LockModel {
    214.         private String lock_key;
    215.         private String request_id;
    216.         private Integer lock_count;
    217.         private Long timeout;
    218.         private Integer version;
    219.     }
    220.     private static final String url = "jdbc:mysql://localhost:3306/dlock?useSSL=false";        //数据库地址
    221.     private static final String username = "";        //数据库用户名
    222.     private static final String password = "";        //数据库密码
    223.     private static final String driver = "com.mysql.jdbc.Driver";        //mysql驱动
    224.     /**
    225.      * 连接数据库
    226.      *
    227.      * @return
    228.      */
    229.     private static Connection getConn() {
    230.         Connection conn = null;
    231.         try {
    232.             Class.forName(driver);  //加载数据库驱动
    233.             try {
    234.                 conn = DriverManager.getConnection(url, username, password);  //连接数据库
    235.             } catch (SQLException e) {
    236.                 e.printStackTrace();
    237.             }
    238.         } catch (ClassNotFoundException e) {
    239.             e.printStackTrace();
    240.         }
    241.         return conn;
    242.     }
    243.     /**
    244.      * 关闭数据库链接
    245.      *
    246.      * @return
    247.      */
    248.     private static void closeConn(Connection conn) {
    249.         if (conn != null) {
    250.             try {
    251.                 conn.close();  //关闭数据库链接
    252.             } catch (SQLException e) {
    253.                 e.printStackTrace();
    254.             }
    255.         }
    256.     }
    257. }

    4、方式2:redis 的方式

    4.1、用到的几个命令

    1. setnx命令格式:SETNX key value;是『SET if Not eXists』(如果不存在,则 SET)的简写,只在简 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。命令在设置成功时返回 1 ,设置失败时返回 0 。
    2. getset命令格式:GETSET key value,将键 key 的值设为 value ,并返回键 key 在被设置之前的旧的value。返回值:如果键 key 没有旧值, 也即是说, 键 key 在被设置之前并不存在, 那么命令返回 nil 。当键 key 存在但不是字符串类型时,命令返回一个错误。
    3. expire命令格式:EXPIRE key seconds,使用:未给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。返回值:设置成功返回 1 。当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。
    4. del命令格式:DEL key [key …],使用:删除给定的一个或多个 key ,不存在的 key 会被忽略。返回值:被删除 key 的数量。

    4.2、原理

     

    过程分析,先看图左边过程

    1、A尝试去获取锁lockkey,通过setnx(lockkey,currenttime+timeout)命令,对lockkey进行setnx,将value值设置为当前时间+锁超时时间;

    2、如果返回值为1,说明redis服务器中还没有lockkey,也就是没有其他用户拥有这个锁,A就能获取锁成功;

    3、在进行相关业务执行之前,先执行expire(lockkey),对lockkey设置有效期,防止死锁;因为如果不设置有效期的话,lockkey将一直存在于redis中,其他用户尝试获取锁时,执行到setnx(lockkey,currenttime+timeout)时,将不能成功获取到该锁;

    4、执行相关业务

    5、释放锁,A完成相关业务之后,要释放拥有的锁,也就是删除redis中该锁的内容,del(lockkey),接下来的用户才能进行重新设置锁新值

    再看右边过程

    6、当A通过setnx(lockkey,currenttime+timeout)命令不能成功设置lockkey时,这是不能直接断定获取锁失败;因为我们在设置锁时,设置了锁的超时时间timeout,当当前时间大于redis中存储键值为lockkey的value值时,可以认为上一任的拥有者对锁的使用权已经失效了,A就可以强行拥有该锁;具体判定过程如下;

    7、A通过get(lockkey),获取redis中的存储键值为lockkey的value值,即获取锁的相对时间lockvalueA

    8、lockvalueA!=null && currenttime>lockvalue,A通过当前的时间与所设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;

    9、步骤4返回结果为true后,通过getSet设置新的超时时间,并返回旧值lockvalueB,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁

    10、lockvalueB == null || lockvalueA==lockvalueB,判断:如果lockvalueB为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的lockvalueB一致说明中间未被其他进程获取该锁,可以获取锁;否则不能获取锁,结束,获取锁失败。

    4.3、代码

    留给给大家,按照上面的过程实现下。

    5、方式3:zookeeper

    5.1、原理

    zookeeper是什么?是一个开源的中间件,可以做高可用配置中心使用,简单点理解:可以用来保存用户的一些数据。

    zookeeper有3个特点比较重要,这2个特点是实现分布式锁的关键。

    第1个特点:节点天然有序

    zookeeper中存储数据是树结构,树下面可以创建很多节点,节点中可以存储用户的数据。

    在每一个节点下面创建子节点时,只要选择的创建类型是有序类型,那么,此节点将自动在客户端指定的节点名后面添加一个单调递增序号,重点是,并发创建子节点的情况下,也可以确保多个子节点的有序性

    比如并发在/lock/lock1下面创建4个有序的节点,如下:

    客户端可以判断创建的节点序号是不是最小的,如果编号是子节点中最小的,则获取锁成功。

    第2个特点:临时节点

    客户端操作zookeeper,需要和zookeeper之间建立连接,如果客户端请求在zookeeper上创建的节点类型是临时节点,那么当客户端和zookeeper之间连接断开的时候,创建的临时节点自动会被zookeeper删除。

    这个可以防止死锁多功能,比如客户端获取锁之后挂了,那么节点会自动被删除,此时锁的其他获取者才有机会获取锁。

    第3个特点:监听器

    客户端可以对某个节点添加监听器,当节点信息发生变化的时候,zookeeper会通知客户端,比如节点数据被修改、节点被删除等等,都会通知客户端;

    这个特性特别牛逼:这个特别爽,后面的节点只需要监听他前面的一个节点,当前面的一个节点被删除时,zookeeper会通知监听者,监听者可以判断自己创建的节点编号是不是最小的,如果是最小的,即获取锁成功,这个是不是比上面数据库和redis的方式好一些,db和redis的方式需要自旋(获取失败了,休眠稍许,继续循环尝试),而zookeeper不需要自旋,锁被释放的时候,zookeeper会通知等待者。

    5.2、代码

    重点理解原理,代码大家可以在网上找找,比较多,这里就不贴出来了。

    6、方式4:etcd

    etcd 和 zookeeper功能差不多,也可以作为高可用配置中心,不过etcd基于Go语言实现,也可以用来实现分布式锁,实现原理上和zookeeper差不多,这里就不细说了。

    7、总结

    本文主要介绍了4种方式实现分布式锁,大家重点要理解每种方式的原理。

    db和redis的方式原理差不多,内部在获取失败的情况下,都需要采用自旋的方式重新尝试获取锁,而zookeeper采用监听的方式。

    redis和zookeeper这2种方式用的比较多,性能上面redis更好一些,并发量比较大的可以采用redis的方式。

    设计中还有一点:获取锁的时候分2步走,先获取jvm中的锁,然后再尝试获取分布式锁。

     

  • 相关阅读:
    关于ASO优化的分步入门指南1
    VSCode Python代码逐行运行
    使用并发 ssh 连接来提升捞日志脚本执行效率
    用Prometheus和Grafana监控Java Spring应用
    Oracle 11g RAC部署笔记
    Linux 将 /home 目录与 / 根目录磁盘合并
    AIOT数字孪生智慧工地一体化管理平台源码
    最适合运动的耳机类型是什么、最适合运动的耳机推荐
    多步验证Odoo功能模块
    表单制作软件有哪些?最火的8款表格制作软件?
  • 原文地址:https://blog.csdn.net/lt_xiaodou/article/details/126723477