我们直接在并发编程阶段学习过synchronized lock这些都是单机锁。什么是锁呢? 就是我们有一些互斥资源,不能并行执行,需要一个东西来保证是串行执行的 锁
单机锁的弊端

如图,有3个独立的订单服务,4个用户并发从客端访问,用户12访问订单服务1,由于单机锁,所以可以有一个用户能够进行支付操作,用户3 用户4分别进入订单服务2 订单服务3 也能够进行支付操作,因此我们想让4个用户在一个时间内只有一个用户能够进行支付操作,单机锁就无法做到
所以就引入了分布式锁

分布式锁是把锁放在第三方,而不是在单机服务中,用户1-4 获取锁都到第三方服务中去获取,这样就可以保证分布式服务下的支付操作时是串行执行的了
需求:
a.只能有一个线程能同时执行互斥的资源
b.其它的线程执行的时候,有线程在执行的话,要么等待,要么报错
需要实现这个需求的3个重要条件
1. 有一个标记来标记是不是有线程在执行
2. 这个标记必须可见 保证相互之间这个标记是能拿到最新的结果的 volatile
3. 获取这个标记不能同时抢占成功 安全 (lock中是使用cas机制来保证的)
a. 标记 redis是k-v结构 key作为一个标记,这个key存在的话说明有了标记,key不存在,说明没有标记
b.标记的可见性 单线程命令执行, 必须要set完 你才能get
c. 保证原子性 setnx 单线执行
setnx其实等价于两个命令(exists 和 set),假如我不想setnx,那么流程就等价于:
- if(exists(order:111)==0) //没锁
- {
- set(order:111,1)
- }
但是如果这样写,就会发现一个问题,这两个指令不是原子性的,就会导致同时进入判断中,得到没锁的场景

