• zookeeper学习二


    zookeeper分布式协调,扩展,可靠性,时序性问题

    zookeeper具备可扩展,可靠性,以及时序性以及快速【写快,失败到恢复可用快】。

    zookeeper扩展性

    首先zk是采用的主从架构,并且把角色分为Leader,Follower,Observer
    并且是读写分离的,只有Follower才能选举,【Observer不参与,Observer能放大查询能力】
    在这里插入图片描述

    zookeeper可靠性

    可靠性来源于zk的快速恢复
    还有数据的可靠,可用,一致性【采用的最终一致性】
    但是需要确定,在同步数据过程中,节点是否对外提供服务?【属于分布式的】
    不对外提供服务
    参考如下链接:
    https://www.douban.com/note/208430424/?_i=0820002RAVL_BI,0820033RAVL_BI
    由此引出ZK的ZAB协议,原子广播协议
    在这里插入图片描述

    ZAB 原子广播协议

    原子:要么成功,要么失败,没有中间状态【使用队列,加上两阶段提交的方式】
    广播:分布式多节点,并不代表全部知道【过半】
    队列:先进先出,顺序性
    前提:,作用在可用状态,并且是有leader的。
    假设有一个leader以及两个follower:
    1、如果一个客户端对一个follower发起了一个写操作,
    2、这个写操作会转发给我们的leader,
    3、leader里面会创建一个事务id。
    4、会给两个follower发起第一阶段向磁盘写日志的事情
    4-1、在这个通信的过程中使用了队列的概念
    4-2、follower是会回复一个确认消息到leader
    5、只要任意一个follower返回确认消息到了leader(3/2+1,包含leader自己,过半了),
    然后会推送数据让数据写到ZK各自的内存里面去【最终一致性体现】
    6、上述写完之后,会给客户端连接的那个follower返回ok,这个follower再把ok返回给客户端。
    在这里插入图片描述
    有一个同步获取的方式,就是说如果当前拿到的数据可能不是最新的则调用同步的过程拿到最新的一个数据。

    ZOOKEEPER集群选举过程

    分为如下几种情况:
    1、第一次启动集群
    2、重启集群,leader挂了后
    每个节点自己都有它各自的myid,也有zxid
    选举新的leader()应该具备如下条件:
    1、数据最全的【zxid最大的】
    2、如果数据都全,则看myid最大的
    前置:过半通过的数据才是真数据。你见到的可用的Zxid
    如下准备四台机器,分别配置myid和zxid:
    在这里插入图片描述
    集群第一次启动的时候只要达到了过半的台数,那么就会选择一个myid最大的来作为leader。
    假设启动后:
    4是选出来的leader,
    在这里插入图片描述
    如果4号挂掉了的话,如何快速选出一个leader:
    在这里插入图片描述
    假如说是node03发现了node04离线了,那么它会给其他节点发送信息【myid,以及zxid】:

    在这里插入图片描述
    以及给自己投票加1,然后其他两个节点收到了这个节点的投票,会比较事务id,如果当前事务id小于
    其他节点的事务id,那么其他节点会把这张票给扔掉,然后将自己新的事务id给发送回去:
    在这里插入图片描述
    在这里插入图片描述
    然后node03记录node01和node02加一票
    在这里插入图片描述
    node02和node01给票的时候也会给自己投一张票,存在内存里面,并且把票广播出去
    在node01这边发现node02的myid大于它本身,所以要给node02投票
    在这里插入图片描述
    然后node03把票投回来,node01也把票投回来
    在这里插入图片描述
    再得到最后的状态,
    在这里插入图片描述
    ZK选举过程:
    1、38888造成两两通信【任何一个人投票都会投给其他广播的人】
    2、只要任何人投票,都会触发那个准leader发起自己的投票
    3、推选制度的逻辑是先比较zxid,如果zxid相同,则比较myid
    当配置4台zookeeper集群的时候,如果挂掉了其中两台,那么zookeeper就不能再
    对外提供服务了,输出如下错误信息:
    在这里插入图片描述
    如果1,2,3,4依次启动,
    然后再依次挂掉3,4
    再重启4,那么此时的2是leader,因为4的zxid小于1和2的,然后1和2再选一个老的
    就是2。
    redis可靠性比不上zookeeper,但是redis比zookeeper快。

    zookeeper的watch监控

    采用观察模式来实现的
    zookeeper是统一视图的(sync之后从每个节点取出的数值都是一样的)
    并且也是目录树结构
    watch【观察这个目录是否发生变化】:
    如果客户端与客户端之间有关联,并且将客户端的标识是存在目录树结构里面的
    ,那么当一个客户端挂掉了以后,另一个客户端需要对此做一些操作【zk是协调服务的
    并不是提供服务的】
    当然也可以采用心跳验证:watch是基于zk,心跳的话是需要自己代码实现。
    他们的区别在于方向性和时效性不一样
    心跳:
    1、需要每段实现询问
    而zookeeper:
    1、则是加上一个session,如果客户端挂掉了则会马上让session小时,并且会产生
    一个事件(event),目录树上产生的事件类型有(create,delete,change,children),产生事件后
    会主动回调到watch的那个方法,后面再产生回调。【时效性强】
    在这里插入图片描述

    zookeeper的API使用代码实现

    pom文件引入:
    注意这个版本要和linux环境上面的版本一致

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
     public static void main(String[] args)  throws Exception{
            //zk是有session概念的,没有连接池的概念
            //watch:观察,回调
            //watch的注册值发生在 读类型调用,get,exites。。。
            //第一类:new zk 时候,传入的watch,这个watch,session级别的,跟path 、node没有关系。
            ZooKeeper zooKeeper = new ZooKeeper("192.168.75.128:2181,192.168.75.130:2181,192.168.75.131:2181,192.168.75.132:2181",
                    3000, new Watcher() {
                //Watch的回调方法
                @Override
                public void process(WatchedEvent watchedEvent) {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();
                    System.out.println(watchedEvent.toString());
                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            System.out.println("connected");
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                    }
    
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                    }
    
                }
            });
            ZooKeeper.States state = zooKeeper.getState();
            switch (state) {
                case CONNECTING:
                    System.out.println("...ing ");
                    break;
                case ASSOCIATING:
                    break;
                case CONNECTED:
                    System.out.println("ed........");
                    break;
                case CONNECTEDREADONLY:
                    break;
                case CLOSED:
                    break;
                case AUTH_FAILED:
                    break;
                case NOT_CONNECTED:
                    break;
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    如上代码启动最后输出ing,标识连接zk的状态为一个正在连接的状态并不是一个连接好了的状态。
    解决如上问题则需要加上一个countDownLatch

    
        public static void main(String[] args)  throws Exception{
            //zk是有session概念的,没有连接池的概念
            //watch:观察,回调
            //watch的注册值发生在 读类型调用,get,exites。。。
            //第一类:new zk 时候,传入的watch,这个watch,session级别的,跟path 、node没有关系。
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper("192.168.75.128:2181,192.168.75.130:2181,192.168.75.131:2181,192.168.75.132:2181",
                    3000, new Watcher() {
                //Watch的回调方法
                @Override
                public void process(WatchedEvent watchedEvent) {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();
                    System.out.println(watchedEvent.toString());
                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            System.out.println("connected");
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                    }
    
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                    }
    
                }
            });
            //只有真正集群回调了我的watch事件以后那么才会从这往后执行
            countDownLatch.await();
            ZooKeeper.States state = zooKeeper.getState();
            switch (state) {
                case CONNECTING:
                    System.out.println("...ing ");
                    break;
                case ASSOCIATING:
                    break;
                case CONNECTED:
                    System.out.println("ed........");
                    break;
                case CONNECTEDREADONLY:
                    break;
                case CLOSED:
                    break;
                case AUTH_FAILED:
                    break;
                case NOT_CONNECTED:
                    break;
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    输出如下:
    在这里插入图片描述
    下面是zk创建节点的代码:
    如下watch回调只发生了一次。

      //创建有两种方式分别是同步阻塞的以及异步模型的
            String pathName = zooKeeper.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Stat stat = new Stat();
            //取数据的时候同时注册一个观察
            byte[] node = zooKeeper.getData("/ooxx", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //未来发生变化会回调到这里来
                    //getData wathch:WatchedEvent state:SyncConnected type:NodeDataChanged path:/ooxx,只发生一次
                    System.out.println("getData wathch:" + watchedEvent.toString());
                }
            }, stat);
            System.out.println(new String(node));
            //会触发上面的回调
            Stat stat1 = zooKeeper.setData("/ooxx", "newdata".getBytes(), 0);
            //还会触发吗?不会
            zooKeeper.setData("/ooxx","newdata01".getBytes(),stat1.getVersion());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    那么我想getData的时候重复注册这个watch应该怎么做呢(前一个事件被回调完那么立刻注册一个新的进去):

      String pathName = zooKeeper.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Stat stat = new Stat();
            //取数据的时候同时注册一个观察
            byte[] node = zooKeeper.getData("/ooxx", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //未来发生变化会回调到这里来
                    //getData wathch:WatchedEvent state:SyncConnected type:NodeDataChanged path:/ooxx,只发生一次
                    System.out.println("getData wathch:" + watchedEvent.toString());
    
                    try {
                        //除了自己处理再去注册一个watch
                        //watch为true是default watch被注册,是newzk的watch
                        zooKeeper.getData("/ooxx",this,stat);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, stat);
            System.out.println(new String(node));
            //会触发上面的回调
            Stat stat1 = zooKeeper.setData("/ooxx", "newdata".getBytes(), 0);
            //还会触发吗?此时会再次触发
            zooKeeper.setData("/ooxx","newdata01".getBytes(),stat1.getVersion());
          
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    结果如下:
    在这里插入图片描述
    要控制台能够看到zk的连接日志首先将zk里面的log4j.propertie导入
    在这里插入图片描述
    然后再引入log4j的依赖

     <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
              <version>1.7.7</version>
        </dependency>
        <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
              <version>1.7.7</version>
        </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    启动能看到有如下日志打印:

    2022-08-18 22:32:52,282 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=192.168.75.128:2181,192.168.75.130:2181,192.168.75.131:2181,192.168.75.132:2181 sessionTimeout=3000 watcher=com.lsd.zookeeper.App$1@4f7d0008
    2022-08-18 22:32:52,595 [myid:] - INFO  [main-SendThread(192.168.75.128:2181):ClientCnxn$SendThread@975] - Opening socket connection to server 192.168.75.128/192.168.75.128:2181. Will not attempt to authenticate using SASL (unknown error)
    2022-08-18 22:32:52,596 [myid:] - INFO  [main-SendThread(192.168.75.128:2181):ClientCnxn$SendThread@852] - Socket connection established to 192.168.75.128/192.168.75.128:2181, initiating session
    2022-08-18 22:32:52,603 [myid:] - INFO  [main-SendThread(192.168.75.128:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server 192.168.75.128/192.168.75.128:2181, sessionid = 0x182b10432a50004, negotiated timeout = 4000
    new zk watch :WatchedEvent state:SyncConnected type:None path:null
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述
    那么如果我的zk挂了会怎么办呢?
    我们断掉重连一下,发现此时是连的132的客户端:
    在这里插入图片描述
    然后让132客户端挂掉:
    idea会打印如下信息:

    2022-08-18 22:41:27,467 [myid:] - INFO  [main-SendThread(192.168.75.132:2181):ClientCnxn$SendThread@1098] - Unable to read additional data from server sessionid 0x482b105294f0009, likely server has closed socket, closing socket connection and attempting reconnect
    new zk watch :WatchedEvent state:Disconnected type:None path:null
    getData wathch:WatchedEvent state:Disconnected type:None path:null
    2022-08-18 22:41:28,093 [myid:] - INFO  [main-SendThread(192.168.75.128:2181):ClientCnxn$SendThread@975] - Opening socket connection to server 192.168.75.128/192.168.75.128:2181. Will not attempt to authenticate using SASL (unknown error)
    2022-08-18 22:41:28,094 [myid:] - INFO  [main-SendThread(192.168.75.128:2181):ClientCnxn$SendThread@852] - Socket connection established to 192.168.75.128/192.168.75.128:2181, initiating session
    2022-08-18 22:41:28,098 [myid:] - INFO  [main-SendThread(192.168.75.128:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server 192.168.75.128/192.168.75.128:2181, sessionid = 0x482b105294f0009, negotiated timeout = 4000
    new zk watch :WatchedEvent state:SyncConnected type:None path:null
    connected
    getData wathch:WatchedEvent state:SyncConnected type:None path:null
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    new zk的watch会马上发一个断掉连接的回调,然后报一个syncConnected,并且会把连接切换到别的地方去,且sessionId是没有发生改变的。

    下面来异步回调的:

        System.out.println("------------------async start--------------------");
    
            zooKeeper.getData("/ooxx", false, new AsyncCallback.DataCallback() {
                //rc:返回的状态码 path:路径,ctx:上下文  data:数据 stat:元数据
                @Override
                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                    System.out.println("------ async call back --------");
                    System.out.println(new String(data));
                }
            },"abc");
    
            System.out.println("------------------async end--------------------");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    输出如下:
    在这里插入图片描述
    发现代码不会阻塞的,回调是在后面调用的

  • 相关阅读:
    CPU核心、使用率、负荷、是否开启超线程、如何排查java程序cpu使用率过高
    Python自动化办公【自动组织文件】
    人工智能基础_机器学习036_多项式回归升维实战3_使用线性回归模型_对天猫双十一销量数据进行预测_拟合---人工智能工作笔记0076
    华钜同创:跨境电商运营如何优化Listing文案
    输入和输出处理
    React.FC介绍
    Linux安装RabbitMQ详细教程
    java-net-php-python-springboot班级信息系统计算机毕业设计程序
    计算机毕业设计:基于python机器学习的全国气象数据采集预测可视化系统 预测模型+爬虫(包含文档+源码+部署教程)
    树/二叉树/森林之间的相互转换 与遍历
  • 原文地址:https://blog.csdn.net/lsdstone/article/details/126411703