• Redisson入坑篇


    基于redisson3.5.4

    概览

    是什么

    一个基于Java实现,提供操作Redis的客户端,其他客户端:https://redis.io/docs/clients/

    Jedis vs redisson
    Jedis:

    • redis基础操作(Map、Set、List、Queue、Deque、ScoredSortedSet、Publish/Subscribe、 BitSet…)

    redisson:

    • redis基础操作、PriorityQueue、 DelayedQueue、BloomFilter、RateLimite…增加分布式锁和同步器
    • 基于JDK提供的Lock、Semaphore、 CountDownLatch、FairLock、MultiLockReadWriteLock实现多种锁

    怎么用

    5种配置模式

    • Single node(单节点模式,如果是用阿里云或其他redis集群服务,一般只对外暴露一个地址,此时作为单节点处理即可)
    • Master with slave nodes
    • Sentinel nodes
    • Clustered nodes
    • Replicated nodes
    @Configuration
    public class RedisConfig {
     
        @Bean
        public RedissonClient redissonClient() throws IOException {
     
            // useSingleServer – for single node instance
            // useMasterSlaveServers – for master with slave nodes
            // useSentinelServers – for sentinel nodes.
            // useClusterServers – for clustered nodes.
            // useReplicatedServers – for replicated nodes.
            // addNodeAddress(xxx).addNodeAddress(xxx)
            Config config = new Config();
            config.setCodec(new StringCodec());
            // 30 * 1000
    //        config.setLockWatchdogTimeout();
            config.useSingleServer()
                    .setAddress("")
                    .setPassword("")
                    .setConnectionPoolSize(64)
                    .setIdleConnectionTimeout(10000)
                    .setConnectTimeout(3000)
                    .setTimeout(3000)
                    .setPingTimeout(30000)
                    .setReconnectionTimeout(3000)
                    .setDatabase(5);
            return Redisson.create(config);
     
     
        }
    }
     
        @Test
        public void operationTest() {
            // 字符串操作
            RBucket<Object> rBucket = redissonClient.getBucket("easicare:redisson:str");
            rBucket.set("120",5000, TimeUnit.SECONDS);
     
            // map操作
            RMap<String, String> map = redissonClient.getMap("easicare:redisson:map");
            map.put("id1", "119");
     
            // set操作
            RSet<String> rSet = redissonClient.getSet("easicare:redisson:set");
            rSet.add("idx2");
            rSet.readAll();
     
            // list操作
            RList<String> rList = redissonClient.getList("easicare:redisson:list");
            rList.add("idx4");
            System.out.println(rList.readAll());
     
            // 队列操作
            RQueue<String> rQueue = redissonClient.getQueue("easicare:redisson:queue");
            rQueue.add("idx5");
            System.out.println(rQueue.readAll());
     
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    为什么

    谈到Redisson总会提到看门口,这里也是说说自己的想法

    场景:
    用redis setnx实现分布式锁,会有哪些问题:

    • 获取锁后,应用崩了,一直占着锁?(设置过期时间)
    • 过期时间内,业务还没处理完成怎么办?(守护线程)
    • A线程获取锁,B线程可以去释放

    redisson的 watch dog

    • 从 lockInterruptibly 方法入手

      • 先尝试tryAcquire获取锁,返回结果为null,表示锁被占用中,return
      • 后边的 while (true)逻辑也很好理解,开启订阅,并尝试续期
    • 再看看 tryAcquire 方法

      • 通过get等待tryAcquireAsync返回结果
      • tryAcquireAsync中,先判断入参leaseTime 是否为-1,只有-1的情况下才会尝试走下面的续期逻辑,即看门狗了

        注意:指定锁定时间时,watch dog机制失效

      • 当我们指定锁时间是,redission会尊重我们的选择,按业务定义的时间执行,即leaseTime不会为-1
      • 续期luna脚本中KEYS[1], ARGV[2]的具体参数从evalWriteAsync方法的keys和params参数参数,KEYS[1]代表着keys参数的第一个
      • 接着添加监听器,定时执行续期任务,周期是 lockWatchdogTimeout/3时间
    • 源码

    @Override  
    public void lockInterruptibly() throws InterruptedException {  
        lockInterruptibly(-1, null);  
    }
     
     public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {  
            long threadId = Thread.currentThread().getId();  
            Long ttl = tryAcquire(leaseTime, unit, threadId);  
            // lock acquired  
            if (ttl == null) {  
                return;  
            }  
       
            RFuture<RedissonLockEntry> future = subscribe(threadId);  
            commandExecutor.syncSubscription(future);  
       
            try {  
                while (true) {  
                    ttl = tryAcquire(leaseTime, unit, threadId);  
                    // lock acquired  
                    if (ttl == null) {  
                        break;  
                    }  
       
                    // waiting for message  
                    if (ttl >= 0) {  
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
                    } else {  
                        getEntry(threadId).getLatch().acquire();  
                    }  
                }  
            } finally {  
                unsubscribe(future, threadId);  
            }  
    //        get(lockAsync(leaseTime, unit));  
        }
     
    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {  
        return get(tryAcquireAsync(leaseTime, unit, threadId));  
    }
     
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {  
        if (leaseTime != -1) {  
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);  
        }  
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);  
        ttlRemainingFuture.addListener(new FutureListener<Long>() {  
            @Override  
            public void operationComplete(Future<Long> future) throws Exception {  
                if (!future.isSuccess()) {  
                    return;  
                }  
       
                Long ttlRemaining = future.getNow();  
                // lock acquired  
                if (ttlRemaining == null) {  
                    // 续期
                    scheduleExpirationRenewal(threadId);  
                }  
            }  
        });  
        return ttlRemainingFuture;  
    }
     
     
    private void scheduleExpirationRenewal(final long threadId) {
        // 如果续期任务里已经存在, 直接返回
        if (expirationRenewalMap.containsKey(getEntryName())) {  
            return;  
        }  
        // 开启定时任务
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {  
            @Override  
            public void run(Timeout timeout) throws Exception {  
                  // lua 脚本执行续期, 返回结果true or false
                RFuture<Boolean> future = commandExecutor.evalWriteAsync
                (getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
                            "return 1; " +  
                        "end; " +  
                        "return 0;",  
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  
                   
                future.addListener(new FutureListener<Boolean>() {  
                    @Override  
                    public void operationComplete(Future<Boolean> future) throws Exception {  
                    // 先移除任务, 避免读到上次留下的任务(脏数据)
                        expirationRenewalMap.remove(getEntryName());  
                        if (!future.isSuccess()) {  
                            log.error("Can't update lock " + getName() + " expiration", future.cause());  
                            return;  
                        }  
                        // 续期成功, 执行下次任务  
                        if (future.getNow()) {  
                            // reschedule itself  
                            scheduleExpirationRenewal(threadId);  
                        }  
                    }  
                });  
            }  
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);  
          // 续期结果为false且任务列表里没有了,取消此次定时任务
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {  
            task.cancel();  
        }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
  • 相关阅读:
    error LNK2038: mismatch detected for ‘RuntimeLibrary
    算法DAY66
    C++预编译、编译、链接
    图与图的深度优先遍历、广度优先遍历
    上交所行情文件解析之mktdt00.txt
    基础算法训练(五)折半插入排序
    使用Flink的各种技术实现WordCount逻辑
    Springboot 使用升级小记-MVC path
    springMvc27-get乱码解决
    计算机毕业设计 基于Web的视频及游戏管理平台的设计与实现 Java实战项目 附源码+文档+视频讲解
  • 原文地址:https://blog.csdn.net/legendaryhaha/article/details/126807521