• zookeeper 进阶 —— (动态上下线监听;;分布式锁;;企业面试)(师承尚硅谷)


    Zookeeper 进阶

    1. 服务器动态上下监听案例

    需求:服务器上下限,客户端可以监听

    (需求的图片)

    具体实现:
    1.先启动zookeeper创建一个/servers结点:
    2.上代码:
    服务器编写思路

    写代码具体的思路:
    1.连接zk
    2.创建结点(服务器上线)
    3.业务逻辑(睡觉)
    
    • 1
    • 2
    • 3
    • 4

    DirstributedServers类代码:

    package com.wts.case01;
    /**
     * 服务器注册到zk集群等相关操作
     */
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class DirstributedServers {
        private String connectionString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private int sessionTimeout = 2000;//2s
        private ZooKeeper zk;
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            //1.获取zk连接
            DirstributedServers servers = new DirstributedServers();
            servers.getConnection();
    
            //2.注册服务器到zk集群 -- 也就是创建节点
            servers.regiter(args[0]);
    
            //启动业务逻辑
            servers.bussiness();
        }
    
        //启动业务逻辑,等待服务器运行
        private void bussiness() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    
        //注册服务器到zk集群
        private void regiter(String hostname) throws InterruptedException, KeeperException {//临时带序号结点
            String create = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(hostname + " is online....");
        }
    
        //连接zk
        private void getConnection() throws IOException {
            zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                }
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    客户端编程思路:

    1.连接zk
    2.监听服务器上下线情况(监听结点)
    3.业务逻辑--睡觉
    
    • 1
    • 2
    • 3

    DistributeClient类:

    package com.wts.case01;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 客户端监听zk集群
     */
    public class DistributeClient {
        private String connectionString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private int sessionTime = 2000;
        private static ZooKeeper zk;
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            //1.连接zk
            DistributeClient client = new DistributeClient();
            client.getConnection();
    
            //2.监听/servers下面的子节点的删除和增加
            client.getServersList();
    
            //3.业务逻辑--也就是睡觉
            client.bussiness();
        }
    
        //业务逻辑--睡觉
        private void bussiness() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    
        //监听--得到List
        private void getServersList() throws InterruptedException, KeeperException {
            ArrayList<String> arrayList = new ArrayList<>();
            List<String> children = zk.getChildren("/servers", true);
            for (String child : children) {
                byte[] data = zk.getData("/servers/" + child, false, null);// 2个斜杠忘记了会报错
                arrayList.add(new String(data));
            }
            //打印
            System.out.println(arrayList);
        }
    
        private void getConnection() throws IOException {
            //zk快速生成了成员变量 -- alt+ctrl+f--current m
            zk = new ZooKeeper(connectionString, sessionTime, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //注册一次,监听一次,所以这里面再写监听 (2.写监听代码的最后一步)
                    try {
                        getServersList();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    其中报错记录:
    org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = Connec…
    ECS的端口没设置好
    查看连接ESC报错,org.apache.zookeeper.KeeperExceptionConnectionLossException: KeeperErrorCode = Connec…

    效果
    1.创建删除结点模拟服务器上下限,在Client客户端控制台上可以看到:

    在这里插入图片描述
    2.用代码上线服务器,在客户端也可以看到变化
    在这里插入图片描述

    2.zk分布式锁案例

    需求:

    在这里插入图片描述

    实现:
    zk的所谓的加锁,就是在特定位置创建临时带序号结点,判断最小的编号,做出相应的操作

    上代码:
    DistributedLock 类:

    思路:
    1、连接zk
    2、加锁,创建临时带序号结点,判断最小的编号,做出相应的操作
    (是否需要监听序号的前一个结点?)
    3、解锁,删除创建的临时结点
    
    • 1
    • 2
    • 3
    • 4
    • 5
    package com.wts.case02;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class DistributedLock {
        private final ZooKeeper zk;
        private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private final int sessionTimeout = 2000;
    
        private CountDownLatch connectLatch = new CountDownLatch(1);
        private CountDownLatch waitLatch = new CountDownLatch(1);//等待前一个步骤执行完毕之后,再执行后面的操作
        private String waitPath;//前一个结点的路径
        private String currentMode;//创建的当前节点
    
        public DistributedLock() throws IOException, InterruptedException, KeeperException {
    
    
            //连接zk
            zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //countDownLatch  ,,连接上zk后,可以释放掉了
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        //如果当前状态是连接上了,释放
                        connectLatch.countDown();
                    }
    
                    //waitLatch,,他也要释放
                    if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                        waitLatch.countDown();
                    }
                }
            });
    
            //等待zk正常连接后,往下走程序---阻塞。
            connectLatch.await();
    
            //判断根节点是否存在/locks是否存在
            Stat stat = zk.exists("/locks", false);
            if (stat == null) {
                //创建根节点
                zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
    
        }
    
        //对zk加锁---所谓的加锁就是在里面创建一个临时节点,判断这个临时结点是否是最小的一个
        public void zkLock() {
            //创建带序号的临时结点
            try {
                currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                //判断当前节点是否是序号最小的临时结点,如果是,获取锁;非,监听前一个结点
                List<String> children = zk.getChildren("/locks", false);
                if (children.size() == 1) {
                    return;
                } else {
                    //排序
                    Collections.sort(children);
    
                    //获取结点的名称seq-000000  截取字符串的方式
                    String thisNode = currentMode.substring("/locks/".length());
                    //获取这个结点当前的位置
                    int index = children.indexOf(thisNode);
                    if (index == -1) {
                        System.out.println("数据异常,,,");
                    } else if (index == 0) {
                        //当前结点再列表中是最小的,Client获取锁
                        return;
                    } else {
                        //需要监听前一个结点变化
                        waitPath = "/locks/" + children.get(index - 1);
                        zk.getData(waitPath, true, null);//watch是true,就转到监听部分的代码,位置在上面的new watch()上面
    
                        //等待执行
                        waitLatch.await();
    
                        //监听到之后立马return
                        return;
                    }
    
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
        //对zk解锁
        public void unZkLock() {
            //解锁就是删除节点
            try {
                zk.delete(currentMode, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

    测试代码:

    package com.wts.case02;
    
    import org.apache.zookeeper.KeeperException;
    
    import java.io.IOException;
    
    public class DistributedLockDemo {
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            //创建两个客户端
            DistributedLock lock1 = new DistributedLock();
            DistributedLock lock2 = new DistributedLock();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock1.zkLock();
                        System.out.println("线程1-获取锁--");
    
                        Thread.sleep(2000);
    
                        lock1.unZkLock();
                        System.out.println("线程1-释放锁--");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock2.zkLock();
                        System.out.println("线程2--获取锁--");
    
                        Thread.sleep(5 * 1000);
    
                        lock2.unZkLock();
                        System.out.println("线程2--释放锁--");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    效果:
    在这里插入图片描述

    3. 企业面试

    1.选举机制
    半数机制,超过半数的投票通过,即通过。
    (1) 第一次启动选举规则:
    投票过半数时, 服务器 id 大的胜出
    (2) 第二次启动选举规则:
    ①EPOCH 大的直接胜出
    ②EPOCH 相同, 事务 id 大的胜出
    ③事务 id 相同,服务器 id 大的胜出

    2 .生产集群安装多少 zk 合适?
    安装奇数台。
    生产经验:

    • 0 台服务器: 3 台 zk;

    • 20 台服务器: 5 台 zk;

    • 100 台服务器: 11 台 zk;

    • 200 台服务器: 11 台 zk

    服务器台数多: 好处,提高可靠性;
    坏处:提高通信延时

    3.常用命令
    ls 、get 、create 、delete


    zookeeper源码分析。。。过几年再学

  • 相关阅读:
    nginx的正向代理和反向代理以及负载均衡
    虹科活动 | 探索全新AR应用时代,虹科AR VIP研讨会广州场回顾!
    【Java并发编程十二】线程池
    2023/10/23学习记录
    i.MX 6ULL 驱动开发 二十九:向 Linux 内核中添加自己编写驱动
    【论文阅读 05】图像异常检测研究现状综述
    探索大恒图像|MER2-301-125U3C在制造行业玻璃瓶质量检测的应用
    商业银行如何构建一体化监控
    我常用的几个 VueUse 最佳组合,推荐给你们!
    基于Supermap for leaflet 的投影转换
  • 原文地址:https://blog.csdn.net/Kevinwen0228/article/details/125991334