• zookeeper第三章:项目案例


    系列文章目录

    zookeeper第一章:集群搭建
    zookeeper第二章:API接口
    zookeeper第三章:项目案例



    前言

    zookeeper的基础内容差不多学完了,所以我们来完成几个案例


    一、修复日志警告

    再之前的项目中,由于依赖的问题,控制台无法正常输出日志文件,现在我们来修复这个问题。
    修改pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
    
        <groupId>org.example</groupId>
        <artifactId>zk</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.13.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.18.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.7.1</version>
            </dependency>
    		//新增依赖
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    之后运行上一个项目的最后一个方法

    在这里插入图片描述
    可以看到日志功能以及恢复。

    二、服务器动态上下线监听案例

    1.需求分析

    分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

    2.代码编写

    创建新项目
    在这里插入图片描述
    DistributeServer.java

    package com.atguigu.case1;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class DistributeServer {
        private String connectString="hadoop103:2181,hadoop103:2181,hadoop104:2181";
        private int seesionTimeout=4000;
        private ZooKeeper zk;
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            DistributeServer server = new DistributeServer();
    
            server.getConnect();
    
            server.regist(args[0]);
    
            server.business();
    
        }
    	//业务逻辑
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    	//客户端注册
        private void regist(String hostname) throws InterruptedException, KeeperException {
            zk.create("/servers/"+hostname, hostname.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(hostname + " is online");
    
        }
    	// 创建到 zk 的客户端连接
        private void getConnect() throws IOException {
            zk = new ZooKeeper(connectString, seesionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                }
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    服务端注册需要输入注册信息所以需要输入参数。
    在这里插入图片描述
    这里输入注册的节点名称
    在这里插入图片描述
    一定要应用在这里插入图片描述

    DistributeClient.java

    package com.atguigu.case1;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class DistributeClient {
        private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private int seesionTimeout=4000;
        private ZooKeeper zk;
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            DistributeClient client = new DistributeClient();
    
            client.getConnect();
    
            client.getServerList();
    
            client.business();
        }
    	//业务代码
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    	//获取节点信息
        private void getServerList() throws InterruptedException, KeeperException {
            List<String> children = zk.getChildren("/servers", true);
    
            ArrayList<Object> servers = new ArrayList<>();
            for (String child : children) {
                byte[] data = zk.getData("/servers/" + child, false, null);
    
                servers.add(new String(data));
            }
            System.out.println(servers);
        }
    	//创建链接
        private void getConnect() throws IOException {
            zk = new ZooKeeper(connectString, seesionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        getServerList();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch (KeeperException e) {
                        throw new RuntimeException(e);
                    }
    
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    3.案例测试

    咱们再/servers目录下测试,要先自己创建。
    在这里插入图片描述
    先运行DistributeClient.java
    在这里插入图片描述
    在运行DistributeServer.java
    在这里插入图片描述
    再查看DistributeClient
    在这里插入图片描述
    现在我们再运行DistributeServer,输入换成hadoop103
    在这里插入图片描述
    由于咱们创建的是临时节点,断开后源节点就会消失,可以根据自己需求修改。
    在这里插入图片描述

    三、ZooKeeper 分布式锁案例

    1.需求分析

    什么叫做分布式锁呢?
    比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
    在这里插入图片描述

    2.代码编写

    在这里插入图片描述

    DistributedLock.java

    package com.atguigu.case2;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class DistributedLock {
        private final String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
        private final int seesionTimeout=4000;
        private final ZooKeeper zk;
    
        private CountDownLatch connectLatch =new CountDownLatch(1);
        private CountDownLatch waitLath =new CountDownLatch(1);
    
        private String waitPath;
        private String currentMode;
    
    	//建立连接
        public DistributedLock() throws IOException, InterruptedException, KeeperException {
            zk = new ZooKeeper(connectString, seesionTimeout, new Watcher() {
                @Override
                //监控节点
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
                        connectLatch.countDown();
                    }
    
                    if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                        waitLath.countDown();
                    }
    
                }
            });
    		//等待连接建立(成功前阻塞进程)
            connectLatch.await();
    		//检测测试节点/locks
            Stat stat = zk.exists("/locks", false);
    		//若测试节点不存在便创建接待你
            if (stat == null) {
                zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    
            }
    
        }
        //加锁
        public void zkLock(){
            try {
                currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
                List<String> children = zk.getChildren("/locks", false);
    
                if (children.size() == 1) {
                    return;
                }else {
                    Collections.sort(children);
    
                    String thisNode = currentMode.substring("/locks/".length());
    
                    int index = children.indexOf(thisNode);
    
                    if (index == -1) {
                        System.out.println("数据异常");
                    } else if (index == 0) {
                        return;
                    }else {
                        waitPath="/locks/"+children.get(index-1);
                        zk.getData(waitPath,true,null);
    
                        waitLath.await();
                        return;
                    }
                }
    
    
            } catch (KeeperException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    
        }
        //释放锁
        public void unzkLock(){
            try {
                zk.delete(currentMode,-1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (KeeperException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    DistractedLockTest.java

    package com.atguigu.case2;
    
    import org.apache.zookeeper.KeeperException;
    
    import java.io.IOException;
    
    public class DistractedLockTest {
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            DistributedLock lock1 = new DistributedLock();
    
            DistributedLock lock2 = new DistributedLock();
    		//两个线程同时运行
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock1.zkLock();
                        System.out.println("线程1 启动 获取到锁");
                        Thread.sleep(5000);
    
                        lock1.unzkLock();
                        System.out.println("线程1 释放锁");
    
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock2.zkLock();
                        System.out.println("线程2 启动 获取到锁");
                        Thread.sleep(5000);
    
                        lock2.unzkLock();
                        System.out.println("线程2 释放锁");
    
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    3.案例测试

    运行DistractedLockTest.java文件.
    在这里插入图片描述

    四、Curator

    Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
    代码编写
    在这里插入图片描述

    CuratorLockTest.java

    package com.atguigu.case3;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
            
    public class CuratorLockTest {
        public static void main(String[] args) {
            InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
    
    
            InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock1.acquire();
                        System.out.println("线程1 获取到锁");
    
                        lock1.acquire();
                        System.out.println("线程1 再次获取到锁");
                        Thread.sleep(5000);
    
                        lock1.release();
                        System.out.println("线程1 释放锁");
    
                        lock1.release();
                        System.out.println("线程1 再次释放锁");
    
    
    
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
    
             new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock2.acquire();
                        System.out.println("线程2 获取到锁");
    
                        lock2.acquire();
                        System.out.println("线程2 再次获取到锁");
                        Thread.sleep(5000);
    
                        lock2.release();
                        System.out.println("线程2 释放锁");
    
                        lock2.release();
                        System.out.println("线程1 再次释放锁");
    
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
        }
    
    
        private static CuratorFramework getCuratorFramework() {
    
            ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
            String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
            int seesionTimeout=4000;
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString)
                    .connectionTimeoutMs(seesionTimeout)
                    .sessionTimeoutMs(seesionTimeout)
                    .retryPolicy(policy).build();
    
            client.start();
    
            System.out.println("zookeeper启动成功");
            return client;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    在这里插入图片描述


    总结

    一些zookeeper的基础案例就分享到这里。

  • 相关阅读:
    Apache Echarts介绍与入门
    前端自动识别CAD图纸提取信息方法总结
    Sentinel微服务流量治理组件实战上
    【Mysql】——索引的深度理解
    ElasticSearch (ES)万字保姆级教程学习笔记(超详细)搜索引擎ElasticSearch7.x笔记(03)
    mysql_config_editor的配置
    硬件基本功--过流、过压保护电路
    使用wireshark解析ipsec esp包
    c++ 栈和栈的应用(图解)
    【1024程序员节专访】聚焦行业前沿,共话IT发展趋势
  • 原文地址:https://blog.csdn.net/weixin_50835854/article/details/126448835