因此我们应该思考的是怎么保证多个指令的原子性?
redis的开启提交回滚命令分别为multi、exec、discard
但是事务的特点是
1.命令是原子的 在执行事务中的指令时,服务器阻塞,不能执行其他指令2.不能拿到事务中间的指令的结果 来决定后面指令的逻辑。
因此,不能够通过判断而决定后面的逻辑,事务不能够满足我们上述的需求,因而采用Lua脚本
lua语言是一个轻量级的脚本语言,能够完美的融入redis中。
redis调用lua脚本 采用eval指令
预填充键值 如果传入的键是1 返回第一个参数 否则返回第二个参数
127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]end return ARGV[2] " 1 1 'HUIHUI' 'XIAOHUIHUI'"HUIHUI"127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]end return ARGV[2] " 1 2 'HUIHUI' 'XIAOHUIHUI'"XIAOHUIHUI"
既然可以传入键值对,我们能不能在lua脚本里面再去执行redis的指令,并且可以根据返回的结果继续执行后续的逻辑。
判断一个key是不是存在,如果不存在,则调用set的redis指令,在lua脚本调用redis指令,用redis.call
127.0 . 0.1 : 6379 > eval "localkey=redis.call('exists',KEYS[1]) if key==0 then returnredis.call('set',KEYS[1],ARGV[1]) end return 1" 1 nameHUIHUIOK127.0 . 0.1 : 6379 > get name"HUIHUI"
Lua脚本中的指令是原子的,在执行lua脚本期间,其它指令阻塞,必须等待lua脚本的指令执行完毕。 所以单个lua脚本的指令不宜有太多指令。
lua脚本的使用场景
1.需要原子性地执行多个命令2.需要中间值来组合后面的命令3.需要中间值来编排后面的命令
我们使用lua脚本来保证多个指令的原子性,实现我们的分布式锁。
- private static String script = "" +
- "local lockSet = redis.call('exists', KEYS[1])\n"
- + "if lockSet == 0 then\n" +
- "redis.call('set', KEYS[1], ARGV[2])\n" +
- //设置过期时间,防止死锁
- "redis.call('expire', KEYS[1], ARGV[2])\n" +
- "end\n" +
- "return lockSet\n";
加锁方法
- public String pay(Long orderId) {
- try {
- Long lock = (Long)
- redisTemplate.execute(RedisScript.of(script, Long.class),
- Arrays.asList("pay:" + orderId), "1", 30);
- if (lock == 0) {
- //模拟支付业务代码执行10s
- Thread.sleep(10000);
- //处理完业务逻辑删除锁 异常了
- redisTemplate.delete("lock");
- }
- return lock == 0 ? "正常支付完毕" : "请稍等,已经有人在支付!!";
- } catch (Exception exception) {
- redisTemplate.delete("lock" + orderId);
- return "系统异常";
- }
- }
对外接口
- @RequestMapping(value = "/pay",method = RequestMethod.GET)
- @ResponseBody
- public String pay(@RequestParam("id") Long id) throws InterruptedException {
- return productService.pay(id);
- }
思考:
a. 导入redission客户端包
- <dependency>
- <groupId>org.redissongroupId>
- <artifactId>redissonartifactId>
- <version>3.5.4version>
- dependency>
b.初始化RedissionClient,将RedissionClient交给Spring容器管理
- @Bean
- RedissonClientgetRedissonClient(){
- Configconfig=newConfig();
- config.useClusterServers() //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
- .addNodeAddress("redis://192.168.8.127:6381")
- .addNodeAddress("redis://192.168.8.128:6381")
- .addNodeAddress("redis://192.168.8.129:6381")
- .addNodeAddress("redis://192.168.8.127:6380")
- .addNodeAddress("redis://192.168.8.128:6380")
- .addNodeAddress("redis://192.168.8.129:6380");
- return Redisson.create(config);
- }
c. 调用Redission的分布式锁
- public String payRedisson(Long orderId)throws InterruptedException{
- RLockr Lock1=redissonClient.getLock("order_lock"+orderId);
- if(rLock1.tryLock(-1,-1,TimeUnit.SECONDS)){
- rLock1.tryLock(-1,-1,TimeUnit.SECONDS);
- System.out.println("获取锁成功");
- Thread.sleep(500000);
- rLock1.unlock();
- return"处理完成";
- } else {
- System.out.println("获取锁失败");
- return"请稍等,已经有人在支付!!";
- }
d. 暴露接口
- @RequestMapping(value="/pay_lock",method= RequestMethod.GET)
- @ResponseBody
- public String payRedisson(@RequestParam("id") Long id) throws InterruptedException{
- return product Service.payRedisson(id);
- }

RLockr Lock1=redissonClient.getLock("order_lock"+orderId);
主要初始化3个方法: RedissonLock、RedissonBaseLock、RedissonObject
主要是一些任务执行器、大key,UUID以及过期时间、订阅服务的一些初始化
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
- long time = unit.toMillis(waitTime); // 锁等待时间
- long current = System.currentTimeMillis(); // 当前时间
- final long threadId = Thread.currentThread().getId(); // 当前线程
- Long ttl = this.tryAcquire(leaseTime, unit, threadId); // 抢占锁
- if (ttl == null) { // 抢占成功 执行业务逻辑
- return true;
- } else { // 抢占失败的逻辑处理
- time -= System.currentTimeMillis() - current; // 看当前时间是否已经过了锁等待时间
- if (time <= 0L) { // 如果过了 直接失败
- this.acquireFailed(threadId);
- return false;
- } else { // 如果没过锁等待时间
- current = System.currentTimeMillis();
- // 当前线程的订阅 也就是说在锁等待时间内 如果锁释放了 则就进行直接订阅
- final RFuture
subscribeFuture = this.subscribe(threadId); - if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
- // 订阅失败 获取锁失败
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.addListener(new FutureListener
() { - public void operationComplete(Future
future) throws Exception { - if (subscribeFuture.isSuccess()) {
- RedissonLock.this.unsubscribe(subscribeFuture, threadId);
- }
-
- }
- });
- }
-
- this.acquireFailed(threadId);
- return false;
- } else {
- // 表示订阅成功了 就是监听到了锁释放 开始进行抢占锁
- boolean var14;
- try {
- time -= System.currentTimeMillis() - current;
- if (time > 0L) {
- boolean var16;
- do {
- long currentTime = System.currentTimeMillis();
- ttl = this.tryAcquire(leaseTime, unit, threadId);
- if (ttl == null) { // 表示抢占成功
- var16 = true;
- return var16;
- }
-
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0L) { // 已过锁等待时间 抢占失败
- this.acquireFailed(threadId);
- var16 = false;
- return var16;
- }
-
- currentTime = System.currentTimeMillis();
- // 如果没抢到锁,并且也还没到锁的释放时间 使用Semaphore进行阻塞,阻塞时间 : 剩余等待时间ttl< 锁释放时间time? 剩余等待时间: 锁释放时间
- if (ttl >= 0L && ttl < time) {
- this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
-
- time -= System.currentTimeMillis() - currentTime;
- } while(time > 0L);
-
- this.acquireFailed(threadId);
- var16 = false;
- return var16;
- }
-
- this.acquireFailed(threadId);
- var14 = false;
- } finally {
- this.unsubscribe(subscribeFuture, threadId);
- }
-
- return var14;
- }
- }
- }
- }
整个逻辑的实现就是我们4.2的左半部分
下面我们首先看一个Semaphore的demo 知道是怎么进行阻塞的
- Semaphoresemaphore=newSemaphore(0);
- semaphore.release(); //释放semaphore才能后面判断为true if(semaphore.tryAcquire(10,TimeUnit.SECONDS))
- {
- System.out.println("10s过后");
- }
- System.out.println("1");
接着我画了一个上述代码的时间轴图 可帮助理解

