• ZooKeeper(续)


    目录

    3.3 客户端 API 操作

    3.3.1 IDEA 环境搭建

    3.3.2 创建 ZooKeeper 客户端

    3.3.3 创建子节点

    3.3.4 获取子节点并监听节点变化

    3.3.5 判断 Znode 是否存在

    3.4 客户端向服务端写数据流程

    四:服务器动态上下线监听案例  

    4.1 需求

    4.2 需求分析

    4.3 具体实现  

    4.4 测试

    五:ZooKeeper 分布式锁案例

    5.1 原生 Zookeeper 实现分布式锁案例  

    5.2 Curator 框架实现分布式锁案例

    六:企业面试真题(面试重点)

    6.1 选举机制

    6.2 生产集群安装多少 zk 合适?

    6.3 常用命令


    3.3 客户端 API 操作

    前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。

    3.3.1 IDEA 环境搭建

    1)创建一个工程:zookeeper

    2)添加pom文件

    1. <dependencies>
    2. <dependency>
    3. <groupId>junit</groupId>
    4. <artifactId>junit</artifactId>
    5. <version>RELEASE</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.apache.logging.log4j</groupId>
    9. <artifactId>log4j-core</artifactId>
    10. <version>2.8.2</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>org.apache.zookeeper</groupId>
    14. <artifactId>zookeeper</artifactId>
    15. <version>3.5.7</version>
    16. </dependency>
    17. </dependencies>

    3)拷贝log4j.properties文件到项目根目录

    需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在 文件中填入。

    1. log4j.rootLogger=INFO, stdout
    2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    5. log4j.appender.logfile=org.apache.log4j.FileAppender
    6. log4j.appender.logfile.File=target/spring.log
    7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    4)创建包名com.atguigu.zk

    5)创建类名称zkClient

    3.3.2 创建 ZooKeeper 客户端

    1. // 注意:逗号前后不能有空格
    2. private static String connectString =
    3. "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    4. private static int sessionTimeout = 2000;
    5. private ZooKeeper zkClient = null;
    6. @Before
    7. public void init() throws Exception {
    8. zkClient = new ZooKeeper(connectString, sessionTimeout, new
    9. Watcher() {
    10. @Override
    11. public void process(WatchedEvent watchedEvent) {
    12. // 收到事件通知后的回调函数(用户的业务逻辑)
    13. System.out.println(watchedEvent.getType() + "--"
    14. + watchedEvent.getPath());
    15. // 再次启动监听
    16. try {
    17. List children = zkClient.getChildren("/",
    18. true);
    19. for (String child : children) {
    20. System.out.println(child);
    21. }
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. }
    26. });
    27. }
    28. }

    3.3.3 创建子节点

    1. // 创建子节点
    2. @Test
    3. public void create() throws Exception {
    4. // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;
    5. 参数 4:节点的类型
    6. String nodeCreated = zkClient.create("/atguigu",
    7. "shuaige".getBytes(), Ids.OPEN_ACL_UNSAFE,
    8. CreateMode.PERSISTENT);
    9. }

    测试:在 hadoop102 的 zk 客户端上查看创建节点情况

    1. [zk: localhost:2181(CONNECTED) 16] get -s /atguigu
    2. shuaige

    3.3.4 获取子节点并监听节点变化

    1. // 获取子节点
    2. @Test
    3. public void getChildren() throws Exception {
    4. List<String> children = zkClient.getChildren("/", true);
    5. for (String child : children) {
    6. System.out.println(child);
    7. }
    8. // 延时阻塞
    9. Thread.sleep(Long.MAX_VALUE);
    10. }

    (1)在 IDEA 控制台上看到如下节点:

    1. zookeeper
    2. sanguo
    3. atguigu

    (2)在 hadoop102 的客户端上创建再创建一个节点/atguigu1,观察 IDEA 控制台

    [zk: localhost:2181(CONNECTED) 3] create /atguigu1 "atguigu1" 
    

    (3)在 hadoop102 的客户端上删除节点/atguigu1,观察 IDEA 控制台

    [zk: localhost:2181(CONNECTED) 4] delete /atguigu1 

    3.3.5 判断 Znode 是否存在

    1. // 判断 znode 是否存在
    2. @Test
    3. public void exist() throws Exception {
    4. Stat stat = zkClient.exists("/atguigu", false);
    5. System.out.println(stat == null ? "not exist" : "exist");
    6. }

    3.4 客户端向服务端写数据流程

    1.写流程之写入请求直接发送给Leader节点

    2.写流程之写入请求发送给follower节点  

    四:服务器动态上下线监听案例  

    4.1 需求

    分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

    4.2 需求分析

    服务器动态上下线

    4.3 具体实现  

    1)先在集群上创建/servers 节点

    1. [zk: localhost:2181(CONNECTED) 10] create /servers "servers"
    2. Created /servers

    (2)在 Idea 中创建包名:com.atguigu.zkcase1

    (3)服务器端向 Zookeeper 注册代码

    1. package com.atguigu.case1;
    2. import java.io.IOException;
    3. import org.apache.zookeeper.CreateMode;
    4. import org.apache.zookeeper.WatchedEvent;
    5. import org.apache.zookeeper.Watcher;
    6. import org.apache.zookeeper.ZooKeeper;
    7. import org.apache.zookeeper.ZooDefs.Ids;
    8. public class DistributeServer {
    9. private static String connectString =
    10. "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    11. private static int sessionTimeout = 2000;
    12. private ZooKeeper zk = null;
    13. private String parentNode = "/servers";
    14. // 创建到 zk 的客户端连接
    15. public void getConnect() throws IOException{
    16. zk = new ZooKeeper(connectString, sessionTimeout, new
    17. Watcher() {
    18. @Override
    19. public void process(WatchedEvent event) {
    20. }
    21. });
    22. }
    23. // 注册服务器
    24. public void registServer(String hostname) throws Exception{
    25. String create = zk.create(parentNode + "/server",
    26. hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
    27. CreateMode.EPHEMERAL_SEQUENTIAL);
    28. System.out.println(hostname +" is online "+ create);
    29. }
    30. // 业务功能
    31. public void business(String hostname) throws Exception{
    32. System.out.println(hostname + " is working ...");
    33. Thread.sleep(Long.MAX_VALUE);
    34. }
    35. public static void main(String[] args) throws Exception {
    36. // 1 获取 zk 连接
    37. DistributeServer server = new DistributeServer();
    38. server.getConnect();
    39. // 2 利用 zk 连接注册服务器信息
    40. server.registServer(args[0]);
    41. // 3 启动业务功能
    42. server.business(args[0]);
    43. }
    44. }

    (3)客户端代码

    1. package com.atguigu.case1;
    2. import java.io.IOException;
    3. import java.util.ArrayList;
    4. import java.util.List;
    5. import org.apache.zookeeper.WatchedEvent;
    6. import org.apache.zookeeper.Watcher;
    7. import org.apache.zookeeper.ZooKeeper;
    8. public class DistributeClient {
    9. private static String connectString =
    10. "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    11. private static int sessionTimeout = 2000;
    12. private ZooKeeper zk = null;
    13. private String parentNode = "/servers";
    14. // 创建到 zk 的客户端连接
    15. public void getConnect() throws IOException {
    16. zk = new ZooKeeper(connectString, sessionTimeout, new
    17. Watcher() {
    18. @Override
    19. public void process(WatchedEvent event) {
    20. // 再次启动监听
    21. try {
    22. getServerList();
    23. } catch (Exception e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. });
    28. }
    29. // 获取服务器列表信息
    30. public void getServerList() throws Exception {
    31. // 1 获取服务器子节点信息,并且对父节点进行监听
    32. List children = zk.getChildren(parentNode, true);
    33. // 2 存储服务器信息列表
    34. ArrayList servers = new ArrayList<>();
    35. // 3 遍历所有节点,获取节点中的主机名称信息
    36. for (String child : children) {
    37. byte[] data = zk.getData(parentNode + "/" + child,
    38. false, null);
    39. servers.add(new String(data));
    40. }
    41. // 4 打印服务器列表信息
    42. System.out.println(servers);
    43. }
    44. // 业务功能
    45. public void business() throws Exception{
    46. System.out.println("client is working ...");
    47. Thread.sleep(Long.MAX_VALUE);
    48. }
    49. public static void main(String[] args) throws Exception {
    50. // 1 获取 zk 连接
    51. DistributeClient client = new DistributeClient();
    52. client.getConnect();
    53. // 2 获取 servers 的子节点信息,从中获取服务器信息列表
    54. client.getServerList();
    55. // 3 业务进程启动
    56. client.business();
    57. }
    58. }

    4.4 测试

    1)在 Linux 命令行上操作增加减少服务器

    (1)启动 DistributeClient 客户端

    (2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点

    1. [zk: localhost:2181(CONNECTED) 1] create -e -s
    2. /servers/hadoop102 "hadoop102"
    3. [zk: localhost:2181(CONNECTED) 2] create -e -s
    4. /servers/hadoop103 "hadoop103"

    (3)观察 Idea 控制台变化

    [hadoop102, hadoop103] 

    (4)执行删除操作

    1. [zk: localhost:2181(CONNECTED) 8] delete
    2. /servers/hadoop1020000000000

    (5)观察 Idea 控制台变化

    [hadoop103]

    2)在 Idea 上操作增加减少服务器

    (1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)

    (2)启动 DistributeServer 服务

    ①点击 Edit Configurations…

    ②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102

     

    ③回到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run “DistributeServer.main()”

     

    ④观察 DistributeServer 控制台,提示 hadoop102 is working

    ⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线  

    五:ZooKeeper 分布式锁案例

    什么叫做分布式锁呢?

    比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

    5.1 原生 Zookeeper 实现分布式锁案例  

    1)分布式锁实现

    1. package com.atguigu.case2;
    2. import org.apache.zookeeper.*;
    3. import org.apache.zookeeper.data.Stat;
    4. import java.io.IOException;
    5. import java.util.Collections;
    6. import java.util.List;
    7. import java.util.concurrent.CountDownLatch;
    8. public class DistributedLock {
    9. // zookeeper server 列表
    10. private String connectString =
    11. "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    12. // 超时时间
    13. private int sessionTimeout = 2000;
    14. private ZooKeeper zk;
    15. private String rootNode = "locks";
    16. private String subNode = "seq-";
    17. // 当前 client 等待的子节点
    18. private String waitPath;
    19. //ZooKeeper 连接
    20. private CountDownLatch connectLatch = new CountDownLatch(1);
    21. //ZooKeeper 节点等待
    22. private CountDownLatch waitLatch = new CountDownLatch(1);
    23. // 当前 client 创建的子节点
    24. private String currentNode;
    25. // 和 zk 服务建立连接,并创建根节点
    26. public DistributedLock() throws IOException,
    27. InterruptedException, KeeperException {
    28. zk = new ZooKeeper(connectString, sessionTimeout, new
    29. Watcher() {
    30. @Override
    31. public void process(WatchedEvent event) {
    32. // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
    33. if (event.getState() ==
    34. Event.KeeperState.SyncConnected) {
    35. connectLatch.countDown();
    36. }
    37. // 发生了 waitPath 的删除事件
    38. if (event.getType() ==
    39. Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
    40. waitLatch.countDown();
    41. }
    42. }
    43. });
    44. // 等待连接建立,等待zk连接成功后,才执行程序的下一段代码
    45. connectLatch.await();
    46. //获取根节点状态
    47. Stat stat = zk.exists("/" + rootNode, false);
    48. //如果根节点不存在,则创建根节点,根节点类型为永久节点
    49. if (stat == null) {
    50. System.out.println("根节点不存在");
    51. zk.create("/" + rootNode, new byte[0],
    52. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    53. }
    54. }
    55. // 加锁方法
    56. public void zkLock() {
    57. try {
    58. //在根节点下创建临时顺序节点,返回值为创建的节点路径
    59. currentNode = zk.create("/" + rootNode + "/" + subNode,
    60. null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
    61. CreateMode.EPHEMERAL_SEQUENTIAL);
    62. // wait 一小会, 让结果更清晰一些
    63. Thread.sleep(10);
    64. // 注意, 没有必要监听"/locks"的子节点的变化情况
    65. List<String> childrenNodes = zk.getChildren("/" +
    66. rootNode, false);
    67. // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
    68. if (childrenNodes.size() == 1) {
    69. return;
    70. } else {
    71. //对根节点下的所有临时顺序节点进行从小到大排序
    72. Collections.sort(childrenNodes);
    73. //当前节点名称
    74. String thisNode = currentNode.substring(("/" +
    75. rootNode + "/").length());
    76. //获取当前节点的位置
    77. int index = childrenNodes.indexOf(thisNode);
    78. if (index == -1) {
    79. System.out.println("数据异常");
    80. } else if (index == 0) {
    81. // index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
    82. return;
    83. } else {
    84. // 获得排名比 currentNode 前 1 位的节点
    85. this.waitPath = "/" + rootNode + "/" +
    86. childrenNodes.get(index - 1);
    87. // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
    88. zk.getData(waitPath, true, new Stat());
    89. //进入等待锁状态
    90. waitLatch.await();
    91. return;
    92. }
    93. }
    94. } catch (KeeperException e) {
    95. e.printStackTrace();
    96. } catch (InterruptedException e) {
    97. e.printStackTrace();
    98. }
    99. }
    100. // 解锁方法
    101. public void zkUnlock() {
    102. try {
    103. zk.delete(this.currentNode, -1);
    104. } catch (InterruptedException | KeeperException e) {
    105. e.printStackTrace();
    106. }
    107. }
    108. }

    2)分布式锁测试

    1. package com.atguigu.case2;
    2. import org.apache.zookeeper.KeeperException;
    3. import java.io.IOException;
    4. public class DistributedLockTest {
    5. public static void main(String[] args) throws
    6. InterruptedException, IOException, KeeperException {
    7. // 创建分布式锁 1
    8. final DistributedLock lock1 = new DistributedLock();
    9. // 创建分布式锁 2
    10. final DistributedLock lock2 = new DistributedLock();
    11. new Thread(new Runnable() {
    12. @Override
    13. public void run() {
    14. // 获取锁对象
    15. try {
    16. lock1.zkLock();
    17. System.out.println("线程 1 获取锁");
    18. Thread.sleep(5 * 1000);
    19. lock1.zkUnlock();
    20. System.out.println("线程 1 释放锁");
    21. } catch (Exception e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. }).start();
    26. new Thread(new Runnable() {
    27. @Override
    28. public void run() {
    29. // 获取锁对象
    30. try {
    31. lock2.zkLock();
    32. System.out.println("线程 2 获取锁");
    33. Thread.sleep(5 * 1000);
    34. lock2.zkUnlock();
    35. System.out.println("线程 2 释放锁");
    36. } catch (Exception e) {
    37. e.printStackTrace();
    38. }
    39. }
    40. }).start();
    41. }
    42. }

    (2)观察控制台变化:

    线程 1 获取锁

    线程 1 释放锁

    线程 2 获取锁

    线程 2 释放锁

    5.2 Curator 框架实现分布式锁案例

    1)原生的 Java API 开发存在的问题

    (1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

    (2)Watch 需要重复注册,不然就不能生效

    (3)开发的复杂性还是比较高的

    (4)不支持多节点删除和创建。需要自己去递归

    2)Curator 是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题。

    详情请查看官方文档:https://curator.apache.org/index.html

    3)Curator 案例实操

    (1)添加依赖

    1. <dependency>
    2. <groupId>org.apache.curator</groupId>
    3. <artifactId>curator-framework</artifactId>
    4. <version>4.3.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.curator</groupId>
    8. <artifactId>curator-recipes</artifactId>
    9. <version>4.3.0</version>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.apache.curator</groupId>
    13. <artifactId>curator-client</artifactId>
    14. <version>4.3.0</version>
    15. </dependency>

    (2)代码实现

    1. package com.atguigu.case3;
    2. import org.apache.curator.RetryPolicy;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.CuratorFrameworkFactory;
    5. import org.apache.curator.framework.recipes.locks.InterProcessLock;
    6. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    7. import org.apache.curator.retry.ExponentialBackoffRetry;
    8. public class CuratorLockTest {
    9. private String rootNode = "/locks";
    10. // zookeeper server 列表
    11. private String connectString =
    12. "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    13. // connection 超时时间
    14. private int connectionTimeout = 2000;
    15. // session 超时时间
    16. private int sessionTimeout = 2000;
    17. public static void main(String[] args) {
    18. new CuratorLockTest().test();
    19. }
    20. // 测试
    21. private void test() {
    22. // 创建分布式锁 1
    23. final InterProcessLock lock1 = new
    24. InterProcessMutex(getCuratorFramework(), rootNode);
    25. // 创建分布式锁 2
    26. final InterProcessLock lock2 = new
    27. InterProcessMutex(getCuratorFramework(), rootNode);
    28. new Thread(new Runnable() {
    29. @Override
    30. public void run() {
    31. // 获取锁对象
    32. try {
    33. lock1.acquire();
    34. System.out.println("线程 1 获取锁");
    35. // 测试锁重入
    36. lock1.acquire();
    37. System.out.println("线程 1 再次获取锁");
    38. Thread.sleep(5 * 1000);
    39. lock1.release();
    40. System.out.println("线程 1 释放锁");
    41. lock1.release();
    42. System.out.println("线程 1 再次释放锁");
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. }
    47. }).start();
    48. new Thread(new Runnable() {
    49. @Override
    50. public void run() {
    51. // 获取锁对象
    52. try {
    53. lock2.acquire();
    54. System.out.println("线程 2 获取锁");
    55. // 测试锁重入
    56. lock2.acquire();
    57. System.out.println("线程 2 再次获取锁");
    58. Thread.sleep(5 * 1000);
    59. lock2.release();
    60. System.out.println("线程 2 释放锁");
    61. lock2.release();
    62. System.out.println("线程 2 再次释放锁");
    63. } catch (Exception e) {
    64. e.printStackTrace();
    65. }
    66. }
    67. }).start();
    68. }
    69. // 分布式锁初始化
    70. public CuratorFramework getCuratorFramework (){
    71. //重试策略,初试时间 3 秒,重试 3
    72. RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
    73. //通过工厂创建 Curator
    74. CuratorFramework client =
    75. CuratorFrameworkFactory.builder()
    76. .connectString(connectString)
    77. .connectionTimeoutMs(connectionTimeout)
    78. .sessionTimeoutMs(sessionTimeout)
    79. .retryPolicy(policy).build();
    80. //开启连接
    81. client.start();
    82. System.out.println("zookeeper 初始化完成...");
    83. return client;
    84. }
    85. }

    (2)观察控制台变化:

    线程 1 获取锁

    线程 1 再次获取锁

    线程 1 释放锁

    线程 1 再次释放锁

    线程 2 获取锁

    线程 2 再次获取锁

    线程 2 释放锁

    线程 2 再次释放锁

    六:企业面试真题(面试重点)

    6.1 选举机制

    半数机制,超过半数的投票通过,即通过。

    (1)第一次启动选举规则:

    投票过半数时,服务器 id (myid)大的胜出

    (2)第二次启动选举规则:

    ①EPOCH 大的直接胜出

    ②EPOCH 相同,事务 id 大的胜出

    ③事务 id 相同,服务器 id 大的胜出

    6.2 生产集群安装多少 zk 合适?

    安装奇数台。

    生产经验:

    ⚫ 10 台服务器:3 台 zk;

    ⚫ 20 台服务器:5 台 zk;

    ⚫ 100 台服务器:11 台 zk;

    ⚫ 200 台服务器:11 台 zk

    服务器台数多:好处,提高可靠性;

    坏处:提高通信延时

    6.3 常用命令

    ls、get、create、delete

  • 相关阅读:
    UNIAPP day_02(8.31) 条件编辑、数据绑定、生命周期方法
    分享 6 个 Vue3 开发必备的 VSCode 插件
    VM及WindowsServer安装
    Amazon EC2的出现,是时代的选择了它,还是它选择了时代
    elementui的el-dialog组件与el-tabs同时用导致浏览器卡死的原因解决
    Vue3,Vite,TypeScript,Monorepo,qiankun...... Buff叠满,BUG没有
    图数据挖掘:幂律分布和无标度网络
    数据结构与算法二:时间/空间复杂度(complexity)
    RSA非对称加密解密概念
    iptables 防火墙配置
  • 原文地址:https://blog.csdn.net/JiaXingNashishua/article/details/126722919