分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下面的例子
上面的设计是存在线程安全问题
问题
解决方法
用锁把 2、3、4 步锁住,让他们执行完之后,另一个线程才能进来执行。
公司业务发展迅速,系统应对并发不断提高,解决方案是要增加一台机器,结果会出现更大的问题
假设有两个下单请求同时到来,分别由两个机器执行,那么这两个请求是可以同时执行了,依然存在超卖的问题。
因为如图所示系统是运行在两个不同的 JVM 里面,不同的机器上,增加的锁只对自己当前 JVM 里面的线程有效,对于其他 JVM 的线程是无效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同一个锁,此时分布式锁就能解决该问题。
分布式锁的作用:在整个系统提供一个全局、唯一的锁,在分布式系统中每个系统在进行相关操作的时候需要获取到该锁,才能执行相应操作。
利用Zookeeper可以创建临时带序号节点的特性来实现一个分布式锁
实现思路
流程图
- //zk实现分布式锁
- public class DisLockTest {
- public static void main(String[] args) {
- //使用10个线程模拟分布式环境
- for (int i = 0; i < 10; i++) {
- new Thread(new DisLockRunnable()).start();//启动线程
- }
- }
-
- static class DisLockRunnable implements Runnable {
-
- public void run() {
- //每个线程具体的任务,每个线程就是抢锁,
- final DisClient client = new DisClient();
- client.getDisLock();
-
- //模拟获取锁之后的其它动作
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //释放锁
- client.deleteLock();
- }
- }
- }
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
-
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- //抢锁
- //1. 去zk创建临时序列节点,并获取到序号
- //2. 判断自己创建节点序号是否是当前节点最小序号,如果是则获取锁
- //执行相关操作,最后要释放锁
- //3. 不是最小节点,当前线程需要等待,等待你的前一个序号的节点
- //被删除,然后再次判断自己是否是最小节点。。。
- public class DisClient {
-
- public DisClient() {
- //初始化zk的/distrilocl节点,会出现线程安全问题
- synchronized (DisClient.class) {
- if (!zkClient.exists("/distrilock")) {
- zkClient.createPersistent("/distrilock");
- }
- }
-
- }
-
- //前一个节点
- String beforNodePath;
-
- String currentNoePath;
- //获取到zkClient
- private ZkClient zkClient = new ZkClient("linux121:2181,linux122:2181");
- //把抢锁过程为量部分,一部分是创建节点,比较序号,另一部分是等待锁
-
- //完整获取锁方法
- public void getDisLock() {
- //获取到当前线程名称
- final String threadName = Thread.currentThread().getName();
- //首先调用tryGetLock
- if (tryGetLock()) {
- //说明获取到锁
- System.out.println(threadName + ":获取到了锁");
- } else {
- // 没有获取到锁,
- System.out.println(threadName + ":获取锁失败,进入等待状态");
- waitForLock();
- //递归获取锁
- getDisLock();
- }
-
- }
-
- CountDownLatch countDownLatch = null;
-
- //尝试获取锁
- public boolean tryGetLock() {
- //创建临时顺序节点,/distrilock/序号
- if (null == currentNoePath || "".equals(currentNoePath)) {
- currentNoePath = zkClient.createEphemeralSequential("/distrilock/", "lock");
- }
- //获取到/distrilock下所有的子节点
- final List
childs = zkClient.getChildren("/distrilock"); - //对节点信息进行排序
- Collections.sort(childs); //默认是升序
- final String minNode = childs.get(0);
- //判断自己创建节点是否与最小序号一致
- if (currentNoePath.equals("/distrilock/" + minNode)) {
- //说明当前线程创建的就是序号最小节点
- return true;
- } else {
- //说明最小节点不是自己创建,要监控自己当前节点序号前一个的节点
- final int i = Collections.binarySearch(childs, currentNoePath.substring("/distrilock/".length()));
- //前一个(lastNodeChild是不包括父节点)
- String lastNodeChild = childs.get(i - 1);
- beforNodePath = "/distrilock/" + lastNodeChild;
- }
-
- return false;
- }
-
- //等待之前节点释放锁,如何判断锁被释放,需要唤醒线程继续尝试tryGetLock
- public void waitForLock() {
-
- //准备一个监听器
- final IZkDataListener iZkDataListener = new IZkDataListener() {
-
- public void handleDataChange(String s, Object o) throws Exception {
-
- }
-
- //删除
- public void handleDataDeleted(String s) throws Exception {
- //提醒当前线程再次获取锁
- countDownLatch.countDown();//把值减1变为0,唤醒之前await线程
- }
- };
- //监控前一个节点
- zkClient.subscribeDataChanges(beforNodePath, iZkDataListener);
-
- //在监听的通知没来之前,该线程应该是等待状态,先判断一次上一个节点是否还存在
- if (zkClient.exists(beforNodePath)) {
- //开始等待,CountDownLatch:线程同步计数器
- countDownLatch = new CountDownLatch(1);
- try {
- countDownLatch.await();//阻塞,countDownLatch值变为0
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- //解除监听
- zkClient.unsubscribeDataChanges(beforNodePath, iZkDataListener);
- }
-
-
- //释放锁
- public void deleteLock() {
- if (zkClient != null) {
- zkClient.delete(currentNoePath);
- zkClient.close();
- }
- }
- }
注意:
分布式锁的实现可以是 Redis、Zookeeper,相对来说生产环境如果使用分布式锁可以考虑使用Redis实现而非Zk。