Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK服务会使用Znode存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:
数据组织:数据节点以树形目录(类似文件系统)组织管理,每一个节点中都会保存数据信息和节点信息。

ZooKeeper's Hierarchical Namespace
集群模式:通常是由3、5个基数实例组成集群,当超过半数服务实例正常工作就能对外提供服务,既能避免单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局一致

ZooKeeper Service
顺序更新:更新请求都会转由leader执行,来自同一客户端的更新将按照发送的顺序被写入到ZK,处理写请求创建Znode时,Znode名称后会被分配一个全局唯一的递增编号,可以通过顺序号推断请求的顺序,利用这个特性可以实现高级协调服务

监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个Watch Event,可以感知到节点\数据的变更

临时节点:session链接断开临时节点就没了,不能创建子节点(很关键)
ZK的分布式锁正是基于以上特性来实现的,简单来说是:

创建一个永久节点作为锁节点(/lock2)
试图加锁的客户端在指定锁名称节点(/lock2)下,创建临时顺序子节点
获取锁节点(/lock2)下所有子节点
对所获取的子节点按节点自增序号从小到大排序
判断自己是不是第一个子节点,若是,则获取锁
若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的方式避免了惊群效应)
若是阻塞申请锁,则申请锁的操作可增加阻塞等待
若监听事件生效(说明前节点释放了,可以尝试去获取锁),则回到第3步重新进行判断,直到获取到锁
解锁时,将第一个子节点删除释放
可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:
基于上文的内容,这里简单总结一下ZK的能力矩阵(其它分布式锁的情况会在后续文章中补充):
| 能力 | ZK | MySql | Redis原生 | Redlock | ETCD |
|---|---|---|---|---|---|
| 互斥 | 是 | ||||
| 安全 | 链接异常,session关闭后锁会自动释放 | ||||
| 可用性 | 相对还好 | ||||
| 可重入 | 线程可重入 | ||||
| 加解锁速度 | 居中 | ||||
| 阻塞非阻塞 | 都支持 | ||||
| 公平非公平 | 仅公平锁 |
关于性能不太高的一种说法
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。
由于ZooKeeper的高可用特性,在并发量不是太高的场景,也推荐使用ZK的分布式锁。
Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。
4.1 接口介绍
- // 获取互斥锁
- public void acquire() throws Exception;
- // 在给定的时间内获取互斥锁
- public boolean acquire(long time, TimeUnit unit) throws Exception;
- // 释放锁处理
- public void release() throws Exception;
- // 如果当前线程获取了互斥锁,则返回true
- boolean isAcquiredInThisProcess();
- 复制代码
4.2 pom依赖
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.8.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.7</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>4.3.0</version>
- </dependency>
- 复制代码
4.3 示例
- package com.atguigu.case3;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- public class CuratorLockTest {
-
- public static void main(String[] args) {
-
- // 创建分布式锁1
- InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- // 创建分布式锁2
- InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock1.acquire();
- System.out.println("线程1 获取到锁");
-
- lock1.acquire();
- System.out.println("线程1 再次获取到锁");
-
- Thread.sleep(5 * 1000);
-
- lock1.release();
- System.out.println("线程1 释放锁");
-
- lock1.release();
- System.out.println("线程1 再次释放锁");
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.acquire();
- System.out.println("线程2 获取到锁");
-
- lock2.acquire();
- System.out.println("线程2 再次获取到锁");
-
- Thread.sleep(5 * 1000);
-
- lock2.release();
- System.out.println("线程2 释放锁");
-
- lock2.release();
- System.out.println("线程2 再次释放锁");
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
-
- private static CuratorFramework getCuratorFramework() {
-
- ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
-
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
- .connectionTimeoutMs(2000)
- .sessionTimeoutMs(2000)
- .retryPolicy(policy).build();
-
- // 启动客户端
- client.start();
-
- System.out.println("zookeeper 启动成功");
- return client;
- }
- }
- 复制代码
通过这个实例对照第2节内容来理解加解锁的流程,以及如何避免惊群效应。
- package com.rock.case2;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- /**
- * zk 分布式锁 v1版本:
- * 完成功能 :
- * 1. 避免了惊群效应
- * 缺失功能:
- * 1. 超时控制
- * 2. 读写锁
- * 3. 重入控制
- */
- public class DistributedLock {
-
- private String connectString;
- private int sessionTimeout;
- private ZooKeeper zk;
-
- private CountDownLatch connectLatch = new CountDownLatch(1);
- private CountDownLatch waitLatch = new CountDownLatch(1);
-
- private String waitPath;
- private String currentNode;
- private String LOCK_ROOT_PATH;
-
- private static String NODE_PREFIX = "w";
-
- public DistributedLock(String connectString, int sessionTimeout, String lockName) {
- //TODO:数据校验
- this.connectString = connectString;
- this.sessionTimeout = sessionTimeout;
- this.LOCK_ROOT_PATH = lockName;
- }
-
-
- public void init() throws IOException, KeeperException, InterruptedException {
- // 建联
- zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
- // connectLatch 连接上zk后 释放
- if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
- connectLatch.countDown();
- }
- });
-
- connectLatch.await();// 等待zk正常连接后
-
- // 判断锁名称节点是否存在
- Stat stat = zk.exists(LOCK_ROOT_PATH, false);
- if (stat == null) {
- // 创建一下锁名称节点
- try {
- zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- //并发创建冲突忽略。
- if (!e.code().name().equals("NODEEXISTS")) {
- throw e;
- }
- }
- }
- }
-
- /**
- * 待补充功能:
- * 1. 超时设置
- * 2. 读写区分
- * 3. 重入控制
- */
- public void zklock() throws KeeperException, InterruptedException {
- if (!tryLock()) {
- waitLock();
- zklock();
- }
- }
-
- /**
- *
- */
- private void waitLock() throws KeeperException, InterruptedException {
- try {
- zk.getData(waitPath, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- // waitLatch 需要释放
- if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
- waitLatch.countDown();
- }
- }
- }, new Stat());
- // 等待监听
- waitLatch.await();
- } catch (KeeperException.NoNodeException e) {
- //如果等待的节点已经被清除了,不等了,再尝试去抢锁
- return;
- }
-
- }
-
- private boolean tryLock() throws KeeperException, InterruptedException {
-
- currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
- List<String> children = zk.getChildren(LOCK_ROOT_PATH, false);
- // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
- if (children.size() == 1) {
- return true;
- } else {
- String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
- // 通过w00000000获取该节点在children集合的位置
- int index = children.indexOf(thisNode);
- if (index == 0) {
- //自己就是第一个节点
- return true;
- }
- // 需要监听 他前一个节点变化
- waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
- }
- return false;
- }
-
-
- // 解锁
- public void unZkLock() {
- // 删除节点
- try {
- zk.delete(this.currentNode, -1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
-
- }