• Zookeeper


    一、介绍

    ZooKeeper是一个开源的分布式应用程序协调服务,是Hadoop和Hbase的重要组件,提供配置维护、域名服务、分布式同步、分布式锁等功能。

    Zookeeper是树形目录结构,和Unix文件系统类似,每一个节点都会保存少量(1MB)数据和节点信息,节点下有子节点。

    二、docker安装

    1.单机模式

    编写配置文件zoo.cfg

    # 基本时间单位(ms) 用于维持心跳连接
    tickTime=2000
    # 客户端连接并同步到服务器的tick时间量
    initLimit=5
    # 客户端同步到服务器的tick时间量 如果超过会断开会话
    syncLimit=2
    # 存放数据目录路径
    dataDir=../data
    # 存放日志目录路径
    dataLogDir=../logs
    # 客户端连接端口号
    clientPort=2181
    # 最大客户端连接数
    maxClientCnxns=60
    # 自动清除会保存dataDir、dataLogDir中的 autopurge.snapRetainCount 最新快照和相应的事务日志,并删除其余快照
    # 自动清除任务的时间间隔(以小时为单位)
    autopurge.purgeInterval=1
    # 自动清除保留的快照数量
    autopurge.snapRetainCount=3
    # 单机模式
    standaloneEnabled=true
    # 停用adminServer 8080端口
    admin.enableServer=false

    docker run -d -p 2181:2181 --name zookeeper --privileged=true
    -v /docker/registry/zookeeper/conf:/apache-zookeeper-3.7.1-bin/conf 
    -v /docker/registry/zookeeper/data:/apache-zookeeper-3.7.1-bin/data 
    -v /docker/registry/zookeeper/logs:/apache-zookeeper-3.7.1-bin/logs 
    zookeeper:3.7.1-temurin
    ​
    # 使用zkCli.sh连接
    docker exec -it zookeeper zkCli.sh

    2.集群模式

    zookeeper集群角色:

    角色职责
    leader处理增删改(事务请求),同步数据到其他节点
    follower处理查询(非事务请求),参与leader选举
    observer处理查询(非事务请求),缓解follower节点压力,不参与leader选举

    编写docker-compose.yaml

    version: '3.9'
    ​
    services:
      zoo1:
        image: zookeeper:3.7.1-temurin
        container_name: zoo1
        hostname: zoo1
        ports:
          - 2181:2181
        networks:
          - zookeeper_net
        volumes:
          - /docker/registry/zookeeper/zoo1/conf:/apache-zookeeper-3.7.1-bin/conf
          - /docker/registry/zookeeper/zoo1/data:/apache-zookeeper-3.7.1-bin/data
          - /docker/registry/zookeeper/zoo1/logs:/apache-zookeeper-3.7.1-bin/logs
        environment:
          ZOO_MY_ID: 1
          ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    ​
      zoo2:
        image: zookeeper:3.7.1-temurin
        container_name: zoo2
        hostname: zoo2
        ports:
          - 2182:2181
        networks:
          - zookeeper_net
        volumes:
          - /docker/registry/zookeeper/zoo2/conf:/apache-zookeeper-3.7.1-bin/conf
          - /docker/registry/zookeeper/zoo2/data:/apache-zookeeper-3.7.1-bin/data
          - /docker/registry/zookeeper/zoo2/logs:/apache-zookeeper-3.7.1-bin/logs
        environment:
          ZOO_MY_ID: 2
          ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    ​
      zoo3:
        image: zookeeper:3.7.1-temurin
        container_name: zoo3
        hostname: zoo3
        ports:
          - 2183:2181
        networks:
          - zookeeper_net
        volumes:
          - /docker/registry/zookeeper/zoo3/conf:/apache-zookeeper-3.7.1-bin/conf
          - /docker/registry/zookeeper/zoo3/data:/apache-zookeeper-3.7.1-bin/data
          - /docker/registry/zookeeper/zoo3/logs:/apache-zookeeper-3.7.1-bin/logs
        environment:
          ZOO_MY_ID: 3
          ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    ​
    networks:
      zookeeper_net:

    docker compose config -q
    docker compose up -d

    zoo3节点先运行,编号最大,经过选举后称为leader

    故障测试

    zoo1、zoo2其中一个节点宕机,其余节点均可正常运行,当可运行的节点未超过集群总节点数量的一半时,集群不可用

    当zoo1、zoo2重新连接后仍然为follower

    当zoo3 leader节点宕机后,会在剩余节点中重新选举出leader,zoo2的序号更大优先级更高选为leader

    就算zoo3节点重新连接,会成为follower

    三、基本命令

    # 进入容器对应bin目录
    docker exec -it zookeeper bash
    cd bin
    ​
    # 查看当前服务器状态
    zkServer.sh status 
    # 关闭服务器
    zkServer.sh stop
    # 开启服务器
    zkServer.sh start
    # 重启服务器
    zkServer.sh restart

    帮助命令help:

    ls path # 查看路径下的子节点
    create [-s] [-e] path data # 创建节点并设置数据 默认为持久节点 -s 指定为顺序节点 -e指定为临时节点当前会话结束后自动删除
    get path # 获取节点的数据
    set path data # 设置节点的数据
    delete path # 删除节点
    deleteall path # 删除节点及其子节点

    close # 关闭当前连接
    connect # 连接服务器
    ls -s path #查看节点的子节点和属性信息

    创建临时节点:

    创建顺序节点:

    属性名解释
    cZid创建节点的事务id
    ctime创建时间
    mZxid最后一次更新的事务id
    mtime修改时间
    pZxid子节点列表最后一次更新的事务id
    cversion子节点版本号
    dataVersion数据版本号
    aclVersion权限版本号
    ephemeralOwner临时节点事务id,持久节点为0
    dataLength节点存储数据长度
    numChildren当前节点子节点个数

    四、Java API

    1.基本操作
    @SpringBootTest
    class ZooKeeperApplicationTests {
    ​
        @Value("${zookeeper.address:localhost:2181}")
        private String address;
    ​
        @Value("${zookeeper.path:test}")
        private String path;
    ​
        private CuratorFramework client;
    ​
        /*
        * 创建连接
        */
        @BeforeEach
        void connect() {
            client = CuratorFrameworkFactory.builder()
                    .connectString(address)
                    .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                    .namespace(path)
                    .build();
            client.start();
        }
    ​
        /*
        * 创建节点 
        * creatingParentsIfNeeded() 创建多级节点
        * CreateMode.EPHEMERAL 创建临时节点在程序执行完毕后 节点自动删除
        */
        @Test
        void createNode() throws Exception {
            //String path = client.create().forPath("/node1"); //默认节点数据为IP地址
            //String path = client.create().forPath("/node1", "1".getBytes(StandardCharsets.UTF_8));
            //String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/node1", "1".getBytes(StandardCharsets.UTF_8));
            //String path = client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node1", "1".getBytes(StandardCharsets.UTF_8));
            String path = client.create().creatingParentsIfNeeded().forPath("/node1/a");
            System.out.println(path);
            System.in.read();
        }
    ​
        /*
        * 查询节点信息
        */
        @Test
        void getNode() throws Exception {
            Stat status = new Stat();
            byte[] bytes = client.getData().storingStatIn(status).forPath("/node1");
            System.out.println("节点数据: " + new String(bytes));
            System.out.println("节点状态信息: " + status);
    ​
            List children = client.getChildren().forPath("/node1");
            System.out.println("子节点: " + children);
        }
    ​
        /*
        * 修改节点信息 withVersion() 携带版本号修改避免并发修改异常(乐观锁)
        */
        @Test
        void setNode() throws Exception {
            Stat status = new Stat();
            byte[] bytes = client.getData().storingStatIn(status).forPath("/node1");
            int version = status.getVersion();
            System.out.printf("原本数据: %s, version: %d\n", new String(bytes), version);
            status = client.setData().withVersion(version).forPath("/node1", "update".getBytes(StandardCharsets.UTF_8));
            bytes = client.getData().storingStatIn(status).forPath("/node1");
            System.out.printf("修改后数据: %s, version: %d", new String(bytes), status.getVersion());
        }
    ​
        /*
        * 删除节点
        */
        @Test
        void deleteNode() throws Exception {
            client.delete().guaranteed().deletingChildrenIfNeeded().inBackground((client1, event) -> {
                int resultCode = event.getResultCode();
                System.out.println("删除节点是否成功: " + resultCode);
            }).forPath("/node1");
        }
    ​
    ​
        /*
        * 断开连接
        */
        @AfterEach
        void close() {
            if (client != null) {
                client.close();
            }
        }
    }

    测试:

    创建节点时未设置数据,默认为IP地址

    2.监听
    监听方式解释
    NodeCache只能监听到对于自身的信息修改、删除
    PathChildrenCache只能监听到对于子节点的增删改
    TreeCache能监听到自身和子节点的变化
    @SpringBootTest
    class CuratorWatchTests {
    ​
        @Value("${zookeeper.address:localhost:2181}")
        private String address;
    ​
        @Value("${zookeeper.path:test}")
        private String path;
    ​
        private CuratorFramework client;
    ​
        /**
        * 创建连接
        */
        @BeforeEach
        void connect() {
            client = CuratorFrameworkFactory.builder()
                    .connectString(address)
                    .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                    .namespace(path)
                    .build();
            client.start();
        }
    ​
        /**
        * NodeCache 创建CuratorCache时需要配置SINGLE_NODE_CACHE 只缓存根节点
        */
        @Test
        void nodeCache() throws Exception {
            CuratorCache curatorCache = CuratorCache.build(client, "/node", CuratorCache.Options.SINGLE_NODE_CACHE);
            CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(() -> {
                System.out.println("【NODE_UPDATED】当前节点信息修改...");
            }).build();
    ​
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
            System.in.read();
        }
    ​
        /**
        * PathChildrenCache
        */
        @Test
        void pathChildrenCache() throws Exception {
            CuratorCache curatorCache = CuratorCache.build(client, "/path");
            CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache("/path", client, (client, event) -> {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CHILD_ADDED:
                        System.out.println("【CHILD_ADDED】当前节点添加子节点...");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("【CHILD_UPDATED】当前节点子节点信息修改...");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("【CHILD_REMOVED】当前节点删除子节点");
                        break;
                }
                ChildData childData = event.getData();
                if (childData != null) {
                    byte[] d = childData.getData();
                    if (d != null && d.length != 0) {
                        System.out.println("《INFO》当前节点信息: " + new String(d));
                    }
                }
            }).build();
    ​
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
            System.in.read();
        }
    ​
        /**
        * TreeCache
        */
        @Test
        void treeCache() throws Exception {
            CuratorCache curatorCache = CuratorCache.build(client, "/tree", CuratorCache.Options.SINGLE_NODE_CACHE);
            CuratorCacheListener listener = CuratorCacheListener.builder().forTreeCache(client, (client, event) -> {
                ChildData oldData = event.getOldData();
                if (oldData != null) {
                    byte[] od = oldData.getData();
                    if (od != null && od.length != 0) {
                        System.out.println("《OLD_INFO》当前节点旧信息: " + new String(od));
                    }
                }
                TreeCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case NODE_ADDED:
                        System.out.println("【NODE_ADDED】当前节点添加子节点...");
                        break;
                    case NODE_UPDATED:
                        System.out.println("【NODE_UPDATED】当前节点或子节点信息修改...");
                        break;
                    case NODE_REMOVED:
                        System.out.println("【NODE_REMOVED】当前节点或子节点删除");
                        break;
                }
                ChildData newData = event.getData();
                if (newData != null) {
                    byte[] nd = newData.getData();
                    if (nd != null && nd.length != 0) {
                        System.out.println("《NEW_INFO》当前节点新信息: " + new String(nd));
                    }
                }
            }).build();
    ​
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
            System.in.read();
        }
    ​
        /**
        * new API
        */
        @Test
        void watch() throws Exception {
            CuratorCache curatorCache = CuratorCache.builder(client, "/node1").build();
            CuratorCacheListener listener = CuratorCacheListener.builder()
                    .forInitialized(() -> {
                        System.out.println("【INITIALIZED】初始化...");
                    })
                    .forCreates(childData -> {
                        byte[] data = childData.getData();
                        System.out.println("【NODE_CREATED】新增节点...");
                        if (data != null && data.length != 0) {
                            System.out.println("《INFO》节点信息: " + new String(data));
                        }
                    })
                    .forChanges((oldNode, newNode) -> {
                        byte[] od = oldNode.getData();
                        byte[] nd = newNode.getData();
                        System.out.println("【NODE_CHANGED】修改节点信息...");
                        if (od != null && od.length != 0) {
                            System.out.println("《INFO》节点旧信息: " + new String(od));
                        }
                        if (nd != null && nd.length != 0) {
                            System.out.println("《INFO》节点新信息: " + new String(nd));
                        }
                    })
                    //.forCreatesAndChanges() //监听节点创建和修改
                    .forDeletes(childData -> {
                        byte[] data = childData.getData();
                        System.out.println("【NODE_DELETED】删除节点...");
                        if (data != null && data.length != 0) {
                            System.out.println("《INFO》节点信息: " + new String(data));
                        }
                    })
                    .forAll((type, oldData, data) -> {
                        System.out.println("【ALL】节点发生增删改...");
                    })
                    .build();
    ​
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
            System.in.read();
        }
    ​
        /**
        * 断开连接
        */
        @AfterEach
        void close() {
            if (client != null) {
                client.close();
            }
        }
    }

    测试NodeCache:

    测试PathChildrenCache:

    测试TreeCache:

    测试新API:

    五、分布式锁

    Zookeeper实现分布式锁流程:

    1. 客户端获取锁,在lock节点下创建临时顺序节点(-es)

    2. 然后获取lock下的所有子节点,如果发现自己创建的子节点序号最小,则认为获取到锁,使用完后,断开连接,临时节点被删除

    3. 如果发现自己创建的子节点序号不是最小的,就找到序号比自己小的相邻的子节点,同时对其监听删除事件

    4. 如果发现比自己小的相邻的子节点被删除,触发监听事件重复2

    Curator实现分布式锁:

    介绍
    InterProcessMutex可重入锁
    InterProcessSemaphoreMutex不可重入锁
    InterProcessMultiLock将多个锁作为一个锁使用
    InterProcessSemaphoreV2信号量,可以控制同时有多个线程获取锁
    InterProcessReadWriteLock读写锁
    @Component
    public class TicketSeller implements Runnable {
    ​
        @Value("${zookeeper.address:localhost:2181}")
        private String address;
    ​
        @Value("${zookeeper.lock-path:/lock}")
        private String lockPath;
    ​
        private CuratorFramework client;
    ​
        private static InterProcessLock lock;
    ​
        private static InterProcessSemaphoreV2 semaphore;
    ​
        private static InterProcessReadWriteLock readWriteLock;
    ​
        private static int ticketNumber = 20;
    ​
        @PostConstruct
        public void init() {
            connect();
            initLock();
        }
    ​
        @Override
        public void run() {
            boolean acquire = false;
            Lease lease = null;
            try {
                while (true) {
                    if (lock != null) {
                        acquire = lock.acquire(1, TimeUnit.SECONDS);
                    }
    ​
                    if (semaphore != null) {
                        lease = semaphore.acquire(1, TimeUnit.SECONDS);
                    }
    ​
                    if (readWriteLock != null) {
                        acquire = readWriteLock.writeLock().acquire(1, TimeUnit.SECONDS);
                    }
    ​
                    if (ticketNumber > 0) {
                        ticketNumber--;
                        System.out.println("窗口-" + Thread.currentThread().getName() + "卖掉一张票, 剩余票数" + ticketNumber);
                    } else {
                        break;
                    }
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                try {
                    if (acquire) {
                        if(lock != null) {
                            lock.release();
                        }
    ​
                        if(readWriteLock != null){
                            readWriteLock.writeLock().release();
                        }
                    }
    ​
                    if (lease != null) {
                        lease.close();
                    }
                    
                    TimeUnit.MILLISECONDS.sleep(1000);
    ​
                    if (ticketNumber == 0) {
                        close();
                        System.out.println("票已售空...");
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }
        }
    ​
        private void connect() {
            client = CuratorFrameworkFactory.builder()
                    .connectString(address)
                    .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                    .build();
            client.start();
        }
    ​
        private void initLock() {
            //1. 可重入锁
            lock = new InterProcessMutex(client, lockPath);
    ​
            //2. 不可重入锁
            //lock = new InterProcessSemaphoreMutex(client, lockPath);
    ​
            //3. 将多个锁作为一个锁使用 当调用acquire()时, 将获取所有锁;如果失败,则会释放所获取的所有路径
            //lock = new InterProcessMultiLock(client, Collections.singletonList(lockPath));
    ​
            //4. 信号量 可以控制同时有多个线程获取锁
            //semaphore = new InterProcessSemaphoreV2(client, lockPath, 3);
    ​
            //5. 读写锁(可重入) 读锁共享 写锁独占
            //readWriteLock = new InterProcessReadWriteLock(client, lockPath);
        }
    ​
        private void close() {
            if (client != null) {
                client.close();
            }
        }
    }
    @SpringBootTest
    class TicketSellerTests {
    ​
        @Autowired
        private TicketSeller ticketSeller;
    ​
        @Test
        void sell() throws IOException {
            System.out.println("开始售票...");
            new Thread(ticketSeller, "A").start();
            new Thread(ticketSeller, "B").start();
            new Thread(ticketSeller, "C").start();
            System.in.read();
        }
    }

    测试InterProcessMutex:

    测试InterProcessSemaphoreMutex:

    测试InterProcessReadWriteLock:

  • 相关阅读:
    leetcode练习
    Dubbo使用入门xml配置
    MVCC机制与BufferPool缓存机制
    机器学习 —— 朴素贝叶斯
    JVM堆内存转储
    C\C++基础
    S32 Design Studio for ARM 2.2 快速入门
    Embind方便的实现js与C++的交互
    LeetCode算法心得——全排列(回溯型排列)
    docker build不输出echo内容,不打印构建过程
  • 原文地址:https://blog.csdn.net/qingsongxyz/article/details/134456795