目录
前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。
1)创建一个工程:zookeeper
2)添加pom文件
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>RELEASE</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.8.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.7</version>
- </dependency>
- </dependencies>
3)拷贝log4j.properties文件到项目根目录
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在 文件中填入。
- log4j.rootLogger=INFO, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4)创建包名com.atguigu.zk
5)创建类名称zkClient
- // 注意:逗号前后不能有空格
-
- private static String connectString =
- "hadoop102:2181,hadoop103:2181,hadoop104:2181";
-
- private static int sessionTimeout = 2000;
- private ZooKeeper zkClient = null;
-
-
- @Before
-
- public void init() throws Exception {
-
- zkClient = new ZooKeeper(connectString, sessionTimeout, new
- Watcher() {
-
- @Override
- public void process(WatchedEvent watchedEvent) {
-
- // 收到事件通知后的回调函数(用户的业务逻辑)
- System.out.println(watchedEvent.getType() + "--"
- + watchedEvent.getPath());
-
- // 再次启动监听
- try {
- List
children = zkClient.getChildren("/", - true);
- for (String child : children) {
- System.out.println(child);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- // 创建子节点
-
- @Test
-
- public void create() throws Exception {
-
- // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;
- 参数 4:节点的类型
- String nodeCreated = zkClient.create("/atguigu",
- "shuaige".getBytes(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
测试:在 hadoop102 的 zk 客户端上查看创建节点情况
- [zk: localhost:2181(CONNECTED) 16] get -s /atguigu
- shuaige
- // 获取子节点
-
- @Test
-
- public void getChildren() throws Exception {
-
- List<String> children = zkClient.getChildren("/", true);
-
- for (String child : children) {
- System.out.println(child);
- }
-
- // 延时阻塞
- Thread.sleep(Long.MAX_VALUE);
- }
(1)在 IDEA 控制台上看到如下节点:
- zookeeper
- sanguo
- atguigu
(2)在 hadoop102 的客户端上创建再创建一个节点/atguigu1,观察 IDEA 控制台
[zk: localhost:2181(CONNECTED) 3] create /atguigu1 "atguigu1"
(3)在 hadoop102 的客户端上删除节点/atguigu1,观察 IDEA 控制台
[zk: localhost:2181(CONNECTED) 4] delete /atguigu1
- // 判断 znode 是否存在
-
- @Test
-
- public void exist() throws Exception {
-
- Stat stat = zkClient.exists("/atguigu", false);
-
- System.out.println(stat == null ? "not exist" : "exist");
- }
1.写流程之写入请求直接发送给Leader节点
2.写流程之写入请求发送给follower节点
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
服务器动态上下线
1)先在集群上创建/servers 节点
- [zk: localhost:2181(CONNECTED) 10] create /servers "servers"
- Created /servers
(2)在 Idea 中创建包名:com.atguigu.zkcase1
(3)服务器端向 Zookeeper 注册代码
-
- package com.atguigu.case1;
- import java.io.IOException;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.ZooDefs.Ids;
-
- public class DistributeServer {
-
- private static String connectString =
- "hadoop102:2181,hadoop103:2181,hadoop104:2181";
- private static int sessionTimeout = 2000;
- private ZooKeeper zk = null;
- private String parentNode = "/servers";
-
-
- // 创建到 zk 的客户端连接
-
- public void getConnect() throws IOException{
-
- zk = new ZooKeeper(connectString, sessionTimeout, new
- Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
-
- }
- });
- }
-
-
- // 注册服务器
-
- public void registServer(String hostname) throws Exception{
-
- String create = zk.create(parentNode + "/server",
- hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL);
-
- System.out.println(hostname +" is online "+ create);
- }
-
-
- // 业务功能
-
- public void business(String hostname) throws Exception{
- System.out.println(hostname + " is working ...");
-
- Thread.sleep(Long.MAX_VALUE);
- }
-
- public static void main(String[] args) throws Exception {
-
-
- // 1 获取 zk 连接
-
- DistributeServer server = new DistributeServer();
- server.getConnect();
-
- // 2 利用 zk 连接注册服务器信息
-
- server.registServer(args[0]);
-
- // 3 启动业务功能
-
- server.business(args[0]);
- }
- }
(3)客户端代码
-
- package com.atguigu.case1;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
-
- public class DistributeClient {
-
- private static String connectString =
- "hadoop102:2181,hadoop103:2181,hadoop104:2181";
- private static int sessionTimeout = 2000;
- private ZooKeeper zk = null;
- private String parentNode = "/servers";
-
- // 创建到 zk 的客户端连接
-
- public void getConnect() throws IOException {
-
- zk = new ZooKeeper(connectString, sessionTimeout, new
- Watcher() {
- @Override
- public void process(WatchedEvent event) {
-
- // 再次启动监听
-
- try {
- getServerList();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- // 获取服务器列表信息
-
- public void getServerList() throws Exception {
-
- // 1 获取服务器子节点信息,并且对父节点进行监听
-
- List
children = zk.getChildren(parentNode, true); -
-
- // 2 存储服务器信息列表
-
- ArrayList
servers = new ArrayList<>(); -
-
- // 3 遍历所有节点,获取节点中的主机名称信息
-
- for (String child : children) {
- byte[] data = zk.getData(parentNode + "/" + child,
- false, null);
-
- servers.add(new String(data));
- }
-
- // 4 打印服务器列表信息
-
- System.out.println(servers);
- }
-
-
- // 业务功能
-
- public void business() throws Exception{
-
- System.out.println("client is working ...");
- Thread.sleep(Long.MAX_VALUE);
- }
-
- public static void main(String[] args) throws Exception {
-
- // 1 获取 zk 连接
-
- DistributeClient client = new DistributeClient();
- client.getConnect();
-
- // 2 获取 servers 的子节点信息,从中获取服务器信息列表
-
- client.getServerList();
-
- // 3 业务进程启动
-
- client.business();
- }
- }
1)在 Linux 命令行上操作增加减少服务器
(1)启动 DistributeClient 客户端
(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
- [zk: localhost:2181(CONNECTED) 1] create -e -s
- /servers/hadoop102 "hadoop102"
-
- [zk: localhost:2181(CONNECTED) 2] create -e -s
- /servers/hadoop103 "hadoop103"
(3)观察 Idea 控制台变化
[hadoop102, hadoop103]
(4)执行删除操作
- [zk: localhost:2181(CONNECTED) 8] delete
- /servers/hadoop1020000000000
(5)观察 Idea 控制台变化
[hadoop103]
2)在 Idea 上操作增加减少服务器
(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)
(2)启动 DistributeServer 服务
①点击 Edit Configurations…
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
③回到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run “DistributeServer.main()”
④观察 DistributeServer 控制台,提示 hadoop102 is working
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
什么叫做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
1)分布式锁实现
- package com.atguigu.case2;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- public class DistributedLock {
-
- // zookeeper server 列表
- private String connectString =
- "hadoop102:2181,hadoop103:2181,hadoop104:2181";
- // 超时时间
- private int sessionTimeout = 2000;
-
- private ZooKeeper zk;
-
- private String rootNode = "locks";
- private String subNode = "seq-";
- // 当前 client 等待的子节点
- private String waitPath;
-
- //ZooKeeper 连接
- private CountDownLatch connectLatch = new CountDownLatch(1);
-
-
- //ZooKeeper 节点等待
- private CountDownLatch waitLatch = new CountDownLatch(1);
-
- // 当前 client 创建的子节点
- private String currentNode;
-
- // 和 zk 服务建立连接,并创建根节点
- public DistributedLock() throws IOException,
- InterruptedException, KeeperException {
-
- zk = new ZooKeeper(connectString, sessionTimeout, new
- Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
- if (event.getState() ==
- Event.KeeperState.SyncConnected) {
- connectLatch.countDown();
- }
-
- // 发生了 waitPath 的删除事件
- if (event.getType() ==
- Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
- waitLatch.countDown();
- }
- }
- });
-
- // 等待连接建立,等待zk连接成功后,才执行程序的下一段代码
- connectLatch.await();
-
- //获取根节点状态
- Stat stat = zk.exists("/" + rootNode, false);
-
- //如果根节点不存在,则创建根节点,根节点类型为永久节点
- if (stat == null) {
- System.out.println("根节点不存在");
- zk.create("/" + rootNode, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- // 加锁方法
- public void zkLock() {
-
- try {
- //在根节点下创建临时顺序节点,返回值为创建的节点路径
- currentNode = zk.create("/" + rootNode + "/" + subNode,
- null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL);
-
- // wait 一小会, 让结果更清晰一些
- Thread.sleep(10);
-
- // 注意, 没有必要监听"/locks"的子节点的变化情况
-
- List<String> childrenNodes = zk.getChildren("/" +
- rootNode, false);
-
- // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
- if (childrenNodes.size() == 1) {
- return;
- } else {
- //对根节点下的所有临时顺序节点进行从小到大排序
- Collections.sort(childrenNodes);
-
- //当前节点名称
- String thisNode = currentNode.substring(("/" +
- rootNode + "/").length());
- //获取当前节点的位置
- int index = childrenNodes.indexOf(thisNode);
-
- if (index == -1) {
- System.out.println("数据异常");
- } else if (index == 0) {
- // index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
- return;
- } else {
- // 获得排名比 currentNode 前 1 位的节点
- this.waitPath = "/" + rootNode + "/" +
- childrenNodes.get(index - 1);
-
- // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
- zk.getData(waitPath, true, new Stat());
- //进入等待锁状态
- waitLatch.await();
-
- return;
- }
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- // 解锁方法
- public void zkUnlock() {
- try {
- zk.delete(this.currentNode, -1);
- } catch (InterruptedException | KeeperException e) {
- e.printStackTrace();
- }
- }
- }
-
-
-
2)分布式锁测试
- package com.atguigu.case2;
-
- import org.apache.zookeeper.KeeperException;
-
- import java.io.IOException;
-
- public class DistributedLockTest {
-
- public static void main(String[] args) throws
- InterruptedException, IOException, KeeperException {
-
- // 创建分布式锁 1
- final DistributedLock lock1 = new DistributedLock();
- // 创建分布式锁 2
- final DistributedLock lock2 = new DistributedLock();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- // 获取锁对象
- try {
- lock1.zkLock();
- System.out.println("线程 1 获取锁");
- Thread.sleep(5 * 1000);
-
- lock1.zkUnlock();
- System.out.println("线程 1 释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- // 获取锁对象
- try {
- lock2.zkLock();
- System.out.println("线程 2 获取锁");
- Thread.sleep(5 * 1000);
-
- lock2.zkUnlock();
- System.out.println("线程 2 释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- }
(2)观察控制台变化:
线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁
1)原生的 Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2)Curator 是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html
3)Curator 案例实操
(1)添加依赖
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>4.3.0</version>
- </dependency>
(2)代码实现
- package com.atguigu.case3;
-
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessLock;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- public class CuratorLockTest {
-
- private String rootNode = "/locks";
-
- // zookeeper server 列表
- private String connectString =
- "hadoop102:2181,hadoop103:2181,hadoop104:2181";
-
- // connection 超时时间
- private int connectionTimeout = 2000;
-
- // session 超时时间
- private int sessionTimeout = 2000;
-
- public static void main(String[] args) {
-
- new CuratorLockTest().test();
- }
-
- // 测试
- private void test() {
-
- // 创建分布式锁 1
- final InterProcessLock lock1 = new
- InterProcessMutex(getCuratorFramework(), rootNode);
-
- // 创建分布式锁 2
- final InterProcessLock lock2 = new
- InterProcessMutex(getCuratorFramework(), rootNode);
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- // 获取锁对象
- try {
- lock1.acquire();
- System.out.println("线程 1 获取锁");
- // 测试锁重入
- lock1.acquire();
- System.out.println("线程 1 再次获取锁");
- Thread.sleep(5 * 1000);
- lock1.release();
- System.out.println("线程 1 释放锁");
- lock1.release();
- System.out.println("线程 1 再次释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- // 获取锁对象
- try {
- lock2.acquire();
- System.out.println("线程 2 获取锁");
- // 测试锁重入
- lock2.acquire();
- System.out.println("线程 2 再次获取锁");
- Thread.sleep(5 * 1000);
- lock2.release();
- System.out.println("线程 2 释放锁");
- lock2.release();
- System.out.println("线程 2 再次释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
-
- // 分布式锁初始化
- public CuratorFramework getCuratorFramework (){
-
- //重试策略,初试时间 3 秒,重试 3 次
- RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
-
- //通过工厂创建 Curator
- CuratorFramework client =
- CuratorFrameworkFactory.builder()
- .connectString(connectString)
- .connectionTimeoutMs(connectionTimeout)
- .sessionTimeoutMs(sessionTimeout)
- .retryPolicy(policy).build();
-
- //开启连接
- client.start();
- System.out.println("zookeeper 初始化完成...");
- return client;
- }
-
- }
(2)观察控制台变化:
线程 1 获取锁
线程 1 再次获取锁
线程 1 释放锁
线程 1 再次释放锁
线程 2 获取锁
线程 2 再次获取锁
线程 2 释放锁
线程 2 再次释放锁
半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:
投票过半数时,服务器 id (myid)大的胜出
(2)第二次启动选举规则:
①EPOCH 大的直接胜出
②EPOCH 相同,事务 id 大的胜出
③事务 id 相同,服务器 id 大的胜出
安装奇数台。
生产经验:
⚫ 10 台服务器:3 台 zk;
⚫ 20 台服务器:5 台 zk;
⚫ 100 台服务器:11 台 zk;
⚫ 200 台服务器:11 台 zk
服务器台数多:好处,提高可靠性;
坏处:提高通信延时
ls、get、create、delete