• zookeeper实现分布式锁


    1、什么是锁

    • 在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全 (数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串行执行消除并发修改变量。
    • 对变量或者堆代码码块做同步本质上就是加锁。目的就是实现多个线程在一个时刻同一个代码块只能有一个线程可执行

    2、分布式锁

    分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下面的例子

    上面的设计是存在线程安全问题 

    问题

    • 假设Redis 里面的某个商品库存为 1;此时两个用户同时下单,其中一个下单请求执行到第 3 步,更新 数据库的库存为 0,但是第 4 步还没有执行。
    • 而另外一个用户下单执行到了第 2 步,发现库存还是 1,就继续执行第 3 步。但是商品库存已经为0, 所以如果数据库没有限制就会出现超卖的问题。

    解决方法

      用锁把 2、3、4 步锁住,让他们执行完之后,另一个线程才能进来执行。

        公司业务发展迅速,系统应对并发不断提高,解决方案是要增加一台机器,结果会出现更大的问题 

            假设有两个下单请求同时到来,分别由两个机器执行,那么这两个请求是可以同时执行了,依然存在超卖的问题。

            因为如图所示系统是运行在两个不同的 JVM 里面,不同的机器上,增加的锁只对自己当前 JVM 里面的线程有效,对于其他 JVM 的线程是无效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同一个锁,此时分布式锁就能解决该问题。

    分布式锁的作用:在整个系统提供一个全局、唯一的锁,在分布式系统中每个系统在进行相关操作的时候需要获取到该锁,才能执行相应操作。

    3、zk实现分布式锁

    利用Zookeeper可以创建临时带序号节点的特性来实现一个分布式锁

    实现思路

    • 锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
    • 每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
    • 获取锁失败的线程获取当前节点上一个临时顺序节点,并对对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁。

    流程图

    3.1、main方法 

    1. //zk实现分布式锁
    2. public class DisLockTest {
    3. public static void main(String[] args) {
    4. //使用10个线程模拟分布式环境
    5. for (int i = 0; i < 10; i++) {
    6. new Thread(new DisLockRunnable()).start();//启动线程
    7. }
    8. }
    9. static class DisLockRunnable implements Runnable {
    10. public void run() {
    11. //每个线程具体的任务,每个线程就是抢锁,
    12. final DisClient client = new DisClient();
    13. client.getDisLock();
    14. //模拟获取锁之后的其它动作
    15. try {
    16. Thread.sleep(2000);
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. //释放锁
    21. client.deleteLock();
    22. }
    23. }
    24. }

    3.2、核心方法

    1. import org.I0Itec.zkclient.IZkDataListener;
    2. import org.I0Itec.zkclient.ZkClient;
    3. import java.util.Collections;
    4. import java.util.List;
    5. import java.util.concurrent.CountDownLatch;
    6. //抢锁
    7. //1. 去zk创建临时序列节点,并获取到序号
    8. //2. 判断自己创建节点序号是否是当前节点最小序号,如果是则获取锁
    9. //执行相关操作,最后要释放锁
    10. //3. 不是最小节点,当前线程需要等待,等待你的前一个序号的节点
    11. //被删除,然后再次判断自己是否是最小节点。。。
    12. public class DisClient {
    13. public DisClient() {
    14. //初始化zk的/distrilocl节点,会出现线程安全问题
    15. synchronized (DisClient.class) {
    16. if (!zkClient.exists("/distrilock")) {
    17. zkClient.createPersistent("/distrilock");
    18. }
    19. }
    20. }
    21. //前一个节点
    22. String beforNodePath;
    23. String currentNoePath;
    24. //获取到zkClient
    25. private ZkClient zkClient = new ZkClient("linux121:2181,linux122:2181");
    26. //把抢锁过程为量部分,一部分是创建节点,比较序号,另一部分是等待锁
    27. //完整获取锁方法
    28. public void getDisLock() {
    29. //获取到当前线程名称
    30. final String threadName = Thread.currentThread().getName();
    31. //首先调用tryGetLock
    32. if (tryGetLock()) {
    33. //说明获取到锁
    34. System.out.println(threadName + ":获取到了锁");
    35. } else {
    36. // 没有获取到锁,
    37. System.out.println(threadName + ":获取锁失败,进入等待状态");
    38. waitForLock();
    39. //递归获取锁
    40. getDisLock();
    41. }
    42. }
    43. CountDownLatch countDownLatch = null;
    44. //尝试获取锁
    45. public boolean tryGetLock() {
    46. //创建临时顺序节点,/distrilock/序号
    47. if (null == currentNoePath || "".equals(currentNoePath)) {
    48. currentNoePath = zkClient.createEphemeralSequential("/distrilock/", "lock");
    49. }
    50. //获取到/distrilock下所有的子节点
    51. final List childs = zkClient.getChildren("/distrilock");
    52. //对节点信息进行排序
    53. Collections.sort(childs); //默认是升序
    54. final String minNode = childs.get(0);
    55. //判断自己创建节点是否与最小序号一致
    56. if (currentNoePath.equals("/distrilock/" + minNode)) {
    57. //说明当前线程创建的就是序号最小节点
    58. return true;
    59. } else {
    60. //说明最小节点不是自己创建,要监控自己当前节点序号前一个的节点
    61. final int i = Collections.binarySearch(childs, currentNoePath.substring("/distrilock/".length()));
    62. //前一个(lastNodeChild是不包括父节点)
    63. String lastNodeChild = childs.get(i - 1);
    64. beforNodePath = "/distrilock/" + lastNodeChild;
    65. }
    66. return false;
    67. }
    68. //等待之前节点释放锁,如何判断锁被释放,需要唤醒线程继续尝试tryGetLock
    69. public void waitForLock() {
    70. //准备一个监听器
    71. final IZkDataListener iZkDataListener = new IZkDataListener() {
    72. public void handleDataChange(String s, Object o) throws Exception {
    73. }
    74. //删除
    75. public void handleDataDeleted(String s) throws Exception {
    76. //提醒当前线程再次获取锁
    77. countDownLatch.countDown();//把值减1变为0,唤醒之前await线程
    78. }
    79. };
    80. //监控前一个节点
    81. zkClient.subscribeDataChanges(beforNodePath, iZkDataListener);
    82. //在监听的通知没来之前,该线程应该是等待状态,先判断一次上一个节点是否还存在
    83. if (zkClient.exists(beforNodePath)) {
    84. //开始等待,CountDownLatch:线程同步计数器
    85. countDownLatch = new CountDownLatch(1);
    86. try {
    87. countDownLatch.await();//阻塞,countDownLatch值变为0
    88. } catch (InterruptedException e) {
    89. e.printStackTrace();
    90. }
    91. }
    92. //解除监听
    93. zkClient.unsubscribeDataChanges(beforNodePath, iZkDataListener);
    94. }
    95. //释放锁
    96. public void deleteLock() {
    97. if (zkClient != null) {
    98. zkClient.delete(currentNoePath);
    99. zkClient.close();
    100. }
    101. }
    102. }

    注意:

            分布式锁的实现可以是 Redis、Zookeeper,相对来说生产环境如果使用分布式锁可以考虑使用Redis实现而非Zk。

  • 相关阅读:
    在express中使用session认证
    一篇博客学会系列(1) —— C语言中所有字符串函数以及内存函数的使用和注意事项
    idea导入maven项目依赖报错
    2022年11月30日 Fuzzy C-Means学习笔记
    Java源码分析 | Object
    说说mybatis中的#{} 和 ${}
    香港服务器在大陆连不上怎么回事?
    新开课day21+day22总结
    完美支持--WIN11--Crack--LightningChart-10.3.2.2
    【从0开始编写webserver·基础篇#03】TinyWeb源码阅读,还是得看看靠谱的项目
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/127575486