tracquire就是重入锁抢占的一个实现,我们主要看一下lua脚本的实现
其中KEYS[1]是我的大key: order_lock111 ARGV[1]=30000 ARGV[2]:UUID+线程ID
- if(redis.call('exists',KEYS[1])==0) //判断我的锁是否存在 =0为不存在没人抢占锁
- then
- redis.call('hincrby',KEYS[1],ARGV[2],1); //把我的小key+1
- redis.call('pexpire',KEYS[1],ARGV[1]); //设置过期时间30s
- eturnnil;
- end;
- if(redis.call('hexists',KEYS[1],ARGV[2])==1) //进入该逻辑说明有线程抢占了锁继续判断是否同一个线程==1为同一线程
- then
- redis.call('hincrby', KEYS[1],ARGV[2],1); 把我的小key+1 代表重入次数
- redis.call('pexpire',KEYS[1],ARGV[1]); //设置过期时间30s
- returnnil;
- end;
- return redis.call('pttl',KEYS[1]); //前面2个if都没进,说明有人抢占并且不是同一线程,直接返回还有多少ms过期
watchDog看门够机制主要是为了解决锁时间到了,但是业务还没执行完,我希望我这个锁一直被我占有,我业务没执行完,就给我续上。怎么做?无非就是开个定时任务,定时的去判断下这个锁还存不存在,如果存在,续期,如果不存在,不续期。只有分布式锁中默认的30s时间才会触发看门狗机制
定时调度任务的方式有很多,比如es-job ssl-job rabbitmq死信队列等分布式的,还有Netty下面的时间轮算法,redis分布式就是采用的时间轮算法。
- public static void main(String[] args) {
- Timer timer = new HashedWheelTimer(); // 创建一个HashedWheelTimer hash轮定时器
- // 提交一个任务 让它在10s后执行
- timer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- System.out.println("10s 后执行该任务1");
- }
- }, 10, TimeUnit.SECONDS);
- // 再提交一个任务,让它在20s后执行
- timer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- System.out.println("20s 后执行该任务1");
- }
- }, 20, TimeUnit.SECONDS);
-
- // 提交一个任务 让它在10s后执行
- timer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- System.out.println("5s 后执行该任务");
- }
- }, 5, TimeUnit.SECONDS);
- }
执行结果:

