• 分布式锁中-基于Zookeeper的实现是怎样


    Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK服务会使用Znode存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:

    • 数据组织:数据节点以树形目录(类似文件系统)组织管理,每一个节点中都会保存数据信息和节点信息。

      ZooKeeper's Hierarchical Namespace

    • 集群模式:通常是由3、5个基数实例组成集群,当超过半数服务实例正常工作就能对外提供服务,既能避免单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局一致

      ZooKeeper Service

    • 顺序更新:更新请求都会转由leader执行,来自同一客户端的更新将按照发送的顺序被写入到ZK,处理写请求创建Znode时,Znode名称后会被分配一个全局唯一的递增编号,可以通过顺序号推断请求的顺序,利用这个特性可以实现高级协调服务

    • 监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个Watch Event,可以感知到节点\数据的变更

    • 临时节点:session链接断开临时节点就没了,不能创建子节点(很关键)

    ZK的分布式锁正是基于以上特性来实现的,简单来说是:

    • 临时节点:用于支撑异常情况下的锁自动释放能力
    • 顺序节点:用于支撑公平锁获取锁和排队等待的能力
    • 监听机制:用于支撑抢锁能力
    • 集群模式:用于支撑锁服务的高可用

    2. 加解锁的流程描述

    1. 创建一个永久节点作为锁节点(/lock2)

    2. 试图加锁的客户端在指定锁名称节点(/lock2)下,创建临时顺序子节点

    3. 获取锁节点(/lock2)下所有子节点

    4. 对所获取的子节点按节点自增序号从小到大排序

    5. 判断自己是不是第一个子节点,若是,则获取锁

    6. 若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的方式避免了惊群效应)

    7. 若是阻塞申请锁,则申请锁的操作可增加阻塞等待

    8. 若监听事件生效(说明前节点释放了,可以尝试去获取锁),则回到第3步重新进行判断,直到获取到锁

    9. 解锁时,将第一个子节点删除释放

    3. ZK分布式锁的能力

    可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:

    • 互斥性:在同一时刻,只有一个客户端能持有锁
    • 安全性:避免死锁,如果某个客户端获得锁之后处理时间超过最大约定时间,或者持锁期间发生了故障导致无法主动释放锁,其持有的锁也能够被其他机制正确释放,并保证后续其它客户端也能加锁,整个处理流程继续正常执行
    • 可用性:也被称作容错性,分布式锁需要有高可用能力,避免单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务自身具备集群模式,遇到故障能自动切换恢复工作;另一种是客户端向多个独立的锁服务发起请求,当某个锁服务故障时仍然可以从其他锁服务读取到锁信息(Redlock)
    • 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其他线程程持有的锁给释放了
    • 高效灵活:加锁、解锁的速度要快;支持阻塞和非阻塞;支持公平锁和非公平锁

    基于上文的内容,这里简单总结一下ZK的能力矩阵(其它分布式锁的情况会在后续文章中补充):

    能力ZKMySqlRedis原生RedlockETCD
    互斥
    安全链接异常,session关闭后锁会自动释放
    可用性相对还好
    可重入线程可重入
    加解锁速度居中
    阻塞非阻塞都支持
    公平非公平仅公平锁

    关于性能不太高的一种说法

    因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。

    由于ZooKeeper的高可用特性,在并发量不是太高的场景,也推荐使用ZK的分布式锁。

    4. InterProcessMutex 使用示例

    Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。

    4.1 接口介绍

    1. // 获取互斥锁
    2. public void acquire() throws Exception;
    3. // 在给定的时间内获取互斥锁
    4. public boolean acquire(long time, TimeUnit unit) throws Exception;
    5. // 释放锁处理
    6. public void release() throws Exception;
    7. // 如果当前线程获取了互斥锁,则返回true
    8. boolean isAcquiredInThisProcess();
    9. 复制代码

    4.2 pom依赖

    1. <dependency>
    2.   <groupId>org.apache.logging.log4j</groupId>
    3.   <artifactId>log4j-core</artifactId>
    4.   <version>2.8.2</version>
    5. </dependency>
    6. <dependency>
    7.   <groupId>org.apache.zookeeper</groupId>
    8.   <artifactId>zookeeper</artifactId>
    9.   <version>3.5.7</version>
    10. </dependency>
    11. <dependency>
    12.   <groupId>org.apache.curator</groupId>
    13.   <artifactId>curator-framework</artifactId>
    14.   <version>4.3.0</version>
    15. </dependency>
    16. <dependency>
    17.   <groupId>org.apache.curator</groupId>
    18.   <artifactId>curator-recipes</artifactId>
    19.   <version>4.3.0</version>
    20. </dependency>
    21. <dependency>
    22.   <groupId>org.apache.curator</groupId>
    23.   <artifactId>curator-client</artifactId>
    24.   <version>4.3.0</version>
    25. </dependency>
    26. 复制代码

    4.3 示例

    1. package com.atguigu.case3;
    2. import org.apache.curator.framework.CuratorFramework;
    3. import org.apache.curator.framework.CuratorFrameworkFactory;
    4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    5. import org.apache.curator.retry.ExponentialBackoffRetry;
    6. public class CuratorLockTest {
    7.     public static void main(String[] args) {
    8.         // 创建分布式锁1
    9.         InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
    10.         // 创建分布式锁2
    11.         InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
    12.         new Thread(new Runnable() {
    13.             @Override
    14.             public void run() {
    15.                 try {
    16.                     lock1.acquire();
    17.                     System.out.println("线程1 获取到锁");
    18.                     lock1.acquire();
    19.                     System.out.println("线程1 再次获取到锁");
    20.                     Thread.sleep(5 * 1000);
    21.                     lock1.release();
    22.                     System.out.println("线程1 释放锁");
    23.                     lock1.release();
    24.                     System.out.println("线程1  再次释放锁");
    25.                 } catch (Exception e) {
    26.                     e.printStackTrace();
    27.                 }
    28.             }
    29.         }).start();
    30.         new Thread(new Runnable() {
    31.             @Override
    32.             public void run() {
    33.                 try {
    34.                     lock2.acquire();
    35.                     System.out.println("线程2 获取到锁");
    36.                     lock2.acquire();
    37.                     System.out.println("线程2 再次获取到锁");
    38.                     Thread.sleep(5 * 1000);
    39.                     lock2.release();
    40.                     System.out.println("线程2 释放锁");
    41.                     lock2.release();
    42.                     System.out.println("线程2  再次释放锁");
    43.                 } catch (Exception e) {
    44.                     e.printStackTrace();
    45.                 }
    46.             }
    47.         }).start();
    48.     }
    49.     private static CuratorFramework getCuratorFramework() {
    50.         ExponentialBackoffRetry policy = new ExponentialBackoffRetry(30003);
    51.         CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
    52.                 .connectionTimeoutMs(2000)
    53.                 .sessionTimeoutMs(2000)
    54.                 .retryPolicy(policy).build();
    55.         // 启动客户端
    56.         client.start();
    57.         System.out.println("zookeeper 启动成功");
    58.         return client;
    59.     }
    60. }
    61. 复制代码

    5. DIY一个阉割版的分布式锁

    通过这个实例对照第2节内容来理解加解锁的流程,以及如何避免惊群效应。

    1. package com.rock.case2;
    2. import org.apache.zookeeper.*;
    3. import org.apache.zookeeper.data.Stat;
    4. import java.io.IOException;
    5. import java.util.List;
    6. import java.util.concurrent.CountDownLatch;
    7. /**
    8.  * zk 分布式锁 v1版本:
    9.  * 完成功能 :
    10.  *      1. 避免了惊群效应
    11.  * 缺失功能:
    12.  *      1. 超时控制
    13.  *      2. 读写锁
    14.  *      3. 重入控制
    15.  */
    16. public class DistributedLock {
    17.     private String connectString;
    18.     private int sessionTimeout;
    19.     private ZooKeeper zk;
    20.     private CountDownLatch connectLatch = new CountDownLatch(1);
    21.     private CountDownLatch waitLatch = new CountDownLatch(1);
    22.     private String waitPath;
    23.     private String currentNode;
    24.     private String LOCK_ROOT_PATH;
    25.     private static String NODE_PREFIX = "w";
    26.     public DistributedLock(String connectString, int sessionTimeout, String lockName) {
    27.         //TODO:数据校验
    28.         this.connectString = connectString;
    29.         this.sessionTimeout = sessionTimeout;
    30.         this.LOCK_ROOT_PATH = lockName;
    31.     }
    32.     public void init() throws IOException, KeeperException, InterruptedException {
    33.         // 建联
    34.         zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
    35.             // connectLatch  连接上zk后  释放
    36.             if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
    37.                 connectLatch.countDown();
    38.             }
    39.         });
    40.         connectLatch.await();// 等待zk正常连接后
    41.         // 判断锁名称节点是否存在
    42.         Stat stat = zk.exists(LOCK_ROOT_PATH, false);
    43.         if (stat == null) {
    44.             // 创建一下锁名称节点
    45.             try {
    46.                 zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    47.             } catch (KeeperException e) {
    48.                 //并发创建冲突忽略。
    49.                 if (!e.code().name().equals("NODEEXISTS")) {
    50.                     throw e;
    51.                 }
    52.             }
    53.         }
    54.     }
    55.     /**
    56.      * 待补充功能:
    57.      * 1. 超时设置
    58.      * 2. 读写区分
    59.      * 3. 重入控制
    60.      */
    61.     public void zklock() throws KeeperException, InterruptedException {
    62.         if (!tryLock()) {
    63.             waitLock();
    64.             zklock();
    65.         }
    66.     }
    67.     /**
    68.      *
    69.      */
    70.     private void waitLock() throws KeeperException, InterruptedException {
    71.         try {
    72.             zk.getData(waitPath, new Watcher() {
    73.                 @Override
    74.                 public void process(WatchedEvent watchedEvent) {
    75.                     // waitLatch  需要释放
    76.                     if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
    77.                         waitLatch.countDown();
    78.                     }
    79.                 }
    80.             }, new Stat());
    81.             // 等待监听
    82.             waitLatch.await();
    83.         } catch (KeeperException.NoNodeException e) {
    84.             //如果等待的节点已经被清除了,不等了,再尝试去抢锁
    85.             return;
    86.         }
    87.     }
    88.     private boolean tryLock() throws KeeperException, InterruptedException {
    89.         currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    90.         // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
    91.         List<String> children = zk.getChildren(LOCK_ROOT_PATH, false);
    92.         // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
    93.         if (children.size() == 1) {
    94.             return true;
    95.         } else {
    96.             String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
    97.             // 通过w00000000获取该节点在children集合的位置
    98.             int index = children.indexOf(thisNode);
    99.             if (index == 0) {
    100.                 //自己就是第一个节点
    101.                 return true;
    102.             }
    103.             // 需要监听  他前一个节点变化
    104.             waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
    105.         }
    106.         return false;
    107.     }
    108.     // 解锁
    109.     public void unZkLock() {
    110.         // 删除节点
    111.         try {
    112.             zk.delete(this.currentNode, -1);
    113.         } catch (InterruptedException e) {
    114.             e.printStackTrace();
    115.         } catch (KeeperException e) {
    116.             e.printStackTrace();
    117.         }
    118.     }
    119. }
  • 相关阅读:
    国民MCU_freertos V10.3.1 使用经验避坑总结
    xray证书安装使用及Burp联动
    Jenkins与服务器时间不一致
    Vue3简单使用(一) --- 环境搭建
    [NSSRound#13 Basic] 刷题记录
    【Spring系列】DeferredResult异步处理
    这代码运行超时,怎么优化
    CSS 中的 white-space 渲染模型
    openGauss数据库基本操作(超详细)
    python字符串相关
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/127569868