• Redis分布式锁


    1. 什么是分布式锁

    分布式锁指的是,所有服务中的所有线程都去获得同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,等到获得锁的线程释放掉锁之后获得了锁才能进行操作
    Redis官网中,set key value有个带有NX参数的命令,这是一个原子性加锁的命令,指的是此key没有被lock时,当前线程才能加锁,如果已经被其他线程占用,就不能加锁。

    2. Redisson实现Redis分布式锁的底层原理

    在这里插入图片描述

    2.1 添加依赖

    <dependency>
        <groupId>org.redissongroupId>
        <artifactId>redisson-spring-boot-starterartifactId>
        <version>3.12.4version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.2 测试用例

    库存数量100,调用一次减1,小于等于0的时候返回false,表示下单失败。

    @Component
    public class RedissonLock {
        private static Integer inventory = 100;
        /**
         * 测试
         *
         * @return true:下单成功 false:下单失败
         */
        public Boolean redisLockTest(){
            // 获取锁实例
            RLock inventoryLock = RedissonService.getRLock("inventory-number");
            try {
                // 加锁
                inventoryLock.lock();
                if (inventory <= 0){
                    return false;
                }
                inventory--;
                System.out.println("线程名称:" + Thread.currentThread().getName() + "剩余数量:" + RedissonLock.inventory);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                // 释放锁
                inventoryLock.unlock();
            }
            return true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.3 获取锁的实例

    RLock inventoryLock = RedissonService.getRLock("inventory-number");
    
    • 1

    这段就是获取锁的实例,inventory-number为指定锁名称,进去getLock(String name)方法之后就能看到获取锁的实例就是在RedissonLock构造方法中,初始化一些属性。

    public RLock getLock(String name) {
    	return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
    }
    
    • 1
    • 2
    • 3

    RedissonLock的构造函数:

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
            super(commandExecutor, name);
            //命令执行器
            this.commandExecutor = commandExecutor;
            //UUID字符串(MasterSlaveConnectionManager类的构造函数 传入UUID)
            this.id = commandExecutor.getConnectionManager().getId();
            //内部锁过期时间(防止死锁,默认时间为30s)
            this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
            //uuid+传进来的锁名称
            this.entryName = this.id + ":" + name;
            //redis消息体
            this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.4 加锁

    inventoryLock.lock();
    这段代码表示加锁,一步一步进去源码里面看看,进来首先看到如下lock()方法:

    public void lock() {
        try {
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以看到这里设置了一些默认值,然后继续调用了带参lock()方法,也是在这里,完成了加锁的逻辑,源码如下:

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            // 线程ID
            long threadId = Thread.currentThread().getId();
            // 尝试获取锁
            Long ttl = this.tryAcquire(leaseTime, unit, threadId);
            // 如果过期时间等于null,则表示获取到锁,直接返回,不等于null继续往下执行
            if (ttl != null) {
                // 如果获取锁失败,则订阅到对应这个锁的channel
                RFuture<RedissonLockEntry> future = this.subscribe(threadId);
                if (interruptibly) {
                    // 可中断订阅
                    this.commandExecutor.syncSubscriptionInterrupted(future);
                } else {
                    // 不可中断订阅
                    this.commandExecutor.syncSubscription(future);
                }
                try {
                    // 不断循环
                    while(true) {
                        // 再次尝试获取锁
                        ttl = this.tryAcquire(leaseTime, unit, threadId);
                        // ttl(过期时间)为空,说明成功获取锁,返回
                        if (ttl == null) {
                            return;
                        }
                        // ttl(过期时间)大于0 则等待ttl时间后继续尝试获取
                        if (ttl >= 0L) {
                            try {
                                ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException var13) {
                                if (interruptibly) {
                                    throw var13;
                                }
                                ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            }
                        } else if (interruptibly) {
                            ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                        } else {
                            ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                        }
                    }
                } finally {
                    // 取消对channel的订阅
                    this.unsubscribe(future, threadId);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    再来看下获取锁的tryAcquire方法:

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
            return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
    }
    
    • 1
    • 2
    • 3

    进去看下tryAcquireAsync方法:

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
            // 有设置过期时间
            if (leaseTime != -1L) {
                return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            } else {
                // 没有设置过期时间
                RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
                ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                    if (e == null) {
                        if (ttlRemaining == null) {
                            this.scheduleExpirationRenewal(threadId);
                        }
                    }
                });
                return ttlRemainingFuture;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    tryLockInnerAsync方法是真正执行获取锁的逻辑,它是一段LUA脚本代码。在这里,它使用的是hash数据结构。

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            this.internalLockLeaseTime = unit.toMillis(leaseTime);
            return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
            // 如果锁不存在,则通过hset设置它的值,并设置过期时间
            "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; 
            // 如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1(这里显示了redis分布式锁的可重入性)
            if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; 
            // 如果锁已存在,但并非本线程,则返回过期时间ttl
            return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    KEYS[1]代表的是你加锁的那个key,比如说:RLock inventoryLock = RedissonService.getRLock("inventory-number");这里你自己设置了加锁的那个锁key就是"inventory-number"。
    ARGV[1]代表的就是锁key的默认生存时间,上面也截图看了,默认时间为30秒。
    ARGV[2]代表的是加锁的客户端的ID,类似于后面这样: 8743c9c0-0795-4907-87fd-6c719a6b4586:1
    
    • 1
    • 2
    • 3

    上面这段LUA代码看起来也不是很复杂,其中有三个判断:

    • 通过exists判断锁存不存在,如果锁不存在,则设置值和过期时间,加锁成功。
    • 通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功,ARGV[2]的value+1,原来是1,现在变为2,当然,释放的时候也要释放两次。
    • 如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败

    2.5 解锁

    inventoryLock.unlock();
    这段代码表示解锁,跟刚才一样,一步一步进去源码里面看看,进来首先看到如下unlock()方法:

    public void unlock() {
            try {
                this.get(this.unlockAsync(Thread.currentThread().getId()));
            } catch (RedisException var2) {
                if (var2.getCause() instanceof IllegalMonitorStateException) {
                    throw (IllegalMonitorStateException)var2.getCause();
                } else {
                    throw var2;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    进去unlockAsync()查看,这是解锁的方法:

    public RFuture<Void> unlockAsync(long threadId) {
            RPromise<Void> result = new RedissonPromise();
            // 释放锁的方法
            RFuture<Boolean> future = this.unlockInnerAsync(threadId);
            // 添加监听器 解锁opStatus:返回值
            future.onComplete((opStatus, e) -> {
                this.cancelExpirationRenewal(threadId);
                if (e != null) {
                    result.tryFailure(e);
                //如果返回null,则证明解锁的线程和当前锁不是同一个线程,抛出异常
                } else if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                } else {
                    // 解锁成功
                    result.trySuccess((Object)null);
                }
            });
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    再进去看下释放锁的方法:unlockInnerAsync():

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
            // 如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; 
            // 如果是同一个线程,就通过hincrby减1的方式,释放一次锁
            local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
            // 若剩余次数大于0 ,则刷新过期时间
            if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
            // 其他就证明锁已经释放,删除key并发布锁释放的消息
            else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; 
            return nil;", 
            Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    上述代码是释放锁的逻辑。同样的,它也是有三个判断:

    • 如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常。
    • 如果解锁的线程和当前锁的线程是同一个,就通过hincrby减1的方式,释放一次锁。若剩余次数还大于0,则证明是重入锁,再次刷新过期时间。
    • 锁已不存在,通过publish发布锁释放的消息,解锁成功

    3. 其他方式实现分布式锁

    redis 分布式锁有什么缺陷?

    搬运
    redis分布式锁的缺点(zk分布式锁与redis分布式锁优缺点)
    面试必问:如何实现Redis分布式锁
    redis实现分布式锁的原理
    Redis分布式锁的实现以及原理

  • 相关阅读:
    12-Java 继承&&抽象类&&代码块(详解~)
    图书管理小练习
    并查集(UnionFind)总结
    马克思主义哲学原理
    RustDay05------Exercise[41-50]
    浅述量子计算
    基于AM5708开发板——开箱初探+环境搭建、源码编译
    Matlab 用法
    【Libevent】Libevent特征和事件集创建
    ES6 数组转为对象 ,以及 find 在数组里面找到对应一条数据
  • 原文地址:https://blog.csdn.net/Cool_Pepsi/article/details/127836313