大家好,我是飓风
上一遍文章 分布式锁的实现- mysql,我们讲解了分布式锁实现的特性,主要包含:
- 互斥性
- 超时特性
- 提供阻塞和非阻塞接口
- 可重入性
- 公平锁和非公平锁
- 其他 高性能 高可用等
今天咱们来看看用zookeeper 怎么来实现分布式锁。
这里就不介绍jdk安装了,相信大家肯定google 或者百度都可以查到,很简单,略过。
这里我们利用docker 来快速安装和启动
安装:
docker pull zookeeper:3.4.11
启动:
docker run --name zookeeper --restart always -d zookeeper:3.4.11
这里创建maven 项目就省略了,下面的maven的依赖配置,具体的版本号在我的父pom里,完成代理,我会传到github 上。
<dependencies>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<exclusions>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>org.slf4jartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starterartifactId>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-configuration-processorartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-bootartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-autoconfigureartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>transmittable-thread-localartifactId>
dependency>
dependencies>
Watcher 在 zookeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应。
通过上面的zookeeper的介绍,我们知道,zookeeper 是读写是原子性的,且节点不能重复创建的,那么我们就让客户端,在获取分布式锁的时候,去创建这个临时节点,先创建的client ,那么就会返回创建成功,其他都会返回创建失败,其他创建失败,此时需要监听这个 临时节点,如果临时节点被删除了,那么说明就是释放锁了,其他client 可以接着创建这个临时节点,来争抢分布式锁。
之所以会利用临时节点,如果程序down掉了,那么此时临时节点就会自动删除,不会出现死锁的现象。
举例:比如我们进行某个SKU库存的扣减,那么此时zookeeper 的创建节点的路径咱们可以设置为 /lock/sku/100121212, 其中 100121212 就是要扣减的sku的值,也是一个临时节点。
下面我画个图,看完相信会更加清晰。
1.创建临时节点,也就是开始获取锁,如下图:

2.client1 获取锁成功,如下图:

3.此时client1 获取锁成功,其他client2 和client3 获取锁失败
接着client2 和 client3 开始监听 这个临时节点是否被删除了,如下图:

4.client1 执行完扣减库存业务,那么就会删除临时节点,也就是释放锁,那么其他client2 和client3 监听到这个临时节点被删除了,那么就会再次进行锁的获取,也就是创建这个临时节点了。

通过上面这个几个步骤,一个基于zookeeper临时节点的分布式锁就实现了。但是这里有些问题需要说明下:
当大量客户端去竞争锁的时候,会发生“惊群”效应,这里惊群效应指的是在分布式锁竞争的过程中,大量的"Watcher通知"和“创建/lock/sku/xxxx”两个操作重复运行,并且绝大多数运行结果都创建节点失败,从而继续等待下一次通知,若在集群规模较大的情况下,会对ZooKeeper服务器以及客户端服务器造成巨大的性能影响和网络冲击,所以基于这种方式的实现,并发量上支持不很高,大流量下不建议使用。
下面我来介绍改进方案
我们可以利用创建zookeeper的临时顺序节点的方式,来解决“惊群”效应,其实是一种公平锁的实现,下面说下具体的步骤:



如下图所示,001 被删除了,那么client2 就会监听到001 被删除了,于是再次获取到子节点集合,判断自己已经示最小的节点了,那么获取锁成功了。

