1.Zookeeper Java客户端
2.Apache Curator 开源客户端
3.Zookeeper集群&不停机动态扩容/缩容
4.Zookeeper经典应用场景
Zookeeper的java客户端:
引入maven依赖:
zookeeper 官方的客户端没有和服务端代码分离,他们为同一个jar 文件,所以我们直接引入zookeeper的maven即可, 这里版本请保持与服务端版本一致,不然会有很多兼容性的问题
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
代码示例:
@Slf4j
public class ZookeeperClientDemo {
public static void main(String[] args) throws Exception {
String nodeName = "/mytest-node";
//创建客户端
ZooKeeper zooKeeper = new ZooKeeper("192.168.244.133:2181", 50000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
log.info("已经建立连接watchedEvent={}", JSON.toJSONString(watchedEvent));
}
});
//客户端创建节点
zooKeeper.create(nodeName, "mydata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//创建监听对象
Watcher watcher = new Watcher() {
@SneakyThrows
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDataChanged && Objects.equals(watchedEvent.getPath(), nodeName)) {
//因为每次监听的注册是一次性的,所以需要递归注册监听
byte[] data = zooKeeper.getData(nodeName, this, null);
log.info("获取的数据结果为{}", new String(data));
} else {
byte[] data = zooKeeper.getData(nodeName, this, null);
log.info("节点其他类型的变化{}", new String(data));
}
}
};
//获取节点数据
byte[] data = zooKeeper.getData(nodeName, watcher, null);
log.info("获取的数据结果为{}", new String(data));
Thread.sleep(Integer.MAX_VALUE);
}
}
创建Zookeeper实例的方法:
(1)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
(2)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig)
(3)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider)
(4)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider, ZKClientConfig)
(5)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
(6)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig)
(7)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[]) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider)
(8)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider, ZKClientConfig) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean)
构造函数中参数含义:
connectString:
ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成,
每一个都代表一台ZooKeeper机器,如,host1:port1,host2:port2,host3:port3。另外,也可以在connectString中设置客户端连接上ZooKeeper
后的根目录,方法是在host:port字符串之后添加上这个根目录,例如,host1:port1,host2:port2,host3:port3/zk-base,这样就指定了该客户端连接上ZooKeeper服务器之后,所有对ZooKeeper
的操作,都会基于这个根目录。例如,客户端对/sub-node 的操作,最终创建 /zk-node/sub-node, 这个目录也叫Chroot,即客户端隔离命名空间。
sessionTimeout:
会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有
会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检
测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效
的心跳检测,会话就会失效。
Watcher:
ZooKeeper允许
客户端在构造方法中传入一个接口 watcher (org.apache. zookeeper.
Watcher)的实现类对象来作为默认的 Watcher事件通知处理器。当然,该参
数可以设置为null 以表明不需要设置默认的 Watcher处理器。
canBeReadOnly
这是一个boolean类型的参数,用于标识当前会话是否支持“read-only(只
读)”模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半及
以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请
求)。但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们
还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)——
这就是 ZooKeeper的“read-only”模式。
sessionId和 sessionPasswd:
分别代表会话ID和会话秘钥。这两个参数能够唯一确定一个会话,同时客户
端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。具体
使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实
例的以下两个接口,即可获得当前会话的ID和秘钥:
long getSessionId();
byte[]getSessionPasswd( );
荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传
入构造方法了
Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性
这里我们以 Maven 工程为例,首先要引入Curator 框架相关的开发包, 由于Zookeeper本身以来了 log4j 日志框架,所以这里可以创建对应的log4j配置文件后直接使用。 如下面的代码所示,我们通过将 Curator 相关的引用包配置到 Maven 工程的 pom 文件中,将 Curaotr 框架引用到工程项目里,在配置文件中分别引用了两个 Curator 相关的包,第一个是 curator-framework 包,该包是对 ZooKeeper 底层 API 的一些封装。另一个是 curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier。
引入的依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
简单的使用代码:
@Slf4j
public class CuratorDemo {
public static void main(String[] args) throws Exception {
//创建重试策略对象
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
//创建客户端对象,会话
CuratorFramework framework = CuratorFrameworkFactory.builder()
.connectString("192.168.244.133:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retry).build();
//启动客户端
framework.start();
String nodePath = "/curator-node";
//创建系节点和设置节点数据,CreateMode.EPHEMERAL临时节点
String path = framework.create().withMode(CreateMode.PERSISTENT).forPath(nodePath, "mydata".getBytes());
//获取节点数据
byte[] bytes = framework.getData().forPath(nodePath);
log.info("获得的结果为{}", new String(bytes));
//设置节点数据
framework.setData().forPath(nodePath, "dataChange".getBytes());
//删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点
/***
* guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
*/
framework.delete().guaranteed().deletingChildrenIfNeeded().forPath(nodePath);
}
}
这段代码的编码风格采用了流式方式,最核心的类是 CuratorFramework 类,该类的作用是定义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。在定义 CuratorFramework 对象实例的时候,我们使用了 CuratorFrameworkFactory 工厂方法,并指定了 connectionString 服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、connectionTimeoutMs 会话创建超时时间。下面我们分别对这几个参数进行讲解:
connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
异步接受服务端范围的数据:
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
代码如下:
public static void asyncBackGroundProcess() throws Exception {
String path="/asyn-node";
framework.create().forPath(path,"data...".getBytes());
framework.getData().inBackground((client,event)->{
log.info(Thread.currentThread().getName()+"获取的数据为event={}", JSON.toJSONString(event));
}).forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
指定线程池
public static void asyncBackGroundProcess1() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
String path="/asyn-node1";
framework.create().forPath(path,"data...".getBytes());
framework.getData().inBackground((client,event)->{
log.info(Thread.currentThread().getName()+"获取的数据为event={}", JSON.toJSONString(event));
},executorService).forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
添加节点监听事件
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能,就是一直监听这个节点。Cache 分为两类注册类型:节点监听和子节点监听。
1、NodeCache
监听节点本身数据变化(当前新建、修改、删除触发),对子节点的修改不监听;
参数:
代码案例:
private static void pathChildrenCacheProcess() throws Exception {
String path1 = "/asyn-node1";
framework.setData().forPath("/asyn-node1/sub-node1", "testData".getBytes());
PathChildrenCache pathChildrenCache = new PathChildrenCache(framework, path1, true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCache) throws Exception {
byte[] data = pathChildrenCache.getData().getData();
log.info(Thread.currentThread().getName() + "当前节点{}的数据为:{},事件类型为:{}", pathChildrenCache.getData().getPath(), Objects.isNull(data) ? "" :
new String(data), pathChildrenCache.getType());
log.info("state={}", JSON.toJSONString(pathChildrenCache.getData().getStat()));
}
});
pathChildrenCache.start();
}
2、PathChildrenCache
监听子节点变化,包括当前节点子节点的数据的修改、子节点的新增和删除;
代码如下
private static void pathChildrenCacheProcess() throws Exception {
String path1 = "/asyn-node1";
framework.setData().forPath("/asyn-node1/sub-node1", "testData".getBytes());
PathChildrenCache pathChildrenCache = new PathChildrenCache(framework, path1, true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCache) throws Exception {
byte[] data = pathChildrenCache.getData().getData();
log.info(Thread.currentThread().getName() + "当前节点{}的数据为:{},事件类型为:{}", pathChildrenCache.getData().getPath(), Objects.isNull(data) ? "" :
new String(data), pathChildrenCache.getType());
log.info("state={}", JSON.toJSONString(pathChildrenCache.getData().getStat()));
}
});
pathChildrenCache.start();
}
3、TreeCache
TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件;
监听的节点和递归子节点发生任何的变化:
代码如下:
private static void treeCacheProcess() throws Exception {
String path1 = "/asyn-node1";
TreeCache treeCache = new TreeCache(framework, path1);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData childData = treeCacheEvent.getData();
byte[] data = null;
if (Objects.nonNull(childData)) {
data = childData.getData();
}
ChildData oldeChildData = treeCacheEvent.getOldData();
byte[] data1 = null;
if (Objects.nonNull(oldeChildData)) {
data1 = oldeChildData.getData();
}
log.info(Thread.currentThread().getName() + "监听到的path为:{} " +
"监听到的由老数据{}变为新数据{} " +
"监听到的state={} " +
"监听到的type={}",
Objects.nonNull(childData) ? childData.getPath() : "",
Objects.isNull(data1) ? "" : new String(data1),
Objects.isNull(data) ? "" : new String(data), Objects.nonNull(childData) ? childData.getStat() : ""
, treeCacheEvent.getType());
}
});
treeCache.start();
}
Zookeeper 集群模式一共有三种类型的角色
Leader: 处理所有的事务请求(写请求),可以处理读请求,集群中只能有一个Leader
Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点要参与到新的Leader选举中,有可能成为新的Leader节点。
Observer:只能处理读请求。不能参与选举
客户端连接集群中的任意一台机器,如果连接的是一台follower,有写请求时,会转发给leader。
集群的搭建:按上图的搭建,(伪集群)
1、具有jdk1.8环境
2、解压包
tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz
3、创建文件夹来放集群的数据文件
Mkdir zkCluster mkdir ./zkCluster/data
分别在data文件夹下创建4个文件:zookeeper1 zookeeper2 zookeeper3 zookeeper4,用来最为配置文件中dataDir配置项的目录;
4、分别非四台机器创建配置文件
cp ./apache-zookeeper-3.5.9-bin/conf/zoo_sample.cfg ./zkCluster/zoo1.cfg
Vim ./zkCluster/zoo1.cfg
修改里面的配置项
dataDir=/usr/local/zookeeper/zkCluster/data/zookeeper1
clientPort=2181 #这个可以不变
在最后添加以下配置, participant 可以不用写,默认就是participant
server.1=192.168.244.133:2001:3001:participant
server.2=192.168.244.133:2002:3002:participant
server.3=192.168.244.133:2003:3003:participant
server.4=192.168.244.133:2004:3004:observer
第一份配置文件完毕
创建和配置第二份配置文件
cp ./zkCluster/zoo1.cfg ./zkCluster/zoo2.cfg
Vim ./zkCluster/zoo2.cfg
clientPort=2182 #因为在同一台机器上,所以端口号递增
server.1=192.168.244.133:2001:3001:participant
server.2=192.168.244.133:2002:3002:participant
server.3=192.168.244.133:2003:3003:participant
server.4=192.168.244.133:2004:3004:observer
第二份配置文件保存完毕,以此类推,一直zoo4.cfg创建和配置完成
5、配置server id(就是给每台机器设置一个唯一id)
就是在上面创建的四个文件中zookeeper1 zookeeper2 zookeeper3 zookeeper4分别添加myid文件,四个文件分别写入标识1 2 3 4
vim ./zkCluster/data/zookeeper1/myid
写入 1
vim ./zkCluster/data/zookeeper2/myid
写入 2
vim ./zkCluster/data/zookeeper3/myid
写入 3
vim ./zkCluster/data/zookeeper4/myid
写入 4
6、启动所有服务器
./apache-zookeeper-3.5.9-bin/bin/zkServer.sh start ./zkCluster/zoo1.cfg
./apache-zookeeper-3.5.9-bin/bin/zkServer.sh start ./zkCluster/zoo2.cfg
./apache-zookeeper-3.5.9-bin/bin/zkServer.sh start ./zkCluster/zoo3.cfg
./apache-zookeeper-3.5.9-bin/bin/zkServer.sh start ./zkCluster/zoo4.cfg
7、分别检查每个服务的状态
如:
./apache-zookeeper-3.5.9-bin/bin/zkServer.sh status ./zkCluster/zoo3.cfg
8、客户端连接集群
./apache-zookeeper-3.5.9-bin/bin/zkCli.sh -server 192.168.244.133:2181,192.168.244.133:2182,192.168.244.133:2183,192.168.244.133:2184
zoo.cfg配置文件配置项的介绍:
tickTime:用于配置Zookeeper中最小时间单位的长度,很多运行时的时间间隔都是使用tickTime的倍数来表示的。
initLimit:该参数用于配置Leader服务器等待Follower启动,并完成数据同步的时间。Follower服务器再启动过程中,会与Leader建立连接并完成数据的同步,从而确定自己对外提供服务的起始状态。Leader服务器允许Follower再initLimit 时间内完成这个工作。
syncLimit:Leader 与Follower心跳检测的最大延时时间
dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
server.A=B:C:D:E 其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。如果需要通过添加不参与集群选举以及事务请求的过半机制的 Observer节点,可以在E的位置,添加observer标识。
curator连接集群的方式和单机版是一样的,如下:
framework = CuratorFrameworkFactory.builder()
.connectString("192.168.244.133:2181,192.168.244.133:2182,192.168.244.133:2183,192.168.244.133:2184")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(50000)
.retryPolicy(retry).build();
Zookeeper 3.5.0 新特性: 集群动态配置
可以在不停机的情况下,不停机动态扩容/缩容;
Zookeeper 3.5.0 以前,Zookeeper集群角色要发生改变的话,只能通过停掉所有的Zookeeper服务,修改集群配置,重启服务来完成,这样集群服务将有一段不可用的状态,为了应对高可用需求,Zookeeper 3.5.0 提供了支持动态扩容/缩容的 新特性。但是通过客户端API可以变更服务端集群状态是件很危险的事情,所以在zookeeper 3.5.3 版本要用动态配置,需要开启超级管理员身份验证模式 ACLs。如果是在一个安全的环境也可以通过配置 系统参数 -Dzookeeper.skipACL=yes 来避免配置维护acl 权限配置。
这里不做集群扩容的搭建了演示了。