• Zookeeper的api使用


            Zookeeper作为⼀个分布式框架,主要用来解决分布式⼀致性问题,它提供了简单的分布式原语,并且对多种编程语⾔提供了API,所以接下来重点来看下Zookeeperjava客户端API使用方式

    Zookeeper API共包含五个包,分别为

    1. org.apache.zookeeper
    2. org.apache.zookeeper.data
    3. org.apache.zookeeper.server
    4. org.apache.zookeeper.server.quorum
    5. org.apache.zookeeper.server.upgrade

            其中org.apache.zookeeper,包含Zookeeper类,他是我们编程时最常⽤的类文件。这个类是Zookeeper客户端的主要类文件。如果要使用Zookeeper服务,应⽤程序⾸先必须创建⼀个Zookeeper 实例,这时就需要使用此类。⼀旦客户端和Zookeeper服务端建立起了连接,Zookeeper系统将会给本次连接会话分配⼀个ID值,并且客户端将会周期性的向服务器端发送心跳来维持会话连接。只要连接有效,客户端就可以使⽤Zookeeper API来做相应处理了

    准备工作:导入依赖

    1. <dependency>
    2. <groupId>org.apache.zookeepergroupId>
    3. <artifactId>zookeeperartifactId>
    4. <version>3.4.14version>
    5. dependency>

     

    建立会话

    1. package com.lagou.api;
    2. import org.apache.zookeeper.WatchedEvent;
    3. import org.apache.zookeeper.Watcher;
    4. import org.apache.zookeeper.ZooKeeper;
    5. import java.io.IOException;
    6. import java.util.concurrent.CountDownLatch;
    7. public class CreateSession implements Watcher {
    8. private static CountDownLatch countDownLatch = new CountDownLatch(1);
    9. /**
    10. * 建立会话
    11. */
    12. public static void main(String[] args) throws IOException, InterruptedException {
    13. /*
    14. 客户端可以通过创建⼀个zk实例来连接zk服务器
    15. new Zookeeper(connectString,sesssionTimeOut,Wather)
    16. connectString: 连接地址:IP:端⼝
    17. sesssionTimeOut:会话超时时间:单位毫秒
    18. Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
    19. */
    20. ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
    21. System.out.println(zooKeeper.getState());
    22. // 计数工具类 CountDownLatch : 不让main方法结束,让线程处于等待阻塞
    23. countDownLatch.await();
    24. System.out.println("客户端与服务端会话真正建立了");
    25. }
    26. /**
    27. * 回调方法:处理来自服务器端的watcher通知
    28. */
    29. // 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上的等待阻塞,⾄此,会话创建完毕
    30. public void process(WatchedEvent watchedEvent) {
    31. //当连接创建了,服务端发送给客户端SyncConnected事件
    32. if(watchedEvent.getState() == Event.KeeperState.SyncConnected) {
    33. // 解除主程序在CountDownLatch上的等待阻塞
    34. countDownLatch.countDown();
    35. }
    36. }
    37. }

            注意,ZooKeeper 客户端和服务端会话的建立是⼀个异步的过程,也就是说在程序中,构造⽅法会在处理完客户端初始化工作后立即返回,在⼤多数情况下,此时并没有真正建立好⼀个可用的会话,在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后ZooKeeper服务端会向会话对应的客户端发送⼀个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。

    创建节点

    1. package com.lagou.api;
    2. import org.apache.zookeeper.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.CountDownLatch;
    5. public class CreateNote implements Watcher {
    6. private static ZooKeeper zooKeeper;
    7. /**
    8. * 建立会话
    9. */
    10. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    11. /*
    12. 客户端可以通过创建⼀个zk实例来连接zk服务器
    13. new Zookeeper(connectString,sesssionTimeOut,Wather)
    14. connectString: 连接地址:IP:端⼝
    15. sesssionTimeOut:会话超时时间:单位毫秒
    16. Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
    17. */
    18. zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
    19. System.out.println(zooKeeper.getState());
    20. Thread.sleep(Integer.MAX_VALUE);
    21. }
    22. // 创建节点的方法
    23. private static void createNoteSync() throws InterruptedException, KeeperException {
    24. /**
    25. * path :节点创建的路径
    26. * data[] :节点创建要保存的数据,是个byte类型的
    27. * acl :节点创建的权限信息(4种类型)
    28. * ANYONE_ID_UNSAFE : 表示任何⼈
    29. * AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。
    30. * OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)--> world:anyone
    31. * CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
    32. * createMode :创建节点的类型(4种类型)
    33. * PERSISTENT:持久节点
    34. * PERSISTENT_SEQUENTIAL:持久顺序节点
    35. * EPHEMERAL:临时节点
    36. * EPHEMERAL_SEQUENTIAL:临时顺序节点
    37. String node = zookeeper.create(path,data,acl,createMode);
    38. */
    39. // 持久节点
    40. String note_persistent = zooKeeper.create("/lg-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    41. // 临时节点
    42. String note_ephemeral = zooKeeper.create("/lg-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    43. // 持久顺序节点
    44. String note_sequential = zooKeeper.create("/lg-sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    45. System.out.println("创建的持久节点: " + note_persistent);
    46. System.out.println("创建的临时节点: " + note_ephemeral);
    47. System.out.println("创建的持久顺序节点: " + note_sequential);
    48. }
    49. /**
    50. * 回调方法:处理来自服务器端的watcher通知
    51. */
    52. public void process(WatchedEvent watchedEvent) {
    53. // SyncConnected
    54. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
    55. // 创建节点
    56. try {
    57. createNoteSync();
    58. } catch (Exception e) {
    59. throw new RuntimeException(e);
    60. }
    61. }
    62. }
    63. }

    获取节点数据

    1. package com.lagou.api;
    2. import org.apache.zookeeper.*;
    3. import java.io.IOException;
    4. import java.util.List;
    5. public class GetNoteData implements Watcher {
    6. private static ZooKeeper zooKeeper;
    7. /**
    8. * 建立会话
    9. */
    10. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    11. /*
    12. 客户端可以通过创建⼀个zk实例来连接zk服务器
    13. new Zookeeper(connectString,sesssionTimeOut,Wather)
    14. connectString: 连接地址:IP:端⼝
    15. sesssionTimeOut:会话超时时间:单位毫秒
    16. Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
    17. */
    18. zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
    19. System.out.println(zooKeeper.getState());
    20. Thread.sleep(Integer.MAX_VALUE);
    21. }
    22. /**
    23. * 回调方法:处理来自服务器端的watcher通知
    24. */
    25. public void process(WatchedEvent watchedEvent) {
    26. /*
    27. 子节点列表发生改变时,服务端会发送noteChildrenChanged事件通知
    28. 要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
    29. */
    30. if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
    31. List children = null;
    32. try {
    33. children = zooKeeper.getChildren("/lg-persistent", true);
    34. } catch (KeeperException | InterruptedException e) {
    35. throw new RuntimeException(e);
    36. }
    37. System.out.println(children);
    38. }
    39. // SyncConnected
    40. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
    41. // 获取节点数据的方法
    42. try {
    43. getNoteData();
    44. // 获取节点的子节点列表方法
    45. getChildren();
    46. } catch (Exception e) {
    47. throw new RuntimeException(e);
    48. }
    49. }
    50. }
    51. /*
    52. 获取某个节点的内容
    53. */
    54. private void getNoteData() throws Exception {
    55. /**
    56. * path : 获取数据的路径
    57. * watch : 是否开启监听
    58. * stat : 节点状态信息
    59. * null: 表示获取最新版本的数据
    60. * zk.getData(path, watch, stat);
    61. */
    62. byte[] data = zooKeeper.getData("/lg-persistent", false, null);
    63. System.out.println(new String(data));
    64. }
    65. /*
    66. 获取某个节点的子节点列表方法
    67. */
    68. public static void getChildren() throws InterruptedException, KeeperException {
    69. /*
    70. path:路径
    71. watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
    72. zooKeeper.getChildren(path, watch);
    73. */
    74. List children = zooKeeper.getChildren("/lg-persistent", true);
    75. System.out.println(children);
    76. }
    77. }

    修改节点数据

    1. package com.lagou.api;
    2. import org.apache.zookeeper.*;
    3. import org.apache.zookeeper.data.Stat;
    4. import java.io.IOException;
    5. public class UpdateNoteData implements Watcher {
    6. private static ZooKeeper zooKeeper;
    7. /**
    8. * 建立会话
    9. */
    10. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    11. /*
    12. 客户端可以通过创建⼀个zk实例来连接zk服务器
    13. new Zookeeper(connectString,sesssionTimeOut,Wather)
    14. connectString: 连接地址:IP:端⼝
    15. sesssionTimeOut:会话超时时间:单位毫秒
    16. Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
    17. */
    18. zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new UpdateNoteData());
    19. System.out.println(zooKeeper.getState());
    20. Thread.sleep(Integer.MAX_VALUE);
    21. }
    22. /**
    23. * 回调方法:处理来自服务器端的watcher通知
    24. */
    25. public void process(WatchedEvent watchedEvent) {
    26. // SyncConnected
    27. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
    28. // 更新数据节点内容的方法
    29. try {
    30. updateNoteSync();
    31. } catch (InterruptedException | KeeperException e) {
    32. throw new RuntimeException(e);
    33. }
    34. }
    35. }
    36. /*
    37. 更新数据节点内容的方法
    38. */
    39. private void updateNoteSync() throws InterruptedException, KeeperException {
    40. /*
    41. path:路径
    42. data:要修改的内容 byte[]
    43. version:为-1,表示对最新版本的数据进⾏修改
    44. zooKeeper.setData(path, data,version);
    45. */
    46. byte[] data = zooKeeper.getData("/lg-persistent", false, null);
    47. System.out.println("修改前的值:" + new String(data));
    48. // 修改 /lg-persistent 的数据 stat: 状态信息对象
    49. Stat stat = zooKeeper.setData("/lg-persistent", "客户端修改了节点数据".getBytes(), -1);
    50. byte[] data2 = zooKeeper.getData("/lg-persistent", false, null);
    51. System.out.println("修改后的值:" + new String(data2));
    52. }
    53. }

    删除节点

    1. package com.lagou.api;
    2. import org.apache.zookeeper.*;
    3. import org.apache.zookeeper.data.Stat;
    4. import java.io.IOException;
    5. public class DeleteNote implements Watcher {
    6. private static ZooKeeper zooKeeper;
    7. /**
    8. * 建立会话
    9. */
    10. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    11. /*
    12. 客户端可以通过创建⼀个zk实例来连接zk服务器
    13. new Zookeeper(connectString,sesssionTimeOut,Wather)
    14. connectString: 连接地址:IP:端⼝
    15. sesssionTimeOut:会话超时时间:单位毫秒
    16. Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
    17. */
    18. zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
    19. System.out.println(zooKeeper.getState());
    20. Thread.sleep(Integer.MAX_VALUE);
    21. }
    22. /**
    23. * 回调方法:处理来自服务器端的watcher通知
    24. */
    25. public void process(WatchedEvent watchedEvent) {
    26. // SyncConnected
    27. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
    28. // 删除节点
    29. try {
    30. deleteNoteSync();
    31. } catch (InterruptedException | KeeperException e) {
    32. throw new RuntimeException(e);
    33. }
    34. }
    35. }
    36. /*
    37. 删除节点方法
    38. */
    39. private void deleteNoteSync() throws InterruptedException, KeeperException {
    40. /*
    41. zooKeeper.exists(path,watch) :判断节点是否存在
    42. zookeeper.delete(path,version) : 删除节点
    43. */
    44. Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
    45. System.out.println(stat == null ? "该节点不存在" : "该节点存在");
    46. if (stat != null) {
    47. zooKeeper.delete("/lg-persistent/c1", -1);
    48. }
    49. Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
    50. System.out.println(stat2 == null ? "该节点不存在" : "该节点存在");
    51. }
    52. }

  • 相关阅读:
    过滤器和监听器
    Linux常用命令
    JDBC---封装JDBC代码,配置properties文件
    阿里影业S1财报解读:优质内容叠加整合效益,转动增长飞轮
    懒人制作企业期刊的秘籍
    详解内存SDRAM原理(P-Bank、L-Bank、刷新、预充电等)
    谷粒商城 (二十七) --------- 商品服务 API 商品管理
    IDA pro逆向工具寻找socket server的IP和port
    SpringMVC基础:请求转发和重定向
    Explain执行计划字段解释说明---type字段说明(02)
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/126259980