• zookeeper学习记录


    本文Java代码地址: https://gitee.com/blackjie_1/ljUp/tree/master/zookeeperDemo

    个人博客网站:什么是快乐

    基于docker 安装

    拉取zookeeper 3.4.10

    docker pull zookeeper:3.4.10
    
    • 1

    启动服务端

    docker run -d -p 2181:2181 -v /root/docker/zookeeper:/home/zookeeper --name zookeeper_1 --restart always zookeeper:3.4.10
    
    • 1

    启动客户端

    docker run -it --``rm` `--link zookeeper_one:zookeeper zookeeper zkCli.sh -server zookeeper
    
    • 1

    或者

    docker exec -it zookeeper_1 zkCli.sh
    
    • 1

    数据模型

    其数据结构类似一个树形结构,每个节点可以拥有子节点并能存储1M的数据

    1、持久化

    2、临时节点 -e

    3、持久化顺序节点 -s

    4、临时顺序节点 -es

    客户端命令

    连接本地zookeeper
    docker exec -it zookeeper_1 zkCli.sh
    
    • 1

    或者

    docker run -it --rm --link zookeeper_1:zookeeper_1 zookeeper:3.4.10  zkCli.sh -server zookeeper_1
    
    • 1
    退出
    quit
    
    • 1
    查看节点
    ls  /节点名称
    
    • 1
    创建节点
    create /节点名称 [数据] 
    
    create -e /节点名称 [数据]    临时节点,当前会话断开时,临时节点会删除
    
    create -s /节点名称 [数据]    顺序节点,节点名称后会有编号
      
    create -s /节点名称 [数据]    临时的顺序节点
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    获取数据
    get /节点名称
    
    • 1
    设置数据
    set /节点名称 [数据]
    
    • 1
    删除
    delete  /节点名称
    
    delete all /节点名称  删除节点及节点下所有节点
    
    • 1
    • 2
    • 3

    Java代码操作

    maven依赖

    
    
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-clientartifactId>
                <version>2.12.0version>
    
            dependency>
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-frameworkartifactId>
                <version>2.12.0version>
            dependency>
    
            <dependency>
                <groupId>org.apache.curatorgroupId>
                <artifactId>curator-recipesartifactId>
                <version>2.12.0version>
            dependency>
            <dependency>
                <groupId>junitgroupId>
                <artifactId>junitartifactId>
            dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    增删改查

    连接
    public void curatorFramework() {
            //重试策略
            ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);
    
            //第一种方式
    //        CuratorFramework client = CuratorFrameworkFactory
    //                .newClient(zookeeperUrl, 60000, 15000, exponentialBackoffRetry);
    
            //第二种方法
            client = CuratorFrameworkFactory.builder()
                    .connectString("192.168.106.128:2181")
                    .connectionTimeoutMs(15000)
                    .sessionTimeoutMs(60000)
                    .retryPolicy(exponentialBackoffRetry)
                    .namespace("lj")
                    .build();
    
            client.start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    新增
        /**
         * zookeeper 创建节点 持久、临时、顺序  带有数据
         * create
         * 1、创建节点并带有数据
         * 2、设置节点类型
         * 3、创建多级节点
         */
        @Test
        public void curatorCreate() throws Exception {
            //基本创建
            String app1 = client.create().forPath("/app1");
            //创建带有数据的节点
            String s1 = client.create().forPath("/app2", "李杰_app2".getBytes(StandardCharsets.UTF_8));
            //创建节点默认类型:持久化。可通过withMode方法设置类型
            String s2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "李杰_临时".getBytes(StandardCharsets.UTF_8));
            //创建多级节点
            //creatingParentsIfNeeded  :创建多级节点,如果父节点不存在则创建父节点
            String s3 = client.create().creatingParentsIfNeeded().forPath("/app4/app5", "李杰_多级节点".getBytes(StandardCharsets.UTF_8));
    
            System.out.println(s3);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    查询
     /**
         * zookeeper 查询节点
         * 1、查询字节点 ls /
         * 2、获取数据  get /
         * 3、查询节点状态 ls -s
         */
        @Test
        public void curatorQuery() throws Exception {
            //获取数据  getData
            byte[] bytes = client.getData().forPath("/app1");
            //查询子节点  getChildren
            List<String> strings = client.getChildren().forPath("/");
    
            //查询子节点信息+数据信息
            //stat 用于获取节点信息,结果会放在stat对象中
            Stat stat = new Stat();
            byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/app2");
    
            System.out.println(new String(bytes1));
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    修改
    /**
         * zookeeper 修改节点数据
         * 1、修改数据
         * 2、根据版本修改数据
         */
        @Test
        public void curatorUpdate() throws Exception {
            //1、修改数据
    //        client.setData().forPath("/app2","app2_修改".getBytes(StandardCharsets.UTF_8));
    
            //2、根据版本修改数据  withVersion
            //获取版本号
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath("/app2");
            //根据版本号修改数据
            client.setData()
                    .withVersion(stat.getVersion())
                    .forPath("/app2","app2_version_update".getBytes(StandardCharsets.UTF_8));
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    删除
    /**
         * zookeeper 删除节点
         * 1、删除单个节点
         * 2、删除带有子节点的节点
         * 3、必须成功的删除
         * 4、回调函数
         */
        @Test
        public void curatorDelete() throws Exception {
            //1、删除数据
            client.delete().forPath("/app1");
    
            //2、删除带有子节点的节点   deletingChildrenIfNeeded
            client.delete().deletingChildrenIfNeeded().forPath("/app4");
    
            //3、删除子节点 (必须删除成功,本质是重试策略) guaranteed
            client.delete().guaranteed().forPath("/app4");
    
            //4、回调函数 inBackground 。在删除后执行的方法
            client.delete().guaranteed().inBackground(new BackgroundCallback(){
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    System.out.println("我被删除了");
                    System.out.println(curatorEvent);
                }
            }).forPath("/app4");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    监听器

    * 监听节点
    * 1、监听单个节点   nodeCache
    * 2、监听节点下所有子节点  PathChildrenCache
    * 3、监听单个节点和节点下的所有子节点  TreeCache
    
    • 1
    • 2
    • 3
    • 4
    监听单个节点
    @Test
        public void curatorFrameworkWatch() throws Exception {
            //监听单个节点
            //1、创建监听对象
            NodeCache nodeCache = new NodeCache(client,"/app2",false);
    
            //2、注册监听
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    byte[] data = nodeCache.getCurrentData().getData();
                    System.out.println("节点发生改变,当前值:"+new String(data));
                }
            });
            //3、开启监听
            nodeCache.start();
            while (true){
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    监听某节点下的所有子节点
    /**
         * 监听某节点的所有子节点
         * @throws Exception
         */
        @Test
        public void curatorFrameworkWatchChildren() throws Exception {
    
            //监听某节点的所有子节点
            //1、创建监听对象PathChildrenCache
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
    
            //2、注册监听
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    //会监听很多数据,包括节点新增,删除,修改连接等
                    System.out.println("节点发生改变");
                    System.out.println(event);
    
                    //监听子节点数据发生变化
                    if(PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(event.getType())){
                        // 确实是子节点数据发生变化,获取变化后的值
                        byte[] data = event.getData().getData();
                        String s = new String(data);
                        System.out.println(s);
                    }
                }
            });
            //3、开启监听
            pathChildrenCache.start();
            while (true){
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    监听某节点和其所有的子节点
     /**
         * 监听某节点和其所有子节点
         * @throws Exception
         */
        @Test
        public void curatorFrameworkWatchAll() throws Exception {
    
            //1、创建监听对象PathChildrenCache
            TreeCache pathChildrenCache = new TreeCache(client, "/app2");
    
            //2、注册监听
            pathChildrenCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
                    //会监听很多数据,包括节点新增,删除,修改连接等
                    System.out.println("节点发生改变");
                    System.out.println(event);
    
                    //监听子节点数据发生变化
                    if(TreeCacheEvent.Type.NODE_UPDATED.equals(event.getType())){
                        // 确实是节点数据发生变化,获取变化后的值
                        byte[] data = event.getData().getData();
                        String s = new String(data);
                        System.out.println(s);
                    }
                }
            });
            //3、开启监听
            pathChildrenCache.start();
            while (true){
    
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    分布式锁

    简略概念:多机器下的对于锁的处理。实现方式:

    1、redis (性能高,但是不是很可靠)

    2、数据库实现(获得锁:数据库新增一条唯一数据。释放锁:删除新增的数据。锁等待:等新增成功。此思想同样可以用redis实现。)

    3、zookeeper

    在这里插入图片描述

    Java代码实现

    本次使用的锁是InterProcessMutex

    主要步骤:

    1、构建CuratorFramework client 对象

    2、通过client 构建InterProcessMutex 对象:lock= new InterProcessMutex(client, “/lock”);

    3、执行业务前获取锁:boolean acquire = lock.acquire(5, TimeUnit.SECONDS);

    4、业务结束后释放锁:lock.release();

    模拟售票

    public class ZookeeperLockTests {
    
       private static class Tick12306{
            private int tick=100;
    
            public int buyTick(){
                int result=0;
                if(tick>0){
                    result=tick;
                    tick--;
                }else{
                    System.out.println("无票了");
                    return -1000;
                }
                return result;
            }
       }
       private static class OtherTick implements Runnable{
           //抢票机构名称
           private String name;
    
           //12306票池
           private Tick12306 tick12306;
    
           //分布式锁
           private InterProcessMutex lock;
    
           public OtherTick(String name,Tick12306 tick12306){
               this.name=name;
               this.tick12306=tick12306;
               //重试策略
               ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);
    
               CuratorFramework client = CuratorFrameworkFactory.builder()
                       .connectString("192.168.106.128:2181")
                       .connectionTimeoutMs(15000)
                       .sessionTimeoutMs(60000)
                       .retryPolicy(exponentialBackoffRetry)
                       .namespace("lj")
                       .build();
    
               client.start();
    
               lock = new InterProcessMutex(client, "/lock");
           }
    
           //抢票
           @Override
           public void run() {
    
               while (tick12306.tick>0){
                   try {
                       //获取锁
                       boolean acquire = lock.acquire(5, TimeUnit.SECONDS);
                       if(acquire){
                           System.out.println(this.name+"抢票:"+tick12306.buyTick());
                       }
                   }catch (Exception e){
                       e.printStackTrace();
                   }finally {
                       try {
                           //锁释放
                           lock.release();
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   }
    
               }
           }
       }
    
    
        public static void main(String[] args) {
            Tick12306 tick12306 = new Tick12306();
            OtherTick t1 = new OtherTick("携程", tick12306);
            OtherTick t2 = new OtherTick("飞猪", tick12306);
            Thread thread1 = new Thread(t1);
            Thread thread2 = new Thread(t2);
            thread1.start();
            thread2.start();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
  • 相关阅读:
    2022谷粒商城学习笔记(二十五)支付宝沙箱模拟支付
    【React源码】(十六)React 合成事件
    R语言多元Copula GARCH 模型时间序列预测
    Springboot项目多模块打包jar移动到指定目录,docker打jar包构建镜像部署并运行
    [普通物理] 半波损失 等厚与等倾干涉
    快速幂算法
    JVM基础-Hotspot VM相关知识学习
    【秒杀办法】根据二叉树的先序遍历、中序遍历、后序遍历快速创建二叉树
    狄拉克函数及其性质
    我代码就加了一行 log 日志,结果引发了 P1 的线上事故
  • 原文地址:https://blog.csdn.net/weixin_43887870/article/details/134450046