• Zookeeper-JavaApI操作


    JavaApI操作

    1) Curator 介绍

    Curator 是 Apache ZooKeeper 的Java客户端库。

    常见的ZooKeeper Java API :

    • 原生Java API
    • ZkClient
    • Curator

    Curator 项目的目标是简化 ZooKeeper 客户端的使用。

    Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。

    官网:http://curator.apache.org/

    2) Curator API 常用操作

    a) 建立连接与CRUD基本操作

    Curator API 常用操作:

    • 建立连接
      • 第一种方法:CuratorFrameworkFactory.newClient
      • 第二种方法:CuratorFrameworkFactory.builder() 推荐
    /**
    * 建立连接
    */
    @Test
    public void testConnect() {
        /**
         * connectString – 连接字符串 zk server 地址和端口 "192.168.200.130:2181"
         * sessionTimeoutMs – 会话超时时间 单位ms
         * connectionTimeoutMs – 连接超时时间 单位ms
         * retryPolicy – 重试策略
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        // 1.第一种方式
        CuratorFramework client1 = CuratorFrameworkFactory.newClient("192.168.200.130:2181", 60 * 1000, 15 * 1000, retryPolicy);
        client1.start();
    
        // 2.第二种方式
        CuratorFramework client2 = CuratorFrameworkFactory.builder()
            .connectString("192.168.200.130:2181")
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(15 * 1000)
            .retryPolicy(retryPolicy)
            .namespace("dcy") // 名称空间
            .build();
        client2.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 添加节点
      • 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
      • 默认类型:持久化
      • creatingParentsIfNeeded 如果父节点不存在,则创建父节点
    /**
    * 创建节点: create 持久 临时 顺序 数据
    */
    @Test
    public void testCreate1() throws Exception {
        // 1.基本创建
        // 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
        String rs = client2.create().forPath("/app1");
        System.out.println("rs = " + rs);
    }
    
    @Test
    public void testCreate2() throws Exception {
        // 2.创建节点 带有数据
        String rs = client2.create().forPath("/app2", "hehe".getBytes());
        System.out.println("rs = " + rs);
    }
    
    @Test
    public void testCreate3() throws Exception {
        // 3.设置节点的类型
        // 默认类型:持久化
        String rs = client2.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println("rs = " + rs);
    }
    
    @Test
    public void testCreate4() throws Exception {
        // 4.创建多级节点
        // creatingParentsIfNeeded 如果父节点不存在,则创建父节点
        String rs = client2.create().creatingParentsIfNeeded().forPath("/app4/p1");
        System.out.println("rs = " + rs);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 删除节点
      • 1.删除单个节点 delete().forPath()
      • 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath()
      • 3.必须成功的删除(为了防止网络抖动,本质是重试) delete().guaranteed().forPath()
      • 4.回调 inBackground
    /**
    * 删除节点: delete, deleteall
    */
    @Test
    public void testDelete1() throws Exception {
        // 1.删除单个节点
        client2.delete().forPath("/app1");
    }
    
    @Test
    public void testDelete2() throws Exception {
        // 2.删除带有子节点的节点
        client2.delete().deletingChildrenIfNeeded().forPath("/app4");
    }
    
    @Test
    public void testDelete3() throws Exception {
        // 3.必须成功的删除 (可能会因为网络抖动等原因,操作超时)
        client2.delete().guaranteed().forPath("/app2");
    }
    
    @Test
    public void testDelete4() throws Exception {
        // 4.回调
        client2.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("我被删除了~");
                System.out.println(event);
            }
        }).forPath("/app1");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 修改节点
      • 1.修改数据 setData().forPath()
      • 2.根据版本修改 setData().withVersion().forPath()
        • version 是通过查询出来的,目的是为了让其他客户端和线程不干扰我
    /**
    * 修改数据
    */
    @Test
    public void testSet() throws Exception {
        client2.setData().forPath("/app1", "dcy".getBytes());
    }
    
    @Test
    public void testSetForVersion() throws Exception {
        Stat stat = new Stat();
        client2.getData().storingStatIn(stat).forPath("/app1");
    
        int version = stat.getVersion(); // 查询出来
        client2.setData().withVersion(version).forPath("/app1", "haha".getBytes());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 查询节点
      • 1.查询数据:get: getData().forPath()
      • 2.查询子节点:ls: getChildren().forPath()
      • 3.查询节点状态信息:ls -s: getData().storingStatIn(状态对象).forPath()
    /**
    * 查询节点:
    *  1.查询数据:get: getData().forPath()
    *  2.查询子节点:ls: getChildren().forPath()
    *  3.查询节点状态信息:ls -s: getData().storingStatIn(状态对象).forPath()
    */
    @Test
    public void testGet1() throws Exception {
        // 1.查询数据:get
        byte[] bytes = client2.getData().forPath("/app1");
        System.out.println(new String(bytes));
    }
    
    @Test
    public void testGet2() throws Exception {
        // 2.查询子节点:ls
        List<String> path = client2.getChildren().forPath("/");
        System.out.println(path);
    }
    
    @Test
    public void testGet3() throws Exception {
        // 3.查询节点状态信息:ls -s
        Stat status = new Stat();
        client2.getData().storingStatIn(status).forPath("/app1");
        System.out.println("status = " + status);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    b) Watch事件监听

    ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

    ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

    ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便 需要开发人员自己反复注册Watcher,比较繁琐。

    Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

    ZooKeeper提供了三种Watcher:

    • NodeCache : 只是监听某一个特定的节点
    • PathChildrenCache : 监控一个ZNode的子节点.
    • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

    1.NodeCache : 监听一个特定的节点

    /**
    * 演示NodeCache
    */
    @Test
    public void testNodeCache() throws Exception {
        // 1.创建NodeCache监听对象
        final NodeCache nodeCache = new NodeCache(client, "/app1");
        // 2.注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化~~~");
    
                // 获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        // 3.开启监听,如果设置true,则开启监听是,加载缓存数据
        nodeCache.start(true);
    
        while (true) {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.PathChildrenCache:监听某个节点的所有子节点

    /**
    * 演示PathChildrenCache:监听某个节点的所有子节点
    */
    @Test
    public void testPathChildrenCache () throws Exception {
        // 1.创建监听对象
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
    
        // 2.绑定监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子节点变化~~~");
                System.out.println("event = " + event);
                // 监听子节点的数据变更,并且拿到变更后的数据
                // 1.获取类型
                PathChildrenCacheEvent.Type type = event.getType();
                // 2.判断类型是否是update
                if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    byte[] data = event.getData().getData();
                    System.out.println(new String(data));
                }
            }
        });
    
        // 3.开启监听
        pathChildrenCache.start();
    
        while (true) {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3.TreeCache:监听某个节点自己和所有的子节点

    /**
    * 演示TreeCache:监听某个节点自己和所有的子节点
    */
    @Test
    public void testTreeCache () throws Exception {
        // 1.创建监听器
        TreeCache treeCache = new TreeCache(client, "/app2");
    
        // 2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("节点变化了");
                System.out.println("event = " + event);
            }
        });
    
        // 3.开启监听
        treeCache.start();
    
        while (true) {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    c) 分布式锁

    c.1) 介绍

    在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。

    但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。

    在这里插入图片描述

    那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

    在这里插入图片描述

    c.2) Zookeeper分布式锁原理

    核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除节点

    • 1.客户端获取锁时,在lock节点下创建临时顺序节点
    • 2.然后获取lock下面的所有子节点,客户端获取到所有的子节点后。如果发现自己创建的节点顺序最小,那就认为该客户端获取到了锁。使用完锁后,将该节点删除
    • 3.如果发现自己创建的节点并发lock所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
    • 4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点 是否是lock子节点中序号最小的。如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点,并注册监听。
    c.3) 案例:模拟12306售票

    Curator实现分布式锁API

    在Curator中有五种锁方案:

    • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
    • InterProcessMutex:分布式可重入排它锁
    • InterProcessReadWriteLock:分布式读写锁
    • InterProcessMultiLock:将多个锁作为单个实体管理的容器
    • InterProcessSemaphoreV2:共享信号量

    在这里插入图片描述

    // 测试类
    public class LockTest {
        public static void main(String[] args) {
            Ticket12306 ticket12306 = new Ticket12306();
            // 创建客户端
            Thread t1 = new Thread(ticket12306, "携程");
            Thread t2 = new Thread(ticket12306, "飞猪");
    
            t1.start();
            t2.start();
        }
    }
    
    // Ticket12306
    public class Ticket12306 implements Runnable{
    
        private int tickets = 10; // 数据库的票数
    
        private InterProcessMutex lock; // 创建锁
    
        public Ticket12306() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("192.168.200.130:2181")
                    .sessionTimeoutMs(60 * 1000)
                    .connectionTimeoutMs(15 * 1000)
                    .retryPolicy(retryPolicy)
                    .build();
            client.start();
    
            this.lock = new InterProcessMutex(client, "/lock");
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // 获取锁
                    lock.acquire(3, TimeUnit.SECONDS);
                    if (tickets > 0) {
                        System.out.println(Thread.currentThread().getName() + ":" + tickets);
                        tickets--;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 释放锁
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    JavaScript 中的数组类型
    在 Node.js 中发出 HTTP 请求的 5 种方法
    智慧公厕建设的好处
    .[nicetomeetyou@onionmail.org].faust深入剖析勒索病毒及防范策略
    简单好用的文档管理系统MinDoc
    ubuntu忘记mysql密码,怎么办
    Node-RED系列(二九):使用slider与chart节点来实现双折线时间序列图
    Nginx
    OpenHarmony实战开发-如何实现tabContent内容可以在tabBar上显示并且tabBar可以响应滑动事件的功能。
    css 图片好玩的一个属性,添加滤镜
  • 原文地址:https://blog.csdn.net/weixin_46926189/article/details/133192658