• ZooKeeper-实战


    zookeeper-javaApi

    ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:· ZooKeeper官方的Java客户端API。· 第三方的Java客户端API。ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:

    • ZooKeeper官方的Java客户端API。
    • 第三方的Java客户端API。

    zooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:

    • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。
    • 会话超时之后没有实现重连机制。
    • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
    • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
    • 创建节点时如果抛出异常,需要自行检查节点是否存在。
    • 无法实现级联删除。

    总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。可供选择的Java客户端API之二,即第三方开源客户端API,主要有ZkClientCurator.

    1 ZkClient开源客户端介绍

    ZkClient是一个开源客户端,在ZooKeeper原生API接口的基础上进行了包装,更便于开发人员使用。ZkClient客户端在一些著名的互联网开源项目中得到了应用,例如,阿里的分布式Dubbo框架对它进行了无缝集成。ZkClient解决了ZooKeeper原生API接口的很多问题。例如,ZkClient提供了更加简洁的API,实现了会话超时重连、反复注册Watcher等问题。虽然ZkClient对原生API进行了封装,但也有它自身的不足之处,具体如下:

    • ZkClient社区不活跃,文档不够完善,几乎没有参考文档。
    • 异常处理简化(抛出RuntimeException)。
    • 重试机制比较难用。
    • 没有提供各种使用场景的参考实现。

    2 Curator开源客户端介绍

    Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。

    Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

    Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

    另外,Curator还提供了一套非常优雅的链式调用API,与ZkClient客户端API相比,Curator的API优雅太多了,以创建ZNode节点为例,让大家实际对比一下。使用ZkClient客户端创建ZNode节点的代码为:

            ZkClient client = new ZkClient("192.168.1.105:2181", 10000, 10000,
                              new SerializableSerializer());
                //根节点路径
                String PATH = "/test";
                //判断是否存在
                booleanrootExists = zkClient.exists(PATH);
                //如果存在,获取地址列表
                if(! rootExists){
                    zkClient.createPersistent(PATH);
                }
                String zkPath  = "/test/node-1";
                booleanserviceExists = zkClient.exists(zkPath);
                if(! serviceExists){
                    zkClient.createPersistent(zkPath);
                }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    使用Curator客户端创建ZNode节点的代码如下:

            CuratorFramework client =
                    CuratorFrameworkFactory.newClient( connectionString, retryPolicy);
            String zkPath = "/test/node-1";
            client.create().withMode(mode).forPath(zkPath);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    总之,尽管Curator不是官方的客户端,但是由于Curator客户端的确非常优秀,就连ZooKeeper一书的作者,鼎鼎大名的Patrixck Hunt都对Curator给予了高度评价,他的评语是:“Guava is to Java that Curator to ZooKeeper”。

    在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。对于ZooKeeper的客户端,我们这里只学习和研究Curator的使用,疯狂创客圈社群的高并发项目——“IM实战项目”,最终也是通过Curator客户端来操作ZooKeeper集群的。

    3 Curator开发的环境准备

    打开Curator的官网,我们可以看到,Curator包含了以下几个包:

    • curator-framework是对ZooKeeper的底层API的一些封装。
    • curator-client提供了一些客户端的操作,例如重试策略等。
    • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
      以上的三个包,在使用之前,首先要在Maven的pom文件中加上包的Maven依赖。这里使用Curator的版本为4.0.0,与之对应ZooKeeper的版本为3.4.x。在pom文件中与依赖设置相关的代码如下:
        <dependency>
        	<groupId>org.apache.curatorgroupId>
        	<artifactId>curator-clientartifactId>
        <version>4.0.0version>
        <exclusions>
       		 <exclusion>
       			 <groupId>org.apache.ZooKeepergroupId>
       			 <artifactId>ZooKeeperartifactId>
    	    exclusion>
        exclusions>
        dependency>
    
        <dependency>
        	<groupId>org.apache.curatorgroupId>
       		 <artifactId>curator-frameworkartifactId>
    	    <version>4.0.0version>
        	<exclusions>
        		<exclusion>
        			<groupId>org.apache.ZooKeepergroupId>
    			    <artifactId>ZooKeeperartifactId>
     		   exclusion>
        exclusions>
        dependency>
       <dependency>
      			  <groupId>org.apache.curatorgroupId>
        		<artifactId>curator-recipesartifactId>
        		<version>4.0.0version>
        <exclusions>
      		  <exclusion>
      		 		 <groupId>org.apache.ZooKeepergroupId>
      		 		 <artifactId>ZooKeeperartifactId>
        		exclusion>
        exclusions>
        dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    4 Curator客户端实例的创建
    在使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例——这是一个CuratorFramework类型的对象,有两种方法:

    • 使用工厂类CuratorFrameworkFactory的静态newClient()方法。
    • 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。

    下面实现一个通用的类,分别使用以上两种方法来创建一下Curator客户端实例,代码如下:

            /**
             * create by尼恩 @ 疯狂创客圈
             **/
            public class ClientFactory
            {
    
                /**
                *@param connectionStringzk的连接地址
                * @return CuratorFramework实例
                */
                public static CuratorFrameworkcreateSimple(String connectionString) {
                  // 重试策略:第一次重试等待1s,第二次重试等待2s,第三次重试等待4s
                  // 第一个参数:等待时间的基础单位,单位为毫秒
                  // 第二个参数:最大重试次数
                  ExponentialBackoffRetryretryPolicy =
                          new ExponentialBackoffRetry(1000, 3);
    
                  // 获取CuratorFramework实例的最简单方式
                  // 第一个参数:zk的连接地址
                  // 第二个参数:重试策略
                  return CuratorFrameworkFactory.newClient(connectionString,
                          retryPolicy);
                }
    
                /**
                *@param connectionStringzk的连接地址
                * @param retryPolicy重试策略
                * @param connectionTimeoutMs连接超时时间
                * @param sessionTimeoutMs会话超时时间
                * @return  CuratorFramework实例
                */
                public static CuratorFrameworkcreateWithOptions(
                      String connectionString, RetryPolicyretryPolicy,
                      int connectionTimeoutMs, int sessionTimeoutMs)
                {
    
                  // 用builder方法创建CuratorFramework实例
                  returnCuratorFrameworkFactory.builder()
                  .connectString(connectionString)
                  .retryPolicy(retryPolicy)
                  .connectionTimeoutMs(connectionTimeoutMs)
                  .sessionTimeoutMs(sessionTimeoutMs)
                  // 其他的创建选项
                  .build();
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    这里用到两种创建CuratorFramework客户端实例的方式,前一个是通过newClient函数去创建,相当于是一个简化版本,只需要设置ZK集群的连接地址和重试策略。

    后一个是通过CuratorFrameworkFactory.builder()函数去创建,相当于是一个复杂的版本,可以设置连接超时connectionTimeoutMs、会话超时sessionTimeoutMs等其他与会话创建相关的选项。

    上面的示例程序将两种创建客户端的方式封装成了一个通用的ClientFactory连接工具类,大家可以直接使用。

    5 通过Curator创建ZNode节点

    通过Curator框架创建ZNode节点,可使用create()方法。create()方法不需要传入ZNode的节点路径,所以并不会立即创建节点,仅仅返回一个CreateBuilder构造者实例。

    通过该CreateBuilder构造者实例,可以设置创建节点时的一些行为参数,最后再通过构造者实例的forPath(String znodePath, byte[] payload)方法来完成真正的节点创建。

    总之,一般使用链式调用来完成节点的创建。在链式调用的最后,需要使用forPath带上需要创建的节点路径,具体的代码如下:

            /**
             * 创建节点
             */
            @Test
            public void createNode() {
                //创建客户端
                CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
    
                try {
                  //启动客户端实例,连接服务器
                  client.start();
    
                  // 创建一个ZNode节点
                  // 节点的数据为payload
    
                  String data = "hello";
                  byte[] payload = data.getBytes("UTF-8");
                  String zkPath = "/test/CRUD/node-1";
                  client.create()
                  .creatingParentsIfNeeded()
                  .withMode(CreateMode.PERSISTENT)
                  .forPath(zkPath, payload);
    
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  CloseableUtils.closeQuietly(client);
              }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    在上面的代码中,在链式调用的forPath创建节点之前,通过该CreateBuilder构造者实例的withMode()方法,设置了节点的类型为CreateMode.PERSISTENT类型,表示节点的类型为持久化节点。

    ZooKeeper节点有4种类型:

    (1)PERSISTENT持久化节点
    (2)PERSISTENT_SEQUENTIAL持久化顺序节点
    (3)PHEMERAL临时节
    (4)EPHEMERAL_SEQUENTIAL临时顺序节点

    这4种节点类型的定义和联系具体如下:

    (1)持久化节点(PERSISTENT)所谓持久节点是指在节点创建后就一直存在,直到有删除操作来主动清除这个节点。持久化节点的生命周期是永久有效的,不会因为创建该节点的客户端会话失效而消失。
    (2)持久化顺序节点(PERSISTENT_SEQUENTIAL)这类节点的生命周期和持久节点是一致的。额外的特性是,在ZooKeeper中,每个父节点会它的第一级子节点维护一份顺序编码,会记录每个子节点创建的先后顺序。如果在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZooKeeper会自动为给定节点名加上一个表示顺序的数字后缀来作为新的节点名。这个顺序后缀数字的数值上限是整型的最大值。例如,在创建节点时只需要传入节点“/test_”, ZooKeeper自动会在”test_”后面补充数字顺序。例如,在创建节点时只需要传入节点“/test_”, ZooKeeper自动会在”test_”后面补充数字顺序。
    (3)临时节点(EPHEMERAL)
    和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效了,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。还要注意一件事,就是当客户端会话失效后,所产生的节点也不是一下子就消失了,也要过一段时间,大概是10秒以内。大家可以试一下,本机操作生成节点,在服务器端用命令来查看当前的节点数目,我们会发现有些客户端的状态已经是stop(中止),但是产生的节点还在。另外,在临时节点下面不能创建子节点。
    (4)临时顺序节点(EPHEMERAL_SEQUENTIAL)
    此节点是属于临时节点,不过带有顺序编号,客户端会话结束,所产生的节点就会消失.

    6 在Curator中读取节点

    在Curator框架中,与节点读取的有关的方法主要有三个:
    (1)首先是判断节点是否存在,调用checkExists方法。
    (2)其次是获取节点的数据,调用getData方法。
    (3)最后是获取子节点列表,调用getChildren方法。演示代码如下:

            /**
             * 读取节点
             */
            @Test
            public void readNode() {
                //创建客户端
                CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
                try {
                  //启动客户端实例,连接服务器
                  client.start();
    
                  String zkPath = "/test/CRUD/node-1";
    
                  Stat stat = client.checkExists().forPath(zkPath);
                  if (null ! = stat) {
                      //读取节点的数据
                      byte[] payload = client.getData().forPath(zkPath);
                      String data = new String(payload, "UTF-8");
                      log.info("read data:", data);
    
                      String parentPath = "/test";
                      List<String> children = client.getChildren().forPath(parentPath);
    
                      for (String child : children) {
                          log.info("child:", child);
                      }
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                } finally {
                  CloseableUtils.closeQuietly(client);
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    无论是checkExists、getData还是getChildren方法,都有一个共同的特点:
    · 返回构造者实例,不会立即执行。
    · 通过链式调用,在最末端调用forPath(String znodePath)方法执行实际的操作。

    7 在Curator中更新节点

    节点的更新分为同步更新与异步更新。同步更新就是更新线程是阻塞的,一直阻塞到更新操作执行完成为止。调用setData()方法进行同步更新,代码如下:

            /**
             * 更新节点
             */
            @Test
            public void updateNode() {
                //创建客户端
                CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
                try {
                  //启动客户端实例,连接服务器
                  client.start();
    
                  String data = "hello world";
                  byte[] payload = data.getBytes("UTF-8");
                  String zkPath = "/test/CRUD/node-1";
                  client.setData()
                  .forPath(zkPath, payload);
    
                } catch (Exception e) {
                  e.printStackTrace();
                } finally {
                  CloseableUtils.closeQuietly(client);
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在上面的代码中,通过调用setData()方法返回一个SetDataBuilder构造者实例,执行该实例的forPath(zkPath, payload)方法即可完成同步更新操作。如果是异步更新呢?其实很简单,通过SetDataBuilder构造者实例的inBackground(AsyncCallback callback)方法,设置一个AsyncCallback回调实例。简简单单的一个函数,就将更新数据的行为从同步执行变成了异步执行。异步执行完成之后,SetDataBuilder构造者实例会再执行AsyncCallback实例的processResult(…)方法中的回调逻辑,即可完成更新后的其他操作。异步更新的代码如下:

            package com.crazymakercircle.zk.basicOperate;
    
            import com.crazymakercircle.zk.ClientFactory;
            import lombok.extern.slf4j.Slf4j;
            import org.apache.curator.framework.CuratorFramework;
            import org.apache.curator.utils.CloseableUtils;
            import org.apache.ZooKeeper.AsyncCallback;
            import org.apache.ZooKeeper.CreateMode;
            import org.apache.ZooKeeper.data.Stat;
            import org.junit.Test;
    
            import java.util.List;
    
            /**
              * create by尼恩 @ 疯狂创客圈
              **/
             @Slf4j
             public class CRUD {
                private static final String ZK_ADDRESS = "127.0.0.1:2181";
    
                //.......省略其他
                /**
                  * 更新节点 - 异步模式
                  */
                @Test
                public void updateNodeAsync() {
                    //创建客户端
                    CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
                    try {
                        //异步更新完成,回调此实例
                        AsyncCallback.StringCallback callback
                              = new AsyncCallback.StringCallback() {
                          //回调方法
                            @Override
                            public void processResult(int i, String s, Object o, String s1)
                          {
                              System.out.println(
                                  "i = " + i + " | " +
                                  "s = " + s + " | " +
                                  "o = " + o + " | " +
                                  "s1 = " + s1
                              );
                            }
                        };
                        //启动客户端实例,连接服务器
                        client.start();
    
                        String data = "hello , every body! ";
                        byte[] payload = data.getBytes("UTF-8");
                        String zkPath = "/test/CRUD/remoteNode-1";
                        client.setData()
                            .inBackground(callback)  //设置回调实例
                            .forPath(zkPath, payload);
    
                        Thread.sleep(10000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        CloseableUtils.closeQuietly(client);
                    }
                }
    
             }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    8 在Curator中删除节点

    删除节点非常简单,只需调用delete方法,实例代码如下:

            package com.crazymakercircle.zk.basicOperate;
            import com.crazymakercircle.zk.ClientFactory;
            import lombok.extern.slf4j.Slf4j;
            import org.apache.curator.framework.CuratorFramework;
            import org.apache.curator.utils.CloseableUtils;
            import org.apache.ZooKeeper.AsyncCallback;
            import org.apache.ZooKeeper.CreateMode;
            import org.apache.ZooKeeper.data.Stat;
            import org.junit.Test;
    
            import java.util.List;
    
            /**
             * create by尼恩 @ 疯狂创客圈
             **/
            @Slf4j
            public class CRUD {
                private static final String ZK_ADDRESS = "127.0.0.1:2181";
    
                //.......省略其他
    
              /**
                * 删除节点
                */
                @Test
                public void deleteNode() {
                  //创建客户端
                  CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
                  try {
                      //启动客户端实例,连接服务器
                      client.start();
    
                      //删除节点
                      String zkPath = "/test/CRUD/remoteNode-1";
                      client.delete().forPath(zkPath);
    
                      //删除后查看结果
                      String parentPath = "/test";
                      List<String> children = client.getChildren().forPath(parentPath);
    
                      for (String child : children) {
                          log.info("child:", child);
                      }
    
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      CloseableUtils.closeQuietly(client);
                  }
                }
    
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    在上面的代码中,通过调用delete()方法返回一个执行删除操作的DeleteBuilder构造者实例,执行该实例的forPath(zkPath, payload)方法,即可完成同步删除操作。删除和更新操作一样,也可以异步进行。如何异步删除呢?思路是一样的:异步删除同样需要用到DeleteBuilder构造者实例的inBackground(AsyncCallbackasyncCallback)方法去设置回调,实际操作很简单,这里就不再赘述。至此,Curator的CRUD基本操作就介绍完了。下面介绍基于Curator的基本操作,完成一些基础的分布式应用。注:CRUD是指Create(创建), Retrieve(查询),Update(更新)和Delete(删除)。

  • 相关阅读:
    几何光学的基本原理
    VMware——WindowServer2012R2环境安装mysql5.7.14解压版_互为主从(图解版)
    《明解C语言》第三版(入门篇),第三章练习答案
    行业轮动从动量因子说起
    RabbitMQ - 如保证消息的可靠性?
    Golang基础-面向过程篇
    使用 prometheus 监控 MySQL
    qlib智能量化里的“因子分析“,“多空分析”
    [附源码]计算机毕业设计springbootQ宝商城
    连词分为并列连词和从属连词
  • 原文地址:https://blog.csdn.net/yitian881112/article/details/127709553