• Redis缓存初探


            在电商项目中商品分类往往显示在首页,是一个热点数据,它具有查多改少的特点,因此添加缓存是非常合适的,而缓存我们常用redis, 先在在缓存中去获取数据,如果获取不到,就去数据库里面查询,并且把查询到的结果放到缓存中。

    考虑到不同人群所用到的语言和redis工具的用法均不相同,本文的代码都是一些伪代码,运行不起来的......

    1. public List getCategoryNames(){
    2. String categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
    3. // No cache
    4. if(StringUtils.isEmpty(categoryNamesJson)){
    5. List allCategoryNames = getCategoryNameInDB();
    6. redisTemplate.opsForValue().set("CategoryName",
    7. JSONObject.toJSONString(allCategoryNames ), 1, TimeUnit.DAYS);
    8. return allCategoryNames;
    9. }
    10. List result = JSONObject.parseObject(categoryNamesJson, new TypeReference>);
    11. return result;
    12. }

    这段代码在单线程的情况下没有问题,而在多线程的环境下,多个线程同时判断StringUtils.isEmpty(categoryNameJson); 为true, 同时会有多个线程去查询数据库然后写到缓存中,这就失去了缓存的效果。我们可以使用JVM提供的synchronized关键字来控制并发。

    1. public List getCategoryNames(){
    2. String categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
    3. // No cache
    4. if(StringUtils.isEmpty(categoryNamesJson)){
    5. synchronized (this) {
    6. categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
    7. if(StringUtils.isEmpty(categoryNamesJson)){
    8. List allCategoryNames = getCategoryNameInDB();
    9. // add cache if db has data
    10. if(allCategoryNames != null && allCategoryNames.length > 0){
    11. redisTemplate.opsForValue().set("CategoryName",
    12. JSONObject.toJSONString(allCategoryNames ), 1, TimeUnit.DAYS);
    13. }
    14. return allCategoryNames;
    15. }
    16. }
    17. }
    18. List result = JSONObject.parseObject(categoryNamesJson, new TypeReference>);
    19. return result;
    20. }

    这是一个双重检查,这样我们能够保证多线程环境下只有一个线程会去查库。

    我们的方法目前还是有所缺陷

    • 如果redis中没有,数据库中也没有,并且这种请求是大量的,那么我们的数据库仍然会执行多次查询,缓存失去了作用,这是缓存穿透。因此就算数据库中没有数据,我们也应该往缓存中插入一条空数据。
    • 目前我们缓存的时间是一天,如果一天后当缓存过期时需要处理大量请求,同样会造成数据库压力过大,我们应该把缓存的过期时间设置成不一样的,可以在原有失效时间上加上一个随机数,这样缓存就不会一起过期,目前我们只用了一个键值对,暂时没有这个问题,这是缓存雪崩
    • 当一个key在大量请求来的时候刚好失效,所有的压力就会落到DB,这个我们通过互斥锁解决了,这是缓存击穿。锁的性能是较差的,还有一种方法是逻辑过期,保证redis中热点数据始终存在,这个实现比较困难,而且容易产生数据一致性的问题,如果并发量很大,而且可以接受数据不一致的问题,可以考虑使用。

    单机环境通过这种方式可以设置缓存,但是并发量较大的网站是不会使用单机部署的,我们会有多个机器,通过网关负载平衡到各个机器,这个时候就出现了新的问题。

    比如有三台机器,那么上面的代码就会有三个锁,同时三个进程上的某一个线程获取到了锁,这就会查数据库3次,如果我们只想让缓存存在一份,就必须使用到分布式锁。分布式锁就是可以串行化多个进程之内的线程。

    暂时先不对比性能,我们的目的就是多进程下只有一个线程能向数据库中添加数据。

    我们先来最简单的实现一下

    1. public List getCategoryNames(){
    2. String categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
    3. // No cache
    4. if(StringUtils.isEmpty(categoryNamesJson)){
    5. String lock = redis.Template.opsForValue().get("Lock");
    6. // Success get lock
    7. if(StringUtils.isEmpty(lock)){
    8. redis.Template.opsForValue().set("Lock", 1);
    9. categoryNamesJson = redis.Template.opsForValue().get("CategoryName");
    10. if(StringUtils.isEmpty(categoryNamesJson)){
    11. List allCategoryNames = getCategoryNameInDB();
    12. // add cache if db has data
    13. if(allCategoryNames != null && allCategoryNames.length > 0){
    14. redisTemplate.opsForValue().set("CategoryName",
    15. JSONObject.toJSONString(allCategoryNames ), 1, TimeUnit.DAYS);
    16. }
    17. // Unlock
    18. redis.Template.opsForValue().remove("Lock");
    19. return allCategoryNames;
    20. }
    21. }else{
    22. // Get lock fail, wait for get lock again.
    23. try {
    24. Thread.sleep(200);
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. }
    28. return getCategoryNames();
    29. }
    30. }
    31. List result = JSONObject.parseObject(categoryNamesJson, new TypeReference>);
    32. return result;
    33. }

    判断redis中是否有lock来完成加锁操作,如果有lock证明无法获取到锁,过一段时间继续去获取锁,代码的问题也很明显,当多个线程同时走到String lock = redis.Template.opsForValue().get("Lock"); 全都判断正确,然后去查询缓存,然后又都过了,这时还会有多个请求发送到数据库,不满足预期。

    现在缓存的问题主要变成了锁的问题,为了保持代码简短,我们把去掉与锁无关的代码。

    1. public List<string> getCategoryNamesWithRedisLock(){
    2. String lock = redis.Template.opsForValue().get("Lock");
    3. // Success get lock
    4. if(StringUtils.isEmpty(lock)){
    5. redis.Template.opsForValue().set("Lock", 1);
    6. List<string> names = getCategoryNamesImpl();
    7. redis.Template.opsForValue().remove("Lock");
    8. return names;
    9. }else{
    10. // Get lock fail, wait for get lock again.
    11. try {
    12. Thread.sleep(200);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. return getCategoryNamesWithRedisLock();
    17. }
    18. }

    这时候我们会发现,如果是这种情况,先从redis获取一下锁,然后判断锁是否存在的代码都是不可行的,我们至少需要让获取和判断是原子性的,如果我们可以编写一个伪单机版redis代码,我们会提供一个方法,如果没有这个键才让插入,如果有就禁止插入,返回false。注意这里是单机版的伪redis。这里使用了阻塞的锁,这里用非阻塞的锁会更好。

    1. Map<String,String> totalMap = new HashMap<String,String>()
    2. public boolean setNx(string key, string value){
    3. Lock(this){
    4. if(totalMap.contains(key)){
    5. return false;
    6. }else{
    7. totalMap.add(key, value);
    8. return true;
    9. }
    10. }
    11. }

    redis当然也为我们提供了这种功能。来自redis官网。

     尝试去添加一个键值对,如果键存在直接返回false, 如果设置成功返回true。

    使用了SetNx的代码如下

    1. public List getCategoryNamesWithRedisLock(){
    2. // Set nx
    3. Boolean getLockSuccess = redis.Template.opsForValue().setIfAbsent("Lock", 1);
    4. // Success get lock
    5. if(getLockSuccess){
    6. try{
    7. List names = getCategoryNamesImpl();
    8. }finally {
    9. // unlock
    10. redis.Template.opsForValue().remove("Lock");
    11. }
    12. return names;
    13. }else{
    14. // Get lock fail, wait for get lock again.
    15. try {
    16. Thread.sleep(200);
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. return getCategoryNamesWithRedisLock();
    21. }
    22. }

    这样,即便是在多台服务器上多个线程同时运行setIfAbsent这段代码,最终也只有一个线程会获取到getLockSuccess为true,在没有意外的情况下,当前代码就实现了分布式锁的功能。

    那么下面就是意外情况了

    • 如果我们的getCategoryNamesImpl是超长的任务,在执行过程中电脑断电了,那么永远都不会去释放锁,造成了其他机器获取不到锁。
    • 如果加了锁的过期时间就会有新的问题,比如业务还没跑完,锁释放掉了,其他线程就获取到了锁,造成多线程问题。业务时间是无法确定的,因此过期时间是多少也是不确定的。
    • 当前线程A锁过期了,但是业务代码还在跑,其他线程B获取到了锁,之后A在B业务运行期间结束了,调用了释放锁,就把B的锁释放掉了。

    针对上面三种情况,提出两种解决方向

    1. 保证锁不会过期,也就是业务跑多久锁一定在跑完之后才会释放。
    2. 过期认为业务出现问题,只要不影响其他线程使用锁即可。

    如果选用第一种解决方式需要用到redis的 expire命令

    简单点的做法就是跑业务的时候开一个线程去调用expire方法给锁续期,简单点来,用个中断标识符就可以。

    1. private static volatile boolean isInterrupt = false;
    2. public List getCategoryNamesWithRedisLock(){
    3. // Set nx
    4. Boolean getLockSuccess = redis.Template.opsForValue().setIfAbsent("Lock", 130, TimeUnit.SECONDS);
    5. // Success get lock
    6. if(getLockSuccess){
    7. try{
    8. isInterrupt = false;
    9. new Thread(() -> {
    10. while(!isInterrupt){
    11. try{
    12. Thread.sleep(2000);
    13. }catch(InterruptedException ex){
    14. Log.error(ex);
    15. }
    16. redisTemplate.expire("lock",10,TimeUnit.SECONDS);
    17. }
    18. }).start();
    19. List names = getCategoryNamesImpl();
    20. }finally {
    21. // unlock
    22. redis.Template.opsForValue().remove("Lock");
    23. isInterrupt = true;
    24. }
    25. return names;
    26. }else{
    27. // Get lock fail, wait for get lock again.
    28. try {
    29. Thread.sleep(200);
    30. } catch (InterruptedException e) {
    31. e.printStackTrace();
    32. }
    33. return getCategoryNamesWithRedisLock();
    34. }
    35. }

     如果业务再跑,续期也会再跑,断电了有过期时间,锁也会释放,正常业务结束,续期线程也就结束了。

    再说第二种不续期的,我们先谈论一下这个策略的合理性,比如我们设置30秒的过期时间,如果这个接口30秒都没有完成,就说明这个接口有问题。但是如果是调用别人的接口,就不能确保了。因此是否续期还是要看具体的业务场景。

    不续期核心问题就是避免释放掉别人的锁,这个很容易想到给Lock的值存储一个线程的唯一标识,最后进行比对,如果是当前线程的标识就说明是当前线程的锁,才会释放,但是,这里又出现了先取值,后比较的操作,这会造成多线程问题,所以我们还是需要原子性的操作,这次没有命令了,不过redis提供了lua的方式来进行原子操作。

    1. String lockValue = redisTemplate.opsForValue().get("lock");
    2. // At this moment, another thread get lock and set Lock uuid(another), still delete another thread lock.
    3. if(uuid.equals(lockValue)){
    4. redisTemplate.delete("lock");
    5. }

    用了Lua的代码。

    1. public List getCategoryNamesWithRedisLock(){
    2. String uuid = UUID.randomUUID().toString();
    3. // Set nx
    4. Boolean getLockSuccess = redis.Template.opsForValue().setIfAbsent("Lock", uuid,30, TimeUnit.SECONDS);
    5. // Success get lock
    6. if(getLockSuccess){
    7. try{
    8. List names = getCategoryNamesImpl();
    9. }finally {
    10. // Atomicity unlock
    11. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    12. Long lock1 = redisTemplate.execute(new DefaultRedisScript(script, Long.class) , Arrays.asList("lock"), uuid);
    13. }
    14. return names;
    15. }else{
    16. // Get lock fail, wait for get lock again.
    17. try {
    18. Thread.sleep(200);
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. return getCategoryNamesWithRedisLock();
    23. }
    24. }

    最后发起一个思考,集群环境下的缓存是否有必要使用分布式锁?

    如果使用synchronized 本地所,三台机器下最多也就是向数据库查询了3下,10台机器就是10下,这对数据库来说没什么压力可言。我们引入分布式锁代码变得复杂,而且性能不好,如果因为编程错误造成死锁,整个集群都会受影响,风险也比较大,分布式锁还是要在特定环境下做好权衡在使用。

    感谢您看到这里。

  • 相关阅读:
    算法和数据结构(b站尚硅谷韩老师教程学习笔记)
    学习笔记-Sysmon
    iterator 迭代器
    kudu 1.4.0 离线安装
    微信小程序多端应用 Donut 多端编译
    【40. 石子合并(区间DP)】
    java计算机毕业设计高校迎新管理系统源码+mysql数据库+系统+lw文档+部署
    【专栏】RPC系列(理论)-动态代理
    【剑指Offer】57.和为s的两个数字
    lc[二叉树]---101.对称二叉树
  • 原文地址:https://blog.csdn.net/h1311454244/article/details/126251607