前提:centos102、centos103、centos104 服务器都已经开启
pom.xml 依赖
junit
junit
4.13.2
org.apache.logging.log4j
log4j-core
2.17.1
org.apache.zookeeper
zookeeper
3.5.7
org.junit.jupiter
junit-jupiter
RELEASE
compile
log4j.properties 配置
# 设置全局的日志记录级别为 INFO
log4j.rootLogger=INFO, stdout
# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# 文件输出
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
zkClient.java 代码
// 注意:逗号后面不能有空格
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
// 创建客户端
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {
String nodeCreated = zkClient.create("/frost", "cat".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
运行创建子节点,看看是否创建了该节点

@Test
public void getChildren() throws InterruptedException, KeeperException {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
}


那如果此时我再创建一个节点,此时控制台没有任何变化,我想要创建一个节点控制台能够看到相关变化怎么办?此时只需要将程序保持不结束,然后将客户端查看子节点函数放入监听器中。
@Test
public void exit() throws InterruptedException, KeeperException {
Stat stat = zkClient.exists("/frost", false);
System.out.println(stat == null ? "not exits" : "exits");
}



以下红色字体写错,应该是下线则通知注册监听器的客户端

对于ZooKeeper集群来说,客户端和服务器都相当于客户端,区别在于:服务器在ZooKeeper集群中是创建节点,客户端在ZooKeeper是监听信息。

服务器注册到zk集群
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer server = new DistributeServer();
// 1. 获取zk连接
server.getConnect();
// 2. 注册服务器到 zk 集群
server.regist(args[0]);
// 3. 启动业务逻辑(睡觉)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void regist(String hostname) throws InterruptedException, KeeperException {
String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 临时带序号的节点
System.out.println(hostname + "is online");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
}
客户端进行监听
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient client = new DistributeClient();
// 1. 获取zk连接
client.getConnect();
// 2. 监听/servers下子节点的增加和删除
client.getServerList();
// 3. 启动业务逻辑(睡觉)
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws InterruptedException, KeeperException {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
getServerList();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
启动客户端,然后在服务器上进行增加节点监听

删除节点监听

因为我们服务端的代码传参了,所以我们需要设置一下这个参数:

下图代表服务端启动的是hadoop102节点

先把客户端启动起来,发现有一个节点hadoop101:

在启动服务端,hadoop102上线:

然后返回看客户端的监听,发现节点有变化,打印出所有节点 [hadoop102, hadoop101]

此时我们修改一下再此启动服务端让 hadoop103 上线:

返回客户端查看发现 hadoop102 下线,hadoop103 上线

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private final String connectString = "centos102:2181,centos103:2181,centos104:2181";
private final int sessionTimeout = 2000;
ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
// 前一个节点
private String waitPath;
// 当前节点
String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 1. 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// connectLatch,如果连接上zk,可以释放
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// waitLatch,需要释放
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
// 等待zk正常连接后往下走
connectLatch.await();
// 2. 判断根节点/lock是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
// 创建根节点(永久节点)
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 对 zk 加锁
public void zkLock() {
// 创建对应的临时带序号的节点
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判断创建的节点是否是最小的序号节点,如果是,获取到锁;如果不是,监听前一个节点
List<String> children = zk.getChildren("/locks", false);
// 如果 children 只有一个节点,直接获取锁;如果有多个节点,需要判断,谁最小
if (children.size() == 1) {
return;
}else {
// 排序
Collections.sort(children);
// 获取节点名称seq00000001
String thisNode = currentMode.substring("/locks/".length());
// 通过seq00000001获取该节点在children当中的位置
int index = children.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
}else if (index == 0) {
// 该节点为第一个,获取锁直接返回
return;
}else {
// 不是第一个,监听前一个节点
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, null);
// 等待监听结束
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁
public void unZkLock() throws InterruptedException, KeeperException {
// 删除节点
zk.delete(currentMode, -1);
}
}
测试
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
final DistributedLock lock1 = new DistributedLock();
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程1启动,获取到锁");
Thread.sleep(5000);
lock1.unZkLock();
System.out.println("线程1释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程2启动,获取到锁");
Thread.sleep(5000);
lock2.unZkLock();
System.out.println("线程2释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}).start();
}
}

原生JAVA API出现的问题:
(1)会话是异步的,需要自己去连接
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高
(4)不支持多节点的删除和创建,需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生JAVA API开发分布式遇到的的问题
curator 官方文档:https://curator.apache.org/index.html
pom.xml 文件添加依赖
org.apache.curator
curator-framework
4.3.0
org.apache.curator
curator-recipes
4.3.0
org.apache.curator
curator-client
4.3.0
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程1获取到锁");
lock1.acquire();
System.out.println("线程1获取到锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1释放锁");
lock1.release();
System.out.println("线程1再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程2获取到锁");
lock2.acquire();
System.out.println("线程2获取到锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2释放锁");
lock2.release();
System.out.println("线程2再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("centos102:2181,centos103:2181,centos104:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
// 启动客户端
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}
