• SpringBoot整合Redis、Redis、Jedis,Redis使用场景示例、面试点理论


    1. Redis

    附录

    Redis面试题(2020最新版)

    Redisson

    Redisson实现分布式锁

    1.1 概述

    1. Redis优缺点

    1. Redis优点
    • Redis的数据存储内存读写速度非常优异,大概读11万/s&写8万/s。
    • Redis可以持久化,两种持久化技术RDB和AOF。
    • Redis原子性所有操作都是原子性的,也可以多个操作合并后一起执行。
    • 数据结构丰富,除了支持String类型的value外还支持 hash、set、zset、list。
    • Redis支持主从复制,主机会把数据同步到从机,可以实现读写分离。
    1. Redis缺点
    • Redis的数据全部存储在内存,因此受物理内存大小的限制。因此Redis多用于局部较小数据的高速度读取方便。
    • 主机宕机,宕机前有部分数据未能及时同步到从机,切换IP后还会引入数据不一致的问题,降低了系统的可用性。
    • Redis 不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。
    • Redis 较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。为避免这一问题,运维人员在系统上线时必须确保有足够的空间,这对资源造成了很大的浪费。

    2. Redis中间件作用

    1. 高性能(缓存)
    • 用户第一次从硬盘数据库读取数据比较慢,取出数据缓存到Redis中。第二次从Redis(内存)中取数据速度非常快。注意数据库数据更新的时候Redis缓存中的数据要同步更新**数据一致性**解决方案后面重要介绍。
    Redis缓存:
    在这里插入图片描述
    1. 高并发

    直接操作缓存能够承受的请求是远远大于直接访问数据库的,所以我们可以考虑把数据库中的部分数据转移到缓存中去,这样用户的一部分请求会直接到缓存这里而不用经过数据库。

    3. Redis为什么这么快?

    • 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于 HashMap,HashMap 的优势就是查找和操作的时间复杂度都是O(1);

    • 数据结构简单,对数据操作也简单,Redis 中的数据结构是专门进行设计的;

    • 采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;

    • 使用多路 I/O 复用模型,非阻塞 IO;

    1.2 主要数据类型

    • string是最简单的类型
    • list是一个链表结构,主要功能是push、pop、获取一个范围的所有值等等。操作中key理解为链表的名字。
    • set是集合,和我们数学中的集合概念相似,对集合的操作有添加删除元素,有对多个集合求交并差等操作。操作中key理解为集合的名字。
    • zset是set的一个升级版本,他在set的基础上增加了一个顺序属性,这一属性在添加修改元素的时候可以指定,每次指定后,zset会自动重新按新的值调整顺序。可以理解了有两列的mysql表,一列存value,一列存顺序。操作中key理解为zset的名字。
    • Hash数据类型允许用户用Redis存储对象类型,Hash数据类型的一个重要优点是,当你存储的数据对象只有很少几个key值时,数据存储的内存消耗会很小

    在这里插入图片描述

    2. SpringBoot整合Redis操作

    1. 依赖

    <dependency>
       <groupId>org.springframework.bootgroupId>
       <artifactId>spring-boot-starter-data-redisartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    2. 概述

    1. Spring Boot Data Redis提供了RedisTemplateStringRedisTemplate。StringRedisTemplate是RedisTemplate子类。
    • StringRedisTemplate:操作Key value都是字符串的数据

    • RedisTemplate:操作的Key value都可以是对象

    1. 序列化
    • Redis是把数据序列化之后保存到内存中,取出对象的时候,也会自动进行反序列化。所以对象数据要实现序列化接口

    3. StringRedisTemplate

    3.1 操作key

            //删除一个key
            Boolean res = redisTemplate.delete("name");
            //判断一个key是否存在
            Boolean res = redisTemplate.hasKey("name");
            //判断key所对应的类型
            DataType res = redisTemplate.type("set1");
            //获取所有的key
            Set<String> res = redisTemplate.keys("*");
            //获取key的超时时间 -1 永不超时 -2 key不存在
            Long res = redisTemplate.getExpire("name");
            //修改key的名字  要求key必须存在 不存在会报错(ERR no such key)
            redisTemplate.rename("age", "age1");
            //如果key存在才修改名字,否则不修改 不存在会报错(ERR no such key)
            Boolean res = redisTemplate.renameIfAbsent("age", "age1");
            //移动key到指定库
            redisTemplate.move("name", 1);
            // 重新设置Key的过期时间(前提redis存在设定过期时间的key)
            Boolean res = redisTemplate.expire("ff", 200, TimeUnit.SECONDS);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.2 opsForValue操作value为String类型

            // 新增String类型的数据
            redisTemplate.opsForValue().set("name","xiaoMing");
            // 新增ttl数据
            redisTemplate.opsForValue().set("short-message","12345",120,TimeUnit.SECONDS);
            // 不存在相同key执行
            Boolean res = redisTemplate.opsForValue().setIfAbsent("hn_1", "120", 3600, TimeUnit.SECONDS);
            // 查询指定key的value值
            String name = redisTemplate.opsForValue().get("name");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.3 opsForList操作value为List类型

     // 保存
            String[] student = {"xiaoming","xiaohong"};
            redisTemplate.opsForList().leftPushAll("student", Arrays.asList(student));
            // 查询List集合
            List<String> names = redisTemplate.opsForList().range("student", 0, -1);
            // 保留list的1-3元素,其他丢弃
            redisTemplate.opsForList().trim("student", 1, 3);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.4 opsForSet操作value为Set集合

            //创建set,并放入多个元素
            redisTemplate.opsForSet().add("sets", "zhangsan", "lisi", "wangwu");
            // 取出内存中的set数据
            Set<String> sets = redisTemplate.opsForSet().members("sets");
            // 内存中set的size
            Long size = redisTemplate.opsForSet().size("sets");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.5 opsForZSet操作value为zset有序集合

            redisTemplate.opsForZSet().add("zsets", "zhangsan", 100);//创建并放入元素
            redisTemplate.opsForZSet().add("zsets", "lisi", 90);//创建并放入元素
            redisTemplate.opsForZSet().add("zsets", "wangwu", 80);//创建并放入元素
            //获取指定分数范围的值
            Set<ZSetOperations.TypedTuple<String>> zsets = redisTemplate.opsForZSet().rangeByScoreWithScores("zsets", 0, 90);
            for (ZSetOperations.TypedTuple<String> zset : zsets) {
                System.out.println(zset.getValue() + "::" + zset.getScore());
            }
            //获取指定分数范围的值的key
            Set<String> zsets1 = redisTemplate.opsForZSet().rangeByScore("zsets", 0, 90);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.6 opsForHash操作value为hash

            // insert 第一种方式
            redisTemplate.opsForHash().put("student1","name","zhangsan");
            redisTemplate.opsForHash().put("student1","age","23");
            redisTemplate.opsForHash().put("student1","address","shenzhen");
            redisTemplate.opsForHash().put("student1","code","8888");
            // insert 第二种方式
            HashMap hashMap = new HashMap();
            hashMap.put("name","zhangsan");
            hashMap.put("age","23");
            redisTemplate.opsForHash().putAll("student2",hashMap);
            // 指定key的所有map value
            List<Object> values = redisTemplate.opsForHash().values("student1");
            // 指定key的所有map key
            Set<Object> keys = redisTemplate.opsForHash().keys("student1");
            // 获取多个指定key的value
            String[] str = {"age","address"};
            List<Object> student1 = redisTemplate.opsForHash().multiGet("student1", Arrays.asList(str));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4. Redisson

    4.1 Redisson概述

    • Redisson是Redis服务器上的分布式可伸缩Java数据结构----驻内存数据网格(In-Memory Data Grid,IMDG)。底层使用netty框架,并提供了与java对象相对应的分布式对象、分布式集合、分布式锁和同步器、分布式服务等一系列的Redisson的分布式对象。
    • IMDG(驻内存数据网格):
      • 将内存作为存储介质,在对主存的使用上IMDG有以下特性:
        • 数据是分布式存储在多台服务器上,可以根据需要增减服务器
        • IMDG需要克服的核心问题是容量限制和可靠性。通过水平扩展克服了内存容量的限制,通过复制保证可靠性一般使用堆外内存降低垃圾回收压力

    4.2 Redisson分布式对象

    1、通用对象桶
    Redisson分布式对象RBucket,可以存放任意类型对象
    2、二进制流
    Redisson分布式对象RBinaryStream,InputStream和OutoutStream接口实现
    3、地理空间对象桶
    Reddisson分布式RGo,储存于地理位置有关的对象桶
    4、BitSet
    Reddisson分布式RBitSet,是分布式的可伸缩位向量
    通过实现RClusteredBitSet接口,可以在集群环境下数据分片
    5、布隆过滤器
    Reddisson利用Redis实现了java分布式的布隆过滤器RBloomFilter

    6、基数估计算法(RHyperLogLog)
    可以在有限的空间通过概率算法统计大量数据
    7、限流器(RRateLimiter )
    可以用来在分布式环境下限制请求方的调用频率。适用于不同或相同的Reddisson实例的多线程限流。并不保证公平性

    4.3 实现分布式锁

    1. Redis实现分布式锁的主要步骤
    • 向Redis插入一条数据,key作为锁的标记、value作为加锁
    • 满足互斥性,当key不存在时才能设置值(设置成功就代表拿到了锁)
    • 要有过期时间,保证系统故障无法删除时能过期删除。避免死锁的产生
    • 当处理完业务释放锁删除时要校验value,保证只有加锁的人才能释放锁

    特别注意:以上实现步骤考虑到了使用分布式锁需要考虑的互斥性、防死锁、加锁和解锁必须为同一个进程等问题但是锁的续期无法实现。所以,博主采用 Redisson 实现 Redis 的分布式锁,借助 Redisson 的 WatchDog 机制 能够很好的解决锁续期的问题,同样 Redisson 也是 Redis 官方推荐分布式锁实现方案,实现起来较为简单。

    1. 加锁代码
    • lock和tryLock的区别:lock会一直等待尝试获取锁直到获取到锁为止;tryLock可以设置最大等待时间。
    	public void tryLock() {
            // 创建锁
            RLock lock = redissonSingle.getLock("myLock");
            boolean lockFlag = false;
    
            try {
                // 等待超时时间5秒,执行任务时间使用WatchDog机制
                lockFlag = lock.tryLock(5, -1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (lockFlag) {
                // 获取锁成功,执行业务
                try {
                    logger.info("获取锁成功,执行业务");
                    Thread.sleep(7000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放锁
                    lock.unlock();
                    logger.info("业务执行完成,释放锁");
                }
            } else {
                logger.error("获取锁失败");
            }
        }
    
        public void lock() {
            // 创建锁
            RLock lock = redissonSingle.getLock("myLock");
            try {
                // 默认加锁时间是 30s,没有最大等待时间,会一直等待直到锁释放,另一个线程会再次获取锁
                lock.lock();
                logger.info("获取锁成功, 执行任务!");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放锁
                lock.unlock();
                logger.info("任务执行完毕, 释放锁!");
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    1. Lua脚本解析(RedissonLock.class:239源码)
    if (redis.call('exists', KEYS[1]) == 0) then " +
       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
       "return nil; " +
       "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
    "return redis.call('pttl', KEYS[1]);"
        
    // 参数:
    KEYS[1]: 存入Redis的key值    
    ARGV[1]:锁的生存时间 默认30秒
    ARGV[2]:加锁客户端ID
    最后面的一个 1 是为了后面可重入做的计数统计,后面会有讲解到。
    // 加锁代码解释
    if(redis中不存在相同key的锁){
        加锁
        给锁设定生存时间
    }
    if(已经加过锁的客户端再次要加锁 -"重入锁"){
            加锁
        给锁设定生存时间
    }
    其他情况返回锁的剩余存活时间    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    1. 锁互斥机制

    A客户端抢到锁,在A未结束之前B尝试加锁就是返回剩余存活时间

    1. Redissson tryLock 的主流程:
    @Override
        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            // 1.尝试获取锁
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
    
            // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
    
            current = System.currentTimeMillis();
    
            /**
             * 2.订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
             * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
             *
             * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
             * 当 this.await 返回 true,进入循环尝试获取锁.
             */
            RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
            // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
            if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                acquireFailed(threadId);
                return false;
            }
    
            try {
                // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
                time -= System.currentTimeMillis() - current;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
    
                  }
    
                /**
                 * 3.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
                 * 获取锁成功,则立马返回 true,
                 * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
                 */
                while (true) {
                    long currentTime = System.currentTimeMillis();
    
                    // 再次尝试获取锁
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
                    // 超过最大等待时间则返回 false 结束循环,获取锁失败
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
    
                    /**
                     * 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
                     */
                    currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                        //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        //则就在wait time 时间范围内等待可以通过信号量
                        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
    
                    // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
                }
            } finally {
                // 7.无论是否获得锁,都要取消订阅解锁消息
                unsubscribe(subscribeFuture, threadId);
            }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    流程分析:

    1. 尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。

    2. 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id(其实本质上就是进程 id)通过 Redis 的 channel 订阅锁释放的事件,。如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false,也就是第 39 行代码。如果等到了锁的释放事件的通知,则开始进入一个不断重试获取锁的循环。

    3. 循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 的信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

    特别注意:以上过程存在一个细节,这里有必要说明一下,也是分布式锁的一个关键点:当锁正在被占用时,等待获取锁的进程并不是通过一个 while(true) 死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题

    1. 锁的续期机制

    客户端 1 加锁的锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,怎么办呢?

    Redisson 提供了一个续期机制, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。

        private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
            if (leaseTime != -1L) {
                return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            } else {
                // Watch Dog 模式
                RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
                ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                    if (e == null) {
                        if (ttlRemaining) {
                            this.scheduleExpirationRenewal(threadId);
                        }
    
                    }
                });
                return ttlRemainingFuture;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    注意:从以上源码我们看到 leaseTime 必须是 -1 才会开启 Watch Dog 机制,也就是如果你想开启 Watch Dog 机制必须使用默认的加锁时间为 30s。如果你自己自定义时间,超过这个时间,锁就会自定释放,并不会延长。

    • Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下

    • 如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。

      注意:这里有一个细节问题,如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。

    1. 可重入加锁机制
    if (redis.call('exists', KEYS[1]) == 0) then " +
       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
       "return nil; " +
       "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
    "return redis.call('pttl', KEYS[1]);"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    第一个 if 判断肯定不成立,exists myLock 会显示锁 key 已经存在。第二个 if 判断会成立,因为 myLock 的 hash 数据结构中包含的那个 ID 即客户端 1 的 ID,此时就会执行可重入加锁的逻辑,使用:hincrby myLock 285475da-9152-4c83-822a-67ee2f116a79:52 1 对客户端 1 的加锁次数加 1。此时 myLock 数据结构变为下面这样:

    加锁成功Redis中保存数据的结构
    在这里插入图片描述
    1. 重入锁的解锁
    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        // 1. 异步释放锁
        RFuture<Boolean> future = unlockInnerAsync(threadId);
        // 取消 Watch Dog 机制
        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);
    
            if (e != null) {
                result.tryFailure(e);
                return;
            }
    
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
    
            result.trySuccess(null);
        });
    
        return result;
    }
    
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // 判断锁 key 是否存在
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                // 将该客户端对应的锁的 hash 结构的 value 值递减为 0 后再进行删除
                // 然后再向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    从以上代码来看,释放锁的步骤主要分三步:

    • 删除锁(这里注意可重入锁,在上面的脚本中有详细分析)。

    • 广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。

    • 取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除,并且 cancel 掉 Netty 的那个定时任务线程。

    4.3.1 Redisson分布式锁方案的优缺点

    1. 优点
    • Redisson 通过 Watch Dog 机制很好的解决了锁的续期问题。

    • 和 Zookeeper 相比较,Redisson 基于 Redis 性能更高,适合对性能要求高的场景。

    • 通过 Redisson 实现分布式可重入锁,比原生的 SET mylock userId NX PX milliseconds + lua 实现的效果更好些,虽然基本原理都一样,但是它帮我们屏蔽了内部的执行细节。

    在等待申请锁资源的进程等待申请锁的实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。

    1. 缺点

    使用 Redisson 实现分布式锁方案最大的问题就是如果你对某个 Redis Master 实例完成了加锁,此时 Master 会异步复制给其对应的 slave 实例。但是这个过程中一旦 Master 宕机,主备切换,slave 变为了 Master。接着就会导致,客户端 2 来尝试加锁的时候,在新的 Master 上完成了加锁,而客户端 1 也以为自己成功加了锁,此时就会导致多个客户端对一个分布式锁完成了加锁,这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。所以这个就是 Redis Cluster 或者说是 Redis Master-Slave 架构的主从异步复制导致的 Redis 分布式锁的最大缺陷(在 Redis Master 实例宕机的时候,可能导致多个客户端同时完成加锁)。

    有个别观点说使用 Watch Dog 机制开启一个定时线程去不断延长锁的时间对系统有所损耗(这里只是网络上的一种说法,博主查了很多资料并且结合实际生产并不认为有很大系统损耗,这个仅供大家参考)。

    4.4 读写锁测试

        @Autowired
        private StringRedisTemplate redisTemplate;
        @Autowired
        private RedissonClient redissonClient;
    
        @ResponseBody
        @GetMapping("/write")
        public String write() {
            String s = UUID.randomUUID().toString();
            RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("product-lock");
            // 写锁
            RLock writeLock = readWriteLock.writeLock();
            try {
                writeLock.lock();
                redisTemplate.opsForValue().set("id:1", s);
                TimeUnit.SECONDS.sleep(30);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
            }
            return s;
        }
    
        @ResponseBody
        @GetMapping("read")
        public String read() {
            RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("product-lock");
            // 读锁
            RLock readLock = readWriteLock.readLock();
            String s = "";
            try {
                readLock.lock();
                readWriteLock.readLock();
                s = redisTemplate.opsForValue().get("id:1");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
            }
    
            return s;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 总结
      • 写锁:排他锁,只要存在写锁其他锁就必须等待
      • 读锁:共享锁,多个读锁可以共享

    4.5 信号量

        @GetMapping("park")
        public void park() throws InterruptedException {
            RSemaphore park = redissonClient.getSemaphore("park");
            // 获取信号量(阻塞式获取)
            park.acquire();
            // 获取信号量(非阻塞式获取)
            park.tryAcquire();
        }
    
        @GetMapping("/go")
        public void go() throws InterruptedException {
            RSemaphore go = redissonClient.getSemaphore("park");
            // 释放信号量
            go.release();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4.6 闭锁(countDownLatch)

        @GetMapping("close/door")
        public String closeDoor() throws InterruptedException {
            RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
            // 五个闭锁全部完成结束
            countDownLatch.trySetCount(5);
            // 等待闭锁全部完成
            countDownLatch.await();
            return "success";
        }
        @GetMapping("consumer/count/latch")
        public String close(){
            RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
            // 闭锁计数减一
            countDownLatch.countDown();
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    5. Jedis

    Jedis使用

    3. Redis集群

    4. Redis使用场景

    1. 使用场景概述

    • 计数器

    ​ 可以对 String 进行自增自减运算,从而实现计数器功能。Redis 这种内存型数据库的读写性能非常高,很适合存储频繁读写的计数量。

    • 缓存

    ​ 将热点数据放到内存中,设置内存的最大使用量以及淘汰策略来保证缓存的命中率。

    • 会话缓存

    ​ 可以使用 Redis 来统一存储多台应用服务器的会话信息。当应用服务器不再存储用户的会话信息,也就不再具有状态,一个用户可以请求任意一个应用服务器,从而更容易实现高可用性以及可伸缩性。

    • 全页缓存(FPC)

    ​ 除基本的会话token之外,Redis还提供很简便的FPC平台。以Magento为例,Magento提供一个插件来使用Redis作为全页缓存后端。此外,对WordPress的用户来说,Pantheon有一个非常好的插件 wp-redis,这个插件能帮助你以最快速度加载你曾浏览过的页面。

    • 查找表

    ​ 例如 DNS 记录就很适合使用 Redis 进行存储。查找表和缓存类似,也是利用了 Redis 快速的查找特性。但是查找表的内容不能失效,而缓存的内容可以失效,因为缓存不作为可靠的数据来源。

    • 消息队列(发布/订阅功能)

    ​ List 是一个双向链表,可以通过 lpush 和 rpop 写入和读取消息。不过最好使用 Kafka、RabbitMQ 等消息中间件。

    • 分布式锁实现

    ​ 在分布式场景下,无法使用单机环境下的锁来对多个节点上的进程进行同步。可以使用 Redis 自带的 SETNX 命令实现分布式锁,除此之外,还可以使用官方提供的 RedLock 分布式锁实现。

    • 其它

    ​ Set 可以实现交集、并集等操作,从而实现共同好友等功能。ZSet 可以实现有序性操作,从而实现排行榜等功能。

    2. Redis使用场景模拟

    2.1 分布式锁

    1. 如2.4Redisson

    2. Reids原生 SET mylock userId NX PX milliseconds + lua

    5. 理论系列

    1. Redis的持久化

    持久化就是把内存的数据写到磁盘中去,防止服务宕机了内存数据丢失。

    1.1 RDB(默认)

    1. RDB是Redis默认的持久化方式。按照一定的时间将内存的数据以快照的形式保存到硬盘中,对应产生的数据文件为dump.rdb。通过配置文件中的save参数来定义快照的周期。

    2. 优点:

      1、只有一个文件 dump.rdb,方便持久化。
      2、容灾性好,一个文件可以保存到安全的磁盘。
      3、性能最大化,fork 子进程来完成写操作,让主进程继续处理命令,所以是 IO 最大化。使用单独子进程来进行持久化,主进程不会进行任何 IO 操作,保证了 redis 的高性能
      4.相对于数据集大时,比 AOF 的启动效率更高。
      
      • 1
      • 2
      • 3
      • 4
    3. 缺点

    1、数据安全性低。RDB 是间隔一段时间进行持久化,如果持久化之间 redis 发生故障,会发生数据丢失。所以这种方式更适合数据要求不严谨的时候)
    
    • 1

    1.2 AOF

    1. AOF持久化(即Append Only File持久化),则是将Redis执行的每次写命令记录到单独的日志文件中,当重启Redis会重新将持久化的日志中文件恢复数据。

    2. 优点:

      1、数据安全,aof 持久化可以配置 appendfsync 属性,有 always,每进行一次 命令操作就记录到 aof 文件中一次。
      2、通过 append 模式写文件,即使中途服务器宕机,可以通过 redis-check-aof 工具解决数据一致性问题。
      3、AOF 机制的 rewrite 模式。AOF 文件没被 rewrite 之前(文件过大时会对命令 进行合并重写),可以删除其中的某些命令(比如误操作的 flushall))

    3. 缺点

    1、AOF 文件比 RDB 文件大,且恢复速度慢。
    2、数据集大的时候,比 rdb 启动效率低。
    
    • 1
    • 2

    1.3 RDB和AOF对比

    • AOF文件比RDB更新频率高(AOF记录每一条命令),优先使用AOF还原数据(更完整)。
    • AOF比RDB更安全也更大
    • RDB性能比AOF好
    • 如果两个都配了优先加载AOF

    1.4 如何选择合适的持久化方式

    • 一般来说, 如果想达到足以媲美PostgreSQL的数据安全性,你应该同时使用两种持久化功能。在这种情况下,当 Redis 重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。

    • 如果你非常关心你的数据, 但仍然可以承受数分钟以内的数据丢失,那么你可以只使用RDB持久化。

    • 有很多用户都只使用AOF持久化,但并不推荐这种方式,因为定时生成RDB快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比AOF恢复的速度要快,除此之外,使用RDB还可以避免AOF程序的bug。

    • 如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式。

    2. 过期键的删除策略

    2.1 Redis的过期键的删除策略

    我们都知道,Redis是key-value数据库,我们可以设置Redis中缓存的key的过期时间。Redis的过期策略就是指当Redis中缓存的key过期了,Redis如何处理。

    2.2 三种常用过期策略

    • 定时过期:每个设置过期时间的key都需要创建一个定时器,到过期时间就会立即清除。该策略可以立即清除过期的数据,对内存很友好;但是会占用大量的CPU资源去处理过期的数据,从而影响缓存的响应时间和吞吐量。
    • 惰性过期:只有当访问一个key时,才会判断该key是否已过期,过期则清除。该策略可以最大化地节省CPU资源,却对内存非常不友好。极端情况可能出现大量的过期key没有再次被访问,从而不会被清除,占用大量内存。
    • 定期过期:每隔一定的时间,会扫描一定数量的数据库的expires字典中一定数量的key,并清除其中已过期的key。该策略是前两者的一个折中方案。通过调整定时扫描的时间间隔和每次扫描的限定耗时,可以在不同情况下使得CPU和内存资源达到最优的平衡效果。

    Redis中同时使用了惰性过期和定期过期两种过期策略

    3. 内存相关

    3.1 热点数据

    1. MySQL里有2000w数据,redis中只存20w的数据,如何保证redis中的数据都是热点数据?
    • redis内存数据集大小上升到一定大小的时候,就会施行数据淘汰策略

    3.2 内存淘汰策略

    Redis的内存淘汰策略是指在Redis的用于缓存的内存不足时,怎么处理新写入且需要申请额外空间的数据。

    1. 全局的键空间选择性移除
    • noeviction:当内存不足以容纳新写入数据时,新写入操作会报错。
    • allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key。(这个是最常用的)
    • allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个key。
    1. 设置过期时间的键空间选择性移除
    • volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的key。
    • volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key。
    • volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除。

    3.3 Redis如何做内存优化?

    尽可能使用散列表(hash),散列表使用的内存非常小,所以你应该尽可能的将你的数据模型抽象到一个散列表里面。比如你的web系统中有一个用户对象,不要为这个用户的名称,姓氏,邮箱,密码设置单独的key,而是应该把这个用户的所有信息存储到一张散列表里面。

    4. 缓存和数据库一致性

    参考文档

    4.1 引入缓存提高性能

    4.2 缓存利用率和一致性问题

    1. 第一种缓存方案:定时把数据库数据全刷到缓存,且不设置过期时间
    • 问题
      • 不设置过期时间,一些经常不被使用的数据一直被缓存利用率太低
      • 每隔一段时间间隔刷新缓存,数据库和缓存的数据一致性没保证
    1. 第二种方案:更新数据库和更新缓存两步操作的方式

      但数据库和缓存都更新,又存在先后问题,那对应的方案就有 2 个:

      • 先更新缓存,后更新数据库

      • 先更新数据库,后更新缓存

      哪个方案更好呢?

      先不考虑并发问题,正常情况下,无论谁先谁后,都可以让两者保持一致,但现在我们需要重点考虑「异常」情况。

      因为操作分为两步,那么就很有可能存在「第一步成功、第二步失败」的情况发生。

      这 2 种方案我们一个个来分析。

      1) 先更新缓存,后更新数据库

      如果缓存更新成功了,但数据库更新失败,那么此时缓存中是最新值,但数据库中是「旧值」。

      虽然此时读请求可以命中缓存,拿到正确的值,但是,一旦缓存「失效」,就会从数据库中读取到「旧值」,重建缓存也是这个旧值。

      这时用户会发现自己之前修改的数据又「变回去」了,对业务造成影响。

      2) 先更新数据库,后更新缓存

      如果数据库更新成功了,但缓存更新失败,那么此时数据库中是最新值,缓存中是「旧值」。

      之后的读请求读到的都是旧数据,只有当缓存「失效」后,才能从数据库中得到正确的值。

      这时用户会发现,自己刚刚修改了数据,但却看不到变更,一段时间过后,数据才变更过来,对业务也会有影响。

      可见,无论谁先谁后,但凡后者发生异常,就会对业务造成影响。那怎么解决这个问题呢?

      别急,后面我会详细给出对应的解决方案。

      我们继续分析,除了操作失败问题,还有什么场景会影响数据一致性?

      这里我们还需要重点关注:并发问题

    4.3 并发引发的一致性问题

    假设我们采用「先更新数据库,再更新缓存」的方案,并且两步都可以「成功执行」的前提下,如果存在并发,情况会是怎样的呢?

    有线程 A 和线程 B 两个线程,需要更新「同一条」数据,会发生这样的场景:

    1. 线程 A 更新数据库(X = 1)
    2. 线程 B 更新数据库(X = 2)
    3. 线程 B 更新缓存(X = 2)
    4. 线程 A 更新缓存(X = 1)

    最终 X 的值在缓存中是 1,在数据库中是 2,发生不一致。

    也就是说,A 虽然先于 B 发生,但 B 操作数据库和缓存的时间,却要比 A 的时间短,执行时序发生「错乱」,最终这条数据结果是不符合预期的。

    同样地,采用「先更新缓存,再更新数据库」的方案,也会有类似问题,这里不再详述。

    除此之外,我们从「缓存利用率」的角度来评估这个方案,也是不太推荐的。

    这是因为每次数据发生变更,都「无脑」更新缓存,但是缓存中的数据不一定会被「马上读取」,这就会导致缓存中可能存放了很多不常访问的数据,浪费缓存资源。

    由此可见,这种「更新数据库 + 更新缓存」的方案,不仅缓存利用率不高,还会造成机器性能的浪费。

    所以此时我们需要考虑另外一种方案:删除缓存

    4.4 删除缓存可以保证一致性吗?

    • 删除缓存对应的方案也有 2 种:
    1. 先删除缓存,后更新数据库
    2. 先更新数据库,后删除缓存

    经过前面的分析我们已经得知,但凡「第二步」操作失败,都会导致数据不一致。

    这里我不再详述具体场景,你可以按照前面的思路推演一下,就可以看到依旧存在数据不一致的情况。

    • 这里我们重点来看「并发」问题。

    1) 先删除缓存,后更新数据库

    如果有 2 个线程要并发「读写」数据,可能会发生以下场景:

    1. 线程 A 要更新 X = 2(原值 X = 1)
    2. 线程 A 先删除缓存
    3. 线程 B 读缓存,发现不存在,从数据库中读取到旧值(X = 1)
    4. 线程 A 将新值写入数据库(X = 2)
    5. 线程 B 将旧值写入缓存(X = 1)

    最终 X 的值在缓存中是 1(旧值),在数据库中是 2(新值),发生不一致。

    可见,先删除缓存,后更新数据库,当发生「读+写」并发时,还是存在数据不一致的情况。

    2) 先更新数据库,后删除缓存

    依旧是 2 个线程并发「读写」数据:

    1. 缓存中 X 不存在(数据库 X = 1)
    2. 线程 A 读取数据库,得到旧值(X = 1)
    3. 线程 B 更新数据库(X = 2)
    4. 线程 B 删除缓存
    5. 线程 A 将旧值写入缓存(X = 1)

    最终 X 的值在缓存中是 1(旧值),在数据库中是 2(新值),也发生不一致。

    这种情况「理论」来说是可能发生的,但实际真的有可能发生吗?

    其实概率「很低」,这是因为它必须满足 3 个条件:

    1. 缓存刚好已失效
    2. 读请求 + 写请求并发
    3. 更新数据库 + 删除缓存的时间(步骤 3-4),要比读数据库 + 写缓存时间短(步骤 2 和 5)

    仔细想一下,条件 3 发生的概率其实是非常低的。

    因为写数据库一般会先「加锁」,所以写数据库,通常是要比读数据库的时间更长的。

    这么来看,「先更新数据库 + 再删除缓存」的方案,是可以保证数据一致性的。

    所以,我们应该采用这种方案,来操作数据库和缓存。

    好,解决了并发问题,我们继续来看前面遗留的,第二步执行「失败」导致数据不一致的问题

    4.5 如何保证数据库和缓存操作都执行成功?

    前面我们分析到,无论是更新缓存还是删除缓存,只要第二步发生失败,那么就会导致数据库和缓存不一致。

    保证第二步成功执行,就是解决问题的关键。

    想一下,程序在执行过程中发生异常,最简单的解决办法是什么?

    答案是:重试

    是的,其实这里我们也可以这样做。

    无论是先操作缓存,还是先操作数据库,但凡后者执行失败了,我们就可以发起重试,尽可能地去做「补偿」。

    那这是不是意味着,只要执行失败,我们「无脑重试」就可以了呢?

    答案是否定的。现实情况往往没有想的这么简单,失败后立即重试的问题在于:

    • 立即重试很大概率「还会失败」
    • 「重试次数」设置多少才合理?
    • 重试会一直「占用」这个线程资源,无法服务其它客户端请求

    看到了么,虽然我们想通过重试的方式解决问题,但这种「同步」重试的方案依旧不严谨。

    那更好的方案应该怎么做?

    答案是:异步重试。什么是异步重试?

    其实就是把重试请求写到「消息队列」中,然后由专门的消费者来重试,直到成功。

    或者更直接的做法,为了避免第二步执行失败,我们可以把操作缓存这一步,直接放到消息队列中,由消费者来操作缓存。

    到这里你可能会问,写消息队列也有可能会失败啊?而且,引入消息队列,这又增加了更多的维护成本,这样做值得吗?

    这个问题很好,但我们思考这样一个问题:如果在执行失败的线程中一直重试,还没等执行成功,此时如果项目「重启」了,那这次重试请求也就「丢失」了,那这条数据就一直不一致了。

    所以,这里我们必须把重试或第二步操作放到另一个「服务」中,这个服务用「消息队列」最为合适。这是因为消息队列的特性,正好符合我们的需求:

    • 消息队列保证可靠性:写到队列中的消息,成功消费之前不会丢失(重启项目也不担心)
    • 消息队列保证消息成功投递:下游从队列拉取消息,成功消费后才会删除消息,否则还会继续投递消息给消费者(符合我们重试的场景)

    至于写队列失败和消息队列的维护成本问题:

    • 写队列失败:操作缓存和写消息队列,「同时失败」的概率其实是很小的
    • 维护成本:我们项目中一般都会用到消息队列,维护成本并没有新增很多

    所以,引入消息队列来解决这个问题,是比较合适的。这时架构模型就变成了这样:

    图片

    那如果你确实不想在应用中去写消息队列,是否有更简单的方案,同时又可以保证一致性呢?

    方案还是有的,这就是近几年比较流行的解决方案:订阅数据库变更日志,再操作缓存

    具体来讲就是,我们的业务应用在修改数据时,「只需」修改数据库,无需操作缓存。

    那什么时候操作缓存呢?这就和数据库的「变更日志」有关了。

    拿 MySQL 举例,当一条数据发生修改时,MySQL 就会产生一条变更日志(Binlog),我们可以订阅这个日志,拿到具体操作的数据,然后再根据这条数据,去删除对应的缓存。

    图片

    订阅变更日志,目前也有了比较成熟的开源中间件,例如阿里的 canal,使用这种方案的优点在于:

    • 无需考虑写消息队列失败情况:只要写 MySQL 成功,Binlog 肯定会有
    • 自动投递到下游队列:canal 自动把数据库变更日志「投递」给下游的消息队列

    当然,与此同时,我们需要投入精力去维护 canal 的高可用和稳定性。

    如果你有留意观察很多数据库的特性,就会发现其实很多数据库都逐渐开始提供「订阅变更日志」的功能了,相信不远的将来,我们就不用通过中间件来拉取日志,自己写程序就可以订阅变更日志了,这样可以进一步简化流程。

    至此,我们可以得出结论,想要保证数据库和缓存一致性,推荐采用「先更新数据库,再删除缓存」方案,并配合「消息队列」或「订阅变更日志」的方式来做

    4.6 主从库延迟和延迟双删问题

    到这里,还有 2 个问题,是我们没有重点分析过的。

    第一个问题,还记得前面讲到的「先删除缓存,再更新数据库」方案,导致不一致的场景么?

    这里我再把例子拿过来让你复习一下:

    2 个线程要并发「读写」数据,可能会发生以下场景:

    1. 线程 A 要更新 X = 2(原值 X = 1)
    2. 线程 A 先删除缓存
    3. 线程 B 读缓存,发现不存在,从数据库中读取到旧值(X = 1)
    4. 线程 A 将新值写入数据库(X = 2)
    5. 线程 B 将旧值写入缓存(X = 1)

    最终 X 的值在缓存中是 1(旧值),在数据库中是 2(新值),发生不一致。

    第二个问题:是关于「读写分离 + 主从复制延迟」情况下,缓存和数据库一致性的问题。

    在「先更新数据库,再删除缓存」方案下,「读写分离 + 主从库延迟」其实也会导致不一致:

    1. 线程 A 更新主库 X = 2(原值 X = 1)
    2. 线程 A 删除缓存
    3. 线程 B 查询缓存,没有命中,查询「从库」得到旧值(从库 X = 1)
    4. 从库「同步」完成(主从库 X = 2)
    5. 线程 B 将「旧值」写入缓存(X = 1)

    最终 X 的值在缓存中是 1(旧值),在主从库中是 2(新值),也发生不一致。

    看到了么?这 2 个问题的核心在于:缓存都被回种了「旧值」

    那怎么解决这类问题呢?

    最有效的办法就是,把缓存删掉

    但是,不能立即删,而是需要「延迟删」,这就是业界给出的方案:缓存延迟双删策略

    按照延时双删策略,这 2 个问题的解决方案是这样的:

    解决第一个问题:在线程 A 删除缓存、更新完数据库之后,先「休眠一会」,再「删除」一次缓存。

    解决第二个问题:线程 A 可以生成一条「延时消息」,写到消息队列中,消费者延时「删除」缓存。

    这两个方案的目的,都是为了把缓存清掉,这样一来,下次就可以从数据库读取到最新值,写入缓存。

    但问题来了,这个「延迟删除」缓存,延迟时间到底设置要多久呢?

    • 问题1:延迟时间要大于「主从复制」的延迟时间
    • 问题2:延迟时间要大于线程 B 读取数据库 + 写入缓存的时间

    但是,这个时间在分布式和高并发场景下,其实是很难评估的**。**

    很多时候,我们都是凭借经验大致估算这个延迟时间,例如延迟 1-5s,只能尽可能地降低不一致的概率。

    所以你看,采用这种方案,也只是尽可能保证一致性而已,极端情况下,还是有可能发生不一致。

    所以实际使用中,我还是建议你采用「先更新数据库,再删除缓存」的方案,同时,要尽可能地保证「主从复制」不要有太大延迟,降低出问题的概率。

    4.7 可以做到强一致吗?

    看到这里你可能会想,这些方案还是不够完美,我就想让缓存和数据库「强一致」,到底能不能做到呢?

    其实很难。

    要想做到强一致,最常见的方案是 2PC、3PC、Paxos、Raft 这类一致性协议,但它们的性能往往比较差,而且这些方案也比较复杂,还要考虑各种容错问题。

    相反,这时我们换个角度思考一下,我们引入缓存的目的是什么?

    没错,性能

    一旦我们决定使用缓存,那必然要面临一致性问题。性能和一致性就像天平的两端,无法做到都满足要求。

    而且,就拿我们前面讲到的方案来说,当操作数据库和缓存完成之前,只要有其它请求可以进来,都有可能查到「中间状态」的数据。

    所以如果非要追求强一致,那必须要求所有更新操作完成之前期间,不能有「任何请求」进来。

    虽然我们可以通过加「分布锁」的方式来实现,但我们要付出的代价,很可能会超过引入缓存带来的性能提升。

    所以,既然决定使用缓存,就必须容忍「一致性」问题,我们只能尽可能地去降低问题出现的概率。

    同时我们也要知道,缓存都是有「失效时间」的,就算在这期间存在短期不一致,我们依旧有失效时间来兜底,这样也能达到最终一致。

    4.8 总结

    1、想要提高应用的性能,可以引入「缓存」来解决

    2、引入缓存后,需要考虑缓存和数据库一致性问题,可选的方案有:「更新数据库 + 更新缓存」、「更新数据库 + 删除缓存」

    3、更新数据库 + 更新缓存方案,在「并发」场景下无法保证缓存和数据一致性,且存在「缓存资源浪费」和「机器性能浪费」的情况发生

    4、在更新数据库 + 删除缓存的方案中,「先删除缓存,再更新数据库」在「并发」场景下依旧有数据不一致问题,解决方案是「延迟双删」,但这个延迟时间很难评估,所以推荐用「先更新数据库,再删除缓存」的方案

    5、在「先更新数据库,再删除缓存」方案下,为了保证两步都成功执行,需配合「消息队列」或「订阅变更日志」的方案来做,本质是通过「重试」的方式保证数据一致性

    6、在「先更新数据库,再删除缓存」方案下,「读写分离 + 主从库延迟」也会导致缓存和数据库不一致,缓解此问题的方案是「延迟双删」,凭借经验发送「延迟消息」到队列中,延迟删除缓存,同时也要控制主从库延迟,尽可能降低不一致发生的概率

    4.9 数据库操作有限

    4.9.1 先更新数据库再更新缓存(双写模式)

    数据不一致示例
    在这里插入图片描述

    4.9.2 先更新数据库再删除缓存(失效模式)

    数据不一致示例
    在这里插入图片描述

    5. Redis常见性能问题和解决方案?

    • Master最好不要做任何持久化工作,包括内存快照和AOF日志文件,特别是不要启用内存快照做持久化。
    • 如果数据比较关键,某个Slave开启AOF备份数据,策略为每秒同步一次。
    • 为了主从复制的速度和连接的稳定性,Slave和Master最好在同一个局域网内。
    • 尽量避免在压力较大的主库上增加从库
    • Master调用BGREWRITEAOF重写AOF文件,AOF在重写的时候会占大量的CPU和内存资源,导致服务load过高,出现短暂服务暂停现象。
    • 为了Master的稳定性,主从复制不要用图状结构,用单向链表结构更稳定,即主从关系为:Master<–Slave1<–Slave2<–Slave3…,这样的结构也方便解决单点故障问题,实现Slave对Master的替换,也即,如果Master挂了,可以立马启用Slave1做Master,其他不变。

    6. 取固定的已知的前缀开头的Key

    1. 使用keys指令可以扫出指定模式的key列表。
    2. 对方接着追问:如果这个redis正在给线上的业务提供服务,那使用keys指令会有什么问题?

    ​ 这个时候你要回答redis关键的一个特性:redis的单线程的。keys指令会导致线程阻塞一段时间,线上服务会停顿,直到指令执行完毕,服务才能恢复。这个时候可以使用scan指令,scan指令可以无阻塞的提取出指定模式的key列表,但是会有一定的重复概率,在客户端做一次去重就可以了,但是整体所花费的时间会比直接用keys指令长。

    7. 使用Redis做过异步队列吗,是如何实现的

    使用list类型保存数据信息,rpush生产消息,lpop消费消息,当lpop没有消息时,可以sleep一段时间,然后再检查有没有信息,如果不想sleep的话,可以使用blpop, 在没有信息的时候,会一直阻塞,直到信息的到来。redis可以通过pub/sub主题订阅模式实现一个生产者,多个消费者,当然也存在一定的缺点,当消费者下线时,生产的消息会丢失。

    8. Redis如何实现延时队列

    使用sortedset,使用时间戳做score, 消息内容作为key,调用zadd来生产消息,消费者使用zrangbyscore获取n秒之前的数据做轮询处理。

    9. 雪崩、击穿、穿透

    9.1 雪崩

    1. 概念
    • 大量的key同时失效,大量请求对数据库造成毁灭性压力
    • 导致大量缓存key失效的原因可能是缓存服务器宕机或者key过期时间同时到期
    1. 解决方案(key过期时间同时到期)
    • 热点key设置随机的过期时间
    • 缓存永不失效
    1. 解决方案(缓存服务器宕机)
    • 采用 Redis 集群,避免单机出现问题整个缓存服务都没办法使用。
    • 限流,避免同时处理大量的请求。

    9.2 击穿

    1. 概念
    • 热点key失效,与该key相关的大量请求对数据库造成毁灭性压力
    1. 解决方案
    • 加锁,保证只有拿到锁的一个请求真正请求数据库其他的自旋等待数据被放入缓存

    9.3 穿透

    1. 概念
    • 请求携带缓存一定不存在的参数数据,迫使大量请求对数据库造成毁灭性压力
    1. 解决方案
    • 做好请求参数校验,对携带明显非法请求参数的请求直接抛出异常
    • 缓存数据库不存在数据的key并设置较短的过期时间 ;存在bug如果每一次非法请求都不一样的情况,缓存没任何作用反而消耗了缓存服务器
    • 布隆过滤器:把所有可能存在的请求的值都存放在布隆过滤器中,当用户请求过来,先判断用户发来的请求的值是否存在于布隆过滤器中。不存在的话,直接返回请求参数错误信息给客户端,存在的话才会走下面的流程。
    布隆过滤器
    在这里插入图片描述
    • 但是,需要注意的是布隆过滤器可能会存在误判的情况。总结来说就是: **布隆过滤器说某个元素存在,小概率会误判。布隆过滤器说某个元素不在,那么这个元素一定不在。**所以对检索准确性没有任何影响。

    布隆过滤器的原理当一个元素加入布隆过滤器中的时候,会进行哪些操作:

    1. 使用布隆过滤器中的哈希函数对元素值进行计算,得到哈希值(有几个哈希函数得到几个哈希值)。
    2. 根据得到的哈希值,在位数组中把对应下标的值置为 1。

    再来看一下,当我们需要判断一个元素是否存在于布隆过滤器的时候,会进行哪些操作

    1. 对给定元素再次进行相同的哈希计算;
    2. 得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。

    然后,一定会出现这样一种情况:不同的字符串可能哈希出来的位置相同。 (可以适当增加位数组大小或者调整我们的哈希函数来降低概率)

  • 相关阅读:
    编译安装Nginx+GeoIP2自动更新+防盗链+防爬虫+限制访问速度+限制连接数
    图像处理:U-Net中的重叠-切片(Overlap-tile)
    时间复杂度计算超全整理!!(数据结构和算法的第一步
    共享自行车需求预测学习记录
    blender 快捷键 常见问题
    idea手动创建webapp(在main文件夹下)
    Nacos作为配置中心
    【深入理解C++】局部变量及初始化
    Java 使用Lock、Condition的生产者与消费者问题(JUC)
    【网络协议】 TCP与UDP协议区别及应用场景深度分析
  • 原文地址:https://blog.csdn.net/qq_42263473/article/details/125968983