• ZooKeeper


    一、概述

    1、为什么使用zk

    随着应用规模的迅速扩张,单台机器的部署已经难以支撑用户大规模、高并发的请求了, 因此服务化、集群化、分布式概念应运而生。 针对这种场景,人们通常使用的做法就是将软件按照模块进行拆分,形成独立的子系统,然后在局域网内部署到多台机器上面, 形成了一个集群。 这种方式即可以分滩请求压力,又可以起到灾备的效果。

    然而, 集群的维护和多节点应用程序的协作运行远比单机模式复杂,需要顾及到的细节问题实在太多,比如说同一分配置在多台机器上的同步, 客户端程序实时感知服务机状态,应用与应用之间的公共资源的互斥访问等等一系列的问题。 如果这些问题都依靠开发人员或维护人员去解决的话, 非旦消耗人力,而且也达不到实时准确的效果。

    所幸的是,zookeeper能够给我们非常完美的解决这些问题,zookeeper天生的就是为解决分布式协调服务这个问题而来, 应用zookeeper,能够非常好的解决如下问题:

    • 配置信息同步
    • 分布式锁控制
    • 消息的发布与订阅(典型的生产者消费者模型)
    • 集群内节点状态的快速感知

    2、概念

    大数据生态系统里的很多组件的命名都是某种动物或者昆虫, 比如hadoop就是大象,hive是蜜蜂。zookeeper即动物园管理者, 顾名思义就是管理大数据生态系统各组件的管理员

    Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。

    3、节点模型

    每个目录称为一个znode,具有唯一的路径标识

    每个znode可以存储1MB数据

    每个znode是有版本的,每个znode存储的数据可以设置版本,也就是说一个访问路径可以有多份数据

    znode可以被监控包括这个目录节点数据修改以及子节点变化,一旦变化可以通知设置监控的客户端

    4、特点

    5、应用场景

    提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

    统一命名服务

    统一配置管理

    统一集群管理

    服务动态上下线

     负载均衡管理

    二、安装

    1、window安装

    官网下载:Apache ZooKeeper

    找到页面中的in the achive链接打开

    打开选择要下载的版本,这里选择3.7.0,注意从3.5.7开始要下载带bin的tar.gz

    即apache-zookeeper-3.7.0-bin.tar.gz

    下载下来解压,解压工具解压不了使用window自带的powershell解压即可

    进入安装目录下的bin直接执行zkServer.cmd命令启动发现有错误,原因是默认配置是liunx配置,所以需要改配置

    打开conf目录,将zoo_sample.cfg文件复制一份,文件名zoo.cfg

    并且对zoo.cfg文件中的dataDir修改 dataDir=../data

    然后在bin目录下创建data文件夹

    进入bin目录,cmd中执行zkServer.cmd命令启动zk,然后重新打开一个cmd执行zkCli.cmd命令

    查看根节点下的目录ls /

    2、linux安装

    下载:https://archive.apache.org/dist/zookeeper/

    解压到/opt/module

    tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

    修改conf

    1. #(1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg;
    2. [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
    3. #(2)打开zoo.cfg文件,修改dataDir路径:
    4. [atguigu@hadoop102 zookeeper-3.4.10]$ vim zoo.cfg
    5. 修改如下内容:
    6. dataDir=/opt/module/zookeeper-3.4.10/zkData
    7. #(3)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹
    8. [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir zkData # 默认不会给你创建

    启动

    1. #(1)启动Zookeeper
    2. [atguigu@hadoop102 zookeeper-3.4.10]$ /opt/module/zookeeper-3.4.10/bin/zkServer.sh start
    3. #(2)查看进程是否启动
    4. [atguigu@hadoop102 zookeeper-3.4.10]$ jps
    5. 4020 Jps
    6. 4001 QuorumPeerMain # 这个是
    7. #(3)查看状态:
    8. [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh status
    9. ZooKeeper JMX enabled by default
    10. Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    11. Mode: standalone
    12. #(4)启动客户端:
    13. [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh
    14. #(5)退出客户端:
    15. [zk: localhost:2181(CONNECTED) 0] quit
    16. #(6)停止Zookeeper
    17. [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop

    3、集群安装

    1. 1.集群规划
    2. 在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。
    3. 2.解压安装
    4. #(1)解压Zookeeper安装包到/opt/module/目录下
    5. [atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
    6. #(2)同步/opt/module/zookeeper-3.4.10目录内容到hadoop103、hadoop104
    7. [atguigu@hadoop102 module]$ xsync zookeeper-3.4.10/
    8. 3.配置服务器编号
    9. #(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData
    10. [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir -p zkData
    11. #(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件
    12. [atguigu@hadoop102 zkData]$ touch myid
    13. #添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码
    14. #(3)编辑myid文件
    15. [atguigu@hadoop102 zkData]$ vi myid
    16. #在文件中添加与server对应的编号:
    17. 2
    18. #(4)拷贝配置好的zookeeper到其他机器上
    19. [atguigu@hadoop102 zkData]$ xsync myid
    20. 并分别在hadoop102、hadoop103上修改myid文件中内容为3、4
    21. 4.配置zoo.cfg文件
    22. #(1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg
    23. [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
    24. [atguigu@hadoop102 conf]$ vim zoo.cfg
    25. #修改数据存储路径配置
    26. dataDir=/opt/module/zookeeper-3.4.10/zkData
    27. # client端口为2181,即如kafka用2181与zk交互
    28. clientPort=2181
    29. #增加如下配置
    30. #######################cluster##########################
    31. server.2=hadoop102:2888:3888
    32. server.3=hadoop103:2888:3888
    33. server.4=hadoop104:2888:3888
    34. #server.第几号服务器=服务器的ip:服务器与集群leader交互端口2888:选举leader的端口3888
    35. # 这个第几号服务器是上面myid的内容
    36. #(3)同步zoo.cfg配置文件
    37. [atguigu@hadoop102 conf]$ xsync zoo.cfg
    38. #(4)配置参数解读

    集群操作

    1. #(1)分别启动Zookeeper
    2. [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
    3. [atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
    4. [atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
    5. #(2)查看状态
    6. [atguigu@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status
    7. JMX enabled by default
    8. Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    9. Mode: follower
    10. [atguigu@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status
    11. JMX enabled by default
    12. Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    13. Mode: leader
    14. [atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
    15. JMX enabled by default
    16. Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
    17. Mode: follower
    18. # 也可以通过以下方式加入到集群
    19. ./zkCli.sh ‐server 192.168.60.130:2181
    20. ./zkCli.sh ‐server 192.168.60.130:2182
    21. ./zkCli.sh ‐server 192.168.60.130:2183

    注:配置参数

    Zookeeper中的配置文件zoo.cfg中参数含义解读如下:

    1.tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒

    Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。

    它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)

    2.initLimit =10:LF初始通信时限

    集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。

    3.syncLimit =5:LF同步通信时限

    集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。

    4.dataDir:数据文件目录+数据持久化路径

    主要用于保存Zookeeper中的数据。

    5.clientPort =2181:客户端连接端口,即如kafka用2181与zk交互

    监听客户端连接的端口。

    6 、server.2=hadoop102:2888:3888
    server.3=hadoop103:2888:3888
    server.4=hadoop104:2888:3888

    server.A=B:C:D。
    A是一个数字,表示这个是第几号服务器;
    集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
    B是这个服务器的ip地址;
    C是这个服务器与集群中的Leader服务器交换信息的端口;
    D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

    4、zk客户端可视化工具

    4.1、ZooInspector

    下载地址

    https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
    解压后进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar

    4.2、taokeeper

    基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件,
    安装前要求服务前先配置nc 和 sshd
    1.下载数据库脚本

    wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql
     

    三、命令行操作

    1、节点类型

    zookeeper中的节点有两种, 分别为临时节点和永久节点。 节点的类型在创建时即被确定, 并且不能改变。

    PERSISTENT–持久化目录节点:客户端与zookeeper断开连接后,该节点依旧存在;
    PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点:客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号;
    EPHEMERAL-临时目录节点:客户端与zookeeper断开连接后,该节点被删除。。 虽然每个临时的Znode都会绑定到一个客户端会话, 但他们对所有的客户端还是可见的。另外, ZooKeeper的临时节点不允许拥有子节点。;
    EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:客户端与zookeeper断开连接后(一旦会话(Session)结束),该节点被删除,只是Zookeeper给该节点名称进行顺序编号;

    2、zookeeper常用shell命令

    2.1、命令概述

    2.2、查看当前znode信息

    1. #查看当前znode信息
    2. [zk: localhost:2181(CONNECTED) 3] ls /
    3. [dubbo, sanguo0000000003, services, zookeeper]

    2.3、node详情

    ls -s /命令或者stat / 命令都行

    1. [zk: localhost:2181(CONNECTED) 2] ls -s /
    2. [dubbo, sanguo0000000003, services, zookeeper]
    3. cZxid = 0x0
    4. ctime = Thu Jan 01 08:00:00 CST 1970
    5. mZxid = 0x0
    6. mtime = Thu Jan 01 08:00:00 CST 1970
    7. pZxid = 0x66
    8. cversion = 4
    9. dataVersion = 0
    10. aclVersion = 0
    11. ephemeralOwner = 0x0
    12. dataLength = 0
    13. numChildren = 4

    1)czxid-创建节点的事务zxid

    每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。

    事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。

    2)ctime - znode被创建的毫秒数(从1970年开始)

    3)mzxid - znode最后更新的事务zxid

    4)mtime - znode最后修改的毫秒数(从1970年开始)

    5)pZxid-znode最后更新的子节点zxid

    6)cversion - znode子节点变化号,znode子节点修改次数

    7)dataversion - znode数据变化号

    8)aclVersion - znode访问控制列表的变化号

    9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。

    10)dataLength- znode的数据长度

    11)numChildren - znode子节点数量
     

     2.4、创建无序永久节点

    [zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"
    

    2.5、创建有序永久节点

    1. [zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao"
    2. Created /sanguo/weiguo/zhangliao0000000000
    3. [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao"
    4. Created /sanguo/weiguo/zhangliao0000000001
    5. [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu"
    6. Created /sanguo/weiguo/xuchu0000000002

    如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。

    2.6、创建 无序号临时节点

    [zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"

    当前客户端可以查看值 ,退出客户端重新进入值删除了 

    2.7、创建有序号临时节点

    [zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu"

    2.8、修改节点值

    [zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"

    2.9、删除节点

    [zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin

    2.10、递归删除节点

    [zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo

    2.11、节点监听

    查看监听原理节 

    四、javaApi操作

    1、原生api

    创建maven引入依赖

    1. <dependencies>
    2. <!-- https://mvnrepository.com/artifact/junit/junit -->
    3. <dependency>
    4. <groupId>junit</groupId>
    5. <artifactId>junit</artifactId>
    6. <version>RELEASE</version>
    7. </dependency>
    8. <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
    9. <dependency>
    10. <groupId>org.apache.logging.log4j</groupId>
    11. <artifactId>log4j-core</artifactId>
    12. <version>2.14.1</version>
    13. </dependency>
    14. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    15. <dependency>
    16. <groupId>org.apache.zookeeper</groupId>
    17. <artifactId>zookeeper</artifactId>
    18. <version>3.7.0</version>
    19. </dependency>
    20. </dependencies>

    需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”

    1. log4j.rootLogger=INFO, stdout
    2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    5. log4j.appender.logfile=org.apache.log4j.FileAppender
    6. log4j.appender.logfile.File=target/spring.log
    7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    1.1、创建zk客户端

    1. public class TestZookeeper {
    2. private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private int sessionTimeout = 2000;
    4. private ZooKeeper zooKeeper;
    5. @Test
    6. public void init() throws IOException {
    7. zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    8. @Override
    9. public void process(WatchedEvent watchedEvent) {
    10. }
    11. });
    12. }
    13. }

    1.2、创建子节点

    1. public class TestZookeeper {
    2. // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private String connectionString = "127.0.0.1:2181";
    4. private int sessionTimeout = 2000;
    5. private ZooKeeper zkClient;
    6. @Before
    7. public void init() throws IOException {
    8. zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    9. @Override
    10. public void process(WatchedEvent watchedEvent) {
    11. }
    12. });
    13. }
    14. //创建节点
    15. @Test
    16. public void create() throws InterruptedException, KeeperException {
    17. // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
    18. String path = zkClient.create("/atguigu", "chenchen".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    19. System.out.println(path);
    20. }
    21. }

    1.3、获取子节点并监听节点变化

    1. public class TestZookeeper {
    2. // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private String connectionString = "127.0.0.1:2181";
    4. private int sessionTimeout = 2000;
    5. private ZooKeeper zkClient;
    6. @Before
    7. public void init() throws IOException {
    8. zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    9. @Override
    10. public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
    11. List<String> children = null;
    12. try {
    13. System.out.println("********************************");
    14. children = zkClient.getChildren("/", true);
    15. for (int i = 0; i < children.size(); i++) {
    16. System.out.println(children.get(i));
    17. }
    18. System.out.println("********************************");
    19. } catch (KeeperException e) {
    20. e.printStackTrace();
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. });
    26. }
    27. //获取子节点并监控节点的变化
    28. @Test
    29. public void getDataAndWatch() throws InterruptedException, KeeperException, IOException {
    30. List<String> children = zkClient.getChildren("/", true);
    31. for (int i = 0; i < children.size(); i++) {
    32. System.out.println(children.get(i));
    33. }
    34. //让程序不要结束
    35. System.in.read();
    36. }
    37. }

    1.4、判断znode是否存在

    1. public class TestZookeeper {
    2. // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private String connectionString = "127.0.0.1:2181";
    4. private int sessionTimeout = 2000;
    5. private ZooKeeper zkClient;
    6. @Before
    7. public void init() throws IOException {
    8. zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    9. @Override
    10. public void process(WatchedEvent watchedEvent) {
    11. List<String> children = null;
    12. try {
    13. System.out.println("********************************");
    14. children = zkClient.getChildren("/", true);
    15. for (int i = 0; i < children.size(); i++) {
    16. System.out.println(children.get(i));
    17. }
    18. System.out.println("********************************");
    19. } catch (KeeperException e) {
    20. e.printStackTrace();
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. });
    26. }
    27. //判断节点是否存在
    28. @Test
    29. public void exist() throws InterruptedException, KeeperException {
    30. Stat exists = zkClient.exists("/dubbo", true);
    31. if (exists == null) {
    32. System.out.println("没有该节点");
    33. }else{
    34. System.out.println("该节点存在");
    35. }
    36. }
    37. }

    1.5、删除znode

    1. public class TestZookeeper {
    2. // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private String connectionString = "127.0.0.1:2181";
    4. private int sessionTimeout = 2000;
    5. private ZooKeeper zkClient;
    6. @Before
    7. public void init() throws IOException {
    8. zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    9. @Override
    10. public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
    11. List<String> children = null;
    12. try {
    13. System.out.println("********************************");
    14. children = zkClient.getChildren("/", true);
    15. for (int i = 0; i < children.size(); i++) {
    16. System.out.println(children.get(i));
    17. }
    18. System.out.println("********************************");
    19. } catch (KeeperException e) {
    20. e.printStackTrace();
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. });
    26. }
    27. //删除节点
    28. @Test
    29. public void delete() throws InterruptedException, KeeperException {
    30. zkClient.delete("/atguigu2",-1);
    31. }
    32. }

    2、zookeeper 开源客户端curator

    Curator 是 Apache ZooKeeper 的Java客户端库。Curator 项目的目标是简化 ZooKeeper 客户端的使用。

    Apache Curator –

    2.1、创建连接

    创建maven项目引入依赖和日志文件

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0</modelVersion>
    6. <groupId>com.test</groupId>
    7. <artifactId>curator-zk</artifactId>
    8. <version>1.0-SNAPSHOT</version>
    9. <dependencies>
    10. <dependency>
    11. <groupId>junit</groupId>
    12. <artifactId>junit</artifactId>
    13. <version>4.10</version>
    14. <scope>test</scope>
    15. </dependency>
    16. <!--curator-->
    17. <dependency>
    18. <groupId>org.apache.curator</groupId>
    19. <artifactId>curator-framework</artifactId>
    20. <version>4.0.0</version>
    21. </dependency>
    22. <dependency>
    23. <groupId>org.apache.curator</groupId>
    24. <artifactId>curator-recipes</artifactId>
    25. <version>4.0.0</version>
    26. </dependency>
    27. <!--日志-->
    28. <dependency>
    29. <groupId>org.slf4j</groupId>
    30. <artifactId>slf4j-api</artifactId>
    31. <version>1.7.21</version>
    32. </dependency>
    33. <dependency>
    34. <groupId>org.slf4j</groupId>
    35. <artifactId>slf4j-log4j12</artifactId>
    36. <version>1.7.21</version>
    37. </dependency>
    38. </dependencies>
    39. <build>
    40. <plugins>
    41. <plugin>
    42. <groupId>org.apache.maven.plugins</groupId>
    43. <artifactId>maven-compiler-plugin</artifactId>
    44. <version>3.1</version>
    45. <configuration>
    46. <source>1.8</source>
    47. <target>1.8</target>
    48. </configuration>
    49. </plugin>
    50. </plugins>
    51. </build>
    52. </project>

    log4j.properties 

    1. log4j.rootLogger=off,stdout
    2. log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    3. log4j.appender.stdout.Target = System.out
    4. log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    5. log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

    连接zk 

    1. @Before
    2. public void testConnect() {
    3. //重试策略
    4. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    5. //2.第二种方式
    6. //CuratorFrameworkFactory.builder();
    7. client = CuratorFrameworkFactory.builder()
    8. .connectString("192.168.200.130:2181")
    9. .sessionTimeoutMs(60 * 1000)
    10. .connectionTimeoutMs(15 * 1000)
    11. .retryPolicy(retryPolicy)
    12. .namespace("test")
    13. .build();
    14. //开启连接
    15. client.start();
    16. }

     

    2.2、节点创建

    1. /**
    2. * 创建节点:create 持久 临时 顺序 数据
    3. * 1. 基本创建 :create().forPath("")
    4. * 2. 创建节点 带有数据:create().forPath("",data)
    5. * 3. 设置节点的类型:create().withMode().forPath("",data)
    6. * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
    7. */
    8. @Test
    9. public void testCreate() throws Exception {
    10. //2. 创建节点 带有数据
    11. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    12. String path = client.create().forPath("/app2", "hehe".getBytes());
    13. System.out.println(path);
    14. }
    15. @Test
    16. public void testCreate2() throws Exception {
    17. //1. 基本创建
    18. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    19. String path = client.create().forPath("/app1");
    20. System.out.println(path);
    21. }
    22. @Test
    23. public void testCreate3() throws Exception {
    24. //3. 设置节点的类型
    25. //默认类型:持久化
    26. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    27. System.out.println(path);
    28. }
    29. @Test
    30. public void testCreate4() throws Exception {
    31. //4. 创建多级节点 /app1/p1
    32. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
    33. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    34. System.out.println(path);
    35. }

    2.3、节点查看

    1. /**
    2. * 查询节点:
    3. * 1. 查询数据:get: getData().forPath()
    4. * 2. 查询子节点: ls: getChildren().forPath()
    5. * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
    6. */
    7. @Test
    8. public void testGet1() throws Exception {
    9. //1. 查询数据:get
    10. byte[] data = client.getData().forPath("/app1");
    11. System.out.println(new String(data));
    12. }
    13. @Test
    14. public void testGet2() throws Exception {
    15. // 2. 查询子节点: ls
    16. List<String> path = client.getChildren().forPath("/");
    17. System.out.println(path);
    18. }
    19. @Test
    20. public void testGet3() throws Exception {
    21. Stat status = new Stat();
    22. System.out.println(status);
    23. //3. 查询节点状态信息:ls -s
    24. client.getData().storingStatIn(status).forPath("/app1");
    25. System.out.println(status);
    26. }

    2.4、节点删除

    1. /**
    2. * 删除节点: delete deleteall
    3. * 1. 删除单个节点:delete().forPath("/app1");
    4. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
    5. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
    6. * 4. 回调:inBackground
    7. * @throws Exception
    8. */
    9. @Test
    10. public void testDelete() throws Exception {
    11. // 1. 删除单个节点
    12. client.delete().forPath("/app1");
    13. }
    14. @Test
    15. public void testDelete2() throws Exception {
    16. //2. 删除带有子节点的节点
    17. client.delete().deletingChildrenIfNeeded().forPath("/app4");
    18. }
    19. @Test
    20. public void testDelete3() throws Exception {
    21. //3. 必须成功的删除
    22. client.delete().guaranteed().forPath("/app2");
    23. }
    24. @Test
    25. public void testDelete4() throws Exception {
    26. //4. 回调
    27. client.delete().guaranteed().inBackground(new BackgroundCallback(){
    28. @Override
    29. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    30. System.out.println("我被删除了~");
    31. System.out.println(event);
    32. }
    33. }).forPath("/app1");
    34. }

    2.5、节点修改

    1. /**
    2. * 修改数据
    3. * 1. 基本修改数据:setData().forPath()
    4. * 2. 根据版本修改: setData().withVersion().forPath()
    5. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
    6. *
    7. * @throws Exception
    8. */
    9. @Test
    10. public void testSet() throws Exception {
    11. client.setData().forPath("/app1", "itcast".getBytes());
    12. }
    13. @Test
    14. public void testSetForVersion() throws Exception {
    15. Stat status = new Stat();
    16. //3. 查询节点状态信息:ls -s
    17. client.getData().storingStatIn(status).forPath("/app1");
    18. int version = status.getVersion();//查询出来的 3
    19. System.out.println(version);
    20. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
    21. }

    2.6、节点监听

    ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

    ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

    ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便

    需要开发人员自己反复注册Watcher,比较繁琐。

    Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

    ZooKeeper提供了三种Watcher:

    NodeCache : 只是监听某一个特定的节点

    PathChildrenCache : 监控一个ZNode的子节点.

    TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

    1. /**
    2. * 演示 NodeCache:给指定一个节点注册监听器
    3. */
    4. @Test
    5. public void testNodeCache() throws Exception {
    6. //1. 创建NodeCache对象
    7. final NodeCache nodeCache = new NodeCache(client,"/app1");
    8. //2. 注册监听
    9. nodeCache.getListenable().addListener(new NodeCacheListener() {
    10. @Override
    11. public void nodeChanged() throws Exception {
    12. System.out.println("节点变化了~");
    13. //获取修改节点后的数据
    14. byte[] data = nodeCache.getCurrentData().getData();
    15. System.out.println(new String(data));
    16. }
    17. });
    18. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
    19. nodeCache.start(true);
    20. while (true){
    21. }
    22. }
    23. /**演示PathChildrenCache
    24. */
    25. @Test
    26. public void testPathChildrenCache() throws Exception {
    27. //1.创建监听对象
    28. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    29. //2. 绑定监听器
    30. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override
    31. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    32. System.out.println("子节点变化了~");
    33. System.out.println(event);
    34. //监听子节点的数据变更,并且拿到变更后的数据
    35. //1.获取类型
    36. PathChildrenCacheEvent.Type type = event.getType();
    37. //2.判断类型是否是update
    38. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    39. System.out.println("数据变了!!!");
    40. byte[] data = event.getData().getData();
    41. System.out.println(new String(data));
    42. }
    43. }
    44. });
    45. //3. 开启
    46. pathChildrenCache.start();
    47. while (true){
    48. }
    49. }
    50. /**
    51. * 演示 TreeCache:监听某个节点自己和所有子节点们
    52. */
    53. @Test
    54. public void testTreeCache() throws Exception {
    55. //1. 创建监听器
    56. TreeCache treeCache = new TreeCache(client,"/app2");
    57. //2. 注册监听
    58. treeCache.getListenable().addListener(new TreeCacheListener() {
    59. @Override
    60. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    61. System.out.println("节点变化了");
    62. System.out.println(event);
    63. }
    64. });
    65. //3. 开启
    66. treeCache.start();
    67. while (true){
    68. }
    69. }

    2.7、分布式锁

    在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。

    但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。

    那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁
    在这里插入图片描述

    核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。


    客户端获取锁时,在lock节点下创建临时顺序节点。

    然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。

    如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

    如果发现比自己小的那个节点被删除,则客户端的

    Watcher会收到相应通知,此时再次判断自己创建的节点

    是否是lock子节点中序号最小的,如果是则获取到了锁,

    如果不是则重复以上步骤继续获取到比自己小的一个节点

    并注册监听。
    在这里插入图片描述

    分布式锁-模拟12306售票案例

    Curator实现分布式锁API

    在Curator中有五种锁方案:

    • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
    • InterProcessMutex:分布式可重入排它锁
    • InterProcessReadWriteLock:分布式读写锁
    • InterProcessMultiLock:将多个锁作为单个实体管理的容器
    • InterProcessSemaphoreV2:共享信号量
       

     在这里插入图片描述

     

     

     创建线程进行加锁设置

    1. public class Ticket12306 implements Runnable{
    2. private int tickets = 10;//数据库的票数
    3. private InterProcessMutex lock ;
    4. @Override
    5. public void run() {
    6. while(true){
    7. //获取锁
    8. try {
    9. lock.acquire(3, TimeUnit.SECONDS);
    10. if(tickets > 0){
    11. System.out.println(Thread.currentThread()+":"+tickets);
    12. Thread.sleep(100);
    13. tickets--;
    14. }
    15. } catch (Exception e) {
    16. e.printStackTrace();
    17. }finally {
    18. //释放锁
    19. try {
    20. lock.release();
    21. } catch (Exception e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. }
    26. }
    27. }

    创建连接,并且初始化锁

    1. public Ticket12306(){
    2. //重试策略
    3. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    4. //2.第二种方式
    5. //CuratorFrameworkFactory.builder();
    6. CuratorFramework client = CuratorFrameworkFactory.builder()
    7. .connectString("192.168.149.135:2181")
    8. .sessionTimeoutMs(60 * 1000)
    9. .connectionTimeoutMs(15 * 1000)
    10. .retryPolicy(retryPolicy)
    11. .build();
    12. //开启连接
    13. client.start();
    14. lock = new InterProcessMutex(client,"/lock");
    15. }

     运行多个线程进行测试

    1. public class LockTest {
    2. public static void main(String[] args) {
    3. Ticket12306 ticket12306 = new Ticket12306();
    4. //创建客户端
    5. Thread t1 = new Thread(ticket12306,"携程");
    6. Thread t2 = new Thread(ticket12306,"飞猪");
    7. t1.start();
    8. t2.start();
    9. }
    10. }

    五、内部原理(面试题)

    1、选举机制

    1)半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。

    2)Zookeeper虽然在配置文件中并没有指定Master和Slave。但是,Zookeeper工作时,是有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的。

    3)以一个简单的例子来说明整个选举的过程。

    假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么,

    在这里插入图片描述

    (1)服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态。

    (2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。

    (3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader。

    (4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。

    (5)服务器5启动,同4一样当小弟。

    2、ZooKeeper的监听原理是什么?

    2.1、节点值变化监听

    在 hadoop104 主机上注册监听/sanguo 节点数据变化

    [zk: localhost:2181(CONNECTED) 26] get -w /sanguo

    可能因为版本问题上面的命令会报错,可改用:

    [zk: localhost:2181(CONNECTED) 26] get /sanguo [watch]

    在 hadoop103 主机上修改/sanguo 节点的数据 

    [zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"

    观察 hadoop104 主机收到数据变化的监听 

    1. WATCHER::
    2. WatchedEvent state:SyncConnected type:NodeDataChanged
    3. path:/sanguo

    注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。 

    2.2、节点的子节点变化监听(路径变化)

    在 hadoop104 主机上注册监听/sanguo 节点的子节点变化

    1. [zk: localhost:2181(CONNECTED) 1] ls -w /sanguo
    2. [shuguo, weiguo]

    在 hadoop103 主机/sanguo 节点上创建子节点 

    1. [zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
    2. Created /sanguo/jin

    观察 hadoop104 主机收到子节点变化的监听 

    1. WATCHER::
    2. WatchedEvent state:SyncConnected type:NodeChildrenChanged
    3. path:/sanguo

    注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

    • 监控数据变化用 set
    • 监控子节点变化用 ls

    3、写数据流程

    4、ZooKeeper的部署方式有哪几种?

    集群中的角色有哪些?集群最少需要几台机器?

    (1)部署方式单机模式、集群模式

    (2)角色:Leader和Follower

    (3)集群最少需要机器数:3

    六、实战案例

    1、服务器动态上下线

    需求

    现在集群创建server节点

    create /servers "servers"

    服务器端向Zookeeper注册代码

    1. public class DistributeServer {
    2. private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private int sessionTimeout = 2000;
    4. private ZooKeeper zkClient;
    5. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    6. DistributeServer server = new DistributeServer();
    7. //1 连接zookeeper集群
    8. server.getConnect();
    9. //2 注册节点
    10. server.regist("test2");
    11. //3 业务逻辑处理
    12. server.business();
    13. }
    14. //业务逻辑处理
    15. private void business() throws InterruptedException {
    16. Thread.sleep(Long.MAX_VALUE);
    17. }
    18. //注册节点
    19. private void regist(String hostName) throws InterruptedException, KeeperException {
    20. String path = zkClient.create
    21. ("/servers/server", hostName.getBytes(),
    22. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    23. System.out.println(path);
    24. }
    25. //连接zookeeper集群
    26. private void getConnect() throws IOException {
    27. zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    28. @Override
    29. public void process(WatchedEvent watchedEvent) {
    30. List<String> children = null;
    31. try {
    32. System.out.println("********************************");
    33. children = zkClient.getChildren("/", true);
    34. for (int i = 0; i < children.size(); i++) {
    35. System.out.println(children.get(i));
    36. }
    37. System.out.println("********************************");
    38. } catch (KeeperException e) {
    39. e.printStackTrace();
    40. } catch (InterruptedException e) {
    41. e.printStackTrace();
    42. }
    43. }
    44. });
    45. }
    46. }

    客户端代码

    1. public class DistributeClient {
    2. private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    3. private int sessionTimeout = 2000;
    4. private ZooKeeper zkClient;
    5. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    6. DistributeClient server = new DistributeClient();
    7. //1 连接zookeeper集群
    8. server.getConnect();
    9. //2 注册节点
    10. server.getChildren();
    11. //3 业务逻辑处理
    12. server.business();
    13. }
    14. //注册监听
    15. private void getChildren() throws InterruptedException, KeeperException {
    16. List<String> children = zkClient.getChildren("/servers", true);
    17. //储存服务器节点主机名称集合
    18. ArrayList<String> hosts = new ArrayList<>();
    19. for (String child : children) {
    20. byte[] data = zkClient.getData("/servers/" + child, false, null);
    21. hosts.add(String.valueOf(data));
    22. }
    23. //将所有在线主机的名称打印出来
    24. System.out.println(hosts);
    25. }
    26. //业务逻辑处理
    27. private void business() throws InterruptedException {
    28. Thread.sleep(Long.MAX_VALUE);
    29. }
    30. //连接zookeeper集群
    31. private void getConnect() throws IOException {
    32. zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
    33. @Override
    34. public void process(WatchedEvent watchedEvent) {
    35. try {
    36. getChildren();
    37. } catch (InterruptedException e) {
    38. e.printStackTrace();
    39. } catch (KeeperException e) {
    40. e.printStackTrace();
    41. }
    42. }
    43. });
    44. }
    45. }

    2、分布式锁案例

    2.1、原生zk实现

    锁实现

    1. package com.herobin.flink.debug_local.zk;
    2. import org.apache.zookeeper.*;
    3. import org.apache.zookeeper.data.Stat;
    4. import java.io.IOException;
    5. import java.util.Collections;
    6. import java.util.List;
    7. import java.util.concurrent.CountDownLatch;
    8. public class DistributedLock {
    9. private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    10. // 超时时间
    11. private int sessionTimeout = 2000;
    12. private ZooKeeper zk;
    13. private String rootNode = "locks";
    14. private String subNode = "seq-";
    15. // 当前 client 等待的子节点
    16. private String waitPath;
    17. //ZooKeeper 连接
    18. private CountDownLatch connectLatch = new CountDownLatch(1);
    19. //ZooKeeper 节点等待
    20. private CountDownLatch waitLatch = new CountDownLatch(1);
    21. // 当前 client 创建的子节点
    22. private String currentNode;
    23. // 和 zk 服务建立连接,并创建根节点
    24. public DistributedLock() throws IOException, InterruptedException, KeeperException {
    25. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    26. @Override
    27. public void process(WatchedEvent event) {
    28. // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
    29. if (event.getState() ==
    30. Event.KeeperState.SyncConnected) {
    31. connectLatch.countDown();
    32. }
    33. // 发生了 waitPath 的删除事件
    34. if (event.getType() ==
    35. Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
    36. {
    37. waitLatch.countDown();
    38. } }
    39. });
    40. // 等待连接建立
    41. connectLatch.await();
    42. //获取根节点状态
    43. Stat stat = zk.exists("/" + rootNode, false);
    44. //如果根节点不存在,则创建根节点,根节点类型为永久节点
    45. if (stat == null) {
    46. System.out.println("根节点不存在");
    47. zk.create("/" + rootNode, new byte[0],
    48. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    49. }
    50. }
    51. // 加锁方法
    52. public void zkLock() {
    53. try {
    54. //在根节点下创建临时顺序节点,返回值为创建的节点路径
    55. currentNode = zk.create("/" + rootNode + "/" + subNode,
    56. null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
    57. CreateMode.EPHEMERAL_SEQUENTIAL);
    58. // wait 一小会, 让结果更清晰一些
    59. Thread.sleep(10);
    60. // 注意, 没有必要监听"/locks"的子节点的变化情况
    61. List<String> childrenNodes = zk.getChildren("/" +
    62. rootNode, false);
    63. // 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
    64. if (childrenNodes.size() == 1) {
    65. return;
    66. } else {
    67. //对根节点下的所有临时顺序节点进行从小到大排序
    68. Collections.sort(childrenNodes);
    69. //当前节点名称
    70. String thisNode = currentNode.substring(("/" + rootNode + "/").length());
    71. //获取当前节点的位置
    72. int index = childrenNodes.indexOf(thisNode);
    73. if (index == -1) {
    74. System.out.println("数据异常");
    75. } else if (index == 0) {
    76. // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
    77. return;
    78. } else {
    79. // 获得排名比 currentNode 前 1 位的节点
    80. this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
    81. // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
    82. zk.getData(waitPath, true, new Stat());
    83. //进入等待锁状态
    84. waitLatch.await();
    85. return;
    86. }
    87. }
    88. } catch (KeeperException e) {
    89. e.printStackTrace();
    90. } catch (InterruptedException e) {
    91. e.printStackTrace();
    92. }
    93. }
    94. // 解锁方法
    95. public void zkUnlock() {
    96. try {
    97. zk.delete(this.currentNode, -1);
    98. } catch (InterruptedException | KeeperException e) {
    99. e.printStackTrace();
    100. }
    101. }
    102. }

    锁测试(创建两个线程)

    1. package com.herobin.flink.debug_local.zk;
    2. import org.apache.zookeeper.KeeperException;
    3. import java.io.IOException;
    4. public class DistributedLockTest {
    5. public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
    6. // 创建分布式锁 1
    7. final DistributedLock lock1 = new DistributedLock();
    8. // 创建分布式锁 2
    9. final DistributedLock lock2 = new DistributedLock();
    10. new Thread(new Runnable() {
    11. @Override
    12. public void run() {
    13. // 获取锁对象
    14. try {
    15. lock1.zkLock();
    16. System.out.println("线程 1 获取锁");
    17. Thread.sleep(5 * 1000);
    18. lock1.zkUnlock();
    19. System.out.println("线程 1 释放锁");
    20. } catch (Exception e) {
    21. e.printStackTrace();
    22. } }
    23. }).start();
    24. new Thread(new Runnable() {
    25. @Override
    26. public void run() {
    27. // 获取锁对象
    28. try {
    29. lock2.zkLock();
    30. System.out.println("线程 2 获取锁");
    31. Thread.sleep(5 * 1000);
    32. lock2.zkUnlock();
    33. System.out.println("线程 2 释放锁");
    34. } catch (Exception e) {
    35. e.printStackTrace();
    36. } }
    37. }).start();
    38. }
    39. }

    观察运行

    1. 线程 1 获取锁
    2. 线程 1 释放锁
    3. 线程 2 获取锁
    4. 线程 2 释放锁

    2.2、Curator 框架实现分布式锁案例

    查看API Curator章节

  • 相关阅读:
    ROS OpenCV 级联分类器
    使用cython加速代码运行
    idea内存不足
    音乐转录(AMT)库Omnizart论文笔记及实践
    x264参数介绍(帧类型和码率控制,分析和视频可用性信息)
    asp毕业设计——基于asp+access的学生论坛设计与实现(毕业论文+程序源码)——学生论坛
    神经网络 梯度与神经元参数w、b关系;梯度与导数关系
    如何基于容器网络流量指标进行弹性伸缩
    敏感词在线检测-敏感词在线检测工具
    线性回归详解(代码实现+理论证明)
  • 原文地址:https://blog.csdn.net/qq_34491508/article/details/125373957