这里代码不做过多解释了,给了主要类的实现说明,可以和上面的原理对应上的。
1 实现了阻塞获取锁
2 实现了非阻塞获取锁
3 锁的可重入性
lock 接口 ,定义要实现获取锁和释放锁的方法
public interface Lock {
/**
* 阻塞获取锁
* @return
*/
void lock(String source) throws LockException;
/**
* 非阻塞获取锁
* @return
*/
boolean nonLock(String source,int retries);
/**
* 释放锁
*/
boolean unLock();
}
lock 接口的实现
@Slf4j
@RequiredArgsConstructor
@Component
@Scope(value = "prototype")
public class ZookeeperLock implements Lock {
private final CuratorFramework curatorFramework;
private final Watcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted){
notifyAllFromWatcher();
}
};
/**
* key: lock path
* value: 重入的次数
*/
private static final TransmittableThreadLocal<LockInfo> THREAD_LOCAL = new TransmittableThreadLocal<>();
private LockInfo getLocalMap(){
LockInfo lockInfo = THREAD_LOCAL.get();
if (lockInfo == null){
lockInfo = new LockInfo();
THREAD_LOCAL.set(lockInfo);
}
return lockInfo;
}
private String createLockPath(String source) {
String base = "/" + source;
// 创建临时节点,这里肯定谁最小是谁先创建出来
String currentPath = null;
try {
currentPath = curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(base+"/lock_");
} catch (Exception e) {
e.printStackTrace();
}
return currentPath;
}
private String getLockPath(){
final LockInfo lockInfo = getLocalMap();
return lockInfo.lockPath;
}
private String getBasePath(){
final LockInfo lockInfo = getLocalMap();
return "/"+lockInfo.source;
}
private String getSource(){
final LockInfo lockInfo = getLocalMap();
return lockInfo.source;
}
private void setLock(){
final LockInfo lockInfo = getLocalMap();
lockInfo.lock = true;
}
private void incCount(){
final LockInfo lockInfo = getLocalMap();
lockInfo.count++;
}
private void deCount(){
final LockInfo lockInfo = getLocalMap();
lockInfo.count--;
}
private int getCount(){
return getLocalMap().count;
}
private boolean isLock(){
return getLocalMap().lock;
}
private synchronized void notifyAllFromWatcher(){
notifyAll();
}
private void nextLock() throws LockException {
boolean deleted = false;
try {
// 不相等,那么说明有比它大的,那么找出它的弟弟节点,进行监听
//监听上一个节点
final List<String> childrenPath = curatorFramework.getChildren().forPath(getBasePath());
final String youngerBrother = CommonUtil.getYoungerBrother(getSource(), childrenPath, getLockPath());
//如果为空个,说明就剩下她自己一个了,那么直接返回获取
if (StringUtils.isEmpty(youngerBrother)){
log.error("currentThread=> "+Thread.currentThread().getId()+"'s youngerBrother is null ");
lock(getSource());
return;
}
curatorFramework.getData().usingWatcher(watcher).forPath(youngerBrother);
synchronized(this){
wait();
}
lock(getSource());
}catch (Exception e){
//如果是 NoNodeException ,说明我监听的节点不存了,那么如要继续获取锁
if(e instanceof KeeperException.NoNodeException){
lock(getSource());
return;
}
e.printStackTrace();
deleted = true;
throw new LockException("获取锁失败:" + e.getMessage());
}finally {
// 等待超时,没有获取到锁,那么删除zookeeper 中的临时节点和thread local内的数据
if (deleted){
removeResource();
throw new LockException("超时-获取锁失败");
}
}
}
private void initCurrentLock(String source) throws LockException {
final LockInfo lockInfo = getLocalMap();
//如果为空,那么说第一次尝试获取锁
if (StringUtils.isEmpty(lockInfo.lockPath)){
String lockPath = createLockPath(source);
if (StringUtils.isEmpty(lockPath)){
throw new LockException("创建锁失败,请稍后重试");
}
lockInfo.source = source;
lockInfo.lockPath = lockPath;
}
}
@Override
public void lock(String source) throws LockException {
initCurrentLock(source);
//如果获得了,那么不要继续了
if (lockResource()){
return;
}
try {
//阻塞 监听
nextLock();
} catch (Exception e) {
e.printStackTrace();
throw new LockException("获取锁失败,请稍后重试");
}
}
@Override
public boolean nonLock(String source,int retries){
boolean notLock = false;
try {
initCurrentLock(source);
while (retries>0){
if (lockResource()){
return true;
}
retries--;
}
if (retries==0){
notLock = true;
}
} catch (LockException e) {
e.printStackTrace();
log.error("上锁失败: {} ",e.getMessage());
notLock = true;
return false;
}finally {
//如果出现异常了,那么一定要删除
if (notLock){
removeResource();
}
}
return !notLock;
}
@Override
public boolean unLock() {
if (getCount()>1){
deCount();
return true;
}
return removeResource();
}
private boolean removeResource(){
try {
String lockPath = getLockPath();
THREAD_LOCAL.remove();
if (!StringUtils.isEmpty(lockPath) && curatorFramework.checkExists().forPath(lockPath)!=null){
curatorFramework.delete().forPath(lockPath);
}
} catch (Exception e) {
e.printStackTrace();
//todo: 如果删除锁失败了,那么要记录日志,同时报警,进行人工干预
return false;
}
return true;
}
private boolean lockResource() throws LockException {
//判断是否重入了
if (isLock()){
incCount();
return true;
}
String lockPath = getLockPath();
String basePath = getBasePath();
int currentNumber = CommonUtil.getNumber(lockPath);
List<String> childrenPath;
try {
childrenPath = curatorFramework.getChildren().forPath(basePath);
} catch (Exception e) {
e.printStackTrace();
throw new LockException("获取锁列表失败");
}
// 获取所有节点的最小节点数字
int minNumber = CommonUtil.getMin(childrenPath);
//如果相等,那么它就是最小的,获得锁
if (currentNumber == minNumber){
System.out.println("lock thread: " + Thread.currentThread().getId() +" , lock number: " + currentNumber);
setLock();
incCount();
return true;
}
return false;
}
@Data
public static class LockInfo{
private String source;
private String lockPath;
private int count;
private boolean lock = false;
}
}
具体源码地址: github 点击
zookeeper分布式锁的实现方式