• 利用zk实现分布式锁&zk数据同步原理


    一、什么是分布式锁

      顾名思义,分布式锁,就是分布式系统中的锁,是为了解决分布式场景下控制一些共享资源访问的问题,即某时刻同一个方法只有一个线程在运行。
      分布式锁的设计原则:互斥性、原子性、安全性、容错性、可重用性、公平性、高可用性、高性能、持久性、支持阻塞和非阻塞

    二、分布式锁的实现方式

    1. 基于数据库实现分布式锁【不推荐】

    实现方式:
    1. 基于数据库排他锁
    2. 基于表的唯一索引
    缺点:性能比较差

    2. 基于Redis实现分布式锁

    实现方式:
    1. setnx+expire命令(错误):
    直接分开使用命令其实是有问题的,因为不具备原子性,若执行了setnx的时候程序异常了,那么锁将永远不会过期
    2. lua脚本:
    通过lua脚本保证setnx和expire的原子性
    3. SET key uniqueId [EX time] [NX|XX]
    - nx:当key不存在时设置值
    - xx:当key存在时设置值
    4. 使用实现redlock算法的redisson
    缺点:CAP中满足AP模型,存在一致性问题

    3. 基于Zookeeper实现分布式锁

    1. zk实现分布式锁,这里需要提一下Curator。
    2. Apache Curator是一个Zookeeper的开源客户端,它提供了Zookeeper各种应用场景(Recipe,如共享锁服务、master选举、分布式计数器等)的抽象封装,接下来将利用Curator提供的类来实现分布式锁
      基于Curator客户端实现的分布式锁实现类型有以下五种:
      在这里插入图片描述

    缺点:性能上不如使用缓存实现的分布式锁,因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能

    4. 基于etcd的实现

      etcd像是专门为集群环境的服务发现和注册而设计,它提供了数据TTL失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群节点的状态
      因为 etcd 使用 Raft 算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序
      保持独占,即所有试图获取锁的用户最终只有一个可以得到。etcd为此提供了一套实现分布式锁原子操作CAS(CompareAndSwap)的API。通过设置prevExist值,可以保证在多个节点同时创建某个目录时,只有一个成功,而该用户即可认为是获得了锁。
      控制时序,即所有试图获取锁的用户都会进入等待队列,获得锁的顺序是全局唯一的,同时决定了队列执行顺序。etcd为此也提供了一套API(自动创建有序键),对一个目录建值时指定为POST动作,这样etcd会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用API按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号
    实现方式:

    1. 基于etcd API
      尝试拿锁+自动续租+关闭清理,倘若拿锁失败,进入等待队列
      缺点:没有现成的框架,需要一定自研

    5. 基于consul的实现

    实现方式:

    1. . consul中锁的主要是依赖KV Store和Session相关API

    三、Zookeeper实现分布式锁——代码实现:

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    可重入锁

    public class SharedReentrantLockTest {
        private static final String lockPath = "/test/sharedreentrantlock";
        private static final Integer clientNums = 5;
        final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的资源
        private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);
        @Autowired
        private static BaseZookeeper baseZookeeper;
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < clientNums; i++) {
                String clientName = "client#" + i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client= CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
                        client.start();
                        Random random = new Random();
                        try {
                            final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                            // 每个客户端请求10次共享资源
                            for (int j = 0; j < 10; j++) {
                                if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                    throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥锁");
                                }
                                try {
                                    System.out.println(j + ". " + clientName + " 已获取到互斥锁");
                                    resource.use(); // 使用资源
                                    if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                        throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥锁");
                                    }
                                    System.out.println(j + ". " + clientName + " 已再次获取到互斥锁");
                                    lock.release(); // 申请几次锁就要释放几次锁
                                } finally {
                                    System.out.println(j + ". " + clientName + " 释放互斥锁");
                                    lock.release(); 
                                }
                                Thread.sleep(random.nextInt(100));
                            }
                        } catch (Throwable e) {
                            System.out.println(e.getMessage());
                        } finally {
                            CloseableUtils.closeQuietly(client);
                            System.out.println(clientName + " client close!");
                            countDownLatch.countDown();
                        }
                    }
                }).start();
            }
            countDownLatch.await();
            System.out.println("ending");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    public class FakeLimitedResource {
        private final AtomicBoolean inUse = new AtomicBoolean(false);
    
        // 模拟只能单线程操作的资源
        public void use() throws InterruptedException {
            if (!inUse.compareAndSet(false, true)) {
                // 在正确使用锁的情况下,此异常不可能抛出
                throw new IllegalStateException("Needs to be used by one client at a time");
            }
            try {
                Thread.sleep((long) (100 * Math.random()));
            } finally {
                inUse.set(false);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    可重入读写锁

     public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < clientNums; i++) {
                final String clientName = "client#" + i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        //CuratorFramework client = ZKUtils.getClient();
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client= CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
                        client.start();
                        final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
                        final InterProcessMutex readLock = lock.readLock();
                        final InterProcessMutex writeLock = lock.writeLock();
    
                        try {
                            // 注意只能先得到写锁再得到读锁,不能反过来
                            if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
                                throw new IllegalStateException(clientName + " 不能得到写锁");
                            }
                            System.out.println(clientName + " 已得到写锁");
                            if (!readLock.acquire(10, TimeUnit.SECONDS)) {
                                throw new IllegalStateException(clientName + " 不能得到读锁");
                            }
                            System.out.println(clientName + " 已得到读锁");
                            try {
                                resource.use(); // 使用资源
                            } finally {
                                System.out.println(clientName + " 释放读写锁");
                                readLock.release();
                                writeLock.release();
                            }
                        } catch (Exception e) {
                            System.out.println(e.getMessage());
                        } finally {
                            CloseableUtils.closeQuietly(client);
                            countDownLatch.countDown();
                        }
                    }
                }).start();
            }
            countDownLatch.await();
            System.out.println("ending。。。");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    信号量

    public class SharedSemaphoreTest {
        private static final int MAX_LEASE = 10;
        private static final String PATH = "/test/semaphore";
        private static final FakeLimitedResource resource = new FakeLimitedResource();
    
        public static void main(String[] args) throws Exception {
            CuratorFramework client = ZKUtils.getClient();
            client.start();
            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("获取租约数量:" + leases.size());
            Lease lease = semaphore.acquire();
            System.out.println("获取单个租约");
            resource.use(); // 使用资源
            // 再次申请获取5个leases,此时leases数量只剩4个,不够,将超时
            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("获取租约,如果超时将为null: " + leases2);
            System.out.println("释放租约");
            semaphore.returnLease(lease);
            // 再次申请获取5个,这次刚好够
            leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("获取租约,如果超时将为null: " + leases2);
            System.out.println("释放集合中的所有租约");
            semaphore.returnAll(leases);
            semaphore.returnAll(leases2);
            client.close();
            System.out.println("结束!");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    多锁

    public class MultiSharedLockTest {
        private static final String lockPath1 = "/test/MSLock1";
        private static final String lockPath2 = "/test/MSLock2";
        private static final FakeLimitedResource resource = new FakeLimitedResource();
    
        public static void main(String[] args) throws Exception {
            CuratorFramework client = ZKUtils.getClient();
            client.start();
    
            InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入锁
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入锁
            // 组锁,多锁
            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("不能获取多锁");
            }
            System.out.println("已获取多锁");
            System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
            System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
            try {
                resource.use(); // 资源操作
            } finally {
                System.out.println("释放多个锁");
                lock.release(); // 释放多锁
            }
            System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
            System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
            client.close();
            System.out.println("结束!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    四、ZK数据同步原理

    在这里插入图片描述

    ZAB协议工作原理

    什么是ZAB协议

      ZAB(Zookeeper Atomic Broadcast) 协议是为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议。在zookeeper中,主要依赖ZAB协议来实现分布式数据一致性,基于该协议,zookeeper实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性
      ZAB协议包含两种基本模式:

    • 崩溃恢复:
        集群环境下,当leader由于某些因素出现问题时,需要选举出新的leader,当leader服务器选举出来后,并且集群中有过半的机器和该leader节点完成数据同步后,ZAB协议就会退出恢复模式
    • 消息广播(原子广播)
      在这里插入图片描述
      过程:
    1. leader接收到消息请求后,将消息赋予一个全局唯一的64位自增id,叫:zxid(采用递增,保证顺序一致性),通过zxid的大小比较就可以实现因果有序这个特征。
    2. leader为每个follower准备了一个FIFO队列(通过TCP协议来实现,以实现全局有序这一个特点)将带有zxid的消息作为一个提案(proposal)分发给所有的 follower。
    3. 当follower接收到proposal,先把proposal写到磁盘,写入成功以后再向leader回复一个ack。
    4. 当leader接收到合法数量(超过半数节点)的ack后,leader就会向这些follower发送commit命令,同时会在本地执行该消息。
    5. 当follower收到消息的commit命令以后,会提交该消息

    数据同步

       在崩溃恢复完成选举以后,就开始进行数据同步了,在选举过程中,通过投票已经确认 Leader 服务器是最大Zxid 的节点,同步阶段就是利用 Leader 前一阶段获得的最新Proposal历史,同步集群中所有的副本

  • 相关阅读:
    Oracle 的LogMiner
    【UI】饿了么 el-upload如何上传到不同的路径, 根据不同情况上传指不同的接口,不同的路径
    互联网轻量级框架整合之JavaEE基础II
    kvm虚拟机压缩qcow2镜像空间
    Ubuntu18.04安装IgH主站
    中睿天下参展2023海军工程大学首届网络安全文化周并发表主题演讲
    【Windows Docker:安装nginx】
    chrome v3开发插件实现所有网站允许跨域
    Oracle创建DBlink
    TypeScript - 枚举 - 数字枚举
  • 原文地址:https://blog.csdn.net/weixin_44827241/article/details/126257638