目录
(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客
1.在bin目录在执行这个工具(连接server服务端口)开始客户端
我们对于Zookeeper的服务端事件监听也是用的Curator来实现的——>引入了Cache
介绍
简介:Zookeeper是一个分布式开源的协调服务(感觉有点像nacos)
功能:配置管理,分布式锁,集群管理
配置管理:比如三个服务节点都用同一个配置文件,一个个去配置就很麻烦,我们直接在配置中心配置,abc节点连接即可(Nacos一站式解决)
(51条消息) Nacos注册中心_Fairy要carry的博客-CSDN博客_nacos注册中心
(51条消息) nacos实战项目中的配置_Fairy要carry的博客-CSDN博客_nacos配置有哪些
(51条消息) 面试-SpringCloud常见组件和注册表结构+nacos_Fairy要carry的博客-CSDN博客
分布式锁:
主要是解决数据一致性,比如我们的seata就是解决分布式情况下数据一致性问题
(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存(51条消息) Seata的几种事务模式_Fairy要carry的博客-CSDN博客_seata 模式(51条消息) 缓存同步-Canal_Fairy要carry的博客-CSDN博客_canal同步缓存
当多个节点对数据进行更改,你加synchornized这种锁只是在这个当前节点修改,比较JVM不一样了,这时候就要引入我们的分布式锁去保证数据的一致性,话说seata还跟这个有点不一样,它主要是应用在数据库,分布式下,各个服务数据库不一样,要保证数据一致性(引入了XA,AT,TCC等模式)
集群管理
就是类似于注册中心,拉取服务用的
Zookeeper的数据模型
节点也可以有子节点,允许少量的数据(1MB)存储在该节点下
1.PERSISENT: 持久化节点——>就是宕机客户端与服务端断开再连时,数据还是保持一致性的
2.EPHEMERAL:临时节点——>断开之后,数据节点就没了,相当于一次会话
服务端常见命令操作
- #在bin目录下
- ./zkServer.sh restart
客户端命令操作
客户端工具zkCli.sh
./zkCli.sh -server localhost:2181
创建临时节点
create -e /test2
创建顺序节点
create -s /test2
创建临时顺序节点
create -es /test2
发现临时节点消失了
ls -s /Path
Java API的操作
- /**
- * 1.建立连接
- */
- @Before
- public void testConnect() throws Exception {
- //1.1第一种方式
- // CuratorFrameworkFactory client = CuratorFrameworkFactory.class.getConstructor().newInstance();
-
- /**
- * 1.2创建一个新的客户端
- * 参数: connectString - 连接到服务器的列表
- * sessionTimeoutMs - 会话超时 ,多久没连就挂
- * connectionTimeoutMs - 连接超时 ,连了多久
- * retryPolicy -使用重试策略返回:客户端
- */
- RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
- // client = CuratorFrameworkFactory.newClient(
- // "192.168.184.129:2181",
- // 60 * 1000,
- // 15 * 1000, retry);
- 开启连接
- // client.start();
-
- /**
- * 1.3第二种方式
- * namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
- */
- client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
- .sessionTimeoutMs(60 * 1000)
- .connectionTimeoutMs(15 * 1000)
- .retryPolicy(retry).namespace("itheima").build();
- client.start();
- }
2.创建节点并且赋值
- /**
- * 2.创建节点:持久 临时 顺序 数据
- * 2.1基本创建
- * 2.2创建带有数据的
- * 2.3多级节点+设置节点类型
- */
- @Test
- public void testCreate() throws Exception {
- //1.基本创建持久化
- String path = client.create()
- .withMode(CreateMode.EPHEMERAL)
- .forPath("/app3", "hehe".getBytes(StandardCharsets.UTF_8));
- System.out.println(path);
-
- //2.让他不结束
- while (true) {
-
- }
- }
3.创建子节点
- /**
- * 创建子节点
- *
- * @throws Exception
- */
- @Test
- public void testMulti() throws Exception {
- //creatingParentsIfNeeded():如果父节点不存在就进行创建
- String path = client.create()
- .creatingParentsIfNeeded()
- .forPath("/app4/p1");
- System.out.println(path);
-
- }
4.查询节点数据
- /**
- * 查询节点数据
- */
- @Test
- public void testGet() throws Exception {
- byte[] bytes = client.getData().forPath("/app3");
- System.out.println(new String(bytes));
- }
5.查询当前路径下所有子节点
- /**
- * 查询子节点
- * getChildren().forPath():得到目录下所有子节点
- * 当前命名空间下的子目录节点
- */
- @Test
- public void testChildren() throws Exception {
- List
path = client.getChildren().forPath("/"); - System.out.println(path);
- }
6.查询节点状态
- /**
- * 查询节点状态信息
- * new Stat()
- * client.getData():查询数据
- * .storingStateIn(stat).forPath():查询节点状态值,将状态值给到Stat
- */
- @Test
- public void testState() throws Exception {
- Stat stat = new Stat();
- System.out.println(stat);
-
- //查询节点状态信息ls -s
- client.getData().storingStatIn(stat).forPath("/app3");
- System.out.println(stat);
- }
7.利用版本version修改节点数据
- /**
- * 根据版本判断修改节点状态值
- * setData().withVersion().forPath()
- * 如果版本不一致就不允许更改,保证了数据一致性,操作的原子性
- */
- @Test
- public void testSetForVersion() throws Exception {
- Stat stat = new Stat();
- //1.查询节点状态
- client.getData().storingStatIn(stat).forPath("/app3");
- //2.得到查询出来节点状态中的版本version
- int version = stat.getVersion();
- System.out.println("该节点数据的版本:"+version);
-
- client.setData().withVersion(version).forPath("/app3","newLife".getBytes(StandardCharsets.UTF_8));
- }
8.删除节点
- /**
- * 4.删除节点
- * 1.删除单个节点 delete().forPath()
- * 2.删除带有子节点的节点 delete().deleteChildrenIfNeeded().forPath
- * 3.必须成功的删除 delete().guaranteed().forPath()
- * 4.回调 inBackground(new BackgroundCallBack()).forPath()
- *
- */
- @Test
- public void testDelete() throws Exception {
- //1.删除单个节点
- client.delete().forPath("/app3");
-
- }
-
- @Test
- public void testDeleteNeeded() throws Exception {
- //1.删除带有子节点的节点
- client.delete().deletingChildrenIfNeeded().forPath("/app4");
- }
-
- /**
- * 可能删除的时候(客户端和Server需要建立连接),可能出现耗时较长导致删除失败的情况
- * delete().guaranteed().forPath()
- */
- @Test
- public void testMustDelete() throws Exception {
- //1.必须成功删除
- client.delete().guaranteed().forPath("app2");
- }
9.回调机制
- /**
- * 回调机制——>这里是删除后的回调
- * delete().guaranteed().inBackground(new BackgroundCallBack())
- * @throws Exception
- */
- @Test
- public void testBack() throws Exception {
- //1.调用回调方法inBackground->并且实现参数接口的回调机制processResult
- client.delete().guaranteed().inBackground((client, event) -> {
- System.out.println("当前节点被删除~");
- System.out.println(event);//2.打印节点信息——>被CuratorEvent捕捉
- }).forPath("/app2");
- }
Watch事件的监听
主要用于发布订阅,比如三个子节点订阅了APP1,当其中一个节点修改了APP1的数据时,会发布信息给到其他两个节点告诉数据的变化
我们对比一下Redis的发布订阅
(53条消息) Redis配置文件+发布订阅+新数据类型_Fairy要carry的博客-CSDN博客
NodeCache:监听特定节点
PathChildrenCache:监听某节点的子节点们
TreeCache:监听整个树节点=NodeCache+PathChildrenCache
- package com.wyh.curator;
-
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.cache.*;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.data.Stat;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
-
- import java.nio.charset.StandardCharsets;
- import java.util.List;
-
- /**
- * @author diao 2022/9/4
- */
-
- public class CuratorWatchTest {
-
- private CuratorFramework client;
-
- /**
- * 1.建立连接
- */
- @Before
- public void testConnect() throws Exception {
-
- RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);//重试策略:3s重试1次,最多10次
-
- /**
- * 1.3第二种方式
- * namespace(“xxx”)相当于是一个命名空间,方便区分,在根目录前加前缀
- */
- client = CuratorFrameworkFactory.builder().connectString("192.168.184.129:2181")
- .sessionTimeoutMs(60 * 1000)
- .connectionTimeoutMs(15 * 1000)
- .retryPolicy(retry).namespace("itheima").build();
- client.start();
- }
-
- /**
- * 1.NodeCache监听当前节点
- * @throws Exception
- */
- @Test
- public void testNodeCache() throws Exception {
- NodeCache nodeCache = new NodeCache(client, "/app2");//1、监听的节点
- //2、注册监听
- nodeCache.getListenable().addListener(new NodeCacheListener() {
- @Override
- public void nodeChanged() throws Exception {
- System.out.println("节点发生变化");
- //2.1变化的结果
- byte[] data = nodeCache.getCurrentData().getData();
- System.out.println(new String(data));
- }
- });
-
- //3.开启监听
- nodeCache.start(true);
-
- //4.需要持续监听——>线程则需要一直存活
- while (true) {
-
- }
- }
-
- /**
- * PathNodeCache监听子节点
- * @throws Exception
- */
- @Test
- public void testPathChildrenCache() throws Exception {
- //1.创建监听对象
- PathChildrenCache childrenCache = new PathChildrenCache(client, "/app2", true);
-
- //2.绑定监听器
- childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
- System.out.println("子节点发生变化...");
- //2.1打印监听事件
- System.out.println(pathChildrenCacheEvent);
- //2.2判断监听事件类型判断哪些是不能监听的
- PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
- //2.3当监听到的事件是update时
- if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
- System.out.println("数据变了!!!");
- byte[] data = pathChildrenCacheEvent.getData().getData();
- System.out.println(new String(data));
- }
- }
- });
-
- //3.开启
- childrenCache.start();
-
- //4.保证持续监听
- while(true){
-
- }
- }
-
- /**
- * TreeNodeCache完成监听
- * 1.new TreeCache封装连接以及节点
- * 2.getListenable().addListener(new TreeCacheListener{
- * childEvent();
- * })
- * @throws Exception
- */
- @Test
- public void testTreeCache() throws Exception {
- //1.创建监听器
- TreeCache treeCache = new TreeCache(client, "/app2");
-
- //2.注册监听
- treeCache.getListenable().addListener(new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
- System.out.println("节点发生变化");
- System.out.println(treeCacheEvent);
- byte[] data = treeCacheEvent.getData().getData();
- System.out.println("改变后数据为:"+data);
- }
- });
- //3.启动
- treeCache.start();
-
- //4.持续监听
- while(true){
-
- }
- }
-
-
- /**
- * 3.释放资源
- */
- @After
- public void testClose() {
- if (client != null) client.close();
- }
-
- }
分布式锁
1.首先先创建节点,比如三个client都要获取,那就在lock节点下创建三个节点(临时节点,保证宕机的时候删除)——>2.获取lock下的所有子节点,客户端getChildren返回所有子节点后比较大小,最小的先获取到锁,如果说用完之后,会将节点删除(类似于之前在finally中将锁释放一样),这里是client.close() ——>3.其他节点不是最小的那么就会创建一个事件监听器,监听比自己小的节点,监听删除事件——>4.当发现监听的节点被删除后,也就是锁释放了,那么此时再次判断是不是最小的,如果是,则获取锁,不是则重复以上操作
分布式锁12306售票
1.将锁加到12306上面
1.任务类
- package com.wyh.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessLock;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author diao 2022/9/4
- */
- public class Ticket12306 implements Runnable {
-
- private volatile int tickets = 10;//数据库票数
-
- private InterProcessMutex lock;
-
- /**
- * 1.通过构造方法初始化lock
- */
- public Ticket12306() {
- ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
- CuratorFramework client= CuratorFrameworkFactory.builder()
- .connectString("192.168.184.129:2181")
- .sessionTimeoutMs(60*1000)
- .connectionTimeoutMs(15*1000)
- .retryPolicy(retry).build();
- client.start();
-
- this.lock=new InterProcessMutex(client,"/lock");
- }
-
- /**
- * 2.线程执行方法
- */
- @Override
- public void run() {
- while (true) {
- //1.加锁
- try {
- lock.acquire(3, TimeUnit.SECONDS);//本质其实就是cas
- if (tickets > 0) {
- System.out.println(Thread.currentThread() + ":" + tickets);
- tickets--;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- try {
- //2.释放锁
- lock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- }
- }
- }
2.主方法
- package com.wyh.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.junit.Before;
-
- /**
- * @author diao 2022/9/4
- */
- public class LockTest {
-
- public static void main(String[] args) {
- Ticket12306 ticket12306 = new Ticket12306();
-
- //1.创建客户端
- Thread t1 = new Thread(ticket12306, "携程");
- Thread t2 = new Thread(ticket12306, "飞猪");
-
- t1.start();
- t2.start();
- }
-
- }
Zookeeper集群搭建
类似redis的分片集群,master互票
Redis分片集群_Fairy要carry的博客-CSDN博客_redis分片和集群
这里弄个假的,多个端口,而不是多个ip
1.创建三个单节点的zk
(55条消息) Zookeeper安装与配置_Murmure__的博客-CSDN博客
2.然后修改配置文件的端口,data,log路径
3.分别记录id值(对于每个节点的id文件)
4.让他们互相知道彼此(在配置文件中配置)
server.服务器id=服务器ip:服务器通信端口:服务器之间投票选举端口
此时已经构建集群环境
5.启动即可
三个都启动
usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
然后看彼此节点状态发现2节点是leader
- [root@localhost ~]# /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
- /usr/bin/java
- ZooKeeper JMX enabled by default
- Using config: /usr/local/zookeeper-cluster/zookeeper-2/bin/../conf/zoo.cfg
- Client port found: 2182. Client address: localhost. Client SSL: false.
- Mode: leader
异常测试
当你把主节点stop之后,再次start之后,主节点还是主节点