前置知识:
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线
首先创建节点servers:create /servers "servers"
- package com.why.zkCase;
-
- import org.apache.zookeeper.*;
- import org.junit.Before;
-
- import java.io.IOException;
-
- //服务端向zookeeper注册
- public class DistributeServer {
- private static String connetString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //客户端连接ip
- private static int sessionTimeout = 2000; //超时时间
- private ZooKeeper zkClient = null; //客户端对象
- private String parentNode = "/servers"; //父节点路径
-
- @Before
- public void getConnect() throws IOException {
- zkClient = new ZooKeeper(connetString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //收到事件通知后的回调函数
- System.out.println("事件类型:" + watchedEvent.getType());
- System.out.println("事件路径:" + watchedEvent.getPath());
- }
- });
- }
-
- //注册服务器
- public void registServer(String hostname) throws InterruptedException, KeeperException {
- String create = zkClient.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- System.out.println(hostname + "is online" + create);
- }
-
- //业务逻辑
- public void business(String hostname) throws Exception {
- System.out.println(hostname + " is working ...");
- Thread.sleep(Long.MAX_VALUE);
- }
-
- public static void main(String[] args) throws Exception {
- // 1 获取 zk 连接
- DistributeServer server = new DistributeServer();
- server.getConnect();
- // 2 利用 zk 连接注册服务器信息
- server.registServer(args[0]);
- // 3 启动业务功能
- server.business(args[0]);
- }
- }
- package com.why.zkCase;
-
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import org.junit.Before;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
-
- //客户端
- public class DistributeClient {
- private static String connetString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //客户端连接ip
- private static int sessionTimeout = 2000; //超时时间
- private ZooKeeper zkClient = null; //客户端对象
- private String parentNode = "/servers"; //父节点路径
-
- //创建到zk的客户端连接
- @Before
- public void getConnect() throws IOException {
- zkClient = new ZooKeeper(connetString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //收到事件通知后的回调函数
- System.out.println("事件类型:" + watchedEvent.getType());
- System.out.println("事件路径:" + watchedEvent.getPath());
- // 再次启动监听
- try {
- getServerList();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- //获取服务器列表信息
- public void getServerList() throws InterruptedException, KeeperException {
- //获取服务器子节点信息,并对父节点进行监听
- List
children = zkClient.getChildren(parentNode, true); -
- //存储服务器信息列表
- ArrayList
servers = new ArrayList<>(); -
- //遍历所有节点,获取主机名称信息
- for (String child : children)
- {
- byte[] data = zkClient.getData(parentNode + "/" + child, false, null);
- servers.add(new String(data));
- }
-
- //打印服务器列表信息
- System.out.println(servers);
- }
-
- // 业务功能
- public void business() throws Exception{
- System.out.println("client is working ...");
- Thread.sleep(Long.MAX_VALUE);
- }
-
- public static void main(String[] args) throws Exception {
- // 1 获取 zk 连接
- DistributeClient client = new DistributeClient();
-
- client.getConnect();
- // 2 获取 servers 的子节点信息,从中获取服务器信息列表
- client.getServerList();
- // 3 业务进程启动
- client.business();
- }
- }
启动DistributeClient客户端
在zk的命令行中新建节点:create -e -s /servers/hadoop103 "hadoop103"
在idea的控制台可以看到:
删除hadoop103:delete /servers/hadoop1030000000001
可以看到:
启动 DistributeClient 客户端
启动 DistributeServer 服务:
添加参数:
然后启动;
可以看到:
同时client也可以监听到服务器上线通知: