• Redis实现并发阻塞锁方案


    由于用户同时访问线上的下订单接口,导致在扣减库存时出现了异常,这是一个很典型的并发问题,本篇文章为解决并发问题而生,采用的技术为Redis锁机制+多线程的阻塞唤醒方法。

    在实现Redis锁机制之前,我们需要了解一下前置知识。

    一、前置知识

    1、多线程

    将wait()、notifyAll()归为到多线程的方法中略有一些不恰当,这两个方法是Object中的方法。

    ① 当调用了wait()方法后,让当前线程进入等待状态,并且让当前线程释放对象锁,等待既为阻塞状态,等待notifyAll()方法的唤醒。

    wait()方法和sleep()方法有一些相似之处,都是使当前线程阻塞,但他们实际是有一些区别的。

    1. 执行wait() 方法之前需要请求锁,wait()方法执行的时候会释放锁,等待被唤醒的时候竞争锁。
    2. sleep()只是让当前线程休眠一段时间,无视锁的存在。
    3. wait() 是Object类的方法 sleep()是Thread的静态方法

    ② notifyAll()方法为唤醒wait()中的线程。

    notifyAll() 和 notify() 方法都是可以唤醒调用了wait()方法,而陷入阻塞的线程。

    但是notify()是随机唤醒这个阻塞队列中随机的一个线程,而notifyAll()是唤醒所用的调用了wait()方法而陷入阻塞的线程,让他们自己去抢占对象锁。

    notifyAll() 和 notify() 也都是必须在加锁的同步代码块中被调用,它们起的是唤醒的作用,不是释放锁的作用,只用在当前同步代码块中的程序执行完,也就是对象锁自然释放了,notifyAll() 和 notify()方法才会起作用,去唤醒线程。

    wait()方法一般是和notify() 或者 notifyAll() 方法一起连用的。

    以上为掌握本篇博客必备的多线程知识,如果系统学习多线程的相关知识可查阅博客 程序员田同学

    2、Redis

    加锁的过程本质上就是往Redis中set值,当别的进程也来set值时候,发现里面已经有值了,就只能放弃获取稍后再试。

    Redis提供了一个天然实现锁机制的方法。

    在Redis客户端的命令为 setnx(set if not exists) 
    

    在集成Springboot中采用的方法为:

    redisTemplate.opsForValue().setIfAbsent(key, value);
    

    如果里面set值成功会返回True,如果里面已经存在值就会返回False。

    在我们实际使用的时候,setIfAbsent()方法并不是总是返回True和False。

    如果我们的业务中加了事务,该方法会返回null,不知道这是一个bug还是什么,这是Redis的一个巨坑,浪费了很长时间才发现了这个问题,如果解决此问题可以跳转到第四章。

    二、实现原理

    分布式锁本质上要实现的目标就是在 Redis 里面占一个位置,当别的进程也要来占时,发现已经有人占在那里了,就只好放弃或者稍后再试。占位一般是使用 setnx(set if not exists) 指令,只允许被一个客户端占位。先来先占, 事办完了,再调用 del 指令释放茅坑。

    其中,发现Redis中已经有值了,当前线程是直接放弃还是稍后再试分别就代表着,非阻塞锁和阻塞锁。

    在我们的业务场景中肯定是要稍后再试(阻塞锁),如果是直接放弃(非阻塞锁)在数据库层面就可以直接做,就不需要我们在代码大费周章了。

    非阻塞锁只能保存数据的正确性,在高并发的情况下会抛出大量的异常,当一百个并发请求到来时,只有一个请求成功,其他均会抛出异常。

    Redis非阻塞锁和 MySQL的乐观锁,最终达到的效果是一样的,乐观锁是采用CAS的思想。

    乐观锁方法:表字段 加一个版本号,或者别的字段也可以!加版本号,可以知道控制顺序而已!在update 的时候可以where后面加上version= oldVersion。数据库,在任何并发的情况下,update 成功就是 1 失败就是 0 .可以根据返回的 1 ,0 做相应的处理!

    我们更推荐大家使用阻塞锁的方式。

    当获取不到锁时候,我们让当前线程使用wait()方法唤醒,当持有锁的线程使用完成后,调用notifyAll()唤醒所有等待的方法。

    三、具体实现

    以下代码为阻塞锁的实现方式。

    业务层:

        public String test() throws InterruptedException {
    
            lock("lockKey");
            System.out.println("11");
            System.out.println("22");
            System.out.println(Thread.currentThread().getName()+"***********");
            Thread.sleep(2000);
            System.out.println("33");
            System.out.println("44");
            System.out.println("55");
            unlock("lockKey");
            return "String";
        }
    

    锁的工具类:

    主要是加锁和解锁的两个方法。

     //每一个redis的key对应一个阻塞对象
        private static HashMap<String, Object> blockers = new HashMap<>();
    
        //当前获得锁的线程
        private static Thread curThread;
    
        public static RedisTemplate redisTemplate = (RedisTemplate) SpringUtils.getBean("redisTemplate") ;
    
        /**
         * 加锁
         * @param key
         * @throws InterruptedException
         */
    
        public static void lock(String key) {
            //循环判断是否能够创建key, 不能则直接wait释放CPU执行权
    
            //放不进指说明锁正在被占用
            System.out.println(key+"**");
    
            while (!RedisUtil.setLock(key,"1",3)){
    
                synchronized (key) {
    
                    blockers.put(key, key);
                    //wait释放CPU执行权
                    try {
                        key.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            blockers.put(key, key);
            //能够成功创建,获取锁成功记录当前获取锁线程
            curThread = Thread.currentThread();
        }
    
        /**
         * 解锁
         * @param key
         */
        public static void unlock(String key) {
            //判断是否为加锁的线程执行解锁, 不是则直接忽略
            if( curThread == Thread.currentThread()) {
                RedisUtil.delete(key);
                //删除key之后需要notifyAll所有的应用, 所以这里采用发订阅消息给所有的应用
              //  RedisUtil.publish("lock", key);
    
                //notifllall其他线程
                Object lock = blockers.get(key);
                if(lock != null) {
                    synchronized (lock) {
                        lock.notifyAll();
                    }
                }
    
            }
        }
    

    当我们在不加锁时候,使用接口测试工具测试时,12345并不能都是顺序执行的,会造成输出顺序不一致,如果是在我们的实际场景中,这是输入换成了数据库的select和update,数据出现错乱也是很正常的情况了。

    当我们加上锁以后,12345都是顺序输出,并发问题顺利解决了。

    四、附录

    1、Redis存在的bug

    本来lock()方法是直接调用 "Redis.setIfAbsent()" 方法,但是在使用时候一直报空指针异常,最终定位问题为Redis.setIfAbsent()方法存在问题。

    在我的实际业务中,下订单的方法使用了@Transflastion增加了事务,导致该方法返回null,我们手写一个实现setIfAbsent()的作用。

     /**
         * 只有key不存在时,才设置值, 返回true, 否则返回false
         *
         * @param key     key 不能为null
         * @param value   value 不能为null
         * @param timeout 过期时长, 单位为妙
         * @return
         */
        public static Boolean setLock(String key,String value, long timeout) {
    
            SessionCallback<Boolean> sessionCallback = new SessionCallback<Boolean>() {
                List<Object> exec = null;
                @Override
                @SuppressWarnings("unchecked")
                public Boolean execute(RedisOperations operations) throws DataAccessException {
                    operations.multi();
    
                    redisTemplate.opsForValue().setIfAbsent(key, value);
                    redisTemplate.expire(key,timeout, TimeUnit.SECONDS);
    
                    exec = operations.exec();
    
                    if(exec.size() > 0) {
                        return (Boolean) exec.get(0);
                    }
                    return false;
                }
            };
            return (Boolean) redisTemplate.execute(sessionCallback);
        }
    

    方便对比,以下贴上原本的setIfAbsent()方法。

     /**
       * 只有key不存在时,才设置值, 返回true, 否则返回false [警告:事务或者管道情况下会报错-可使用 setLock方法]
       *
       * @param key     key 不能为null
       * @param value   value 不能为null
       * @param timeout 过期时长, 单位为妙
       * @return
       */
      @Deprecated
      public static  Boolean setIfAbsent(String key, T value, long timeout) {
    
         // redisTemplate.multi();
          ValueOperations<String, T> valueOperations = redisTemplate.opsForValue();
          Boolean aBoolean = valueOperations.setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
         // redisTemplate.exec();
        return aBoolean;
      }
    

    2、MySQL的锁机制

    在并发场景下MySQL会报错,报错信息如下:

    ### Cause: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
    ; SQL []; Lock wait timeout exceeded; try restarting transaction; nested exception is com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
    

    问题出现的原因是,某一种表频繁被锁表,导致另外一个事务超时,出现问题的原因是MySQL的机制。

    MySQL更新时如果where字段存在索引会使用行锁,否则会使用表锁。

    我们使用navichat在where字段上加上索引,问题顺利的迎刃而解。

  • 相关阅读:
    systemctl服务部署错误:code=exited, status=217/USER
    JDK动态代理与Cglib动态代理使用详解
    什么是粘包和拆包,Netty如何解决粘包拆包?
    使用phpMyAdmin管理WordPress数据库入门指南
    C语言的冒泡排序
    【校招VIP】高校大学生自己的商业项目|产品脑图的重要性:活动模型的细节分析
    Element-UI el-select下拉框多选实现全选
    Paxos 学习笔记2 - Multi-Paxos
    Spring中的事务简介说明
    Scala的字符串插值
  • 原文地址:https://www.cnblogs.com/tianClassmate/p/16203042.html