• Zookeeper实战案例(1)


    前置知识:

    Zookeeper学习笔记(1)—— 基础知识-CSDN博客 

    Zookeeper学习笔记(2)—— Zookeeper API简单操作-CSDN博客

    Zookeeper 服务器动态上下线监听案例

    需求分析

    某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线

    具体实现

    首先创建节点servers:create /servers "servers"

    服务器向zookeeper注册的代码

    1. package com.why.zkCase;
    2. import org.apache.zookeeper.*;
    3. import org.junit.Before;
    4. import java.io.IOException;
    5. //服务端向zookeeper注册
    6. public class DistributeServer {
    7. private static String connetString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //客户端连接ip
    8. private static int sessionTimeout = 2000; //超时时间
    9. private ZooKeeper zkClient = null; //客户端对象
    10. private String parentNode = "/servers"; //父节点路径
    11. @Before
    12. public void getConnect() throws IOException {
    13. zkClient = new ZooKeeper(connetString, sessionTimeout, new Watcher() {
    14. @Override
    15. public void process(WatchedEvent watchedEvent) {
    16. //收到事件通知后的回调函数
    17. System.out.println("事件类型:" + watchedEvent.getType());
    18. System.out.println("事件路径:" + watchedEvent.getPath());
    19. }
    20. });
    21. }
    22. //注册服务器
    23. public void registServer(String hostname) throws InterruptedException, KeeperException {
    24. String create = zkClient.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    25. System.out.println(hostname + "is online" + create);
    26. }
    27. //业务逻辑
    28. public void business(String hostname) throws Exception {
    29. System.out.println(hostname + " is working ...");
    30. Thread.sleep(Long.MAX_VALUE);
    31. }
    32. public static void main(String[] args) throws Exception {
    33. // 1 获取 zk 连接
    34. DistributeServer server = new DistributeServer();
    35. server.getConnect();
    36. // 2 利用 zk 连接注册服务器信息
    37. server.registServer(args[0]);
    38. // 3 启动业务功能
    39. server.business(args[0]);
    40. }
    41. }

    客户端代码

    1. package com.why.zkCase;
    2. import org.apache.zookeeper.KeeperException;
    3. import org.apache.zookeeper.WatchedEvent;
    4. import org.apache.zookeeper.Watcher;
    5. import org.apache.zookeeper.ZooKeeper;
    6. import org.junit.Before;
    7. import java.io.IOException;
    8. import java.util.ArrayList;
    9. import java.util.List;
    10. //客户端
    11. public class DistributeClient {
    12. private static String connetString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //客户端连接ip
    13. private static int sessionTimeout = 2000; //超时时间
    14. private ZooKeeper zkClient = null; //客户端对象
    15. private String parentNode = "/servers"; //父节点路径
    16. //创建到zk的客户端连接
    17. @Before
    18. public void getConnect() throws IOException {
    19. zkClient = new ZooKeeper(connetString, sessionTimeout, new Watcher() {
    20. @Override
    21. public void process(WatchedEvent watchedEvent) {
    22. //收到事件通知后的回调函数
    23. System.out.println("事件类型:" + watchedEvent.getType());
    24. System.out.println("事件路径:" + watchedEvent.getPath());
    25. // 再次启动监听
    26. try {
    27. getServerList();
    28. } catch (Exception e) {
    29. e.printStackTrace();
    30. }
    31. }
    32. });
    33. }
    34. //获取服务器列表信息
    35. public void getServerList() throws InterruptedException, KeeperException {
    36. //获取服务器子节点信息,并对父节点进行监听
    37. List children = zkClient.getChildren(parentNode, true);
    38. //存储服务器信息列表
    39. ArrayList servers = new ArrayList<>();
    40. //遍历所有节点,获取主机名称信息
    41. for (String child : children)
    42. {
    43. byte[] data = zkClient.getData(parentNode + "/" + child, false, null);
    44. servers.add(new String(data));
    45. }
    46. //打印服务器列表信息
    47. System.out.println(servers);
    48. }
    49. // 业务功能
    50. public void business() throws Exception{
    51. System.out.println("client is working ...");
    52. Thread.sleep(Long.MAX_VALUE);
    53. }
    54. public static void main(String[] args) throws Exception {
    55. // 1 获取 zk 连接
    56. DistributeClient client = new DistributeClient();
    57. client.getConnect();
    58. // 2 获取 servers 的子节点信息,从中获取服务器信息列表
    59. client.getServerList();
    60. // 3 业务进程启动
    61. client.business();
    62. }
    63. }

    测试

    命令行操作

    启动DistributeClient客户端

    在zk的命令行中新建节点:create -e -s /servers/hadoop103 "hadoop103"

    在idea的控制台可以看到:

    删除hadoop103:delete /servers/hadoop1030000000001

    可以看到:

    idea操作

    启动 DistributeClient 客户端

    启动 DistributeServer 服务:

    添加参数:

    然后启动;

    可以看到:

    同时client也可以监听到服务器上线通知:

  • 相关阅读:
    ucontext的简单介绍
    数据挖掘项目(一)
    嵌入式开发中,嵌入式硬件和软件有什么区别?
    windows中elasticsearch7中添加用户名密码验证
    68.qt quick-qml多级折叠下拉导航菜单 支持动态添加/卸载 支持qml/widget加载等
    微服务到底该怎么样部署呢?
    聚合查询、联合查询【mysql数据库】
    Mac环境下反编译工具的使用
    教程 - 深度探讨在 Vue3 中引入 CesiumJS 的最佳方式
    创建order\stock模块 Dubbo 负载均衡
  • 原文地址:https://blog.csdn.net/qq_51235856/article/details/134491789