Zookeeper如何实现分布式锁?
本文记录一下如何使用Zookeeper原生API和Curator的API两种方式实现分布式锁。
Curator是Netflix开源的一套ZooKeeper客户端框架。
项目:Maven
项目依赖:
因为在curator-framework依赖中是包含了zk的依赖了,所以不需要再加入zk的依赖。
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<version>5.1.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>5.1.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-testartifactId>
<version>5.1.0version>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<version>1.18.12version>
dependency>
curator-test 主要是用来模拟zk环境,如果没有部署zk环境的话可以使用curator-test的TestingServer
来模拟ZK:
public CuratorFramework curatorClient() throws Exception {
TestingServer testingServer = new TestingServer(2181, new File("zk-data"));
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(testingServer.getConnectString())//使用TestingServer模拟zk环境(无需部署zk环境)
.connectionTimeoutMs(15 * 1000)
//会话超时时间
.sessionTimeoutMs(60 * 1000)
//设置重试机制
.retryPolicy(new ExponentialBackoffRetry(10*1000,3))
.build();
curatorFramework.start();
return curatorFramework;
使用zookeeper来实现分布式锁主要是使用zk的临时序列节点
。
zk的节点类型有四种:
即:临时节点、持久节点和序列节点(可以是临时节点也可以是持久节点)
而要实现分布式锁主要借助的是 临时序列节点,临时节点可以在Session会话结束后自动删除,序列节点可以按照顺序创建出节点,如下图所示,在/lock
节点下创建path为hello
的序列节点:节点会自动按顺序创建出带编号:
所以实现zk分布式锁的原理就是:
多个线程争抢一把锁:每个线程去zk创建临时序列节点,利用zookeeper的watch机制,每个线程只监听他前面的节点
,如果自己是这把锁的第一个位置,则执行,否则等待。当第一把锁的任务执行完成后,释放锁,即删除节点,这个事件只会被第二个节点监听到,所以只会有第二个节点收到通知,获得锁。这样的好处是不会通知到所有的节点去争夺锁,
释放锁:第一把锁的任务执行完成后,会删除节点。当Session会话断开后,临时节点会被自动删除,所以也避免了死锁的问题。
注意:所以使用Zookeeper实现的分布式锁是公平锁
。
import lombok.Data;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @Author: LiuShihao
* @Date: 2022/11/2 15:38
* @Desc: 直接使用Zookeeper原生API实现分布式锁
*/
@Data
public class ZookeeperLock implements Watcher,AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback {
ZooKeeper zk;
CountDownLatch countDownLatch = new CountDownLatch(1);
String lockName ;
String threadName;
String lockRoot = "/lock";
public ZookeeperLock(ZooKeeper zk, String threadName) {
this.zk = zk;
this.threadName = threadName;
}
/**
* 使用异步方式创建临时序列节点,阻塞
*/
public void tryLock(String name){
try {
zk.create(lockRoot+"/"+name,threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
countDownLatch.await();
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 删除临时节点,触发watch delete事件
*/
public void unLock(){
try {
zk.delete(lockRoot+lockName,-1);
System.out.println(threadName + " over work....");
}catch (Exception e){
e.printStackTrace();
}
}
/**
* create callback
* @param rc
* @param path
* @param ctx
* @param name
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (name != null){
System.out.println(ctx.toString()+" create path: "+ name);
lockName = name.substring(5);
zk.getChildren(lockRoot, false, this, ctx );
}
}
/**
* getChildren callback
* @param rc
* @param path
* @param ctx
* @param children
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
Collections.sort(children);
int i = children.indexOf(lockName.substring(1));
if(i == 0){
//是当前第一位置, countDownLatch放行
System.out.println(threadName +" i am first....");
try {
zk.setData(lockRoot,threadName.getBytes(),-1);
countDownLatch.countDown();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
//不是,检查前一个是否还存在?
zk.exists(lockRoot+"/"+children.get(i-1),this,this,"abc");
}
}
/**
* exists callback
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
//todo
}
/**
* watch event callback
* 如果第一线程,锁释放了,其实只有第二个节点收到了回调事件!!
* 如果不是第一个节点主动释放锁,而是某一个节点挂了(session断开连接,临时节点自动被删除),也能造成后边的节点收到这个通知,从而让他后边的节点那个跟去watch挂掉这个节点前边的
* @param event
*/
@Override
public void process(WatchedEvent event) {
System.out.println("watch event: "+event.getPath()+" "+event.getType());
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zk.getChildren(lockRoot,false,this ,"abc");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
}
测试:
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* @Author: LiuShihao
* @Date: 2022/11/2 16:03
* @Desc:
*/
public class ZkLockTest {
public static ZooKeeper getZkClient()throws Exception{
String connectionString = "192.168.153.131:2181";
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(connectionString,30*1000 , new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
countDownLatch.countDown();
}
}
});
countDownLatch.await();
return zooKeeper;
}
public static void main(String[] args) throws Exception {
ZooKeeper zkClient = ZkLockTest.getZkClient();
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
String threadName = Thread.currentThread().getName();
ZookeeperLock lock = new ZookeeperLock(zkClient,threadName);
lock.tryLock("lock");
Thread.sleep(2000);
lock.unLock();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
System.in.read();
}
}
使用Curator实现分布式锁就简单的多了,因为curator已经帮我们实现了分布式锁:InterProcessLock 类。
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @Author: LiuShihao
* @Date: 2022/10/31 17:46
* @Desc: 使用Curator InterProcessLock 实现分布式锁(公平锁)
* 实现Zookeeper分布式锁,主要是基于Zookeeper的 临时序列节点来实现的。
* 1. 临时节点,指的是节点创建后,如果创建节点的客户端和 Zookeeper 服务端的会话失效(例如断开连接),那么节点就会被删除。
* 2. 持久节点指的是节点创建后,即使创建节点的客户端和 Zookeeper 服务端的会话失效(例如断开连接),节点也不会被删除,只有客户端主动发起删除节点的请求,节点才会被删除。
* 3. 序列节点,这种节点在创建时会有一个序号,这个序号是自增的。序列节点既可以是临时序列节点,也可以是持久序列节点。
*
* 临时序列实现分布式锁原理:
* 当客户端来加锁的时候,会先在加锁的节点下建立一个子节点,这个节点就有一个序号,类似 lock-000001 ,
* 创建成功之后会返回给客户端所创建的节点,然后客户端会去获取这个加锁节点下的所有客户端创建的子节点,当然也包括自己创建的子节点。
* 拿到所有节点之后,给这些节点进行排序,然后判断自己创建的节点在这些节点中是否排在第一位,
* 如果是的话,那么就代表当前客户端就算加锁成功了,如果不是的话,那么就代表当前客户端加锁失败。
* 加锁失败的节点并不会不停地循环去尝试加锁,而是在自己创建节点的前一个节点上加一个监听器,然后就进行等待。
* 当前面一个节点释放了锁,就会反过来通知等待的客户端,然后客户端就加锁成功了。
*
* 从这里可以看出redis和zk防止死锁的实现是不同的,redis是通过过期时间来防止死锁,而zk是通过临时节点来防止死锁的。
*
* 为什么使用顺序节点?其实为了防止羊群效应。
* 如果没有使用顺序节点,假设很多客户端都会去加锁,那么加锁就会都失败,都会对加锁的节点加个监听器,
* 那么一旦锁释放,那么所有的加锁客户端都会被唤醒来加锁,那么一瞬间就会造成很多加锁的请求,增加服务端的压力。
*
* zk实现的分布式锁是公平的吗?
* 其实使用临时顺序节点实现的分布式锁就是公平锁。所谓的公平锁就是加锁的顺序跟成功加锁的顺序是一样的。
* 因为节点的顺序就是被唤醒的顺序,所以也就是加锁的顺序,所以天生就是公平锁。
*/
@Slf4j
public class CuratorLock {
public CuratorFramework getCuratorFramework(){
String connectionString = "192.168.153.131:2181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
curatorFramework.start();
return curatorFramework;
}
public static void main(String[] args) throws Exception {
String lock = "/lock";
//获得两个客户端
CuratorFramework client1 = new CuratorLock().getCuratorFramework();
CuratorFramework client2 = new CuratorLock().getCuratorFramework();
final InterProcessLock lock1 = new InterProcessMutex(client1, lock);
final InterProcessLock lock2 = new InterProcessMutex(client2, lock);
//模拟两个线程
new Thread(() -> {
try {
//线程加锁
lock1.acquire();
System.out.println("线程1获取锁");
//线程沉睡
Thread.sleep(5*1000);
//线程解锁
lock1.release();
System.out.println("线程1释放了锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
//线程2
new Thread(() -> {
//线程加锁
try {
lock2.acquire();
System.out.println("线程2获取到锁");
//线程沉睡
Thread.sleep(5*1000);
lock2.release();
System.out.println("线程2释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
Github:https://github.com/Liu-Shihao/springboot-curator
https://baijiahao.baidu.com/s?id=1735496665409408331&wfr=spider&for=pc
https://blog.csdn.net/weixin_47025166/article/details/125451987