可以看到 hash轮定时器是只会延时执行一次。
有家足浴店,他们的技师是这么安排的!总共有 8 个技师,分别是技师 1.2.3.4.5.6.7.8 号。技师 1 :负责 0-3 点的顾客技师 2 :负责 3-6 点的顾客技师 3 :负责 6-9 点的顾客技师 4 :负责 9-12 点的顾客技师 5 :负责 12-15 点的顾客技师 6 :负责 15-18 点的顾客技师 7 :负责 18-21 点的顾客技师 8 :负责 21-24 点的顾客那么,假如哪天放假,现在是早上 6 点,我想 9 个小时候去按摩,会是哪个技师为我服务?早点 6 点,一天已经过了 6 个小时,那么我服务的绝对时间是 6+9=15 个小时之后。,每个技师是3 小时,那么前面总共要 5 个技师,到了 15 以后就得技师6 来负责。那么,哪天晚上我累了,我想去按摩,假如现在是 21 点,我想预约 6 个小时后的按摩,就是6 个小时后我需要按摩,那么哪个技师为我按摩?技师 2 我们已知一天会有 8 个技师轮询,一天已经过了 21 点,而我是 6个小 时,那么假如从按摩店的角度去看的话,我要执行的决定时间 21+6 也就是 27 小时, 1 个技师的服务时间是 3 个小时,所以我前面 27 个小时需要 9 个技 师服务,那么你是 27 小时候,就得第 10 个技师,因为是每天轮询的,所以 第十个是技师 2 。
时间轮算法流程图: https://www.processon.com/view/link/62b2d1751e08530728b1f954
在抢占锁的时候,如果抢占成功,才会触发看门狗机制


我们主要看newTimeout部分
- Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- public void run(Timeout timeout) throws Exception {
- RFuture
future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)}); - future.addListener(new FutureListener
() { - public void operationComplete(Future
future) throws Exception { - RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
- if (!future.isSuccess()) {
- RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
- } else {
- if ((Boolean)future.getNow()) {
- RedissonLock.this.scheduleExpirationRenewal(threadId);
- }
-
- }
- }
- });
- }
- }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
run方法里面为任务,this.internalLockLeaseTime / 3L 代表是多长时间执行1次,等于10s,future中的任务是判断主线程中的业务key是否需要续期,并且重置为默认时间30s; 也就是说,10s会去监听一下执行该延时任务,如果该业务key还在,就续期。那么该延时任务究竟是怎么执行的呢?

newTimeout源码 执行start方法,该start方法会另起一个守护线程,执行我们的task

守护线程的真正执行方法在Work的run方法中

