虽然知道zk实现的分布式锁的原理,但是有些细节不自己写下代码确实容易遗漏(纸上得来终觉浅).
总体上来讲,zk实现的分布式锁比基于redis的实现更简单,主要得益于两点:临时顺序节点(EPHEMERAL_SEQUENTIAL)特性解决了锁的排队问题,Watcher特性提供了节点变更通知的能力.
zk除了基本的目录存储服务外,还提供了独特的EPHEMERAL_SEQUENTIAL节点和Watcher观察机制:前者可以保证节点的创建顺序,并能在客户端会话结束(例如断开连接超时)时自动删除,可充当有序队列;后者提供了目录或者节点变更时的异步通知机制.
借助这2点特性,我们能方便地实现分布式锁.
下面的实现与jdk自带的可重入锁(ReentrantLock)类似,差别在于唤醒行为需要借助zk的watcher实现,而jdk的唤醒是由当前持锁线程在释放锁时主动通知.
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Re-entrant distributed lock backed by ZooKeeper ephemeral sequential nodes.
 *
 * <p>Every acquisition attempt creates an ephemeral sequential child under the
 * resource node. The attempt owns the lock once its child has the lowest
 * sequence number; otherwise it watches the closest lower-numbered child and
 * re-checks after that child changes. Re-entrant acquisitions by the owning
 * thread are counted locally and do not touch ZooKeeper.
 *
 * <p>Limitations of this demo: {@link #tryLock()} and
 * {@link #tryLock(long, TimeUnit)} never acquire the lock, and conditions are
 * not supported. The children of the resource node are assumed to be lock
 * nodes created by this class only.
 *
 * @author Lion Zhou
 * @date 2022/9/14
 */
public class MyReentrantLockOnZk implements Lock {
    // Path of the parent node that represents the locked resource.
    final String source;
    // Re-entrancy depth of the owning thread; 0 while the lock is free in this JVM.
    AtomicInteger counter = new AtomicInteger(0);
    // Id of the thread in this JVM that owns the lock, or -1 when there is none.
    AtomicLong curThreadId = new AtomicLong(-1);
    // Maps the owning thread id to the path of its lock node.
    ConcurrentHashMap<Long, String> lockMap = new ConcurrentHashMap<>(8);
    // Guards the combined update of counter, curThreadId and lockMap.
    final Object sync = new Object();

    /**
     * Creates a lock for the given resource and makes sure its parent node exists.
     *
     * @param source resource name without the leading slash
     */
    public MyReentrantLockOnZk(String source) {
        this.source = "/" + source;
        init();
    }

    /** Creates the parent node of the resource when it does not exist yet. */
    public void init() {
        if (!ZookeeperClient.exists(source, null)) {
            ZookeeperClient.createNode(source, "");
        }
    }

    /**
     * Acquires the lock, blocking until it is available.
     *
     * <p>An interrupt received while waiting does not abort the acquisition; the
     * interrupt status is restored before this method returns.
     *
     * @throws IllegalStateException if the lock node cannot be created or the
     *         queue of waiters cannot be read from ZooKeeper
     */
    @Override
    public void lock() {
        // Re-entrant acquisition: no ZooKeeper round trip needed.
        if (reenter()) {
            return;
        }
        String node = enqueue();
        boolean interrupted = false;
        try {
            for (; ; ) {
                try {
                    awaitTurn(node);
                    break;
                } catch (InterruptedException e) {
                    // lock() is not interruptible: remember the request and keep waiting.
                    interrupted = true;
                }
            }
            takeOwnership(node);
        } catch (RuntimeException e) {
            // Do not leave an abandoned node in the queue, it would block later waiters.
            ZookeeperClient.deleteEphemeralNode(node, 0);
            throw e;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("lock " + node);
    }

    /**
     * Acquires the lock unless the current thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted before or while
     *         waiting; the queued lock node is removed in that case
     * @throws IllegalStateException if the lock node cannot be created or the
     *         queue of waiters cannot be read from ZooKeeper
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (reenter()) {
            return;
        }
        String node = enqueue();
        try {
            awaitTurn(node);
        } catch (InterruptedException | RuntimeException e) {
            ZookeeperClient.deleteEphemeralNode(node, 0);
            throw e;
        }
        takeOwnership(node);
        System.out.println("lock " + node);
    }

    /**
     * Not implemented in this demo.
     *
     * @return always {@code false}; the lock is never acquired by this method
     */
    @Override
    public boolean tryLock() {
        return false;
    }

    /**
     * Not implemented in this demo.
     *
     * @return always {@code false}; the lock is never acquired by this method
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    /**
     * Releases one hold on the lock. When the last hold is released the local
     * ownership is cleared and the lock node is deleted, which triggers the watch
     * of the next waiter. Calls made by a thread that does not own the lock are
     * ignored.
     */
    @Override
    public void unlock() {
        if (curThreadId.get() != Thread.currentThread().getId()) {
            setLockPath(null);
            return;
        }
        if (counter.decrementAndGet() > 0) {
            return;
        }
        String path = getLockPath();
        // Release local ownership first so that the waiter woken by the node
        // deletion below can take over without spinning.
        synchronized (sync) {
            counter.set(0);
            setLockPath(null);
            curThreadId.set(-1);
        }
        boolean deleted = ZookeeperClient.deleteEphemeralNode(path, 0);
        System.out.println("delete ephemeral node: " + path + " is " + deleted);
    }

    /**
     * Conditions are not supported by this lock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("Conditions are not supported by MyReentrantLockOnZk");
    }

    /**
     * Builds a node path from a prefix and a sequence number that is left-padded
     * with zeros to ten characters.
     *
     * @param prefix path prefix placed in front of the padded number
     * @param no sequence number
     * @return the prefix followed by the zero-padded number
     */
    public String genNodePath(String prefix, int no) {
        StringBuilder sb = new StringBuilder();
        sb.append(no);
        int append = 10 - sb.length();
        for (int i = 0; i < append; i++) {
            sb.insert(0, '0');
        }
        sb.insert(0, prefix);
        return sb.toString();
    }

    /** Records the lock node of the current thread, or forgets it when {@code path} is null. */
    private void setLockPath(String path) {
        if (null == path) {
            lockMap.remove(Thread.currentThread().getId());
        } else {
            lockMap.put(Thread.currentThread().getId(), path);
        }
    }

    /**
     * @return the lock node path recorded for the current thread, or {@code null}
     *         when the thread does not own the lock
     */
    public String getLockPath() {
        return lockMap.get(Thread.currentThread().getId());
    }

    /** Increments the hold count when the current thread already owns the lock. */
    private boolean reenter() {
        if (curThreadId.get() == Thread.currentThread().getId()) {
            counter.incrementAndGet();
            return true;
        }
        return false;
    }

    /** Creates the ephemeral sequential node that queues the current attempt. */
    private String enqueue() {
        String node = ZookeeperClient.createEphemeralNode(source + "/", counter.get() + "");
        if (null == node) {
            throw new IllegalStateException("Failed to create lock node under " + source);
        }
        return node;
    }

    /**
     * Blocks until {@code node} is the lowest-numbered child of the resource node.
     * The queue is re-read after every wake-up because the watched predecessor may
     * have gone away while an earlier node still holds the lock.
     */
    private void awaitTurn(String node) throws InterruptedException {
        String ownName = node.substring(node.lastIndexOf('/') + 1);
        for (; ; ) {
            String predecessor = null;
            boolean queued = false;
            try {
                for (String child : ZookeeperClient.get().getChildren(source, false)) {
                    // Sequence suffixes are zero-padded, so string order equals numeric order.
                    int order = child.compareTo(ownName);
                    if (order == 0) {
                        queued = true;
                    } else if (order < 0 && (predecessor == null || child.compareTo(predecessor) > 0)) {
                        predecessor = child;
                    }
                }
            } catch (KeeperException e) {
                throw new IllegalStateException("Failed to list waiters under " + source, e);
            }
            if (!queued) {
                throw new IllegalStateException("Lock node " + node + " no longer exists");
            }
            if (predecessor == null) {
                return;
            }
            String predecessorPath = source + "/" + predecessor;
            // Register the watch together with the existence check so that a
            // deletion happening right after the check is still reported.
            CheckWatcher watcher = new CheckWatcher(new Object());
            if (ZookeeperClient.exists(predecessorPath, watcher)) {
                System.out.println(node + " wait " + predecessorPath);
                watcher.await();
            }
        }
    }

    /** Marks the current thread as the local owner of the lock held through {@code node}. */
    private void takeOwnership(String node) {
        long self = Thread.currentThread().getId();
        for (; ; ) {
            // Same monitor as unlock(), so a half-finished release is never observed.
            synchronized (sync) {
                if (curThreadId.compareAndSet(-1, self)) {
                    counter.incrementAndGet();
                    setLockPath(node);
                    return;
                }
            }
            Thread.yield();
        }
    }

    /**
     * Watcher that wakes threads waiting on a monitor once the watched node
     * changes. The event is remembered, so a thread that starts waiting after
     * the event has been delivered does not block.
     */
    static class CheckWatcher implements Watcher {
        final Object lock;
        // Set once a relevant event has been seen; guarded by lock.
        private boolean fired;

        /**
         * @param lock monitor that is notified when the watch fires
         */
        public CheckWatcher(Object lock) {
            this.lock = lock;
        }

        @Override
        public void process(WatchedEvent event) {
            System.out.println(event.getType() + " -> " + event.getPath());
            // Wake on any node event (the one-shot watch is consumed by it) and on
            // session expiry; the waiter re-reads the queue before deciding.
            boolean nodeEvent = event.getType() != Watcher.Event.EventType.None;
            boolean expired = event.getState() == Watcher.Event.KeeperState.Expired;
            if (nodeEvent || expired) {
                synchronized (lock) {
                    fired = true;
                    lock.notifyAll();
                }
            }
        }

        /** Blocks until this watcher has seen a relevant event. */
        void await() throws InterruptedException {
            synchronized (lock) {
                while (!fired) {
                    lock.wait();
                }
            }
        }
    }

    /** Demo: five threads repeatedly acquire the lock twice and release it. */
    public static void main(String[] args) {
        MyReentrantLockOnZk myReentrantLockOnZk = new MyReentrantLockOnZk("lock");
        int threadSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
        for (int k = 0; k < threadSize; k++) {
            executorService.submit(() -> {
                for (int i = 0; i < 5; i++) {
                    myReentrantLockOnZk.lock();
                    // Simulate a re-entrant acquisition.
                    myReentrantLockOnZk.lock();
                    try {
                        System.out.println(String.format("Thread-%d get lock no-%d", Thread.currentThread().getId(), i));
                        // Simulate work done while holding the lock.
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 1000));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        myReentrantLockOnZk.unlock();
                        myReentrantLockOnZk.unlock();
                        // Surplus unlock: ignored because the thread no longer owns the lock.
                        myReentrantLockOnZk.unlock();
                    }
                }
            });
        }
        executorService.shutdown();
    }
}
这是封装的便捷操作工具类
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Static convenience wrapper around a single, lazily created ZooKeeper session.
 *
 * <p>The helper methods report failures by printing the stack trace and
 * returning {@code null} or {@code false}. When the calling thread is
 * interrupted, its interrupt status is restored before the method returns.
 *
 * @author Lion Zhou
 * @date 2022/9/14
 */
public class ZookeeperClient {
    // Shared session; written only inside get() while holding the class monitor.
    static volatile ZooKeeper zooKeeper;
    static String host = "10.1.1.188:2181";
    static int timeout = 3000;

    /**
     * Returns the shared client, creating it on first use and waiting until the
     * session reports {@code SyncConnected}. Creation is synchronized so that
     * concurrent first callers share one session.
     *
     * @return the shared ZooKeeper client
     * @throws IllegalStateException if the client cannot be created
     */
    public static ZooKeeper get() {
        ZooKeeper client = zooKeeper;
        if (client != null) {
            return client;
        }
        synchronized (ZookeeperClient.class) {
            if (zooKeeper != null) {
                return zooKeeper;
            }
            CountDownLatch connected = new CountDownLatch(1);
            try {
                client = new ZooKeeper(host, timeout,
                        (WatchedEvent e) -> {
                            System.out.println(e.toString());
                            if (e.getState() == Watcher.Event.KeeperState.SyncConnected) {
                                connected.countDown();
                            }
                        }
                );
            } catch (IOException e) {
                throw new IllegalStateException("Cannot create ZooKeeper client for " + host, e);
            }
            try {
                connected.await();
            } catch (InterruptedException e) {
                // Keep the client (it goes on connecting in the background) and
                // hand the interrupt back to the caller.
                Thread.currentThread().interrupt();
            }
            // Publish only now so other threads never see a half-initialised client.
            zooKeeper = client;
            return client;
        }
    }

    /**
     * Creates a persistent node with an open ACL.
     *
     * @param path node path
     * @param data node content
     * @return the created path, {@code path} itself when the node already exists,
     *         or {@code null} on any other failure
     */
    public static String createNode(String path, String data) {
        ZooKeeper zooKeeper = ZookeeperClient.get();
        try {
            return zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            if (e.code() == KeeperException.Code.NODEEXISTS) {
                // The node is already there, which is what the caller wanted.
                return path;
            }
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates an ephemeral sequential node with an open ACL.
     *
     * @param path path prefix; ZooKeeper appends the sequence number
     * @param data node content
     * @return the actual path of the created node, or {@code null} on failure
     */
    public static String createEphemeralNode(String path, String data) {
        ZooKeeper zooKeeper = ZookeeperClient.get();
        try {
            return zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Deletes a node.
     *
     * @param path node path; {@code null} is rejected
     * @param version expected data version of the node
     * @return {@code true} when the node was deleted, {@code false} when
     *         {@code path} is null or the deletion failed
     */
    public static boolean deleteEphemeralNode(String path, int version) {
        if (null == path) {
            return false;
        }
        ZooKeeper zooKeeper = ZookeeperClient.get();
        try {
            zooKeeper.delete(path, version);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Checks whether a node exists and leaves a watch on it.
     *
     * @param path node path
     * @param watcher watcher to register; when {@code null} the client's default
     *        watcher is registered instead
     * @return {@code true} when the node exists; {@code false} when it does not
     *         or when the check failed
     */
    public static boolean exists(String path, Watcher watcher) {
        ZooKeeper zooKeeper = ZookeeperClient.get();
        try {
            Stat stat;
            if (null != watcher) {
                stat = zooKeeper.exists(path, watcher);
            } else {
                stat = zooKeeper.exists(path, true);
            }
            return null != stat;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return false;
    }
}
这只是一个基于ZK的分布式锁演示.