• redis 分布式锁


    redis数据类型和使用场景

    1.在redis中的命令语句中,命令是忽略大小写的,而key是不忽略大小写的。

    • 字符串
    • Hash类型
    • List
    • Set
    • SortedSet(zset)

    string 类型 set key value

    get key

    hash类型 hset

    set 类型 sadd

    zset 是排序set集合

    分布式锁

    高并发情况下 setnx

    1.场景: 在并发情况下 可能有线程1 把线程2的锁进行释放

    解决: 每个线程 在lockkey 的value 设置自己的单独的value 和设置过期时间

    2.在时间过期后 任务还未执行完 如何处理

    luan 脚本 eval

    加锁 解锁

    重入锁的逻辑

    redisson 是非公平锁

    线程1抢到锁之后 会有监听器来监听执行 timetask 执行续命

    线程2 会间隙性去判断是是否可以抢锁

    双写 不一致

    mvcc

    分段锁

    读写锁

    redisson 的实现方式

    eq:

        @RequestMapping("/testStock")
        public String testStock() {
            String lockKey = "lock:product_101";
            //获取锁对象
            RLock redissonLock = redisson.getLock(lockKey);
            //加分布式锁
            redissonLock.lock();  //  .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
            try {
                // 业务逻辑
            } finally {
                redissonLock.unlock();
            }
            return "end";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    1.获取锁对象

    // name 是 lockkey 
    public RLock getLock(String name) {
        // 构造一个  RedissonLock 对象  internalLockLeaseTime 为锁租用时间 30s 
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.加锁

    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            // todo 为啥会有打断标志
            Thread.currentThread().interrupt();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
       // 第一步 : leaseTime  -1 unit null
       @Override
        public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            // 这个方法 是 线程去持有锁 
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired  获得的 
            if (ttl == null) {
                return;
            }
    		// 线程2走到这里来了  redisson_lock__channel  subscribe 订阅 
            // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            commandExecutor.syncSubscription(future);
    
            try {
                while (true) {
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        break;
                    }
    
                    // waiting for message
                    // 这个方法是 客户端2 等待ttl 秒 再去看锁是否被持有 todo  future的用法
                    if (ttl >= 0) {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                unsubscribe(future, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    //  第一步 :  leaseTime  -1 unit null
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
             // 由于第一次进来 所以不走这个方法
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // getLockWatchdogTimeout 默认时间为30s
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
         // 持有锁的 线程 加了一个Listener   
        // 我的理解是Future异步任务 异步任务完成的时候 会调用Listener回调方法 
        // future.isSuccess() 任务完成 future.getNow() 返回值(callback)  
        // scheduleExpirationRenewal(threadId); 这个是主要的看门狗方法
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
    
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    // leaseTime 为 30s  unit 为 秒 单位 command为eval 
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
    	
        //   KEYS[1] 为 lockkeyName  ARGV[1] 为 id:线程id  ARGV[2]: 持有时间 默认30s
        //   exists  判断key是否存在 不存在 设置值 和 时间 
        //   hexists 判断key val 是否一致 一致的话   当前线程 值 加 1
        //   pttl key 以毫秒为单位返回 key 的剩余生存时间 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回        -1 。
        //   否则,以毫秒为单位,返回 key 的剩余生存时间。     
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; 	" +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    第一个请求去加锁过程中 会持有一个Listener 这个 是 看门狗的实现原理

    只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

    // expirationRenewalMap 到期续约map 这个是ConcurrentMap  key id:name  Timeout 时间
    /*
    * hexists 查看哈希表 key 中,给定域 field 是否存在
    * pexpire 续时 30s
    * TimerTask 定时任务 每隔10s运行一次
    *
    */
    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
    
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    // expirationRenewalMap 直接 移出 key 
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    	//  putIfAbsent  如何key值不存在,则插入map,返回Null。如果key值存在,不替换旧值,且返回旧的value。
        // 所以下面的情况是 首次进来没有key 所以 map set key 
        // 如果 value发生了变化 定时任务取消 
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3.解锁

     public void unlock() {
            Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
            if (opStatus == null) {
                throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + Thread.currentThread().getId());
            }
            if (opStatus) {
                cancelExpirationRenewal();
            }
    
    //        Future future = unlockAsync();
    //        future.awaitUninterruptibly();
    //        if (future.isSuccess()) {
    //            return;
    //        }
    //        if (future.cause() instanceof IllegalMonitorStateException) {
    //            throw (IllegalMonitorStateException)future.cause();
    //        }
    //        throw commandExecutor.convertException(future);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    // 这里是把锁给删除 
    //  重入锁有减1 
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    void cancelExpirationRenewal() {
        // expirationRenewalMap 移出key  定时任务取消 
        Timeout task = expirationRenewalMap.remove(getEntryName());
        if (task != null) {
            task.cancel();
        }
    }
    ``
    
    ### redis 的 stream 
    
    1.生产者
    
    redis 的多线程
    
    redis为什么这么快 1.内存 2.单线程 3.非阻塞式io 4.rest
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
  • 相关阅读:
    el-menu-item使用自定义图标、使用图片做图标
    9.吴恩达深度学习--机器翻译
    java 数组
    【数据库08】数据库物理存储系统的选型与管理
    centOS7| 编译安装 gdal 库
    秋招面经第十弹:字节跳动二面-大数据开发工程师(电商)
    java计算机毕业设计郑工社团交流服务信息平台源码+mysql数据库+系统+lw文档+部署
    tda4vm mcu1_0应用程序开发系列之ADC采样
    从 HPC 到 AI:探索文件系统的发展及性能评估
    旷视研究院获得第一届DanceTrack挑战赛冠军
  • 原文地址:https://blog.csdn.net/qq_28852755/article/details/126168084