- public void run() {
- // 得到当前的系统时间
- HashedWheelTimer.this.startTime = System.nanoTime();
- if (HashedWheelTimer.this.startTime == 0L) {
- HashedWheelTimer.this.startTime = 1L;
- }
- // 得到了当前的系统时间后,主线程可以正常执行
- HashedWheelTimer.this.startTimeInitialized.countDown();
-
- int idx;
- HashedWheelBucket bucket;
- do {
- // 获取指针下一次走到的时间,如果没到下一次时间,就等待
- long deadline = this.waitForNextTick();
- if (deadline > 0L) {
- // 根据tick与轮的大小取模 得到当前tick所在的环的下标
- idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
- // 移除已经取消了的任务
- this.processCancelledTasks();
- // 根据idx下标 得到轮的hash桶
- bucket = HashedWheelTimer.this.wheel[idx];
- // 将队列的任务放到相关的hash桶,如果一个数组有多个任务,则采用链表的形式
- this.transferTimeoutsToBuckets();
- // 去执行hash桶下的任务
- bucket.expireTimeouts(deadline);
- ++this.tick;
- }
- } while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
-
- HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
- int var2 = var5.length;
-
- for(idx = 0; idx < var2; ++idx) {
- bucket = var5[idx];
- bucket.clearTimeouts(this.unprocessedTimeouts);
- }
-
- while(true) {
- HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
- if (timeout == null) {
- this.processCancelledTasks();
- return;
- }
-
- if (!timeout.isCancelled()) {
- this.unprocessedTimeouts.add(timeout);
- }
- }
- }
- private long waitForNextTick() {
- // 100ms走一次,得到下一次得deadLine
- long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);
-
- while(true) {
- // 得到当前时间跟开始时间的差值(时间轮走了多少时间)
- long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
- //sleepTimeMs如果大于0,说明还没走到指针该走的时间,睡眠
- long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
- //如果到了时间,直接返回
- if (sleepTimeMs <= 0L) {
- if (currentTime == Long.MIN_VALUE) {
- return -9223372036854775807L;
- }
-
- return currentTime;
- }
-
- if (PlatformDependent.isWindows()) {
- sleepTimeMs = sleepTimeMs / 10L * 10L;
- if (sleepTimeMs == 0L) {
- sleepTimeMs = 1L;
- }
- }
-
- try {
- //如果没到指针时间,睡眠
- Thread.sleep(sleepTimeMs);
- } catch (InterruptedException var8) {
- if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
- return Long.MIN_VALUE;
- }
- }
- }
- }
- private void transferTimeoutsToBuckets() {
- //从队列中获取任务,每次最多只能获取100000次
- for(int i = 0; i < 100000; ++i) {
- //从队列中获取task,任务会在主线程中添加到队列
- HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
- if (timeout == null) {
- break;
- }
-
- if (timeout.state() != 1) {
- // 任务的deadline/每次指针时间
- // 从第一个starttime开始得到需要多少指针
- long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
- /轮数 默认512的环的大小,如果大于,则需要在第二次环的时候才会执行
- timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
- // 取2个的最大值,确保不会把时间安排在过去
- long ticks = Math.max(calculated, this.tick);
- //取模得到我的指针跳到哪个hash环执行
- int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
- //放到对应的hashWheeld的下标中
- HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
- //如果这个hash下标中有值,则添加至链表后面
- bucket.addTimeout(timeout);
- }
- }
-
- }
- public void expireTimeouts(long deadline) {
- HashedWheelTimeout next;
- // 循环hash环中链表
- for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
- next = timeout.next;
- // 是不是当前轮应该执行的任务
- if (timeout.remainingRounds <= 0L) {
- /从链表清除
- next = this.remove(timeout);
- // 判断是否到了执行时间
- if (timeout.deadline > deadline) {
- throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
- }
- //调用timeout.expire方法,执行task
- timeout.expire();
- } else if (timeout.isCancelled()) {
- next = this.remove(timeout);
- } else {
- --timeout.remainingRounds;
- }
- }
-
- }
expire执行task任务

KEYS[1] 为大 key KEYS[2] 为加锁的时候订阅的 channelNameARGV[1]=UNLOCK_MESSAGE , ARGV[2]=释放时间 ARGV[3]= 重入的线程信息
- if(redis.call('hexists',KEYS[1],ARGV[3])==0) //如果小key不存在 返回空
- then
- return nil;
- end;
- local counter=redis.call('hincrby',KEYS[1],ARGV[3],-1); //如果存在 可重入次数-1
- if(counter>0) //如果大于0,说明加锁次数大于释放锁次数,不能释放锁
- then redis.call('pexpire',KEYS[1],ARGV[2]); //设置锁的过期时间
- return0;//返回0
- else
- redis.call('del',KEYS[1]); //删除大key
- redis.call('publish',KEYS[2],ARGV[1]);//往订阅的频道发送message,发送UNLOCK_MESSAGE
- return1;
- end;
- returnnil;
Redis做分布式遇到的问题?
因为 Redis 属于 CAP 中的 AP, 为了优先保证高可用性,所以会牺牲一定的数据一致性。比如主从方案中,如果主库挂的话,从库是不管数据有没有同步完主库的数据,都会自动升级主。那么这样就会出现一种情况:加锁返回是成功的,但是由于发生了主库挂的,从库切换的时候,没有同步到这个锁,从而导致锁失效。那么这个怎么解决?解决不了,能做的是我尽可能的去减少这种情况!怎么解决?既然你一个主从实例,会导致锁丢失,那么假如说我把这种风险分担,同时在5 台机器加锁,这 5 台机器只要有一半以上的锁能成功获取,这样就尽可能的减少了锁丢失的场景。就相当于你家的门钥匙,假如钥匙只有一把,你丢了就丢了,但是把这个钥匙给你们家里面所有的人,那么你丢了家门还是能进去。因为家里其他人还有。
实现方案: 给你同时往多个主节点+同一个锁 只要超过一半的节点能够去加锁成功,就成功