随着应用规模的迅速扩张,单台机器的部署已经难以支撑用户大规模、高并发的请求了, 因此服务化、集群化、分布式概念应运而生。 针对这种场景,人们通常使用的做法就是将软件按照模块进行拆分,形成独立的子系统,然后在局域网内部署到多台机器上面, 形成了一个集群。 这种方式即可以分滩请求压力,又可以起到灾备的效果。
然而, 集群的维护和多节点应用程序的协作运行远比单机模式复杂,需要顾及到的细节问题实在太多,比如说同一分配置在多台机器上的同步, 客户端程序实时感知服务机状态,应用与应用之间的公共资源的互斥访问等等一系列的问题。 如果这些问题都依靠开发人员或维护人员去解决的话, 非旦消耗人力,而且也达不到实时准确的效果。
所幸的是,zookeeper能够给我们非常完美的解决这些问题,zookeeper天生的就是为解决分布式协调服务这个问题而来, 应用zookeeper,能够非常好的解决如下问题:
大数据生态系统里的很多组件的命名都是某种动物或者昆虫, 比如hadoop就是大象,hive是蜜蜂。zookeeper即动物园管理者, 顾名思义就是管理大数据生态系统各组件的管理员
Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。
每个目录称为一个znode,具有唯一的路径标识
每个znode可以存储1MB数据
每个znode是有版本的,每个znode存储的数据可以设置版本,也就是说一个访问路径可以有多份数据
znode可以被监控包括这个目录节点数据修改以及子节点变化,一旦变化可以通知设置监控的客户端
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
统一配置管理
统一集群管理
服务动态上下线
负载均衡管理
官网下载:Apache ZooKeeper
找到页面中的in the achive链接打开
打开选择要下载的版本,这里选择3.7.0,注意从3.5.7开始要下载带bin的tar.gz
即apache-zookeeper-3.7.0-bin.tar.gz
下载下来解压,解压工具解压不了使用window自带的powershell解压即可
进入安装目录下的bin直接执行zkServer.cmd命令启动发现有错误,原因是默认配置是liunx配置,所以需要改配置
打开conf目录,将zoo_sample.cfg文件复制一份,文件名zoo.cfg
并且对zoo.cfg文件中的dataDir修改 dataDir=../data
然后在bin目录下创建data文件夹
进入bin目录,cmd中执行zkServer.cmd命令启动zk,然后重新打开一个cmd执行zkCli.cmd命令
查看根节点下的目录ls /
下载:https://archive.apache.org/dist/zookeeper/
解压到/opt/module
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
修改conf
- #(1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg;
- [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
-
- #(2)打开zoo.cfg文件,修改dataDir路径:
- [atguigu@hadoop102 zookeeper-3.4.10]$ vim zoo.cfg
- 修改如下内容:
- dataDir=/opt/module/zookeeper-3.4.10/zkData
-
- #(3)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹
- [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir zkData # 默认不会给你创建
启动
- #(1)启动Zookeeper
- [atguigu@hadoop102 zookeeper-3.4.10]$ /opt/module/zookeeper-3.4.10/bin/zkServer.sh start
-
- #(2)查看进程是否启动
- [atguigu@hadoop102 zookeeper-3.4.10]$ jps
- 4020 Jps
- 4001 QuorumPeerMain # 这个是
-
- #(3)查看状态:
- [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh status
-
- ZooKeeper JMX enabled by default
- Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
- Mode: standalone
-
- #(4)启动客户端:
- [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh
-
- #(5)退出客户端:
- [zk: localhost:2181(CONNECTED) 0] quit
-
- #(6)停止Zookeeper
- [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop
- 1.集群规划
- 在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。
-
- 2.解压安装
- #(1)解压Zookeeper安装包到/opt/module/目录下
- [atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
- #(2)同步/opt/module/zookeeper-3.4.10目录内容到hadoop103、hadoop104
- [atguigu@hadoop102 module]$ xsync zookeeper-3.4.10/
-
- 3.配置服务器编号
- #(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData
- [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir -p zkData
- #(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件
- [atguigu@hadoop102 zkData]$ touch myid
- #添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码
-
- #(3)编辑myid文件
- [atguigu@hadoop102 zkData]$ vi myid
- #在文件中添加与server对应的编号:
- 2
-
- #(4)拷贝配置好的zookeeper到其他机器上
- [atguigu@hadoop102 zkData]$ xsync myid
- 并分别在hadoop102、hadoop103上修改myid文件中内容为3、4
-
- 4.配置zoo.cfg文件
- #(1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg
- [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
- [atguigu@hadoop102 conf]$ vim zoo.cfg
- #修改数据存储路径配置
- dataDir=/opt/module/zookeeper-3.4.10/zkData
- # client端口为2181,即如kafka用2181与zk交互
- clientPort=2181
- #增加如下配置
- #######################cluster##########################
- server.2=hadoop102:2888:3888
- server.3=hadoop103:2888:3888
- server.4=hadoop104:2888:3888
- #server.第几号服务器=服务器的ip:服务器与集群leader交互端口2888:选举leader的端口3888
- # 这个第几号服务器是上面myid的内容
-
-
-
- #(3)同步zoo.cfg配置文件
- [atguigu@hadoop102 conf]$ xsync zoo.cfg
- #(4)配置参数解读
集群操作
- #(1)分别启动Zookeeper
- [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
- [atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
- [atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
- #(2)查看状态
- [atguigu@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status
- JMX enabled by default
- Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
- Mode: follower
-
- [atguigu@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status
- JMX enabled by default
- Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
- Mode: leader
-
- [atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
- JMX enabled by default
- Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
- Mode: follower
-
- # 也可以通过以下方式加入到集群
- ./zkCli.sh ‐server 192.168.60.130:2181
- ./zkCli.sh ‐server 192.168.60.130:2182
- ./zkCli.sh ‐server 192.168.60.130:2183
注:配置参数
Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
1.tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)
2.initLimit =10:LF初始通信时限
集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。
3.syncLimit =5:LF同步通信时限
集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
4.dataDir:数据文件目录+数据持久化路径
主要用于保存Zookeeper中的数据。
5.clientPort =2181:客户端连接端口,即如kafka用2181与zk交互
监听客户端连接的端口。
6 、server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
下载地址
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
解压后进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar
基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件,
安装前要求服务前先配置nc 和 sshd
1.下载数据库脚本
wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql
zookeeper中的节点有两种, 分别为临时节点和永久节点。 节点的类型在创建时即被确定, 并且不能改变。
PERSISTENT–持久化目录节点:客户端与zookeeper断开连接后,该节点依旧存在;
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点:客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号;
EPHEMERAL-临时目录节点:客户端与zookeeper断开连接后,该节点被删除。。 虽然每个临时的Znode都会绑定到一个客户端会话, 但他们对所有的客户端还是可见的。另外, ZooKeeper的临时节点不允许拥有子节点。;
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:客户端与zookeeper断开连接后(一旦会话(Session)结束),该节点被删除,只是Zookeeper给该节点名称进行顺序编号;
- #查看当前znode信息
- [zk: localhost:2181(CONNECTED) 3] ls /
- [dubbo, sanguo0000000003, services, zookeeper]
ls -s /命令或者stat / 命令都行
- [zk: localhost:2181(CONNECTED) 2] ls -s /
- [dubbo, sanguo0000000003, services, zookeeper]
- cZxid = 0x0
- ctime = Thu Jan 01 08:00:00 CST 1970
- mZxid = 0x0
- mtime = Thu Jan 01 08:00:00 CST 1970
- pZxid = 0x66
- cversion = 4
- dataVersion = 0
- aclVersion = 0
- ephemeralOwner = 0x0
- dataLength = 0
- numChildren = 4
1)czxid-创建节点的事务zxid
每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
2)ctime - znode被创建的毫秒数(从1970年开始)
3)mzxid - znode最后更新的事务zxid
4)mtime - znode最后修改的毫秒数(从1970年开始)
5)pZxid-znode最后更新的子节点zxid
6)cversion - znode子节点变化号,znode子节点修改次数
7)dataversion - znode数据变化号
8)aclVersion - znode访问控制列表的变化号
9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
10)dataLength- znode的数据长度
11)numChildren - znode子节点数量
[zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"
- [zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao"
- Created /sanguo/weiguo/zhangliao0000000000
- [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao"
- Created /sanguo/weiguo/zhangliao0000000001
- [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu"
- Created /sanguo/weiguo/xuchu0000000002
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"
当前客户端可以查看值 ,退出客户端重新进入值删除了
[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu"
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
[zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
查看监听原理节
创建maven引入依赖
- <dependencies>
- <!-- https://mvnrepository.com/artifact/junit/junit -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>RELEASE</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.14.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.7.0</version>
- </dependency>
- </dependencies>
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”
- log4j.rootLogger=INFO, stdout
-
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
-
- log4j.appender.logfile=org.apache.log4j.FileAppender
-
- log4j.appender.logfile.File=target/spring.log
-
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
-
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- public class TestZookeeper {
-
- private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private int sessionTimeout = 2000;
- private ZooKeeper zooKeeper;
-
- @Test
- public void init() throws IOException {
- zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
-
- }
- });
- }
- }
- public class TestZookeeper {
-
- // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private String connectionString = "127.0.0.1:2181";
- private int sessionTimeout = 2000;
- private ZooKeeper zkClient;
-
- @Before
- public void init() throws IOException {
- zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
-
- }
- });
- }
-
- //创建节点
- @Test
- public void create() throws InterruptedException, KeeperException {
- // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
- String path = zkClient.create("/atguigu", "chenchen".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- System.out.println(path);
- }
- }
- public class TestZookeeper {
-
- // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private String connectionString = "127.0.0.1:2181";
- private int sessionTimeout = 2000;
- private ZooKeeper zkClient;
-
- @Before
- public void init() throws IOException {
- zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
- List<String> children = null;
- try {
- System.out.println("********************************");
- children = zkClient.getChildren("/", true);
- for (int i = 0; i < children.size(); i++) {
- System.out.println(children.get(i));
- }
- System.out.println("********************************");
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- //获取子节点并监控节点的变化
- @Test
- public void getDataAndWatch() throws InterruptedException, KeeperException, IOException {
- List<String> children = zkClient.getChildren("/", true);
- for (int i = 0; i < children.size(); i++) {
- System.out.println(children.get(i));
- }
- //让程序不要结束
- System.in.read();
- }
-
- }
-
- public class TestZookeeper {
-
- // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private String connectionString = "127.0.0.1:2181";
- private int sessionTimeout = 2000;
- private ZooKeeper zkClient;
-
- @Before
- public void init() throws IOException {
- zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- List<String> children = null;
- try {
- System.out.println("********************************");
- children = zkClient.getChildren("/", true);
- for (int i = 0; i < children.size(); i++) {
- System.out.println(children.get(i));
- }
- System.out.println("********************************");
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- //判断节点是否存在
- @Test
- public void exist() throws InterruptedException, KeeperException {
- Stat exists = zkClient.exists("/dubbo", true);
- if (exists == null) {
- System.out.println("没有该节点");
- }else{
- System.out.println("该节点存在");
- }
- }
- }
- public class TestZookeeper {
-
- // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private String connectionString = "127.0.0.1:2181";
- private int sessionTimeout = 2000;
- private ZooKeeper zkClient;
-
- @Before
- public void init() throws IOException {
- zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
- List<String> children = null;
- try {
- System.out.println("********************************");
- children = zkClient.getChildren("/", true);
- for (int i = 0; i < children.size(); i++) {
- System.out.println(children.get(i));
- }
- System.out.println("********************************");
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- //删除节点
- @Test
- public void delete() throws InterruptedException, KeeperException {
- zkClient.delete("/atguigu2",-1);
- }
- }
-
Curator 是 Apache ZooKeeper 的Java客户端库。Curator 项目的目标是简化 ZooKeeper 客户端的使用。
创建maven项目引入依赖和日志文件
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.test</groupId>
- <artifactId>curator-zk</artifactId>
- <version>1.0-SNAPSHOT</version>
-
-
-
- <dependencies>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
-
- <!--curator-->
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.0.0</version>
- </dependency>
- <!--日志-->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.21</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.21</version>
- </dependency>
-
- </dependencies>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-
- </project>
log4j.properties
- log4j.rootLogger=off,stdout
-
- log4j.appender.stdout = org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.Target = System.out
- log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n
连接zk
- @Before
- public void testConnect() {
- //重试策略
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
- //2.第二种方式
- //CuratorFrameworkFactory.builder();
- client = CuratorFrameworkFactory.builder()
- .connectString("192.168.200.130:2181")
- .sessionTimeoutMs(60 * 1000)
- .connectionTimeoutMs(15 * 1000)
- .retryPolicy(retryPolicy)
- .namespace("test")
- .build();
- //开启连接
- client.start();
- }
- /**
- * 创建节点:create 持久 临时 顺序 数据
- * 1. 基本创建 :create().forPath("")
- * 2. 创建节点 带有数据:create().forPath("",data)
- * 3. 设置节点的类型:create().withMode().forPath("",data)
- * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
- */
- @Test
- public void testCreate() throws Exception {
- //2. 创建节点 带有数据
- //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
- String path = client.create().forPath("/app2", "hehe".getBytes());
- System.out.println(path);
- }
-
- @Test
- public void testCreate2() throws Exception {
- //1. 基本创建
- //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
- String path = client.create().forPath("/app1");
- System.out.println(path);
- }
- @Test
- public void testCreate3() throws Exception {
- //3. 设置节点的类型
- //默认类型:持久化
- String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
- System.out.println(path);
- }
- @Test
- public void testCreate4() throws Exception {
- //4. 创建多级节点 /app1/p1
- //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
- String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
- System.out.println(path);
- }
- /**
- * 查询节点:
- * 1. 查询数据:get: getData().forPath()
- * 2. 查询子节点: ls: getChildren().forPath()
- * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
- */
- @Test
- public void testGet1() throws Exception {
- //1. 查询数据:get
- byte[] data = client.getData().forPath("/app1");
- System.out.println(new String(data));
- }
-
- @Test
- public void testGet2() throws Exception {
- // 2. 查询子节点: ls
- List<String> path = client.getChildren().forPath("/");
- System.out.println(path);
- }
- @Test
- public void testGet3() throws Exception {
- Stat status = new Stat();
- System.out.println(status);
- //3. 查询节点状态信息:ls -s
- client.getData().storingStatIn(status).forPath("/app1");
- System.out.println(status);
- }
- /**
- * 删除节点: delete deleteall
- * 1. 删除单个节点:delete().forPath("/app1");
- * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
- * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
- * 4. 回调:inBackground
- * @throws Exception
- */
- @Test
- public void testDelete() throws Exception {
- // 1. 删除单个节点
- client.delete().forPath("/app1");
- }
- @Test
- public void testDelete2() throws Exception {
- //2. 删除带有子节点的节点
- client.delete().deletingChildrenIfNeeded().forPath("/app4");
- }
-
- @Test
- public void testDelete3() throws Exception {
- //3. 必须成功的删除
- client.delete().guaranteed().forPath("/app2");
- }
- @Test
- public void testDelete4() throws Exception {
- //4. 回调
- client.delete().guaranteed().inBackground(new BackgroundCallback(){
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
- System.out.println("我被删除了~");
- System.out.println(event);
- }
- }).forPath("/app1");
- }
- /**
- * 修改数据
- * 1. 基本修改数据:setData().forPath()
- * 2. 根据版本修改: setData().withVersion().forPath()
- * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
- *
- * @throws Exception
- */
- @Test
- public void testSet() throws Exception {
- client.setData().forPath("/app1", "itcast".getBytes());
- }
-
- @Test
- public void testSetForVersion() throws Exception {
- Stat status = new Stat();
- //3. 查询节点状态信息:ls -s
- client.getData().storingStatIn(status).forPath("/app1");
- int version = status.getVersion();//查询出来的 3
- System.out.println(version);
- client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
- }
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便
需要开发人员自己反复注册Watcher,比较繁琐。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
NodeCache : 只是监听某一个特定的节点
PathChildrenCache : 监控一个ZNode的子节点.
TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
- /**
- * 演示 NodeCache:给指定一个节点注册监听器
- */
- @Test
- public void testNodeCache() throws Exception {
- //1. 创建NodeCache对象
- final NodeCache nodeCache = new NodeCache(client,"/app1");
- //2. 注册监听
- nodeCache.getListenable().addListener(new NodeCacheListener() {
- @Override
- public void nodeChanged() throws Exception {
- System.out.println("节点变化了~");
- //获取修改节点后的数据
- byte[] data = nodeCache.getCurrentData().getData();
- System.out.println(new String(data));
- }
- });
- //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
- nodeCache.start(true);
- while (true){
- }
- }
-
- /**演示PathChildrenCache
- */
- @Test
- public void testPathChildrenCache() throws Exception {
- //1.创建监听对象
- PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
- //2. 绑定监听器
- pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- System.out.println("子节点变化了~");
- System.out.println(event);
- //监听子节点的数据变更,并且拿到变更后的数据
- //1.获取类型
- PathChildrenCacheEvent.Type type = event.getType();
- //2.判断类型是否是update
- if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
- System.out.println("数据变了!!!");
- byte[] data = event.getData().getData();
- System.out.println(new String(data));
- }
- }
- });
- //3. 开启
- pathChildrenCache.start();
- while (true){
- }
- }
-
-
- /**
- * 演示 TreeCache:监听某个节点自己和所有子节点们
- */
- @Test
- public void testTreeCache() throws Exception {
- //1. 创建监听器
- TreeCache treeCache = new TreeCache(client,"/app2");
- //2. 注册监听
- treeCache.getListenable().addListener(new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- System.out.println("节点变化了");
- System.out.println(event);
- }
- });
- //3. 开启
- treeCache.start();
- while (true){
- }
- }
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
客户端获取锁时,在lock节点下创建临时顺序节点。
然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点
是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。
分布式锁-模拟12306售票案例
Curator实现分布式锁API
在Curator中有五种锁方案:
创建线程进行加锁设置
- public class Ticket12306 implements Runnable{
- private int tickets = 10;//数据库的票数
- private InterProcessMutex lock ;
- @Override
- public void run() {
- while(true){
- //获取锁
- try {
- lock.acquire(3, TimeUnit.SECONDS);
- if(tickets > 0){
- System.out.println(Thread.currentThread()+":"+tickets);
- Thread.sleep(100);
- tickets--;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- //释放锁
- try {
- lock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
- }
- }
创建连接,并且初始化锁
- public Ticket12306(){
- //重试策略
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
- //2.第二种方式
- //CuratorFrameworkFactory.builder();
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString("192.168.149.135:2181")
- .sessionTimeoutMs(60 * 1000)
- .connectionTimeoutMs(15 * 1000)
- .retryPolicy(retryPolicy)
- .build();
- //开启连接
- client.start();
- lock = new InterProcessMutex(client,"/lock");
- }
运行多个线程进行测试
- public class LockTest {
- public static void main(String[] args) {
- Ticket12306 ticket12306 = new Ticket12306();
- //创建客户端
- Thread t1 = new Thread(ticket12306,"携程");
- Thread t2 = new Thread(ticket12306,"飞猪");
- t1.start();
- t2.start();
- }
- }
1)半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。
2)Zookeeper虽然在配置文件中并没有指定Master和Slave。但是,Zookeeper工作时,是有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的。
3)以一个简单的例子来说明整个选举的过程。
假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么,
(1)服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态。
(2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
(3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader。
(4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。
(5)服务器5启动,同4一样当小弟。
在 hadoop104 主机上注册监听/sanguo 节点数据变化
[zk: localhost:2181(CONNECTED) 26] get -w /sanguo
可能因为版本问题上面的命令会报错,可改用:
[zk: localhost:2181(CONNECTED) 26] get /sanguo [watch]
在 hadoop103 主机上修改/sanguo 节点的数据
[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
观察 hadoop104 主机收到数据变化的监听
- WATCHER::
- WatchedEvent state:SyncConnected type:NodeDataChanged
- path:/sanguo
注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
- [zk: localhost:2181(CONNECTED) 1] ls -w /sanguo
- [shuguo, weiguo]
在 hadoop103 主机/sanguo 节点上创建子节点
- [zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
- Created /sanguo/jin
观察 hadoop104 主机收到子节点变化的监听
- WATCHER::
- WatchedEvent state:SyncConnected type:NodeChildrenChanged
- path:/sanguo
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
集群中的角色有哪些?集群最少需要几台机器?
(1)部署方式单机模式、集群模式
(2)角色:Leader和Follower
(3)集群最少需要机器数:3
需求
现在集群创建server节点
create /servers "servers"
服务器端向Zookeeper注册代码
- public class DistributeServer {
-
- private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private int sessionTimeout = 2000;
- private ZooKeeper zkClient;
-
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
-
- DistributeServer server = new DistributeServer();
-
- //1 连接zookeeper集群
- server.getConnect();
-
- //2 注册节点
- server.regist("test2");
-
- //3 业务逻辑处理
- server.business();
- }
-
- //业务逻辑处理
- private void business() throws InterruptedException {
- Thread.sleep(Long.MAX_VALUE);
- }
-
- //注册节点
- private void regist(String hostName) throws InterruptedException, KeeperException {
- String path = zkClient.create
- ("/servers/server", hostName.getBytes(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- System.out.println(path);
- }
-
- //连接zookeeper集群
- private void getConnect() throws IOException {
-
- zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- List<String> children = null;
- try {
- System.out.println("********************************");
- children = zkClient.getChildren("/", true);
- for (int i = 0; i < children.size(); i++) {
- System.out.println(children.get(i));
- }
- System.out.println("********************************");
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
客户端代码
- public class DistributeClient {
- private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
- private int sessionTimeout = 2000;
- private ZooKeeper zkClient;
-
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
-
- DistributeClient server = new DistributeClient();
-
- //1 连接zookeeper集群
- server.getConnect();
-
- //2 注册节点
- server.getChildren();
-
- //3 业务逻辑处理
- server.business();
- }
-
- //注册监听
- private void getChildren() throws InterruptedException, KeeperException {
- List<String> children = zkClient.getChildren("/servers", true);
- //储存服务器节点主机名称集合
- ArrayList<String> hosts = new ArrayList<>();
-
- for (String child : children) {
- byte[] data = zkClient.getData("/servers/" + child, false, null);
- hosts.add(String.valueOf(data));
- }
-
- //将所有在线主机的名称打印出来
- System.out.println(hosts);
- }
-
- //业务逻辑处理
- private void business() throws InterruptedException {
- Thread.sleep(Long.MAX_VALUE);
- }
-
-
- //连接zookeeper集群
- private void getConnect() throws IOException {
-
- zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- try {
- getChildren();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
锁实现
- package com.herobin.flink.debug_local.zk;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
-
- public class DistributedLock {
-
- private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
-
- // 超时时间
- private int sessionTimeout = 2000;
- private ZooKeeper zk;
- private String rootNode = "locks";
- private String subNode = "seq-";
-
- // 当前 client 等待的子节点
- private String waitPath;
-
- //ZooKeeper 连接
- private CountDownLatch connectLatch = new CountDownLatch(1);
-
- //ZooKeeper 节点等待
- private CountDownLatch waitLatch = new CountDownLatch(1);
-
- // 当前 client 创建的子节点
- private String currentNode;
-
- // 和 zk 服务建立连接,并创建根节点
- public DistributedLock() throws IOException, InterruptedException, KeeperException {
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
- if (event.getState() ==
- Event.KeeperState.SyncConnected) {
- connectLatch.countDown();
- }
- // 发生了 waitPath 的删除事件
- if (event.getType() ==
- Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
- {
- waitLatch.countDown();
- } }
- });
-
- // 等待连接建立
- connectLatch.await();
- //获取根节点状态
- Stat stat = zk.exists("/" + rootNode, false);
- //如果根节点不存在,则创建根节点,根节点类型为永久节点
- if (stat == null) {
- System.out.println("根节点不存在");
- zk.create("/" + rootNode, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- // 加锁方法
- public void zkLock() {
- try {
- //在根节点下创建临时顺序节点,返回值为创建的节点路径
- currentNode = zk.create("/" + rootNode + "/" + subNode,
- null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL);
- // wait 一小会, 让结果更清晰一些
- Thread.sleep(10);
- // 注意, 没有必要监听"/locks"的子节点的变化情况
- List<String> childrenNodes = zk.getChildren("/" +
- rootNode, false);
- // 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
- if (childrenNodes.size() == 1) {
- return;
- } else {
- //对根节点下的所有临时顺序节点进行从小到大排序
- Collections.sort(childrenNodes);
- //当前节点名称
- String thisNode = currentNode.substring(("/" + rootNode + "/").length());
- //获取当前节点的位置
- int index = childrenNodes.indexOf(thisNode);
- if (index == -1) {
- System.out.println("数据异常");
- } else if (index == 0) {
- // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
- return;
- } else {
- // 获得排名比 currentNode 前 1 位的节点
- this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
- // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
- zk.getData(waitPath, true, new Stat());
- //进入等待锁状态
- waitLatch.await();
- return;
- }
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- // 解锁方法
- public void zkUnlock() {
- try {
- zk.delete(this.currentNode, -1);
- } catch (InterruptedException | KeeperException e) {
- e.printStackTrace();
- }
- }
- }
锁测试(创建两个线程)
- package com.herobin.flink.debug_local.zk;
-
- import org.apache.zookeeper.KeeperException;
- import java.io.IOException;
-
-
- public class DistributedLockTest {
-
- public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
-
- // 创建分布式锁 1
- final DistributedLock lock1 = new DistributedLock();
-
- // 创建分布式锁 2
- final DistributedLock lock2 = new DistributedLock();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- // 获取锁对象
- try {
- lock1.zkLock();
- System.out.println("线程 1 获取锁");
- Thread.sleep(5 * 1000);
- lock1.zkUnlock();
- System.out.println("线程 1 释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- } }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- // 获取锁对象
- try {
- lock2.zkLock();
- System.out.println("线程 2 获取锁");
- Thread.sleep(5 * 1000);
- lock2.zkUnlock();
- System.out.println("线程 2 释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- } }
- }).start();
- }
- }
观察运行
- 线程 1 获取锁
- 线程 1 释放锁
- 线程 2 获取锁
- 线程 2 释放锁
查看API Curator章节