• 分布式锁的实现【转载】


    一、前言

    经常遇到这样的情况,项目中需要使用分布式锁,发现没有完整的产品可以使用,就临时写个分布式锁,花费许多时间和精力来开发和测试一个简单的分布式锁,实现部分功能就使用了。后面遇到这种情况,又去重新开发和测试,重复建设。

    这里分享一个分布式锁的产品,实现完整功能,接入简单,方便业务方使用,大家如果自己研发分布式锁工具可以参考,但由于是内部项目,地址也访问不到仅供参考

    【接入和使用】

    1. 工程名称:gtool
    2. 工程地址:http://gitlab.alibaba-inc.com/global-share-group/gtool
    3. 目前可以使用的是,xlock-tair,xlock-redis

    【使用方法】

    步骤1,接入pom依赖,获得jar包。

    只接入需要的pom依赖,不需要的就不用接了。隐式依赖 slf4j-api-1.7.5 。
    如下的pom,就是可用的最新版本。

    • tair 锁的pom。tair 只能使用ldb 。
    • redis 锁的pom。redis 依赖 jedis-2.6.0 及以上。
     <!-- tair lock -->
        <dependency>
            <groupId>com.alibaba.gtool</groupId>
            <artifactId>xlock-tair</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- redis lock -->
        <dependency>
            <groupId>com.alibaba.gtool</groupId>
            <artifactId>xlock-redis</artifactId>
            <version>1.2.1</version>
        </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    步骤2,配置锁管理器。

    • tair 锁管理器(通过 incr 实现)
    • tair 锁管理器(通过 version 实现)
    • redis 锁管理器
      //tair 锁管理器(通过 incr 实现)
      TairLockManagerByIncr lockManager = new TairLockManagerByIncr();
      lockManager.setTairManager(tairManager);//参数
      lockManager.setTairNamespace(tairNamespace);//参数
      lockManager.setEnableLogWarn(true);//是否使用logWarn输出。默认false
      //tair 锁管理器(通过 version 实现)
      TairLockManagerByVersion lockManager = new TairLockManagerByVersion();
      lockManager.setTairManager(tairManager);//参数
      lockManager.setTairNamespace(tairNamespace);//参数
      lockManager.setEnableLogWarn(true);//是否使用logWarn输出。默认false
      //redis 锁管理器
      RedisLockManager lockManager = new RedisLockManager();
      lockManager.setJedisPool(jedisPool);//JedisPool是线程安全的。
      lockManager.setJedisDBIndex(0);//redis的库。默认0
      lockManager.setEnableLogWarn(true);//是否使用logWarn输出。默认false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    步骤3,由锁管理器创建锁。

     	//由锁管理器创建锁
     	XLock lock = lockManager.createLock("lock_taobao_user_1200000000");
    
    • 1
    • 2

    步骤4,配置监听器。(可省略)

    建议至少要配置事件 LOSE_LOCK(被动丢失锁) 的监听器。

    	//这里举个例子。实际可以配置多个不同的事件监听器。
    	YourBizListener listener = new YourBizListener(XLockEventEnum.LOSE_LOCK);
     	lock.addEventListener(listener);
    
    • 1
    • 2
    • 3

    步骤5,多线程使用锁。

    加锁,解锁。为了安全,建议使用 try-catch-finally 结构。

    	boolean lockResult = false;
    	try {
     		/**
    		 * 尝试一次加锁。不阻塞,不等待。
             * 加锁成功返回true,否则返回false。
             */
             lockResult = lock.tryLock();
             if (lockResult) {
             	//do something
                Thread.sleep(2000L);
             }
    	} catch (Exception e) {
             logger.error("ex", e);
        } finally {
        	if (lockResult) {
            	/**
                 * 解锁
                 */
                lock.unLock();
            }
         }
         try {
         	/**
             * 加锁。
             * 加锁成功,返回true。
             * 否则,一直等待,直到加锁成功。
             */
           	lock.lock();
    		//do something
     		Thread.sleep(2000L);
         } catch (Exception e) {
    	    logger.error("ex", e);
         } finally {
         	/**
             * 解锁
             */
            lock.unLock();
         }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    二. 架构和接口

    screenshot.png
    在这里插入图片描述

    主要的接口和类

    1. 顶层接口

    【分布式锁】

    • XLock 分布式锁。指定锁的基础功能。
    • XLockManager 分布式锁的管理器
    • XLockEventEnum 分布式锁的事件。
    • XLockEventListener 分布式锁的事件监听器。

    【原子操作型锁】

    • AtomicLock atomic 锁对象。
    • AtomicLockManager atomic 锁管理器
    • AtomicLockRemainTask 心跳法,让一个线程一直持有一个锁。
    • AtomicLockCheckFreeTask 检查锁是否空闲。如果空闲,就可以尝试加锁。
    • AtomicLockFireEventTask 锁,触发某事件

    【Tair锁】

    • AbstractTairLockManager tair 锁管理器
    • TairLockManagerByVersion tair 锁管理器(通过 version 实现)
    • TairLockManagerByIncr tair 锁管理器(通过 incr 实现)

    【Redis锁】

    • RedisLockManager redis 锁管理器

    2.代码和功能

    【分布式锁的管理器】

    /**
     * 分布式锁的管理器
     *
     * @author gumao
     * @since 2016-11-21
     */
    public interface XLockManager<A extends XLock> {
        /**
         * 创建锁对象。
         */
        public A createLock(String lockName);
    
        /**
         * 移除锁对象。由使用方自己调用,谨慎使用。
         */
        public boolean removeLock(A lock);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    【分布式锁,指定锁的基础功能】

    /**
     * 分布式锁。指定锁的基础功能。
     *
     * @author gumao
     * @since 2016-11-15
     */
    public interface XLock {
        /**
         * 尝试一次加锁。不阻塞,不等待。
         * 加锁成功返回true,否则返回false。
         */
        public boolean tryLock() throws Exception;
    
        /**
         * 尝试一次加锁。
         * 加锁成功,返回true。
         * 加锁失败,就等待最大时长。最终加锁成功,返回true;否则,返回false。
         */
        public boolean tryLock(long maxWaitMillis) throws Exception;
    
        /**
         * 加锁。
         * 加锁成功,返回true。
         * 否则,一直等待,直到加锁成功。
         */
        public void lock() throws Exception;
    
        /**
         * 解锁
         */
        public boolean unLock() throws Exception;
    
        /**
         * 锁的名称
         */
        public String getName();
    
        /**
         * 线程是否加锁成功
         */
        public boolean isThreadSuccess(Thread thread);
    
        /**
         * 添加一个事件监听器
         */
        public void addEventListener(XLockEventListener listener);
    
        /**
         * 删除一个事件监听器
         */
        public void removeEventListener(XLockEventListener listener);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    【分布式锁的事件】

    /**
     * 分布式锁的事件。
     *
     * @author gumao
     * @since 2016-11-20
     */
    public enum XLockEventEnum {
        LOCK_SUCCESS("加锁成功"), LOCK_FAIL("加锁失败"), UNLOCK_SUCCESS("解锁成功"), UNLOCK_FAIL("解锁失败"), LOSE_LOCK("被动丢失锁");
    
        private String name;
    
        private XLockEventEnum(String name) {
            this.name = name;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    【分布式锁的事件监听器】

    /**
     * 分布式锁的事件监听器
     *
     * @author gumao
     * @since 2016-11-20
     */
    public interface XLockEventListener {
        /**
         * 事件
         */
        public XLockEventEnum getEvent();
    
        /**
         * 触发监听器
         */
        public void fire(Thread thread, XLock xlock);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    三、Atomic 锁实现

    【原子操作型分布式锁的实现】

    • 使用外部存储器的原子操作,实现多线程加锁,只有一个成功。
    • 不同的外部存储器,有不同的底层实现类。Tair,Redis,Mysql,等。
    • 管理线程和锁的状态信息。比如,线程一直持有锁,线程丢失锁。

    1. Atomic 锁管理器

    /**
     * atomic 锁管理器
     *
     * @author gumao
     * @since 2016-11-16
     */
    public abstract class AtomicLockManager implements XLockManager<AtomicLock> {
        private static Logger logger = LoggerFactory.getLogger(AtomicLockManager.class);
        //心跳秒值
        private int heartbeatSeconds = 2;
        //会话秒值
        private int sessionSeconds = heartbeatSeconds * 10;
        //检查锁空闲的毫秒值
        private long checkLockFreeMillis = 200L;
        //锁名称和锁对象的映射
        private Map<String, AtomicLock> lockMap = new HashMap<String, AtomicLock>(20);
        //是否启用warn日志。很多应用把级别设置为warn。
        private boolean enableLogWarn = false;
    
        /**
         * 创建锁对象。
         */
        public AtomicLock createLock(String lockName) {
            synchronized (this.lockMap) {
                AtomicLock lock = this.lockMap.get(lockName);
                if (null != lock) {
                    return lock;
                }
                lock = new AtomicLock(lockName);
                lock.setAtomicLockManager(this);
                this.lockMap.put(lockName, lock);
                return lock;
            }
        }
    
        /**
         * 移除锁对象。由使用方自己调用,谨慎使用。
         */
        public boolean removeLock(AtomicLock lock) {
            synchronized (this.lockMap) {
                atomicKill(lock);
                this.lockMap.remove(lock.getName());
            }
            return true;
        }
    
        /**
         * 删除锁的处理
         */
        protected void atomicKill(AtomicLock lock) {
        }
    
        /**
         * 加锁。
         * 成功返回true,失败返回false
         */
        protected abstract boolean atomicLock(Thread thread, AtomicLock lock);
    
        /**
         * 让锁的有效期延长。
         * 返回线程是否有锁。
         */
        protected abstract boolean atomicTouch(Thread thread, AtomicLock lock);
    
        /**
         * 线程是否有锁。
         */
        protected abstract boolean atomicIsLocked(Thread thread, AtomicLock lock);
    
        /**
         * 锁是否被锁住了。
         */
        protected abstract boolean atomicIsLocked(AtomicLock lock);
    
        /**
         * 解锁
         * 成功返回true,失败返回false
         */
        protected abstract boolean atomicUnlock(Thread thread, AtomicLock lock);
    
        /**
         * 某线程尝试一次加锁。加锁成功返回true,否则返回false。
         */
        public boolean tryLock(Thread thread, AtomicLock lock) {
            boolean lockSuccess = false;
            String extra = "";
            //如果已经有锁了,就返回成功,不用重复加锁
            if (lock.isThreadSuccess(thread)) {
                lockSuccess = true;
                extra = "alreadyHaveLock . no need to lock again . ";
            } else {
                //加锁成功
                if (this.atomicLock(thread, lock)) {
                    lockSuccess = true;
                    //保持锁的任务
                    lock.startRemainTask(thread);
                    //设置线程拥有锁
                    lock.markThreadSuccess(thread);
                } else {
                    //设置线程没有锁
                    lock.markThreadFail(thread);
                }
            }
            if (this.enableLogWarn) {
                StringBuilder buf = new StringBuilder(130);
                buf.append(" >> ").append(XlockUtil.nowDatetime()).append(" tryLock");
                buf.append(" lockResult=").append(lockSuccess);
                buf.append(" thread=").append(thread.getName());
                buf.append(" lock=").append(lock.getName());
                buf.append(" extra=").append(extra);
                logger.warn(buf.toString());
            }
    
            return lockSuccess;
        }
    
        /**
         * 加锁。里面用到了线程阻塞,必须用当前线程来调用此方法。
         * maxWaitMillis <= 0 时,加锁失败会一直阻塞。直到成功。
         * maxWaitMillis > 0 时,加锁失败会最多阻塞maxWaitMillis毫秒。
         *
         * @param lock          锁
         * @param maxWaitMillis 最大等待毫秒值
         * @return 加锁成功true, 加锁失败false
         * @throws Exception
         */
        public boolean tryLock(AtomicLock lock, long maxWaitMillis) throws Exception {
            Thread thread = Thread.currentThread();
            //最终的毫秒值
            long finalMillis = System.currentTimeMillis() + maxWaitMillis;
            //--------------------------------------------------------
            //--------------------------------------------------------
            try {
                //加锁
                lock.getReentrantLock().lock();
                //把线程加入锁的等待队列
                lock.addToWaitQueue(thread);
                //----------------------------------------------------------
                //无限阻塞
                if (maxWaitMillis <= 0) {
                    while (true) {
                        //如果指定线程获得了锁,就成功
                        if (this.tryLock(thread, lock)) {
                            break;
                        }
                        //等待可以重试这个条件
                        lock.getCanTryLockCondition().await();
                    }
                    return true;
                }
                //----------------------------------------------------------
                //指定超时时间
                else {
                    boolean lockSuccess = false;
                    while (true) {
                        //如果指定线程获得了锁,就成功
                        if (this.tryLock(thread, lock)) {
                            lockSuccess = true;
                            break;
                        }
                        //可以休眠的毫秒值
                        long sleepMillis = finalMillis - System.currentTimeMillis();
                        //如果超过最大等待时间,就退出
                        if (sleepMillis <= 0) {
                            break;
                        }
                        //等待可以重试这个条件
                        lock.getCanTryLockCondition().await(sleepMillis, TimeUnit.MILLISECONDS);
                    }
                    return lockSuccess;
                }
                //----------------------------------------------------------
            } catch (Exception e) {
                logger.error("tryLock", e);
                throw e;
            } finally {
                // 把线程移出锁的等待队列
                lock.removeFromWaitQueue(thread);
                //解锁
                lock.getReentrantLock().unlock();
            }
        }
    
        /**
         * 构建日志。统一格式。
         */
        protected String buildLog(String title, Thread thread, AtomicLock lock, String extra) {
            StringBuilder buf = new StringBuilder(200);
            buf.append("\n ==============================");
            buf.append("\n ").append(title);
            buf.append("\n  now    =").append(XlockUtil.nowDatetime());
            if (null != thread) {
                buf.append("\n  thread =").append(thread.getName()).append("  ").append(thread.getId());
            }
            if (null != lock) {
                buf.append("\n  lock   =").append(lock.getName());
            }
            if (null != extra) {
                buf.append("\n  extra  =").append(extra);
            }
            buf.append("\n ==============================");
            return buf.toString();
        }
    
        /**
         * 解锁。必须是当前线程解锁。
         */
        public boolean unLock(AtomicLock lock) {
            Thread thread = Thread.currentThread();
            //如果已经没有该锁,就不能解锁
            if (!lock.isThreadSuccess(thread)) {
                String log = this.buildLog("AtomicLockManager unLock fail !", thread, lock,
                        "Current thread does not have this lock , so cannot unlock this lock .");
                logger.error(log);
                return false;
            }
            //解锁
            boolean result = this.atomicUnlock(thread, lock);
            //如果解锁成功
            if (result) {
                //设置当前线程没有锁
                lock.markThreadFail(thread);
                //停止持有锁
                lock.stopRemainTask(thread);
                //唤醒正在等待此锁的线程
                lock.signalAll();
            }
            return result;
        }
    
    
        /**
         * 保持锁。
         */
        public void remainLock(AtomicLockRemainTask task) {
            //锁对象
            AtomicLock lock = task.getAtomicLock();
            //指定的线程
            Thread thread = task.getThread();
            //----------------------------------------------------------
            // 该key是否被锁定
            boolean isLocked = this.atomicTouch(thread, lock);
    
            if (this.enableLogWarn) {
                StringBuilder buf = new StringBuilder(120);
                buf.append(" >> ").append(XlockUtil.nowDatetime()).append(" remainLock thread=").append(thread.getName()).append(" lock=").append(lock
                        .getName()).append(" isLocked=").append(isLocked);
                logger.warn(buf.toString());
            }
            //----------------------------------------------------------
            //如果指定的线程,不在加锁成功的线程队列中,就终止任务。这种情况在线程主动unlock后会发生。
            if (!lock.isThreadSuccess(thread)) {
                //终止任务
                task.getFuture().cancel(true);
    
                if (this.enableLogWarn) {
                    String log = this.buildLog("AtomicLockRemainTask is cancelled  ", thread, lock,
                            "thread not lock success anymore , this may be normal !");
                    logger.warn(log);
                }
                return;
            }
            //----------------------------------------------------------
            //如果没有锁。就尝试重新加锁。(正常情况不应该发生此事)
            if (!isLocked) {
                StringBuilder buf = new StringBuilder(100);
                //尝试重新加锁。加锁成功。
                if (this.atomicLock(thread, lock)) {
                    buf.append(" >> retry lock success ! GOOD !");
                    //加锁成功。
                }
                //加锁失败。
                else {
                    buf.append(" >> retry lock fail , then cancel task , this may be normal ! Please find reason !");
                    //标记线程加锁失败
                    lock.markThreadFail(thread);
                    //终止任务
                    task.getFuture().cancel(true);
                    //某线程被动丢失了锁
                    AtomicLockFireEventTask task2 = new AtomicLockFireEventTask(XLockEventEnum.LOSE_LOCK, thread, lock);
                    XlockUtil.getScheduler().execute(task2);
                }
    
                String log = this.buildLog("AtomicLockRemainTask  #  lose lock , so retry lock again ", thread, lock,
                        buf.toString());
                logger.error(log);
            }
        }
    
        /**
         * 创建线程的uuid,全局标识一个线程。
         * 必须全局唯一。
         */
        public String createThreadUUID(Thread thread, AtomicLock lock) {
            return UUID.randomUUID().toString();
        }
    
        /**
         * 如果锁空闲,就唤醒等待队列里面的线程,去尝试加锁
         */
        public void signalAllIfFree(AtomicLock atomicLock) {
            //锁是否被锁住了
            boolean isLocked = this.atomicIsLocked(atomicLock);
            //如果锁没有被锁,可以尝试加锁
            if (!isLocked) {
                //唤醒正在等待此锁的线程
                atomicLock.signalAll();
                //------------------------------------------------
                if (this.enableLogWarn) {
                    StringBuilder buf = new StringBuilder(120);
                    buf.append(" >> ").append(XlockUtil.nowDatetime()).append(" AtomicLockCheckFreeTask ").append(" lock=").append(atomicLock
                            .getName()).append(" isLocked=").append(isLocked).append(" . So signalAll to try lock !");
                    logger.warn(buf.toString());
                }
            }
        }
    
        //--------------------------------------------------
    
        public int getHeartbeatSeconds() {
            return heartbeatSeconds;
        }
    
        public void setHeartbeatSeconds(int heartbeatSeconds) {
            this.heartbeatSeconds = heartbeatSeconds;
        }
    
        public int getSessionSeconds() {
            return sessionSeconds;
        }
    
        public void setSessionSeconds(int sessionSeconds) {
            this.sessionSeconds = sessionSeconds;
        }
    
        public boolean isEnableLogWarn() {
            return enableLogWarn;
        }
    
        public void setEnableLogWarn(boolean enableLogWarn) {
            this.enableLogWarn = enableLogWarn;
        }
    
        public long getCheckLockFreeMillis() {
            return checkLockFreeMillis;
        }
    
        public void setCheckLockFreeMillis(long checkLockFreeMillis) {
            this.checkLockFreeMillis = checkLockFreeMillis;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351

    2. Atomic 锁对象

    /**
     * atomic 锁对象。
     *
     * @author gumao
     * @since 2016-11-15
     */
    public class AtomicLock implements XLock {
        private static Logger logger = LoggerFactory.getLogger(AtomicLock.class);
        //名称
        private String name;
        //锁管理器
        private AtomicLockManager atomicLockManager;
        //加锁成功的线程
        private Set<Thread> successThreads = new HashSet<Thread>(4);
        //等待队列
        private Set<Thread> waitQueue = new HashSet<Thread>(10);
        //并发锁
        private Lock reentrantLock = new ReentrantLock();
        //可以重试加锁的条件
        private Condition canTryLockCondition = reentrantLock.newCondition();
        //校验锁是否空闲
        private AtomicLockCheckFreeTask checkFreeTask = new AtomicLockCheckFreeTask();
        //事件监听器
        private Map<XLockEventEnum, Set<XLockEventListener>> eventListenerMap = new ConcurrentHashMap<XLockEventEnum, Set<XLockEventListener>>(8);
        //线程保持锁的任务
        private Map<Thread, AtomicLockRemainTask> remainTaskMap = new HashMap<Thread, AtomicLockRemainTask>();
        //线程的唯一键
        private Map<Thread, String> threadUUIDMap = new HashMap<Thread, String>();
        //线程的扩展数据
        private Map<Thread, Object> threadExtendMap = new HashMap<Thread, Object>();
    
        //--------------------------------------------------
    
        public AtomicLock(String name) {
            String regexp = "^[\\w\\d\\._\\-\\+/,;:#=]*$";
            if (!name.matches(regexp)) {
                throw new IllegalArgumentException("AtomicLock name must match regexp " + regexp);
            }
            this.name = name;
    
            init();
        }
    
        /**
         * 某些初始化
         */
        private void init() {
            for (XLockEventEnum event : XLockEventEnum.values()) {
                eventListenerMap.put(event, new HashSet<XLockEventListener>(4));
            }
        }
    
        /**
         * 添加一个事件监听器
         */
        public void addEventListener(XLockEventListener listener) {
            Set<XLockEventListener> listeners = eventListenerMap.get(listener.getEvent());
            synchronized (listeners) {
                listeners.add(listener);
            }
        }
    
        /**
         * 删除一个事件监听器
         */
        public void removeEventListener(XLockEventListener listener) {
            Set<XLockEventListener> listeners = eventListenerMap.get(listener.getEvent());
            synchronized (listeners) {
                listeners.remove(listener);
            }
        }
    
        /**
         * 触发某事件
         */
        public void fireEvent(XLockEventEnum event, Thread thread) {
            Set<XLockEventListener> listeners = eventListenerMap.get(event);
            synchronized (listeners) {
                for (XLockEventListener listener : listeners) {
                    try {
                        listener.fire(thread, this);
                    } catch (Exception e) {
                        logger.error("fireEvent", e);
                        continue;
                    }
                }
            }
        }
    
        /**
         * 把线程放入等待队列
         */
        public void addToWaitQueue(Thread thread) {
            synchronized (this.waitQueue) {
                //加入等待队列
                this.waitQueue.add(thread);
                //如果为1,表示刚开始有线程在等待,就启动任务去检测锁是否空闲
                int size = this.waitQueue.size();
                if (1 == size) {
                    //重试加锁的任务
                    checkFreeTask.setAtomicLockManager(atomicLockManager);
                    checkFreeTask.setAtomicLock(this);
                    Future taskFuture = XlockUtil.getScheduler().scheduleAtFixedRate(checkFreeTask, atomicLockManager.getCheckLockFreeMillis(),
                            atomicLockManager.getCheckLockFreeMillis(), TimeUnit.MILLISECONDS);
                    checkFreeTask.setFuture(taskFuture);
                    if (this.atomicLockManager.isEnableLogWarn()) {
                        logger.warn(" >> " + XlockUtil.nowDatetime() + " waitQueue size is 1 , so start AtomicLockCheckFreeTask ");
                    }
                }
            }
        }
    
        /**
         * 把线程移出等待队列
         */
        public void removeFromWaitQueue(Thread thread) {
            synchronized (this.waitQueue) {
                //移出等待队列
                this.waitQueue.remove(thread);
                //如果小于等于0,表示没有线程在等待,就不需要检查锁的状态
                int size = this.waitQueue.size();
                if (size <= 0) {
                    //终止任务
                    checkFreeTask.getFuture().cancel(true);
                    if (this.atomicLockManager.isEnableLogWarn()) {
                        logger.warn(" >> " + XlockUtil.nowDatetime() + " waitQueue size is 0 , so stop AtomicLockCheckFreeTask ");
                    }
                }
            }
        }
    
        /**
         * 标记线程加锁成功
         */
        public void markThreadSuccess(Thread thread) {
            synchronized (successThreads) {
                successThreads.add(thread);
            }
        }
    
        /**
         * 标记线程加锁失败
         */
        public void markThreadFail(Thread thread) {
            synchronized (successThreads) {
                successThreads.remove(thread);
            }
        }
    
        /**
         * 线程是否加锁成功
         */
        public boolean isThreadSuccess(Thread thread) {
            synchronized (successThreads) {
                return successThreads.contains(thread);
            }
        }
    
        /**
         * 触发某事件
         */
        private void fireEventTask(XLockEventEnum event, Thread thread, AtomicLock lock) {
            //如果没有监听器,就不需要触发
            Set<XLockEventListener> listeners = eventListenerMap.get(event);
            synchronized (listeners) {
                if (listeners.isEmpty()) {
                    return;
                }
            }
            //异步执行事件触发器
            AtomicLockFireEventTask task = new AtomicLockFireEventTask(event, thread, lock);
            XlockUtil.getScheduler().execute(task);
        }
    
    
        /**
         * 尝试一次加锁。不阻塞,不等待。
         * 加锁成功返回true,否则返回false。
         */
        public boolean tryLock() throws Exception {
            Thread thread = Thread.currentThread();
            boolean result = this.atomicLockManager.tryLock(thread, this);
            if (result) {
                fireEventTask(XLockEventEnum.LOCK_SUCCESS, thread, this);
            } else {
                fireEventTask(XLockEventEnum.LOCK_FAIL, thread, this);
            }
            return result;
        }
    
        /**
         * 尝试一次加锁。
         * 加锁成功,返回true。
         * 加锁失败,就等待最大时长。最终加锁成功,返回true;否则,返回false。
         */
        public boolean tryLock(long maxWaitMillis) throws Exception {
            Thread thread = Thread.currentThread();
            boolean result = this.atomicLockManager.tryLock(this, maxWaitMillis);
            if (result) {
                fireEventTask(XLockEventEnum.LOCK_SUCCESS, thread, this);
            } else {
                fireEventTask(XLockEventEnum.LOCK_FAIL, thread, this);
            }
            return result;
        }
    
        /**
         * 加锁。
         * 加锁成功,返回true。
         * 否则,一直等待,直到加锁成功。
         */
        public void lock() throws Exception {
            this.atomicLockManager.tryLock(this, 0);
            Thread thread = Thread.currentThread();
            fireEventTask(XLockEventEnum.LOCK_SUCCESS, thread, this);
        }
    
        /**
         * 解锁
         */
        public boolean unLock() throws Exception {
            Thread thread = Thread.currentThread();
            boolean result = this.atomicLockManager.unLock(this);
            if (result) {
                fireEventTask(XLockEventEnum.UNLOCK_SUCCESS, thread, this);
            } else {
                fireEventTask(XLockEventEnum.UNLOCK_FAIL, thread, this);
            }
            return result;
        }
    
        /**
         * 唤醒正在等待此锁的线程
         */
        public void signalAll() {
            try {
                this.reentrantLock.lock();
                //唤醒等待线程
                this.canTryLockCondition.signalAll();
            } catch (Exception e) {
                logger.error("signalAll", e);
            } finally {
                this.reentrantLock.unlock();
            }
        }
    
        /**
         * 启动线程保持锁
         */
        public void startRemainTask(Thread thread) {
            synchronized (this.remainTaskMap) {
                AtomicLockRemainTask task = this.remainTaskMap.get(thread);
                if (null == task) {
                    //保持锁的任务
                    task = new AtomicLockRemainTask();
                    task.setAtomicLockManager(this.atomicLockManager);
                    task.setAtomicLock(this);
                    task.setThread(thread);
                    //放入缓存
                    this.remainTaskMap.put(thread, task);
                }
                //定时器执行
                Future taskFuture = XlockUtil.getScheduler().scheduleAtFixedRate(task, 1, this.atomicLockManager.getHeartbeatSeconds(), TimeUnit.SECONDS);
                task.setFuture(taskFuture);
            }
        }
    
        /**
         * 停止线程保持锁
         */
        public void stopRemainTask(Thread thread) {
            synchronized (this.remainTaskMap) {
                AtomicLockRemainTask task = this.remainTaskMap.remove(thread);
                if (null != task) {
                    //取消任务
                    task.getFuture().cancel(true);
                }
            }
        }
    
        /**
         * 找到线程的唯一键
         */
        public String findThreadUUID(Thread thread) {
            synchronized (this.threadUUIDMap) {
                String uuid = this.threadUUIDMap.get(thread);
                if (null == uuid) {
                    //创建线程的uuid
                    uuid = this.atomicLockManager.createThreadUUID(thread, this);
                    //放入缓存
                    this.threadUUIDMap.put(thread, uuid);
                }
                return uuid;
            }
        }
    
        /**
         * 删除线程的唯一键
         */
        public void deleteThreadUUID(Thread thread) {
            synchronized (this.threadUUIDMap) {
                this.threadUUIDMap.remove(thread);
            }
        }
    
        /**
         * 线程的扩展对象
         */
        public void setThreadExtend(Thread thread, Object obj) {
            synchronized (this.threadExtendMap) {
                this.threadExtendMap.put(thread, obj);
            }
        }
    
        /**
         * 线程的扩展对象
         */
        public Object getThreadExtend(Thread thread) {
            synchronized (this.threadExtendMap) {
                return this.threadExtendMap.get(thread);
            }
        }
    
        /**
         * 线程的扩展对象
         */
        public void deleteThreadExtend(Thread thread) {
            synchronized (this.threadExtendMap) {
                this.threadExtendMap.remove(thread);
            }
        }
    
        /**
         * 锁的名称
         */
        public String getName() {
            return name;
        }
    
        public AtomicLockManager getAtomicLockManager() {
            return atomicLockManager;
        }
    
        public void setAtomicLockManager(AtomicLockManager atomicLockManager) {
            this.atomicLockManager = atomicLockManager;
        }
    
        public Lock getReentrantLock() {
            return reentrantLock;
        }
    
        public void setReentrantLock(Lock reentrantLock) {
            this.reentrantLock = reentrantLock;
        }
    
        public Condition getCanTryLockCondition() {
            return canTryLockCondition;
        }
    
        public void setCanTryLockCondition(Condition canTryLockCondition) {
            this.canTryLockCondition = canTryLockCondition;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363

    四、Tair实现分布式锁

    Tair是蚂蚁内部的缓存工具
    
    • 1

    【注意事项】
    只能使用 ldb 。mdb可能导致不同的机房都写成功,但是值不一样。
    【Tair version 方法】

    ResultCode put(int namespace, Serializable key, Serializable value, int version, int expireTime)
    
    • 1

    初始version设置为大于1的整数,比如100 。多线程同时执行这个操作,只会有一个成功。
    加锁成功后,客户端保存线程的这个锁的version,每次touch锁,本地version加1,和Tair锁的version保持一致。version一致,才能更新成功。
    把线程的uuid写到Tair value中,则客户端和Tair锁都有同一个uuid。用uuid判断线程是否拥有这个锁。
    【Tair incr 方法】

    Result incr(int namespace, Serializable key, int value, int defaultValue, int expireTime, int lowBound, int upperBound)
    
    • 1

    value设为1,defaultValue设为0 。判断返回值,如果为1,成功;否则,失败。
    tair 锁管理器。抽象父类,保存一些公共方法。

    1. Tair锁管理器

    /**
     * tair 锁管理器
     *
     * @author gumao
     * @since 2016-11-16
     */
    public abstract class AbstractTairLockManager extends AtomicLockManager {
        private static Logger logger = LoggerFactory.getLogger(AbstractTairLockManager.class);
        //超时次数的最大值
        private static int maxTimeoutTimes = 3;
        //tair namespace
        protected int tairNamespace;
        //建议使用ldb。不能使用mdb。
        protected TairManager tairManager;
        //重试次数
        protected int tairRetryTimes = 3;
    
        /**
         * 是否超时
         */
        protected boolean isTimeout(Result result) {
            if (result == null) {
                return true;
            }
            ResultCode code = result.getRc();
            return this.isTimeout(code);
        }
    
        /**
         * 是否超时
         */
        protected boolean isTimeout(ResultCode code) {
            if (ResultCode.CONNERROR.equals(code) || ResultCode.TIMEOUT.equals(code) || ResultCode.UNKNOW.equals(code)) {
                return true;
            }
            return false;
        }
    
        /**
         * 超时发生时,是否应该继续。
         */
        protected boolean shouldTimeoutContinue(Result result, Integer timeoutCount) {
            if (this.isTimeout(result)) {
                timeoutCount++;
                if (timeoutCount > maxTimeoutTimes) {
                    String log = ">>   Tair timeoutCount beyond .  maxTimeoutTimes=" + maxTimeoutTimes + "  timeoutCount="
                            + timeoutCount + "  Result=" + result;
                    logger.error(log);
                    return false;
                }
                return true;
            }
            return false;
        }
    
        /**
         * 超时发生时,是否应该继续。
         */
        protected boolean shouldTimeoutContinue(ResultCode code, Integer timeoutCount) {
            if (this.isTimeout(code)) {
                timeoutCount++;
                if (timeoutCount > maxTimeoutTimes) {
                    String log = ">>   Tair timeoutCount beyond .  maxTimeoutTimes=" + maxTimeoutTimes + "  timeoutCount="
                            + timeoutCount + "  ResultCode=" + code;
                    logger.error(log);
                    return false;
                }
                return true;
            }
            return false;
        }
    
        /**
         * 取版本号
         */
        protected int getTairElementVersion(String key) {
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    Result<DataEntry> result = tairManager.get(tairNamespace, key);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = " Tair timeout  key=" + key + "  Result=" + result;
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != result && null != result.getValue()) {
                        DataEntry entry = result.getValue();
                        int version = entry.getVersion();
                        return version;
                    } else {
                        break;
                    }
                } catch (Exception e) {
                    logger.error("getTairElementVersion", e);
                    continue;
                }
            }
            String log = " Tair getTairElementVersion Error !  key=" + key;
            logger.error(log);
            return -1;
        }
    
        public int getTairNamespace() {
            return tairNamespace;
        }
    
        public void setTairNamespace(int tairNamespace) {
            this.tairNamespace = tairNamespace;
        }
    
        public TairManager getTairManager() {
            return tairManager;
        }
    
        public void setTairManager(TairManager tairManager) {
            this.tairManager = tairManager;
        }
    
        public int getTairRetryTimes() {
            return tairRetryTimes;
        }
    
        public void setTairRetryTimes(int tairRetryTimes) {
            this.tairRetryTimes = tairRetryTimes;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128

    2. tair 锁管理器(通过 version 实现)

    /**
     * tair 锁管理器(通过 version 实现)
     *
     * @author gumao
     * @since 2016-11-16
     */
    public class TairLockManagerByVersion extends AbstractTairLockManager {
        private static Logger logger = LoggerFactory.getLogger(TairLockManagerByVersion.class);
        //除0和1之外的正整数。version初始值是0,写一次变成1,所以要排除这2个。
        private static int VERSION_LOCK = 40000;
    
        /**
         * 加锁。
         * 成功返回true,失败返回false
         */
        protected boolean atomicLock(Thread thread, AtomicLock lock) {
            //先看下是否已经有锁
            boolean lockSuccess = this.atomicIsLocked(thread, lock);
            //如果已经有锁,就直接返回成功
            if (lockSuccess) {
                return true;
            }
            //---------------------------------------------------
            String lockName = lock.getName();
    
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    //线程的uuid
                    String uuid = lock.findThreadUUID(thread);
                    String val = uuid + "__" + XlockUtil.nowDatetime();
                    //用version put。当key不存在,此操作才会成功。
                    //把uuid作为锁值,方便匹配线程。
                    ResultCode ret = tairManager.put(tairNamespace, lockName, val, VERSION_LOCK, this.getSessionSeconds());
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(ret)) {
                        String log = this.buildLog(" Tair timeout ", thread, lock, "  " + ret);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != ret && ret.isSuccess()) {
                        //加锁成功
                        lockSuccess = true;
                    } else {
                        //因为超时的原因,有可能tair已经加锁成功,但是返回值是失败,所以这里多校验一次。
                        lockSuccess = this.atomicIsLocked(thread, lock);
                    }
                    //---------------------------------------------------
                    //如果加锁成功
                    if (lockSuccess) {
                        //key被更新了一次,当前version是1
                        AtomicInteger version = new AtomicInteger(1);
                        //保存
                        lock.setThreadExtend(thread, version);
                        return true;
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicLock", e);
                    continue;
                }
            }
            return false;
        }
    
        /**
         * 让锁的有效期延长。
         * 返回线程是否有锁。
         */
        protected boolean atomicTouch(Thread thread, AtomicLock lock) {
            //线程是否有锁
            boolean hasLock2 = this.atomicIsLocked(thread, lock);
            //没有锁,就不能操作。
            if (!hasLock2) {
                return false;
            }
            //---------------------------------------------------
            String lockName = lock.getName();
            //线程对应的版本
            AtomicInteger version = (AtomicInteger) lock.getThreadExtend(thread);
            //没有version,表示已经主动解锁了
            if (null == version) {
                return false;
            }
    
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    //线程的uuid
                    String uuid = lock.findThreadUUID(thread);
                    String val = uuid + "__" + XlockUtil.nowDatetime();
                    //用version put。version匹配成功,才能写成功
                    //把uuid作为锁值,方便匹配线程。
    //                if(version.get()%1000==0){
    //                    logger.error(" localVersion="+version.get());
    //                }
                    ResultCode ret = tairManager.put(tairNamespace, lockName, val, version.get(), this.getSessionSeconds());
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(ret)) {
                        String log = this.buildLog(" Tair timeout ", thread, lock, "  " + ret);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != ret && ret.isSuccess()) {
                        //版本加1
                        version.incrementAndGet();
                        return true;
                    } else {
                        //1、超时的原因,有可能tair已经更新成功,但是返回值是失败,所以这里多校验一次。
                        //2、tair version最大值是65534,之后会变成1 。所以重新设置version。
                        //线程是否有锁
                        boolean hasLock = this.atomicIsLocked(thread, lock);
                        if (hasLock) {
                            //取远程版本号
                            int remoteVersion = this.getTairElementVersion(lockName);
                            String log = this.buildLog(" Reset local version ", thread, lock,
                                    "  localVersion=" + version.get() + "   remoteVersion=" + remoteVersion);
                            logger.error(log);
                            //更新版本号
                            version.set(remoteVersion);
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicIsLocked", e);
                    continue;
                }
            }
            return false;
        }
    
        /**
         * 线程是否有锁。
         */
        protected boolean atomicIsLocked(Thread thread, AtomicLock lock) {
            String lockName = lock.getName();
    
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", thread, lock, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != result && null != result.getValue()) {
                        DataEntry entry = result.getValue();
                        String val = (String) entry.getValue();
    
                        //线程的uuid
                        String uuid = lock.findThreadUUID(thread);
                        //线程的uuid,匹配锁值。
                        if (null != val && val.startsWith(uuid)) {
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicIsLocked", e);
                    continue;
                }
            }
            return false;
        }
    
        /**
         * 锁是否被锁住了。
         */
        protected boolean atomicIsLocked(AtomicLock lock) {
            String lockName = lock.getName();
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", null, lock, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != result && null != result.getValue()) {
                        DataEntry entry = result.getValue();
                        String val = (String) entry.getValue();
                        //有值,就是锁被加了
                        if (null != val && val.length() > 0) {
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicIsLocked", e);
                    continue;
                }
            }
            return false;
        }
    
        /**
         * 解锁
         * 成功返回true,失败返回false
         */
        protected boolean atomicUnlock(Thread thread, AtomicLock lock) {
            String lockName = lock.getName();
            boolean unlockSuccess = false;
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    //先看线程是否有锁
                    boolean hasLock = this.atomicIsLocked(thread, lock);
                    //如果线程没有锁,就不能继续
                    if (!hasLock) {
                        return false;
                    }
                    //----------------------------------------------------------
                    //删除锁
                    ResultCode result = tairManager.invalid(this.tairNamespace, lockName);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", thread, lock, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (result != null && result.isSuccess()) {
                        unlockSuccess = true;
                    } else {
                        //因为超时的原因,有可能tair已经操作成功,但是返回值是失败,所以这里多校验一次。
                        //线程是否有锁
                        boolean hasLock2 = this.atomicIsLocked(thread, lock);
                        // 如果线程已经没有锁了,表示解锁成功了。
                        if (!hasLock2) {
                            unlockSuccess = true;
                        }
                    }
                    //---------------------------------------------------
                    //如果解锁成功
                    if (unlockSuccess) {
                        //删除线程的uuid。下次加锁的时候,可以重新生成uuid。
                        lock.deleteThreadUUID(thread);
                        //删除线程对应的锁version。下次加锁的时候,可以重新生成version。
                        lock.deleteThreadExtend(thread);
                        return true;
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicUnlock", e);
                    continue;
                }
            }
            return false;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261

    3. tair 锁管理器(通过 incr 实现)

    /**
     * tair 锁管理器(通过 incr 实现)
     *
     * @author gumao
     * @since 2016-11-16
     */
    public class TairLockManagerByIncr extends AbstractTairLockManager {
        private static Logger logger = LoggerFactory.getLogger(TairLockManagerByIncr.class);
        private static int INCR_DEFAULT = 0;
        private static int LOCK_INCR_MIN = INCR_DEFAULT;
        private static int LOCK_INCR_MAX = 5;
    
    
        /**
         * 加锁。
         * 成功返回true,失败返回false
         */
        protected boolean atomicLock(Thread thread, AtomicLock lock) {
            //原子加1
            Integer val = this.tairIncr(lock.getName(), 1);
            //从0到1,表示加锁成功。其他,表示失败。
            if (1 == val) {
                return true;
            }
            return false;
        }
    
        /**
         * 让锁的有效期延长。
         * 返回线程是否有锁。
         */
        protected boolean atomicTouch(Thread thread, AtomicLock lock) {
            //加0。
            Integer val = this.tairIncr(lock.getName(), 0);
            //大于0,表示有锁
            if (val > 0) {
                return true;
            }
            return false;
        }
    
        /**
         * 线程是否有锁。
         */
        protected boolean atomicIsLocked(Thread thread, AtomicLock lock) {
            String lockName = lock.getName();
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", thread, lock, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != result && null != result.getValue()) {
                        DataEntry entry = result.getValue();
                        Integer val = (Integer) entry.getValue();
                        if (null != val && val > 0) {
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicIsLocked", e);
                    continue;
                }
            }
            return false;
        }
    
        /**
         * 锁是否被锁住了。
         */
        protected boolean atomicIsLocked(AtomicLock lock) {
            String lockName = lock.getName();
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", null, lock, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (null != result && null != result.getValue()) {
                        DataEntry entry = result.getValue();
                        Integer val = (Integer) entry.getValue();
                        //值为正,就是锁被加了
                        if (null != val && val > 0) {
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicIsLocked", e);
                    continue;
                }
            }
            return false;
        }
    
        /**
         * 解锁
         * 成功返回true,失败返回false
         */
        protected boolean atomicUnlock(Thread thread, AtomicLock lock) {
            String lockName = lock.getName();
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    //先看线程是否有锁
                    boolean hasLock = this.atomicIsLocked(thread, lock);
                    //如果线程没有锁,就失败
                    if (!hasLock) {
                        return false;
                    }
                    //----------------------------------------------------------
                    //删除锁
                    ResultCode result = tairManager.invalid(this.tairNamespace, lockName);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", thread, lock, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (result != null && result.isSuccess()) {
                        return true;
                    }
                    return false;
                } catch (Exception e) {
                    logger.error("atomicUnlock", e);
                    continue;
                }
            }
            return false;
        }
    
        private Integer tairIncr(String key, int value) {
            for (int i = 1; i <= tairRetryTimes; ++i) {
                try {
                    //原子加数
                    Result<Integer> result = this.tairManager.incr(this.tairNamespace, key, value, INCR_DEFAULT, this.getSessionSeconds(),
                            LOCK_INCR_MIN, LOCK_INCR_MAX);
                    //---------------------------------------------------
                    //判断超时
                    if (this.isTimeout(result)) {
                        String log = this.buildLog(" Tair timeout ", null, null, "  " + result);
                        logger.error(log);
                        continue;
                    }
                    //---------------------------------------------------
                    if (result != null && result.isSuccess()) {
                        return result.getValue();
                    }
                    return INCR_DEFAULT;
                } catch (Exception e) {
                    logger.error("tairManager # incr", e);
                    continue;
                }
            }
            return INCR_DEFAULT;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170

    五.Redis实现分布式锁

    【 Redis set 方法】

    public String set(final String key, final String value, final String nxxx, final String expx, final long time)
    
    • 1

    String nxxx, 使用这个参数。值如下。
    NX – Only set the key if it does not already exist. 不存在时,操作才会成功。
    XX – Only set the key if it already exist. 存在时,操作才会成功。
    把线程的uuid写到Redis value中,则客户端和Redis锁都有同一个uuid。用uuid判断线程是否拥有这个锁。
    依赖jedis jar版本,比这个版本大,应该也可以。当然也可以考虑用redission替换jedis实现一个版本

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.6.0</version>
        </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1. redis 锁管理器

    /**
     * redis 锁管理器
     *
     * @author gumao
     * @since 2016-11-16
     */
    public class RedisLockManager extends AtomicLockManager {
        private static Logger logger = LoggerFactory.getLogger(RedisLockManager.class);
        private static String REDIS_OK = "OK";
        //redis 客户端池,线程安全
        private JedisPool jedisPool;
        //redis 库
        private int jedisDBIndex = 0;
        //重试次数
        private int redisRetryTimes = 3;
    
        //状态是成功
        private static boolean isRedisSuccess(String statusCode) {
            if (null != statusCode && REDIS_OK.equals(statusCode)) {
                return true;
            }
            return false;
        }
    
    	/**
     	 * public String set(final String key, final String value, final String nxxx, 							 final String expx, final long time)
     	 *
     	 * NX -- Only set the key if it does not already exist.
     	 * XX -- Only set the key if it already exist.
     	 *
     	 * EX = seconds
     	 * PX = milliseconds
     	 */
    
        /**
         * 加锁。
         * 成功返回true,失败返回false
         */
        protected boolean atomicLock(Thread thread, AtomicLock lock) {
            Jedis jedis = null;
            try {
                String uuid = lock.findThreadUUID(thread);
                jedis = jedisPool.getResource();
                jedis.select(this.jedisDBIndex);
                String lockName = lock.getName();
                for (int i = 1; i <= redisRetryTimes; ++i) {
                    try {
                        //NX,如果key不存在,才能set成功。
                        String val = uuid + "__" + XlockUtil.nowDatetime();
                        //更新值和过期时间
                        String ret = jedis.set(lockName, val, "NX", "EX", this.getSessionSeconds());
                        //操作是否成功
                        boolean isSucc = isRedisSuccess(ret);
                        return isSucc;
                    } catch (Exception e) {
                        logger.error("atomicLock", e);
                        continue;
                    }
                }
            } catch (Exception e) {
                logger.error("jedis", e);
            } finally {
                jedisPool.returnResource(jedis);
            }
            return false;
        }
    
        /**
         * 让锁的有效期延长。
         * 返回线程是否有锁。
         */
        protected boolean atomicTouch(Thread thread, AtomicLock lock) {
            Jedis jedis = null;
            try {
                String uuid = lock.findThreadUUID(thread);
                jedis = jedisPool.getResource();
                jedis.select(this.jedisDBIndex);
                String lockName = lock.getName();
                for (int i = 1; i <= redisRetryTimes; ++i) {
                    try {
                        //先校验当前线程是否有锁
                        boolean hasLock = this.atomicIsLocked(thread, lock);
                        //没有锁就不能后续操作
                        if (!hasLock) {
                            return false;
                        }
                        //-------------------------------------------------
                        //XX,如果key已经存在,才能set成功。
                        String val = uuid + "__" + XlockUtil.nowDatetime();
                        //更新值和过期时间
                        String ret = jedis.set(lockName, val, "XX", "EX", this.getSessionSeconds());
                        //操作是否成功
                        boolean isSucc = isRedisSuccess(ret);
                        return isSucc;
                    } catch (Exception e) {
                        logger.error(" xx", e);
                        continue;
                    }
                }
            } catch (Exception e) {
                logger.error("jedis", e);
            } finally {
                jedisPool.returnResource(jedis);
            }
            return false;
        }
    
        /**
         * 线程是否有锁。
         */
        protected boolean atomicIsLocked(Thread thread, AtomicLock lock) {
            Jedis jedis = null;
            try {
                String uuid = lock.findThreadUUID(thread);
                jedis = jedisPool.getResource();
                jedis.select(this.jedisDBIndex);
                String lockName = lock.getName();
                for (int i = 1; i <= redisRetryTimes; ++i) {
                    try {
                        //有值就表示被锁住了
                        String val = jedis.get(lockName);
                        if (null != val) {
                            //如果是线程uuid前缀,表示线程拥有锁
                            if (val.startsWith(uuid)) {
                                return true;
                            }
                        }
                        return false;
                    } catch (Exception e) {
                        logger.error("atomicIsLocked", e);
                        continue;
                    }
                }
            } catch (Exception e) {
                logger.error("jedis", e);
            } finally {
                jedisPool.returnResource(jedis);
            }
            return false;
        }
    
        /**
         * 锁是否被锁住了。
         */
        protected boolean atomicIsLocked(AtomicLock lock) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.select(this.jedisDBIndex);
                String lockName = lock.getName();
                for (int i = 1; i <= redisRetryTimes; ++i) {
                    try {
                        //有值就表示被锁住了
                        String val = jedis.get(lockName);
                        //有值就表示被锁了
                        if (null != val) {
                            return true;
                        }
                        return false;
                    } catch (Exception e) {
                        logger.error("atomicIsLocked", e);
                        continue;
                    }
                }
            } catch (Exception e) {
                logger.error("jedis", e);
            } finally {
                jedisPool.returnResource(jedis);
            }
            return false;
        }
    
        /**
         * 解锁
         * 成功返回true,失败返回false
         */
        protected boolean atomicUnlock(Thread thread, AtomicLock lock) {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.select(this.jedisDBIndex);
                String lockName = lock.getName();
                for (int i = 1; i <= redisRetryTimes; ++i) {
                    try {
                        //先看线程是否有锁
                        boolean hasLock = this.atomicIsLocked(thread, lock);
                        //如果线程没有锁,就失败
                        if (!hasLock) {
                            return false;
                        }
                        //----------------------------------------------------------
                        //解锁
                        Long ret = jedis.del(lockName);
                        //如果删除成功
                        if (null != ret && 1 == ret) {
                            //删除线程的uuid
                            lock.deleteThreadUUID(thread);
                            return true;
                        }
                        return false;
                    } catch (Exception e) {
                        logger.error("atomicUnlock", e);
                        continue;
                    }
                }
            } catch (Exception e) {
                logger.error("jedis", e);
            } finally {
                jedisPool.returnResource(jedis);
            }
            return false;
        }
    
        public JedisPool getJedisPool() {
            return jedisPool;
        }
    
        public void setJedisPool(JedisPool jedisPool) {
            this.jedisPool = jedisPool;
        }
    
        public int getRedisRetryTimes() {
            return redisRetryTimes;
        }
    
        public void setRedisRetryTimes(int redisRetryTimes) {
            this.redisRetryTimes = redisRetryTimes;
        }
    
        public int getJedisDBIndex() {
            return jedisDBIndex;
        }
    
        public void setJedisDBIndex(int jedisDBIndex) {
            this.jedisDBIndex = jedisDBIndex;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237

    六、Zookeeper实现分布式锁

    未实现,待补充

    七、分布式锁其他技术要点

    注意:如下的技术要点,并不是每个版本的分布式锁都会用到。比如,zookeeper内置了连接和状态管理,就不需要调度线程池执行保持锁的任务。

    (1)最小粒度是线程。

    “线程+锁” 标识一个分布式锁单元。
    底层实现代码中,许多方法都有 thread 和 lock 参数。
    在这里插入图片描述

    (2)使用外部存储器的原子操作

    【Tair version】
    ResultCode put(int namespace, Serializable key, Serializable value, int version, int expireTime)
    初始version设置为大于1的整数,比如100 。多线程同时执行这个操作,只会有一个成功。
    【Tair incr】
    Result incr(int namespace, Serializable key, int value, int defaultValue, int expireTime, int lowBound, int upperBound)
    value设为1,defaultValue设为0 。判断返回值,如果为1,成功;否则,失败。
    【Redis set】
    public String set(final String key, final String value, final String nxxx, final String expx, final long time)
    String nxxx, 使用这个参数。值如下。
    NX – Only set the key if it does not already exist. 不存在时,操作才会成功。
    XX – Only set the key if it already exist. 存在时,操作才会成功。

    【Zookeeper】
    Zookeeper : 序列子节点创建,同步通知。(暂时略)

    【Mysql : 乐观锁更新】
    update table_lock set expire_time = aaa , version_field = bbb+1, lock_value = eee where lock_key = ccc and version_field = bbb

    (3)使用的java并发功能
    • 多线程:多线程并发操作锁。
    • 线程池:使用ScheduledExecutorService维护锁和线程的状态信息。
    • ReentrantLock和Condition:锁的线程等待队列,线程阻塞和唤醒。
    • synchronized:小而美。维护一些状态信息。
    • 并发容器ConcurrentHashMap

    【客户端的锁对象】

    一个客户端的锁对象,存在于一个进程中。
    锁对象保存许多信息。比如,线程的保持锁任务,线程的自定义uuid,线程扩展信息,线程等待队列,等。
    锁的线程等待队列,使用在等待性加锁。实现线程阻塞和唤醒,最大等待时长。

    【调度线程池】

    使用 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

    • AtomicLockRemainTask ,主动延长锁的有效期,保持锁。
    • AtomicLockFireEventTask ,触发了事件,防止阻塞主线程。
    • AtomicLockCheckFreeTask ,检查锁是否空闲。

    【等待性加锁的控制】

    等待性加锁: tryLock(long maxWaitMillis) , lock()
    ReentrantLock和Condition。一个锁创建一个"可以尝试加锁的条件",canTryLockCondition。
    await和signalAll。线程等待性加锁,失败就放入锁的等待队列,并且await。某线程解锁或检测到锁可用,就signalAll,唤醒等待队列,都尝试加锁。

    (4)线程保持锁

    zookeeper,用tcp连接,心跳,会话。

    其他,通过 “定时任务+心跳+会话” 来实现。会话时长,就是锁的有效期,是心跳时长*N 。加锁成功后,创建保持锁的定时任务,间隔心跳时长来执行,重置锁的会话时长。如果机器挂掉,或网络故障,锁超过有效期,就失效,然后丢失锁。

    (5)事件和事件监听器

    自定义事件枚举XLockEventEnum,和监听器XLockEventListener。
    需要注意的是事件LOSE_LOCK(“被动丢失锁”)。被动指,不是主动的unlock,但是丢失了锁。比如网络断了30秒,锁已经失效,然后被别的机器上的线程加锁成功,之前的线程就被动丢失了锁。业务方根据需要加点逻辑处理,比如打印日志,终止业务流程,尝试重新加锁。

    (6)其他

    【超时策略】

    • 重试
    • 打日志

    【封装性和易用性】
    内部逻辑还是比较复杂的。上层提供接口,方便替换不同的实现方式。
    提供给外部的功能方法,少而简单,易用性好。

    【测试】
    功能点比较多,测试起来不容易。
    目前主要依靠自己测试。
    缺少复杂业务场景,需要更多检验。
    logback的pom配置有点繁琐。如下是一个logback配置。

       <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.0.13</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    如下是一点测试

    >> 2016-11-24 10:52:14,298 waitQueue size is 1 , so start AtomicLockCheckFreeTask 
    >> 2016-11-24 10:52:14,386 tryLock lockResult=true thread=TXX_1 lock=lockf extra=
    >> 2016-11-24 10:52:14,386 waitQueue size is 0 , so stop AtomicLockCheckFreeTask 
    >> 2016-11-24 10:52:14,388 waitQueue size is 1 , so start AtomicLockCheckFreeTask 
    ### listener 2016-11-24 10:52:14,394 LOCK_SUCCESS  TXX_1  lockf
    >> 2016-11-24 10:52:14,419 tryLock lockResult=false thread=TXX_2 lock=lockf extra=
    >> 2016-11-24 10:52:14,459 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
    >> 2016-11-24 10:52:15,397 remainLock thread=TXX_1 lock=lockf isLocked=true
    ### listener 2016-11-24 10:52:16,409 UNLOCK_SUCCESS  TXX_1  lockf
    >> 2016-11-24 10:52:16,422 tryLock lockResult=true thread=TXX_2 lock=lockf extra=
    ### listener 2016-11-24 10:52:16,422 LOCK_SUCCESS  TXX_2  lockf
    >> 2016-11-24 10:52:16,439 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
    >> 2016-11-24 10:52:17,461 remainLock thread=TXX_2 lock=lockf isLocked=true
    ### listener 2016-11-24 10:52:18,475 UNLOCK_SUCCESS  TXX_2  lockf
    >> 2016-11-24 10:52:18,527 tryLock lockResult=true thread=TXX_3 lock=lockf extra=
    >> 2016-11-24 10:52:18,527 waitQueue size is 0 , so stop AtomicLockCheckFreeTask 
    ### listener 2016-11-24 10:52:18,528 LOCK_SUCCESS  TXX_3  lockf
    >> 2016-11-24 10:52:19,546 remainLock thread=TXX_3 lock=lockf isLocked=true
    ### listener 2016-11-24 10:52:20,562 UNLOCK_SUCCESS  TXX_3  lockf
    >> 2016-11-24 10:52:26,414 waitQueue size is 1 , so start AtomicLockCheckFreeTask 
    >> 2016-11-24 10:52:26,428 tryLock lockResult=true thread=TXX_1 lock=lockf extra=
    >> 2016-11-24 10:52:26,428 waitQueue size is 0 , so stop AtomicLockCheckFreeTask 
    ### listener 2016-11-24 10:52:26,429 LOCK_SUCCESS  TXX_1  lockf
    >> 2016-11-24 10:52:27,445 remainLock thread=TXX_1 lock=lockf isLocked=true
    >> 2016-11-24 10:52:28,480 waitQueue size is 1 , so start AtomicLockCheckFreeTask 
    >> 2016-11-24 10:52:28,516 tryLock lockResult=false thread=TXX_2 lock=lockf extra=
    >> 2016-11-24 10:52:29,482 remainLock thread=TXX_1 lock=lockf isLocked=true
    >> 2016-11-24 10:52:30,596 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
    >> 2016-11-24 10:52:31,444 remainLock thread=TXX_1 lock=lockf isLocked=true
    >> 2016-11-24 10:52:31,907 AtomicLockCheckFreeTask  lock=lockf isLocked=false . So signalAll to try lock !
    >> 2016-11-24 10:52:31,926 tryLock lockResult=true thread=TXX_2 lock=lockf extra=
    ### listener 2016-11-24 10:52:31,926 LOCK_SUCCESS  TXX_2  lockf
    >> 2016-11-24 10:52:31,943 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
    >> 2016-11-24 10:52:32,939 remainLock thread=TXX_2 lock=lockf isLocked=true
    >> 2016-11-24 10:52:33,438 remainLock thread=TXX_1 lock=lockf isLocked=false
    ### listener 2016-11-24 10:52:33,456 LOSE_LOCK  TXX_1  lockf
    
    ==============================
    AtomicLockRemainTask  #  lose lock , so retry lock again 
    now    =2016-11-24 10:52:33,456
    thread =TXX_1  24
     lock   =lockf
    extra  = >> retry lock fail , then cancel task , this may be normal ! Please find reason !
    ==============================
    >> 2016-11-24 10:52:34,940 remainLock thread=TXX_2 lock=lockf isLocked=true
    >> 2016-11-24 10:52:36,941 remainLock thread=TXX_2 lock=lockf isLocked=true
    >> 2016-11-24 10:52:38,938 remainLock thread=TXX_2 lock=lockf isLocked=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    七、三种方案实现分析

    针对分布式锁的实现,目前比较常用的有以下几种方案:

    • 基于数据库实现分布式锁
    • 基于缓存(redis,memcached,tair)实现分布式锁
    • 基于Zookeeper实现分布式锁

    1. 分布式锁的需求

    在分析这几种实现方案之前我们先来想一下,我们需要的分布式锁应该是怎么样的?(这里以方法锁为例,资源锁同理)

    • 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。

    • 这把锁要是一把可重入锁(避免死锁)

    • 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)

    • 有高可用的获取锁和释放锁功能

    • 获取锁和释放锁的性能要好

    2. 基于数据库实现分布式锁

    【基于数据库表】
    要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。

    当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。

    创建这样一张数据库表:

    CREATE TABLE `methodLock` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
      `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    当我们想要锁住某个方法时,执行以下SQL:

    	insert into methodLock(method_name,desc) values (‘method_name’,desc)
    
    • 1

    因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

    当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

    delete from methodLock where method_name ='method_name'
    
    • 1

    上面这种简单的实现有以下几个问题:

    1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

    2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。

    3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。

    4、这把锁是不可重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

    当然,我们也可以有其他方式解决上面的问题。

    • 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
    • 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
    • 非阻塞的?搞一个while循环,直到insert成功再返回成功。
    • 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

    【基于数据库排他锁】
    除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。
    我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。
    基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:

    public boolean lock(){
        connection.setAutoCommit(false)
        while(true){
            try{
                result = select * from methodLock where method_name=xxx for update;
                if(result==null){
                    return true;
                }
            }catch(Exception e){
    
            }
            sleep(1000);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。(这里再多提一句,InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上)

    我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:

    public void unlock(){
        connection.commit();
    }
    
    • 1
    • 2
    • 3

    通过connection.commit()操作来释放锁。

    这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。

    • 阻塞锁? for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
    • 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。
      但是还是无法直接解决数据库单点和可重入问题。

    这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。

    还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。

    【数据库锁方案总结】
    总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。

    • 使用数据库实现分布式锁的优点
      直接借助数据库,容易理解。
    • 使用数据库实现分布式锁的缺点
      a. 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。
      b. 操作数据库需要一定的开销,性能问题需要考虑。
      c. 使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候。

    3. 基于缓存实现分布式锁

    相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点,而且很多缓存是可以集群部署的,可以解决单点问题。

    目前有很多成熟的缓存产品,包括Redis,memcached以及阿里内部的Tair。

    这里以Tair为例来分析下使用缓存实现分布式锁的方案。关于Redis和memcached在网络上有很多相关的文章,并且也有一些成熟的框架及算法可以直接使用。

    基于Tair的实现分布式锁在内网中有很多相关文章,其中主要的实现方式是使用TairManager.put方法来实现。

    public boolean trylock(String key) {
        ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, 0);
        if (ResultCode.SUCCESS.equals(code))
            return true;
        else
            return false;
    }
    public boolean unlock(String key) {
        ldbTairManager.invalid(NAMESPACE, key);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    以上实现方式同样存在几个问题:

    1. 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在tair中,其他线程无法再获得到锁。
    2. 这把锁只能是非阻塞的,无论成功还是失败都直接返回。
    3. 这把锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在tair中已经存在。无法再执行put操作。

    当然,同样有方式可以解决。

    • 没有失效时间?tair的put方法支持传入失效时间,到达时间之后数据会自动删除。
    • 非阻塞?while重复执行。
    • 非可重入?在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者。
      但是,失效时间我设置多长时间为好?如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。这个问题使用数据库实现分布式锁同样存在

    【缓存分布式锁方案总结】

    可以使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如Tair的put方法,redis的setnx方法等。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。

    • 使用缓存实现分布式锁的优点
      性能好,实现起来较为方便。

    • 使用缓存实现分布式锁的缺点
      通过超时时间来控制锁的失效时间并不是十分的靠谱。

    4. 基于Zookeeper实现分布式锁

    基于zookeeper临时有序节点可以实现的分布式锁。

    大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。
    判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
    当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。

    来看下Zookeeper能不能解决前面提到的问题。

    • 锁无法释放?使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
    • 非阻塞锁?使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
    • 不可重入?使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
    • 单点问题?使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。

    可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。

    public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
            return interProcessMutex.acquire(timeout, unit);
    }
    public void unlock() throws Exception {
            interProcessMutex.release();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。

    使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。

    其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)

    【Zookeeper实现分布式锁方案总结】

    • 使用Zookeeper实现分布式锁的优点
      有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
    • 使用Zookeeper实现分布式锁的缺点
      a.性能上不如使用缓存实现分布式锁。
      b.需要对ZK的原理有所了解。

    三种方案的比较

    上面几种方式,哪种方式都无法做到完美。就像CAP一样,在复杂性、可靠性、性能等方面无法同时满足,所以,根据不同的应用场景选择最适合自己的才是王道。

    • 从理解的难易程度角度(从低到高)
      数据库 > 缓存 > Zookeeper

    • 从实现的复杂性角度(从低到高)
      Zookeeper >= 缓存 > 数据库

    • 从性能角度(从高到低)
      缓存 > Zookeeper >= 数据库

    • 从可靠性角度(从高到低)
      Zookeeper > 缓存 > 数据库

  • 相关阅读:
    Ribbon
    Elasticsearch-05-Elasticsearch-sql组件史上最全详解
    静态Vxlan隧道同子网互访实验
    .net core/5/6/7中WPF如何优雅的开始开发
    【批处理DOS-CMD命令-汇总和小结】-添加注释命令(rem或::)
    打卡系统有什么用?如何通过日常管理系统提高企业员工的效率?
    三季度现货白银基本面分析
    child_process exec 不是内部或外部命令,也不是可运行的程序或批处理文件。
    16 JavaScript学习: 类型转换
    Elasticsearch:使用 huggingface 模型的 NLP 文本搜索
  • 原文地址:https://blog.csdn.net/weixin_38002497/article/details/106288257