ZooKeeper集群主要是帮我们做分布式协调的,今天我们用zk实现 分布式配置 。
本文的实现方式所采用的是 ZooKeeper 的原生API进行开发的,目的是更好的理解 ZooKeeper 的运作方式,当然可以用 Curator 这个轮子来简化开发。
为什么需要做分布式配置呢?
比如我们线上有很多微服务分布在很多服务器上,有一天其中一个微服务比如 user-service 的 ip地址需要变更,而 user-service 需要对很多其他程序提供服务,这个时候如果没有一个 统一协调 的东西,每个用到 user-service 的应用程序都要做相应的ip地址修改,这将是一件很麻烦的事情!
使用zk来做分布式的配置,能够解决这个问题。

下面演示一下如何code用zk做分布式配置。
- <dependency>
- <groupId>org.apache.zookeepergroupId>
- <artifactId>zookeeperartifactId>
- <version>3.7.1version>
- dependency>
- 复制代码
Jar 包版本和 ZooKeeper 的版本一致:

既然做分布式配置,我们先构造一个配置出来,这个配置就是用来同步 user-service 的ip地址用的:
- public class MyConfig {
-
- private String userServiceIP;
-
- public String getUserServiceIP() {
- return userServiceIP;
- }
-
- public void setUserServiceIP(String userServiceIP) {
- this.userServiceIP = userServiceIP;
- }
- }
- 复制代码
ZooKeeper 类:
- @Public
- public class ZooKeeper implements AutoCloseable
- 复制代码
ZooKeeper 类是 ZooKeeper客户端库的主要类。要使用ZooKeeper服务,应用程序必须首先实例化ZooKeeper类的对象。所有的操作都将通过调用ZooKeeper类的方法来完成。这个类的方法是线程安全的,除非另有说明。
常用的构造方法:
- ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- 复制代码
要创建一个ZooKeeper客户端对象,应用程序需要传递一个连接字符串,该字符串包含一个由逗号分隔的主机:端口对列表,每个端口对对应于一个ZooKeeper服务器。
- String servers = "192.168.242.11:2181,192.168.242.12:2181,192.168.242.13:2181,192.168.242.14:2181/conf";
-
- ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher{...});
- 复制代码
这里提一下,192.168.242.14:2181/conf 这个后面带了个 /conf ,表示客户端如果成功建立了到zk集群的连接,那么默认该客户端工作的根 path 就是 /conf ,如果不带 /conf ,默认根path是 / 。
一旦建立了到服务器的连接,会话ID就被分配给客户端。客户端将定期向服务器发送心跳,以保持会话有效。
只要客户机的会话ID保持有效,应用程序就可以通过客户机调用ZooKeeper api。
如果由于某种原因,客户端在很长一段时间内(例如超过sessionTimeout值)没有向服务器发送心跳,服务器将使会话过期,并且会话ID将失效。客户端对象将不再可用。此时要进行ZooKeeper API调用,应用程序必须创建一个新的客户端对象。
如果客户端当前连接的ZooKeeper服务器失败或没有响应,客户端将在其会话ID到期前自动尝试连接到另一个服务器。如果成功,应用程序可以继续使用客户机。
ZooKeeper API方法可以是同步的,也可以是异步的。同步方法会阻塞,直到服务器响应为止。异步方法只是将请求放入队列以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行或出现错误时执行,并带有一个指示错误的适当返回代码(rc)。
常用的API
- //返回给定路径的节点的stat。如果不存在这样的节点,则返回null。
- //如果监视为真,并且调用成功(没有抛出异常),则在节点上保留一个具有给定路径的监视。
- //当创建/删除节点或设置节点上的数据的操作成功时,将触发监视。
- public Stat exists(String path,
- boolean watch)
- throws KeeperException,
- InterruptedException
- 复制代码
- //exists的异步实现版本
- public void exists(String path,
- Watcher watcher,
- AsyncCallback.StatCallback cb,
- Object ctx)
- 复制代码
- //也是exists的异步实现版本
- public void exists(String path,
- boolean watch,
- AsyncCallback.StatCallback cb,
- Object ctx)
- 复制代码
一般情况下,我们会使用异步版本,需要自定义一个回调函数,如果传参 Watcher watcher ,需要实现Watcher接口 。
- //返回给定路径的节点的数据和stat。
- //如果监视是非空的,并且调用成功(没有抛出异常),则节点上将保留一个带有给定路径的监视。
- //在节点上设置数据或删除节点的操作成功后,将触发监视。
- public byte[] getData(String path,
- Watcher watcher,
- Stat stat)
- throws KeeperException,
- InterruptedException
-
- //返回给定路径的节点的数据和stat。
- //如果监视为真,并且调用成功(没有抛出异常),则在节点上保留一个具有给定路径的监视。
- //在节点上设置数据或删除节点的操作成功后,将触发监视。
- public byte[] getData(String path,
- boolean watch,
- Stat stat)
- throws KeeperException,
- InterruptedException
- 复制代码
- //getData的异步实现版本。
- public void getData(String path,
- Watcher watcher,
- AsyncCallback.DataCallback cb,
- Object ctx)
-
- public void getData(String path,
- boolean watch,
- AsyncCallback.DataCallback cb,
- Object ctx)
- 复制代码
基于以上API介绍,我们来实现一个简单的分布式配置。
创建zk对象的时候,需要一个watcher,后面进行node操作也需要watcher,但是这两类watcher的功能不一样,创建zk对象需要的watcher我定义成 DefaultWatcher :
- public class DefaultWatcher implements Watcher {
-
- private CountDownLatch latch;
-
- public DefaultWatcher() {
- }
-
- public DefaultWatcher(CountDownLatch latch) {
- this.latch = latch;
- }
-
- @Override
- public void process(WatchedEvent watchedEvent) {
- System.out.println("DefaultWatcher watchedEvent:" + watchedEvent);
- Event.KeeperState state = watchedEvent.getState();
- switch (state) {
- case Disconnected:
- break;
- case SyncConnected:
- System.out.println("SyncConnected.");
- //连接成功后,执行countDown,此时便可以拿zk对象使用了
- latch.countDown();
- break;
- case AuthFailed:
- break;
- case ConnectedReadOnly:
- break;
- case SaslAuthenticated:
- break;
- case Expired:
- break;
- case Closed:
- break;
- }
- }
- }
- 复制代码
由于是异步调用,如果直接new完ZooKeeper对象后,不进行阻塞,很有可能还没连接上就执行后续的操作了,因此,此处我用 CountDownLatch 来做一个阻塞,当监测到连接成功了,就countDown,放行,执行后续zk操作。
zk连接成功后,可通过 exists 方法判断有没有 /conf/userServiceConf 这个node,如果有就 getData 。
从前面的介绍,已经知道了 exists 和 getData 都需要一个 callback ,而且需要一个 watcher ,因此我可以构造一个 WatchAndCallback 类出来,该类实现 Watcher 以及节点状态的 StatCallback 和数据的 DataCallback :
- public class WatchAndCallback implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
-
- private ZooKeeper zk;
- private MyConfig config;
- CountDownLatch latch = new CountDownLatch(1);
-
- public void await() {
- zk.exists("/userServiceConf", this, this, "exists watch");
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
-
- /**
- * DataCallback
- * 此回调用于检索节点的数据和stat
- * @param rc 调用返回的code或结果
- * @param path 传递给异步调用的路径
- * @param ctx 传递给异步调用的上下文对象
- * @param data 节点的数据
- * @param stat 指定节点的Stat对象
- * @author 行百里者
- * @create 2020/9/16 10:42
- **/
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- if (data != null) {
- String res = new String(data);
- config.setUserServiceIP(res);
- latch.countDown();
- }
- }
-
- /**
- * StatCallback
- * 此回调用于检索节点的stat
- * @param rc 调用返回的code或结果
- * @param path 传递给异步调用的路径
- * @param ctx 传递给异步调用的上下文对象
- * @param stat 指定路径上节点的Stat对象
- * @author 行百里者
- * @create 2020/9/16 10:39
- **/
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (stat != null) {
- zk.getData("/userServiceConf", this, this, "Msg:Stat is not null");
- }
- }
-
- /**
- * Watcher接口的实现。
- * Watcher接口指定事件处理程序类必须实现的公共接口。
- * ZooKeeper客户机将从它连接到的ZooKeeper服务器获取各种事件。
- * 使用这种客户机的应用程序通过向客户机注册回调对象来处理这些事件。
- * 回调对象应该是实现监视器接口的类的实例。
- * @param watchedEvent WatchedEvent表示监视者能够响应的ZooKeeper上的更改。WatchedEvent包含发生了什么,ZooKeeper的当前状态,以及事件中涉及的znode的路径。
- * @author 行百里者
- * @create 2020/9/16 10:45
- **/
- @Override
- public void process(WatchedEvent watchedEvent) {
- Event.EventType type = watchedEvent.getType();
- switch (type) {
- case None:
- break;
- case NodeCreated:
- //当一个node被创建后,获取node
- //getData中又会触发StatCallback的回调processResult
- zk.getData("/userServiceConf", this, this, "Msg:NodeCreated");
- break;
- case NodeDeleted:
- //节点被删除
- config.setUserServiceIP("");
- //重新开启CountDownLatch
- latch = new CountDownLatch(1);
- break;
- case NodeDataChanged:
- //节点数据被改变了
- //会触发DataCallback的回调
- zk.getData("/userServiceConf", this, this, "Msg:NodeDataChanged");
- break;
- case NodeChildrenChanged:
- break;
- case DataWatchRemoved:
- break;
- case ChildWatchRemoved:
- break;
- case PersistentWatchRemoved:
- break;
- }
- }
-
- public ZooKeeper getZk() {
- return zk;
- }
-
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
-
- public MyConfig getConfig() {
- return config;
- }
-
- public void setConfig(MyConfig config) {
- this.config = config;
- }
- }
- 复制代码
前面都准备好了,可以编写测试了:
- public class ZooKeeperConfigTest {
- static String servers = "192.168.242.11:2181,192.168.242.12:2181,192.168.242.13:2181,192.168.242.14:2181/conf";
-
- static ZooKeeper zk;
-
- static CountDownLatch latch = new CountDownLatch(1);
-
- @SneakyThrows
- @BeforeAll
- public static void getZK() {
- //System.out.println("servers:" + servers);
- //因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作
- zk = new ZooKeeper(servers, 3000, new DefaultWatcher(latch));
- latch.await();
- }
-
- @SneakyThrows
- @AfterAll
- public static void close() {
- zk.close();
- }
-
- @Test
- public void zkConfigTest() {
- WatchAndCallback watchAndCallback = new WatchAndCallback();
- MyConfig config = new MyConfig();
- watchAndCallback.setConfig(config);
- watchAndCallback.setZk(zk);
-
- //阻塞等待
- watchAndCallback.await();
-
- //方便观测,写个死循环
- for(;;) {
- if (config.getUserServiceIP().equals("")) {
- System.out.println("node可能被删除了");
- } else {
- System.out.println("userServiceIP:" + config.getUserServiceIP());
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- }
- 复制代码
集群初始状态
- [zk: localhost:2181(CONNECTED) 0] ls /
- [laogong, zookeeper]
- 复制代码
可以看到,这时还没有 /conf/userServiceConf 这个节点,此时启动测试程序连接上zk集群。
我在 zknode04 这个zkserver上开一个zkCli,手动创建 /conf/userServiceConf 并赋值 8.8.8.8 ,同时观察程序运行情况:

可以看到,创建完成之后,程序马上给出相应,打印出了我配置的值 userServiceIP:8.8.8.8 !
此时,再设置 /conf/userServiceConf 的值为 2.2.2.2 :

客户端很快得到了新值。
此时删除 /conf/userServiceConf 这个节点,会发生什么?前面程序已经写了watch,当检测到node被删除,watch and callback执行:
- case NodeDeleted:
- //节点被删除
- config.setUserServiceIP("");
- //重新开启CountDownLatch
- latch = new CountDownLatch(1);
- break;
- 复制代码
在测试类:
- if (config.getUserServiceIP().equals("")) {
- System.out.println("node可能被删除了");
- //此时应该阻塞住,等待着node重新创建
- watchAndCallback.await();
- }
- 复制代码
实操删除node:

执行了 System.out.println("node可能被删除了"); 并且程序阻塞在此,此时重新创建/conf/userServiceConf 节点:

程序正常运行,并且成功获取到了zk配置的最新数据!
OK,至此已经体验了zk能够实现分布式配置中心这个功能了。
案例中用的是
getData,实际场景我们可能更多地用getChildren,获取一系列服务的ip。
zookeeper的高可用、快速、一致性等等非常优秀的特点能完美的实现分布式配置中心,也能用它来实现微服务的注册发现。
本文代码地址:
如果对你有帮助,请帮忙star,感谢!
来源:https://juejin.cn/post/7130546506796859400