• Zookeeper学习一


    初识 Zookeeper

    Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务(B树)

    Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称zk

    Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务

    Zookeeper 提供的主要功能包括: 配置管理 分布式锁 集群管理

    Zookeeper 安装与配置

    1.1 下载安装

    1、环境准备

    ZooKeeper服务器是用Java创建的,它运行在JVM之上。需要安装JDK 7或更高版本。

    2、上传

    将下载的ZooKeeper放到/opt/ZooKeeper目录下

    1. #上传zookeeper alt+p
    2. put f:/setup/apache-zookeeper-3.5.6-bin.tar.gz
    3. #打开 opt目录
    4. cd /opt
    5. #创建zooKeeper目录
    6. mkdir zooKeeper
    7. #将zookeeper安装包移动到 /opt/zooKeeper
    8. mv apache-zookeeper-3.5.6-bin.tar.gz /opt/zookeeper/

    3、解压

    将tar包解压到/opt/zookeeper目录下

    tar -zxvf apache-ZooKeeper-3.5.6-bin.tar.gz 

    1.2 配置启动

    1、配置zoo.cfg

    进入到conf目录拷贝一个zoo_sample.cfg并完成配置

    1. #进入到conf目录
    2. cd /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/conf/
    3. #拷贝
    4. cp zoo_sample.cfg zoo.cfg

    修改zoo.cfg

    1. #打开目录
    2. cd /opt/zooKeeper/
    3. #创建zooKeeper存储目录
    4. mkdir zkdata
    5. #修改zoo.cfg
    6. vim /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/conf/zoo.cfg

    修改存储目录:dataDir=/opt/zookeeper/zkdata

    2、启动ZooKeeper

    1. cd /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/bin/
    2. #启动
    3. ./zkServer.sh  start

    看到上图表示ZooKeeper成功启动

    3、查看ZooKeeper状态

    ./zkServer.sh status

    zookeeper启动成功。standalone代表zk没有搭建集群,现在是单节点

    zookeeper没有启动

    Zookeeper 命令操作

    Zookeeper 数据模型

    ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

    这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。

     节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

    节点可以分为四大类:

    • PERSISTENT 持久化节点
    • EPHEMERAL 临时节点 :-e
    • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
    • EPHEMERAL_SEQUENTIAL 临时顺序节点  :-es

    Zookeeper服务端常用命令

    • 启动 ZooKeeper 服务: ./zkServer.sh start
    • 查看 ZooKeeper 服务状态: ./zkServer.sh status
    • 停止 ZooKeeper 服务: ./zkServer.sh stop
    • 重启 ZooKeeper 服务: ./zkServer.sh restart

    Zookeeper客户端常见命令

    连接ZooKeeper服务器:./zkCli.sh –server ip:port

    断开连接:quit

    设置节点值:set /节点path value

    查看命令帮助:help

    删除单个节点:delete /节点path

    显示指定目录下节点:ls 目录

    删除带有子节点的节点:deleteall /节点path

    创建节点:create /节点path value

    获取节点值:get /节点path

    Zookeeper JavaAPI 操作

    建立连接

    建立连接有两种方式,一种是调用工厂对象的newClient()方法,另一种就是调用工厂对象的builder(),通过链式调用的方法就连接信息传入工厂中。

    下面是代码示例:

    1. /**
    2. * 建立连接
    3. */
    4. @Before
    5. public void testConnect() {
    6. /*
    7. *
    8. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
    9. * @param sessionTimeoutMs 会话超时时间 单位ms
    10. * @param connectionTimeoutMs 连接超时时间 单位ms
    11. * @param retryPolicy 重试策略
    12. */
    13. /* //重试策略
    14. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
    15. //1.第一种方式
    16. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
    17. 60 * 1000, 15 * 1000, retryPolicy);*/
    18. //重试策略
    19. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    20. //2.第二种方式
    21. //CuratorFrameworkFactory.builder();
    22. client = CuratorFrameworkFactory.builder()
    23. .connectString("192.168.149.135:2181")
    24. .sessionTimeoutMs(60 * 1000)
    25. .connectionTimeoutMs(15 * 1000)
    26. .retryPolicy(retryPolicy)
    27. //命名空间,使创建的节点都在命名空间的路径下面
    28. .namespace("kjz")
    29. .build();
    30. //开启连接
    31. client.start();
    32. }

    注意此方法需要加上@Before注解,表示其他测试方法执行前需要先执行加了@Before注解的方法。因为每次进行crud操作时都需要与ZooKeeper Server建立连接。

    建立了连接,操作完毕后同时需要释放连接,在对应方法上面加一个@After注解,表示每次进行测试最后都要执行该方法。

    1. @After
    2. public void close() {
    3. if (client != null) {
    4. client.close();
    5. }
    6. }

    添加节点

    代码示例如下:

    1. /**
    2. * 创建节点:create 持久 临时 顺序 数据
    3. * 1. 基本创建 :create().forPath("")
    4. * 2. 创建节点 带有数据:create().forPath("",data)
    5. * 3. 设置节点的类型:create().withMode().forPath("",data)
    6. * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
    7. */
    8. @Test
    9. public void testCreate() throws Exception {
    10. //2. 创建节点 带有数据
    11. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    12. String path = client.create().forPath("/app2", "hehe".getBytes());
    13. System.out.println(path);
    14. }
    15. @Test
    16. public void testCreate2() throws Exception {
    17. //1. 基本创建
    18. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    19. String path = client.create().forPath("/app1");
    20. System.out.println(path);
    21. }
    22. @Test
    23. public void testCreate3() throws Exception {
    24. //3. 设置节点的类型
    25. //默认类型:持久化
    26. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    27. System.out.println(path);
    28. }
    29. @Test
    30. public void testCreate4() throws Exception {
    31. //4. 创建多级节点 /app1/p1
    32. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
    33. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    34. System.out.println(path);
    35. }

    Curator中提供了一个枚举类,里面定义了设置不同类型节点的常量

    运行testCreate3()时我发现ZooKeeper中并没有保存我创建的节点,原因是在创建节点时指定的节点类型为临时节点,临时节点在会话结束后就会删除。我通过运行testCreate3()这个方法来创建一个节点,运行结束,说明会话也结束了,ZooKeeper就会把节点删除了。

    删除节点

    删除节点: delete deleteall

    删除单个节点:delete().forPath("/app1");

    删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");

    必须成功的删除:为了防止网络抖动。本质就是重试。client.delete().guaranteed().forPath("/app2");

    回调:inBackground;

    代码示例:

    1. /**
    2. * 删除节点: delete deleteall
    3. * 1. 删除单个节点:delete().forPath("/app1");
    4. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
    5. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
    6. * 4. 回调:inBackground
    7. * @throws Exception
    8. */
    9. @Test
    10. public void testDelete() throws Exception {
    11. // 1. 删除单个节点
    12. client.delete().forPath("/app1");
    13. }
    14. @Test
    15. public void testDelete2() throws Exception {
    16. //2. 删除带有子节点的节点
    17. client.delete().deletingChildrenIfNeeded().forPath("/app4");
    18. }
    19. @Test
    20. public void testDelete3() throws Exception {
    21. //3. 必须成功的删除
    22. client.delete().guaranteed().forPath("/app2");
    23. }
    24. @Test
    25. public void testDelete4() throws Exception {
    26. //4. 回调
    27. client.delete().guaranteed().inBackground(new BackgroundCallback(){
    28. @Override
    29. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    30. System.out.println("我被删除了~");
    31. System.out.println(event);
    32. }
    33. }).forPath("/app1");
    34. }

    修改节点

    基本修改数据 setData().forPath()

    根据版本修改数据 setData().withVersion().forPath()

    version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。

    代码示例:

    1. /**
    2. * 修改数据
    3. * 1. 基本修改数据:setData().forPath()
    4. * 2. 根据版本修改: setData().withVersion().forPath()
    5. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
    6. *
    7. * @throws Exception
    8. */
    9. @Test
    10. public void testSet() throws Exception {
    11. client.setData().forPath("/app1", "itcast".getBytes());
    12. }
    13. @Test
    14. public void testSetForVersion() throws Exception {
    15. Stat status = new Stat();
    16. //3. 查询节点状态信息:ls -s
    17. client.getData().storingStatIn(status).forPath("/app1");
    18. int version = status.getVersion();//查询出来的 3
    19. System.out.println(version);
    20. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
    21. }

    查询节点

    查询数据:get: getData().forPath()

    查询子节点: ls: getChildren().forPath()

    查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath() 

    代码示例:

    1. @Test
    2. public void testGet1() throws Exception {
    3. //1. 查询数据:get
    4. byte[] data = client.getData().forPath("/app1");
    5. System.out.println(new String(data));
    6. }
    7. @Test
    8. public void testGet2() throws Exception {
    9. // 2. 查询子节点: ls
    10. List path = client.getChildren().forPath("/");
    11. System.out.println(path);
    12. }
    13. @Test
    14. public void testGet3() throws Exception {
    15. Stat status = new Stat();
    16. System.out.println(status);
    17. //3. 查询节点状态信息:ls -s
    18. client.getData().storingStatIn(status).forPath("/app1");
    19. System.out.println(status);
    20. }

    Stat里面封装了节点的状态信息

    完整代码如下:

    1. package com.kjz.curator;
    2. import org.apache.curator.RetryPolicy;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.CuratorFrameworkFactory;
    5. import org.apache.curator.framework.api.BackgroundCallback;
    6. import org.apache.curator.framework.api.CuratorEvent;
    7. import org.apache.curator.retry.ExponentialBackoffRetry;
    8. import org.apache.zookeeper.CreateMode;
    9. import org.apache.zookeeper.data.Stat;
    10. import org.junit.After;
    11. import org.junit.Before;
    12. import org.junit.Test;
    13. import java.util.List;
    14. public class CuratorTest {
    15. private CuratorFramework client;
    16. /**
    17. * 建立连接
    18. */
    19. @Before
    20. public void testConnect() {
    21. /*
    22. *
    23. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
    24. * @param sessionTimeoutMs 会话超时时间 单位ms
    25. * @param connectionTimeoutMs 连接超时时间 单位ms
    26. * @param retryPolicy 重试策略
    27. */
    28. /* //重试策略
    29. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
    30. //1.第一种方式
    31. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
    32. 60 * 1000, 15 * 1000, retryPolicy);*/
    33. //重试策略
    34. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    35. //2.第二种方式
    36. //CuratorFrameworkFactory.builder();
    37. client = CuratorFrameworkFactory.builder()
    38. .connectString("192.168.149.135:2181")
    39. .sessionTimeoutMs(60 * 1000)
    40. .connectionTimeoutMs(15 * 1000)
    41. .retryPolicy(retryPolicy)
    42. .namespace("kjz")
    43. .build();
    44. //开启连接
    45. client.start();
    46. }
    47. //==============================create=============================================================================
    48. /**
    49. * 创建节点:create 持久 临时 顺序 数据
    50. * 1. 基本创建 :create().forPath("")
    51. * 2. 创建节点 带有数据:create().forPath("",data)
    52. * 3. 设置节点的类型:create().withMode().forPath("",data)
    53. * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
    54. */
    55. @Test
    56. public void testCreate() throws Exception {
    57. //2. 创建节点 带有数据
    58. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    59. String path = client.create().forPath("/app2", "hehe".getBytes());
    60. System.out.println(path);
    61. }
    62. @Test
    63. public void testCreate2() throws Exception {
    64. //1. 基本创建
    65. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    66. String path = client.create().forPath("/app1");
    67. System.out.println(path);
    68. }
    69. @Test
    70. public void testCreate3() throws Exception {
    71. //3. 设置节点的类型
    72. //默认类型:持久化
    73. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    74. System.out.println(path);
    75. }
    76. @Test
    77. public void testCreate4() throws Exception {
    78. //4. 创建多级节点 /app1/p1
    79. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
    80. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    81. System.out.println(path);
    82. }
    83. //===========================get================================================================================
    84. /**
    85. * 查询节点:
    86. * 1. 查询数据:get: getData().forPath()
    87. * 2. 查询子节点: ls: getChildren().forPath()
    88. * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
    89. */
    90. @Test
    91. public void testGet1() throws Exception {
    92. //1. 查询数据:get
    93. byte[] data = client.getData().forPath("/app1");
    94. System.out.println(new String(data));
    95. }
    96. @Test
    97. public void testGet2() throws Exception {
    98. // 2. 查询子节点: ls
    99. List path = client.getChildren().forPath("/");
    100. System.out.println(path);
    101. }
    102. @Test
    103. public void testGet3() throws Exception {
    104. Stat status = new Stat();
    105. System.out.println(status);
    106. //3. 查询节点状态信息:ls -s
    107. client.getData().storingStatIn(status).forPath("/app1");
    108. System.out.println(status);
    109. }
    110. //===========================set================================================================================
    111. /**
    112. * 修改数据
    113. * 1. 基本修改数据:setData().forPath()
    114. * 2. 根据版本修改: setData().withVersion().forPath()
    115. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
    116. *
    117. * @throws Exception
    118. */
    119. @Test
    120. public void testSet() throws Exception {
    121. client.setData().forPath("/app1", "kjz".getBytes());
    122. }
    123. @Test
    124. public void testSetForVersion() throws Exception {
    125. Stat status = new Stat();
    126. //3. 查询节点状态信息:ls -s
    127. client.getData().storingStatIn(status).forPath("/app1");
    128. int version = status.getVersion();//查询出来的 3
    129. System.out.println(version);
    130. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
    131. }
    132. //===========================delete================================================================================
    133. /**
    134. * 删除节点: delete deleteall
    135. * 1. 删除单个节点:delete().forPath("/app1");
    136. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
    137. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
    138. * 4. 回调:inBackground
    139. * @throws Exception
    140. */
    141. @Test
    142. public void testDelete() throws Exception {
    143. // 1. 删除单个节点
    144. client.delete().forPath("/app1");
    145. }
    146. @Test
    147. public void testDelete2() throws Exception {
    148. //2. 删除带有子节点的节点
    149. client.delete().deletingChildrenIfNeeded().forPath("/app4");
    150. }
    151. @Test
    152. public void testDelete3() throws Exception {
    153. //3. 必须成功的删除
    154. client.delete().guaranteed().forPath("/app2");
    155. }
    156. @Test
    157. public void testDelete4() throws Exception {
    158. //4. 回调
    159. client.delete().guaranteed().inBackground(new BackgroundCallback(){
    160. @Override
    161. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    162. System.out.println("我被删除了~");
    163. System.out.println(event);
    164. }
    165. }).forPath("/app1");
    166. }
    167. @After
    168. public void close() {
    169. if (client != null) {
    170. client.close();
    171. }
    172. }
    173. }

    Watch事件监听

    • ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
    • ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
    • ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便     需要开发人员自己反复注册Watcher,比较繁琐。
    • Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

    ZooKeeper提供了三种Watcher:

    •         NodeCache : 只是监听某一个特定的节点
    •          PathChildrenCache : 监控一个ZNode的子节点.
    •          TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache             的组合

    演示 NodeCache:给指定一个节点注册监听器

    1. /**
    2. * 演示 NodeCache:给指定一个节点注册监听器
    3. */
    4. @Test
    5. public void testNodeCache() throws Exception {
    6. //1. 创建NodeCache对象
    7. final NodeCache nodeCache = new NodeCache(client,"/app1");
    8. //2. 注册监听
    9. nodeCache.getListenable().addListener(new NodeCacheListener() {
    10. @Override
    11. public void nodeChanged() throws Exception {
    12. System.out.println("节点变化了~");
    13. //获取修改节点后的数据
    14. byte[] data = nodeCache.getCurrentData().getData();
    15. System.out.println(new String(data));
    16. }
    17. });
    18. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
    19. nodeCache.start(true);
    20. while (true){
    21. }
    22. }

    演示 PathChildrenCache:监听某个节点的所有子节点们

    1. /**
    2. * 演示 PathChildrenCache:监听某个节点的所有子节点们
    3. */
    4. @Test
    5. public void testPathChildrenCache() throws Exception {
    6. //1.创建监听对象
    7. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    8. //2. 绑定监听器
    9. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    10. @Override
    11. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    12. System.out.println("子节点变化了~");
    13. System.out.println(event);
    14. //监听子节点的数据变更,并且拿到变更后的数据
    15. //1.获取类型
    16. PathChildrenCacheEvent.Type type = event.getType();
    17. //2.判断类型是否是update
    18. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    19. System.out.println("数据变了!!!");
    20. byte[] data = event.getData().getData();
    21. System.out.println(new String(data));
    22. }
    23. }
    24. });
    25. //3. 开启
    26. pathChildrenCache.start();
    27. while (true){
    28. }
    29. }

    数据变更类型的枚举类。

    演示 TreeCache:监听某个节点自己和所有子节点们,相当于PathChildrenCache和NodeCache的组合

    1. /**
    2. * 演示 NodeCache:给指定一个节点注册监听器
    3. */
    4. @Test
    5. public void testNodeCache() throws Exception {
    6. //1. 创建NodeCache对象
    7. final NodeCache nodeCache = new NodeCache(client,"/app1");
    8. //2. 注册监听
    9. nodeCache.getListenable().addListener(new NodeCacheListener() {
    10. @Override
    11. public void nodeChanged() throws Exception {
    12. System.out.println("节点变化了~");
    13. //获取修改节点后的数据
    14. byte[] data = nodeCache.getCurrentData().getData();
    15. System.out.println(new String(data));
    16. }
    17. });
    18. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
    19. nodeCache.start(true);
    20. while (true){
    21. }
    22. }

    完整代码如下:

    1. package com.kjz.curator;
    2. import org.apache.curator.RetryPolicy;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.CuratorFrameworkFactory;
    5. import org.apache.curator.framework.api.BackgroundCallback;
    6. import org.apache.curator.framework.api.CuratorEvent;
    7. import org.apache.curator.framework.recipes.cache.*;
    8. import org.apache.curator.retry.ExponentialBackoffRetry;
    9. import org.apache.zookeeper.CreateMode;
    10. import org.apache.zookeeper.data.Stat;
    11. import org.junit.After;
    12. import org.junit.Before;
    13. import org.junit.Test;
    14. import java.util.List;
    15. public class CuratorWatcherTest {
    16. private CuratorFramework client;
    17. /**
    18. * 建立连接
    19. */
    20. @Before
    21. public void testConnect() {
    22. /*
    23. *
    24. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
    25. * @param sessionTimeoutMs 会话超时时间 单位ms
    26. * @param connectionTimeoutMs 连接超时时间 单位ms
    27. * @param retryPolicy 重试策略
    28. */
    29. /* //重试策略
    30. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
    31. //1.第一种方式
    32. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
    33. 60 * 1000, 15 * 1000, retryPolicy);*/
    34. //重试策略
    35. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    36. //2.第二种方式
    37. //CuratorFrameworkFactory.builder();
    38. client = CuratorFrameworkFactory.builder()
    39. .connectString("192.168.149.135:2181")
    40. .sessionTimeoutMs(60 * 1000)
    41. .connectionTimeoutMs(15 * 1000)
    42. .retryPolicy(retryPolicy)
    43. .namespace("kjz")
    44. .build();
    45. //开启连接
    46. client.start();
    47. }
    48. @After
    49. public void close() {
    50. if (client != null) {
    51. client.close();
    52. }
    53. }
    54. /**
    55. * 演示 NodeCache:给指定一个节点注册监听器
    56. */
    57. @Test
    58. public void testNodeCache() throws Exception {
    59. //1. 创建NodeCache对象
    60. final NodeCache nodeCache = new NodeCache(client,"/app1");
    61. //2. 注册监听
    62. nodeCache.getListenable().addListener(new NodeCacheListener() {
    63. @Override
    64. public void nodeChanged() throws Exception {
    65. System.out.println("节点变化了~");
    66. //获取修改节点后的数据
    67. byte[] data = nodeCache.getCurrentData().getData();
    68. System.out.println(new String(data));
    69. }
    70. });
    71. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
    72. nodeCache.start(true);
    73. while (true){
    74. }
    75. }
    76. /**
    77. * 演示 PathChildrenCache:监听某个节点的所有子节点们
    78. */
    79. @Test
    80. public void testPathChildrenCache() throws Exception {
    81. //1.创建监听对象
    82. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    83. //2. 绑定监听器
    84. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    85. @Override
    86. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    87. System.out.println("子节点变化了~");
    88. System.out.println(event);
    89. //监听子节点的数据变更,并且拿到变更后的数据
    90. //1.获取类型
    91. PathChildrenCacheEvent.Type type = event.getType();
    92. //2.判断类型是否是update
    93. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    94. System.out.println("数据变了!!!");
    95. byte[] data = event.getData().getData();
    96. System.out.println(new String(data));
    97. }
    98. }
    99. });
    100. //3. 开启
    101. pathChildrenCache.start();
    102. while (true){
    103. }
    104. }
    105. /**
    106. * 演示 TreeCache:监听某个节点自己和所有子节点们
    107. */
    108. @Test
    109. public void testTreeCache() throws Exception {
    110. //1. 创建监听器
    111. TreeCache treeCache = new TreeCache(client,"/app2");
    112. //2. 注册监听
    113. treeCache.getListenable().addListener(new TreeCacheListener() {
    114. @Override
    115. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    116. System.out.println("节点变化了");
    117. System.out.println(event);
    118. }
    119. });
    120. //3. 开启
    121. treeCache.start();
    122. while (true){
    123. }
    124. }
    125. }

    分布式锁实现

    在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。 但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。 那么就需要一种更加高级的锁机制,来处理种 跨机器的进程之间的数据同步问题——这就是分布式锁。
    分布式锁常见的实现方式:
    下面介绍ZooKeeper的实现方式:

    Zookeeper分布式锁原理

    核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

    1. 客户端获取锁时,在lock节点下创建临时顺序节点。临时:防止获取到锁的服务宕机了导致锁无法释放,临时节点会在会话结束后自动删除。顺序:找到最小节点,创建最小节点的服务获取到锁。
    2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
    3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
    4. 如果发现比自己小的那个节点被删除,则客户端的 Watcher会收到相应通知,此时再次判断自己创建的节点     是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点     并注册监听。

    Curator实现分布式锁API

    在Curator中有五种锁方案:

    • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
    • InterProcessMutex:分布式可重入排它锁
    • InterProcessReadWriteLock:分布式读写锁
    • InterProcessMultiLock:将多个锁作为单个实体管理的容器
    • InterProcessSemaphoreV2:共享信号量

    案例:模拟12306售票

    模拟12306服务

    1. package com.kjz.curator;
    2. import org.apache.curator.RetryPolicy;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.CuratorFrameworkFactory;
    5. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    6. import org.apache.curator.retry.ExponentialBackoffRetry;
    7. import java.util.concurrent.TimeUnit;
    8. public class Ticket12306 implements Runnable{
    9. private int tickets = 10;//数据库的票数
    10. private InterProcessMutex lock ;
    11. public Ticket12306(){
    12. //重试策略
    13. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    14. //2.第二种方式
    15. //CuratorFrameworkFactory.builder();
    16. CuratorFramework client = CuratorFrameworkFactory.builder()
    17. .connectString("192.168.149.135:2181")
    18. .sessionTimeoutMs(60 * 1000)
    19. .connectionTimeoutMs(15 * 1000)
    20. .retryPolicy(retryPolicy)
    21. .build();
    22. //开启连接
    23. client.start();
    24. lock = new InterProcessMutex(client,"/lock");
    25. }
    26. @Override
    27. public void run() {
    28. while(true){
    29. //获取锁
    30. try {
    31. lock.acquire(3, TimeUnit.SECONDS);
    32. if(tickets > 0){
    33. System.out.println(Thread.currentThread()+":"+tickets);
    34. Thread.sleep(100);
    35. tickets--;
    36. }
    37. } catch (Exception e) {
    38. e.printStackTrace();
    39. }finally {
    40. //释放锁
    41. try {
    42. lock.release();
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. }
    47. }
    48. }
    49. }

    模拟买票

    1. package com.kjz.curator;
    2. import org.apache.curator.RetryPolicy;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.CuratorFrameworkFactory;
    5. import org.apache.curator.framework.recipes.cache.*;
    6. import org.apache.curator.retry.ExponentialBackoffRetry;
    7. import org.junit.After;
    8. import org.junit.Before;
    9. import org.junit.Test;
    10. public class LockTest {
    11. public static void main(String[] args) {
    12. Ticket12306 ticket12306 = new Ticket12306();
    13. //创建客户端
    14. Thread t1 = new Thread(ticket12306,"携程");
    15. Thread t2 = new Thread(ticket12306,"飞猪");
    16. t1.start();
    17. t2.start();
    18. }
    19. }
  • 相关阅读:
    No6-5.从零搭建spring-cloud-alibaba微服务框架,添加用户鉴权逻辑,操作权限等(五,no6-5)
    依赖注入的正确打开方式 bilibili/kratos × google/wire
    高校教务系统登录页面JS分析——巢湖学院
    加密大崩盘,Web3游戏到底还有没有未来?5篇论文深度探讨
    计算机操作系统学习(六)设备管理
    Apache HttpClient 5 使用详细教程
    「PCB智能生产」MES系统在设备管理中的应用
    字节5年测试工程师经验分享,软件测试真的很简单么?
    tableau高级绘图(十)-Tableau绘制圆形图
    英语学术论文简短语句摘抄
  • 原文地址:https://blog.csdn.net/m0_74229735/article/details/137245523